Последовательное декодирование двоичных данных с использованием каналов

Цель состоит в том, чтобы иметь канал со следующей сигнатурой типа

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a

Канал должен многократно анализировать буферы протокола (с помощью функции ByteString -> a), полученные через TCP/IP (с помощью пакета network-conduit).

Формат телеграфного сообщения

{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...

(Фигурные скобки не являются частью протокола, они используются здесь только для разделения объектов).

Первая идея состояла в том, чтобы использовать sequenceSink для многократного применения Sink, способного анализировать один ProtoBuf:

[...]
import qualified Data.Binary         as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util   as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                                -- read protobuf length
           let len :: Word32
               len = B.decode lengthBytes                       -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen                      -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf

Это не работает (работает только для первого буфера протокола), потому что кажется, что есть некоторое количество оставшихся байтов, уже прочитанных из источника, но не использованных через CB.take, которые отбрасываются.

И я не нашел способа запихнуть остальные обратно в источник.

Я полностью неправильно понял концепцию?

PS: Даже если я использую здесь буферы протоколов, проблема не связана с буферами протоколов. Для отладки проблемы я всегда использую {length}{UTF8 encoded string}{length}{UTF8 encoded string}... и канал, аналогичный приведенному выше (utf8StringConduit :: MonadResource m => Conduit ByteString m Text).

Обновлять:

Я просто попытался заменить состояние (в приведенном выше примере нет состояния ()) оставшимися байтами и заменил вызовы CB.take вызовами функции, которая сначала потребляет уже прочитанные байты (из состояния) и вызывает await только по мере необходимости (когда государство недостаточно велико). К сожалению, это тоже не работает, потому что, как только в Source не осталось байтов, sequenceSink не выполняет код, но состояние все еще содержит оставшиеся байты :-(.

Если вас должен заинтересовать код (который не оптимизирован или очень хорош, но его должно быть достаточно для тестирования):

utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
    CU.sequenceSink [] $ \st ->
        do (lengthBytes, st') <- takeWithState BS.empty st 4
           let len :: Word32
               len = B.decode $ BSL.fromChunks [lengthBytes]
               intLength = fromIntegral len
           (textBytes, st'') <- takeWithState BS.empty st' intLength
           return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
     in if stateLenSum >= neededLen
           then do let (firstChunk:state') = state
                       (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
                       acc' = acc `BS.append` neededChunk
                       neededLen' = neededLen - BS.length neededChunk
                       state'' = if BS.null pushBack
                                    then state'
                                    else pushBack:state'
                   takeWithState acc' state'' neededLen'
           else do aM <- await
                   case aM of
                     Just a -> takeWithState acc (state ++ [a]) neededLen
                     Nothing -> error "to be fixed later"

person Johannes Weiss    schedule 24.09.2012    source источник
comment
Возможно ли, что четыре байта, используемые для кодирования длины, включены в длину? Это приведет к чтению четырех дополнительных байтов с помощью protobufBytes <- CB.take intLen.   -  person Matt S    schedule 24.09.2012
comment
@MattS, извините, я не правильно понял ваш вопрос? Что вы подразумеваете под длиной, включенной в длину?   -  person Johannes Weiss    schedule 24.09.2012
comment
В указанном вами потоке байтов каждому protobuf предшествуют 4 байта, которые представляют длину (в байтах) protobuf. Моя первоначальная теория заключалась в том, что эти 4 байта могут на самом деле представлять длину protobuf PLUS 4-байтового заголовка длины. Это приведет к тому, что ваш код будет читать 4 байта после фактического конца protobuf, ошибочно потребляя 4 байта, которые должны представлять длину следующего protobuf.   -  person Matt S    schedule 24.09.2012
comment
Да, они размером с следующий ПБ.   -  person Johannes Weiss    schedule 25.09.2012


Ответы (1)


Для синтаксического анализа и сериализации буфера протокола мы используем messageWithLengthPutM и messageWithLengthGetM (см. ниже), но я предполагаю, что для длины используется кодировка varint, а это не то, что вам нужно. Я бы, вероятно, попытался адаптировать нашу реализацию ниже, заменив messageWithLength Get/Put чем-то вроде

myMessageWithLengthGetM = 
   do size <- getWord32be 
      getMessageWithSize size

но я понятия не имею, как реализовать getMessageWithSize, используя доступные функции из пакета буфера протокола. С другой стороны, вы можете просто getByteString, а затем "повторно разобрать" строку байтов.

Что касается каналов: вы пытались реализовать канал без Data.Conduit.Util? Что-то типа

protobufConduit protobufDecode = loop
   where
      loop = 
         do len <- liftM convertLen (CB.take 4)
            bs <- CB.take len
            yield (protobufDecode bs)
            loop

Вот код, который мы используем:

pbufSerialize :: (ReflectDescriptor w, Wire w) => Conduit w IO ByteString
pbufSerialize = awaitForever f
    where f pb = M.mapM_ yield $ BSL.toChunks $ runPut (messageWithLengthPutM pb)

pbufParse :: (ReflectDescriptor w, Wire w, Show w) => Conduit ByteString IO w
pbufParse = new
    where
      new = read (runGet messageWithLengthGetM . BSL.fromChunks . (:[]))
      read parse =
          do mbs <- await
             case mbs of
               Just bs -> checkResult (parse bs)
               Nothing -> return ()
      checkResult result =
          case result of
            Failed _ errmsg -> fail errmsg
            Partial cont -> read (cont . Just . BSL.fromChunks . (:[]))
            Finished rest _ msg ->
                do yield msg
                   checkResult (runGet messageWithLengthGetM rest)
person David Leuschner    schedule 24.09.2012
comment
Спасибо @David Leuschner, поскольку я использую Haskell с обеих сторон канала, я просто изменил свой протокол и теперь использую код ваших функций pbufSerialize и pbufParse :-). Идея не использовать CU.sequenceSink была решающей. Спасибо! - person Johannes Weiss; 25.09.2012