Source code for async_rx.observable.rx_empty

from ..protocol import Observable, Observer, Subscription, default_subscription
from .rx_create import rx_create

__all__ = ["rx_empty"]


[docs]def rx_empty() -> Observable: """Create an empty Observable. An "empty" Observable emits only the complete notification. Returns: (Observable) observable instance """ async def _subscribe(an_observer: Observer) -> Subscription: await an_observer.on_completed() return default_subscription return rx_create(subscribe=_subscribe)