from datetime import timedelta
from inspect import iscoroutinefunction
from typing import Callable, Optional
from curio import TaskCancelled, spawn, time
from ..protocol import Observable, Observer, Subscription
from .rx_create import rx_create
__all__ = ["rx_repeat"]
[docs]def rx_repeat(duration: timedelta, producer: Callable, initial_delay: Optional[timedelta] = None) -> Observable:
"""Repeat data.
rx_repeat send data generated by producer function at duration rate until observer
dispose his subscription.
Args:
duration (timedelta): duration between each sended item
producer (Callable): producer (asyn/sync) function
initial_delay (Optional[timedelta]): initial delay before produce value (default: None)
Returns:
(Observable): observable instance
Raise:
(RuntimeError): if no producer or duration are provided
"""
if not producer or not duration:
raise RuntimeError("producer and duration are mandatory")
_is_awaitable = iscoroutinefunction(producer)
_duration = duration.total_seconds()
async def _subscribe(an_observer: Observer) -> Subscription:
_task = None
async def _producer():
nonlocal _duration, _is_awaitable
try:
# initial delay
if initial_delay:
await time.sleep(initial_delay.total_seconds())
while True:
start = await time.clock()
value = await producer() if _is_awaitable else producer()
await an_observer.on_next(value)
duration = await time.clock() - start
# adjust wait time
time_shift = _duration - duration
if time_shift > 0:
await time.sleep(time_shift)
except TaskCancelled:
# it's time to finish
pass
_task = await spawn(_producer())
async def _subscribe():
nonlocal _task
if _task:
await an_observer.on_completed()
await _task.cancel()
_task = None
return _subscribe
return rx_create(subscribe=_subscribe)