"""Protocol definition."""
import sys
from typing import Any, NoReturn, Optional, TypeVar, Union
# Protocol is only available in Python 3.8+.
if sys.version_info.minor > 7: # pragma: no cover
from typing import Protocol
else: # pragma: no cover
from typing_extensions import Protocol # type: ignore
__all__ = [
"Subscription",
"NextHandler",
"CompleteHandler",
"ErrorHandler",
"Observable",
"Observer",
"Collector",
"Subscribe",
"Subject",
"ConnectHandler",
"RefCountHandler",
"ConnectableObservable",
"ObservableFactory",
"SubjectEventHandler",
"SubjectHandler",
"ConnectableObservableEventHandler",
"ConnectableObservableHandler",
"PredicateOperator",
"AccumulatorOperator",
"SubjectFactory",
]
T = TypeVar('T')
[docs]class Subscription(Protocol):
"""Subscription Protocol.
Subscription is a function to release resources or cancel Observable executions (act as a Disposable).
It define something to be used and thrown away after you call it.
"""
[docs] async def __call__(self) -> None: # pragma: no cover
"""Release subscription."""
pass
[docs]class NextHandler(Protocol):
"""NextHandler Protocol.
A next handler process an item from associated observable.
"""
[docs] async def __call__(self, item: Any) -> None: # pragma: no cover
"""Process item."""
pass
[docs]class CompleteHandler(Protocol):
"""CompleteHandler Protocol.
A complete handler is call when no more item will came from the associated observable.
"""
[docs] async def __call__(self) -> None: # pragma: no cover
"""Signal completion of this observable."""
pass
[docs]class ErrorHandler(Protocol):
"""ErrorHandler Protocol.
An error handler receive a message or an exception and raise it.
"""
[docs] async def __call__(self, err: Any) -> Optional[NoReturn]: # pragma: no cover
"""Raise error.
Args:
err (Union[Any, Exception]): the error to raise
Raises:
(Exception): the exception
"""
pass
[docs]class Observer(Protocol):
"""Observer Protocol.
What is an Observer?
An Observer is a consumer of values delivered by an Observable.
Observers are simply a set of callbacks, one for each type of notification
delivered by the Observable:
- next,
- error,
- and complete.
Observers are just "objects" with three callbacks, one for each type of
notification that an Observable may deliver.
"""
[docs] async def on_next(self, item: Any) -> None: # pragma: no cover
"""Process item."""
pass
[docs] async def on_completed(self) -> None: # pragma: no cover
"""Signal completion of this observable."""
pass
[docs] async def on_error(self, err: Any) -> Optional[NoReturn]: # pragma: no cover
pass
[docs]class Collector(Observer, Protocol):
"""Collector Observer Protocol."""
[docs] def result(self) -> Any: # pragma: no cover
"""Returns result."""
pass
[docs] def is_finish(self) -> bool: # pragma: no cover
"""Return true if observable has completed."""
pass
[docs] def has_error(self) -> bool: # pragma: no cover
"""Return true if observable has meet error."""
pass
[docs] def error(self) -> Any: # pragma: no cover
"""Return error if observable has meet error."""
pass
[docs]class Subscribe(Protocol):
"""Subscribe Protocol.
It's a (sync/async) function wich take an observer and return a subscription.
"""
[docs] async def __call__(self, an_observer: Observer) -> Subscription: # pragma: no cover
"""Implement observer subscription.
Args:
observer (Observer): the observer instance
Returns:
(Subscription): subscription
"""
pass
[docs]class Observable(Protocol):
"""Observable Protocol.
An observable is something on which we can subscribe to listen event.
"""
[docs] async def subscribe(self, an_observer: Observer) -> Subscription: # pragma: no cover
pass
[docs]class ObservableFactory(Protocol):
"""Async ObservableFactory Protocol.
Define function which create Observable.
"""
[docs] async def __call__(self) -> Observable: # pragma: no cover
"""Create an Observable.
Returns:
(Observable): the new observable instance.
"""
pass
[docs]class Subject(Observable, Observer, Protocol):
"""A Subject is like an Observable, but can multicast to many Observers.
Subjects are like EventEmitters: they maintain a registry of many listeners.
"""
pass
[docs]class SubjectEventHandler(Protocol):
"""Subject Event Handler Procotol."""
async def __call__(self, count: int, source: Observer) -> None: # pragma: no cover
pass
[docs]class SubjectHandler(Protocol):
"""Subscribe Handler Protocol.
This handler could be called on subscription/unsubscribe event.
"""
[docs] async def on_subscribe(self, count: int, source: Observer) -> None: # pragma: no cover
"""Notify on subscribe event.
Args:
count (int): current #subscribers after subscription
source (Observer): observer source
"""
pass
[docs] async def on_unsubscribe(self, count: int, source: Observer) -> None: # pragma: no cover
"""Notify on unsubscribe event.
Args:
count (int): current #subscribers after unsubscribe
source (Observer): observer source
"""
pass
[docs]class SubjectFactory(Protocol):
def __call__(self, subject_handler: Optional[SubjectHandler] = None) -> Subject: # pragma: no cover
pass
[docs]class ConnectHandler(Protocol):
"""Connect Handler Protocol."""
async def __call__(self) -> Subscription: # pragma: no cover
pass
[docs]class RefCountHandler(Protocol):
"""RefCount Handler Protocol."""
async def __call__(self) -> Observable: # pragma: no cover
pass
[docs]class ConnectableObservable(Observable, Protocol):
"""Define a connectable observable protocol.
We have :
- subscribe function (it's an observable)
- connect function: start executing
- ref_count function: makes the Observable automatically start executing
when the first subscriber arrives,
and stop executing when the last subscriber leaves.
"""
[docs] async def connect(self) -> Subscription: # pragma: no cover
"""Connect."""
pass
[docs] async def ref_count(self) -> Observable: # pragma: no cover
"""Reference counter.
Make the multicasted Observable automatically start executing when
the first subscriber arrives,
and stop executing when the last subscriber leaves.
"""
pass
[docs]class ConnectableObservableEventHandler(Protocol):
"""Connectable Observable Event Handler Protocol."""
async def __call__(self) -> None: # pragma: no cover
pass
[docs]class ConnectableObservableHandler(Protocol):
"""Connectable Observable Handler Protocol.
This handler could be called on conect/disconnect event.
"""
[docs] async def on_connect(self) -> None: # pragma: no cover
"""Called on connect event."""
pass
[docs] async def on_disconnect(self) -> None: # pragma: no cover
"""Called on disconnect event."""
pass
class _AsyncAccumulatorOperator(Protocol[T]):
"""Async Accumulator Operator Protocol.
Accumulator are used in reduce operation.
"""
async def __call__(self, buffer: T, item: T) -> T: # pragma: no cover
pass
class _SyncAccumulatorOperator(Protocol[T]):
"""Async Accumulator Operator Protocol.
Accumulator are used in reduce operation.
"""
def __call__(self, buffer: T, item: T) -> T: # pragma: no cover
pass
AccumulatorOperator = Union[_AsyncAccumulatorOperator, _SyncAccumulatorOperator]
"""Accumulator Operator Protocol.
Accumulator are used in reduce operation.
"""
class _AsyncPredicateOperator(Protocol):
"""Async Predicate Operator Protocol.
Predicate are used in filter operation.
"""
async def __call__(self, item: Any) -> bool: # pragma: no cover
pass
class _SyncPredicateOperator(Protocol):
"""Sync Predicate Operator Protocol.
Predicate are used in filter operation.
"""
def __call__(self, item: Any) -> bool: # pragma: no cover
pass
PredicateOperator = Union[_AsyncPredicateOperator, _SyncPredicateOperator]
"""Predicate Operator Protocol.
Predicate are used in filter operation.
"""