Source code for async_rx.observable.rx_repeat_series

from typing import Any, Optional

import curio

from ..protocol import Observable, Observer, Subscription
from .rx_create import rx_create

__all__ = ["rx_repeat_series"]


[docs]def rx_repeat_series(source: Any, ratio: Optional[float] = 1.0) -> Observable: """Repeat a series (delay, value) as an observable for each subscription. Args: source (Any): iterable or async iterable source of tuple (duration, value) ratio (Optional[float]): ratio apply on duration (1.0 per default) Returns: (Observable): an observable Raise: (RuntimeError): if source is not iterable (sync or async) """ if not hasattr(source, "__iter__") and not hasattr(source, "__aiter__"): raise RuntimeError("source must be (async/sync) iterable") async def _subscribe(an_observer: Observer) -> Subscription: _task = None async def _proceed_item(item: Any): (duration, value) = item await curio.sleep(duration * ratio) await an_observer.on_next(value) async def _producer(): nonlocal _task try: if hasattr(source, "__aiter__"): async for item in source: await _proceed_item(item) else: for item in source: await _proceed_item(item) _task = None # do not cancel this task if concurrent call to _subscribe occurs await an_observer.on_completed() except curio.TaskCancelled: # pragma: no cover # it's time to finish pass _task = await curio.spawn(_producer()) async def _subscribe(): nonlocal _task if _task: # pragma: no cover await _task.cancel() return _subscribe return rx_create(subscribe=_subscribe)