# Source code for async_rx.observable.rx_repeat

from datetime import timedelta
from inspect import iscoroutinefunction
from typing import Callable, Optional

from curio import TaskCancelled, spawn, time

from ..protocol import Observable, Observer, Subscription
from .rx_create import rx_create

__all__ = ["rx_repeat"]


def rx_repeat(
    duration: timedelta,
    producer: Callable,
    initial_delay: Optional[timedelta] = None,
) -> Observable:
    """Repeat data.

    rx_repeat sends the data generated by the producer function at duration
    rate until the observer disposes its subscription.

    Each subscription spawns its own background task. The time spent
    producing and delivering a value is subtracted from the wait before the
    next one; when it exceeds ``duration`` the next value is produced without
    sleeping.

    Args:
        duration (timedelta): duration between each sent item
        producer (Callable): producer (async/sync) function, called without argument
        initial_delay (Optional[timedelta]): initial delay before producing
            the first value (default: None)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no producer or duration are provided (a zero
            duration is falsy and is rejected as well)

    """
    if not producer or not duration:
        raise RuntimeError("producer and duration are mandatory")

    # Resolved once: decides whether each producer() call must be awaited.
    _is_awaitable = iscoroutinefunction(producer)
    _duration = duration.total_seconds()

    async def _subscribe(an_observer: Observer) -> Subscription:
        _task = None

        async def _producer():
            try:
                # initial delay
                if initial_delay:
                    await time.sleep(initial_delay.total_seconds())

                while True:
                    start = await time.clock()
                    value = await producer() if _is_awaitable else producer()
                    await an_observer.on_next(value)
                    elapsed = await time.clock() - start

                    # adjust wait time: only sleep for what is left of the period
                    time_shift = _duration - elapsed
                    if time_shift > 0:
                        await time.sleep(time_shift)
            except TaskCancelled:
                # it's time to finish
                pass

        _task = await spawn(_producer())

        async def _unsubscribe():
            nonlocal _task
            if _task is None:
                return

            # Drop the handle first so a repeated dispose is a no-op.
            task, _task = _task, None
            try:
                await an_observer.on_completed()
            finally:
                # Cancel even when on_completed() raises, otherwise the
                # producer task would keep running with nobody able to stop it.
                await task.cancel()

        return _unsubscribe

    return rx_create(subscribe=_subscribe)