Part of the "Why use F#?" series (more)

In this post, we'll look at the message-based (or actor-based) approach to concurrency.

In this approach, when one task wants to communicate with another, it sends it a message, rather than contacting it directly. The messages are put on a queue, and the receiving task (known as an "actor" or "agent") pulls the messages off the queue one at a time to process them.

This message-based approach has been applied to many situations, from low-level network sockets (built on TCP/IP) to enterprise wide application integration systems (for example MSMQ or IBM WebSphere MQ).

From a software design point of view, a message-based approach has a number of benefits:

  • You can manage shared data and resources without locks.
  • You can easily follow the "single responsibility principle", because each agent can be designed to do only one thing.
  • It encourages a "pipeline" model of programming with "producers" sending messages to decoupled "consumers", which has additional benefits:
    • The queue acts as a buffer, eliminating waiting on the client side.
    • It is straightforward to scale up one side or the other of the queue as needed in order to maximize throughput.
    • Errors can be handled gracefully, because the decoupling means that agents can be created and destroyed without affecting their clients.

From a practical developer's point of view, what I find most appealing about the message-based approach is that when writing the code for any given actor, you don't have to hurt your brain by thinking about concurrency. The message queue forces a "serialization" of operations that otherwise might occur concurrently. And this in turn makes it much easier to think about (and write code for) the logic for processing a message, because you can be sure that your code will be isolated from other events that might interrupt your flow.

With these advantages, it is not surprising that when a team inside Ericsson wanted to design a programming language for writing highly-concurrent telephony applications, they created one with a message-based approach, namely Erlang. Erlang has now become the poster child for the whole topic, and has created a lot of interest in implementing the same approach in other languages.

How F# implements a message-based approach

F# has a built-in agent class called MailboxProcessor. These agents are very lightweight compared with threads - you can instantiate tens of thousands of them at the same time.

These are similar to the agents in Erlang, but unlike the Erlang ones, they do not work across process boundaries, only in the same process. And unlike a heavyweight queueing system such as MSMQ, the messages are not persistent. If your app crashes, the messages are lost.

But these are minor issues, and can be worked around. In a future series, I will go into alternative implementations of message queues. The fundamental approach is the same in all cases.

Let's see a simple agent implementation in F#:

#nowarn "40"
let printerAgent = MailboxProcessor.Start(fun inbox-> 

    // the message processing function
    let rec messageLoop = async{
        
        // read a message
        let! msg = inbox.Receive()
        
        // process a message
        printfn "message is: %s" msg

        // loop to top
        return! messageLoop  
        }

    // start the loop 
    messageLoop 
    )

The MailboxProcessor.Start function takes a simple function parameter. That function loops forever, reading messages from the queue (or "inbox") and processing them.

Note: I have added the #nowarn "40" pragma to avoid the warning "FS0040", which can be safely ignored in this case.

Here's the example in use:

// test it
printerAgent.Post "hello" 
printerAgent.Post "hello again" 
printerAgent.Post "hello a third time" 

In the rest of this post we'll look at two slightly more useful examples:

  • Managing shared state without locks
  • Serialized and buffered access to shared IO

In both of these cases, a message based approach to concurrency is elegant, efficient, and easy to program.

Managing shared state

Let's look at the shared state problem first.

A common scenario is that you have some state that needs to be accessed and changed by multiple concurrent tasks or threads. We'll use a very simple case, and say that the requirements are:

  • A shared "counter" and "sum" that can be incremented by multiple tasks concurrently.
  • Changes to the counter and sum must be atomic -- we must guarantee that they will both be updated at the same time.

The locking approach to shared state

Using locks or mutexes is a common solution for these requirements, so let's write some code using a lock, and see how it performs.

First let's write a static LockedCounter class that protects the state with locks.

open System
open System.Threading
open System.Diagnostics

// a utility function
type Utility() = 
    static let rand = new Random()
    
    static member RandomSleep() = 
        let ms = rand.Next(1,10)
        Thread.Sleep ms

// an implementation of a shared counter using locks
type LockedCounter () = 

    static let _lock = new Object()

    static let mutable count = 0
    static let mutable sum = 0

    static let updateState i = 
        // increment the counters and...
        sum <- sum + i
        count <- count + 1
        printfn "Count is: %i. Sum is: %i" count sum 

        // ...emulate a short delay
        Utility.RandomSleep()


    // public interface to hide the state
    static member Add i = 
        // see how long a client has to wait
        let stopwatch = new Stopwatch()
        stopwatch.Start() 

        // start lock. Same as C# lock{...}
        lock _lock (fun () ->
        
            // see how long the wait was
            stopwatch.Stop()
            printfn "Client waited %i" stopwatch.ElapsedMilliseconds

            // do the core logic
            updateState i 
            )
        // release lock

Some notes on this code:

  • This code is written in using a very imperative approach, with mutable variables and locks
  • The public Add method has explicit Monitor.Enter and Monitor.Exit expressions to get and release the lock. This is the same as the lock{...} statement in C#.
  • We've also added a stopwatch to measure how long a client has to wait to get the lock.
  • The core "business logic" is the updateState method, which not only updates the state, but adds a small random wait as well to emulate the time taken to do the processing.

Let's test it in isolation:

// test in isolation
LockedCounter.Add 4
LockedCounter.Add 5

Next, we'll create a task that will try to access the counter:

let makeCountingTask addFunction taskId  = async {
    let name = sprintf "Task%i" taskId
    for i in [1..3] do 
        addFunction i
    }

// test in isolation
let task = makeCountingTask LockedCounter.Add 1
Async.RunSynchronously task

In this case, when there is no contention at all, the wait times are all 0.

But what happens when we create 10 child tasks that all try to access the counter at once:

