Цель состоит в том, чтобы иметь канал со следующей сигнатурой типа
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
Канал должен многократно анализировать буферы протокола (с помощью функции ByteString -> a
), полученные через TCP/IP (с помощью пакета network-conduit
).
Формат телеграфного сообщения
{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...
(Фигурные скобки не являются частью протокола, они используются здесь только для разделения объектов).
Первая идея состояла в том, чтобы использовать sequenceSink
для многократного применения Sink
, способного анализировать один ProtoBuf:
[...]
import qualified Data.Binary as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util as CU
protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
CU.sequenceSink () $ \() ->
do lenBytes <- CB.take 4 -- read protobuf length
let len :: Word32
len = B.decode lengthBytes -- decode ProtoBuf length
intLen = fromIntegral len
protobufBytes <- CB.take intLen -- read the ProtoBuf bytes
return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf
Это не работает (работает только для первого буфера протокола), потому что кажется, что есть некоторое количество оставшихся байтов, уже прочитанных из источника, но не использованных через CB.take
, которые отбрасываются.
И я не нашел способа запихнуть остальные обратно в источник.
Я полностью неправильно понял концепцию?
PS: Даже если я использую здесь буферы протоколов, проблема не связана с буферами протоколов. Для отладки проблемы я всегда использую {length}{UTF8 encoded string}{length}{UTF8 encoded string}...
и канал, аналогичный приведенному выше (utf8StringConduit :: MonadResource m => Conduit ByteString m Text
).
Обновлять:
Я просто попытался заменить состояние (в приведенном выше примере нет состояния ()
) оставшимися байтами и заменил вызовы CB.take
вызовами функции, которая сначала потребляет уже прочитанные байты (из состояния) и вызывает await
только по мере необходимости (когда государство недостаточно велико). К сожалению, это тоже не работает, потому что, как только в Source не осталось байтов, sequenceSink
не выполняет код, но состояние все еще содержит оставшиеся байты :-(.
Если вас должен заинтересовать код (который не оптимизирован или очень хорош, но его должно быть достаточно для тестирования):
utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
CU.sequenceSink [] $ \st ->
do (lengthBytes, st') <- takeWithState BS.empty st 4
let len :: Word32
len = B.decode $ BSL.fromChunks [lengthBytes]
intLength = fromIntegral len
(textBytes, st'') <- takeWithState BS.empty st' intLength
return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]
takeWithState :: Monad m
=> ByteString
-> [ByteString]
-> Int
-> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
let stateLenSum = foldl' (+) 0 $ map BS.length state
in if stateLenSum >= neededLen
then do let (firstChunk:state') = state
(neededChunk, pushBack) = BS.splitAt neededLen firstChunk
acc' = acc `BS.append` neededChunk
neededLen' = neededLen - BS.length neededChunk
state'' = if BS.null pushBack
then state'
else pushBack:state'
takeWithState acc' state'' neededLen'
else do aM <- await
case aM of
Just a -> takeWithState acc (state ++ [a]) neededLen
Nothing -> error "to be fixed later"
protobufBytes <- CB.take intLen
. - person Matt S   schedule 24.09.2012protobuf
предшествуют 4 байта, которые представляют длину (в байтах)protobuf
. Моя первоначальная теория заключалась в том, что эти 4 байта могут на самом деле представлять длинуprotobuf
PLUS 4-байтового заголовка длины. Это приведет к тому, что ваш код будет читать 4 байта после фактического концаprotobuf
, ошибочно потребляя 4 байта, которые должны представлять длину следующегоprotobuf
. - person Matt S   schedule 24.09.2012