# From conduit to streamly

October 22, 2021, Posted by Julian Ospald## Motivation

At GHCup I recently put a lot of effort into reducing the dependency footprint to improve build times. Since `conduit`

was not a direct dependency and only used for yaml parsing and some other things, I replaced those deps with alternatives or re-implemented them (like logging).

yaml, which uses conduit under the hood, was replaced with HsYAML, but to my despair… that turned out to be 10 times slower, which also caused issues for pandoc.

Conduit is an excellent fully featured streaming library, but I didn’t want to go back to it by re-introducing yaml, since GHCup previously depended on `streamly`

and will likely do so in the future. So I simply decided to migrate yaml to streamly: https://hackage.haskell.org/package/yaml-streamly.

Streamly is a very general streaming library with a the strong focus on performance through inlining and stream fusion optimizations. As such, it may exceed other implementations performance, but also depends quite heavily on GHC behavior, flags, INLINE pragmas etc. It can also be used as an alternative for async, for reactive programming and much more.

So in this post, I will shortly explain conduit and streamly and provide a simple migration guide.

## Recap on conduit

There are many approaches on streaming. Conduit and streamly diverge quite heavily in terms of paradigm and API.

Conduit expresses streaming by providing a type that captures `i`

nput, `o`

utput and a possible final `r`

esult, all in one type (and the obligatory effect `m`

):

`data ConduitT i o m r`

As such, it expresses:

### Producers

These are generators from a seed value. Conduit defines it generically as such:

`unfold :: Monad m => (b -> Maybe (a, b)) -> b -> ConduitT i a m ()`

A simple unfold that lets us turn a list into a stream would be:

```
-- this is also provided by conduit
sourceList :: Monad m => [a] -> ConduitT i a m ()
= unfold gen
sourceList where
gen :: [a] -> Maybe (a, [a])
:xs) = Just (x,xs)
gen (x= Nothing -- stream aborts
gen _
-- our own stream of "output" chars with no final result
chars :: Monad m => ConduitT i Char m ()
= sourceList "abc" chars
```

As can be seen, the `o`

in `data ConduitT i o m r`

gets fixed to `Char`

. A Producer can then be be “piped” into another conduit, e.g. a transformer.

A producer focuses on the `o`

utput.

### Transformer

A transformer is like `map :: (a -> b) -> [a] -> [b]`

. It transforms the stream and may yield a different type.

```
-- provided by conduit, notice how it has only one argument
map :: Monad m => (a -> b) -> ConduitT a b m ()
-- transforms Char to Int
charToInt :: Monad m => ConduitT Char Int m ()
= map ord
charToInt
-- applies the transformation to the chars, yielding a Producer
-- we'll explaing '.|' shortly
ints :: Monad m => ConduitM a Int m ()
= chars .| charToInt ints
```

Notable is also that the Functor `fmap`

isn’t a transformation. It would map on the final value, not the produced values. That’s why we need `Data.Conduit.List.map`

. Streamly is very different here.

A transformer maps the `i`

nput to the `o`

utput.

To apply a transformation, we use the `(.|)`

pipe operator, which reminds us of shell pipes:

```
(.|) :: Monad m
=> ConduitM a b m () -- ^ producer of values 'b'
-> ConduitM b c m r -- ^ transformer (b -> c), or consumer
-> ConduitM a c m r
```

It takes a little while to see what’s going on. The type variables guide us.

### Consumer

A consumer works on the input stream, much like a transformer, but may also yield a final result. E.g. If we wanted to return all the Int’s we just converted from the Char stream, we’d do:

```
-- provided by conduit
foldl :: Monad m => (a -> b -> a) -> a -> ConduitT b o m a
-- 'a' (the input) gets folded as a list, so the final result is '[a]'
toList :: Monad m => ConduitT a o m [a]
= foldl (\a b -> b:a) []
toList
-- applying the fold on the stream of Ints
foldedInts :: Monad m => ConduitM a c m [Int]
= ints .| toList foldedInts
```

The consumer focuses on the `i`

nput to produce a final `r`

esult (however, consumers may also drop elements from the stream).

As demonstrated, one has to look closely at the type parameters in `data ConduitT i o m r`

to understand a conduit.

All concepts are unified in one type. Most operations need specific combinators.

### Wrapping up conduit

Finally, we can get our Ints:

```
ints :: Monad m => m [Int]
= runConduit foldedInts ints
```

