Source code for async_rx.observable.rx_forward

from typing import Any

from ..protocol import Observable, Observer, Subscription, rx_observer
from .rx_create import rx_create

__all__ = ["rx_forward"]


[docs]def rx_forward(observable: Observable, except_complet: bool = False, except_error: bool = False) -> Observable: """Create an observable wich forward event. Args: observable (Observable): observable source except_complet (bool): if true then did not forward 'on_complet' (default: {False}) except_error (bool): if true then did not forward 'on_error' (default: {False}) Returns: (Observable): observable instance. """ async def _dummy_on_completed(): pass async def _dummy_on_error(err: Any): pass async def _subscribe(an_observer: Observer) -> Subscription: return await observable.subscribe( an_observer=rx_observer( on_next=an_observer.on_next, on_error=_dummy_on_error if except_error else an_observer.on_error, on_completed=_dummy_on_completed if except_complet else an_observer.on_completed, ) ) return rx_create(subscribe=_subscribe)