clojure core.async — неожиданные несоответствия

я не занимался Clojure пару лет, поэтому решил вернуться и не игнорировать core.async на этот раз) довольно крутая штука, но это меня почти сразу удивило. Теперь я понимаю, что существует неотъемлемая индетерминизм, когда задействовано несколько потоков, но здесь есть нечто большее, чем это.

Исходный код моего очень простого примера, где я пытаюсь скопировать строки из STDIN в файл:

(defn append-to-file
  "Write a string to the end of a file"
  ([filename s]
   (spit filename (str s "\n")
         :append true))
  ([s]
   (append-to-file "/tmp/journal.txt" s)))

(defn -main
  "I don't do a whole lot ... yet."
  [& args]
  (println "Initializing..")
  (let [out-chan (a/chan)]
    (loop [line (read-line)]
      (if (empty? line) :ok
          (do
            (go (>! out-chan line))
            (go (append-to-file (<! out-chan)))
            (recur (read-line)))))))

только, конечно, это оказалось не так просто. Я думаю, что сузил его до чего-то, что не убрано должным образом. По сути, запуск основной функции приводит к противоречивым результатам. Иногда я запускаю его 4 раза и вижу на выходе 12 строк. Но иногда 4 прогона дают всего 10 строк. Или, как показано ниже, 3 раза, 6 строк:

akamac.home ➜  coras git:(master) ✗ make clean
cat /dev/null > /tmp/journal.txt
lein clean
akamac.home ➜  coras git:(master) ✗ make compile
lein uberjar
Compiling coras.core
Created /Users/akarpov/repos/coras/target/uberjar/coras-0.1.0-SNAPSHOT.jar
Created /Users/akarpov/repos/coras/target/uberjar/coras-0.1.0-SNAPSHOT-standalone.jar
akamac.home ➜  coras git:(master) ✗ make run    
java -jar target/uberjar/coras-0.1.0-SNAPSHOT-standalone.jar < resources/input.txt
Initializing..
akamac.home ➜  coras git:(master) ✗ make run
java -jar target/uberjar/coras-0.1.0-SNAPSHOT-standalone.jar < resources/input.txt
Initializing..
akamac.home ➜  coras git:(master) ✗ make run
java -jar target/uberjar/coras-0.1.0-SNAPSHOT-standalone.jar < resources/input.txt
Initializing..
akamac.home ➜  coras git:(master) ✗ make check  
cat /tmp/journal.txt
line a
line z
line b
line a
line b
line z

(В основном, иногда запуск производил 3 строки, иногда 0, иногда 1 или 2). Тот факт, что строки появляются в случайном порядке, меня не беспокоит — блоки go делают что-то параллельно/поточно, и все ставки сняты. Но почему они не выполняют всю работу все время? (Потому что я как-то злоупотребляю ими, но где?) Спасибо!


person alexakarpov    schedule 25.01.2018    source источник
comment
единственное, что вызывает у меня подозрения, это то, что после того, как я поставлю сообщение в очередь в канале, я не знаю, сколько времени потребуется потоку из блока go, чтобы подобрать его и выполнить свою работу. А так как я ничего не блокирую, основной поток может завершиться до того, как поток блока перехода получит возможность выполнить работу. Итак, состояние гонки...   -  person alexakarpov    schedule 25.01.2018
comment
@CharlesDuffy, ты прав. Я забыл, что я ограничен в принятии собственных ответов двумя днями, но не в предоставлении их!   -  person alexakarpov    schedule 26.01.2018


Ответы (2)


В этом коде много проблем, позвольте мне быстро их рассмотреть:

1) Каждый раз, когда вы вызываете (go ...), вы запускаете новый «поток», который будет выполняться в пуле потоков. Не определено, когда этот поток будет запущен.

2) Вы не ждете завершения этих потоков, поэтому возможно (и очень вероятно), что вы в конечном итоге прочитаете несколько строк из файла, записав несколько строк в канал еще до того, как произойдет чтение.

