Слияние исходников с помощью Haskell Conduit

Можно ли в Conduit построить функцию (скажем zipC2), которая бы переворачивала следующие исходники:

series1 = yieldMany [2, 4, 6, 8, 16 :: Int]

series2 = yieldMany [1, 5, 6 :: Int]

в один, который будет производить следующие пары (показанные здесь в виде списка):

[(Nothing, Just 1), (Just 2, Just 1), (Just 4, Just 1), (Just 4, Just 5), (Just 6, Just 6), (Just 8, Just 6), (Just 16, Just 6)]

Он будет вызываться с помощью функции сравнения следующим образом:

runConduitPure ( zipC2 (<=) series1 series1 .| sinkList )

Раньше в предыдущих версиях была функция mergeSources, которая делала что-то относительно похожее (правда, без эффекта памяти), но она исчезла в самой последней версии (1.3.1).

Пояснение о том, как работает функция. Идея состоит в том, чтобы взять 2 источника A (генерирующие значения a) и B. (генерация значений b).

Затем мы генерируем пары:

Если a ‹ b, мы сначала строим (просто a, ничего)

Если b ‹ a, это даст (ничего, просто b)

Если a == b, мы обновляем обе стороны и получаем (Just a, Just b)

Необновленное значение из источника не используется и используется для следующего раунда сравнений. Используются только обновленные значения.

Затем мы продолжаем обновлять пару в соответствии со значениями из A и B относительно друг друга.

Другими словами: мы обновляем левую часть пары, если a ‹ b, правую часть, если b ‹ a, или обе стороны, если a == b . Любое неиспользованное значение сохраняется в памяти для следующего раунда сравнения.


person Christophe    schedule 12.02.2019    source источник


Ответы (2)


Мне удалось создать вашу функцию zipC2:

import Data.Ord
import Conduit
import Control.Monad

zipC2Def :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> (Maybe a, Maybe a) -> ConduitT () (Maybe a, Maybe a) m ()
zipC2Def f c1 c2 (s1, s2) = do
  ma <- c1 .| peekC
  mb <- c2 .| peekC
  case (ma, mb) of
    (Just a, Just b) ->
      case (f a b, f b a) of
        (True, True) -> do
          yield (ma, mb)
          zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, mb)
        (_, True) -> do
          yield (s1, mb)
          zipC2Def f c1 (c2 .| drop1) (s1, mb)
        (True, _) -> do
          yield (ma, s2)
          zipC2Def f (c1 .| drop1) c2 (ma, s2)
        _ ->
          zipC2Def f (c1 .| drop1) (c2 .| drop1) (ma, s2)
    (Just a, Nothing) -> do
      yield (ma, s2)
      zipC2Def f (c1 .| drop1) c2 (ma, s2)
    (Nothing, Just b) -> do
      yield (s1, mb)
      zipC2Def f c1 (c2 .| drop1) (s1, mb)
    _ -> return ()
  where
    drop1 = dropC 1 >> takeWhileC (const True)

zipC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (Maybe a, Maybe a) m ()
zipC2 f c1 c2 = zipC2Def f c1 c2 (Nothing, Nothing)

main :: IO ()
main = 
  let
    series1 = yieldMany [2, 4, 6, 8, 16 :: Int] :: ConduitT () Int Identity ()
    series2 = yieldMany [1, 5, 6 :: Int] :: ConduitT () Int Identity ()
  in
  putStrLn $ show $ runConduitPure $
    (zipC2 (<=) series1 series2)
    .| sinkList

выход:

[(Nothing,Just 1),(Just 2,Just 1),(Just 4,Just 1),(Just 4,Just 5),(Just 6,Just 6),(Just 8,Just 6),(Just 16,Just 6)]

person Karol Samborski    schedule 12.02.2019
comment
Да, я думал об его использовании, но, к сожалению, он потребляет элементы, даже если они не используются для обновления: например, я получаю: [(Ничего,Просто 1),(Просто 4,Ничего),(Просто 6,Просто 6)] вместо ожидаемого результата [(Ничего,Только 1),(Только 2,Только 1),(Только 4,Только 1),(Только 4,Только 5),(Только 6,Только 5),(Только 6,Только 6),(Всего 8,Всего 6),(Всего 16,Всего 6)] ... Для этого должен быть какой-то эффект памяти. - person Christophe; 12.02.2019
comment
Есть ли какой-то определенный порядок потребления элементов? Например. Сначала мы потребляем элемент из источника A, затем из B, а затем снова из A и т. д. - person Karol Samborski; 12.02.2019
comment
Мы потребляем только из источника с наименьшим элементом. Мы потребляем из обоих источников, только если элементы равны. Когда элемент не потребляется, в паре используется его предыдущее значение (или Ничего, если он никогда не потреблялся). - person Christophe; 12.02.2019
comment
@ Кристоф, я обновил свой ответ. Пожалуйста, проверьте это. - person Karol Samborski; 15.02.2019
comment
это то, что я действительно искал ... даже лучше, чем ответ, который я принял в первую очередь. - person Christophe; 15.02.2019

