BIO: composable callbacks
In previous sections, we introduced the Z.IO.Buffered module, which provides APIs for buffered reading and writing. Combined with the Builder and Parser facilities, it makes simple streaming tasks easy, for example reading and writing packets over a TCP connection. But sometimes things get complicated. Let’s say you want to use the zlib library to decompress a byte stream from a file. The interface provided by zlib looks like this:
int inflateInit (z_streamp strm);
int inflate (z_streamp strm, int flush);
int inflateEnd (z_streamp strm);
It’s OK to draw a chunk from BufferedInput, feed it to z_streamp, check the status and do some computation if a decompressed chunk is produced. But how do you read a line from the decompressed stream? We can’t reuse readLine from Z.IO.Buffered, since decompressed chunks are not drawn directly from BufferedInput.
Ideally, we should have a composable BufferedInput type, which can accept some transformations and yield another BufferedInput. But BufferedInput is all about managing reads from a buffer so that raw byte chunks can be drawn from the device. In Z-IO, the BIO type is introduced to solve the composable streaming problem:
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
Conceptually, a BIO is a box that transforms data callbacks:
-- A pattern synonym for more meaningful pattern match
pattern EOF :: Maybe a
pattern EOF = Nothing

fooBIO :: BIO foo bar
fooBIO callback maybeFoo = do
    ... use callback to pass output data
    case maybeFoo of
        Just foo ->
            ... you can send results downstream by passing Just values
            ... to callback, and you can call callback multiple times.
            callback (Just ...)
            ...
            callback (Just ...)
            ...
        EOF ->
            ... you should pass EOF to callback to indicate that the current
            ... node has also reached its EOF
            callback EOF
The BIO type takes two params:
- A callback :: Maybe out -> IO () (often written as k), which gets called to write downstream:
  - A Just out value is an item passed downstream.
  - An EOF notifies downstream of EOF.
- A Maybe inp value, which comes from upstream:
  - A Just inp value is an item from upstream.
  - An EOF signals that upstream has reached EOF.
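To make the two params concrete, here is a minimal sketch that depends only on base. It repeats the BIO and EOF definitions locally so that it compiles on its own; wordsNode and collectWords are hypothetical names invented for this illustration and are not part of Z-IO.

```haskell
{-# LANGUAGE PatternSynonyms #-}
import Data.IORef (modifyIORef', newIORef, readIORef)

-- Local copies of the definitions above, so the sketch is self-contained.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

pattern EOF :: Maybe a
pattern EOF = Nothing

-- Hypothetical node: every word of an input line is sent downstream
-- as a separate item, so one input may trigger many callback calls.
wordsNode :: BIO String String
wordsNode callback mline =
    case mline of
        Just line -> mapM_ (callback . Just) (words line)
        _         -> callback EOF   -- upstream EOF is forwarded downstream

-- Feed the node by hand and record everything it passes to its callback.
collectWords :: [String] -> IO [Maybe String]
collectWords inputs = do
    ref <- newIORef []
    let callback x = modifyIORef' ref (x :)
    mapM_ (wordsNode callback . Just) inputs
    wordsNode callback EOF
    reverse <$> readIORef ref
```

Calling collectWords ["hello world", "bio"] records three Just items followed by a single EOF, which shows both directions of the protocol: items and EOF arrive through the Maybe inp param, and leave through the callback.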
Let’s take zlib’s z_streamp as an example and implement a compressing BIO node:
compressBIO :: ZStream -> BIO V.Bytes V.Bytes
compressBIO zs = \ callback mbs ->
    case mbs of
        Just bs -> do
            -- feed input chunk to ZStream
            set_avail_in zs bs (V.length bs)
            let loop = do
                    oavail :: CUInt <- withCPtr zs $ \ ps -> do
                        -- perform deflate and peek output buffer remaining
                        throwZlibIfMinus_ (deflate ps (#const Z_NO_FLUSH))
                        (#peek struct z_stream_s, avail_out) ps
                    when (oavail == 0) $ do
                        -- when output buffer is full,
                        -- freeze chunk and call the callback
                        oarr <- A.unsafeFreezeArr =<< readIORef bufRef
                        callback (Just (V.PrimVector oarr 0 bufSiz))
                        newOutBuffer
                        loop
            loop
        _ -> ... similar to above, with no input chunk and Z_FINISH flag
Source and Sink types
Now let’s consider the following devices:
- A data source which doesn’t take any input but can be read until EOF.
- A data sink which only performs writing without producing any meaningful result.
We can define Source and Sink by using Void from Data.Void:
-- Source type doesn't need input
type Source a = BIO Void a
-- Sink type doesn't produce output
type Sink a = BIO a Void
Because the Void type doesn’t have constructors, one should ignore the Maybe Void param when defining a Source. For example, a BIO node sourcing chunks from BufferedInput can be implemented like this:
sourceFromBuffered :: BufferedInput -> Source V.Bytes
sourceFromBuffered i = \ k _ ->
    let loop = readBuffer i >>= \ x ->
            if V.null x then k EOF else k (Just x) >> loop
    in loop
For type Sink a = BIO a Void, the callback type is Maybe Void -> IO (), which means you can only pass EOF to the callback. The convention here is to call the callback only on EOF:
-- | The `BufferedOutput` device will get flushed only on EOF.
sinkToBuffered :: BufferedOutput -> Sink V.Bytes
sinkToBuffered bo = \ k mbs ->
    case mbs of
        Just bs -> writeBuffer bo bs
        _       -> flushBuffer bo >> k EOF
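The same two conventions can be tried without any file or socket. The sketch below uses only base and in-memory stand-ins: sourceFromItems and sinkToRef are hypothetical helpers written for this illustration (they are not the Z-IO functions), and the type synonyms are repeated locally so the block compiles on its own.

```haskell
{-# LANGUAGE PatternSynonyms #-}
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Data.Void (Void)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Source a = BIO Void a
type Sink a = BIO a Void

pattern EOF :: Maybe a
pattern EOF = Nothing

-- Hypothetical in-memory source: the Maybe Void param is ignored,
-- all items are pushed downstream, then EOF is signalled.
sourceFromItems :: [a] -> Source a
sourceFromItems xs = \ k _ -> do
    mapM_ (k . Just) xs
    k EOF

-- Hypothetical in-memory sink: items are prepended to the IORef,
-- and the callback is only called on EOF.
sinkToRef :: IORef [a] -> Sink a
sinkToRef ref = \ k mx ->
    case mx of
        Just x -> modifyIORef' ref (x :)
        _      -> k EOF

-- Wire the two together by hand: the sink, once given its own callback,
-- is exactly the downstream callback the source expects.
demo :: IO [Int]
demo = do
    ref <- newIORef []
    sourceFromItems [1, 2, 3] (sinkToRef ref (\ _ -> return ())) EOF
    reverse <$> readIORef ref
```

Running demo yields [1,2,3]: the source ignores whatever it is given as input, and the sink never emits anything except the final EOF.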
Composing BIO
The BIO type can be composed via (.), i.e. function composition. The result of a composition has some interesting properties:
- If you compose a Source a with a BIO a b, you will get a Source b.
- If you compose a BIO a b with a Sink b, you will get a Sink a.
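Plain (.) works because a BIO inp out is just a function from the downstream callback (Maybe out -> IO ()) to the upstream callback (Maybe inp -> IO ()). The following sketch, using only base, lets the type checker confirm both properties. The helper names (sourceFromItems, upperNode, sinkToRef, runPipeline) are hypothetical and defined here only for illustration.

```haskell
{-# LANGUAGE PatternSynonyms #-}
import Data.Char (toUpper)
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)
import Data.Void (Void)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Source a = BIO Void a
type Sink a = BIO a Void

pattern EOF :: Maybe a
pattern EOF = Nothing

-- Hypothetical in-memory source and sink, for illustration only.
sourceFromItems :: [a] -> Source a
sourceFromItems xs = \ k _ -> mapM_ (k . Just) xs >> k EOF

sinkToRef :: IORef [a] -> Sink a
sinkToRef ref = \ k mx ->
    case mx of
        Just x -> modifyIORef' ref (x :)
        _      -> k EOF

-- A stateless node: upper-cases each item and passes EOF through unchanged.
upperNode :: BIO String String
upperNode k mx = k (fmap (map toUpper) mx)

-- Source String composed with BIO String String is again a Source String.
upperSource :: Source String
upperSource = sourceFromItems ["foo", "bar"] . upperNode

-- BIO String String composed with Sink String is again a Sink String.
upperSink :: IORef [String] -> Sink String
upperSink ref = upperNode . sinkToRef ref

-- A complete chain is started by giving it a no-op callback and EOF.
runPipeline :: IO [String]
runPipeline = do
    ref <- newIORef []
    (upperSource . sinkToRef ref) (\ _ -> return ()) EOF
    reverse <$> readIORef ref
```

runPipeline returns ["FOO","BAR"]. Note that data flows left to right through the composed chain, even though (.) applies its right-hand function first: the rightmost node is handed the final callback, and each node to its left receives the callback built by its right-hand neighbour.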
So if you want to count the number of lines in a file, you can use BIO:
import Z.IO
import Z.Data.PrimRef

main :: IO ()
main = do
    _:path:_ <- getArgs
    withResource (initSourceFromFile path) $ \ fileSource -> do
        counterRef <- newCounter 0
        let counter = counterNode counterRef
        splitter <- newLineSplitter
        runBIO_ $ fileSource . splitter . counter
        printStd =<< readPrimIORef counterRef
runBIO_ simply supplies an EOF to the BIO chain; the source at the head of the chain (fileSource here) then drives the whole chain until EOF. It is defined as:
discard :: a -> IO ()
{-# INLINABLE discard #-}
discard _ = return ()
runBIO_ :: BIO inp out -> IO ()
{-# INLINABLE runBIO_ #-}
runBIO_ bio = bio discard EOF
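In the line-counting example, the splitter is created inside IO. One plausible reason, sketched below with base only, is that such a node has to remember the unfinished line at the end of each chunk. This is an illustrative stand-in, not Z-IO's implementation: newLineSplitterSketch, splitLines and countLines are hypothetical names, the node works on String rather than V.Bytes, and emitting a trailing unterminated line on EOF is an assumption of this sketch.

```haskell
{-# LANGUAGE PatternSynonyms #-}
import Control.Monad (unless)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

pattern EOF :: Maybe a
pattern EOF = Nothing

-- Split off all newline-terminated lines; return them together with
-- the unterminated remainder.
splitLines :: String -> ([String], String)
splitLines s =
    case break (== '\n') s of
        (line, '\n' : more) -> let (ls, rest) = splitLines more
                               in (line : ls, rest)
        (rest, _)           -> ([], rest)

-- Hypothetical stateful node: the partial line is kept in an IORef
-- between chunks, which is why the node is created in IO.
newLineSplitterSketch :: IO (BIO String String)
newLineSplitterSketch = do
    pending <- newIORef ""
    return $ \ k mchunk ->
        case mchunk of
            Just chunk -> do
                buffered <- readIORef pending
                let (complete, rest) = splitLines (buffered ++ chunk)
                writeIORef pending rest
                mapM_ (k . Just) complete
            _ -> do
                -- On EOF, flush the remaining partial line (assumption), then EOF.
                rest <- readIORef pending
                writeIORef pending ""
                unless (null rest) (k (Just rest))
                k EOF

-- Count the lines contained in a list of chunks by feeding the splitter by hand.
countLines :: [String] -> IO Int
countLines chunks = do
    splitter <- newLineSplitterSketch
    countRef <- newIORef (0 :: Int)
    let counter mx = case mx of
            Just _ -> modifyIORef' countRef (+ 1)
            _      -> return ()
        feed = splitter counter
    mapM_ (feed . Just) chunks
    feed EOF
    readIORef countRef
```

For instance, countLines ["foo\nba", "r\nbaz"] returns 3: the line "bar" is split across two chunks and is only emitted once the second chunk arrives.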
Another example, from the blog post introducing BIO:
import Z.Data.CBytes (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}
    withResource (initSourceFromFile origin) $ \ src ->
        withResource (initSinkToFile target) $ \ sink ->
            runBIO_ $ src . base64Enc . zlibCompressor . sink
The code above is similar to the command line cat origin | base64 | gzip > target.