Source code for async_rx.observable.rx_concat

from typing import Any, Optional

from ..protocol import Observable, Observer, Subscription, rx_observer_from
from .rx_create import rx_create

__all__ = ["rx_concat"]


[docs]def rx_concat(*observables: Observable) -> Observable: """Concat operator. Merge and Concat combine multiple sequences into one. Merge might interweave elements from different sequence whereas Concat emits all elements from the first sequence before turning to the next one. Args: observables (Observable): a list of observable instance Returns: (Observable): observable instance Raise: (RuntimeError): if len(observables) <= 0 """ if len(observables) <= 0: raise RuntimeError("#observables must be greather than zero") async def _subscribe(an_observer: Observer) -> Subscription: _source = list(observables) _current_subscription: Optional[Subscription] = None _observer: Optional[Observer] = None async def _on_completed() -> None: nonlocal _source, _current_subscription, _observer await _unsubscribe() if _source: obs = _source.pop(0) if _observer: _current_subscription = await obs.subscribe(_observer) else: await an_observer.on_completed() return None async def _on_error(err: Any) -> None: await _unsubscribe() await an_observer.on_error(err=err) return None async def _unsubscribe() -> None: nonlocal _current_subscription if _current_subscription: await _current_subscription() _current_subscription = None _observer = rx_observer_from(observer=an_observer, on_completed=_on_completed, on_error=_on_error) # initiate _current_subscription = await _source.pop(0).subscribe(_observer) return _unsubscribe return rx_create(subscribe=_subscribe, max_observer=1)