Source code for async_rx.subject.rx_subject_replay

from collections import deque
from typing import Any, Deque, NoReturn, Optional

from ..protocol import Observer, Subject, SubjectHandler, Subscription, subject
from .rx_subject import rx_subject

__all__ = ["rx_subject_replay"]


[docs]def rx_subject_replay(buffer_size: int, subject_handler: Optional[SubjectHandler] = None) -> Subject: """Create a replay subject. A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution. A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers. When a replay occurs, completed and error events are also replayed. Args: buffer_size (int): buffer size, or #items which be replayed on subscription subject_handler (Optional[SubjectHandler]): optional suject handler callback Returns: (Subject): the subject Raise: (RuntimeError): if buffer_size <= 0 """ if buffer_size <= 0: raise RuntimeError("buffer_size must be greater than zero!") _queue: Deque = deque(maxlen=buffer_size) _has_completed = False _error = None _subject = rx_subject(subject_handler=subject_handler) async def _on_next(item: Any) -> None: nonlocal _queue, _subject _queue.append(item) await _subject.on_next(item) async def _subscribe(an_observer: Observer) -> Subscription: nonlocal _queue, _subject, _has_completed, _error subscription = await _subject.subscribe(an_observer) if _queue: for value in _queue: await an_observer.on_next(value) if _error: await an_observer.on_error(_error) elif _has_completed: await an_observer.on_completed() return subscription async def _on_complete(): nonlocal _has_completed _has_completed = True await _subject.on_completed() async def _on_error(err: Any) -> Optional[NoReturn]: nonlocal _error _error = err await _subject.on_error(err=err) return None return subject(subscribe=_subscribe, on_next=_on_next, on_error=_on_error, on_completed=_on_complete)