Source code for async_rx.observable.rx_delay

from datetime import timedelta
from typing import Any, Optional

import curio

from ..protocol import Observable, Observer, Subscription, rx_observer
from .rx_create import rx_create

__all__ = ["rx_delay"]


[docs]def rx_delay(observable: Observable, duration: timedelta, buffer_size: Optional[int] = None, ignore_events_if_full: Optional[bool] = True) -> Observable: """Delay operator. Delay will project the sequence unmodified, but shifted into the future with a specified delay. Underlaying implementation use a queue and a dedicated consumer. Args: observable (Observable): an observable instance duration (timedelta): timedelta of delay (the duration). buffer_size (Optional[int]): optional buffer size, if not specified size is unlimited (ignore_events_if_full has no meaning, but not your memory...) ignore_events_if_full (Optional[bool]): When true, if internal buffer (here a queue) is full, events will be ignored until older will be consumed. Otherwise, producer will be locked until older will be consumed. Returns: (Observable): observable instance Raise: (RuntimeError): if no observable or duration are provided or buffer_size <= 0 """ if not observable or not duration: raise RuntimeError("observable and duration are mandatory") if buffer_size and buffer_size <= 0: raise RuntimeError("buffer_size must be greather than zero or None") async def _subscribe(an_observer: Observer) -> Subscription: _queue = curio.Queue(buffer_size) if buffer_size else curio.Queue() _consumer_task = None _subscription: Optional[Subscription] = None _duration = duration.total_seconds() async def consumer(): nonlocal _queue, _duration try: while True: item = await _queue.get() # retreaive an item (lock until one) await curio.sleep(_duration) # add duration delay before send await an_observer.on_next(item) await _queue.task_done() # notify that job is done except curio.TaskCancelled: # it's time to finish pass async def _cancel_consumer(): nonlocal _consumer_task if _consumer_task: await _consumer_task.cancel() _consumer_task = None async def _on_next(item: Any): nonlocal _queue if ignore_events_if_full and _queue.full(): return await _queue.put(item) async def _on_completed(): nonlocal _queue, _consumer_task await _queue.join() # wait complete processing await _cancel_consumer() await an_observer.on_completed() async def _on_error(err: Any): nonlocal _consumer_task await curio.sleep(_duration) # add duration delay on error await _cancel_consumer() await an_observer.on_error(err=err) async def _subscribe(): nonlocal _consumer_task, _subscription await _cancel_consumer() if _subscription: await _subscription() _subscription = None _consumer_task = await curio.spawn(consumer()) _subscription = await observable.subscribe(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)) return _subscribe return rx_create(subscribe=_subscribe, max_observer=1)