let lockedExample5 = 
    [1..10]
        |> List.map (fun i -> makeCountingTask LockedCounter.Add i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

Oh dear! Most tasks are now waiting quite a while. If two tasks want to update the state at the same time, one must wait for the other's work to complete before it can do its own work, which affects performance.

And if we add more and more tasks, the contention will increase, and the tasks will spend more and more time waiting rather than working.

The message-based approach to shared state

Let's see how a message queue might help us. Here's the message based version:

type MessageBasedCounter () = 

    static let updateState (count,sum) msg = 

        // increment the counters and...
        let newSum = sum + msg
        let newCount = count + 1
        printfn "Count is: %i. Sum is: %i" newCount newSum 

        // ...emulate a short delay
        Utility.RandomSleep()

        // return the new state
        (newCount,newSum)

    // create the agent
    static let agent = MailboxProcessor.Start(fun inbox -> 

        // the message processing function
        let rec messageLoop oldState = async{

            // read a message
            let! msg = inbox.Receive()

            // do the core logic
            let newState = updateState oldState msg

            // loop to top
            return! messageLoop newState 
            }

        // start the loop 
        messageLoop (0,0)
        )

    // public interface to hide the implementation
    static member Add i = agent.Post i

Some notes on this code:

  • The core "business logic" is again in the updateState method, which has almost the same implementation as the earlier example, except the state is immutable, so that a new state is created and returned to the main loop.
  • The agent reads messages (simple ints in this case) and then calls updateState method
  • The public method Add posts a message to the agent, rather than calling the updateState method directly
  • This code is written in a more functional way; there are no mutable variables and no locks anywhere. In fact, there is no code dealing with concurrency at all! The code only has to focus on the business logic, and is consequently much easier to understand.

Let's test it in isolation:

// test in isolation
MessageBasedCounter.Add 4
MessageBasedCounter.Add 5

Next, we'll reuse a task we defined earlier, but calling MessageBasedCounter.Add instead:

let task = makeCountingTask MessageBasedCounter.Add 1
Async.RunSynchronously task

Finally let's create 5 child tasks that try to access the counter at once.

let messageExample5 = 
    [1..5]
        |> List.map (fun i -> makeCountingTask MessageBasedCounter.Add i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

We can't measure the waiting time for the clients, because there is none!

Shared IO

A similar concurrency problem occurs when accessing a shared IO resource such as a file:

  • If the IO is slow, the clients can spend a lot of time waiting, even without locks.
  • If multiple threads write to the resource at the same time, you can get corrupted data.

Both problems can be solved by using asynchronous calls combined with buffering -- exactly what a message queue does.

In this next example, we'll consider the example of a logging service that many clients will write to concurrently. (In this trivial case, we'll just write directly to the Console.)

We'll first look at an implementation without concurrency control, and then at an implementation that uses message queues to serialize all requests.

IO without serialization

In order to make the corruption very obvious and repeatable, let's first create a "slow" console that writes each individual character in the log message and pauses for a millisecond between each character. During that millisecond, another thread could be writing as well, causing an undesirable interleaving of messages.

let slowConsoleWrite msg = 
    msg |> String.iter (fun ch->
        System.Threading.Thread.Sleep(1)
        System.Console.Write ch
        )

// test in isolation
slowConsoleWrite "abc"

Next, we will create a simple task that loops a few times, writing its name each time to the logger:

let makeTask logger taskId = async {
    let name = sprintf "Task%i" taskId
    for i in [1..3] do 
        let msg = sprintf "-%s:Loop%i-" name i
        logger msg 
    }

// test in isolation
let task = makeTask slowConsoleWrite 1
Async.RunSynchronously task

Next, we write a logging class that encapsulates access to the slow console. It has no locking or serialization, and is basically not thread-safe:

type UnserializedLogger() = 
    // interface
    member this.Log msg = slowConsoleWrite msg

// test in isolation
let unserializedLogger = UnserializedLogger()
unserializedLogger.Log "hello"

Now let's combine all these into a real example. We will create five child tasks and run them in parallel, all trying to write to the slow console.

let unserializedExample = 
    let logger = new UnserializedLogger()
    [1..5]
        |> List.map (fun i -> makeTask logger.Log i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

Ouch! The output is very garbled!

Serialized IO with messages

So what happens when we replace UnserializedLogger with a SerializedLogger class that encapsulates a message queue.

The agent inside SerializedLogger simply reads a message from its input queue and writes it to the slow console. Again there is no code dealing with concurrency and no locks are used.

type SerializedLogger() = 

    // create the mailbox processor
    let agent = MailboxProcessor.Start(fun inbox -> 

        // the message processing function
        let rec messageLoop () = async{

            // read a message
            let! msg = inbox.Receive()

            // write it to the log
            slowConsoleWrite msg

            // loop to top
            return! messageLoop ()
            }

        // start the loop
        messageLoop ()
        )

    // public interface
    member this.Log msg = agent.Post msg

// test in isolation
let serializedLogger = SerializedLogger()
serializedLogger.Log "hello"

So now we can repeat the earlier unserialized example but using the SerializedLogger instead. Again, we create five child tasks and run them in parallel:

let serializedExample = 
    let logger = new SerializedLogger()
    [1..5]
        |> List.map (fun i -> makeTask logger.Log i)
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore

What a difference! This time the output is perfect.

Summary

There is much more to say about this message based approach. In a future series, I hope to go into much more detail, including discussion of topics such as:

  • alternative implementations of message queues with MSMQ and TPL Dataflow.
  • cancellation and out of band messages.
  • error handling and retries, and handling exceptions in general.
  • how to scale up and down by creating or removing child agents.
  • avoiding buffer overruns and detecting starvation or inactivity.

Comments

blog comments powered by Disqus