There has been a great deal of buzz around the Internet of Things lately. The advent of small, inexpensive devices, and the Arduino in particular, has inspired a generation of people with no background in electrical engineering to do some very creative things. I was first inspired by the Arduino myself and built several small projects around the Duemilanove model. I am a software consultant by trade, so naturally I loved the fact that I could program these devices to do whatever I wanted them to do.
Most recently I became interested in another aspect of small devices: capturing data and then controlling other devices based on that data. In this series, I will build an application that measures moisture levels in soil and opens a valve to water the soil when it becomes dry. The devices I have purchased for this project are a Particle Photon board and a moisture sensor.
This initial application will demonstrate how to capture and persist the data from a device. In this first installment, I will be sending simulated device data to the server from a script that dispatches messages to an MQTT broker. In a future posting, I’ll build out and code the Photon device to send that data.
Before we get into the code, the following is a diagram of the interaction that will take place between the simulated device, the broker, and the Akka actor that I will be discussing:
The Application Bootstrap
The bootstrap is fairly straightforward. It first initializes a settings object that parses the configuration. It then iterates through an array of device configurations and starts an actor to monitor each configured device. Each device publishes to its own topic.
object Main extends App {

  implicit val system = ActorSystem("iot-series")
  implicit val ctx = system.dispatcher

  val settings = Settings(system)

  // Start up an actor for each configured device.
  settings.deviceConfigurations.foreach { deviceConfiguration =>
    val deviceId = deviceConfiguration.id

    // Actor names don't support a forward slash in the name so we filter it out.
    val actorName = s"device-monitor.${deviceId.replaceAll("/",".")}"

    // Initialize an instance of the device monitor with the configuration settings and a device identifier.
    system.actorOf(DeviceMonitor.props(deviceId, settings), actorName)
  }
}
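The name-sanitizing step in the bootstrap can be isolated as a small pure function, which makes it easy to see what actor name a given device identifier produces. This is only a minimal sketch: the ActorNames object and its forDevice method are hypothetical names introduced for illustration and are not part of the project.

```scala
// Hypothetical helper, not part of the project: derives the actor name
// that the bootstrap builds for a device identifier.
object ActorNames {
  private val Prefix = "device-monitor."

  // Akka rejects actor names that contain '/', so each one becomes a '.'.
  def forDevice(deviceId: String): String =
    Prefix + deviceId.replace('/', '.')
}
```

With the identifier configured in this article, ActorNames.forDevice("devices/mock/") yields device-monitor.devices.mock. (the trailing dot comes from the trailing slash in the identifier).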
The device-configurations section of the application.conf file defines an array of device identifiers.
Currently, only one device is configured:
device-configurations = [
  {
    id = "devices/mock/"
  }
]
Device identifiers should follow the pattern described in the MQTT Best Practices. The script that I have provided under the util directory publishes to the MQTT broker on the same topic that is set in the configuration file.
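As a rough illustration of the kind of rules such guidelines tend to contain, the sketch below checks a device identifier before it is used as a topic. The TopicRules object is hypothetical and not part of the project, and the rules themselves (no leading slash, no whitespace, no wildcard characters, ASCII only) are assumptions based on commonly cited MQTT conventions rather than a statement of what the referenced guide requires.

```scala
// Hypothetical validator, not part of the project. The rules are assumed
// conventions for topic names that a device publishes to.
object TopicRules {
  // Returns the problems found; an empty list means the topic passed.
  def problems(topic: String): List[String] = {
    val checks = List(
      (topic.isEmpty, "topic must not be empty"),
      (topic.startsWith("/"), "topic should not start with a slash"),
      (topic.exists(_.isWhitespace), "topic should not contain whitespace"),
      (topic.exists(c => c == '#' || c == '+'), "publish topics must not contain wildcards"),
      (topic.exists(_ > 127), "topic should use ASCII characters only")
    )
    // Keep only the messages whose condition was triggered.
    checks.collect { case (true, message) => message }
  }
}
```

Under these assumed rules, the identifier used in this article, devices/mock/, produces an empty list.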
The Device Monitor
The DeviceMonitor extends the Akka Actor to handle messages received through a subscription to the MQTT broker. It first initializes the Akka-based subscription actor from the Paho-Akka library (the subscriberActor value). A reference to the current actor instance is passed to the subscription actor during its construction. The subscription actor uses this reference to pass messages back to the actor that created it, which is the DeviceMonitor in this case. Finally, a SubscribeToTopic message, which tells the actor to attempt to subscribe to the broker, is scheduled to be sent to the actor itself after 1 second.
class DeviceMonitor(
  deviceId: String,
  settings: Settings,
  now: () => Instant = () => Instant.now()
)(implicit executionContext: ExecutionContext) extends Actor with ActorLogging {

  private val subscriberActor: ActorRef = {
    log.info("Broker Configuration: " + settings.messageBrokerConfig)
    context.actorOf(Props(new MqttPubSub(settings.messageBrokerConfig, Some(self))), "device-monitor-supervisor-subscriber")
  }

  context.system.scheduler.scheduleOnce(1.second, self, SubscribeToTopic)

  ...
}
One important thing to mention here is that the Paho-Akka library currently has a bug: it does not reconnect if the MQTT broker goes down and comes back up. I have a fix and a PR in place to address this, but until it is merged you can build your own local version from my fork of this project here. For convenience, I’ve included a prebuilt jar in the project as an unmanaged dependency.
The actor’s receive method is overridden to delegate to the unInitialized function, which handles three messages while the actor is initializing:
class DeviceMonitor(
  deviceId: String,
  settings: Settings,
  now: () => Instant = () => Instant.now()
)(implicit executionContext: ExecutionContext) extends Actor with ActorLogging {

  ...

  def receive: Actor.Receive = unInitialized

  def unInitialized: Receive = {
    case SubscribeToTopic ⇒ ...
    case ReceiveTimeout ⇒ ...
    case SubscribeSuccess ⇒ ...
  }

  ...
}
When the scheduled SubscribeToTopic message is fired, it is handled by the SubscribeToTopic case of the unInitialized function.
class DeviceMonitor(
  deviceId: String,
  settings: Settings,
  now: () => Instant = () => Instant.now()
)(implicit executionContext: ExecutionContext) extends Actor with ActorLogging {

  ...

  def receive: Actor.Receive = unInitialized

  def unInitialized: Receive = {
    case SubscribeToTopic ⇒
      log.debug(s"Subscribing to topic ")
      subscriberActor ! Subscribe(deviceId, self)
      context.setReceiveTimeout(10.seconds)
    ...
  }

  ...
}
This dispatches a Subscribe message to the subscriber actor, passing it the deviceId and a reference to the current actor instance. The device identifier is used as the name of the topic to subscribe to, and the subscriber actor uses the actor reference to send messages back. At this point a receive timeout of 10 seconds is set.
If the actor does not receive a SubscribeSuccess within that period, it is sent a ReceiveTimeout message, which is handled by the ReceiveTimeout case. Handling that message amounts to scheduling another SubscribeToTopic message, which restarts the subscription process.
class DeviceMonitor(
  deviceId: String,
  settings: Settings,
  now: () => Instant = () => Instant.now()
)(implicit executionContext: ExecutionContext) extends Actor with ActorLogging {

  ...

  def receive: Actor.Receive = unInitialized

  def unInitialized: Receive = {
    ...
    case ReceiveTimeout ⇒
      log.error("Timed out while attempting to connect to message server, retrying.")
      // Resubscribe if we haven't received a subscription ack yet.
      context.system.scheduler.scheduleOnce(1.second, self, SubscribeToTopic)
    ...
  }

  ...
}
If the actor does receive a SubscribeSuccess within the expected time period, it resets the receive timeout to infinite, dispatches a MonitoringInitialized event to any configured listener actors, and then changes its receive behavior to the initialized handler, which we will describe next. In the current implementation there are no configured listeners other than the one used in the test class, but we’ll go further into that in future articles.
class DeviceMonitor(
  deviceId: String,
  settings: Settings,
  now: () => Instant = () => Instant.now()
)(implicit executionContext: ExecutionContext) extends Actor with ActorLogging {

  ...

  def receive: Actor.Receive = unInitialized

  def unInitialized: Receive = {
    case SubscribeSuccess ⇒
      log.debug(s"${self.path} is subscribed to topic $deviceId")
      // Reset timeout to infinite.
      context.setReceiveTimeout(Duration.Undefined)
      dispatchEvent(MonitoringInitialized(deviceId))
      context.become(initialized)
  }

  ...
}
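The initialization handshake described above can be summarized as a small state machine. The sketch below models it in plain Scala without Akka, so it is not the project's actor code. The State, Msg, and Effect types and the step function are hypothetical names introduced for illustration, and the effects only name what the actor would do rather than doing it.

```scala
// Illustrative model only: plain Scala, no Akka. All names are hypothetical
// stand-ins for the messages and behaviors described in the article.
object SubscriptionModel {
  sealed trait State
  case object Uninitialized extends State
  case object Initialized extends State

  sealed trait Msg
  case object SubscribeToTopic extends Msg
  case object ReceiveTimeout extends Msg
  case object SubscribeSuccess extends Msg

  sealed trait Effect
  case object SendSubscribe extends Effect       // ask the subscriber actor to subscribe
  case object StartTimeout extends Effect        // wait up to 10 seconds for an ack
  case object ScheduleResubscribe extends Effect // try again after 1 second
  case object ClearTimeout extends Effect        // stop waiting for an ack
  case object NotifyListeners extends Effect     // announce that monitoring has started

  // Returns the next state and the side effects the actor would perform.
  def step(state: State, msg: Msg): (State, List[Effect]) = (state, msg) match {
    case (Uninitialized, SubscribeToTopic) => (Uninitialized, List(SendSubscribe, StartTimeout))
    case (Uninitialized, ReceiveTimeout)   => (Uninitialized, List(ScheduleResubscribe))
    case (Uninitialized, SubscribeSuccess) => (Initialized, List(ClearTimeout, NotifyListeners))
    // Once initialized, these handshake messages are ignored in this simplified model.
    case (Initialized, _)                  => (Initialized, Nil)
  }
}
```

Modeling the transitions as a pure function keeps the retry logic testable without starting an actor system or a broker.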
Once the actor’s behavior has changed to the initialized state, the initialized function handles each Message dispatched from the subscription actor. Each message contains a topic and a payload. The handler calls parseMessage, which parses the payload byte array into a DeviceDataReceived event to be dispatched to any listeners; if the payload is not a valid number, the result is None and nothing is dispatched. Dispatching events to configured listeners is handled in the dispatchEvent method, which checks the configuration for any actors configured to receive device monitor messages. These messages are not currently used but will be in a future installment of this series.
class DeviceMonitor(
  deviceId: String,
  settings: Settings,
  now: () => Instant = () => Instant.now()
)(implicit executionContext: ExecutionContext) extends Actor with ActorLogging {

  ...

  def initialized: Receive = {
    case msg: Message =>
      parseMessage(msg).foreach { event ⇒
        log.debug(s"Received device data ${event.value}")
        dispatchEvent(event)
      }
  }

  private def parseMessage(message: Message): Option[DeviceDataReceived] = {
    val msg: String = (message.payload map (_.toChar)).mkString("")
    log.debug(s"Monitor Received MQTT message: $msg")
    Try(Some(DeviceDataReceived(message.topic, BigDecimal(msg), now()))).getOrElse(None)
  }

  private def dispatchEvent(event: MonitorEvent) = {
    log.debug(s"Dispatching device event $event for device $deviceId")
    settings.deviceMonitorListeners.foreach { listener ⇒
      log.debug(s"Dispatching $event for device $deviceId to $listener")
      context.actorSelection(listener) ! event
    }
  }
}
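To make the payload format concrete, here is a stdlib-only sketch of the round trip between a numeric reading and the bytes carried in an MQTT message. PayloadCodec, encode, and decode are hypothetical names that do not appear in the project. The decode side follows the same idea as parseMessage, with one assumption added here: it trims surrounding whitespace before parsing, which the listing above does not do.

```scala
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical helpers, not part of the project: a round trip between a
// numeric reading and the bytes carried in an MQTT payload.
object PayloadCodec {
  // What a simulated device might send: the reading as plain text.
  def encode(reading: BigDecimal): Array[Byte] =
    reading.toString.getBytes(StandardCharsets.UTF_8)

  // A malformed payload yields None instead of throwing. Trimming is an
  // assumption added in this sketch to tolerate a trailing newline.
  def decode(payload: Array[Byte]): Option[BigDecimal] =
    Try(BigDecimal(new String(payload, StandardCharsets.UTF_8).trim)).toOption
}
```

A simulated device would publish PayloadCodec.encode(reading) to the configured topic, and a payload that is not a number, such as the text "dry", decodes to None.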
The application in its current state serves as the basis that we will build upon to form a complete Internet of Things application. The full source code for this project can be found here. The readme file there describes how to set up the application and get it running.
The next in this series of articles can be found here.