codemonkey

extends Human with Blog

Actor Per Request With Akka HTTP

Akka HTTP is the continuation of Spray, which used to be the web server implemented on top of Akka. A common pattern from Spray that devs want to implement with Akka HTTP is “actor per request”, where each request is sent to an actor whose life cycle is bound to that request: the actor is started, gets the request, does some processing, and sends a completed or failed result back to the HTTP client.

Let’s look at three ways to do this!

I have not included any fancy patterns for organizing actors; the focus is on how to get a result from an actor to complete a request. The samples are listed in my personal order of preference, but of course you may have other preferences.

Ask pattern

The first way to do this is to combine the ask pattern with the routing DSL. Performing an ask returns a Future, and we can react to its completion using the onSuccess directive like so:

import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.util.Timeout

import scala.concurrent.duration._
import scala.io.StdIn

object AskActorPerRequest extends App {
  object RequestHandler {
    case object Handle
    case class Result(data: String)
  }
  class RequestHandler extends Actor {
    import RequestHandler._
    def receive = {
      case Handle =>
        // reply to the sender (the temporary actor created by the ask), then stop
        sender() ! Result("ok")
        context.stop(self)
    }
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  import akka.pattern.ask
  import system.dispatcher // ask needs a dispatcher

  val route =
    pathEndOrSingleSlash {
      get {
        implicit val askTimeout: Timeout = 3.seconds // and a timeout
        val actor = system.actorOf(Props[RequestHandler])
        onSuccess((actor ? RequestHandler.Handle).mapTo[RequestHandler.Result]) { result =>
          complete(result.data)
        }
      }
    }

  Http().bindAndHandle(route, "localhost", 8080)

  println("ENTER to terminate")
  StdIn.readLine()
  system.terminate()
}

The good thing about this solution is that the actor behaves just like any other actor and does not have to be adapted in any way for request processing.

(You would of course not define your routes and actor implementation in the same object like this in an actual application; that was done to keep the sample short.)

Complete a function

Option number two gets rid of the ask and sends the actor a function to call:

import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

import scala.io.StdIn


object ActorPerRequest extends App {

  object RequestHandler {
    // the message carries the function the actor should call with its result
    case class Handle(complete: String => Unit)
  }
  class RequestHandler extends Actor {
    import RequestHandler._
    def receive = {
      case Handle(complete) =>
        complete("ok")
        context.stop(self)
    }
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val route =
    pathEndOrSingleSlash {
      get {
        // we have to decide on the result type already here to be able
        // to provide the right marshaller
        completeWith(implicitly[ToResponseMarshaller[String]]) { f =>
          system.actorOf(Props[RequestHandler]) ! RequestHandler.Handle(f)
        }
      }
    }

  Http().bindAndHandle(route, "localhost", 8080)

  println("ENTER to terminate")
  StdIn.readLine()
  system.terminate()
}

Instead of a “normal” actor protocol, where completing the processing leads to a message being sent back, this variation requires a specific protocol that takes a function to call when processing completes. The completeWith directive matches what was called produce in Spray.

The neat thing with this version is that the actor still does not know about any Akka HTTP APIs and can easily be tested by just sending it a dummy function.
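
To make the "dummy function" idea concrete, here is a minimal sketch. It repeats the protocol and actor from the sample so that it compiles on its own, and it assumes only akka-actor on the classpath. The names DummyFunctionCheck and run are made up for this illustration; the dummy function simply stores whatever the actor hands it in a Promise so the check can wait for it.

```scala
import akka.actor.{Actor, ActorSystem, Props}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object DummyFunctionCheck {
  // same protocol and actor as in the sample, repeated to keep this self-contained
  object RequestHandler {
    case class Handle(complete: String => Unit)
  }
  class RequestHandler extends Actor {
    import RequestHandler._
    def receive = {
      case Handle(complete) =>
        complete("ok")
        context.stop(self)
    }
  }

  // hypothetical helper: sends Handle with a dummy function and returns what the actor passed to it
  def run(): String = {
    val system = ActorSystem("dummy-function-check")
    try {
      val captured = Promise[String]()
      // the dummy function: no HTTP involved, it only records the value
      val dummy: String => Unit = { value => captured.success(value); () }
      system.actorOf(Props[RequestHandler]) ! RequestHandler.Handle(dummy)
      Await.result(captured.future, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    assert(run() == "ok")
}
```

In a real test suite you would probably reach for the Akka TestKit instead of a hand-rolled Await, but the point stands: nothing from Akka HTTP is needed to exercise the actor.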

A problem could be that you have to decide on the result entity type up front. There is also no obvious way to fail the request if processing fails in the actor.
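
One possible way around the missing failure path, sketched here as an assumption and not as an established pattern, is to let the function take a Try and to bridge it to the route with a Promise and the onComplete directive in place of completeWith. FailableActorPerRequest, FailableHandler and the reply parameter are names invented for this sketch.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

import scala.concurrent.Promise
import scala.util.{Failure, Success, Try}

object FailableActorPerRequest {

  object FailableHandler {
    // hypothetical protocol: the function takes a Try, so the actor can report failure too
    case class Handle(reply: Try[String] => Unit)
  }
  class FailableHandler extends Actor {
    import FailableHandler._
    def receive = {
      case Handle(reply) =>
        // Try(...) turns an exception thrown by the processing into a Failure
        reply(Try("ok"))
        context.stop(self)
    }
  }

  def route(implicit system: ActorSystem): Route =
    pathEndOrSingleSlash {
      get {
        val promise = Promise[String]()
        val handler = system.actorOf(Props[FailableHandler])
        handler ! FailableHandler.Handle { result => promise.complete(result); () }
        // onComplete passes the Try on, so both outcomes can become a response
        onComplete(promise.future) {
          case Success(data)  => complete(data)
          case Failure(error) => failWith(error) // bubbles up to the nearest ExceptionHandler
        }
      }
    }
}
```

The actor still knows nothing about Akka HTTP, since Try and plain functions are all it sees. Note that this sketch does nothing about an actor that never calls the function; the promise would then simply never complete.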

Complete that context

The third option is the closest to the Spray-style variation, where you essentially send the RequestContext to an actor and have it complete the request. This requires some trickery with the APIs, as a route is expected to end up with a future result at some point.

import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{RequestContext, Route, RouteResult}
import akka.stream.ActorMaterializer

import scala.concurrent.Promise
import scala.io.StdIn


object ActorPerRequestLikeSpray extends App {

  object RequestHandler {
    case class Handle(ctx: ImperativeRequestContext)
  }
  class RequestHandler extends Actor {
    import RequestHandler._
    def receive = {
      case Handle(ctx) =>
        ctx.complete("ok")
        context.stop(self)
    }
  }

  // an imperative wrapper for request context
  final class ImperativeRequestContext(ctx: RequestContext, promise: Promise[RouteResult]) {
    private implicit val ec = ctx.executionContext
    def complete(obj: ToResponseMarshallable): Unit = ctx.complete(obj).onComplete(promise.complete)
    def fail(error: Throwable): Unit = ctx.fail(error).onComplete(promise.complete)
  }

  // a custom directive
  def imperativelyComplete(inner: ImperativeRequestContext => Unit): Route = { ctx: RequestContext =>
    val p = Promise[RouteResult]()
    inner(new ImperativeRequestContext(ctx, p))
    p.future
  }

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val route =
    pathEndOrSingleSlash {
      get {
        imperativelyComplete { ctx =>
          system.actorOf(Props[RequestHandler]) ! RequestHandler.Handle(ctx)
        }
      }
    }

  Http().bindAndHandle(route, "localhost", 8080)

  println("ENTER to terminate")
  StdIn.readLine()
  system.terminate()
}

This introduces the tightest coupling with Akka HTTP in the actor, as you will need marshalling in scope in the actor and have to marshal the result before completing the wrapped request context with it.

On the other hand, since the wrapper type is yours, you may abstract over it and provide a mock in your tests.

I hope these three samples were good inspiration for how to interact with actors from Akka HTTP routes. Happy hakking!
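
As a rough sketch of what abstracting over the wrapper could look like: the trait below is hypothetical and deliberately narrows complete to a String to keep the example free of Akka HTTP types, whereas the wrapper in the sample takes a ToResponseMarshallable. Completer, RecordingCompleter and CompleterDemo are names invented here.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Success, Try}

// Hypothetical trait, not part of Akka HTTP: all the actor would get to see.
trait Completer {
  def complete(body: String): Unit
  def fail(error: Throwable): Unit
}

// Test double that records the first outcome in a Promise, so it is safe
// to call from another thread, for example from inside an actor.
final class RecordingCompleter extends Completer {
  private val outcome = Promise[String]()
  def complete(body: String): Unit = { outcome.trySuccess(body); () }
  def fail(error: Throwable): Unit = { outcome.tryFailure(error); () }
  // blocks until an outcome was recorded or the timeout passes
  def result(timeout: FiniteDuration = 3.seconds): Try[String] =
    Try(Await.result(outcome.future, timeout))
}

object CompleterDemo {
  // stand-in for the body of the actor's receive block
  def handle(ctx: Completer): Unit = ctx.complete("ok")

  def main(args: Array[String]): Unit = {
    val mock = new RecordingCompleter
    handle(mock)
    assert(mock.result() == Success("ok"))
  }
}
```

In this setup the production implementation of the trait would delegate to the imperative wrapper from the sample, and a test would put a RecordingCompleter into the Handle message instead.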