from inspect import iscoroutinefunction
from typing import Any, Callable, Optional
from ..protocol import Observable, Observer, Subscription, rx_observer_from
from .rx_create import rx_create
__all__ = ["rx_map"]
[docs]def rx_map(
    observable: Observable, transform: Callable, expand_arg_parameters: Optional[bool] = False, expand_kwarg_parameters: Optional[bool] = False
) -> Observable:
    """Map operator.
    Map operator modifies an Observable<A> into Observable<B> given a function with the type A->B.
    For example, if we take the function x => 10 ∗ x and a list of 1,2,3. The result is 10,20,30, see figure 4.
    Note that this function did not change the type of the Observable but did change the values.
    Args:
        observable (Observable): an observable instance
        transform (Callable): transform function (sync or async)
        expand_arg_parameters (Optional[bool]): if true each item will be expanded as args before call transform
            (implique expand_kwarg_parameters = False).
        expand_kwarg_parameters (Optional[bool]): if true each item will be expanded as kwargs before call transform.
    Returns:
        (Observable): observable instance
    """
    _is_awaitable = iscoroutinefunction(transform)
    async def _subscribe(an_observer: Observer) -> Subscription:
        async def _on_next(item: Any):
            nonlocal _is_awaitable
            if expand_kwarg_parameters:
                _next_item = await transform(**item) if _is_awaitable else transform(**item)
            elif expand_arg_parameters:
                _next_item = await transform(*item) if _is_awaitable else transform(*item)
            else:
                _next_item = await transform(item) if _is_awaitable else transform(item)
            await an_observer.on_next(_next_item)
        return await observable.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next))
    return rx_create(subscribe=_subscribe)