Проводник: несколько потребителей потока

Я пишу программу, которая подсчитывает частоты NGrams в корпусе. У меня уже есть функция, которая потребляет поток токенов и производит NGram одного порядка:

ngram :: Monad m => Int -> Conduit t m [t]
trigrams = ngram 3
countFreq :: (Ord t, Monad m) => Consumer [t] m (Map [t] Int)

На данный момент я просто могу подключить одного потребителя потока к источнику потока:

tokens --- trigrams --- countFreq

Как подключить несколько потребителей потока к одному и тому же источнику потока? Я хотел бы иметь что-то вроде этого:

           .--- unigrams --- countFreq
           |--- bigrams  --- countFreq
tokens ----|--- trigrams --- countFreq
           '--- ...      --- countFreq

Плюсом будет параллельный запуск каждого потребителя

EDIT: благодаря Петру я придумал это решение

spawnMultiple orders = do
    chan <- atomically newBroadcastTMChan

    results <- forM orders $ \_ -> newEmptyMVar
    threads <- forM (zip results orders) $
                        forkIO . uncurry (sink chan)

    forkIO . runResourceT $ sourceFile "test.txt"
                         $$ javascriptTokenizer
                         =$ sinkTMChan chan

    forM results readMVar

    where
        sink chan result n = do
            chan' <- atomically $ dupTMChan chan
            freqs <- runResourceT $ sourceTMChan chan'
                                 $$ ngram n
                                 =$ frequencies
            putMVar result freqs

person SvenK    schedule 29.07.2013    source источник
comment
Вы хотите, чтобы когда tokens давал значение, все ваши ...grams получали его?   -  person Petr    schedule 29.07.2013


Ответы (1)


Я предполагаю, что вы хотите, чтобы все ваши приемники получали все значения.

Я бы предложил:

  1. Используйте newBroadcastTMChan для создания нового канала Control.Concurrent.STM.TMChan (stm-chans).
  2. Используйте этот канал для создания приемника с помощью sinkTBMChan от Data.Conduit.TMChan (stm-conduit) для вашего основного продюсера.
  3. Для каждого клиента используйте dupTMChan, чтобы создать собственную копию для чтения. Запустите новый поток, который будет читать эту копию, используя sourceTBMChan.
  4. Соберите результаты из ваших потоков.
  5. Убедитесь, что ваши клиенты могут считывать данные так же быстро, как они создаются, иначе вы можете получить переполнение кучи.

(Я не пробовал, дайте нам знать, как это работает.)


Обновление: один из способов сбора результатов — создать MVar для каждого потока потребителя. Каждый из них будет putMVar своего результата после его завершения. И ваш основной поток будет takeMVar на всех этих MVars, таким образом, ожидая завершения каждого потока. Например, если vars — это список ваших MVar, основной поток выдаст mapM takeMVar vars для сбора всех результатов.

person Petr    schedule 29.07.2013
comment
Спасибо за ответ, как мне собрать результаты, если я создаю потоки с помощью forkIO? - person SvenK; 29.07.2013
comment
@SvenK Я обновил ответ, добавив идею, как собирать результаты. - person Petr; 30.07.2013
comment
Почему у TMChan есть широковещательная версия, а у TBMChan нет, где я могу найти newBroadcastTBMChan? - person CMCDragonkai; 12.10.2016