Source code for async_rx.observable.rx_create

"""Define rx_create."""
from typing import NoReturn, Optional, Union

from ..protocol import Observable, Observer, Subscribe, Subscription, observable

__all__ = ["rx_create"]


def rx_create(
    subscribe: Subscribe, ensure_contract: Optional[bool] = True, max_observer: Optional[int] = None
) -> Union[Observable, NoReturn]:
    """Create an observable with specific delayed execution 'subscribe'.

    Observables can be created with create, but usually we use the so-called
    creation operators, like of, from, interval, etc.
    Subscribing to an Observable is like calling a function, providing
    callbacks where the data will be delivered to.

    Args:
        subscribe (Subscribe): subscribe function to use on observable
        ensure_contract (bool): boolean flag (default True), forwarded as-is
            to ``observable`` to ensure that this observable will follow
            Observable contract.
        max_observer (int): maximum observer on this Observable
            (default None <=> unlimited). Any falsy value, including 0,
            disables the limit.

    Returns:
        (Observable): an observable instance.

    Raises:
        (RuntimeError): if subscribe parameter is undefined

    Note:
        When ``max_observer`` is set, the wrapped subscribe function raises
        ``RuntimeError`` once the limit of concurrent observers is reached.

    """
    if subscribe is None:
        raise RuntimeError('a subscribe function must be provided')

    # No limit requested: hand the subscribe function over untouched.
    if not max_observer:
        return observable(subscribe=subscribe, ensure_contract=ensure_contract)

    # Number of observers currently holding a slot on this observable.
    current_observer = 0

    async def _subscribe_tracked(an_observer: Observer) -> Subscription:
        nonlocal current_observer

        if current_observer == max_observer:
            raise RuntimeError(f'{max_observer} #observers limit reached')

        # Reserve the slot before awaiting so that concurrent subscribers
        # cannot overshoot the limit while 'subscribe' is suspended.
        current_observer += 1
        try:
            subscription = await subscribe(an_observer)
        except BaseException:
            # Failed or cancelled subscription: give the slot back, otherwise
            # the observable would permanently lose capacity.
            current_observer -= 1
            raise

        # Guards the counter against repeated unsubscribe calls.
        released = False

        async def _unsubscribe():
            nonlocal current_observer, released
            if not released:
                released = True
                current_observer -= 1
            # Always delegate, so the underlying subscription keeps its own
            # semantics for repeated calls.
            return await subscription()

        return _unsubscribe

    return observable(subscribe=_subscribe_tracked, ensure_contract=ensure_contract)