Merge Pipelines using Monoids

Note: These are my rough notes from Gabriel Gonzalez’s equational reasoning talk on How to Prove Large Software Projects Correct.

A Simple Task Pipeline

Say you have a task pipeline, where each task (aka job) goes through five stages:

generate -> package -> upload -> deploy -> log

Let’s say generate generates a bunch of files, package puts them together into one package, upload uploads that package to a server, deploy runs that package on the server, and log writes some log information.

Now, you want to get some output from each stage, like “Packaging job #4” from job #4 when it goes through the packaging stage.

So, it would make sense to have a function package of the type Int -> IO (). However, the function for each stage might depend on the outputs from previous stages, not just the overall input.

So then, we could have something like generate :: Int -> IO a and package :: a -> IO b and so on.

import Data.Monoid ((<>))
import Control.Monad

instance Monoid b => Monoid (IO b) where
    mempty = return mempty
    mappend = liftM2 mappend

generate1 :: Int -> IO Int
generate1 n = do
  putStrLn ("Generating job #" <> show n)
  return n

package1 :: Int -> IO Int
package1 n = do
  putStrLn ("Packaging job #" <> show n)
  return n

upload1 :: Int -> IO Int
upload1 n = do
  putStrLn ("Uploading job #" <> show n)
  return n

deploy1 :: Int -> IO Int
deploy1 n = do
  putStrLn ("Deploying job #" <> show n)
  return n

log1 :: Int -> IO Int
log1 n = do
  putStrLn ("Logging job #" <> show n)
  return n

job1 :: Int -> IO Int
job1 = generate1 >=> package1 >=> upload1 >=> deploy1 >=> log1

main = job1 1

To run the whole job (as in job1), you thread each stage’s output into the next stage’s input using the fish operator >=>.

Here’s the output:

Generating job #1
Packaging job #1
Uploading job #1
Deploying job #1
Logging job #1

How to Run Two Jobs together?

Now, if you had another batch job, you’d write its version of the pipeline, with different functions for generate, package, etc.:

job2 :: Int -> IO Int
job2 = generate2 >=> package2 >=> upload2 >=> deploy2 >=> log2

How to run them together? Simple, just do them one after the other in main:

main = do job1 1; job2 2

You will get the output:

Generating job #1
Packaging job #1
Uploading job #1
Deploying job #1
Logging job #1
Generating job #2
Packaging job #2
Uploading job #2
Deploying job #2
Logging job #2

They are done separately and the messages they print are collected separately. But you want the similar stages of the pipeline to be done together. You want all the generate stages of the jobs to be done together, and then the package stages, and so on. How to do that?

One way is to have a messy Job data structure which stores the functions representing each stage of this job’s pipeline (generate2, etc.) along with this job’s input data.

So, you will get n Job objects for all the different jobs. You will now have to map over the objects and run all of their different generate stages and store the outputs. Then, you have to map over them and run their different package stages with the stored result output as input and store this output, and so on through all the stages. Messy.

You have to do quite a bit of book-keeping to thread each job’s information to its next stage. This is nuts. You already specified the pipeline perfectly well once. All you want to do extra is group different pipelines so that they move through the stages together. But you have to repeat the information all over again.

Merging Two Jobs

What if your pipeline knew how to add itself to other pipelines?

Enter the Monoid.

Use the same functions for each job’s pipeline (generate2, etc.) but put them together in a different way.

job2 :: Int -> IO (IO (IO (IO (IO Int))))
job2 a = do
  b <- generate2 a
  return (do c <- package2 b
             return (do d <- upload2 c
                        return (do e <- deploy2 d
                                   return (log2 e))))

Now all you need is a deployment function that takes that nested IO subroutine and turns it into a flat IO subroutine.

deploy' :: IO (IO (IO (IO (IO ())))) -> IO ()
deploy' = join . join . join . join

Finally, add all the job pipelines together to create a unified job pipeline and deploy it.

job = job1 1 <> job2 2 <> job3 3
main = deploy' job

Now all the jobs run their stages together:

Generating job #1
Generating job #2
Generating job #3
Packaging job #1
Packaging job #2
Packaging job #3
Uploading job #1
Uploading job #2
Uploading job #3
Deploying job #1
Deploying job #2
Deploying job #3
Logging job #1
Logging job #2
Logging job #3

Monoid Proofs Scale

