API Reference

async-rx

async-rx definition.

Classes:

Subscription(*args, **kwargs)

Subscription Protocol.

NextHandler(*args, **kwargs)

NextHandler Protocol.

CompleteHandler(*args, **kwargs)

CompleteHandler Protocol.

ErrorHandler(*args, **kwargs)

ErrorHandler Protocol.

Observable(*args, **kwargs)

Observable Protocol.

Observer(*args, **kwargs)

Observer Protocol.

Collector(*args, **kwargs)

Collector Observer Protocol.

Subscribe(*args, **kwargs)

Subscribe Protocol.

Subject(*args, **kwargs)

A Subject is like an Observable, but can multicast to many Observers.

ConnectHandler(*args, **kwargs)

Connect Handler Protocol.

RefCountHandler(*args, **kwargs)

RefCount Handler Protocol.

ConnectableObservable(*args, **kwargs)

Define a connectable observable protocol.

ObservableFactory(*args, **kwargs)

Async ObservableFactory Protocol.

SubjectEventHandler(*args, **kwargs)

Subject Event Handler Protocol.

SubjectHandler(*args, **kwargs)

Subscribe Handler Protocol.

ConnectableObservableEventHandler(*args, …)

Connectable Observable Event Handler Protocol.

ConnectableObservableHandler(*args, **kwargs)

Connectable Observable Handler Protocol.

SubjectFactory(*args, **kwargs)

Functions:

rx_observer(on_next[, on_error, on_completed])

Return an observer.

rx_observer_from(observer[, on_next, …])

Build an observer from another one.

rx_collector(initial_value)

Create an observer collector.

rx_create(subscribe[, ensure_contract, …])

Create an observable with specific delayed execution ‘subscribe’.

rx_defer(observable_factory)

Create an observable when a subscription occurs.

rx_distinct(observable, frame_size)

Create an observable which sends distinct events inside a window of size #frame_size.

rx_empty()

Create an empty Observable.

rx_filter(observable, predicate)

Create an observable whose events are filtered by a predicate function.

rx_first(observable)

Create an observable which takes only the first event and then completes.

rx_forward(observable[, except_complet, …])

Create an observable which forwards events.

rx_from(observable_input)

Convert almost anything to an Observable.

rx_last(observable[, count])

Create an observable which takes only the last #count (or fewer) events and then completes.

rx_of(*args)

Convert arguments into an observable sequence.

rx_range(start, stop[, step])

Create an observable sequence of range.

rx_skip(observable, count)

Create an observable which skips #count events on the source.

rx_take(observable, count)

Create an observable which takes at most the first #count events (could be fewer).

rx_throw(error)

Create an observable which always calls error.

rx_reduce(observable, accumulator[, seed])

Create an observable which reduces the source with an accumulator and a seed value.

rx_count(observable)

Create an observable which counts the emissions on the source and emits the result.

rx_max(observable)

Create an observable which returns the maximal item in the source when it completes.

rx_min(observable)

Create an observable which returns the minimal item in the source when it completes.

rx_sum(observable)

Create an observable which returns the sum of the items in the source when it completes.

rx_avg(observable)

Create an observable which returns the average of the items in the source when it completes.

rx_buffer(observable, buffer_size)

Buffer operator.

rx_window(observable, buffer_size)

Window operator.

rx_merge(*observables)

Flattens multiple Observables together by blending their values into one Observable.

rx_concat(*observables)

Concat operator.

rx_zip(*observables)

Combine multiple Observables to create an Observable.

rx_amb(*observables)

Amb operator.

rx_map(observable, transform[, …])

Map operator.

rx_merge_map(*observables, transform)

Merge map operator.

rx_group_by(observable, key_selector)

Group by operator.

rx_sample(observable, duration)

Sample operator used to rate-limit the sequence.

rx_throttle(observable, duration)

Throttle operator.

rx_delay(observable, duration[, …])

Delay operator.

rx_debounce(an_observable, duration)

Debounce operator.

rx_dict([initial_value])

Create an observable on a dictionary.

rx_list([initial_value])

Create an observable on list.

rx_repeat(duration, producer[, initial_delay])

Repeat data.

rx_repeat_series(source[, ratio])

Repeat a series (delay, value) as an observable for each subscription.

rx_subject([subject_handler])

Create a subject.

rx_subject_from(a_subject[, subscribe, …])

Build a subject from another one by overriding some functions.

rx_subject_replay(buffer_size[, subject_handler])

Create a replay subject.

rx_subject_behavior([subject_handler])

Create a behavior subject.

rx_publish(an_observable[, subject_handler, …])

Create a Connectable Observable.

rx_publish_replay(an_observable, buffer_size)

Create a publish_replay.

rx_publish_behavior(an_observable[, …])

Create a publish_behavior.

class async_rx.Subscription(*args, **kwargs)[source]

Bases: typing.Protocol

Subscription Protocol.

Subscription is a function to release resources or cancel Observable executions (it acts as a Disposable). It defines something to be used and thrown away after you call it.

Methods:

__call__()

Release subscription.

async __call__() → None[source]

Release subscription.
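The disposable idea above can be sketched in plain asyncio. This is a minimal illustration, not the library's implementation: `make_subscription` and `demo_subscription` are hypothetical names, and the "release at most once" behaviour is an assumption of the sketch.

```python
import asyncio
from typing import Awaitable, Callable, List


def make_subscription(release: Callable[[], None]) -> Callable[[], Awaitable[None]]:
    """Wrap a cleanup function in an async callable that runs it at most once."""
    done = False

    async def subscription() -> None:
        nonlocal done
        if not done:  # assumption of this sketch: a second call is a no-op
            done = True
            release()

    return subscription


async def demo_subscription() -> List[str]:
    released: List[str] = []
    sub = make_subscription(lambda: released.append("released"))
    await sub()  # releases the resource
    await sub()  # already released, nothing happens
    return released


if __name__ == "__main__":
    print(asyncio.run(demo_subscription()))
```

The caller keeps the returned callable and awaits it once the stream is no longer needed.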

class async_rx.NextHandler(*args, **kwargs)[source]

Bases: typing.Protocol

NextHandler Protocol.

A next handler processes an item from the associated observable.

Methods:

__call__(item)

Process item.

async __call__(item: Any) → None[source]

Process item.

class async_rx.CompleteHandler(*args, **kwargs)[source]

Bases: typing.Protocol

CompleteHandler Protocol.

A complete handler is called when no more items will come from the associated observable.

Methods:

__call__()

Signal completion of this observable.

async __call__() → None[source]

Signal completion of this observable.

class async_rx.ErrorHandler(*args, **kwargs)[source]

Bases: typing.Protocol

ErrorHandler Protocol.

An error handler receives a message or an exception and raises it.

Methods:

__call__(err)

Raise error.

async __call__(err: Any) → Optional[NoReturn][source]

Raise error.

Parameters

err (Union[Any, Exception]) – the error to raise

Raises

(Exception) – the exception

class async_rx.Observable(*args, **kwargs)[source]

Bases: typing.Protocol

Observable Protocol.

An observable is something to which we can subscribe in order to listen to events.

Methods:

subscribe(an_observer)

async subscribe(an_observer: async_rx.protocol.definition.Observer) → async_rx.protocol.definition.Subscription[source]

class async_rx.Observer(*args, **kwargs)[source]

Bases: typing.Protocol

Observer Protocol.

What is an Observer?

An Observer is a consumer of values delivered by an Observable.

Observers are simply a set of callbacks, one for each type of notification delivered by the Observable:

  • next,

  • error,

  • and complete.

Observers are just “objects” with three callbacks, one for each type of notification that an Observable may deliver.

Methods:

on_completed()

Signal completion of this observable.

on_error(err)

on_next(item)

Process item.

async on_next(item: Any) → None[source]

Process item.

async on_completed() → None[source]

Signal completion of this observable.

async on_error(err: Any) → Optional[NoReturn][source]

class async_rx.Collector(*args, **kwargs)[source]

Bases: async_rx.protocol.definition.Observer, typing.Protocol

Collector Observer Protocol.

Methods:

error()

Return the error if the observable has met an error.

has_error()

Return true if the observable has met an error.

is_finish()

Return true if observable has completed.

result()

Returns result.

result() → Any[source]

Returns result.

is_finish() → bool[source]

Return true if observable has completed.

has_error() → bool[source]

Return true if the observable has met an error.

error() → Any[source]

Return the error if the observable has met an error.
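To make the four accessors concrete, here is a minimal list-based sketch of an object with the same method names. `SketchCollector` is a hypothetical class written for this illustration; how the real collector accumulates dict or scalar results is not covered, and storing the error instead of raising it is an assumption.

```python
import asyncio
from typing import Any, List, Optional


class SketchCollector:
    """Hypothetical list-based collector; not the library's implementation."""

    def __init__(self, initial_value: List[Any]) -> None:
        self._result: List[Any] = list(initial_value)
        self._finished = False
        self._error: Optional[Any] = None

    async def on_next(self, item: Any) -> None:
        self._result.append(item)

    async def on_completed(self) -> None:
        self._finished = True

    async def on_error(self, err: Any) -> None:
        # Assumption: the error is stored so that error() can report it later.
        self._error = err

    def result(self) -> List[Any]:
        return self._result

    def is_finish(self) -> bool:
        return self._finished

    def has_error(self) -> bool:
        return self._error is not None

    def error(self) -> Optional[Any]:
        return self._error


async def demo_collector() -> SketchCollector:
    collector = SketchCollector([])
    for item in (1, 2, 3):
        await collector.on_next(item)
    await collector.on_completed()
    return collector


if __name__ == "__main__":
    print(asyncio.run(demo_collector()).result())
```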

class async_rx.Subscribe(*args, **kwargs)[source]

Bases: typing.Protocol

Subscribe Protocol.

It’s a (sync/async) function which takes an observer and returns a subscription.

Methods:

__call__(an_observer)

Implement observer subscription.

async __call__(an_observer: async_rx.protocol.definition.Observer) → async_rx.protocol.definition.Subscription[source]

Implement observer subscription.

Parameters

an_observer (Observer) – the observer instance

Returns

subscription

Return type

(Subscription)

class async_rx.Subject(*args, **kwargs)[source]

Bases: async_rx.protocol.definition.Observable, async_rx.protocol.definition.Observer, typing.Protocol

A Subject is like an Observable, but can multicast to many Observers.

Subjects are like EventEmitters: they maintain a registry of many listeners.

class async_rx.ConnectHandler(*args, **kwargs)[source]

Bases: typing.Protocol

Connect Handler Protocol.

class async_rx.RefCountHandler(*args, **kwargs)[source]

Bases: typing.Protocol

RefCount Handler Protocol.

class async_rx.ConnectableObservable(*args, **kwargs)[source]

Bases: async_rx.protocol.definition.Observable, typing.Protocol

Define a connectable observable protocol.

We have:
  • subscribe function (it’s an observable)

  • connect function: start executing

  • ref_count function: makes the Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.

Methods:

connect()

Connect.

ref_count()

Reference counter.

async connect() → async_rx.protocol.definition.Subscription[source]

Connect.

async ref_count() → async_rx.protocol.definition.Observable[source]

Reference counter.

Make the multicasted Observable automatically start executing when the first subscriber arrives, and stop executing when the last subscriber leaves.

class async_rx.ObservableFactory(*args, **kwargs)[source]

Bases: typing.Protocol

Async ObservableFactory Protocol.

Defines a function which creates an Observable.

Methods:

__call__()

Create an Observable.

async __call__() → async_rx.protocol.definition.Observable[source]

Create an Observable.

Returns

the new observable instance.

Return type

(Observable)

class async_rx.SubjectEventHandler(*args, **kwargs)[source]

Bases: typing.Protocol

Subject Event Handler Protocol.

class async_rx.SubjectHandler(*args, **kwargs)[source]

Bases: typing.Protocol

Subscribe Handler Protocol.

This handler can be called on subscribe/unsubscribe events.

Methods:

on_subscribe(count, source)

Notify on subscribe event.

on_unsubscribe(count, source)

Notify on unsubscribe event.

async on_subscribe(count: int, source: async_rx.protocol.definition.Observer) → None[source]

Notify on subscribe event.

Parameters
  • count (int) – current #subscribers after subscription

  • source (Observer) – observer source

async on_unsubscribe(count: int, source: async_rx.protocol.definition.Observer) → None[source]

Notify on unsubscribe event.

Parameters
  • count (int) – current #subscribers after unsubscribe

  • source (Observer) – observer source

class async_rx.ConnectableObservableEventHandler(*args, **kwargs)[source]

Bases: typing.Protocol

Connectable Observable Event Handler Protocol.

class async_rx.ConnectableObservableHandler(*args, **kwargs)[source]

Bases: typing.Protocol

Connectable Observable Handler Protocol.

This handler can be called on connect/disconnect events.

Methods:

on_connect()

Called on connect event.

on_disconnect()

Called on disconnect event.

async on_connect() → None[source]

Called on connect event.

async on_disconnect() → None[source]

Called on disconnect event.

class async_rx.SubjectFactory(*args, **kwargs)[source]

Bases: typing.Protocol

async_rx.rx_observer(on_next: async_rx.protocol.definition.NextHandler, on_error: async_rx.protocol.definition.ErrorHandler = <function default_error>, on_completed: async_rx.protocol.definition.CompleteHandler = <function default_on_completed>) → async_rx.protocol.definition.Observer[source]

Return an observer.

The underlying implementation uses a named tuple.

Parameters
  • on_next (NextHandler) – on_next handler which process items

  • on_error (ErrorHandler) – on_error handler (defaults to default_error, which raises an Exception)

  • on_completed (CompleteHandler) – on_completed handler (defaults to a no-op)

Returns

an Observer

Return type

(Observer)
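The description above (three handlers held in a named tuple, with a raising default for errors and a no-op default for completion) can be sketched as follows. `SketchObserver` and `sketch_observer` are hypothetical names, and the bodies of the two default handlers are assumptions that only mirror the documented behaviour.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, NamedTuple


class SketchObserver(NamedTuple):
    on_next: Callable[[Any], Awaitable[None]]
    on_error: Callable[[Any], Awaitable[None]]
    on_completed: Callable[[], Awaitable[None]]


async def default_error(err: Any) -> None:
    # Assumed behaviour: raise the error, wrapping plain messages in Exception.
    raise err if isinstance(err, BaseException) else Exception(err)


async def default_on_completed() -> None:
    return None  # no-op


def sketch_observer(on_next, on_error=default_error,
                    on_completed=default_on_completed) -> SketchObserver:
    return SketchObserver(on_next, on_error, on_completed)


async def demo_observer() -> List[Any]:
    seen: List[Any] = []

    async def on_next(item: Any) -> None:
        seen.append(item)

    obs = sketch_observer(on_next)
    await obs.on_next(1)
    await obs.on_next(2)
    await obs.on_completed()
    try:
        await obs.on_error("boom")  # the default handler raises
    except Exception as exc:
        seen.append(str(exc))
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo_observer()))
```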

async_rx.rx_observer_from(observer: async_rx.protocol.definition.Observer, on_next: Optional[async_rx.protocol.definition.NextHandler] = None, on_error: Optional[async_rx.protocol.definition.ErrorHandler] = None, on_completed: Optional[async_rx.protocol.definition.CompleteHandler] = None) → async_rx.protocol.definition.Observer[source]

Build an observer from another one.

Parameters
  • observer (Observer) – the observer to override

  • on_next (Optional[NextHandler]) – override on_next handler if set

  • on_error (Optional[ErrorHandler]) – override on_error handler if set

  • on_completed (Optional[CompleteHandler]) – override on_completed handler if set

Returns

an Observer

Return type

(Observer)

async_rx.rx_collector(initial_value: T) → async_rx.protocol.definition.Collector[source]

Create an observer collector.

Parameters

initial_value (T) – initial value which determines the result type (list, dict, base type)

Returns

a collector instance

Return type

(Collector[T])

async_rx.rx_subject(subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None) → async_rx.protocol.definition.Subject[source]

Create a subject.

A Subject is like an Observable, but can multicast to many Observers. Subjects are like EventEmitters: they maintain a registry of many listeners, and then dispatch events/items to them.

As a subject is also an Observer, it can subscribe to an observable which acts as its data stream source.

Parameters

subject_handler (Optional[SubjectHandler]) – optional subject handler callback

Returns

the subject

Return type

(Subject)

Example 1:

# create a subject
a_subject = rx_subject(subject_handler=my_handler)

# a few observers subscribe to this subject
sub_1 = await a_subject.subscribe(obs_1)
sub_2 = await a_subject.subscribe(obs_2)

# the subject itself subscribes to an observable
await rx_range(start=0, stop=10).subscribe(a_subject)

# obs_1 and obs_2 each receive 10 items

Example 2: A subject as event emitter

# create a subject
a_subject = rx_subject()

# a few observers subscribe to this subject
sub_1 = await a_subject.subscribe(obs_1)
sub_2 = await a_subject.subscribe(obs_2)

# send your data yourself
await a_subject.on_next("my value")  # obs_1 and obs_2 receive "my value"
await a_subject.on_completed()  # obs_1 and obs_2 receive on_completed
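The examples above rely on objects defined elsewhere (obs_1, obs_2, my_handler). For readers who want something that runs on its own, here is a library-free sketch of the multicast mechanism. `Recorder` and `SketchSubject` are hypothetical classes written for this illustration and do not reflect how rx_subject is implemented.

```python
import asyncio
from typing import Any, List


class Recorder:
    """Tiny observer that records every notification it receives."""

    def __init__(self) -> None:
        self.events: List[Any] = []

    async def on_next(self, item: Any) -> None:
        self.events.append(item)

    async def on_completed(self) -> None:
        self.events.append("completed")


class SketchSubject:
    """Keeps a registry of observers and dispatches each event to all of them."""

    def __init__(self) -> None:
        self._observers: List[Recorder] = []

    async def subscribe(self, observer: Recorder):
        self._observers.append(observer)

        async def unsubscribe() -> None:
            if observer in self._observers:
                self._observers.remove(observer)

        return unsubscribe

    async def on_next(self, item: Any) -> None:
        for observer in list(self._observers):
            await observer.on_next(item)

    async def on_completed(self) -> None:
        for observer in list(self._observers):
            await observer.on_completed()


async def demo_subject():
    obs_1, obs_2 = Recorder(), Recorder()
    a_subject = SketchSubject()
    sub_1 = await a_subject.subscribe(obs_1)
    await a_subject.subscribe(obs_2)
    await a_subject.on_next("my value")      # both observers receive it
    await sub_1()                            # obs_1 leaves
    await a_subject.on_next("second value")  # only obs_2 receives it
    await a_subject.on_completed()
    return obs_1.events, obs_2.events


if __name__ == "__main__":
    print(asyncio.run(demo_subject()))
```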
async_rx.rx_subject_replay(buffer_size: int, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None) → async_rx.protocol.definition.Subject[source]

Create a replay subject.

A ReplaySubject is similar to a BehaviorSubject in that it can send old values to new subscribers, but it can also record a part of the Observable execution.

A ReplaySubject records multiple values from the Observable execution and replays them to new subscribers. When a replay occurs, completed and error events are also replayed.

Parameters
  • buffer_size (int) – buffer size, i.e. the number of items replayed on subscription

  • subject_handler (Optional[SubjectHandler]) – optional subject handler callback

Returns

the subject

Return type

(Subject)

Raise:

(RuntimeError): if buffer_size <= 0
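The replay behaviour can be illustrated with a bounded buffer. The sketch below is an assumption-laden illustration, not the library's code: `SketchReplaySubject` is a hypothetical class, it takes bare callbacks instead of an Observer, and it replays items plus the completion event only (error replay is left out for brevity).

```python
import asyncio
from collections import deque
from typing import Any, List


class SketchReplaySubject:
    def __init__(self, buffer_size: int) -> None:
        if buffer_size <= 0:
            raise RuntimeError("buffer_size must be greater than zero")
        self._buffer = deque(maxlen=buffer_size)  # keeps only the newest items
        self._observers = []
        self._completed = False

    async def subscribe(self, on_next, on_completed) -> None:
        for item in self._buffer:  # replay the recorded items first
            await on_next(item)
        if self._completed:
            await on_completed()  # completion is replayed as well
        else:
            self._observers.append((on_next, on_completed))

    async def on_next(self, item: Any) -> None:
        self._buffer.append(item)
        for nxt, _ in list(self._observers):
            await nxt(item)

    async def on_completed(self) -> None:
        self._completed = True
        for _, done in list(self._observers):
            await done()


async def demo_replay() -> List[Any]:
    subject = SketchReplaySubject(buffer_size=2)
    for item in (1, 2, 3):
        await subject.on_next(item)
    await subject.on_completed()

    seen: List[Any] = []

    async def on_next(item: Any) -> None:
        seen.append(item)

    async def on_completed() -> None:
        seen.append("completed")

    await subject.subscribe(on_next, on_completed)  # late subscriber
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo_replay()))
```

With a buffer of two, the late subscriber sees only the last two items followed by the completion.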

async_rx.rx_subject_behavior(subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None) → async_rx.protocol.definition.Subject[source]

Create a behavior subject.

One of the variants of Subjects is the BehaviorSubject, which has a notion of “the current value”. It stores the latest value emitted to its consumers, and whenever a new Observer subscribes, it will immediately receive the “current value” from the BehaviorSubject.

BehaviorSubjects are useful for representing “values over time”. For instance, an event stream of birthdays is a Subject, but the stream of a person’s age would be a BehaviorSubject.

Parameters

subject_handler (Optional[SubjectHandler]) – optional subject handler callback

Returns

the subject

Return type

(Subject)
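The "current value" idea can be sketched with the age example from the description. `SketchBehaviorSubject` is a hypothetical class that takes bare callbacks; treating a subject that has not yet seen a value as having nothing to send is an assumption of this sketch.

```python
import asyncio
from typing import Any, List

_NO_VALUE = object()  # sentinel: nothing has been emitted yet


class SketchBehaviorSubject:
    def __init__(self) -> None:
        self._current: Any = _NO_VALUE
        self._listeners = []

    async def subscribe(self, on_next) -> None:
        self._listeners.append(on_next)
        if self._current is not _NO_VALUE:
            await on_next(self._current)  # new subscriber gets the current value

    async def on_next(self, item: Any) -> None:
        self._current = item
        for listener in list(self._listeners):
            await listener(item)


async def demo_behavior():
    age = SketchBehaviorSubject()
    early: List[int] = []
    late: List[int] = []

    async def early_listener(value: int) -> None:
        early.append(value)

    async def late_listener(value: int) -> None:
        late.append(value)

    await age.subscribe(early_listener)  # no current value yet
    await age.on_next(30)
    await age.on_next(31)
    await age.subscribe(late_listener)   # immediately receives 31
    await age.on_next(32)
    return early, late


if __name__ == "__main__":
    print(asyncio.run(demo_behavior()))
```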

async_rx.rx_subject_from(a_subject: async_rx.protocol.definition.Subject, subscribe: Optional[async_rx.protocol.definition.Subscribe] = None, on_next: Optional[async_rx.protocol.definition.NextHandler] = None, on_error: Optional[async_rx.protocol.definition.ErrorHandler] = None, on_completed: Optional[async_rx.protocol.definition.CompleteHandler] = None) → async_rx.protocol.definition.Subject[source]

Build a subject from another one by overriding some functions.

Parameters
  • a_subject (Subject) – the source subject

  • subscribe (Optional[Subscribe]) – override subscribe if set

  • on_next (Optional[NextHandler]) – override on_next if set

  • on_error (Optional[ErrorHandler]) – override on_error if set

  • on_completed (Optional[CompleteHandler]) – override on_completed if set

Returns

a new subject

Return type

(Subject)

async_rx.rx_publish(an_observable: async_rx.protocol.definition.Observable, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None, connection_handler: Optional[async_rx.protocol.definition.ConnectableObservableHandler] = None, subject_factory: async_rx.protocol.definition.SubjectFactory = <function rx_subject>) → async_rx.protocol.definition.ConnectableObservable[source]

Create a Connectable Observable.

A multicasted Observable (rx_publish) uses a Subject under the hood to make multiple Observers see the same Observable execution.

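Parameters
  • an_observable (Observable) – observable to connect

  • subject_handler (Optional[SubjectHandler]) – optional subject handler

  • connection_handler (Optional[ConnectableObservableHandler]) – optional connection handler

  • subject_factory (SubjectFactory) – factory used to create the underlying subject (default: rx_subject)

Returns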

the multicasted Observable instance

Return type

(ConnectableObservable)

async_rx.rx_publish_replay(an_observable: async_rx.protocol.definition.Observable, buffer_size: int, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None, connection_handler: Optional[async_rx.protocol.definition.ConnectableObservableHandler] = None) → async_rx.protocol.definition.ConnectableObservable[source]

Create a publish_replay.

A publish_replay uses a replay_subject under the hood to make multiple Observers see the same Observable execution.

Parameters
  • buffer_size (int) – max #items to replay

  • an_observable (Observable) – observable to connect

  • subject_handler (Optional[SubjectHandler]) – optional subject handler

  • connection_handler (Optional[ConnectableObservableHandler]) – optional connection handler

Returns

the publish_replay instance

Return type

(ConnectableObservable)

async_rx.rx_publish_behavior(an_observable: async_rx.protocol.definition.Observable, subject_handler: Optional[async_rx.protocol.definition.SubjectHandler] = None, connection_handler: Optional[async_rx.protocol.definition.ConnectableObservableHandler] = None) → async_rx.protocol.definition.ConnectableObservable[source]

Create a publish_behavior.

A publish_behavior uses a behavior_subject under the hood to make multiple Observers see the same Observable execution.

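Parameters
  • an_observable (Observable) – observable to connect

  • subject_handler (Optional[SubjectHandler]) – optional subject handler

  • connection_handler (Optional[ConnectableObservableHandler]) – optional connection handler

Returns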

the publish_behavior instance

Return type

(ConnectableObservable)

async_rx.rx_create(subscribe: async_rx.protocol.definition.Subscribe, ensure_contract: Optional[bool] = True, max_observer: Optional[int] = None) → Union[async_rx.protocol.definition.Observable, NoReturn][source]

Create an observable with specific delayed execution ‘subscribe’.

Observables can be created with create, but usually we use the so-called creation operators, like of, from, interval, etc. Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.

Parameters
  • subscribe (Subscribe) – subscribe function to use on the observable

  • ensure_contract (bool) – boolean flag (default True) to ensure that this observable will follow the Observable contract.

  • max_observer (int) – maximum number of observers on this Observable (default None <=> unlimited)

Returns

an observable instance.

Return type

(Observable)

Raise:

(RuntimeError): if subscribe parameter is undefined

async_rx.rx_defer(observable_factory: async_rx.protocol.definition.ObservableFactory) → async_rx.protocol.definition.Observable[source]

Create an observable when a subscription occurs.

Defer allows you to create the Observable only when the Observer subscribes, and create a fresh Observable for each Observer.

It waits until an Observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual Observable.

Parameters

observable_factory (ObservableFactory) – observable factory

Returns

an observable instance which creates an observable when a subscription occurs.

Return type

(Observable)
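The "fresh Observable for each subscriber" behaviour can be sketched without the library. In this illustration an "observable" is reduced to an async function that takes an on_next callback; `sketch_defer` and `demo_defer` are hypothetical names.

```python
import asyncio
import itertools
from typing import List


def sketch_defer(observable_factory):
    """Return a subscribe function that builds a new observable per subscriber."""

    async def subscribe(on_next) -> None:
        observable = await observable_factory()  # called at subscription time
        await observable(on_next)

    return subscribe


async def demo_defer():
    counter = itertools.count(1)

    async def factory():
        value = next(counter)  # evaluated once per subscription

        async def observable(on_next) -> None:
            await on_next(value)

        return observable

    deferred = sketch_defer(factory)
    first: List[int] = []
    second: List[int] = []

    async def to_first(item: int) -> None:
        first.append(item)

    async def to_second(item: int) -> None:
        second.append(item)

    await deferred(to_first)
    await deferred(to_second)
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo_defer()))
```

Both subscribers use the same deferred object, yet each one triggers its own factory call and therefore sees a different value.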

async_rx.rx_distinct(observable: async_rx.protocol.definition.Observable, frame_size: int) → async_rx.protocol.definition.Observable[source]

Create an observable which sends distinct events inside a window of size #frame_size.

Parameters
  • observable (Observable) – observable source

  • frame_size (int) – window size

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if frame_size <= 0

async_rx.rx_empty() → async_rx.protocol.definition.Observable[source]

Create an empty Observable.

An “empty” Observable emits only the complete notification.

Returns

(Observable) observable instance

async_rx.rx_filter(observable: async_rx.protocol.definition.Observable, predicate: Union[async_rx.protocol.definition._AsyncPredicateOperator, async_rx.protocol.definition._SyncPredicateOperator]) → async_rx.protocol.definition.Observable[source]

Create an observable whose events are filtered by a predicate function.

Parameters
  • observable (Observable) – observable source

  • predicate (Operator) – predicate function which takes one argument and returns a truthy value

Returns

observable instance

Return type

(Observable)

async_rx.rx_first(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Create an observable which takes only the first event and then completes.

Parameters

observable (Observable) – observable source

Returns

observable instance

Return type

(Observable)

async_rx.rx_forward(observable: async_rx.protocol.definition.Observable, except_complet: bool = False, except_error: bool = False) → async_rx.protocol.definition.Observable[source]

Create an observable which forwards events.

Parameters
  • observable (Observable) – observable source

  • except_complet (bool) – if true, the ‘on_completed’ event is not forwarded (default: False)

  • except_error (bool) – if true, the ‘on_error’ event is not forwarded (default: False)

Returns

observable instance.

Return type

(Observable)

async_rx.rx_from(observable_input: Any) → async_rx.protocol.definition.Observable[source]

Convert almost anything to an Observable.

Anything means:
  • a dictionary (converted to an rx_dict)

  • an async iterable

  • an iterable

  • something which can be cast to an Observable (have a subscribe function)

  • an object

Parameters

observable_input (Any) – A subscribable object

Returns

The Observable whose values are originally from the input object that was converted.

Return type

(Observable)

async_rx.rx_last(observable: async_rx.protocol.definition.Observable, count: int = 1) → async_rx.protocol.definition.Observable[source]

Create an observable which takes only the last #count (or fewer) events and then completes.

Parameters
  • observable (Observable) – observable source

  • count (int) – number of events to get (default 1)

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if count <= 0
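Keeping "the last #count (or fewer)" items amounts to a bounded tail buffer that is only emitted once the source has completed. A minimal sketch over an async generator follows; `numbers` and `sketch_last` are hypothetical helpers, and the source is modelled as an async iterator instead of an Observable.

```python
import asyncio
from collections import deque
from typing import Any, AsyncIterator, List


async def numbers(n: int) -> AsyncIterator[int]:
    for i in range(n):
        yield i


async def sketch_last(source: AsyncIterator[Any], count: int = 1) -> List[Any]:
    if count <= 0:
        raise RuntimeError("count must be greater than zero")
    tail = deque(maxlen=count)  # older items fall out automatically
    async for item in source:
        tail.append(item)
    return list(tail)  # available only after the source has completed


if __name__ == "__main__":
    print(asyncio.run(sketch_last(numbers(5), count=2)))
```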

async_rx.rx_of(*args) → async_rx.protocol.definition.Observable[source]

Convert arguments into an observable sequence.

Parameters

args – any list of arguments to send to the observer.

Returns

observable instance.

Return type

(Observable)

async_rx.rx_range(start: int, stop: int, step: int = 1) → async_rx.protocol.definition.Observable[source]

Create an observable sequence of range.

Parameters
  • start (int) – initial value

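  • stop (int) – end value (not emitted, as with Python’s range: the rx_subject example shows rx_range(start=0, stop=10) producing 10 items)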

  • step (int) – default increment (default: {1})

Returns

observable instance.

Return type

(Observable)

async_rx.rx_skip(observable: async_rx.protocol.definition.Observable, count: int) → async_rx.protocol.definition.Observable[source]

Create an observable which skips #count events on the source.

Parameters
  • observable (Observable) – observable source

  • count (int) – number of events to skip

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if count <= 0

async_rx.rx_take(observable: async_rx.protocol.definition.Observable, count: int) → async_rx.protocol.definition.Observable[source]

Create an observable which takes at most the first #count events (could be fewer).

Parameters
  • observable (Observable) – observable source

  • count (int) – #items to take

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if count <= 0

async_rx.rx_throw(error: Any) → async_rx.protocol.definition.Observable[source]

Create an observable which always calls error.

Parameters

error (Union[Any, Exception]) – the error to observe

Returns

observable instance.

Return type

(Observable)

async_rx.rx_reduce(observable: async_rx.protocol.definition.Observable, accumulator: Union[async_rx.protocol.definition._AsyncAccumulatorOperator, async_rx.protocol.definition._SyncAccumulatorOperator], seed: Optional[Any] = None) → async_rx.protocol.definition.Observable[source]

Create an observable which reduces the source with an accumulator and a seed value.

Parameters
  • observable (Observable) – source

  • accumulator (AccumulatorOperator) – accumulator function (two arguments, one result), async or sync.

  • seed (Optional[Any]) – optional seed value (default none)

Returns

a new observable

Return type

(Observable)
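Since the accumulator may be sync or async, a reducer has to await the result only when needed. The sketch below shows one way to do that over a plain list; `sketch_reduce` is a hypothetical helper, and it always starts from an explicit seed because how the library treats a missing seed is not stated here.

```python
import asyncio
import inspect
from typing import Any, Callable, Iterable


async def sketch_reduce(items: Iterable[Any], accumulator: Callable[[Any, Any], Any],
                        seed: Any) -> Any:
    acc = seed
    for item in items:
        result = accumulator(acc, item)
        # Await only when the accumulator returned an awaitable (async function).
        acc = await result if inspect.isawaitable(result) else result
    return acc


def add_sync(acc: int, item: int) -> int:
    return acc + item


async def add_async(acc: int, item: int) -> int:
    await asyncio.sleep(0)  # stands in for real asynchronous work
    return acc + item


if __name__ == "__main__":
    print(asyncio.run(sketch_reduce([1, 2, 3, 4], add_sync, 0)))
    print(asyncio.run(sketch_reduce([1, 2, 3, 4], add_async, 0)))
```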

async_rx.rx_count(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Create an observable which counts the emissions on the source and emits the result.

Parameters

observable (Observable) – the observable source

Returns

observable instance

Return type

(Observable)

async_rx.rx_max(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Create an observable which returns the maximal item in the source when it completes.

Parameters

observable (Observable) – the observable source

Returns

observable instance

Return type

(Observable)

async_rx.rx_min(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Create an observable which returns the minimal item in the source when it completes.

Parameters

observable (Observable) – the observable source

Returns

observable instance

Return type

(Observable)

async_rx.rx_sum(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Create an observable which returns the sum of the items in the source when it completes.

Parameters

observable (Observable) – the observable source

Returns

observable instance

Return type

(Observable)

async_rx.rx_avg(observable: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Create an observable which returns the average of the items in the source when it completes.

Parameters

observable (Observable) – the observable source

Returns

observable instance

Return type

(Observable)

async_rx.rx_buffer(observable: async_rx.protocol.definition.Observable, buffer_size: int) → async_rx.protocol.definition.Observable[source]

Buffer operator.

Buffer and Window collect elements from the source sequence and emit them in groups. Buffer projects these elements onto a list and emits them; it starts processing the source on the first subscription.

Parameters
  • observable (Observable) – the source

  • buffer_size (int) – buffer size

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if buffer_size <= 0
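Grouping a sequence into lists of buffer_size items can be sketched on a plain list. `sketch_buffer` is a hypothetical helper; emitting a final, shorter list for leftover items is an assumption of this sketch.

```python
from typing import Any, List, Sequence


def sketch_buffer(items: Sequence[Any], buffer_size: int) -> List[List[Any]]:
    if buffer_size <= 0:
        raise RuntimeError("buffer_size must be greater than zero")
    # Consecutive, non-overlapping groups; the last one may be shorter (assumption).
    return [list(items[i:i + buffer_size]) for i in range(0, len(items), buffer_size)]


if __name__ == "__main__":
    print(sketch_buffer([1, 2, 3, 4, 5], 2))
```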

async_rx.rx_window(observable: async_rx.protocol.definition.Observable, buffer_size: int) → async_rx.protocol.definition.Observable[source]

Window operator.

Window collects elements from the source sequence and emits them in groups. Window emits these elements in nested observables. It will emit a new inner observable when a window opens and will complete the inner observable when the window closes. Notice that there can be overlap between multiple windows if the next one opens before the last one closes.

For example Window with a count of 2 and a skip of 1 will emit the last 2 elements (count 2) for every element (skip 1), so the sequence -1-2-3-4-| becomes –[12][23][34][4]|.

Parameters
  • observable (Observable) – the source

  • buffer_size (int) – buffer size

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if buffer_size <= 0
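The count/skip example above can be reproduced with list slicing. This is only an illustration of the overlap: `sketch_window` is a hypothetical helper, its `skip` parameter does not appear in the rx_window signature, and plain lists stand in for the inner observables.

```python
from typing import Any, List, Sequence


def sketch_window(items: Sequence[Any], count: int, skip: int) -> List[List[Any]]:
    if count <= 0 or skip <= 0:
        raise RuntimeError("count and skip must be greater than zero")
    # A new window opens every `skip` items and holds up to `count` items.
    return [list(items[i:i + count]) for i in range(0, len(items), skip)]


if __name__ == "__main__":
    print(sketch_window([1, 2, 3, 4], count=2, skip=1))
```

With count 2 and skip 1 the windows overlap; with skip equal to count they do not.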

async_rx.rx_merge(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Flattens multiple Observables together by blending their values into one Observable.

Creates an output Observable which concurrently emits all values from every given input Observable. ‘merge’ subscribes to each given input Observable (either the source or an Observable given as argument), and simply forwards (without doing any transformation) all the values from all the input Observables to the output Observable. The output Observable only completes once all input Observables have completed. Any error delivered by an input Observable will be immediately emitted on the output Observable.

Parameters

observables (Observable) – a list of observable instances

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if #observables < 1
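Concurrent merging can be sketched with one task per source feeding a shared queue. The sources here are async generators, not Observables; `ticker` and `sketch_merge` are hypothetical names, and error propagation is deliberately left out of the sketch.

```python
import asyncio
from typing import Any, AsyncIterator, List


async def ticker(name: str, delay: float, count: int) -> AsyncIterator[str]:
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def sketch_merge(*sources: AsyncIterator[Any]) -> List[Any]:
    if len(sources) < 1:
        raise RuntimeError("at least one source is required")
    queue: asyncio.Queue = asyncio.Queue()
    done = object()  # marker put on the queue when one source completes

    async def pump(source: AsyncIterator[Any]) -> None:
        async for item in source:
            await queue.put(item)  # forwarded without transformation
        await queue.put(done)

    tasks = [asyncio.create_task(pump(source)) for source in sources]
    remaining = len(tasks)
    merged: List[Any] = []
    while remaining:  # the output completes once every source has completed
        item = await queue.get()
        if item is done:
            remaining -= 1
        else:
            merged.append(item)
    await asyncio.gather(*tasks)
    return merged


if __name__ == "__main__":
    print(asyncio.run(sketch_merge(ticker("a", 0.01, 3), ticker("b", 0.015, 2))))
```

The interleaving of the two sources depends on timing, but the items of each individual source keep their relative order.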

async_rx.rx_concat(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Concat operator.

Merge and Concat combine multiple sequences into one. Merge might interweave elements from different sequences, whereas Concat emits all elements from the first sequence before turning to the next one.

Parameters

observables (Observable) – a list of observable instances

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if len(observables) <= 0

async_rx.rx_zip(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Combine multiple Observables to create an Observable.

The Observable values are calculated from the values, in order, of each of its input Observables.

Parameters

observables (Observable) – a list of observable instances

Returns

observable instance

Return type

(Observable)

async_rx.rx_amb(*observables: async_rx.protocol.definition.Observable) → async_rx.protocol.definition.Observable[source]

Amb operator.

The Amb operator (stands for ambiguous), alias race, subscribes to a number of observables and retrieves the first observable that yields a value, closing off all others. For example, Amb can automatically select the best server to download from: Amb listens to both servers and the first server that replies is used.

Parameters

observables (Observable) – a list of observable instances

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if #observables < 1
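The server-selection scenario can be sketched as a race for the first item. Sources are modelled as async generators, `server`, `first_item` and `sketch_amb` are hypothetical names, and sources that complete without emitting anything are not handled.

```python
import asyncio
from typing import Any, AsyncIterator, List, Tuple


async def server(name: str, latency: float) -> AsyncIterator[str]:
    await asyncio.sleep(latency)  # simulated response time
    yield f"{name}: chunk 0"
    yield f"{name}: chunk 1"


async def first_item(source: AsyncIterator[Any]) -> Tuple[Any, AsyncIterator[Any]]:
    iterator = source.__aiter__()
    item = await iterator.__anext__()
    return item, iterator


async def sketch_amb(*sources: AsyncIterator[Any]) -> List[Any]:
    if len(sources) < 1:
        raise RuntimeError("at least one source is required")
    tasks = [asyncio.create_task(first_item(source)) for source in sources]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:  # close off every source that lost the race
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    item, iterator = next(iter(done)).result()
    results = [item]
    async for later in iterator:  # keep following the winner only
        results.append(later)
    return results


if __name__ == "__main__":
    print(asyncio.run(sketch_amb(server("slow", 0.05), server("fast", 0.01))))
```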

async_rx.rx_map(observable: async_rx.protocol.definition.Observable, transform: Callable, expand_arg_parameters: Optional[bool] = False, expand_kwarg_parameters: Optional[bool] = False) → async_rx.protocol.definition.Observable[source]

Map operator.

Map operator modifies an Observable<A> into Observable<B> given a function with the type A->B.

For example, applying the function x => 10 ∗ x to the sequence 1, 2, 3 yields 10, 20, 30 (see figure 4). Note that this function did not change the type of the Observable but did change the values.

Parameters
  • observable (Observable) – an observable instance

  • transform (Callable) – transform function (sync or async)

  • expand_arg_parameters (Optional[bool]) – if true, each item is expanded as positional args before calling transform (implies expand_kwarg_parameters = False).

  • expand_kwarg_parameters (Optional[bool]) – if true, each item is expanded as kwargs before calling transform.

Returns

observable instance

Return type

(Observable)
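
The effect of the two expansion flags can be illustrated with a small plain-Python helper. apply_transform is a hypothetical name used only for this sketch; it mirrors the parameter descriptions above and is not taken from the library's source.

```python
def apply_transform(item, transform, expand_arg_parameters=False,
                    expand_kwarg_parameters=False):
    """Call transform on one item, optionally unpacking the item first."""
    if expand_arg_parameters:
        # Positional expansion takes precedence over keyword expansion.
        return transform(*item)
    if expand_kwarg_parameters:
        return transform(**item)
    return transform(item)


# Plain mapping: x => 10 * x over 1, 2, 3.
scaled = [apply_transform(x, lambda x: 10 * x) for x in [1, 2, 3]]

# Each item is a tuple that is unpacked as positional arguments.
product = apply_transform((2, 3), lambda a, b: a * b, expand_arg_parameters=True)

# Each item is a dict that is unpacked as keyword arguments.
total = apply_transform({"a": 2, "b": 5}, lambda a, b: a + b,
                        expand_kwarg_parameters=True)
print(scaled, product, total)
```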

async_rx.rx_merge_map(*observables: async_rx.protocol.definition.Observable, transform: Callable) → async_rx.protocol.definition.Observable[source]

Merge map operator.

rx_merge_map supports asynchronous queries: it produces an observable of observables and flattens the results. Several inner observables may run simultaneously, so their results may be interleaved.

Parameters
  • observables (Observable) – a list of observable instances

  • transform (Callable) – transform function (sync or async)

Returns

observable instance

Return type

(Observable)

async_rx.rx_group_by(observable: async_rx.protocol.definition.Observable, key_selector: Callable) → async_rx.protocol.definition.Observable[source]

Group by operator.

Similar to Window, GroupBy projects the sequence onto a number of inner observables. Unlike Window, where all windows receive the same sequence, GroupBy emits each element to only one inner observable: the one associated with that element by the key selector function. The observer receives a tuple (key, subject).

Parameters
  • observable (Observable) – observable instance

  • key_selector (Callable) – key selector function (sync/async) [Any]->[Any]

Returns

observable instance

Return type

(Observable)
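
The routing rule can be sketched with ordinary lists standing in for the inner subjects. This is a simplified, synchronous model: group_by here is a hypothetical helper, and a real subject would deliver items over time rather than hold them in a list.

```python
def group_by(items, key_selector):
    """Route each item to the group chosen by key_selector.

    Returns (key, group) pairs in order of first appearance of each key.
    """
    groups = {}  # dicts preserve insertion order
    for item in items:
        groups.setdefault(key_selector(item), []).append(item)
    return list(groups.items())


grouped = group_by([1, 2, 3, 4, 5], lambda n: "even" if n % 2 == 0 else "odd")
print(grouped)
```

Each element lands in exactly one group, which is the contrast with Window drawn above.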

async_rx.rx_sample(observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta) → async_rx.protocol.definition.Observable[source]

Sample operator used to rate-limit the sequence.

Sample filters out elements based on timing. It emits the LATEST value at a set interval, or emits nothing if no new value arrived during the last interval.

Parameters
  • observable (Observable) – an observable instance

  • duration (timedelta) – timedelta of interval (the duration)

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if no observable or duration are provided
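
The sampling rule can be modelled on a list of timestamped events. The sketch below is plain Python and does not call async_rx. Integer milliseconds replace timedelta for readability, and the sample helper, its end_time parameter and the tick alignment (ticks at multiples of the interval) are assumptions made for the illustration.

```python
def sample(events, interval, end_time):
    """Emit, at each tick, the latest value received since the previous tick."""
    emitted = []
    latest = None
    has_new = False
    index = 0
    tick = interval
    while tick <= end_time:
        # Consume every event that arrived up to and including this tick.
        while index < len(events) and events[index][0] <= tick:
            latest = events[index][1]
            has_new = True
            index += 1
        if has_new:
            emitted.append((tick, latest))
            has_new = False
        tick += interval
    return emitted


events = [(10, "a"), (20, "b"), (120, "c"), (350, "d")]
sampled = sample(events, interval=100, end_time=400)
print(sampled)
```

The value "a" is overwritten by "b" before the first tick, and nothing is emitted at 300 because no new value arrived between 200 and 300.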

async_rx.rx_throttle(observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta) → async_rx.protocol.definition.Observable[source]

Throttle operator.

Throttle is used to rate-limit the sequence. It filters out elements based on timing.

Throttle emits the first event from a burst and ignores all subsequent values that arrive during the set timeout.

Parameters
  • observable (Observable) – an observable instance

  • duration (timedelta) – timedelta of interval (the duration)

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if no observable or duration are provided
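
A minimal model of this rule over timestamped events, in plain Python. The throttle helper and the integer-millisecond timestamps are hypothetical, and how the library treats an event arriving exactly at the end of the timeout is not specified here, so the sample data avoids that case.

```python
def throttle(events, duration):
    """Keep the first event of each burst; drop events inside the timeout."""
    emitted = []
    window_end = None
    for timestamp, value in events:
        if window_end is None or timestamp >= window_end:
            emitted.append((timestamp, value))
            # Start a new timeout from the event that was just emitted.
            window_end = timestamp + duration
    return emitted


events = [(0, "a"), (30, "b"), (90, "c"), (110, "d"), (150, "e"), (260, "f")]
throttled = throttle(events, duration=100)
print(throttled)
```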

async_rx.rx_delay(observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta, buffer_size: Optional[int] = None, ignore_events_if_full: Optional[bool] = True) → async_rx.protocol.definition.Observable[source]

Delay operator.

Delay will project the sequence unmodified, but shifted into the future with a specified delay.

The underlying implementation uses a queue and a dedicated consumer.

Parameters
  • observable (Observable) – an observable instance

  • duration (timedelta) – timedelta of delay (the duration).

  • buffer_size (Optional[int]) – optional buffer size; if not specified, the size is unlimited (ignore_events_if_full then has no effect, and memory use is unbounded).

  • ignore_events_if_full (Optional[bool]) – when true, if the internal buffer (here a queue) is full, new events are ignored until older ones have been consumed. Otherwise, the producer is blocked until older ones have been consumed.

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if no observable or duration are provided or buffer_size <= 0
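
The two buffer policies can be sketched with the standard library's queue.Queue. This illustrates the trade-off only, under the assumption that the internal queue behaves like a bounded FIFO. The enqueue helper is hypothetical and the delay itself is left out.

```python
import queue


def enqueue(buffer, event, ignore_events_if_full):
    """Return True if the event was buffered, False if it was dropped."""
    if ignore_events_if_full:
        try:
            buffer.put_nowait(event)
            return True
        except queue.Full:
            # Buffer is full: the new event is ignored.
            return False
    # Blocks the producer until a consumer frees a slot.
    buffer.put(event)
    return True


buffer = queue.Queue(maxsize=2)
accepted = [enqueue(buffer, n, ignore_events_if_full=True) for n in range(4)]
buffered = [buffer.get_nowait() for _ in range(buffer.qsize())]
print(accepted, buffered)
```

With a buffer of two slots and no consumer running, the third and fourth events are dropped. With ignore_events_if_full set to False the same producer would wait instead.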

async_rx.rx_debounce(an_observable: async_rx.protocol.definition.Observable, duration: datetime.timedelta) → async_rx.protocol.definition.Observable[source]

Debounce operator.

Debounce is used to rate-limit the sequence. It delays each value when it arrives and emits only the last value in a burst of events, once the set delay has elapsed without any new event arriving.

Parameters
  • an_observable (Observable) – an observable instance

  • duration (timedelta) – timedelta of interval (the duration)

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if no observable or duration are provided
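
The rule can be modelled on timestamped events in plain Python. The debounce helper and the integer-millisecond timestamps are hypothetical, and the sketch assumes the input ends after its last event, so the final value is always emitted.

```python
def debounce(events, duration):
    """Emit a value only if no newer event arrives within duration of it."""
    emitted = []
    for index, (timestamp, value) in enumerate(events):
        is_last = index == len(events) - 1
        if is_last or events[index + 1][0] - timestamp >= duration:
            # The value is released once the quiet period has elapsed.
            emitted.append((timestamp + duration, value))
    return emitted


events = [(0, "a"), (30, "b"), (60, "c"), (300, "d"), (320, "e")]
debounced = debounce(events, duration=100)
print(debounced)
```

Only the last value of each burst survives ("c" and "e"), and each is emitted one duration after it arrived.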

async_rx.rx_dict(initial_value: Optional[Dict] = None) → async_rx.protocol.definition.Observable[source]

Create an observable on a dictionary.

The observer receives the current value of the dictionary on subscribe and whenever a key changes (added, updated or deleted). This observable implements a UserDict, so you can use it as a classic dictionary.

Parameters

initial_value (Optional[Dict]) – initial value (default: {})

Returns

observable instance

Return type

(Observable)
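
The idea of a dictionary that notifies on change can be sketched with collections.UserDict. NotifyingDict and its synchronous callback-style subscribe are hypothetical and much simpler than the library's observable. The sketch only shows when notifications would fire.

```python
from collections import UserDict


class NotifyingDict(UserDict):
    """A dict that sends a snapshot to listeners on subscribe and on change."""

    def __init__(self, initial_value=None):
        # Must exist before UserDict.__init__ copies the initial items.
        self._listeners = []
        super().__init__(initial_value or {})

    def subscribe(self, listener):
        self._listeners.append(listener)
        listener(dict(self.data))  # current value on subscribe

    def _notify(self):
        for listener in self._listeners:
            listener(dict(self.data))

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._notify()

    def __delitem__(self, key):
        super().__delitem__(key)
        self._notify()


snapshots = []
settings = NotifyingDict({"theme": "dark"})
settings.subscribe(snapshots.append)
settings["lang"] = "en"
del settings["theme"]
print(snapshots)
```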

async_rx.rx_list(initial_value: Optional[List] = None) → async_rx.protocol.definition.Observable[source]

Create an observable on list.

The observer receives the current value of the list on subscribe and whenever an item changes (added, updated, deleted, …). This observable implements a UserList, so you can use it as a classic list.

Parameters

initial_value (Optional[List]) – initial value (default: [])

Returns

observable instance

Return type

(Observable)

async_rx.rx_repeat(duration: datetime.timedelta, producer: Callable, initial_delay: Optional[datetime.timedelta] = None) → async_rx.protocol.definition.Observable[source]

Repeat data.

rx_repeat sends data generated by the producer function at the rate given by duration, until the observer disposes of its subscription.

Parameters
  • duration (timedelta) – duration between each sent item

  • producer (Callable) – producer (async/sync) function

  • initial_delay (Optional[timedelta]) – initial delay before produce value (default: None)

Returns

observable instance

Return type

(Observable)

Raise:

(RuntimeError): if no producer or duration are provided
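
The repeat-until-disposed behaviour can be approximated with asyncio. This sketch does not use async_rx: the repeat coroutine, its on_next callback and the use of seconds instead of timedelta are assumptions, only a synchronous producer is handled, and cancelling the task stands in for disposing of the subscription.

```python
import asyncio


async def repeat(duration_seconds, producer, on_next, initial_delay_seconds=None):
    """Call producer at a fixed rate and pass each result to on_next."""
    if initial_delay_seconds:
        await asyncio.sleep(initial_delay_seconds)
    while True:
        on_next(producer())
        await asyncio.sleep(duration_seconds)


async def main():
    received = []
    counter = iter(range(100))
    task = asyncio.ensure_future(
        repeat(0.01, lambda: next(counter), received.append)
    )
    # Wait until a few items have been produced.
    while len(received) < 3:
        await asyncio.sleep(0.001)
    task.cancel()  # stands in for disposing of the subscription
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received


received = asyncio.run(main())
print(received[:3])
```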

async_rx.rx_repeat_series(source: Any, ratio: Optional[float] = 1.0) → async_rx.protocol.definition.Observable[source]

Repeat a series (delay, value) as an observable for each subscription.

Parameters
  • source (Any) – iterable or async iterable source of tuple (duration, value)

  • ratio (Optional[float]) – ratio applied to each duration (default: 1.0)

Returns

an observable

Return type

(Observable)

Raise:

(RuntimeError): if source is not iterable (sync or async)
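
The effect of ratio on a series can be sketched by computing when each value would be emitted. The schedule helper is hypothetical and does not use async_rx. Durations are assumed to be seconds expressed as floats, and each duration is assumed to be the wait before its value. Neither point is confirmed by this reference.

```python
def schedule(series, ratio=1.0):
    """Return (emission_time, value) pairs for a series of (duration, value)."""
    elapsed = 0.0
    timeline = []
    for duration, value in series:
        # The ratio stretches or compresses every delay in the series.
        elapsed += duration * ratio
        timeline.append((elapsed, value))
    return timeline


series = [(1.0, "a"), (2.0, "b"), (0.5, "c")]
timeline = schedule(series, ratio=0.5)
print(timeline)
```

A ratio of 0.5 replays the series twice as fast. A ratio above 1.0 would slow it down.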