See a typo? Have a suggestion? Edit this page on Github
I want to discuss two limitations in standard Haskell libraries around concurrency, and discuss methods of improving the status quo. Overall, Haskell's concurrency story is - in my opinion - the best in class versus any other language I'm aware of, at least for the single-machine use case. The following are two issues that I run into fairly regularly and are a surprising wart:
putStrLn
is not thread-safe- Channels cannot be closed
Let me back up these claims, and then ask for some feedback on how to solve them.
putStrLn
is not thread-safe
The example below is, in my opinion, a prime example of beautiful concurrency in Haskell:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async
import Control.Concurrent.Async
import Control.Monad (replicateM_)
worker :: Int -> IO ()
worker num = replicateM_ 5 $ putStrLn $ "Hi, I'm worker #" ++ show num
main :: IO ()
main = do
mapConcurrently worker [1..5]
return ()
Well, it's beautiful until you see the (abridged) output:
Hi, HIiH'HH,imii , ,,I w 'IoIIm'r'' mkmmw e owrwwro ookr#rrek2kkre
ee rrr# H 3#i##
4,51
Your mileage may vary of course. The issue here is that
Prelude.putStrLn
works on String
, which is a lazy list of Char
s,
and in fact sends one character at a time to stdout
. This is clearly
not what we want. However, at the same time, many Haskellers -
myself included - consider String
-based I/O a bad choice anyway. So
let's replace this with Text
-based I/O:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
worker :: Int -> IO ()
worker num = replicateM_ 5
$ T.putStrLn
$ T.pack
$ "Hi, I'm worker #" ++ show num
main :: IO ()
main = do
mapConcurrently worker [1..5]
return ()
Unfortunately, if you run this (at least via runghc
), the results
are the same. If you
look at the implementation of Data.Text.IO.hPutStr
,
you'll see that there are different implementations of that function
depending on the buffering straregy of the Handle
we're writing
to. In the case of NoBuffering
(which is the default with GHCi and
runghc
), this will output one character at a time (just like
String
), whereas LineBuffering
and BlockBuffering
have batch
behavior. You can see this with:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.IO
worker :: Int -> IO ()
worker num = replicateM_ 5
$ T.putStrLn
$ T.pack
$ "Hi, I'm worker #" ++ show num
main :: IO ()
main = do
hSetBuffering stdout LineBuffering
mapConcurrently worker [1..5]
return ()
While better, this still isn't perfect:
Hi, I'm worker #4Hi, I'm worker #5Hi, I'm worker #1
Hi, I'm worker #4Hi, I'm worker #5Hi, I'm worker #1
Hi, I'm worker #4Hi, I'm worker #5
Unfortunately, because
newlines are written to stdout separately from the message,
these kinds of issues happen too frequently. This can be worked around
too by using putStr
instead and manually appending a newline
character:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import System.IO
worker :: Int -> IO ()
worker num = replicateM_ 5
$ T.putStr
$ T.pack
$ "Hi, I'm worker #" ++ show num ++ "\n"
main :: IO ()
main = do
hSetBuffering stdout LineBuffering
mapConcurrently worker [1..5]
return ()
Finally, we can avoid the buffering-dependent code in the text package
and use ByteString
output, which has the advantage of automatically
using this append-a-newline logic for small-ish ByteString
s:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Monad (replicateM_)
import qualified Data.ByteString.Char8 as S8
worker :: Int -> IO ()
worker num = replicateM_ 100
$ S8.putStrLn
$ S8.pack
$ "Hi, I'm worker #" ++ show num
main :: IO ()
main = do
mapConcurrently worker [1..100]
return ()
However, this has the downside of assuming a certain character
encoding, which may be different from the encoding of the Handle
.
What I'd like I would like a function Text -> IO ()
which -
regardless of buffering strategy - appends a newline to the Text
value and sends the entire chunk of data to a Handle
in a
thread-safe manner. Ideally it would account for character encoding
(though assuming UTF8 may be an acceptable compromise for most use
cases), and it would be OK if very large values are occassionally
compromised during output (due to the write
system call not
accepting the entire chunk at once).
What I'd recommend today In a number of my smaller
applications/scripts, I've become accustomed to defining a say = BS.hPutStrLn stdout . encodeUtf8
. I'm tempted to add this to a
library - possibly even classy-prelude
- along with either
reimplementing print
as print = say . T.pack . show
(or providing
an alternative to print
). I've also considered replacing the putStrLn
in
classy-prelude
with this implementation of say
.
However, I'm hoping others have some better thoughts on this, because I don't really find these solutions very appealing.
Non-closable channels
Let's implement a very simple multi-worker application with
communication over a Chan
:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent
import Control.Concurrent.Async
import Control.Monad (forever)
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8
say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8
worker :: Chan Int -> Int -> IO ()
worker chan num = forever $ do
i <- readChan chan
say $ pack $ concat
[ "Worker #"
, show num
, " received value "
, show i
]
main :: IO ()
main = do
chan <- newChan
mapConcurrently (worker chan) [1..5] `concurrently`
mapM_ (writeChan chan) [1..10]
return ()
(Yes, I used the aforementioned say
function.)
This looks all well and good, but check out the end of the output:
Worker #5 received value 8
Worker #3 received value 9
Worker #1 received value 10
Main: thread blocked indefinitely in an MVar operation
You see, the worker threads have no way of knowing that there are no more writeChan
calls incoming, so they continue to block. The runtime system notes this, and sends them an async exception to kill them. This is a really bad idea for program structure as it can easily lead to deadlocks. Said more simply:
Instead, the workers should have some way of knowing that the channel
is closed. This is a common pattern in other languages, and one I
think we should borrow. Implementing this with STM isn't too bad
actually, and can easily have an IO
-based API if desired:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text
{-# LANGUAGE OverloadedStrings #-}
import Control.Applicative ((<|>))
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8
say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8
data TCChan a = TCChan (TChan a) (TVar Bool)
newTCChan :: IO (TCChan a)
newTCChan = atomically $ TCChan <$> newTChan <*> newTVar False
closeTCChan :: TCChan a -> IO ()
closeTCChan (TCChan _ var) = atomically $ writeTVar var True
writeTCChan :: TCChan a -> a -> IO ()
writeTCChan (TCChan chan var) val = atomically $ do
closed <- readTVar var
if closed
-- Could use nicer exception types, or return a Bool to
-- indicate if writing failed
then error "Wrote to a closed TCChan"
else writeTChan chan val
readTCChan :: TCChan a -> IO (Maybe a)
readTCChan (TCChan chan var) = atomically $
(Just <$> readTChan chan) <|> (do
closed <- readTVar var
check closed
return Nothing)
worker :: TCChan Int -> Int -> IO ()
worker chan num =
loop
where
loop = do
mi <- readTCChan chan
case mi of
Nothing -> return ()
Just i -> do
say $ pack $ concat
[ "Worker #"
, show num
, " received value "
, show i
]
loop
main :: IO ()
main = do
chan <- newTCChan
mapConcurrently (worker chan) [1..5] `concurrently` do
mapM_ (writeTCChan chan) [1..10]
closeTCChan chan
return ()
Fortunately, this problem has a preexisting solution: the stm-chans package, which provides closable and bounded channels and queues. Our problem above can be more easily implemented with:
#!/usr/bin/env stack
-- stack --resolver lts-6.23 --install-ghc runghc --package async --package text --package stm-chans
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TMQueue
import Data.Text (Text, pack)
import Data.Text.Encoding (encodeUtf8)
import qualified Data.ByteString.Char8 as S8
say :: Text -> IO ()
say = S8.putStrLn . encodeUtf8
worker :: TMQueue Int -> Int -> IO ()
worker q num =
loop
where
loop = do
mi <- atomically $ readTMQueue q
case mi of
Nothing -> return ()
Just i -> do
say $ pack $ concat
[ "Worker #"
, show num
, " received value "
, show i
]
loop
main :: IO ()
main = do
q <- newTMQueueIO
mapConcurrently (worker q) [1..5] `concurrently` do
mapM_ (atomically . writeTMQueue q) [1..10]
atomically $ closeTMQueue q
return ()
What I'd like The biggest change needed here is just to get
knowledge of this very awesome stm-chans
package out there
more. That could be with blog posts, or even better with links from
the stm
package itself. A step up from there could be to include
this functionality in the stm
package itself. Another possible
niceity would be to add a non-STM API for these - whether based on STM
or MVars internally - for more ease of use. I may take a first step
here by simply depending on and reexporting stm-chans
from
classy-prelude
.
What I'd recommend Probably pretty obvious: use stm-chans
!
Like the previous point though, I'm interested to see how other people
have approached this problem, since I haven't heard it discussed much
in the past. Either others haven't run into this issue as frequently
as I have, everyone already knows about stm-chans
, or there's some
other solution people prefer.