Я собираюсь использовать rxandroid в приложении для Android. Я пытаюсь смоделировать поведение прямо сейчас в rxpy, потому что мне было проще всего настроить и поиграть с ним. В приведенном ниже примере source3 выдает правильные данные; который представляет собой объединение инициализации, которая занимает некоторое время, и постоянной подписки, которую я только что подделал. Я хочу BehaviorSubject, потому что мне нужно последнее значение немедленно для инициализации поля.
Я не могу понять, как связать BehaviorSubject поверх source3, чтобы он испускал источник 3, запоминая последнее значение. Я искал в Интернете два дня и не нашел четкого направления для этого варианта использования. Вот мой код, и вопрос в том, почему я не получаю никаких выбросов от наблюдателя.
from rx import Observable, Observer
from rx.subjects import BehaviorSubject
import time, random
def fake_initialization(observer):
time.sleep(5) # It takes some time
observer.on_next("Alpha")
observer.on_completed()
def fake_subscription(observer):
iter = 0 # Subscription emits forever
while True:
observer.on_next("message %02d"%(iter))
time.sleep(random.randrange(2,5))
iter += 1
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
#bsubject.on_next(value)
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source1 = Observable.create(fake_initialization)
source2 = Observable.create(fake_subscription)
source3 = source1 + source2
bsubject = BehaviorSubject(False)
source4 = source3.multicast(bsubject)
source4.connect()
source4.subscribe(PrintObserver())