API Reference¶
Contents
async-rx¶
async-rx definition.
Classes:
|
Subscription Protocol. |
|
NextHandler Protocol. |
|
CompleteHandler Protocol. |
|
ErrorHandler Protocol. |
|
Observable Protocol. |
|
Observer Protocol. |
|
Collector Observer Protocol. |
|
Subscribe Protocol. |
|
A Subject is like an Observable, but can multicast to many Observers. |
|
Connect Handler Protocol. |
|
RefCount Handler Protocol. |
|
Define a connectable observable protocol. |
|
Async ObservableFactory Protocol. |
|
Subject Event Handler Procotol. |
|
Subscribe Handler Protocol. |
|
Connectable Observable Event Handler Protocol. |
|
Connectable Observable Handler Protocol. |
|
Functions:
|
Return an observer. |
|
Build an observer from another one. |
|
Create an observer collector. |
|
Create an observable with specific delayed execution ‘subscribe’. |
|
Create an observable when a subscription occurs. |
|
Create an observable which send distinct event inside a windows of size #frame_size. |
|
Create an empty Observable. |
|
Create an observable which event are filtered by a predicate function. |
|
Create an observale which only take the first event and complete. |
|
Create an observable wich forward event. |
|
Convert almost anything to an Observable. |
|
Create an observale which only take #count (or less) last events and complete. |
|
Convert arguments into an observable sequence. |
|
Create an observable sequence of range. |
|
Create an obervable wich skip #count event on source. |
|
Create an observable which take only first #count event maximum (could be less). |
|
Create an observable wich always call error. |
|
Create an observable which reduce source with accumulator and seed value. |
|
Create an observable wich counts the emissions on the source and emits result. |
|
Create an observable wich returns the maximal item in the source when completes. |
|
Create an observable wich returns minimal item in the source when completes. |
|
Create an observable wich return the sum items in the source when completes. |
|
Create an observable wich return the average items in the source when completes. |
|
Buffer operator. |
|
Window operator. |
|
Flattens multiple Observables together by blending their values into one Observable. |
|
Concat operator. |
|
Combine multiple Observables to create an Observable. |
|
Amb operator. |
|
Map operator. |
|
Merge map operator. |
|
Group by operator. |
|
Sample operator used to rate-limit the sequence. |
|
Throttle operator. |
|
Delay operator. |
|
Debounce operator. |
|
Create an observable on dictionnary. |
|
Create an observable on list. |
|
Repeat data. |
|
Repeat a series (delay, value) as an observable for each subscription. |
|
Create a subject. |
|
Build a subject from another one by override some function. |
|
Create a replay subject. |
|
Create a behavior subject. |
|
Create a Connectable Observable. |
|
Create a publish_replay. |
|
Create a publish_behavior. |
-
class
async_rx.
Subscription
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Subscription Protocol.
Subscription is a function to release resources or cancel Observable executions (act as a Disposable). It define something to be used and thrown away after you call it.
Methods:
__call__
()Release subscription.
-
class
async_rx.
NextHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
NextHandler Protocol.
A next handler process an item from associated observable.
Methods:
__call__
(item)Process item.
-
class
async_rx.
CompleteHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
CompleteHandler Protocol.
A complete handler is call when no more item will came from the associated observable.
Methods:
__call__
()Signal completion of this observable.
-
class
async_rx.
ErrorHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
ErrorHandler Protocol.
An error handler receive a message or an exception and raise it.
Methods:
__call__
(err)Raise error.
-
class
async_rx.
Observable
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Observable Protocol.
An observable is something on which we can subscribe to listen event.
Methods:
subscribe
(an_observer)
-
class
async_rx.
Observer
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Observer Protocol.
What is an Observer?
An Observer is a consumer of values delivered by an Observable.
Observers are simply a set of callbacks, one for each type of notification delivered by the Observable:
next,
error,
and complete.
Observers are just “objects” with three callbacks, one for each type of notification that an Observable may deliver.
Methods:
Signal completion of this observable.
on_error
(err)on_next
(item)Process item.
-
class
async_rx.
Collector
(*args, **kwargs)[source]¶ Bases:
async_rx.protocol.definition.Observer
,typing.Protocol
Collector Observer Protocol.
Methods:
error
()Return error if observable has meet error.
Return true if observable has meet error.
Return true if observable has completed.
result
()Returns result.
-
class
async_rx.
Subscribe
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Subscribe Protocol.
It’s a (sync/async) function wich take an observer and return a subscription.
Methods:
__call__
(an_observer)Implement observer subscription.
-
class
async_rx.
Subject
(*args, **kwargs)[source]¶ Bases:
async_rx.protocol.definition.Observable
,async_rx.protocol.definition.Observer
,typing.Protocol
A Subject is like an Observable, but can multicast to many Observers.
Subjects are like EventEmitters: they maintain a registry of many listeners.
-
class
async_rx.
ConnectHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Connect Handler Protocol.
-
class
async_rx.
RefCountHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
RefCount Handler Protocol.
-
class
async_rx.
ConnectableObservable
(*args, **kwargs)[source]¶ Bases:
async_rx.protocol.definition.Observable
,typing.Protocol
Define a connectable observable protocol.
- We have :
subscribe function (it’s an observable)
connect function: start executing
- ref_count function: makes the Observable automatically start executing
when the first subscriber arrives, and stop executing when the last subscriber leaves.
Methods:
connect
()Connect.
Reference counter.
-
class
async_rx.
ObservableFactory
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Async ObservableFactory Protocol.
Define function which create Observable.
Methods:
__call__
()Create an Observable.
-
class
async_rx.
SubjectEventHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Subject Event Handler Procotol.
-
class
async_rx.
SubjectHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Subscribe Handler Protocol.
This handler could be called on subscription/unsubscribe event.
Methods:
on_subscribe
(count, source)Notify on subscribe event.
on_unsubscribe
(count, source)Notify on unsubscribe event.
-
class
async_rx.
ConnectableObservableEventHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Connectable Observable Event Handler Protocol.
-
class
async_rx.
ConnectableObservableHandler
(*args, **kwargs)[source]¶ Bases:
typing.Protocol
Connectable Observable Handler Protocol.
This handler could be called on conect/disconnect event.
Methods:
Called on connect event.
Called on disconnect event.
-
async_rx.
rx_observer
(on_next: async_rx.protocol.definition.NextHandler, on_error: async_rx.protocol.definition.ErrorHandler = <function default_error>, on_completed: async_rx.protocol.definition.CompleteHandler = <function default_on_completed>) → async_rx.protocol.definition.Observer[source]¶ Return an observer.
The underlying implementation use an named tuple.
- Parameters
on_next (NextHandler) – on_next handler which process items
on_error (ErrorHandler) – on_error handler (default with default_error which raise Exception)
on_completed (CompleteHandler) – on_completed handler (default with noop)
- Returns
an Observer
- Return type
(Observer)
-
async_rx.
rx_observer_from
(observer: async_rx.protocol.definition.Observer, on_next: Optional[async_rx.protocol.definition.NextHandler] = None, on_error: Optional[async_rx.protocol.definition.ErrorHandler] = None, on_completed: Optional[async_rx.protocol.definition.CompleteHandler] = None) → async_rx.protocol.definition.Observer[source]¶ Build an observer from another one.
- Parameters
observer (Observer) – the observer to override
on_next (Optional[NextHandler]) – override on_next handler if set
on_error (Optional[ErrorHandler]) – override on_error handler if set
on_completed (Optional[CompleteHandler]) – override on_completed handler if set
- Returns
an Observer
- Return type
(Observer)
-
async_rx.
rx_collector
(initial_value: T) → async_rx.protocol.definition.Collector[source]¶ Create an observer collector.
- Parameters
initial_value (T) – initial value which determin result type (list, dict, base type)
- Returns
a collector instance
- Return type
(Collector[T])
-
async_rx.
rx_subject
(subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None) → async_rx.protocol.definition.Subject[source]¶ Create a subject.
A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners, and then dispatch events/items to them.
As subject is also an Observer, it can subscribe to an observable which act at his stream data source.
- Parameters
subject_handler (Optional[SubjectHandler]) – optional suject handler callback
- Returns
the subject
- Return type
(Subject)
Example 1:
# create a subject a_subject = subject(subject_handler=my_handler) # few observer subscribe on this subject sub_1 = await a_subject.subscribe(obs_1) sub_2 = await a_subject.subscribe(obs_2) # the subject subscribe himself on an observable await rx_range(start=0, stop=10).subscribe(a_subject) # obs_1 and obs_2 receive 10 #items
Example 2: A subject as event emitter
# create a subject a_subject = subject() # few observer subscribe on this subject sub_1 = await a_subject.subscribe(obs_1) sub_2 = await a_subject.subscribe(obs_2) # send your data by your self await a_subject.on_next("my value") # obs_1 and obs_2 receive "my value" await a_subject.on_completed() # obs_1 and obs_2 receive on_completed
-
async_rx.
rx_subject_replay
(buffer_size: int, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None) → async_rx.protocol.definition.Subject[source]¶ Create a replay subject.
A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution.
A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers. When a replay occurs, completed and error events are also replayed.
- Parameters
buffer_size (int) – buffer size, or #items which be replayed on subscription
subject_handler (Optional[SubjectHandler]) – optional suject handler callback
- Returns
the subject
- Return type
(Subject)
- Raise:
(RuntimeError): if buffer_size <= 0
-
async_rx.
rx_subject_behavior
(subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None) → async_rx.protocol.definition.Subject[source]¶ Create a behavior subject.
One of the variants of Subjects is the BehaviorSubject, which has a notion of “the current value”. It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the “current value” from the BehaviorSubject.
BehaviorSubjects are useful for representing “values over time”. For instance, an event stream of birthdays is a Subject, but the stream of a person’s age would be a BehaviorSubject.
- Parameters
subject_handler (Optional[SubjectHandler]) – optional suject handler callback
- Returns
the subject
- Return type
(Subject)
-
async_rx.
rx_subject_from
(a_subject: async_rx.protocol.definition.Subject, subscribe: Optional[async_rx.protocol.definition.Subscribe] = None, on_next: Optional[async_rx.protocol.definition.NextHandler] = None, on_error: Optional[async_rx.protocol.definition.ErrorHandler] = None, on_completed: Optional[async_rx.protocol.definition.CompleteHandler] = None) → async_rx.protocol.definition.Subject[source]¶ Build a subject from another one by override some function.
- Parameters
a_subject (Subject) – the source subject
subscribe (Optional[Subscribe]) – override subscribe if set
on_next (Optional[NextHandler]) – override on_next if set
on_error (Optional[ErrorHandler]) – override on_error if set
on_completed (Optional[CompleteHandler]) – override on_completed if set
- Returns;
(Subject): a new subject
-
async_rx.
rx_publish
(an_observable: async_rx.protocol.definition.Observable, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None, connection_handler: Optional[async_rx.protocol.definition.ConnectableObservableHandler] = None, subject_factory: async_rx.protocol.definition.SubjectFactory = <function rx_subject>) → async_rx.protocol.definition.ConnectableObservable[source]¶ Create a Connectable Observable.
A multicasted Observable (rx_publish) uses a Subject under the hood to make multiple Observers see the same Observable execution.
- Parameters
an_observable (Observable) – observable to connect
subject_handler (Optional[SubjectHandler]) – optional subject handler
connection_handler (Optional[ConnectableObservableHandler]) – optional connection handler
subject_factory (Optional[SubjectFactory]) – subject factory, per default use subject
- Returns
the multicasted Observable instance
- Return type
-
async_rx.
rx_publish_replay
(an_observable: async_rx.protocol.definition.Observable, buffer_size: int, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None, connection_handler: Optional[async_rx.protocol.definition.ConnectableObservableHandler] = None) → async_rx.protocol.definition.ConnectableObservable[source]¶ Create a publish_replay.
A publish_replay uses a replay_subject under the hood to make multiple Observers see the same Observable execution.
- Parameters
buffer_size (int) – max #items to replay
an_observable (Observable) – observable to connect
subject_handler (Optional[SubjectHandler]) – optional subject handler
connection_handler (Optional[ConnectableObservableHandler]) – optional connection handler
- Returns
the publish_replay instance
- Return type
-
async_rx.
rx_publish_behavior
(an_observable: async_rx.protocol.definition.Observable, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None, connection_handler: Optional[async_rx.protocol.definition.ConnectableObservableHandler] = None) → async_rx.protocol.definition.ConnectableObservable[source]¶ Create a publish_behavior.
A publish_behavior uses a behavior_subject under the hood to make multiple Observers see the same Observable execution.
- Parameters
an_observable (Observable) – observable to connect
subject_handler (Optional[SubjectHandler]) – optional subject handler
connection_handler (Optional[ConnectableObservableHandler]) – optional connection handler
- Returns
the publish_behavior instance
- Return type
-
async_rx.
rx_create
(subscribe: async_rx.protocol.definition.Subscribe, ensure_contract: Optional[bool] = True, max_observer: Optional[int] = None) → Union[async_rx.protocol.definition.Observable, NoReturn][source]¶ Create an observable with specific delayed execution ‘subscribe’.
Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc. Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.
- Parameters
subscribe (Subscribe) – subcribe function to use on observable
ensure_contract (bool) – boolean flag (default True) to ensure that this observable will follow Observable contract.
max_observer (int) – maximum observer on this Observable (default None <=> unlimited)
- Returns
an observable instance.
- Return type
- Raise:
(RuntimeError): if subscribe parameter is undefined
-
async_rx.
rx_defer
(observable_factory: async_rx.protocol.definition.ObservableFactory) → async_rx.protocol.definition.Observable[source]¶ Create an observable when a subscription occurs.
Defer allows you to create the Observable only when the Observer subscribes, and create a fresh Observable for each Observer.
It waits until an Observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual Observable.
- Parameters
observable_factory (ObservableFactory) – observable factory
- Returns
- an observable instance wich create observable
when subscription occurs.
- Return type
-
async_rx.
rx_distinct
(observable: async_rx.protocol.definition.Observable, frame_size: int) → async_rx.protocol.definition.Observable[source]¶ Create an observable which send distinct event inside a windows of size #frame_size.
- Parameters
observable (Observable) – observable source
frame_size (int) – windows size
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if frame_size <= 0
-
async_rx.
rx_empty
() → async_rx.protocol.definition.Observable[source]¶ Create an empty Observable.
An “empty” Observable emits only the complete notification.
- Returns
(Observable) observable instance
-
async_rx.
rx_filter
(observable: async_rx.protocol.definition.Observable, predicate: Union[async_rx.protocol.definition._AsyncPredicateOperator, async_rx.protocol.definition._SyncPredicateOperator]) → async_rx.protocol.definition.Observable[source]¶ Create an observable which event are filtered by a predicate function.
- Parameters
observable (Observable) – observable source
predicate (Operator) – predicate function which take on argument and return a truthy value
- Returns
observable instance
- Return type
-
async_rx.
rx_first
(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Create an observale which only take the first event and complete.
- Parameters
observable (Observable) – observable source
- Returns
observable instance
- Return type
-
async_rx.
rx_forward
(observable: async_rx.protocol.definition.Observable, except_complet: bool = False, except_error: bool = False) → async_rx.protocol.definition.Observable[source]¶ Create an observable wich forward event.
- Parameters
observable (Observable) – observable source
except_complet (bool) – if true then did not forward ‘on_complet’ (default: {False})
except_error (bool) – if true then did not forward ‘on_error’ (default: {False})
- Returns
observable instance.
- Return type
-
async_rx.
rx_from
(observable_input: Any) → async_rx.protocol.definition.Observable[source]¶ Convert almost anything to an Observable.
- Anything means:
a dictionnary in an rx_dict
an async iterable
an iterable
something which can be cast to an Observable (have a subscribe function)
an object
- Parameters
observable_input (Any) – A subscribable object
- Returns
- The Observable whose values are originally from the input object
that was converted.
- Return type
-
async_rx.
rx_last
(observable: async_rx.protocol.definition.Observable, count: int = 1) → async_rx.protocol.definition.Observable[source]¶ Create an observale which only take #count (or less) last events and complete.
- Parameters
observable (Observable) – observable source
count (int) – number of event to get (default 1)
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if count <= 0
-
async_rx.
rx_of
(*args) → async_rx.protocol.definition.Observable[source]¶ Convert arguments into an observable sequence.
- Parameters
args – any list of argument to send to observer.
- Returns
observable instance.
- Return type
-
async_rx.
rx_range
(start: int, stop: int, step: int = 1) → async_rx.protocol.definition.Observable[source]¶ Create an observable sequence of range.
- Parameters
start (int) – initiale value
stop (int) – last value
step (int) – default increment (default: {1})
- Returns
observable instance.
- Return type
-
async_rx.
rx_skip
(observable: async_rx.protocol.definition.Observable, count: int) → async_rx.protocol.definition.Observable[source]¶ Create an obervable wich skip #count event on source.
- Parameters
observable (Observable) – observable source
count (int) – number of event to skip
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if count <= 0
-
async_rx.
rx_take
(observable: async_rx.protocol.definition.Observable, count: int) → async_rx.protocol.definition.Observable[source]¶ Create an observable which take only first #count event maximum (could be less).
- Parameters
observable (Observable) – observable source
count (int) – #items to take
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if count <= 0
-
async_rx.
rx_throw
(error: Any) → async_rx.protocol.definition.Observable[source]¶ Create an observable wich always call error.
- Parameters
error (Union[Any, Exception]) – the error to observe
- Returns
observable instance.
- Return type
-
async_rx.
rx_reduce
(observable: async_rx.protocol.definition.Observable, accumulator: Union[async_rx.protocol.definition._AsyncAccumulatorOperator, async_rx.protocol.definition._SyncAccumulatorOperator], seed: Optional[Any] = None) → async_rx.protocol.definition.Observable[source]¶ Create an observable which reduce source with accumulator and seed value.
- Parameters
observable (Observable) – source
accumulator (AccumulatorOperator) – accumulator function (two argument, one result) async or sync.
seed (Optional[Any]) – optional seed value (default none)
- Returns
a new observable
- Return type
-
async_rx.
rx_count
(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Create an observable wich counts the emissions on the source and emits result.
- Parameters
observable (observable) – the observable source
- Returns
observable instance
- Return type
-
async_rx.
rx_max
(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Create an observable wich returns the maximal item in the source when completes.
- Parameters
observable (observable) – the observable source
- Returns
observable instance
- Return type
-
async_rx.
rx_min
(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Create an observable wich returns minimal item in the source when completes.
- Parameters
observable (observable) – the observable source
- Returns
observable instance
- Return type
-
async_rx.
rx_sum
(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Create an observable wich return the sum items in the source when completes.
- Parameters
observable (observable) – the observable source
- Returns
observable instance
- Return type
-
async_rx.
rx_avg
(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Create an observable wich return the average items in the source when completes.
- Parameters
observable (observable) – the observable source
- Returns
observable instance
- Return type
-
async_rx.
rx_buffer
(observable: async_rx.protocol.definition.Observable, buffer_size: int) → async_rx.protocol.definition.Observable[source]¶ Buffer operator.
Buffer and Window collect elements from the source sequence and emit them in groups. Buffer projects these elements onto list and emits them, start to process source on first subscription.
- Parameters
observable (Observable) – the source
buffer_size (int) – buffer size
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if buffer_size <= 0
-
async_rx.
rx_window
(observable: async_rx.protocol.definition.Observable, buffer_size: int) → async_rx.protocol.definition.Observable[source]¶ Window operator.
Window collect elements from the source sequence and emit them in groups. Window emits these elements in nested observables. It will emit a new inner observable when a window opens and will complete the inner observable when the window closes. Notice that there can be overlap between multiple windows if the next one opens before the last one closes.
For example Window with a count of 2 and a skip of 1 will emit the last 2 elements (count 2) for every element (skip 1), so the sequence -1-2-3-4-| becomes –[12][23][34][4]|.
- Parameters
observable (Observable) – the source
buffer_size (int) – buffer size
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if buffer_size <= 0
-
async_rx.
rx_merge
(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Flattens multiple Observables together by blending their values into one Observable.
Creates an output Observable which concurrently emits all values from every given input Observable. ‘merge’ subscribes to each given input Observable (either the source or an Observable given as argument), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.
- Parameters
observables (Observable) – a list of observable instance
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if #observables < 1
-
async_rx.
rx_concat
(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Concat operator.
Merge and Concat combine multiple sequences into one. Merge might interweave elements from different sequence whereas Concat emits all elements from the first sequence before turning to the next one.
- Parameters
observables (Observable) – a list of observable instance
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if len(observables) <= 0
-
async_rx.
rx_zip
(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Combine multiple Observables to create an Observable.
The Obsevable values are calculated from the values, in order, of each of its input Observables.
- Parameters
(Observable) – a list of observable instance
- Returns
observable instance
- Return type
-
async_rx.
rx_amb
(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]¶ Amb operator.
The Amb operator (stands for ambiguous), alias race, subscribes to a number of observables and retrieves the first observable that yields a value, closing off all others. For example, Amb can automatically select the best server to download from: Amb listens to both servers and the first server that replies is used.
- Parameters
observables (Observable) – a list of observable instance
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if #observables < 1
-
async_rx.
rx_map
(observable: async_rx.protocol.definition.Observable, transform: Callable, expand_arg_parameters: Optional[bool] = False, expand_kwarg_parameters: Optional[bool] = False) → async_rx.protocol.definition.Observable[source]¶ Map operator.
Map operator modifies an Observable<A> into Observable<B> given a function with the type A->B.
For example, if we take the function x => 10 ∗ x and a list of 1,2,3. The result is 10,20,30, see figure 4. Note that this function did not change the type of the Observable but did change the values.
- Parameters
observable (Observable) – an observable instance
transform (Callable) – transform function (sync or async)
expand_arg_parameters (Optional[bool]) – if true each item will be expanded as args before call transform (implique expand_kwarg_parameters = False).
expand_kwarg_parameters (Optional[bool]) – if true each item will be expanded as kwargs before call transform.
- Returns
observable instance
- Return type
-
async_rx.
rx_merge_map
(*observables: async_rx.protocol.definition.Observable, transform: Callable) → async_rx.protocol.definition.Observable[source]¶ Merge map operator.
rx_merge_map allows asynchronous queries, resulting in an observable of observables and it flattens the results. There may be multiple inner observables that run simultaneously, so the results from these inner observables may be intertwined.
- Parameters
observables (Observable) – a list of observable instance
transform (Callable) – transform function (sync or async)
- Returns
observable instance
- Return type
-
async_rx.
rx_group_by
(observable: async_rx.protocol.definition.Observable, key_selector: Callable) → async_rx.protocol.definition.Observable[source]¶ Group by operator.
Similar to Window, GroupBy projects the sequence onto a number of inner observables but as opposite to Window where all windows receive the same sequence, GroupBy will emit elements only to one inner observable that is associated with the current element based on a key selector function. The observer receive tuple with (key, subject).
- Parameters
observable (Observable) – observable instance
key_selector (Callable) – key selector function (sync/async) [Any]->[Any]
- Returns
observable instance
- Return type
-
async_rx.
rx_sample
(observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta) → async_rx.protocol.definition.Observable[source]¶ Sample operator used to rate-limit the sequence.
Sample filter out elements based on the timing. Sample will emit the LATEST value on a set interval or emit nothing if no new value arrived during the last interval.
- Parameters
observable (Observable) – an observable instance
duration (timedelta) – timedelta of interval (the duration)
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if no observable or duration are provided
-
async_rx.
rx_throttle
(observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta) → async_rx.protocol.definition.Observable[source]¶ Throttle operator.
Throttle are used to rate-limit the sequence. They will filter out elements based on the timing.
Throttle will emit the first event from a burst and will ignore all subsequent values that arrive during the set timeout
- Parameters
observable (Observable) – an observable instance
duration (timedelta) – timedelta of interval (the duration)
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if no observable or duration are provided
-
async_rx.
rx_delay
(observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta, buffer_size: Optional[int] = None, ignore_events_if_full: Optional[bool] = True) → async_rx.protocol.definition.Observable[source]¶ Delay operator.
Delay will project the sequence unmodified, but shifted into the future with a specified delay.
Underlaying implementation use a queue and a dedicated consumer.
- Parameters
observable (Observable) – an observable instance
duration (timedelta) – timedelta of delay (the duration).
buffer_size (Optional[int]) – optional buffer size, if not specified size is unlimited (ignore_events_if_full has no meaning, but not your memory…)
ignore_events_if_full (Optional[bool]) – When true, if internal buffer (here a queue) is full, events will be ignored until older will be consumed. Otherwise, producer will be locked until older will be consumed.
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if no observable or duration are provided or buffer_size <= 0
-
async_rx.
rx_debounce
(an_observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta) → async_rx.protocol.definition.Observable[source]¶ Debounce operator.
Debounce are used to rate-limit the sequence. Debounce will delay a value when it arrives and only emits the last value in a burst of events after the set delay is over and no new event arrives during this delay.
- Parameters
an_observable (Observable) – an observable instance
duration (timedelta) – timedelta of interval (the duration)
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if no observable or duration are provided
-
async_rx.
rx_dict
(initial_value: Optional[Dict] = None) → async_rx.protocol.definition.Observable[source]¶ Create an observable on dictionnary.
The observer receive the current value of dictionnary on subscribe and when a key change (added, updated or deleted). This observable implements a UserDict, so you can use it as a classic dictionnary.
- Parameters
initial_value (Optional[Dict]) – intial value (default: {})
- Returns
observable instance
- Return type
-
async_rx.
rx_list
(initial_value: Optional[List] = None) → async_rx.protocol.definition.Observable[source]¶ Create an observable on list.
The observer receive the current value of list on subscribe and when an item change (added, updated or deleted, …). This observable implements a UserList, so you can use it as a classic list.
- Parameters
initial_value (Optional[List]) – intial value (default: [])
- Returns
observable instance
- Return type
-
async_rx.
rx_repeat
(duration: datetime.timedelta, producer: Callable, initial_delay: Optional[datetime.timedelta] = None) → async_rx.protocol.definition.Observable[source]¶ Repeat data.
rx_repeat send data generated by producer function at duration rate until observer dispose his subscription.
- Parameters
duration (timedelta) – duration between each sended item
producer (Callable) – producer (asyn/sync) function
initial_delay (Optional[timedelta]) – initial delay before produce value (default: None)
- Returns
observable instance
- Return type
- Raise:
(RuntimeError): if no producer or duration are provided
-
async_rx.
rx_repeat_series
(source: Any, ratio: Optional[float] = 1.0) → async_rx.protocol.definition.Observable[source]¶ Repeat a series (delay, value) as an observable for each subscription.
- Parameters
source (Any) – iterable or async iterable source of tuple (duration, value)
ratio (Optional[float]) – ratio apply on duration (1.0 per default)
- Returns
an observable
- Return type
- Raise:
(RuntimeError): if source is not iterable (sync or async)