Source code for async_rx.observable.rx_distinct

from collections import deque
from typing import Any, Deque

from ..protocol import Observable, Observer, Subscription, rx_observer_from
from .rx_create import rx_create

__all__ = ["rx_distinct"]


[docs]def rx_distinct(observable: Observable, frame_size: int) -> Observable: """Create an observable which send distinct event inside a windows of size #frame_size. Args: observable (Observable): observable source frame_size (int): windows size Returns: (Observable): observable instance Raise: (RuntimeError): if frame_size <= 0 """ if frame_size <= 0: raise RuntimeError('framesize must be greather than zero') async def _subscribe(an_observer: Observer) -> Subscription: # our frame buffer _q: Deque = deque(maxlen=frame_size) async def _on_next(item: Any): nonlocal _q if item not in _q: # distinct value _q.append(item) await an_observer.on_next(item) async def _on_completed(): nonlocal _q _q.clear() await an_observer.on_completed() return await observable.subscribe(an_observer=rx_observer_from(observer=an_observer, on_next=_on_next, on_completed=_on_completed)) return rx_create(subscribe=_subscribe)