Код ниже работает, как и ожидалось (я вызвал функцию mergeSort):

module Data.Conduit.Merge where

import Prelude (Monad, Bool, Maybe(..), Show, Eq)
import Prelude (otherwise, return)
import Prelude (($))
import Conduit (ConduitT)
import Conduit (evalStateC, mapC, yield, await)
import Conduit ((.|))
import Control.Monad.State (get, put, lift)
import Control.Monad.Trans.State.Strict (StateT)

import qualified Data.Conduit.Internal as CI

-- | Takes two sources and merges them.
-- This comes from https://github.com/luispedro/conduit-algorithms made available thanks to Luis Pedro Coelho.
mergeC2 :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () a m ()
mergeC2 comparator (CI.ConduitT s1) (CI.ConduitT s2) = CI.ConduitT $  processMergeC2 comparator s1 s2

processMergeC2 :: Monad m => (a -> a -> Bool)
                        -> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s1    ConduitT () a m ()
                        -> ((() -> CI.Pipe () () a () m ()) -> CI.Pipe () () a () m ()) -- s2    ConduitT () a m ()
                        -> ((() -> CI.Pipe () () a () m b ) -> CI.Pipe () () a () m b ) -- rest  ConduitT () a m ()
processMergeC2 comparator s1 s2 rest = go (s1 CI.Done) (s2 CI.Done)
    where
        go s1''@(CI.HaveOutput s1' v1) s2''@(CI.HaveOutput s2' v2)  -- s1''@ and s2''@ simply name the pattern expressions
            | comparator v1 v2 = CI.HaveOutput (go s1' s2'') v1
            | otherwise = CI.HaveOutput (go s1'' s2') v2
        go s1'@CI.Done{} (CI.HaveOutput s v) = CI.HaveOutput (go s1' s) v
        go (CI.HaveOutput s v) s1'@CI.Done{}  = CI.HaveOutput (go s s1')  v
        go CI.Done{} CI.Done{} = rest ()
        go (CI.PipeM p) left = do
            next <- lift p
            go next left
        go right (CI.PipeM p) = do
            next <- lift p
            go right next
        go (CI.NeedInput _ next) left = go (next ()) left
        go right (CI.NeedInput _ next) = go right (next ())
        go (CI.Leftover next ()) left = go next left
        go right (CI.Leftover next ()) = go right next

data MergeTag = LeftItem | RightItem deriving (Show, Eq)
data TaggedItem a = TaggedItem MergeTag a deriving (Show, Eq)
mergeTag :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (TaggedItem a) m ()
mergeTag func series1 series2 = mergeC2 (tagSort func) taggedSeries1 taggedSeries2
                where
                    taggedSeries1 = series1 .| mapC (\item -> TaggedItem LeftItem item)
                    taggedSeries2 = series2 .| mapC (\item -> TaggedItem RightItem item)
                    tagSort :: (a -> a -> Bool) -> TaggedItem a -> TaggedItem a -> Bool
                    tagSort f (TaggedItem _ item1) (TaggedItem _ item2) = f item1 item2

type StateMergePair a = (Maybe a, Maybe a)
pairTagC :: (Monad m) => ConduitT  (TaggedItem a) (StateMergePair a) (StateT (StateMergePair a) m) ()
pairTagC = do
    input <- await
    case input of
        Nothing -> return ()
        Just taggedItem -> do
            stateMergePair <- lift get
            let outputState = updateStateMergePair taggedItem stateMergePair
            lift $ put outputState
            yield outputState
            pairTagC

updateStateMergePair :: TaggedItem a -> StateMergePair a -> StateMergePair a
updateStateMergePair (TaggedItem tag item) (Just leftItem, Just rightItem) = case tag of
    LeftItem -> (Just item, Just rightItem)
    RightItem -> (Just leftItem, Just item)

updateStateMergePair (TaggedItem tag item) (Nothing, Just rightItem) = case tag of
    LeftItem -> (Just item, Just rightItem)
    RightItem -> (Nothing, Just item)

updateStateMergePair (TaggedItem tag item) (Just leftItem, Nothing) = case tag of
    LeftItem -> (Just item, Nothing)
    RightItem -> (Just leftItem, Just item)

updateStateMergePair (TaggedItem tag item) (Nothing, Nothing) = case tag of
    LeftItem -> (Just item, Nothing)
    RightItem -> (Nothing, Just item)

pairTag :: (Monad m) => ConduitT  (TaggedItem a) (StateMergePair a) m ()
pairTag = evalStateC (Nothing, Nothing) pairTagC

mergeSort :: (Monad m) => (a -> a -> Bool) -> ConduitT () a m () -> ConduitT () a m () -> ConduitT () (StateMergePair a) m ()
mergeSort func series1 series2 = mergeTag func series1 series2 .| pairTag

Я позаимствовал функцию mergeC2 из https://github.com/luispedro/conduit-algorithms. ..

Я только новичок в Haskell, поэтому код определенно не оптимален.

person Nathan François    schedule 14.02.2019