The reactive paradigm is a wonderful thing. The basic idea is that a reactive application, as much as possible, is asynchronous from beginning to end. It should be event driven, fault tolerant, scalable and responsive. More details on reactive programming can be found in the Reactive Manifesto.
Writing an asynchronous application, however, has its own set of unique challenges. On a recent project, we were building an application architected around the principles of reactive programming and CQRS. For this application we used the Spray library to handle HTTP requests in an asynchronous manner. The EventSourced library was used to handle state persistence and the at-least-once message delivery guarantee; its features are currently being rolled into the new AKKA Persistence, which will soon become its replacement.
MongoDB was chosen for its ability to scale, its capacity to handle high-volume traffic, and because its aggregation framework lends itself nicely to big data applications. In conjunction with that, we used the Reactive Mongo driver to facilitate asynchronous database access. The AKKA library provided the fault tolerance, scalability and remoting.
In this post I’ll demonstrate an approach we took to solve the challenge of maintaining a definite order, specifically when performing database updates in asynchronous code within an actor.
A little background
In implementing CQRS, we separated the read-side and write-side concerns into separate modules.
When a command to add, update or delete data is processed by the write-side module, it is first logged in the write-ahead log via the EventSourced API. After logging the command, a remote message is propagated to a read-side AKKA actor. The update is handled in the actor and persisted in a format optimized for query.
The read-side actors were configured to use the default mailbox implementation (FIFO), so messages would be processed in the order received. When a message is received on the read side, an insert or update of a MongoDB document occurs.
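To make the shape of that read side concrete, here is a minimal sketch of such an actor. The AccountEvent message and the ReadStore trait are hypothetical stand-ins for our real event types and ReactiveMongo-backed persistence code.

import akka.actor.Actor
import scala.concurrent.Future

// Hypothetical event propagated from the write side.
case class AccountEvent(accountId: String, amount: Double)

// Hypothetical read-side store; in our system this was backed by ReactiveMongo.
trait ReadStore {
  def upsert(accountId: String, amount: Double): Future[Unit]
}

class AccountReadSideActor(readStore: ReadStore) extends Actor {
  import context.dispatcher // execution context for the returned future

  def receive = {
    case AccountEvent(accountId, amount) =>
      // The upsert is asynchronous: the actor moves on to the next
      // message as soon as the future has been created.
      readStore.upsert(accountId, amount)
  }
}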
The Problem
When mutating in-memory or persisted state asynchronously from within an AKKA actor’s receive, we can guarantee the order in which messages are received, but we cannot guarantee the order in which the updates are processed. Solving this problem by blocking within the actor to enforce ordering of updates is not an option, because blocking within an actor’s receive method, or within a method that it calls, can lead to thread starvation. The problems associated with blocking in the receive are compounded in remote actors, where you’ll see issues such as latency spikes and timeouts on ask/reply messages.
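For illustration only, the blocking approach we want to avoid might look like the sketch below (the Update message and slowUpdate method are hypothetical). Awaiting the future inside receive would enforce ordering, but it ties up a dispatcher thread for the duration of every update.

import akka.actor.Actor
import scala.concurrent.{Await, Future, future}
import scala.concurrent.duration._

// Hypothetical update message; slowUpdate stands in for an asynchronous database call.
case class Update(value: Int)

class BlockingActor extends Actor {
  import context.dispatcher

  private def slowUpdate(value: Int): Future[Unit] = future { Thread.sleep(200) }

  def receive = {
    case Update(value) =>
      // Anti-pattern: this ties up a dispatcher thread until the asynchronous call finishes.
      Await.result(slowUpdate(value), 5.seconds)
  }
}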
An actor implementation
Let’s look at an example of an actor implementation that receives a message and then performs an update asynchronously. For the sake of simplicity, we will replace MongoDB with an in-memory cache. We will use a simplified form of the age-old bank account example, and we’ll limit it to only one bank account.
The bank account class is intentionally not thread-safe, in order to illustrate the problem. Imagine that the in-memory bank account is the MongoDB database we wish to query and update. This example replicates the asynchronous nature of the Reactive Mongo driver.
Let’s first define the bank account class:
import scala.util.Random

class BankAccount {
  private var accountBalance = 0.0

  def incrementBalance(deposit: Double) = {
    val previousBalance = accountBalance
    Thread.sleep(Random.nextInt(5) * 50)
    accountBalance = previousBalance + deposit
  }

  def balance = accountBalance
}
The BankAccount class has only two methods: one to increment the balance and one to return the current balance.
In the incrementBalance method you will notice the Thread.sleep call. This randomizes the time between retrieval and update, intentionally recreating the problem we had witnessed when processing a heavy load of messages that asynchronously performed updates to MongoDB.
Next we define the two message types we’ll use to send requests to the actor, asking it to perform a deposit and to display the balance in the console:
case object DisplayBalance
case class Deposit(value: Int)
Finally we define the actor that will receive those messages. We’ll call it ChaosActor:
class ChaosActor extends Actor {
  import scala.concurrent.future
  import context.dispatcher // execution context for the future

  private val bankAccount = new BankAccount()

  def receive = {
    case Deposit(value) =>
      future {
        bankAccount.incrementBalance(value)
      }
    case DisplayBalance =>
      println(s"Final balance is: ${bankAccount.balance}")
  }
}
Take a look above at receive, which is a partial function where you can pattern-match for messages. In our handling of the Deposit message, you’ll notice that we’ve wrapped the code in a future closure.
The future method has the signature future[T](body: => T): Future[T] (it also requires an implicit ExecutionContext). It takes a by-name block that produces a value of type T and immediately returns a Future that will complete at some later point in time. I’m using the future to simulate the asynchronous nature of the Reactive Mongo driver: the find, update and insert methods of the Reactive Mongo API return either a future or a cursor, and applying headOption or toList to a cursor also returns a future.
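As a quick illustration of those semantics, using only the standard library (the FutureDemo object exists purely for this example), the call to future returns immediately and the result is handled in a callback once the body has finished running on another thread:

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object FutureDemo extends App {
  // The body runs asynchronously on another thread; the Future is returned immediately.
  val f = future { 42 }

  // The callback fires once the future has completed.
  f.onComplete {
    case Success(value) => println(s"Completed with $value")
    case Failure(e)     => println(s"Failed: ${e.getMessage}")
  }

  Thread.sleep(1000) // give the future time to complete before the app exits
}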
Now, let’s create an application to run from the SBT REPL.
import akka.actor.{ActorSystem, Props}

object Foo extends App {
  override def main(args: Array[String]) {
    implicit val actorSystem = ActorSystem("OrderedActorSystem")
    val actor = actorSystem.actorOf(Props(new ChaosActor()))

    (0 until 30).foreach(cnt => actor ! Deposit(50))

    Thread.sleep(10000)
    actor ! DisplayBalance
    actorSystem.shutdown()
  }
}
The application’s main method initializes an actor system, and sends the Deposit message to the actor 30 times, each time with a deposit value of 50 dollars. It then sleeps for 10 seconds to allow all of the futures to complete. Finally it dispatches a message to the actor to display the balance, and it shuts down the actor system.
Executing the initial example
In the REPL you can now execute run to start the application. After making 30 deposits of 50 dollars, you would expect the final balance to be 1500.00. We’ll run it several times and review the final balance in the console:
> run
[info] Running Foo
Final balance is: 150.0
[success] Total time: 10 s, completed Jan 29, 2014 4:31:58 PM
> run
[info] Running Foo
Final balance is: 300.0
[success] Total time: 10 s, completed Jan 29, 2014 4:32:10 PM
> run
[info] Running Foo
Final balance is: 150.0
[success] Total time: 10 s, completed Jan 29, 2014 4:32:23 PM
What happened here? Not only are the balances incorrect but they are inconsistent. Chaos! Why?
The balances are invalid because the futures interleaved: between the retrieval and update steps of the incrementBalance method, other futures read and wrote the same balance, so some deposits were overwritten. In our real-world application we first encountered this while using the Reactive Mongo driver, but as you may know, it can easily occur in other circumstances where asynchronous calls are being made.
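You can reproduce the lost update without any actors at all. In this sketch, which reuses the BankAccount class defined earlier (the LostUpdateDemo object is just for illustration), two futures read the same starting balance and one deposit is usually overwritten:

import scala.concurrent.{Await, future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LostUpdateDemo extends App {
  val account = new BankAccount()

  // Two concurrent read-modify-write cycles against the same (non-thread-safe) account.
  val updates = Seq(future { account.incrementBalance(50) },
                    future { account.incrementBalance(50) })

  updates.foreach(Await.ready(_, 5.seconds))

  // We expect 100.0, but one deposit is frequently lost to the interleaving.
  println(s"Balance: ${account.balance}")
}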
Avoiding inconsistency
Let’s revisit the actor. To address this problem we can mix in the Stash and ActorLogging traits from the AKKA library. We’ve renamed our actor to OrderedActor:
class OrderedActor extends Actor with Stash with ActorLogging {
  ...
  def receive = {
    ...
  }
}
The Stash trait provides a method named stash, which tells the actor to stash away the message it is currently processing. Stashed messages can later be un-stashed and processed by calling the unstashAll method, which prepends all of the stashed messages to the actor’s mailbox.
By default the Stash trait can only be used in actors that have a deque-based mailbox. In this particular case we also want the mailbox to be unbounded.
To address both of these concerns, let’s revisit the application and configure the actor system with an unbounded, deque-based mailbox.
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Foo extends App {
  override def main(args: Array[String]) {
    val config = ConfigFactory.parseString("""
      akka {
        actor {
          queued-dispatcher {
            mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
          }
        }
      }""")

    implicit val actorSystem = ActorSystem("OrderedActorSystem", ConfigFactory.load(config))

    // Create the actor on the dispatcher configured above so it gets the deque-based mailbox.
    val actor = actorSystem.actorOf(Props(new OrderedActor()).withDispatcher("akka.actor.queued-dispatcher"))

    (0 until 30).foreach(cnt => actor ! Deposit(50))

    Thread.sleep(10000)
    actor ! DisplayBalance
    actorSystem.shutdown()
  }
}
Now we’ll focus on the receive partial function in our actor.
Each actor has an implicit reference to its ActorContext, available as context. The context provides two methods of interest here. The become(Actor.Receive, Boolean) method takes a partial function and a boolean: when called, it replaces the actor’s current receive partial function with the new implementation, and if the boolean argument is passed as false it pushes the current implementation onto the hotswap stack rather than discarding it. We are going to use this hot-swap feature below.
The second method, unbecome(), will replace the current implementation of the receive partial function with the next one on the stack.
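Here is a minimal, self-contained sketch of that hot-swap behaviour, separate from our bank account example (the Mute, Unmute and Say messages are made up for illustration): the actor switches to a muted behaviour and back again, with discardOld = false keeping the original receive on the stack.

import akka.actor.Actor

case object Mute
case object Unmute
case class Say(text: String)

class HotSwapActor extends Actor {
  def receive: Receive = {
    case Say(text) => println(text)
    case Mute      => context.become(muted, discardOld = false) // push current receive onto the stack
  }

  def muted: Receive = {
    case Unmute => context.unbecome() // pop back to the original receive
    case _      => // silently drop everything else while muted
  }
}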
Let’s first provide an alternate implementation of receive, which we’ll call waiting, along with a new message type that we’ll call ResumeMessageProcessing.
Ordering our chaos – stashing, stacks and message processing
class OrderedActor extends Actor with Stash with ActorLogging {
  import scala.concurrent.future
  import context.dispatcher

  private val bankAccount = new BankAccount()

  def receive = {
    case Deposit(value) =>
      future {
        bankAccount.incrementBalance(value)
      }
    case DisplayBalance =>
      println(s"Final balance is: ${bankAccount.balance}")
  }

  def waiting: Receive = {
    case ResumeMessageProcessing =>
      context.unbecome()
      unstashAll()
    case _ => stash()
  }
}

case object ResumeMessageProcessing
When a ResumeMessageProcessing message is received, the waiting behaviour calls unbecome on the actor context, which swaps the current receive implementation for the next one on the stack. It then calls unstashAll, which un-stashes all previously stashed messages. Any message that doesn’t match ResumeMessageProcessing is stashed away for later processing.
Now we’ll modify the handling of the Deposit message in the receive method:
class OrderedActor extends Actor with Stash with ActorLogging {
  import scala.concurrent.future
  import scala.util.{Failure, Success}
  import context.dispatcher

  private val bankAccount = new BankAccount()

  def receive = {
    case Deposit(value) =>
      context.become(waiting, discardOld = false)
      future {
        bankAccount.incrementBalance(value)
      }.onComplete {
        case Failure(e) =>
          log.error(e, "An error has occurred")
          self ! ResumeMessageProcessing
        case Success(v) =>
          self ! ResumeMessageProcessing
      }
    case DisplayBalance =>
      println(s"Final balance is: ${bankAccount.balance}")
  }

  def waiting: Receive = {
    case ResumeMessageProcessing =>
      context.unbecome()
      unstashAll()
    case _ => stash()
  }
}

case object ResumeMessageProcessing
In the Deposit handler we first swap out the implementation of the receive method by calling context.become(waiting, discardOld = false). This means any further messages received will be handled by the hot-swapped waiting implementation, which calls stash to stash away every message that is not of type ResumeMessageProcessing.
We then spawn a future with a function that will complete at some later point in time. When that future completes, in both the Failure and Success cases, a ResumeMessageProcessing message is dispatched back to the actor itself.
When the hot-swapped waiting implementation of receive gets that message, it restores the previous implementation from the stack by calling unbecome on the context, and then calls unstashAll to put any previously stashed messages back onto the mailbox.
Let’s run the app again a few times and look at the output.
> run
[info] Running Foo
Final balance is: 1500.0
[success] Total time: 10 s, completed Feb 4, 2014 4:51:05 PM
> run
[info] Running Foo
Final balance is: 1500.0
[success] Total time: 10 s, completed Feb 4, 2014 4:51:17 PM
> run
[info] Running Foo
Final balance is: 1500.0
[success] Total time: 10 s, completed Feb 4, 2014 4:51:29 PM
> run
[info] Running Foo
Final balance is: 1500.0
[success] Total time: 10 s, completed Feb 4, 2014 4:51:40 PM
Finally, some order around here! Not only is the result of each run consistent, but the expected value is also correct.
Conclusions
If we temporarily redirect incoming messages while processing any given message, and then restore those redirected messages later, we can enforce message processing order. This is done without having to block while processing a message, and without impeding the reception of additional messages.
Keep in mind that in order to achieve this you are using an unbounded mailbox type, so you are limited only by the available memory.
The full source for this example can be downloaded from here.