Source code for async_rx.observable.rx_first

from ..protocol import Observable
from .rx_take import rx_take

__all__ = ["rx_first"]


[docs]def rx_first(observable: Observable) -> Observable: """Create an observale which only take the first event and complete. Args: observable (Observable): observable source Returns: (Observable): observable instance """ return rx_take(observable=observable, count=1)