Writing a Daytime Server with Akka Streams

Inspired by this article, which shows how to implement a daytime server with Swift NIO, I thought I’d show how to do the same on top of Akka in Scala.

Prerequisites

To follow this little guide you need to have a JDK and sbt, the Scala build tool, installed. If you do not, start by installing them: https://www.scala-sbt.org/1.x/docs/Setup.html

Creating a project

In a suitable directory, create a new project from the sbt akka-quickstart template:

$ sbt new akka/akka-quickstart-scala.g8
... some irrelevant sbt output ...
A template to demonstrate a minimal Scala application

name [akka-quickstart-scala]: akka-daytime
akka_version [2.5.16]:(enter)
package [com.example]: akkadaytime

Template applied in /Users/johan/Code/./akka-daytime

Then add the akka-stream module as a dependency in build.sbt, so that the complete set of dependencies looks like this:

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-testkit" % akkaVersion,
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "org.scalatest" %% "scalatest" % "3.0.5" % "test"
)

Let’s delete the two generated Scala files, in your IDE or however you prefer, for example:

$ rm src/main/scala/akkadaytime/AkkaQuickstart.scala
$ rm src/test/scala/akkadaytime/AkkaQuickstartSpec.scala

And create a new main like this:

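package akkadaytime

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

object Server {
  def main(args: Array[String]): Unit = {
    // Both are implicit so the stream operators used later can pick them up
    implicit val system = ActorSystem("daytime")
    implicit val mat = ActorMaterializer()
  }
}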

We need the actor system and the materializer to run the Akka Streams based TCP server we are going to create.

To accept TCP connections, let’s use Tcp().bind(), which gives us a Source of IncomingConnection elements, one for every client that connects.

Tcp().bind(host, port).runForeach { incomingConnection =>

  system.log.info("New connection, client address {}:{}", 
    incomingConnection.remoteAddress.getHostString,
    incomingConnection.remoteAddress.getPort,
  )
      
  // TODO we actually need to handle the incoming connection
  // incomingConnection.handleWith(...)
}

The actual logic used for each connection is expressed as a Flow[ByteString, ByteString, _]: bytes from the client flow in, and the bytes the flow emits are sent back to the client. In the case of the daytime protocol we don’t care about the input from the client, we just emit a date time string and close the connection. This can be represented with the Flow.fromSinkAndSourceCoupled factory, which takes a separate Sink for input from the client and a Source for output from the server, and couples their termination so that when one side finishes the other is stopped as well:

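// Imports this fragment needs
import java.nio.charset.StandardCharsets
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString

  val dayTimeSource: Source[ByteString, NotUsed] =
    Source.single(()) // the unit value (), not the Unit companion object
      .map { _ =>
        val string = DateTimeFormatter.RFC_1123_DATE_TIME
          .format(ZonedDateTime.now())
        ByteString(string, StandardCharsets.US_ASCII)
      }

  val daytimeFlow: Flow[ByteString, ByteString, NotUsed] =
    Flow.fromSinkAndSourceCoupled(Sink.ignore, dayTimeSource)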

Note that the parameter of Source.single is not lazy, which means we cannot pass the formatted string directly to it: it would then contain the time at which the application was started and the Source was created. Instead we send a single unit element (any placeholder would do) down the stream and map it into the current timestamp. map is invoked when the stream is materialized and running, which is the right moment to acquire a timestamp.
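To make the difference concrete, here is a minimal sketch that materializes an eager and a deferred source twice each. The object name EagerVsDeferred and the helper runTwice are made up for this illustration, and System.nanoTime() stands in for the timestamp so that the values are easy to compare:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the daytime server
object EagerVsDeferred {

  // Materializes the given source twice, 10 ms apart, and returns both elements
  def runTwice(source: Source[Long, _])(implicit mat: ActorMaterializer): (Long, Long) = {
    val first = Await.result(source.runWith(Sink.head), 3.seconds)
    Thread.sleep(10)
    val second = Await.result(source.runWith(Sink.head), 3.seconds)
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("eager-vs-deferred")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // The argument is evaluated once, right here, when the Source is created
    val eager = Source.single(System.nanoTime())
    // The function passed to map runs every time the stream is materialized
    val deferred = Source.single(()).map(_ => System.nanoTime())

    val (e1, e2) = runTwice(eager)
    val (d1, d2) = runTwice(deferred)
    println(s"eager source repeated its value: ${e1 == e2}")
    println(s"deferred source produced a later value: ${d2 > d1}")

    system.terminate()
  }
}
```

The eager source hands out the same value on every run, which for a daytime server would mean every client gets the startup time. The deferred one computes a fresh value per materialization, and every incoming connection materializes the flow anew.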

We can then wire it together in the connection handler:

Tcp().bind(host, port).runForeach { incomingConnection =>

  system.log.info("New connection, client address {}:{}",
    incomingConnection.remoteAddress.getHostString,
    incomingConnection.remoteAddress.getPort,
  )

  incomingConnection.handleWith(daytimeFlow)
}
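For reference, the fragments so far could be assembled into a single file roughly like this. It is a sketch rather than the exact contents of the linked repository: the host and port values are assumptions taken from the nc command used to query the server (the snippets never define them), and the import list is what the fragments appear to need.

```scala
package akkadaytime

import java.nio.charset.StandardCharsets
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source, Tcp}
import akka.util.ByteString

object Server {

  // Assumed values: listen on all interfaces, port 1313
  val host = "0.0.0.0"
  val port = 1313

  // Emits one element per materialization: the current time as ASCII bytes
  val dayTimeSource: Source[ByteString, NotUsed] =
    Source.single(())
      .map { _ =>
        val string = DateTimeFormatter.RFC_1123_DATE_TIME
          .format(ZonedDateTime.now())
        ByteString(string, StandardCharsets.US_ASCII)
      }

  // Ignores whatever the client sends and replies with the timestamp
  val daytimeFlow: Flow[ByteString, ByteString, NotUsed] =
    Flow.fromSinkAndSourceCoupled(Sink.ignore, dayTimeSource)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("daytime")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    Tcp().bind(host, port).runForeach { incomingConnection =>
      system.log.info("New connection, client address {}:{}",
        incomingConnection.remoteAddress.getHostString,
        incomingConnection.remoteAddress.getPort)

      // Materializes a fresh copy of the flow for this connection
      incomingConnection.handleWith(daytimeFlow)
    }
  }
}
```

The source and the flow are plain values outside main because they are only blueprints. Nothing runs until handleWith materializes the flow for a connection.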

Now we can run the server from sbt:

$ sbt
> run

And query it for daytime using nc:

$ nc 0.0.0.0 1313
Tue, 25 Sep 2018 20:19:16 +0200
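If nc is not at hand, the same query can be made from Scala. The following is a small sketch using a plain java.net.Socket rather than Akka. The object name DaytimeClient and its query method are made up for this example, and it assumes the server is reachable on 127.0.0.1:1313 and closes the connection after sending its reply:

```scala
import java.net.Socket
import java.nio.charset.StandardCharsets
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

import scala.io.Source

// Hypothetical client, not part of the server project
object DaytimeClient {

  // Connects and reads everything the server sends until it closes the connection
  def query(host: String, port: Int): String = {
    val socket = new Socket(host, port)
    try Source.fromInputStream(socket.getInputStream, StandardCharsets.US_ASCII.name).mkString.trim
    finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed address, matching the nc example
    val reply = query("127.0.0.1", 1313)
    // The server formats with RFC_1123_DATE_TIME, so the same formatter can parse the reply
    val parsed = ZonedDateTime.parse(reply, DateTimeFormatter.RFC_1123_DATE_TIME)
    println(s"Server replied '$reply', which parses to $parsed")
  }
}
```

Reading until end of stream works here because the daytime flow completes after its single element, which closes the connection.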

That’s it. To paraphrase the original blog article: and with that you’ve now implemented a three-decade-old TCP protocol using a hip, cutting-edge language.

Here are the complete sources for the server: https://github.com/johanandren/akka-daytime

To learn more about what you can do with Akka Streams, check out the reference documentation and the Alpakka docs.