Akka and Pekko runtimes

Once required interpreters and typeclass instances have been defined, deploying an entity with Akka or Pekko boils down to a single call to deployEntity. This requires an actor system and the cluster sharding extension in implicit scope, bundled in the type AkkaCluster. The recommended pattern is to use the built-in managedResource helper method to obtain an instance of this class, which wraps actor system creation and shutdown with a cats effect Resource.

deployEntity

This function brings everything together and delivers a cats effect Resource with the repository instance in context F bundled with the ref to the shard region actor returned by the call to Akka’s ClusterSharding.init.

The following snippet is the scaffolding for the library’s sample application (in its Pekko form), a simple API to manage vehicles and bookings:

sourceResource
  .eval(Slf4jLogger.create[IO])
  .flatMap { implicit logger: Logger[IO] =>
    PekkoCluster.managedResource[IO](actorSystem).flatMap {
      implicit cluster: PekkoCluster[IO] =>
        Resource
          .both(
            deployEntity[
              IO,
              Booking,
              BookingEvent,
              BookingID,
              BookingAlg,
              BookingRepositoryAlg
            ](
              BookingEntity(_),
              BookingRepository(_),
              { case (effector, _, _) => BookingEffector(effector) },
              customizeBehavior = (_, behavior) =>
                behavior.eventAdapter(
                  new EventAdapter[
                    BookingEvent,
                    endless.example.proto.booking.events.BookingEvent
                  ] {
                    def toJournal(e: BookingEvent)
                        : endless.example.proto.booking.events.BookingEvent =
                      eventAdapter.toJournal(e)
                    def manifest(event: BookingEvent): String = event.getClass.getName
                    def fromJournal(
                        p: endless.example.proto.booking.events.BookingEvent,
                        manifest: String
                    ): EventSeq[BookingEvent] = EventSeq.single(eventAdapter.fromJournal(p))
                  }
                )
            ),
            deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
              VehicleEntity(_).pure[IO],
              VehicleRepository(_).pure[IO],
              { case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
              customizeBehavior = (_, behavior) =>
                behavior.snapshotAdapter(new SnapshotAdapter[Option[Vehicle]] {
                  def toJournal(state: Option[Vehicle]): Any = stateAdapter.toJournal(state)
                  def fromJournal(from: Any): Option[Vehicle] = stateAdapter.fromJournal(from)
                })
            )
          )
          .flatMap { case ((bookingRepository, _), (vehicleRepository, _)) =>
            HttpServer(port, bookingRepository, vehicleRepository, cluster.isMemberUp)
          }
    }
  }

deployEntity is parameterized with the context F and the various involved types: S for entity state, E for events, ID for entity ID and Alg & RepositoryAlg for entity and repository algebras respectively (both higher-kinded type constructors).

Entity algebra Alg must also be equipped with an instance of FunctorK to support natural transformations and CommandRouter.

In order to bridge Akka’s implicit asynchronicity with the side-effect free context F used for algebras, it requires Async from F. This makes it possible to use the Dispatcher mechanism for running the command handling monadic chain synchronously from within the actor thread.

Logger from log4cats is also required as the library supports basic logging capabilities.

Important

deployEntity needs to be called upon application startup, before joining the cluster as the ClusterSharding extension needs to know about the various entity types beforehand.

Durable entity

The equivalent method for durable entities is deployDurableEntity.

Internals

Protocol

Thanks to the CommandProtocol instance, entity algebra calls can be “materialized” into concrete commands and replies which are carried in an internal protobuf binary format command.proto. ShardingCommandRouter takes care of delivering the commands to the right entity and returning the reply simply by using Akka’s ask.

Deployer

Internally, deployEntity uses Akka EventSourcedBehavior DSL to configure the entity in the following way (for deployDurableEntity, this is DurableStateBehavior):

Command handler

  1. use CommandProtocol.server to decode the command and invoke the corresponding algebra logic, interpreted internally with EntityT. Interpretation of the monadic chain occurs, leading to one or more events and a return value.
  2. hand in produced events to Akka’s Effect.persist.
  3. trigger any side effects in Effector by interpreting it with EffectorT and running it synchronously with Dispatcher from within thenRun
  4. encode the reply and feed it into thenReply

Event handler

This is simply a synchronous run of EventApplier (using Dispatcher). Left values are translated into thrown exceptions as Akka doesn’t give us other means to deal with event handling errors.

Recovery

Upon successful recovery, we log an info entry and run the effector, while we log a warning entry upon recovery failure.

Custom behavior

The built-in behavior is further customizable via a customizeBehavior function parameter that can be optionally passed into deployEntity.

Compatibility

Since Akka does not allow mixed versions in a project, Akka dependencies of endless-runtime-akka are marked a Provided. This means that your application libraryDependencies needs to directly include Akka as a dependency. The minimal supported Akka version is 2.6.17.