codemonkey

extends Human with Blog

Chat With Akka HTTP Websockets

This is an update of an older article, using the new MergeHub and BroadcastHub stages that were introduced in Akka Streams 2.4.10 instead of actors for the dynamic registration of clients. If you are after modelling the chat room and users with actors, the old article is still relevant!

Something that often comes up in the various Akka community channels is people who want to create an HTTP chat application but get stuck, and they often get stuck on the same thing. So let us go through how one could build such a thing.

The first thing you need is to set up a project with the right dependencies. The new stages were introduced in Akka 2.4.10, so we need that version or higher. The complete set of dependencies we interact with is akka-actor, akka-http-core and akka-http-experimental (although you could pull them all in with a dependency on just akka-http-experimental, I prefer explicitly listing any library whose API my code actually touches).

With SBT it looks like this:

val akkaVersion = "2.4.11"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % akkaVersion,
  "com.typesafe.akka" %% "akka-http-core" % akkaVersion,
  "com.typesafe.akka" %% "akka-http-experimental" % akkaVersion
)

Then we will need a runnable application, some imports, and both an ActorSystem and an ActorMaterializer to actually run our code. Let’s get that done right away so that the following code can just assume they are in place:

import akka.NotUsed
import akka.actor._
import akka.http.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.io.StdIn

object Server {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    // the rest of the sample code will go here
    println("Started server at 127.0.0.1:8080, press enter to kill server")
    StdIn.readLine()
    system.terminate()
  }
}

With the route DSL we can then describe what our HTTP server does and run it:

val route =
  path("example") {
    get {
      complete("Such HTTP response")
    }
  }

val binding = Await.result(Http().bindAndHandle(route, "127.0.0.1", 8080), 3.seconds)

This minimal server will accept GET requests to http://127.0.0.1:8080/example and respond with “Such HTTP response”. Ok, now that we have the minimal basics laid down, let’s move on to more advanced things.

Since websockets are bidirectional streams of messages, they are modeled as a Source[Message, _] of incoming messages and a Sink[Message, _] to which you stream outgoing messages. The source and sink are wrapped as a Flow[Message, Message, _]. In Akka Streams, Source, Sink and Flow are all blueprints: you can describe your flow once and run it many times. This run step is called materialization.
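To make the blueprint idea concrete, here is a minimal sketch that describes a small stream once and materializes it twice. The object name BlueprintExample and the summing stream are made up for illustration and are not part of the chat application.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of the chat server
object BlueprintExample {
  // Only a description of the stream; nothing runs until run() is called
  val sumBlueprint: RunnableGraph[Future[Int]] =
    Source(1 to 3).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  def runTwice(): (Int, Int) = {
    implicit val system = ActorSystem("blueprint-example")
    implicit val materializer = ActorMaterializer()
    try {
      // Each run() materializes a fresh, independent stream from the same blueprint
      val first = Await.result(sumBlueprint.run(), 3.seconds)
      val second = Await.result(sumBlueprint.run(), 3.seconds)
      (first, second)
    } finally system.terminate()
  }
}
```

Both runs produce their own materialized value (here a Future with the sum), which is the same mechanism Akka HTTP relies on when it materializes a websocket handler flow once per connection.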

As an example we can create a simple echo websocket like this:

val echo = Flow[Message]

val route =
  path("echo") {
    get {
      handleWebSocketMessages(echo)
    }
  }

Blueprints are in general static: the stream layout (what Sources and Sinks there are, for example) cannot change after the stream has been materialized and started.

For a chat, we would like the inputs and outputs of each connection to be able to come and go. The new MergeHub and BroadcastHub stages make this possible.

Let’s take a look at their individual usage.

The MergeHub is a Source of elements that materializes into a Sink, which in turn can be materialized any number of times. Elements sent to that Sink will end up being emitted from the MergeHub itself.

In this sample a stream with the merge hub is started, and then the sink is used in two new streams; the strings will end up in the sink of the merge hub stream:

val sink = MergeHub.source[String].to(Sink.foreach(println)).run()
Source.single("hello world 1").runWith(sink)
Source.single("hello world 2").runWith(sink)

The BroadcastHub is the inverse: a Sink that accepts values and broadcasts them to any current subscriber. It materializes into a Source that can be materialized any number of times to “subscribe” to the broadcast hub. If there are no subscribers, the broadcast hub will backpressure so that no element can be sent to it.

In this sample we start a stream that streams the values from 0 to 2000 into the broadcast hub. At first there are no subscribers, so the source is backpressured until the first consumer registers. At some later point the second stream registers and starts printing the values from then on (so it will potentially not print all the values the first stream has printed):

val source = Source(0 to 2000).toMat(BroadcastHub.sink[Int])(Keep.right).run()
source.runForeach(n => println(s"source1: $n"))
source.runForeach(n => println(s"source2: $n"))

If we want incoming values to be consumed when there is no subscriber, we can start by running the materialized source into a Sink.ignore. This lets the upstream emit as fast as possible while there are no other subscribers; once a subscriber is started, the pace at which that subscriber can handle elements determines the overall pace.
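A minimal sketch of such a draining subscriber could look like the following. The object name DrainingSubscriberExample is made up, and watchTermination is only added here so that we can observe that the upstream actually ran to completion; it is not needed in the chat server.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the chat server
object DrainingSubscriberExample {
  def run(): Done = {
    implicit val system = ActorSystem("draining-subscriber-example")
    implicit val materializer = ActorMaterializer()
    try {
      // upstreamDone completes once the 0 to 2000 source has emitted everything
      val (upstreamDone, hubSource) =
        Source(0 to 2000)
          .watchTermination()(Keep.right)
          .toMat(BroadcastHub.sink[Int])(Keep.both)
          .run()

      // The draining subscriber: consumes and discards every element,
      // so the upstream is never held back by a lack of real subscribers
      hubSource.runWith(Sink.ignore)

      Await.result(upstreamDone, 5.seconds)
    } finally system.terminate()
  }
}
```

Any subscriber that attaches later only sees the elements emitted from that point on, so this trades completeness for a hub that never stalls.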

Combining these two stages we can create something like a dynamic backpressured pub-sub:

val (chatSink, chatSource) =
  MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

We can now create streams emitting to the chatSink to have it emit values to any streams created with the chatSource.
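As a rough sketch of how the pair can be used, the following runs one subscriber stream and one publisher stream against the hubs. The object name PubSubExample and the two messages are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the chat server
object PubSubExample {
  def run(): Seq[String] = {
    implicit val system = ActorSystem("pub-sub-example")
    implicit val materializer = ActorMaterializer()
    try {
      val (chatSink, chatSource) =
        MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

      // Subscriber: collect the first two messages, then cancel the subscription
      val received = chatSource.take(2).runWith(Sink.seq)

      // Publisher: a separate stream that pushes two messages into the hub
      Source(List("hello", "world")).runWith(chatSink)

      Await.result(received, 5.seconds)
    } finally system.terminate()
  }
}
```

In the chat server each websocket connection plays both roles at once: its incoming messages are published to chatSink and its outgoing messages come from chatSource.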

To use these with websockets we need to adapt the incoming and outgoing elements to the websocket protocol model Message, and more specifically TextMessage. We also need to provide the combined stream as a Flow[Message, Message, _]. This can be achieved like so:

val userFlow: Flow[Message, Message, NotUsed] =
  Flow[Message].mapAsync(1) {
    // a strict message already has its complete text in memory
    case TextMessage.Strict(text) => Future.successful(text)
    // a streamed message arrives in chunks, so concatenate them first
    case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
  }.via(Flow.fromSinkAndSource(chatSink, chatSource))
    .map[Message](string => TextMessage(string))

(Note that I have added additional support for streamed entries from the clients, compared to the old article.)
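Since the flow is just an Akka Streams blueprint, it can be exercised without any HTTP server at all. The following is a sketch of such an in-process check, where the object name UserFlowCheck and the "hi" message are made up for illustration: a single text message is sent through the flow and should come back through the hubs.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of the chat server
object UserFlowCheck {
  def run(): Message = {
    implicit val system = ActorSystem("user-flow-check")
    implicit val materializer = ActorMaterializer()
    try {
      val (chatSink, chatSource) =
        MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()

      // Same shape as the flow in the article; binary messages are not handled
      val userFlow: Flow[Message, Message, NotUsed] =
        Flow[Message].mapAsync(1) {
          case TextMessage.Strict(text) => Future.successful(text)
          case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
        }.via(Flow.fromSinkAndSource(chatSink, chatSource))
          .map[Message](string => TextMessage(string))

      // Act as a single client: send one message and wait for the first reply
      val reply = Source.single[Message](TextMessage("hi")).via(userFlow).runWith(Sink.head)
      Await.result(reply, 5.seconds)
    } finally system.terminate()
  }
}
```

Because the sender is also a subscriber of the broadcast hub, it receives its own message back, which is the behaviour you would see in a chat client connected to the server.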

Now we can hand this Flow to Akka HTTP to handle incoming requests with:

val route =
  path("chat") {
    get {
      handleWebSocketMessages(userFlow)
    }
  }

For the complete sources and a minimal Javascript/HTML client, check out https://github.com/johanandren/chat-with-akka-http-websockets/tree/akka-2.4.10

akka, scala
