POSTS

Futures Plus Supervision in Akka

Reading a blog entry by Paul Cleary made me feel the simple and nice solution was missing when reaching the end of the blog entry, so here you go, what I would have ended it with.

Ideally what we’d want is something pretty close to the pipe pattern built into Akka, but we want to divide errors to the local actor and only send successfully completed future values to the recipient. When failures arrive to the local actor we want to handle them specificly by rethrowing the exception to trigger supervision.

object SupervisedPipe {
  case class SupervisedFail(ex: Throwable)
  class SupervisedPipeableFuture[T](future: Future[T]) {
    // implicit failure recipient goes to self when used inside an actor
    def supervisedPipeTo(successRecipient: ActorRef)(implicit failureRecipient: ActorRef, ec: ExecutionContext): Unit =
      future.andThen {
        case Success(result) => successRecipient ! result
        case Failure(ex) => failureRecipient ! SupervisedFail(ex)
      }
   }
   /** `orElse` with the actor receieve logic */
   val handleSupervisedPipeFailure: Receive = {
     // just throw the exception and make the actor logic handle it
     case SupervisedFail(ex) => throw ex
   }
   implicit def supervisedPipeTo[T](future: Future[T]): SupervisedPipeableFuture[T] =
     new SupervisedPipeableFuture[T](future)
}

This provides us with a decoration on instances of Future called supervisedPipeTo and an extra receive block that we can append to the end of our own receive logic using orElse like so:

class SampleActor extends Actor {
  import SupervisedPipe.{supervisedPipeTo, handleSupervisedPipeFailure}
    import context.dispatcher
    // some operation that returns a future
    def futureResult(): Future[String] = ???
    def logic: Receive = {
      case "go" =>
        futureResult() supervisedPipeTo sender()
    }
    // hook in the supervision glue if nothing matches in receive
    // another alternative would be to mix in a prepared onUnhandled
    // but as we are actually handling it I think this is neater
    def receive = logic orElse handleSupervisedPipeFailure
}

Happy hakking!