Source code for async_rx.observable.rx_map

from inspect import iscoroutinefunction
from typing import Any, Callable, Optional

from ..protocol import Observable, Observer, Subscription, rx_observer_from
from .rx_create import rx_create

__all__ = ["rx_map"]


[docs]def rx_map( observable: Observable, transform: Callable, expand_arg_parameters: Optional[bool] = False, expand_kwarg_parameters: Optional[bool] = False ) -> Observable: """Map operator. Map operator modifies an Observable<A> into Observable<B> given a function with the type A->B. For example, if we take the function x => 10 ∗ x and a list of 1,2,3. The result is 10,20,30, see figure 4. Note that this function did not change the type of the Observable but did change the values. Args: observable (Observable): an observable instance transform (Callable): transform function (sync or async) expand_arg_parameters (Optional[bool]): if true each item will be expanded as args before call transform (implique expand_kwarg_parameters = False). expand_kwarg_parameters (Optional[bool]): if true each item will be expanded as kwargs before call transform. Returns: (Observable): observable instance """ _is_awaitable = iscoroutinefunction(transform) async def _subscribe(an_observer: Observer) -> Subscription: async def _on_next(item: Any): nonlocal _is_awaitable if expand_kwarg_parameters: _next_item = await transform(**item) if _is_awaitable else transform(**item) elif expand_arg_parameters: _next_item = await transform(*item) if _is_awaitable else transform(*item) else: _next_item = await transform(item) if _is_awaitable else transform(item) await an_observer.on_next(_next_item) return await observable.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next)) return rx_create(subscribe=_subscribe)