Построение RequestBodyStream из Lazy ByteString, когда известна длина

Я пытаюсь адаптировать этот код загрузки AWS S3 для обработки Lazy ByteString где длина уже известна (чтобы ее не читали целиком в память - она ​​приходит по сети, куда длина отправляется заранее). Кажется, мне нужно определить GivesPopper над Lazy ByteString, чтобы преобразовать его в RequestBodyStream. Из-за запутанного способа определения GivesPopper я не уверен, как написать его для Lazy ByteString. Буду признателен за подсказки о том, как это написать. Вот так написано для чтения из файла :

let file ="test"
-- streams large file content, without buffering more than 10k in memory
let streamer sink = withFile file ReadMode $ \h -> sink $ S.hGet h 10240

streamer в приведенном выше коде имеет тип GivesPopper (), если я правильно понимаю. Учитывая Lazy ByteString с известной длиной len, что было бы хорошим способом написать над ним функцию GivesPopper? Мы можем читать по одному куску за раз.


person Sal    schedule 03.06.2016    source источник


Ответы (1)


Это то, что вы ищете?

import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import System.IO

file = "test"
-- original streamer for feeding a sink from a file
streamer :: (IO S.ByteString -> IO r) -> IO r
streamer sink = withFile file ReadMode $ \h -> sink $ S.hGet h 10240

-- feed a lazy ByteString to sink    
lstreamer :: L.ByteString -> (IO S.ByteString -> IO r) -> IO r
lstreamer lbs sink = sink (return (L.toStrict lbs))

lstreamer проверяет тип, но, вероятно, делает не совсем то, что вы хотите. Он просто возвращает одни и те же данные каждый раз, когда его вызывает приемник. С другой стороны, S.hGet h ... в конечном итоге вернет пустую строку.

Вот решение, которое использует IORef, чтобы отслеживать, должны ли мы начать возвращать пустую строку:

import Data.IORef

mklstream :: L.ByteString -> (IO S.ByteString -> IO r) -> IO r
mklstream lbs sink = do
  ref <- newIORef False
  let fetch :: IO S.ByteString
      fetch = do sent <- readIORef ref
                 writeIORef ref True
                 if sent
                   then return S.empty
                   else return (L.toStrict lbs)
  sink fetch

Здесь fetch — это действие, которое получает следующий фрагмент. При первом вызове вы получите исходную ленивую байтовую строку (строгую). Последующие вызовы всегда будут возвращать пустую строку.

Обновить

Вот как выдать небольшую сумму за раз:

mklstream :: L.ByteString -> (IO S.ByteString -> IO r) -> IO r
mklstream lbs sink = do
  ref <- newIORef (L.toChunks lbs)
  let fetch :: IO S.ByteString
      fetch = do chunks <- readIORef ref
                 case chunks of
                   [] -> return S.empty
                   (c:cs) -> do writeIORef ref cs
                                return c
  sink fetch
person ErikR    schedule 04.06.2016
comment
Разве это решение не будет считывать всю ленивую строку байтов в память из-за вызова L.toStrict? Мы хотим читать по одному фрагменту за раз или установить верхнюю границу использования памяти. - person Sal; 04.06.2016
comment
Что ж, он показывает вам, как создать некоторое состояние (IOref), чтобы отслеживать то, что вы уже выдали. Обратите внимание, что если вы не используете ленивый ввод-вывод, строка ленивых байтов уже находится в памяти. - person ErikR; 04.06.2016
comment
Вероятно, следует сказать, что обновленный mklstream опирается на инвариант, согласно которому ленивая строка байтов никогда не имеет пустого фрагмента строки байтов. Было бы понятнее, что он делает то, что хочет http-client, если бы ссылка была определена с помощью newIORef (filter (not . S.null) (L.toChunks lbs)), но, насколько я понимаю, в этом нет необходимости. Затем он имеет желаемую структуру, которая должна иметь вид hGet - person Michael; 04.06.2016
comment
Тот же результат получается более или менее предварительно упакованным с io-streams. Там вы должны написать ref <- Streams.fromLazyByteString lbs, а затем функция fetch будет чем-то вроде fmap (maybe S.empty id) (Streams.read ref) - person Michael; 04.06.2016
comment
@Майкл, да, хорошее замечание по поводу io-streams. Я посмотрел на реализацию там. Интересно, эффективен ли writeIORef в приведенном выше коде, потому что способ, которым он поднимается на вершину стека, заключается в перезаписи нового стека после удаления верхнего элемента. io-streams тоже имеет аналогичную логику в makeInputStream. Так что, думаю, это будет эффективно. - person Sal; 04.06.2016