# Source code for async_rx.protocol.rx_collector

from collections import namedtuple
from typing import Any, TypeVar

from .definition import Collector

# Names exported by ``from <this module> import *``.
__all__ = ["rx_collector"]

# Type variable used to annotate the initial value handed to ``rx_collector``.
T = TypeVar("T")

# Immutable record returned by ``rx_collector``: three observer callbacks
# (on_next / on_error / on_completed) followed by four accessors
# (result / is_finish / has_error / error).
CollectorDefinition = namedtuple(
    "CollectorDefinition",
    (
        "on_next",
        "on_error",
        "on_completed",
        "result",
        "is_finish",
        "has_error",
        "error",
    ),
)


def rx_collector(initial_value: T) -> Collector:
    """Create an observer collector.

    The type of ``initial_value`` selects how received items are accumulated:

    - ``dict``: every item is unpacked as a ``(key, value)`` pair and stored
      in a shallow copy of ``initial_value``;
    - ``list``: every item is appended to a shallow copy of ``initial_value``;
    - anything else: the most recently received item replaces the value.

    Args:
        initial_value (T): initial value which determines result type
            (list, dict, base type)

    Returns:
        (Collector[T]): a collector instance exposing the ``on_next``,
            ``on_error`` and ``on_completed`` coroutines plus the ``result``,
            ``is_finish``, ``has_error`` and ``error`` accessors.

    """
    _is_finish = False
    _has_error = False
    _error = None

    if isinstance(initial_value, dict):
        # Copy so the caller's dict is never mutated by the collector.
        _dict = dict(initial_value)

        async def _on_next(item: Any):
            # Items are expected to be two-element (key, value) iterables.
            key, value = item
            _dict[key] = value

        def _get_result() -> Any:
            return _dict

    elif isinstance(initial_value, list):
        # Copy so the caller's list is never mutated by the collector.
        _list = list(initial_value)

        async def _on_next(item: Any):
            _list.append(item)

        def _get_result() -> Any:
            return _list

    else:
        _value = initial_value

        async def _on_next(item: Any):
            # Rebinding the enclosing variable requires ``nonlocal``.
            nonlocal _value
            _value = item

        def _get_result() -> Any:
            return _value

    async def _on_completed():
        nonlocal _is_finish
        _is_finish = True

    async def _on_error(err: Any):
        nonlocal _has_error, _error
        _error = err
        _has_error = True

    def _get_is_finish():
        return _is_finish

    def _get_has_error():
        return _has_error

    def _get_error():
        return _error

    return CollectorDefinition(
        on_next=_on_next,
        on_error=_on_error,
        on_completed=_on_completed,
        result=_get_result,
        is_finish=_get_is_finish,
        has_error=_get_has_error,
        error=_get_error,
    )