POSTS

Chat With Akka HTTP Websockets

Note since this was written Akka Streams has gained two dynamic stages that makes this possible without using actors which simplifies the wiring a lot if you are not after modelling the chat with actors.

See new article for more details.

Something that often comes up in the various Akka community channels is people who want to create a chat application but get stuck, they often get stuck on the same thing, so let us go through how one could build such a thing.

First thing you need to setup a project with the right dependencies, you will need akka-http-core and akka-http-experimental. With SBT it looks like this:

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http-core" % "2.4.4",
  "com.typesafe.akka" %% "akka-http-experimental" % "2.4.4"
)

Then we will need a runnable application, some imports and both and ActorSystem and an ActorMaterializer to actually run our code, so let’s get that done right away so that the following code can just expect it to be:

import akka.NotUsed
import akka.actor._
import akka.http.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.ws.Message
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.io.StdIn
object Server {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // the rest of the sample code will go here
    println("Started server at 127.0.0.1:8080, press enter to kill server")
    StdIn.readLine()
    system.terminate()
  }
}

With the route DSL we can then describe what our HTTP server does and run it:

val route =
  path("example") {
    get {
      complete("Such HTTP response")
    }
  }
  val binding = Await.result(Http().bindAndHandle(route, "127.0.0.1", 8080), 3.seconds)

This minimal server will accept GET requests to http://127.0.0.1:8080/example and respond with “Such HTTP response”. Ok, now we got the minimal basics laid down, let’s move on to more advanced things.

Since websockets are bidirectional streams of messages they are modeled as a Source[Message, _] of incoming messages, and a Sink[Message, _] where you would stream outgoing messages. The source and sink are wrapped as a Flow[Message, Message, _]. In Akka Streams Source, Sink and Flow are all blueprints, this means you can describe your flow once and run it many times, this run step is called materialization.

As an example we can create a simple echo-web socket like this:

val echo = Flow[Message]
  val route =
    path("echo") {
      get {
        handleWebsocketMessages(echo)
      }
    }

To create a chat room we would like to make each incoming message end up the same place and that place to know of all outgoing streams, and send incoming messages to all of those. Lets solve this by using actors, as they are pretty straight forward and simple to understand. We will create an actor representing each connection and have it register to another actor representing the chat, keeping a list of users in the room and publishing incoming messages to all of them.

First, lets design a little message protocol for registering and for sending chat messages to a chat room actor:

object ChatRoom {
  case object Join
  case class ChatMessage(user: String, message: String)
}
class ChatRoom extends Actor {
  import ChatRoom._
  var users: Set[ActorRef] = Set.empty
  def receive = {
    case Join =>
      users += sender()
      // we also would like to remove the user when its actor is stopped
      context.watch(sender())
    case Terminated(user) =>
      users -= user
    case msg: ChatMessage =>
      users.foreach(_ ! msg)
  }
}

Then we need an actor that represents a connected user, here I have taken two steps already by expecting to have an ActorRef that we can send outgoing messages to, it will hopefully clear up in a bit:

object User {
  case class IncomingMessage(text: String)
  case class OutgoingMessage(text: String)
}
class User(chatRoom: ActorRef) extends Actor {
  import User._
  def receive = {
    case Connected(outgoing) =>
      context.become(connected(outgoing))
  }
  
  def connected(outgoing: ActorRef): Receive = {
    chatRoom ! ChatRoom.Join
    
    {
      case IncomingMessage(text) =>
        chatRoom ! ChatRoom.ChatMessage(text)
   
       case ChatRoom.ChatMessage(text) =>
         outgoing ! OutgoingMessage(text)
    }
  }
}

Ok, now we just have to somehow glue this together with the incoming connection flow some how. The ChatRoom is easy, we just create one on application start and keep the ActorRef. For the User actor, we will want to create a new one for every new connection, and wire it for incoming and outgoing messages.

Incoming messages are easy as there is a Sink.actorRef(actorRef, messageOnShutdown) which we can just pass in a reference to our User actor to and get a Sink that we can provide for incoming websocket messages.

Outgoing is a little bit harder, there is Source.actorRef(buffersize, what-to-do-on-overflow) that returns a Source[Something, ActorRef] - did you notice the second type parameter there, when it is materialized it creates an actor that will make messages sent to it come from the returned source. We need to get this outgoing messages actorref to our User-actor upon materialization somehow.

.mapMaterializedValue(mat => new-mat) allows us to hook in this kind of logic, the function passed to it will be invoked with the materialized value when the flow is materialized.

Finally Flow.fromSinkAndSource allows us to produce a Flow from a Sink and a Source.

Tadaa:

val chatRoom = system.actorOf(Props(new ChatRoom), "chat")
def newUser(): Flow[Message, Message, NotUsed] = {
  // new connection - new user actor
  val userActor = system.actorOf(Props(new User(chatRoom)))
  val incomingMessages: Sink[Message, NotUsed] =
    Flow[Message].map {
      // transform websocket message to domain message
      case TextMessage.Strict(text) => User.IncomingMessage(text)
     }.to(Sink.actorRef[User.IncomingMessage](userActor, PoisonPill))
     
   val outgoingMessages: Source[Message, NotUsed] =
     Source.actorRef[User.OutgoingMessage](10, OverflowStrategy.fail)
       .mapMaterializedValue { outActor =>
         // give the user actor a way to send messages out
         userActor ! User.Connected(outActor)
         NotUsed
       }.map(
         // transform domain message to web socket message
         (outMsg: User.OutgoingMessage) => TextMessage(outMsg.text))
       // then combine both to a flow
       Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
     }
   val route =
     path("chat") {
       get {
       	  handleWebSocketMessages(newUser())   
       }
     }

For the complete sources and a minimal Javascript/HTML client, checkout https://github.com/johanandren/chat-with-akka-http-websockets/tree/akka-2.4.9