That’s basically conduit. A conduit as such doesn’t really express streams. Instead we’re dealing with stream processors (functions).

## Streamly

Streamly’s approach is very different. It focuses on the simple concept of a *stream* of elements. It has 4 main types:

- streams:
`(Monad m, IsStream t) => t m a`

- unfolds:
`data Unfold m a b`

- folds:
`data Fold m a b`

- parsers:
`newtype Parser m a b`

As can be seen, this is nothing like `data ConduitT i o m r`

. I also note that `IsStream t`

is abstract to allow for different types of streams like `SerialT`

or `AsyncT`

, which I won’t go into detail about here.

We’ll now figure out how these concepts translate to conduit.

### Producers

Conduits producers are basically *Unfolds*.

The simplest function to create an `Unfold`

is:

`unfoldr :: Applicative m => (a -> Maybe (b, a)) -> Unfold m a b`

…which actually looks a lot like conduit:

`unfold :: Monad m => (b -> Maybe (a, b)) -> b -> ConduitT i a m ()`

The difference in streamly is that we provide the initial seed value when we turn the Unfold into a Stream.

So, let’s do the same procedure as above. We’ll create a list of Chars:

```
-- equivalent to conduits 'sourceList', also provided by streamly
fromList :: Monad m => Unfold m [a] a
= unfoldr gen
fromList where
gen :: [a] -> Maybe (a, [a])
:xs) = Just (x,xs)
gen (x= Nothing -- stream aborts
gen _
-- provided by streamly
-- given a seed value, turn an Unfold into a stream
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
-- we turn the unfold into a stream of chars
chars :: (IsStream t, Monad m) => t m Char
= Streamly.Prelude.unfold fromList "abc" chars
```

This type `t m Char`

looks a lot simpler. It’s basically a glorified list with possible effects run for every element.

### Transformers

A transformer doesn’t have its own type. It’s in my opinion much simpler than conduit. Here, we can simply reuse the Prelude’s `fmap`

. The main difference is that we have an input and an output stream, so:

```
-- transforms Char to Int
charToInt :: (IsStream t, Monad m, Functor (t m)) => t m Char -> t m Int
= fmap ord inputStream
charToInt inputStream
-- applies the transformation to the chars, yielding a stream of Ints
ints :: (IsStream t, Monad m, Functor (t m)) => t m Int
= charToInt chars ints
```

This feels much more like lists! Compare with `fmap ord "abc"`

. Streams can be passed around and transformed just like lists. If you want to run effects for every item, you just use the Monad interface:

```
charToInt :: (IsStream t, Monad m, Monad (t m)) => t m Char -> t m Int
= inputStream >>= pure . ord charToInt inputStream
```

However, this creates a data dependency (as we’re used from Monad). There’s the more general `mapM`

that can run effects in parallel:

`mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b`

Excellent. So Functor, Monad etc. follow our intuition.

### Consumers

Simple consumers in streamly terms are usually **Folds**.

E.g. if we wanted to convert our stream of Ints to an actual list of Ints we would combine our input stream with a Fold.

Remember the Fold type `data Fold m a b`

, where `a`

are the values of the input stream and `b`

is the final folded value.

```
-- provided by streamly for creating a Fold
foldl' :: Monad m => (b -> a -> b) -> b -> Fold m a b
-- provided by streamly for executing a fold over a stream
fold :: Monad m => Fold m a b -> Stream m a -> m b
-- A Fold that turns any input stream into a list
toList :: Monad m => Fold m a [a]
= foldl' (\a b -> b:a) []
toList
-- Applying the Fold to an actual stream already executes it
foldedInts :: Monad m => m [Int]
= fold toList ints foldedInts
```

### Parsers

Folds don’t have a monadic interface (yet). If we want backtracking and a monadic interface to choose the next step depending on the current element in the stream, we can use a Parser.

In conduit, we can use consumers like head and peek and utilize the Monad interface of `ConduitT`

to make our decisions. Theoretically, we could do the same in the Stream type of streamly via uncons, but the parser feels more idiomatic here.

I note that there is a parser-like package conduit-parse, but the yaml conduit code doesn’t utilize that and this blog was written while I converted yaml to streamly.

The streamly parser type is the same as a Fold: `newtype Parser m a b`

.

It parses a streamed value `a`

into `b`

. Much of the API resembles what you’re used to of `parsec`

or `attoparsec`

etc.

