Source code for async_rx.observable.rx_debounce

from datetime import datetime, timedelta
from typing import Any, Optional

import curio

from ..protocol import Observable, Observer, Subscription, rx_observer
from .rx_create import rx_create

__all__ = ["rx_debounce"]


[docs]def rx_debounce(an_observable: Observable, duration: timedelta) -> Observable: """Debounce operator. Debounce are used to rate-limit the sequence. Debounce will delay a value when it arrives and only emits the last value in a burst of events after the set delay is over and no new event arrives during this delay. Args: an_observable (Observable): an observable instance duration (timedelta): timedelta of interval (the duration) Returns: (Observable): observable instance Raise: (RuntimeError): if no observable or duration are provided """ if not an_observable or not duration: raise RuntimeError("observable and duration are mandatory") async def _subscribe(an_observer: Observer) -> Subscription: _lastest_value_time = None _lastest_value = None _consumer_task = None _subscription: Optional[Subscription] = None _sleep_duration = duration.total_seconds() async def consumer(): nonlocal _sleep_duration, _lastest_value, _lastest_value_time try: while True: await curio.sleep(_sleep_duration) # add duration delay before process a new one if _lastest_value_time and (_lastest_value_time + duration <= datetime.utcnow()): # no value between time delta await an_observer.on_next(_lastest_value) _lastest_value_time = None except curio.TaskCancelled: # it's time to finish pass async def _on_next(item: Any): nonlocal _lastest_value, _lastest_value_time _lastest_value = item _lastest_value_time = datetime.utcnow() async def _cancel_consumer(): nonlocal _consumer_task if _consumer_task: await _consumer_task.cancel() _consumer_task = None async def _on_completed(): nonlocal _consumer_task await _cancel_consumer() await an_observer.on_completed() async def _on_error(err: Any): nonlocal _consumer_task await _cancel_consumer() await an_observer.on_error(err=err) async def _subscribe(): nonlocal _consumer_task, _subscription await _cancel_consumer() if _subscription: await _subscription() _subscription = None _consumer_task = await curio.spawn(consumer()) _subscription = await an_observable.subscribe(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)) return _subscribe return rx_create(subscribe=_subscribe, max_observer=1)