Source code for async_rx.multicast.rx_publish_behavior

from typing import Optional

from ..protocol import ConnectableObservable, ConnectableObservableHandler, Observable, SubjectHandler
from ..subject import rx_subject_behavior
from .rx_publish import rx_publish

__all__ = ["rx_publish_behavior"]


[docs]def rx_publish_behavior( an_observable: Observable, subject_handler: Optional[SubjectHandler] = None, connection_handler: Optional[ConnectableObservableHandler] = None ) -> ConnectableObservable: """Create a publish_behavior. A publish_behavior uses a behavior_subject under the hood to make multiple Observers see the same Observable execution. Args: an_observable (Observable): observable to connect subject_handler (Optional[SubjectHandler]): optional subject handler connection_handler (Optional[ConnectableObservableHandler]): optional connection handler Returns: (ConnectableObservable): the publish_behavior instance """ return rx_publish(subject_factory=rx_subject_behavior, an_observable=an_observable, subject_handler=subject_handler, connection_handler=connection_handler)