Toloka documentation

Pipeline

toloka.streaming.pipeline.Pipeline

Pipeline(
    self,
    period: timedelta = ...,
    storage: Optional[BaseStorage] = None,
    iteration_mode: IterationMode = IterationMode.FIRST_COMPLETED,
    *,
    name: Optional[str] = None
)

An entry point for Toloka streaming pipelines.

Allows you to register multiple observers and calls them periodically while at least one of them may resume.
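
The behaviour described above can be pictured with a small standard-library sketch. It is not the library's implementation: `CountdownObserver`, `may_resume`, and `run_periodically` are hypothetical names invented for illustration, and the real observers and `Pipeline` may differ in detail.

```python
import asyncio
from datetime import timedelta


class CountdownObserver:
    """Hypothetical stand-in for an observer: it has work for a fixed number of calls."""

    def __init__(self, name: str, calls_left: int) -> None:
        self.name = name
        self.calls_left = calls_left
        self.calls_made = 0

    async def __call__(self) -> None:
        # Do one unit of "work" if any is left.
        if self.calls_left > 0:
            self.calls_left -= 1
            self.calls_made += 1

    def may_resume(self) -> bool:
        # The observer wants to be called again while work remains.
        return self.calls_left > 0


async def run_periodically(observers, period: timedelta) -> int:
    """Call every observer once per period while at least one may resume."""
    iterations = 0
    while True:
        await asyncio.gather(*(observer() for observer in observers))
        iterations += 1
        if not any(observer.may_resume() for observer in observers):
            return iterations
        await asyncio.sleep(period.total_seconds())


observers = [CountdownObserver('a', 1), CountdownObserver('b', 3)]
iterations = asyncio.run(run_periodically(observers, timedelta(milliseconds=10)))
print(iterations)  # 3
```

The loop keeps going for three iterations because observer `b` still has work after observer `a` is finished; it stops only once no observer may resume.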

Parameters Description

Parameters Type Description
period timedelta Period between observer calls. Defaults to 60 seconds.
storage Optional[BaseStorage] Optional storage object used to save the pipeline's state. Allows the pipeline to recover from a previously saved state after a failure.
iteration_mode IterationMode When to start a new iteration. Defaults to FIRST_COMPLETED.
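
The name FIRST_COMPLETED mirrors a constant used by `asyncio.wait` in the Python standard library. The sketch below shows only how the two `asyncio.wait` modes differ. Whether `Pipeline` relies on `asyncio.wait` internally is an assumption, and `observe` and `wait_for` are hypothetical helpers.

```python
import asyncio


async def observe(name: str, delay: float) -> str:
    """Hypothetical observer call that takes `delay` seconds to finish."""
    await asyncio.sleep(delay)
    return name


async def wait_for(return_when: str):
    tasks = [
        asyncio.create_task(observe('fast', 0.01)),
        asyncio.create_task(observe('slow', 0.2)),
    ]
    # FIRST_COMPLETED returns as soon as any task is done;
    # ALL_COMPLETED returns only when every task is done.
    done, pending = await asyncio.wait(tasks, return_when=return_when)
    finished = sorted(task.result() for task in done)
    # Clean up whatever is still running so the event loop can close quietly.
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
    return finished


first = asyncio.run(wait_for(asyncio.FIRST_COMPLETED))
everything = asyncio.run(wait_for(asyncio.ALL_COMPLETED))
print(first)       # ['fast']
print(everything)  # ['fast', 'slow']
```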

Examples:

Get assignments from segmentation pool and send them for verification to another pool.

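# Import paths as found in toloka-kit; adjust them to your installed version if needed.
from typing import List

from toloka.streaming import AssignmentsObserver, Pipeline
from toloka.streaming.event import AssignmentEvent
from toloka.util.async_utils import AsyncMultithreadWrapper

# Assumed to exist already: toloka_client (a configured TolokaClient) and the
# user-defined helpers create_verification_task and do_some_aggregation.

def handle_submitted(events: List[AssignmentEvent]) -> None: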
    verification_tasks = [create_verification_task(item.assignment) for item in events]
    toloka_client.create_tasks(verification_tasks, open_pool=True)

def handle_accepted(events: List[AssignmentEvent]) -> None:
    do_some_aggregation([item.assignment for item in events])

async_toloka_client = AsyncMultithreadWrapper(toloka_client)

observer_123 = AssignmentsObserver(async_toloka_client, pool_id='123')
observer_123.on_submitted(handle_submitted)

observer_456 = AssignmentsObserver(async_toloka_client, pool_id='456')
observer_456.on_accepted(handle_accepted)

pipeline = Pipeline()
pipeline.register(observer_123)
pipeline.register(observer_456)
await pipeline.run()
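
The examples on this page use a bare `await`, which works in a notebook or an async REPL but is a syntax error at the top level of a regular script. One way to run the same call from a script is to wrap it in a coroutine and hand it to `asyncio.run`. In this sketch `StandInPipeline` is a hypothetical placeholder for a configured `Pipeline`; only its awaitable `run()` method matters here.

```python
import asyncio


class StandInPipeline:
    """Hypothetical placeholder for a configured Pipeline."""

    def __init__(self) -> None:
        self.finished = False

    async def run(self) -> None:
        # A real pipeline would call its observers here.
        await asyncio.sleep(0)
        self.finished = True


async def main(pipeline) -> None:
    # Inside a coroutine, `await` is allowed.
    await pipeline.run()


pipeline = StandInPipeline()
asyncio.run(main(pipeline))  # Blocks until run() returns.
```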

One-liner version.

pipeline = Pipeline()
pipeline.register(AssignmentsObserver(toloka_client, pool_id='123')).on_submitted(handle_submitted)
pipeline.register(AssignmentsObserver(toloka_client, pool_id='456')).on_accepted(handle_accepted)
await pipeline.run()

With external storage.

from toloka.streaming import S3Storage, ZooKeeperLocker
locker = ZooKeeperLocker(...)
storage = S3Storage(locker=locker, ...)
pipeline = Pipeline(storage=storage)
await pipeline.run()  # Saves state after each iteration; tries to load saved state at start.
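
The save-after-each-iteration, load-at-start idea can be illustrated with a local file instead of S3. This is a rough sketch under stated assumptions: `FileStateStorage` and `run_iterations` are hypothetical, and the `save`/`load` methods shown are not the `BaseStorage` interface.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Optional


class FileStateStorage:
    """Hypothetical storage backend that keeps state in a local file."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def save(self, state: dict) -> None:
        # Write to a temporary file, then rename it over the target,
        # so a crash mid-write does not leave a half-written state file.
        tmp_path = self.path.with_suffix('.tmp')
        tmp_path.write_bytes(pickle.dumps(state))
        tmp_path.replace(self.path)

    def load(self) -> Optional[dict]:
        if not self.path.exists():
            return None
        return pickle.loads(self.path.read_bytes())


def run_iterations(storage: FileStateStorage, count: int) -> dict:
    # Try to load saved state at start; fall back to a fresh state.
    state = storage.load() or {'iterations_done': 0}
    for _ in range(count):
        state['iterations_done'] += 1
        storage.save(state)  # Save state after each iteration.
    return state


with tempfile.TemporaryDirectory() as tmp_dir:
    storage = FileStateStorage(Path(tmp_dir) / 'pipeline_state.pkl')
    first_run = run_iterations(storage, 2)
    second_run = run_iterations(storage, 3)  # Resumes from the saved state.

print(first_run['iterations_done'], second_run['iterations_done'])  # 2 5
```

The second run continues from the count saved by the first one, which is the recovery behaviour the `storage` parameter is meant to provide.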

Methods Summary

Method Description
observers_iter Iterate over registered observers.
register Register given observer.
run None
run_manually None