toloka.streaming.pipeline.Pipeline
Pipeline(self, period: timedelta = ..., storage: Optional[BaseStorage] = None, iteration_mode: IterationMode = IterationMode.FIRST_COMPLETED, *, name: Optional[str] = None)
An entry point for Toloka streaming pipelines.
Lets you register multiple observers and calls them periodically for as long as at least one of them may resume.
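The calling pattern described above can be sketched with plain `asyncio`. This is a minimal illustration, not the library's implementation: `CountingObserver` and `run_periodically` are hypothetical names invented for this sketch, and the real observers do actual work against Toloka instead of counting calls.

```python
import asyncio
from datetime import timedelta
from typing import List


class CountingObserver:
    """Hypothetical stand-in for an observer: it wants to resume until `limit` calls."""

    def __init__(self, name: str, limit: int) -> None:
        self.name = name
        self.limit = limit
        self.calls = 0

    def should_resume(self) -> bool:
        return self.calls < self.limit

    async def __call__(self) -> None:
        self.calls += 1


async def run_periodically(observers: List[CountingObserver], period: timedelta) -> int:
    """Call every observer once per iteration until none of them may resume."""
    iterations = 0
    while True:
        # All registered observers are called in each iteration.
        await asyncio.gather(*(observer() for observer in observers))
        iterations += 1
        if not any(observer.should_resume() for observer in observers):
            return iterations
        # Wait one period before the next round of calls.
        await asyncio.sleep(period.total_seconds())


observers = [CountingObserver("a", limit=1), CountingObserver("b", limit=3)]
iterations = asyncio.run(run_periodically(observers, timedelta(milliseconds=10)))
```

Here the loop runs three iterations: observer `a` is done after the first one, but `b` may still resume, so both keep being called until `b` is done as well.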
Parameters | Type | Description |
---|---|---|
period | timedelta | Interval between observer calls. Defaults to 60 seconds. |
storage | Optional[BaseStorage] | Optional storage object for saving the pipeline's state. Allows the pipeline to recover from its previous state after a failure. |
iteration_mode | IterationMode | When to start a new iteration. Default is IterationMode.FIRST_COMPLETED. |
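The name `FIRST_COMPLETED` matches one of the `return_when` constants of `asyncio.wait`. Assuming the iteration modes follow the same idea (this page does not confirm it), the difference between waiting for the first task and waiting for all tasks can be sketched with the standard library alone. The `observer` coroutine and `wait_for` helper below are hypothetical and only simulate work with a delay.

```python
import asyncio
from typing import List, Tuple


async def observer(name: str, delay: float) -> str:
    """Hypothetical observer call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return name


async def wait_for(return_when: str) -> Tuple[List[str], int]:
    """Return the names of finished calls and the number of calls still pending."""
    tasks = [
        asyncio.create_task(observer("fast", 0.01)),
        asyncio.create_task(observer("slow", 0.2)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=return_when)
    finished = sorted(task.result() for task in done)
    still_running = len(pending)
    # Clean up so no task outlives the event loop in this demo.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, still_running


first = asyncio.run(wait_for(asyncio.FIRST_COMPLETED))
everything = asyncio.run(wait_for(asyncio.ALL_COMPLETED))
```

With `FIRST_COMPLETED`, `asyncio.wait` returns as soon as the fast call finishes while the slow one is still pending. With `ALL_COMPLETED` it returns only after both are done.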
Examples:
Get assignments from a segmentation pool and send them to another pool for verification.

    def handle_submitted(events: List[AssignmentEvent]) -> None:
        verification_tasks = [create_verification_task(item.assignment) for item in events]
        toloka_client.create_tasks(verification_tasks, open_pool=True)

    def handle_accepted(events: List[AssignmentEvent]) -> None:
        do_some_aggregation([item.assignment for item in events])

    async_toloka_client = AsyncTolokaClient.from_sync_client(toloka_client)

    observer_123 = AssignmentsObserver(async_toloka_client, pool_id='123')
    observer_123.on_submitted(handle_submitted)

    observer_456 = AssignmentsObserver(async_toloka_client, pool_id='456')
    observer_456.on_accepted(handle_accepted)

    pipeline = Pipeline()
    pipeline.register(observer_123)
    pipeline.register(observer_456)
    await pipeline.run()
One-liner version.

    pipeline = Pipeline()
    pipeline.register(AssignmentsObserver(toloka_client, pool_id='123')).on_submitted(handle_submitted)
    pipeline.register(AssignmentsObserver(toloka_client, pool_id='456')).on_accepted(handle_accepted)
    await pipeline.run()
With external storage.

    from toloka.streaming import S3Storage, ZooKeeperLocker

    locker = ZooKeeperLocker(...)
    storage = S3Storage(locker=locker, ...)
    pipeline = Pipeline(storage=storage)
    await pipeline.run()  # Saves state after each iteration and tries to load a saved state at start.
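The save-after-each-iteration and load-at-start behaviour mentioned in the storage example can be illustrated without any external service. The sketch below is an assumption-laden toy: `JsonFileStorage` and `run_iterations` are hypothetical names, the class does not implement `BaseStorage`, and the "state" is just a counter written to a temporary JSON file.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


class JsonFileStorage:
    """Hypothetical storage: not BaseStorage, only the same save/load idea."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def load(self) -> Optional[dict]:
        if not self.path.exists():
            return None
        return json.loads(self.path.read_text())

    def save(self, state: dict) -> None:
        self.path.write_text(json.dumps(state))


def run_iterations(storage: JsonFileStorage, total: int, fail_at: Optional[int] = None) -> int:
    """Count up to `total`, resuming from the saved position if there is one."""
    state = storage.load() or {"processed": 0}  # try to load saved state at start
    while state["processed"] < total:
        if fail_at is not None and state["processed"] == fail_at:
            raise RuntimeError("simulated crash")
        state["processed"] += 1
        storage.save(state)  # persist after every iteration
    return state["processed"]


with tempfile.TemporaryDirectory() as tmp:
    storage = JsonFileStorage(Path(tmp) / "state.json")
    try:
        run_iterations(storage, total=5, fail_at=3)  # first run crashes midway
    except RuntimeError:
        pass
    saved_before_restart = storage.load()["processed"]
    final = run_iterations(storage, total=5)  # second run resumes from saved state
```

The first run stops after three iterations, and the second run picks up from the saved counter instead of starting from zero.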
Method | Description |
---|---|
observers_iter | Iterate over registered observers. |
register | Register given observer. |
run | None |
run_manually | None |
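The examples on this page call `await pipeline.run()` at the top level, which only works where an event loop is already running, for example in a Jupyter notebook. In a regular script the call is usually wrapped in a coroutine and started with `asyncio.run`. The sketch below shows that wrapping with a hypothetical `StubPipeline` so it runs without Toloka credentials; in real code the stub would be replaced by a configured `Pipeline`.

```python
import asyncio


class StubPipeline:
    """Hypothetical stand-in with an awaitable run(), like the pipeline in the examples."""

    def __init__(self) -> None:
        self.finished = False

    async def run(self) -> None:
        await asyncio.sleep(0)  # pretend to do some work
        self.finished = True


async def main() -> StubPipeline:
    pipeline = StubPipeline()
    await pipeline.run()
    return pipeline


# asyncio.run creates the event loop, runs main() to completion, and closes the loop.
pipeline = asyncio.run(main())
```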