Akka runtime

Once required interpreters and typeclass instances have been defined, deploying an entity with Akka boils down to a single call to deployEntity. This naturally requires an actor system and the cluster sharding extension in implicit scope.

deployEntity

This function brings everything together and returns a Cats Effect Resource containing the repository instance in context F, bundled with the ref to the shard region actor returned by the call to Akka’s ClusterSharding.init.

The following snippet is the scaffolding for the library’s sample application, a simple API to manage bookings:

def apply(implicit actorSystem: ActorSystem[Nothing]): IO[Resource[IO, Server]] = {
  implicit val clusterSharding: ClusterSharding = ClusterSharding(actorSystem)
  implicit val bookingCommandProtocol: BookingCommandProtocol = new BookingCommandProtocol
  implicit val vehicleCommandProtocol: VehicleCommandProtocol = new VehicleCommandProtocol
  implicit val eventApplier: BookingEventApplier = new BookingEventApplier
  implicit val bookingEntityNameProvider: EntityNameProvider[BookingID] = () => "booking"
  implicit val vehicleEntityNameProvider: EntityNameProvider[VehicleID] = () => "vehicle"
  implicit val bookingIDEncoder: EntityIDCodec[BookingID] =
    EntityIDCodec(_.id.show, BookingID.fromString)
  implicit val vehicleIDEncoder: EntityIDCodec[VehicleID] =
    EntityIDCodec(_.id.show, VehicleID.fromString)
  implicit val askTimeout: Timeout = Timeout(10.seconds)

  Slf4jLogger
    .create[IO]
    .map { implicit logger: Logger[IO] =>
      Resource
        .both(
          deployEntity[IO, Booking, BookingEvent, BookingID, BookingAlg, BookingRepositoryAlg](
            BookingEntity(_),
            BookingRepository(_),
            { case (effector, _, _) => BookingEffector(effector) }
          ),
          deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
            VehicleEntity(_).pure[IO],
            VehicleRepository(_).pure[IO],
            { case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
            customizeBehavior = (_, behavior) => behavior.snapshotAdapter(new VehicleStateAdapter)
          )
        )
        .map { case ((bookingRepository, _), (vehicleRepository, _)) =>
          httpService(bookingRepository, vehicleRepository)
        }
    }
    .map(
      _.flatMap(service =>
        BlazeServerBuilder[IO]
          .bindHttp(8080, "localhost")
          .withHttpApp(service)
          .resource
      )
    )
}

deployEntity is parameterized with the context F and the various involved types: S for entity state, E for events, ID for entity ID and Alg & RepositoryAlg for entity and repository algebras respectively (both higher-kinded type constructors).

Entity algebra Alg must also be equipped with an instance of FunctorK to support natural transformations and CommandRouter.
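As an illustration of what such an instance looks like, here is a minimal hand-written FunctorK for a small algebra. CounterAlg, InMemoryCounter and idToOption are hypothetical names that are not part of the sample application; the sketch assumes cats-core and cats-tagless-core are on the classpath.

```scala
import cats.{~>, Id}
import cats.tagless.FunctorK

// Hypothetical algebra, for illustration only.
trait CounterAlg[F[_]] {
  def increment(by: Int): F[Unit]
  def value: F[Int]
}

object CounterAlg {
  // mapK re-wraps the result of every method with the natural transformation fk.
  implicit val functorK: FunctorK[CounterAlg] = new FunctorK[CounterAlg] {
    def mapK[F[_], G[_]](af: CounterAlg[F])(fk: F ~> G): CounterAlg[G] =
      new CounterAlg[G] {
        def increment(by: Int): G[Unit] = fk(af.increment(by))
        def value: G[Int] = fk(af.value)
      }
  }
}

object FunctorKSketch {
  // Trivial interpreter in Id, backed by a mutable field.
  final class InMemoryCounter extends CounterAlg[Id] {
    private var n = 0
    def increment(by: Int): Id[Unit] = n += by
    def value: Id[Int] = n
  }

  // Natural transformation from Id to Option.
  val idToOption: Id ~> Option = new (Id ~> Option) {
    def apply[A](a: Id[A]): Option[A] = Some(a)
  }

  // The same algebra, now expressed in Option.
  val lifted: CounterAlg[Option] =
    implicitly[FunctorK[CounterAlg]].mapK(new InMemoryCounter)(idToOption)
}
```

The instance lets an algebra interpreted in one context be transformed into another, which is how calls on the repository side can be redirected to a different interpreter.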

To bridge Akka’s implicit asynchronicity with the side-effect-free context F used for algebras, deployEntity requires an Async instance for F. This makes it possible to use the Dispatcher mechanism to run the command-handling monadic chain synchronously from within the actor thread.
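The role of Dispatcher can be sketched in isolation. The example below is not the library's code: handleOnCallerThread is a hypothetical stand-in for a callback that Akka invokes on the actor thread and that must return a plain value. It assumes Cats Effect 3.4 or later, where Dispatcher.parallel is available.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Dispatcher

object DispatcherSketch extends IOApp.Simple {
  // Stand-in for a synchronous callback: it receives an effectful program
  // and must return its result directly, so it blocks on the dispatcher.
  def handleOnCallerThread(dispatcher: Dispatcher[IO], program: IO[Int]): Int =
    dispatcher.unsafeRunSync(program)

  // Acquire a dispatcher as a Resource and call the synchronous callback.
  // IO.blocking marks the call as blocking so it stays off the compute pool.
  val replyFromCallback: IO[Int] =
    Dispatcher.parallel[IO].use { dispatcher =>
      IO.blocking(handleOnCallerThread(dispatcher, IO.pure(41).map(_ + 1)))
    }

  val run: IO[Unit] =
    replyFromCallback.flatMap(reply => IO.println(s"reply: $reply"))
}
```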

Logger from log4cats is also required as the library supports basic logging capabilities.

Important

deployEntity needs to be called at application startup, before joining the cluster, because the ClusterSharding extension needs to know about the various entity types beforehand.

Durable entity

The equivalent method for durable entities is deployDurableEntity. The sample above calls the deployDurableEntityF variant, whose arguments are wrapped in F.

Internals

Protocol

Thanks to the CommandProtocol instance, entity algebra calls can be “materialized” into concrete commands and replies, which are carried in an internal protobuf binary format (command.proto). ShardingCommandRouter takes care of delivering the commands to the right entity and returning the reply, simply by using Akka’s ask.
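The idea of materializing algebra calls can be shown with a library-free model. Everything here is hypothetical and simplified: Outgoing, client, server and route are illustrative names, payloads are plain strings instead of protobuf, and Option stands in for F. It is a sketch of the principle, not the library's CommandProtocol API.

```scala
object ProtocolSketch {
  // Hypothetical algebra with a single operation.
  trait CounterAlg[F[_]] {
    def add(n: Int): F[Int]
  }

  // An outgoing command: the encoded payload plus a decoder for the reply.
  final case class Outgoing[R](payload: String, decodeReply: String => R)

  // Client side: each algebra call becomes data instead of being executed.
  val client: CounterAlg[Outgoing] = new CounterAlg[Outgoing] {
    def add(n: Int): Outgoing[Int] = Outgoing(s"add:$n", _.toInt)
  }

  // Server side: decode the payload, invoke the entity algebra, encode the reply.
  def server(entity: CounterAlg[Option])(payload: String): Option[String] =
    payload.split(":") match {
      case Array("add", n) => n.toIntOption.flatMap(entity.add).map(_.toString)
      case _               => None
    }

  // Router stand-in: the real router would send the payload with Akka's ask.
  def route[R](entity: CounterAlg[Option])(command: Outgoing[R]): Option[R] =
    server(entity)(command.payload).map(command.decodeReply)
}
```

Because the client interpreter only produces data, the caller and the entity can live on different nodes: only the payload and the encoded reply cross the wire.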

Deployer

Internally, deployEntity uses Akka’s EventSourcedBehavior DSL to configure the entity in the following way (for deployDurableEntity, this is DurableStateBehavior):

Command handler

  1. Use CommandProtocol.server to decode the command and invoke the corresponding algebra logic, interpreted internally with EntityT. Interpreting the monadic chain yields one or more events and a return value.
  2. Hand the produced events to Akka’s Effect.persist.
  3. Trigger any side effects in Effector by interpreting it with EffectorT and running it synchronously with Dispatcher from within thenRun.
  4. Encode the reply and feed it into thenReply.
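The four steps map onto Akka's effect builder roughly as follows. This is a simplified sketch rather than the library's implementation: Command, Incremented and decodeAndRun are hypothetical, string payloads replace the protobuf envelope, and a println stands in for the effector. It assumes akka-persistence-typed 2.6.x and Scala 2.13.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

object CommandHandlerSketch {
  // Hypothetical wire types; plain strings stand in for encoded bytes.
  final case class Command(payload: String, replyTo: ActorRef[String])
  final case class Incremented(by: Int)
  type State = Int

  // Step 1 stand-in: decode the command and run the entity logic,
  // yielding the events to persist and the encoded reply.
  def decodeAndRun(state: State, payload: String): (List[Incremented], String) =
    payload.split(":") match {
      case Array("inc", by) =>
        by.toIntOption match {
          case Some(n) => (List(Incremented(n)), (state + n).toString)
          case None    => (Nil, "invalid amount")
        }
      case _ => (Nil, "unknown command")
    }

  def behavior(entityId: String): Behavior[Command] =
    EventSourcedBehavior[Command, Incremented, State](
      persistenceId = PersistenceId("counter", entityId),
      emptyState = 0,
      commandHandler = (state, command) => {
        val (events, reply) = decodeAndRun(state, command.payload)
        Effect
          .persist[Incremented, State](events) // step 2: persist events
          .thenRun((updated: State) => println(s"side effect, state=$updated")) // step 3
          .thenReply(command.replyTo)((_: State) => reply) // step 4: reply
      },
      eventHandler = (state, event) => state + event.by
    )
}
```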

Event handler

This is simply a synchronous run of EventApplier (using Dispatcher). Left values are translated into thrown exceptions as Akka doesn’t give us other means to deal with event handling errors.
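The translation of Left values into exceptions can be sketched without any library. The event types, applyEvent and EventApplierException below are hypothetical, and the Either[String, Option[S]] shape is an assumption about how an event applier reports errors.

```scala
object EventHandlerSketch {
  sealed trait Event
  final case class Opened(balance: Int) extends Event
  final case class Deposited(amount: Int) extends Event

  // Pure event application: Left signals an event that is invalid for the state.
  def applyEvent(state: Option[Int], event: Event): Either[String, Option[Int]] =
    (state, event) match {
      case (None, Opened(balance))            => Right(Some(balance))
      case (Some(balance), Deposited(amount)) => Right(Some(balance + amount))
      case (Some(_), Opened(_))               => Left("account already open")
      case (None, Deposited(_))               => Left("account not open")
    }

  final class EventApplierException(message: String) extends RuntimeException(message)

  // The handler given to Akka must return a state, so Left becomes an exception.
  def eventHandler(state: Option[Int], event: Event): Option[Int] =
    applyEvent(state, event) match {
      case Right(next) => next
      case Left(error) => throw new EventApplierException(error)
    }
}
```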

Recovery

Upon successful recovery, we log an info entry and run the effector, while we log a warning entry upon recovery failure.
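In plain Akka terms, this kind of recovery handling is typically attached with receiveSignal. The helper below is a sketch under assumptions: withRecoveryHooks and runEffector are hypothetical names, and an SLF4J logger stands in for the log4cats Logger used by the library.

```scala
import akka.persistence.typed.{RecoveryCompleted, RecoveryFailed}
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import org.slf4j.LoggerFactory

object RecoverySketch {
  private val log = LoggerFactory.getLogger("recovery-sketch")

  // Adds recovery logging to a behavior and runs a callback on success.
  def withRecoveryHooks[C, E, S](behavior: EventSourcedBehavior[C, E, S])(
      runEffector: S => Unit
  ): EventSourcedBehavior[C, E, S] =
    behavior.receiveSignal {
      case (state, RecoveryCompleted) =>
        log.info("Recovery completed")
        runEffector(state) // stand-in for running the effector
      case (_, RecoveryFailed(cause)) =>
        log.warn("Recovery failed", cause)
    }
}
```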

Custom behavior

The built-in behavior is further customizable via a customizeBehavior function parameter that can be optionally passed into deployEntity.
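As an example of what such a customization might do for an event-sourced entity, the helper below enables periodic snapshots. withSnapshots is a hypothetical name and the retention values are arbitrary. The commented usage assumes customizeBehavior takes the same two-argument shape as in the sample application's durable entity.

```scala
import akka.persistence.typed.scaladsl.{EventSourcedBehavior, RetentionCriteria}

object CustomizeBehaviorSketch {
  // Snapshot every 100 events and keep the two most recent snapshots.
  def withSnapshots[C, E, S](
      behavior: EventSourcedBehavior[C, E, S]
  ): EventSourcedBehavior[C, E, S] =
    behavior.withRetention(
      RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2)
    )

  // Assumed usage, mirroring the sample application:
  //   customizeBehavior = (_, behavior) => withSnapshots(behavior)
}
```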

Compatibility

Since Akka does not allow mixed versions in a project, the Akka dependencies of endless-runtime-akka are marked as Provided. This means that your application's libraryDependencies must directly include Akka as a dependency. The minimum supported Akka version is 2.6.17.
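In an sbt build, this could look like the fragment below. The exact set of Akka modules your application needs is an assumption here. The version shown is the minimum stated above, and all Akka modules should share the same version.

```scala
// build.sbt (fragment)
val akkaVersion = "2.6.17"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor-typed"            % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-typed"          % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed"      % akkaVersion
)
```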