Как избавиться от наблюдаемого после завершения другого наблюдаемого?

У меня есть наблюдаемая source, на которую я подписываюсь с наблюдателем logger для целей ведения журнала.

Я также подписываюсь на source, чтобы выполнять вычисления. Когда мои вычисления завершены, я закончил с source и хочу избавиться от logger:

             +-------------------+
             |                   |
   +---------+ source observable +--------+
   |         |                   |        |
   |         +-------------------+        |
   |                                      |
   |                                      |
+--v---------------+         +------------v--------+
|                  |         |                     |
|     logger       |         |    computations     |
|    (observer)    |         |    (observable)     |
+-------^----------+         +-----------+---------+
        |                                |
        |                                |
        |        dispose logger          |
        +--------------------------------+
            when computations completed

Однако logger не совсем убирается в нужное время - обычно происходит один или два дополнительных тика:

MWE

from rx import Observable

# Some source
source = Observable.interval(1)

# Create logger for source
logged = []
logger = source.subscribe(logged.append)

# Now do stuff/computations with source
calculated = source.map(lambda x: x**2).take_while(lambda x: x < 20)

# Output computed values and stop logging when we're done with our computation
calculated.subscribe(print, print, logger.dispose)

# I expect only values that passed through our computation to have been logged
# The last value should be 5 because 5**2 = 25 which is larger than 20
# which in turn causes our computation to terminate
assert logged == [0, 1, 2, 3, 4, 5], logged

Но я получаю:

Traceback (most recent call last):
  File "C:\Program Files (x86)\Python27\lib\site-packages\IPython\core\interactiveshell.py", line 3035, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-54-e8cb1fb583bf>", line 1, in <module>
    assert logged == [0, 1, 2, 3, 4, 5], logged
AssertionError: [0, 1, 2, 3, 4, 5, 6, 7]

Как 7 попал в журнал? Наше вычисление должно завершиться после того, как source выдаст 5, после чего logger будет удалено.

Что я делаю не так?


person mchen    schedule 19.04.2016    source источник


Ответы (1)


Это проблема синхронизации потоков. Оператор interval() запускает новые потоки для вызова on_next() через определенные промежутки времени. После того, как вы избавитесь от подписки, потребуется время, пока другие потоки обнаружат этот сигнал и перестанут работать. И одна миллисекунда близка к тому времени, которое требуется.

Чтобы логировать сообщения, проходящие через реактивную цепочку, надежнее вставить функцию логирования прямо в эту цепочку:

logged = []
def logger(x):
    logged.append(x)
    return x

calculated = source \
    .map(logger) \
    .map(lambda x: x**2) \
    .take_while(lambda x: x < 20) \
    .subscribe(print, print)
person Yaroslav Stavnichiy    schedule 13.01.2017