Pekko and Akka runtimes

Once required interpreters and typeclass instances have been defined, deploying an entity with Pekko or Akka boils down to a single call to deployRepository. This requires an actor system and the cluster sharding extension in implicit scope, bundled in the type PekkoCluster (or AkkaCluster). The recommended pattern is to use the built-in managedResource helper method to obtain an instance of this class, which wraps actor system creation and shutdown with a Resource.

How to initiate the distributed cluster

The entrypoint is available by importing endless.runtime.pekko.syntax.deploy.* or endless.runtime.akka.syntax.deploy.* and calling deployRepository with the required parameters.

This function ties everything together and delivers a cats effect Resource with an instance of DeployedPekkoRepository in context F bundling the RepositoryAlg instance together with the ref to the shard region actor returned by the call to Pekko’s ClusterSharding.init.

The following snippet is the scaffolding for the library’s sample application (in its Pekko form), a simple API to manage vehicles and bookings:

sourceResource
  .eval(Slf4jLogger.create[IO])
  .flatMap { implicit logger: Logger[IO] =>
    PekkoCluster.managedResource[IO](actorSystem).flatMap {
      implicit cluster: PekkoCluster[IO] =>
        implicit val eventSourcingDeploymentParameters
            : PekkoDeploymentParameters[IO, Booking, BookingEvent] =
          PekkoDeploymentParameters[IO, Booking, BookingEvent](
            customizeBehavior = (_, behavior) =>
              behavior.eventAdapter(
                new EventAdapter[
                  BookingEvent,
                  endless.example.proto.booking.events.BookingEvent
                ] {
                  def toJournal(
                      e: BookingEvent
                  ): endless.example.proto.booking.events.BookingEvent =
                    eventAdapter.toJournal(e)

                  def manifest(event: BookingEvent): String = event.getClass.getName

                  def fromJournal(
                      p: endless.example.proto.booking.events.BookingEvent,
                      manifest: String
                  ): EventSeq[BookingEvent] = EventSeq.single(eventAdapter.fromJournal(p))
                }
              )
          )
        implicit val durableDeploymentParameters
            : PekkoDurableDeploymentParameters[IO, Vehicle] =
          PekkoDurableDeploymentParameters[IO, Vehicle](
            customizeBehavior = (_, behavior) =>
              behavior.snapshotAdapter(new SnapshotAdapter[Option[Vehicle]] {
                def toJournal(state: Option[Vehicle]): Any = stateAdapter.toJournal(state)
                def fromJournal(from: Any): Option[Vehicle] = stateAdapter.fromJournal(from)
              })
          )
        Resource
          .both(
            deployRepository[
              IO,
              BookingID,
              Booking,
              BookingEvent,
              BookingAlg,
              BookingsAlg
            ](
              RepositoryInterpreter.lift(ShardedBookings(_)),
              BehaviorInterpreter.lift(BookingEntityBehavior(_)),
              SideEffectInterpreter.lift { case (_, _) => new BookingSideEffect() }
            ),
            deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg](
              RepositoryInterpreter.lift(ShardedVehicles(_)),
              DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)),
              SideEffectInterpreter.lift { case (_, _) => new VehicleSideEffect() }
            )
          )
          .flatMap { case (bookingDeployment, vehicleDeployment) =>
            HttpServer(
              port,
              bookingDeployment.repository,
              vehicleDeployment.repository,
              cluster.isMemberUp
            )
          }
    }
  }

deployRepository is parameterized with the context F and the various involved types: S for entity state, E for events, ID for entity ID and Alg & RepositoryAlg for entity and repository algebras respectively (both higher-kinded type constructors).

In order to bridge Pekko/Akka’s implicit asynchronicity with the side-effect free context F used for algebras, it requires Async from F. This makes it possible to use the Dispatcher mechanism for running the command handling monadic chain synchronously from within the actor thread.

Logger from log4cats is also required as the library supports basic logging capabilities.

Important

deployRepository needs to be called upon application startup, before joining the cluster as the ClusterSharding extension needs to know about the various entity types beforehand.

Durable entity

The equivalent method for durable entities is deployDurableRepository.

Internals

Protocol

Thanks to the CommandProtocol instance, entity algebra calls can be “materialized” into concrete commands and replies. These types are encoded to a binary payload, which is transported within an internal protobuf envelope command.proto.

ShardingCommandSender takes care of delivering the commands to the right entity and returning the reply simply by using Pekko/Akka’s ask.

Deployer

Internally, deployRepository uses EventSourcedBehavior DSL to configure the entity in the following way (for deployDurableRepository, this is DurableStateBehavior):

Command handler

  1. use CommandProtocol.server to decode the command and invoke the corresponding algebra logic, interpreted internally with EntityT. Interpretation of the monadic chain occurs, leading to one or more events and a return value.
  2. hand in produced events to Pekko/Akka’s Effect.persist.
  3. trigger any side effects in Effector with a run-and-forget using Dispatcher from within thenRun
  4. encode the reply and feed it into thenReply

Event handler

This is simply a synchronous run of EventApplier (using Dispatcher). Left values are translated into thrown exceptions as Pekko/Akka doesn’t give us other means to deal with event handling errors.

Recovery

Upon successful recovery, we log an info entry and run the effector, while we log a warning entry upon recovery failure.

Custom behavior

The built-in behavior is further customizable via a customizeBehavior function parameter that can be optionally passed into PekkoDeploymentParameters or AkkaDeploymentParameters.

Compatibility

Since Akka/Pekko does not allow mixed versions in a project, Akka/Pekko dependencies of endless-runtime-akka and endless-runtime-pekko respectively are marked a Provided. This means that your application libraryDependencies needs to directly include Akka or Pekko as a dependency. The minimal supported Akka version is 2.6.20, and Pekko version is 1.1.1.