Let’s look at this conduit code (not tested to compile):

```
import qualified Data.Conduit.Combinators as C
chars :: Monad m => ConduitT i Char m ()
= sourceList "a1b2c3"
chars
-- We parse '1' from 'a1', '2' from 'b2' and so on, no matter
-- the order the pairs appear in.
parse' :: MonadIO m => ConduitT Char o m [Int]
= do
parse' <- C.head
mc case mc of
Just 'a' -> do
<- C.head
mcn case mcn of
Just '1' -> (1:) <$> parse'
Just cn -> liftIO $ throwIO $ "Unexpected char: " ++ [cn]
Nothing -> pure []
Just ... -- and so on
Nothing -> pure []
```

To translate this to streamly, we would write:

```
chars :: (IsStream t, Monad m) => t m Char
= Streamly.Prelude.unfold fromList "a1b2c3"
chars
-- we define a helper that acts like conduits C.head
anyChar :: MonadCatch m => Parser m Char (Maybe Char)
= (Just <$> satisfy (const True)) <|> pure Nothing
anyChar
-- We parse '1' from 'a1', '2' from 'b2' and so on, no matter
-- the order the pairs appear in.
parse' :: MonadIO m => Parser m Char [Int]
= do
parse' <- anyChar
mc case mc of
Just 'a' -> do
<- anyChar
mcn case mcn of
Just '1' -> (1:) <$> parse'
Just cn -> liftIO $ throwIO $ "Unexpected char: " ++ [cn]
Nothing -> pure []
Just ... -- and so on
Nothing -> pure []
```

This looks *exactly* like the conduit code, except we replaced `head`

with `anyChar`

. Although we could likely reduce it further instead of pattern matching on the chars.

Running a parser is like running a fold. We need an input stream:

`parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b`

### Wrapping up streamly

Running a stream is usually done by applying a `Fold`

, as we’ve done above. We can also turn a stream into a list directly:

`toList :: Monad m => SerialT m a -> m [a]`

Or just evaluate the stream and discard the values:

`drain :: Monad m => SerialT m a -> m ()`

All these functions also exist as Folds, so these are just convenience wrappers.

As can be seen, streamly isn’t based on stream processors like conduit. Instead it composes stream data directly and behaves pretty much like lists. Usually we don’t need special operators. Functor, Monad etc. follow our intuition from lists.

We’ve also seen that there’s an abstract `IsStream`

class and specific streaming types like `SerialT`

(for serially processed streams), `AsyncT`

(for concurrent streams) and so on. These are explained in more detail in the streamly documentation.

## Back to yaml

So how does this translate to yaml parsing? Well, the `yaml`

package uses the libyaml C library for parsing, which is an event driven parser. So we get a stream of events and then turn that into a single JSON value and then let `aeson`

do its magic.

Finally, for reference, here’s the migration patch: https://github.com/hasufell/streamly-yaml/commit/bfd1da498588af906cbc5d3bb519f1ccdf7ad63e

In fact, it didn’t require a rewrite at all. Simply applying the concepts from above was enough. Figuring out that we need a Parser type etc. took a while (I tried with Fold first). Thanks to the helpful streamly developers for providing guidance. There were some rough edges here and there, since much of the streamly API is still marked as **Internal**.

### Performance

Did it actually improve performance?

On my first attempt, I used the wrong inefficient internal ParserD type, which seemed to cause exponential allocations. After fixing that, I was still slower than conduit. Since streamly heavily relies on GHCs inliner, this wasn’t a surprise. It required some effort, but finally the performance was on-par with conduit (tested informally via the `yaml2json`

executable on a 100mb YAML file).

Streamly also provides some guidance for optimization.

I guess since the actual parsing is done by the C code and the `event->json`

conversion is really a slow element-by-element monadic parsing transformation, there’s not much space to improve performance anyway.

If you find ideas about how to improve it further, please let me know.

### Dependency footprint

Did this actually reduce dependency footprint?

Well, no. But the point was to only depend on a single streaming framework. I also note that streamly is planning to split up the `streamly`

package into `streamly-core`

(only depends on boot packages) and separate out further feature-packages.

## Conclusion

- migrating conduit code to streamly is easier than I thought
- performance optimization in streamly requires some time and effort
- you definitely want performance regression tests with streamly to ensure new GHC versions or refactorings don’t cause regressions

## What’s next?

Writing a streamly yaml parser in pure Haskell?