3) Вы запускаете несколько вызовов append-to-file одновременно (см. № 2), эти функции не синхронизированы, поэтому возможно одновременное добавление нескольких потоков. Поскольку доступ к файлам в большинстве ОС не скоординирован, два потока могут одновременно писать в ваш файл, перезаписывая результаты друг друга.

4) Поскольку вы создаете новый блок go для каждой прочитанной строки, возможно, они будут выполняться в другом порядке, чем вы ожидаете, это означает, что строки в выходном файле могут быть не в порядке.

Я думаю, все это можно немного исправить, избегая довольно распространенного антипаттерна с core.async: не создавать go блоков (или потоков) внутри неограниченных или больших циклов. Часто это делает то, чего вы не ожидаете. Вместо этого создайте один core.async/thread с циклом, который читает из файла (поскольку он выполняет ввод-вывод, никогда не выполняйте ввод-вывод внутри блока go) и записывает в канал, и один, который читает из канала и записывает в выходной файл.

Посмотрите на это как на сборочную линию, состоящую из рабочих (go блоков) и конвейерных лент (каналов). Если бы вы построили фабрику, у вас не было бы кучи людей, которые объединялись бы в пары и говорили: «Вы берете один предмет, а когда закончите, отдайте его ему». Вместо этого вы бы организовали всех людей сразу, с конвейерами между ними и «передавали бы» работу (или данные) между работниками. Ваши воркеры должны быть статичными, а ваши данные должны перемещаться.

person Timothy Baldridge    schedule 26.01.2018
comment
Большое спасибо! Я мог бы решить конкретную проблему, ответственную за эти конкретные симптомы, но, конечно, дал более глубокий ответ. - person alexakarpov; 26.01.2018

.. и, конечно же, это было неправильное использование core.async с моей стороны:

Если мне важно видеть все данные на выходе, я должен использовать блокирующий «дубль» на канале, когда я хочу передать значение в свой ввод-вывод. code -- и, как было указано, этот блокирующий вызов не должен находиться внутри блока go. Мне нужно было всего одно изменение строки:

от:

(go (append-to-file (<! out-chan)))

to:

(append-to-file (<!! out-chan))
person alexakarpov    schedule 25.01.2018
comment
Не используйте <!! внутри блока go или внутри любой функции, вызываемой блоком go. Это может заблокировать очередь выполнения go и создать очень сложные для отладки проблемы. - person Timothy Baldridge; 26.01.2018
comment
да, это идет рука об руку с отзывами, представленными в вашем ответе. Оглядываясь назад, кажется, что блокировка рабочего процесса, созданного в ограниченном пуле потоков, не является хорошей идеей. - person alexakarpov; 27.01.2018
comment
Как объяснил Тимоти Болдридж, это не устраняет ни одной фундаментальной проблемы вашего подхода. В другом комментарии вы говорите, что я мог бы решить конкретную проблему, но вы этого не сделали: вы просто немного нарушили ситуацию, чтобы реже сталкиваться с состоянием гонки. Тот же самый баг все еще там. - person amalloy; 27.01.2018
comment
@amalloy Я сказал, что изменения в моем собственном ответе решили исходную проблему. Первоначальная проблема, как описано в вопросе, заключалась в том, что наблюдались условия гонки - потоки запуска не имели возможности завершить свою работу. Для каждой прочитанной строки я порождал рабочего процесса из пула потоков; этот воркер попытается подключиться к небуферизованному каналу; с канала никто не берет, и он паркуется. Вернувшись в основной поток, я создаю еще одного рабочего, на этот раз берущего с канала. Что-то ждет, так что эти двое обмениваются сообщениями. Основной поток продолжается без ожидания. - person alexakarpov; 27.01.2018
comment
@amalloy - это объясняет проблему, с которой я столкнулся: основной поток порождал рабочих, которые брали и помещали для каждой строки, а затем завершались. Иногда они заканчивали эту работу, иногда нет. - person alexakarpov; 27.01.2018
comment
ПРИМЕЧАНИЕ. Я отредактировал ответ, так что он показывает, что мое исправление, каким бы слабым оно ни было, не поставило ›!! внутри блока go (это не было очевидно из того, как я это описал) - person alexakarpov; 27.01.2018