I want to discuss two limitations in the standard Haskell libraries around concurrency, along with some ways of improving the status quo. Overall, Haskell's concurrency story is, in my opinion, best in class compared to any other language I'm aware of, at least for the single-machine use case. The following two issues are ones I run into fairly regularly, and both are surprising warts:

  • putStrLn is not thread-safe
  • Channels cannot be closed

Let me back up these claims, and then ask for some feedback on how to solve them.

putStrLn is not thread-safe

The example below is, in my opinion, a prime example of beautiful concurrency in Haskell:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async
import Control.Concurrent.Async
import Control.Monad (replicateM_)

worker :: Int -> IO ()
worker num = replicateM_ 5 $ putStrLn $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    mapConcurrently worker [1..5]
    return ()

Well, it's beautiful until you see the (abridged) output:

Hi, HIiH'HH,imii , ,,I w  'IoIIm'r'' mkmmw e  owrwwro ookr#rrek2kkre
ee rrr# H  3#i##
4,51

Your mileage may vary of course. The issue here is that Prelude.putStrLn works on String, which is a lazy list of Chars, and in fact sends one character at a time to stdout. This is clearly not what we want. However, at the same time, many Haskellers - myself included - consider String-based I/O a bad choice anyway. So let's replace this with Text-based I/O:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T

worker :: Int -> IO ()
worker num = replicateM_ 5
           $ T.putStrLn
           $ T.pack
           $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    mapConcurrently worker [1..5]
    return ()

Unfortunately, if you run this (at least via runghc), the results are the same. If you look at the implementation of Data.Text.IO.hPutStr, you'll see that there are different implementations of that function depending on the buffering strategy of the Handle we're writing to. In the case of NoBuffering (which is the default with GHCi and runghc), this will output one character at a time (just like String), whereas LineBuffering and BlockBuffering write in batches. You can see this with:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.IO

worker :: Int -> IO ()
worker num = replicateM_ 5
           $ T.putStrLn
           $ T.pack
           $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    hSetBuffering stdout LineBuffering
    mapConcurrently worker [1..5]
    return ()

While better, this still isn't perfect:

Hi, I'm worker #4Hi, I'm worker #5Hi, I'm worker #1


Hi, I'm worker #4Hi, I'm worker #5Hi, I'm worker #1


Hi, I'm worker #4Hi, I'm worker #5

Unfortunately, because newlines are written to stdout separately from the message, these kinds of issues happen too frequently. This can be worked around too by using putStr instead and manually appending a newline character:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.IO

worker :: Int -> IO ()
worker num = replicateM_ 5
           $ T.putStr
           $ T.pack
           $ "Hi, I'm worker #" ++ show num ++ "\n"

main :: IO ()
main = do
    hSetBuffering stdout LineBuffering
    mapConcurrently worker [1..5]
    return ()
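Since the behavior above depends on the Handle's buffering mode, it can help to check which mode is actually in effect in a given environment. The following is a minimal sketch using hGetBuffering and hSetBuffering from System.IO in base. The reported mode will differ between runghc, GHCi, a compiled binary attached to a terminal, and redirected output, so no particular result is assumed here. Diagnostics go to stderr so they don't mix with whatever the program writes to stdout.

```haskell
import System.IO

main :: IO ()
main = do
    -- Report the mode the runtime picked for stdout in this environment.
    mode <- hGetBuffering stdout
    hPutStrLn stderr $ "stdout buffering: " ++ show mode

    -- Explicitly request line buffering and confirm the change took effect.
    hSetBuffering stdout LineBuffering
    mode' <- hGetBuffering stdout
    hPutStrLn stderr $ "stdout buffering now: " ++ show mode'
```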

Finally, we can avoid the buffering-dependent code in the text package and use ByteString output, which has the advantage of automatically using this append-a-newline logic for small-ish ByteStrings:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.ByteString.Char8 as S8

worker :: Int -> IO ()
worker num = replicateM_ 100
           $ S8.putStrLn
           $ S8.pack
           $ "Hi, I'm worker #" ++ show num

main :: IO ()
main = do
    mapConcurrently worker [1..100]
    return ()

However, this has the downside of assuming a certain character encoding, which may be different from the encoding of the Handle.

What I'd like: a function Text -> IO () which, regardless of buffering strategy, appends a newline to the Text value and sends the entire chunk of data to a Handle in a thread-safe manner. Ideally it would account for character encoding (though assuming UTF-8 may be an acceptable compromise for most use cases), and it would be OK if very large values are occasionally split up during output (due to the write system call not accepting the entire chunk at once).

What I'd recommend today: In a number of my smaller applications and scripts, I've become accustomed to defining say = BS.hPutStrLn stdout . encodeUtf8. I'm tempted to add this to a library, possibly even classy-prelude, along with either reimplementing print as print = say . T.pack . show or providing an alternative to print. I've also considered replacing the putStrLn in classy-prelude with this implementation of say.
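Spelled out as a complete script, the definition described above might look like the sketch below. It uses Data.ByteString.Char8 imported as S8 rather than the BS alias, and sayShow is a hypothetical name for the print alternative; neither choice comes from an existing library.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import qualified Data.ByteString.Char8 as S8
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import System.IO (stdout)

-- | Encode the message as UTF-8 and hand it, plus a newline, to stdout
-- through the ByteString API rather than the String or Text one.
say :: Text -> IO ()
say = S8.hPutStrLn stdout . encodeUtf8

-- | Hypothetical name: a 'print' alternative built on top of 'say'.
sayShow :: Show a => a -> IO ()
sayShow = say . T.pack . show

main :: IO ()
main = do
    say "Hello from say"
    sayShow (42 :: Int)
    sayShow [Just 'x', Nothing]
```

As noted earlier, this always assumes UTF-8, whatever encoding is set on the Handle.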

However, I'm hoping others have some better thoughts on this, because I don't really find these solutions very appealing.
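One alternative that is sometimes used, sketched here purely as an illustration and not something proposed in this post, is to serialize all output through a shared MVar lock. The names renderLine and sayLocked are hypothetical. The approach only helps if every writer in the program goes through the same lock, and it still assumes UTF-8. The sketch uses forkIO from base instead of async to keep the dependencies small.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM_, replicateM_)
import qualified Data.ByteString as S
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import System.IO (Handle, stdout)

-- | Hypothetical helper: the message plus trailing newline as UTF-8 bytes.
renderLine :: Text -> S.ByteString
renderLine t = encodeUtf8 (T.snoc t '\n')

-- | Hypothetical helper: write one full line while holding a shared lock,
-- so lines from threads using the same lock cannot interleave.
sayLocked :: MVar () -> Handle -> Text -> IO ()
sayLocked lock h t = withMVar lock $ \_ -> S.hPut h (renderLine t)

main :: IO ()
main = do
    lock <- newMVar ()
    done <- newEmptyMVar
    forM_ [1 .. 5 :: Int] $ \num -> forkIO $ do
        replicateM_ 5 $ sayLocked lock stdout $
            T.pack ("Hi, I'm worker #" ++ show num)
        putMVar done ()
    -- Wait for all five workers before exiting.
    replicateM_ 5 (takeMVar done)
```

The obvious cost is that the lock has to be threaded through to every call site (or stored in some global), which is part of why a library-level solution would be nicer.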

Non-closable channels

Let's implement a very simple multi-worker application with communication over a Chan:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad (forever)
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8

say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8

worker :: Chan Int -> Int -> IO ()
worker chan num = forever $ do
    i <- readChan chan
    say $ pack $ concat
        [ "Worker #"
        , show num
        , " received value "
        , show i
        ]

main :: IO ()
main = do
    chan <- newChan
    mapConcurrently (worker chan) [1..5] `concurrently`
        mapM_ (writeChan chan) [1..10]
    return ()

(Yes, I used the aforementioned say function.)

This looks all well and good, but check out the end of the output:

Worker #5 received value 8
Worker #3 received value 9
Worker #1 received value 10
Main: thread blocked indefinitely in an MVar operation

You see, the worker threads have no way of knowing that there are no more writeChan calls incoming, so they continue to block. The runtime system notices this and sends them an async exception to kill them. Relying on this behavior is a really bad way to structure a program, as it can easily lead to deadlocks. Said more simply:

If you rely on exceptions for non-exceptional cases, you're gonna have a bad time
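To make the mechanism concrete, here is a minimal sketch of the runtime behavior in isolation, using only base. It blocks on an MVar that nothing else can ever fill and uses try to observe the BlockedIndefinitelyOnMVar exception that GHC's runtime raises once it detects the situation. Detection depends on the garbage collector noticing that the MVar is unreachable, so the timing is not something to rely on.

```haskell
import Control.Concurrent.MVar
import Control.Exception (BlockedIndefinitelyOnMVar, try)

main :: IO ()
main = do
    -- No other thread holds a reference to this MVar, so nobody can ever
    -- fill it and the 'takeMVar' below can never complete normally.
    mv <- newEmptyMVar :: IO (MVar ())
    result <- try (takeMVar mv) :: IO (Either BlockedIndefinitelyOnMVar ())
    case result of
        Left e   -> putStrLn ("Runtime interrupted the blocked thread: " ++ show e)
        Right () -> putStrLn "Unexpectedly received a value"
```

Catching the exception like this works for a demonstration, but it is exactly the kind of reliance on exceptions for normal control flow that the quote above warns about.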

Instead, the workers should have some way of knowing that the channel is closed. This is a common pattern in other languages, and one I think we should borrow. Implementing this with STM isn't too bad actually, and can easily have an IO-based API if desired:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Applicative ((<|>))
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8

say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8

data TCChan a = TCChan (TChan a) (TVar Bool)

newTCChan :: IO (TCChan a)
newTCChan = atomically $ TCChan <$> newTChan <*> newTVar False

closeTCChan :: TCChan a -> IO ()
closeTCChan (TCChan _ var) = atomically $ writeTVar var True

writeTCChan :: TCChan a -> a -> IO ()
writeTCChan (TCChan chan var) val = atomically $ do
    closed <- readTVar var
    if closed
        -- Could use nicer exception types, or return a Bool to
        -- indicate if writing failed
        then error "Wrote to a closed TCChan"
        else writeTChan chan val

readTCChan :: TCChan a -> IO (Maybe a)
readTCChan (TCChan chan var) = atomically $
    (Just <$> readTChan chan) <|> (do
        closed <- readTVar var
        check closed
        return Nothing)

worker :: TCChan Int -> Int -> IO ()
worker chan num =
    loop
  where
    loop = do
        mi <- readTCChan chan
        case mi of
            Nothing -> return ()
            Just i -> do
                say $ pack $ concat
                    [ "Worker #"
                    , show num
                    , " received value "
                    , show i
                    ]
                loop

main :: IO ()
main = do
    chan <- newTCChan
    mapConcurrently (worker chan) [1..5] `concurrently` do
        mapM_ (writeTCChan chan) [1..10]
        closeTCChan chan
    return ()

Fortunately, this problem has a preexisting solution: the stm-chans package, which provides closable and bounded channels and queues. Our problem above can be more easily implemented with:

#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text --package stm-chans
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TMQueue
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8

say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8

worker :: TMQueue Int -> Int -> IO ()
worker q num =
    loop
  where
    loop = do
        mi <- atomically $ readTMQueue q
        case mi of
            Nothing -> return ()
            Just i -> do
                say $ pack $ concat
                    [ "Worker #"
                    , show num
                    , " received value "
                    , show i
                    ]
                loop

main :: IO ()
main = do
    q <- newTMQueueIO
    mapConcurrently (worker q) [1..5] `concurrently` do
        mapM_ (atomically . writeTMQueue q) [1..10]
        atomically $ closeTMQueue q
    return ()

What I'd like: The biggest change needed here is just to get knowledge of this very awesome stm-chans package out there more. That could be with blog posts, or even better with links from the stm package itself. A step up from there could be to include this functionality in the stm package itself. Another possible nicety would be to add a non-STM API for these, whether based on STM or MVars internally, for more ease of use. I may take a first step here by simply depending on and reexporting stm-chans from classy-prelude.
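As a rough idea of what such a non-STM API could look like, here is a sketch of thin IO wrappers around TMQueue from stm-chans. The wrapper names (newQueueIO, writeQueueIO, readQueueIO, closeQueueIO) are hypothetical and are not part of stm-chans; only the TMQueue functions they call are.

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMQueue

-- Hypothetical IO-level wrappers; the names are illustrative only.
newQueueIO :: IO (TMQueue a)
newQueueIO = newTMQueueIO

writeQueueIO :: TMQueue a -> a -> IO ()
writeQueueIO q = atomically . writeTMQueue q

-- | 'Nothing' means the queue has been closed and fully drained.
readQueueIO :: TMQueue a -> IO (Maybe a)
readQueueIO = atomically . readTMQueue

closeQueueIO :: TMQueue a -> IO ()
closeQueueIO = atomically . closeTMQueue

main :: IO ()
main = do
    q <- newQueueIO
    mapM_ (writeQueueIO q) [1 .. 3 :: Int]
    closeQueueIO q
    -- Values written before the close are still delivered; only after
    -- they are consumed does reading return Nothing.
    let drain = do
            mi <- readQueueIO q
            case mi of
                Nothing -> return ()
                Just i  -> print i >> drain
    drain
```

The trade-off of wrapping each operation in its own atomically is that callers lose the ability to compose queue operations with other STM actions in a single transaction.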

What I'd recommend: Probably pretty obvious: use stm-chans!

Like the previous point though, I'm interested to see how other people have approached this problem, since I haven't heard it discussed much in the past. Either others haven't run into this issue as frequently as I have, everyone already knows about stm-chans, or there's some other solution people prefer.
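For comparison, one other approach that sometimes shows up is to keep a plain Chan and send an explicit end-of-stream sentinel, for example by using Chan (Maybe a) and writing one Nothing per worker. The sketch below illustrates that technique under those assumptions. It is not taken from stm-chans or from the examples above, and it uses forkIO from base rather than async.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Monad (forM_, replicateM_)

-- | Read until the sentinel 'Nothing' arrives, collecting what was seen.
worker :: Chan (Maybe Int) -> IO [Int]
worker chan = loop []
  where
    loop acc = do
        mi <- readChan chan
        case mi of
            Nothing -> return (reverse acc)
            Just i  -> loop (i : acc)

main :: IO ()
main = do
    let workerCount = 5
    chan <- newChan
    results <- newEmptyMVar
    replicateM_ workerCount $ forkIO $ worker chan >>= putMVar results
    mapM_ (writeChan chan . Just) [1 .. 10]
    -- One sentinel per worker: each 'Nothing' is consumed by exactly one reader.
    replicateM_ workerCount $ writeChan chan Nothing
    forM_ [1 .. workerCount] $ \_ -> do
        received <- takeMVar results
        putStrLn $ "A worker received " ++ show received
```

The main drawback is that the producer has to know how many consumers exist, which a closable channel avoids.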
