Source code for async_rx.observable.rx_range

from ..protocol import Observable, Observer, Subscription, default_subscription
from .rx_create import rx_create

__all__ = ["rx_range"]


[docs]def rx_range(start: int, stop: int, step: int = 1) -> Observable: """Create an observable sequence of range. Args: start (int): initiale value stop (int): last value step (int): default increment (default: {1}) Returns: (Observable): observable instance. """ async def _subscribe(an_observer: Observer) -> Subscription: for i in range(start, stop, step): await an_observer.on_next(i) await an_observer.on_completed() return default_subscription return rx_create(subscribe=_subscribe)