Why was that? How come we didn’t have to struggle to merge all their respective stages?

Well, that funky nested type IO (IO (IO (IO (IO ())))) is a monoid. Why? Because its inner funky nested type IO (IO (IO (IO ()))) is a monoid, and so on all the way down to the unit type ().

And we had made the IO monad a monoid by getting the results of the two subroutines and then calling mappend on those two.

So, the way execution will proceed is by computing the first stage of the pipeline (generate1, etc.) for all the jobs. That will give us the inner subroutines. Then, by way of doing mappend on all those subroutines, we will compute the next stage of the pipeline (package1, etc.), and so on till we reach the end of the pipeline for all the jobs.

Thus, by making our job pipeline a Monoid, we were able to merge different job pipelines so that they went through each stage together.

As Gabriel says, you get a staged deploy system in two lines of code (just the deploy function and the job concatenation) without doing any other work. Plus, you can prove the correctness of interleaving the jobs by looking at their type IO(IO(IO(IO(IO())))), which is a monoid.

The key insight here is that IO a is a monoid such that mappend on two IO a values will perform their two computations together (respecting the order in which they were mappend-ed). And it will then handle their nested values together. So, naturally, mappend on two IO (IO a) values will do the outer computations first in order, and then the inner computations.

All this is possible because of the automatic typeclass instances such as:

instance Monoid a => Monoid (Maybe a)
instance Monoid b => Monoid (a -> b)
instance (Monoid a, Monoid b) => Monoid (a, b)

This is what lets you scale composition.

(Note that you don’t have this neat property for monads. Monad m does not imply that Monad (m' m). This is why we need the whole idea of monad transformers.)

Monoids let you Compose Complex Types

Basically, the main advantage of monoids is when you have complex data structures (or even functions and mixed up stuff). Then you don’t have to worry about how they go together or what operators they use.

For a function monoid, it’s simple - you can just get the outputs and append them yourself. Big deal. For a unit test monoid, instead of mappending fifteen tests, you can just put them in a list and do a fold using the correct function.

But what if you have a tree of tests (five tests that go together here, seven there, etc.)? How will you fold them? You have to define the tree and make it Foldable then use foldMap and such to collapse it into one test. With a monoid, though, your job is simplified. And the test runner only has to know how to run one test, that’s all. It doesn’t have to know anything more about the tree structure or anything.

Basically, instead of dealing with complex structures by hand (yes, you can do it, but why?!), you just deal with one simple value at each point - one monoid. If you get one more, you mappend that with this sucker and end up with… still one monoid. The complexity of your code thus never gets out of hand.

Sometimes it can be hard to get at the “inside” of a monad to extract the value and then do stuff with it. For example, you have three Maybe (String, String) values and want to concat the strings (if any). Normally, you’d have to use catMaybes :: [Maybe a] -> [a] to get the existent pairs and then maybe split the pairs and then concatenate the results.

foo = [Just ("foo", "boyz"), Nothing, Just ("ounht", "boeut")]
bar = catMaybes foo

result = (concat (map fst bar), concat (map snd bar)

-- ("fooounht","boyzboeut")

But if you realize that all you want to do with the inner values is mappend them (like any monoid), then you can see that Maybe m is a monoid when m is a monoid. Now your answer simply becomes:

mconcat foo
-- Just ("fooounht","boyzboeut")

The same goes for functions that have a common input and a monoid output. No need to extract their values and do complicated stuff, because you know that all you want to do is mappend their results. So, just treat the function like a monoid.

greeting s = "Hello, Mr. " ++ s ++ ", welcome to our store.\n"
offer s = "Here are some great offers we have for you, Mr. " ++ s ++ ".\n"
sendoff s = "It was a pleasure meeting you, Mr. " ++ s

putStr $ greeting <> offer <> sendoff $ "Yoboyz"
-- Hello, Mr. Yoboyz, welcome to our store.
-- Here are some great offers we have for you.
-- It was a pleasure meeting you, Mr. Yoboyz

Notes

Aside: instance Monoid b => Monoid (IO b) doesn’t seem to be part of Haskell by default and I don’t know if it’s good style to add an instance here (I believe it’s called an orphan instance). But I think it is fine for the current purpose.

Created: September 10, 2015
Last modified: January 1, 2018
Status: in-progress notes
Tags: notes, haskell

comments powered by Disqus