Source code for async_rx.observable.rx_merge

from typing import Any, List, NoReturn, Optional

import curio

from ..protocol import Observable, Observer, Subscription, rx_observer
from .rx_create import rx_create

__all__ = ["rx_merge"]


[docs]def rx_merge(*observables: Observable) -> Observable: """Flattens multiple Observables together by blending their values into one Observable. Creates an output Observable which concurrently emits all values from every given input Observable. 'merge' subscribes to each given input Observable (either the source or an Observable given as argument), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable. Args: observables (Observable): a list of observable instance Returns: (Observable): observable instance Raise: (RuntimeError): if #observables < 1 """ if len(observables) < 1: raise RuntimeError("#observables must be greather than 1") terminated_observable = 0 deliver_next = True subscriptions: List[Subscription] = [] async def _subscription_handler() -> None: nonlocal subscriptions for s in subscriptions: await s() async def _subscribe(an_observer: Observer) -> Subscription: nonlocal subscriptions async def _on_next(item: Any) -> None: # filter item according to deliver_next nonlocal deliver_next if deliver_next: # if no previous error await an_observer.on_next(item) return None async def _on_completed() -> None: nonlocal terminated_observable, deliver_next if deliver_next: # if no previous error terminated_observable += 1 if terminated_observable == len(observables): # and all observable complete # lock on_next, on_error handler call and other on_completed call. deliver_next = False await an_observer.on_completed() return None async def _on_error(err: Any) -> Optional[NoReturn]: nonlocal deliver_next if deliver_next: # lock on_next, on_completed handler call and other on_error call. deliver_next = False await an_observer.on_error(err) return None # local observer definition _observer = rx_observer(on_next=_on_next, on_completed=_on_completed, on_error=_on_error) # local observer subscribe to all observables in parallele _tasks = [] async with curio.TaskGroup(wait=all) as g: for an_observable in observables: _tasks.append(await g.spawn(an_observable.subscribe, _observer)) subscriptions = [t.result for t in _tasks] return _subscription_handler return rx_create(subscribe=_subscribe, max_observer=1)