POSTS
Writing a Daytime Server with Akka Streams
Inspired by this article which shows how to implement a daytime server with swift NIO I thought I’d show how do the same on top of Akka in Scala.
Prerequisites
To follow this little guide you are expected to have a JDK installed and sbt
the Scala build tool. If you
do not, start with getting those installed: https://www.scala-sbt.org/1.x/docs/Setup.html
Creating a project
In a suitable directory, create a new project from the sbt akka-quickstart template:
$ sbt new akka/akka-quickstart-scala.g8
... some irrelevant sbt output ...
A template to demonstrate a minimal Scala application
name [akka-quickstart-scala]: akka-daytime
akka_version [2.5.16]:(enter)
package [com.example]: akkadaytime
Template applied in /Users/johan/Code/./akka-daytime
Then add the akka-streams
module as a dependency to the project in the build.sbt
, so that the
complete set of dependencies looks like this:
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
)
Let’s delete the two generated scala files (in your IDE or however you want, for example):
$ rm src/main/scala/akkadaytime/AkkaQuickstart.scala
$ rm src/test/scala/akkadaytime/AkkaQuickstartSpec.scala
And create a new main like this:
package akkadaytime
import akka.actor.ActorSystem
object Server {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("daytime")
implicit val mat = ActorMaterializer()
}
}
We need the actor system and the materializer to run the Akka streams based TCP server we are going to create.
To accept TCP connections, let’s use TCP().bind()
which will give us a Source
of IncomingConnection
s,
one for every client connecting.
Tcp().bind(host, port).runForeach { incomingConnection =>
system.log.info("New connection, client address {}:{}",
incomingConnection.remoteAddress.getHostString,
incomingConnection.remoteAddress.getPort,
)
// TODO we actually need to handle the incoming connection
// incomingConnection.handleWith(...)
}
The actual logic used for each connection is expressed as a Flow[ByteString, ByteString, _]
- bytes from the connection
flowing through and out back to the client. In the case of the daytime protocol
we don’t care about the input from the client, we just emit a date time string and close the connection. This
can be represented by using the Flow.fromSinkAndSource
factory which takes a separate sink for input from the client
and Source for output from the server:
val dayTimeSource: Source[ByteString, NotUsed] =
Source.single(Unit)
.map { _ =>
val string = DateTimeFormatter.RFC_1123_DATE_TIME
.format(ZonedDateTime.now())
ByteString(string, StandardCharsets.US_ASCII)
}
val daytimeFlow: Flow[ByteString, ByteString, NotUsed] =
Flow.fromSinkAndSourceCoupled(Sink.ignore, dayTimeSource)
Note that the parameter for Source.single
is not lazy, which means we need cannot pass the right string directly to
it, that would then contain the string when the application was started and the Source
created. Instead we send a
single unit/whatever element down the stream, and map that into the current timestamp, map will be invoked when the
stream is materialized and running giving us the right time to aquire a timestamp.
We can then wire it together in the connection handler:
Tcp().bind(host, port).runForeach { incomingConnection =>
system.log.info("New connection, client address {}:{}",
incomingConnection.remoteAddress.getHostString,
incomingConnection.remoteAddress.getPort,
)
incomingConnection.handleWith(daytimeFlow)
}
Now we can run the server from sbt:
sbt
> run
And query it for daytime using nc
:
$ nc 0.0.0.0 1313
Tue, 25 Sep 2018 20:19:16 +0200
That’s it, to paraphrase the original blog article: And with that you’ve now implemented a 3 decade old tcp protocol using a hip, cutting-edge language.
Here’s the complete sources for the server: https://github.com/johanandren/akka-daytime
To learn more about what you can do with Akka Streams, checkout the reference documentation and the alpakka docs