Pipeline

toloka.streaming.pipeline.Pipeline

Pipeline(
    self,
    period: timedelta = ...,
    storage: Optional[BaseStorage] = None,
    iteration_mode: IterationMode = IterationMode.FIRST_COMPLETED,
    *,
    name: Optional[str] = None
)

An entry point for toloka streaming pipelines.

Allows you to register multiple observers and calls them periodically for as long as at least one of them can resume.
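The polling behaviour described above can be pictured with a small, self-contained sketch. It does not use toloka-kit: `CountdownObserver`, `should_resume`, and `run_pipeline` are hypothetical names chosen for illustration, and the real `Pipeline` additionally deals with storage, locking, and iteration modes. The sketch only shows the core idea: on every round, call the observers that can still resume, wait for `period`, and stop once none of them can resume.

```python
import asyncio
from datetime import timedelta
from typing import List


class CountdownObserver:
    """Hypothetical stand-in for an observer that can resume a fixed number of times."""

    def __init__(self, name: str, remaining: int) -> None:
        self.name = name
        self.remaining = remaining
        self.calls = 0

    def should_resume(self) -> bool:
        return self.remaining > 0

    async def __call__(self) -> None:
        # A real observer would fetch new events here and pass them to its handlers.
        self.calls += 1
        self.remaining -= 1


async def run_pipeline(observers: List[CountdownObserver], period: timedelta) -> int:
    """Call every resumable observer once per period until none of them can resume."""
    iterations = 0
    while True:
        active = [obs for obs in observers if obs.should_resume()]
        if not active:
            return iterations
        await asyncio.gather(*(obs() for obs in active))
        iterations += 1
        await asyncio.sleep(period.total_seconds())


observers = [CountdownObserver("a", 2), CountdownObserver("b", 3)]
print(asyncio.run(run_pipeline(observers, timedelta(seconds=0))))  # 3
```

The loop keeps running after observer "a" is exhausted because "b" can still resume, which is the "while at least one of them can resume" rule in miniature.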

Parameters description

Parameter | Type | Description
period | timedelta | Period of observer calls. By default, 60 seconds.
storage | Optional[BaseStorage] | Optional storage object used to save the pipeline's state. Allows the pipeline to recover from its previous state in case of failure.
iteration_mode | IterationMode | When to start a new iteration. Default is FIRST_COMPLETED.
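The name FIRST_COMPLETED matches the `return_when` constants of Python's `asyncio.wait`, so a stdlib analogy may help; treat the correspondence as an assumption, not as a description of how `Pipeline` is implemented. In the sketch below, `observe` and `finished_before_next_round` are hypothetical helpers. With FIRST_COMPLETED the caller may move on as soon as one observer finishes, while ALL_COMPLETED waits for every observer.

```python
import asyncio
from typing import List


async def observe(name: str, delay: float) -> str:
    # Stand-in for one observer iteration that takes `delay` seconds.
    await asyncio.sleep(delay)
    return name


async def finished_before_next_round(return_when: str) -> List[str]:
    """Return the observers that have finished when the next iteration is allowed to start."""
    tasks = [
        asyncio.create_task(observe("fast", 0.01)),
        asyncio.create_task(observe("slow", 0.5)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=return_when)
    for task in pending:
        # This sketch inspects only one round, so unfinished work is cancelled.
        task.cancel()
    return sorted(task.result() for task in done)


print(asyncio.run(finished_before_next_round(asyncio.FIRST_COMPLETED)))  # ['fast']
print(asyncio.run(finished_before_next_round(asyncio.ALL_COMPLETED)))  # ['fast', 'slow']
```

Under this reading, FIRST_COMPLETED keeps a fast observer from being held back by a slow one.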

Examples:

Get assignments from a segmentation pool and send them to another pool for verification.

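from typing import List

from toloka.async_client import AsyncTolokaClient
from toloka.streaming import AssignmentsObserver, Pipeline
from toloka.streaming.event import AssignmentEvent

# `toloka_client` is an existing TolokaClient; `create_verification_task`
# and `do_some_aggregation` stand for your own helper functions.
def handle_submitted(events: List[AssignmentEvent]) -> None:
    # Turn each submitted assignment into a verification task in another pool.
    verification_tasks = [create_verification_task(item.assignment) for item in events]
    toloka_client.create_tasks(verification_tasks, open_pool=True)

def handle_accepted(events: List[AssignmentEvent]) -> None:
    do_some_aggregation([item.assignment for item in events])

async_toloka_client = AsyncTolokaClient.from_sync_client(toloka_client)
observer_123 = AssignmentsObserver(async_toloka_client, pool_id='123')
observer_123.on_submitted(handle_submitted)
observer_456 = AssignmentsObserver(async_toloka_client, pool_id='456')
observer_456.on_accepted(handle_accepted)
pipeline = Pipeline()
pipeline.register(observer_123)
pipeline.register(observer_456)
# Top-level await works in an async context such as a notebook;
# in a plain script, use asyncio.run(pipeline.run()) instead.
await pipeline.run()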
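In the example above, each handler receives a list of events of a single kind. The stdlib-only sketch below shows one way such routing can work; `Event`, `MiniObserver`, and `dispatch` are hypothetical names, and the string event types are simplified placeholders rather than toloka-kit's `AssignmentEvent` types. New events are grouped by type, and every handler is called once with the whole batch for its type.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List


@dataclass
class Event:
    # Hypothetical, simplified stand-in for an assignment event.
    event_type: str
    assignment_id: str


Handler = Callable[[List[Event]], None]


class MiniObserver:
    """Hypothetical observer that routes event batches to handlers by event type."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def on_submitted(self, handler: Handler) -> None:
        self._handlers["SUBMITTED"].append(handler)

    def on_accepted(self, handler: Handler) -> None:
        self._handlers["ACCEPTED"].append(handler)

    def dispatch(self, events: List[Event]) -> None:
        # Group the new events by type, then call each handler once per batch.
        grouped: DefaultDict[str, List[Event]] = defaultdict(list)
        for event in events:
            grouped[event.event_type].append(event)
        for event_type, batch in grouped.items():
            for handler in self._handlers.get(event_type, []):
                handler(batch)


submitted_ids: List[List[str]] = []
accepted_ids: List[List[str]] = []
observer = MiniObserver()
observer.on_submitted(lambda batch: submitted_ids.append([e.assignment_id for e in batch]))
observer.on_accepted(lambda batch: accepted_ids.append([e.assignment_id for e in batch]))
observer.dispatch([Event("SUBMITTED", "a1"), Event("ACCEPTED", "a2"), Event("SUBMITTED", "a3")])
print(submitted_ids, accepted_ids)  # [['a1', 'a3']] [['a2']]
```

Batching like this lets a handler such as handle_submitted create all verification tasks in a single create_tasks call instead of one call per assignment.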

One-liner version.

pipeline = Pipeline()
pipeline.register(AssignmentsObserver(toloka_client, pool_id='123')).on_submitted(handle_submitted)
pipeline.register(AssignmentsObserver(toloka_client, pool_id='456')).on_accepted(handle_accepted)
await pipeline.run()
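The one-liner form relies on `register` returning the observer it was given, which is why `.on_submitted(...)` can be called on its result. A minimal sketch of that design follows; `MiniPipeline` and `Obs` are hypothetical classes, and although `observers_iter` mirrors a method named in the summary table, its body here is an assumption.

```python
from typing import Callable, Iterator, List

Handler = Callable[[list], None]


class Obs:
    """Hypothetical observer with a single list of callbacks."""

    def __init__(self, pool_id: str) -> None:
        self.pool_id = pool_id
        self.handlers: List[Handler] = []

    def on_submitted(self, handler: Handler) -> None:
        self.handlers.append(handler)


class MiniPipeline:
    """Hypothetical registry showing why `register(...).on_submitted(...)` can be chained."""

    def __init__(self) -> None:
        self._observers: List[Obs] = []

    def register(self, observer: Obs) -> Obs:
        # Returning the argument unchanged lets the caller keep configuring it inline.
        self._observers.append(observer)
        return observer

    def observers_iter(self) -> Iterator[Obs]:
        return iter(self._observers)


def handle_submitted(events: list) -> None:
    print(f"{len(events)} submitted")


pipeline = MiniPipeline()
pipeline.register(Obs("123")).on_submitted(handle_submitted)
pipeline.register(Obs("456")).on_submitted(handle_submitted)
print([(obs.pool_id, len(obs.handlers)) for obs in pipeline.observers_iter()])
# [('123', 1), ('456', 1)]
```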

With external storage.

from toloka.streaming import Pipeline, S3Storage, ZooKeeperLocker
locker = ZooKeeperLocker(...)
storage = S3Storage(locker=locker, ...)
pipeline = Pipeline(storage=storage)
# Register observers here, as in the examples above.
await pipeline.run()  # Saves state after each iteration and tries to load the saved state at start.
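The save-after-each-iteration, load-at-start behaviour can be illustrated with a local JSON file. This is a stdlib sketch under stated assumptions: `JsonFileStorage` and `run_iterations` are hypothetical, the state is reduced to a per-pool counter, and locking (which the `ZooKeeperLocker` above presumably provides to coordinate concurrent pipeline processes) is left out entirely.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


class JsonFileStorage:
    """Hypothetical storage that persists a dict of per-pool cursor positions."""

    def __init__(self, path: Path) -> None:
        self.path = path

    def load(self) -> Dict[str, int]:
        # At start-up, resume from the last saved state if one exists.
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def save(self, state: Dict[str, int]) -> None:
        # Write a temporary file first, then replace, so a crash never leaves half a file.
        tmp = self.path.with_suffix(".tmp")
        tmp.write_text(json.dumps(state))
        tmp.replace(self.path)


def run_iterations(storage: JsonFileStorage, pool_id: str, new_events: int) -> Dict[str, int]:
    state = storage.load()
    for _ in range(new_events):
        state[pool_id] = state.get(pool_id, 0) + 1  # advance the cursor by one event
        storage.save(state)  # checkpoint after every iteration
    return state


with tempfile.TemporaryDirectory() as tmp_dir:
    store = JsonFileStorage(Path(tmp_dir) / "state.json")
    run_iterations(store, "123", 2)
    # A restarted process picks up where the previous one stopped.
    resumed = run_iterations(store, "123", 3)

print(resumed)  # {'123': 5}
```

Checkpointing after every iteration bounds the work lost on failure to a single iteration, at the cost of one write per iteration.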

Methods summary

Method | Description
observers_iter | Iterate over registered observers.
register | Register the given observer.
run | No description.
run_manually | No description.

Last updated: August 28, 2023
