from inspect import iscoroutinefunction
from typing import Any, Optional, TypeVar
from ..protocol import AccumulatorOperator, Observable, Observer, Subscription, rx_observer_from
from .rx_create import rx_create
__all__ = ["rx_reduce"]
T = TypeVar('T')
[docs]def rx_reduce(observable: Observable, accumulator: AccumulatorOperator, seed: Optional[Any] = None) -> Observable:
"""Create an observable which reduce source with accumulator and seed value.
Args:
observable (Observable): source
accumulator (AccumulatorOperator): accumulator function (two argument, one result) async or sync.
seed (Optional[Any]): optional seed value (default none)
Returns:
(Observable): a new observable
"""
is_awaitable = iscoroutinefunction(accumulator)
async def _subscribe(an_observer: Observer) -> Subscription:
nonlocal is_awaitable
_buffer = seed
async def _on_next(item: Any):
nonlocal _buffer
_buffer = await accumulator(_buffer, item) if is_awaitable else accumulator(_buffer, item)
async def _on_completed():
nonlocal _buffer
await an_observer.on_next(_buffer)
await an_observer.on_completed()
return await observable.subscribe(an_observer=rx_observer_from(observer=an_observer, on_next=_on_next, on_completed=_on_completed))
return rx_create(subscribe=_subscribe)