Reactivex: как сделать так, чтобы объект поведения излучал из наблюдаемого

Я собираюсь использовать rxandroid в приложении для Android. Я пытаюсь смоделировать поведение прямо сейчас в rxpy, потому что мне было проще всего настроить и поиграть с ним. В приведенном ниже примере source3 выдает правильные данные; который представляет собой объединение инициализации, которая занимает некоторое время, и постоянной подписки, которую я только что подделал. Я хочу BehaviorSubject, потому что мне нужно последнее значение немедленно для инициализации поля.

Я не могу понять, как связать BehaviorSubject поверх source3, чтобы он испускал источник 3, запоминая последнее значение. Я искал в Интернете два дня и не нашел четкого направления для этого варианта использования. Вот мой код, и вопрос в том, почему я не получаю никаких выбросов от наблюдателя.

from rx import Observable, Observer
from rx.subjects import BehaviorSubject
import time, random

def fake_initialization(observer):
    time.sleep(5)  # It takes some time
    observer.on_next("Alpha")
    observer.on_completed()

def fake_subscription(observer):
    iter = 0 # Subscription emits forever
    while True:
        observer.on_next("message %02d"%(iter))
        time.sleep(random.randrange(2,5))
        iter += 1

class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source1 = Observable.create(fake_initialization)
source2 = Observable.create(fake_subscription)
source3 = source1 + source2

bsubject = BehaviorSubject(False)
source4 = source3.multicast(bsubject)
source4.connect()
source4.subscribe(PrintObserver())

person malibu    schedule 28.03.2018    source источник


Ответы (1)


На самом деле это был довольно простой ответ для кого-то. Я публикую это на случай, если кто-то еще окажется в такой ситуации. По общему признанию, я недостаточно внимательно читал страницу rxpy. Вам нужно добавить параллелизм самостоятельно, по-видимому, потому, что в Python так много параллельных решений. Вот окончательный рабочий код:

import random
import time

import multiprocessing
from rx import Observable,Observer
from rx.concurrency import ThreadPoolScheduler
from rx.subjects import Subject

class PrintObserver1(Observer):

    def on_next(self, value):
        print("Received 1 {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done 1!")

    def on_error(self, error):
        print("Error Occurred: 1 {0}".format(error))

class PrintObserver2(Observer):

    def on_next(self, value):
        print("Received 2 {0}".format(value))
        #bsubject.on_next(value)

    def on_completed(self):
        print("Done 2!")

    def on_error(self, error):
        print("Error Occurred: 2 {0}".format(error))

def fake_initialization(observer):
    time.sleep(5)  # It takes some time
    observer.on_next("Alpha")
    observer.on_completed()

def fake_subscription(observer):
    iter = 0 # Subscription emits forever
    while True:
        observer.on_next("message %02d"%(iter))
        time.sleep(random.randrange(2,5))
        iter += 1

optimal_thread_count = multiprocessing.cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

source1 = Observable.create(fake_initialization).subscribe_on(pool_scheduler)
source2 = Observable.create(fake_subscription).subscribe_on(pool_scheduler)
catted_source = source1 + source2

native_source = Observable.interval(1000)
print native_source,catted_source
#source = source3
subject = Subject()
# native_source = works
# catted_source = not works
subSource = catted_source.subscribe(subject)
#####

subSubject1 = subject.subscribe(PrintObserver1())
subSubject2 = subject.subscribe(PrintObserver2())
time.sleep(30)
subject.on_completed()
subSubject1.dispose()
subSubject2.dispose()

Также обратите внимание, что вам необходимо установить пакет «futures», чтобы параллелизм работал на Python 2.7.

Если вы получили эту ошибку:

from concurrent.futures import ThreadPoolExecutor 
ImportError: No module named concurrent.futures

Прочитайте это (ссылка для немного другой ошибки, но решение работает):

ImportError: нет модуля с именем concurrent.futures.process

person malibu    schedule 29.03.2018