Akka and Pekko runtimes
Once required interpreters and typeclass instances have been defined, deploying an entity with Akka or Pekko boils down to a single call to deployEntity
. This requires an actor system and the cluster sharding extension in implicit scope, bundled in the type AkkaCluster
. The recommended pattern is to use the built-in managedResource
helper method to obtain an instance of this class, which wraps actor system creation and shutdown with a cats effect Resource.
deployEntity
This function brings everything together and delivers a cats effect Resource with the repository instance in context F
bundled with the ref to the shard region actor returned by the call to Akka’s ClusterSharding.init.
The following snippet is the scaffolding for the library’s sample application (in its Pekko form), a simple API to manage vehicles and bookings:
sourceResource
.eval(Slf4jLogger.create[IO])
.flatMap { implicit logger: Logger[IO] =>
PekkoCluster.managedResource[IO](actorSystem).flatMap {
implicit cluster: PekkoCluster[IO] =>
Resource
.both(
deployEntity[
IO,
Booking,
BookingEvent,
BookingID,
BookingAlg,
BookingRepositoryAlg
](
BookingEntity(_),
BookingRepository(_),
{ case (effector, _, _) => BookingEffector(effector) },
customizeBehavior = (_, behavior) =>
behavior.eventAdapter(
new EventAdapter[
BookingEvent,
endless.example.proto.booking.events.BookingEvent
] {
def toJournal(e: BookingEvent)
: endless.example.proto.booking.events.BookingEvent =
eventAdapter.toJournal(e)
def manifest(event: BookingEvent): String = event.getClass.getName
def fromJournal(
p: endless.example.proto.booking.events.BookingEvent,
manifest: String
): EventSeq[BookingEvent] = EventSeq.single(eventAdapter.fromJournal(p))
}
)
),
deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
VehicleEntity(_).pure[IO],
VehicleRepository(_).pure[IO],
{ case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
customizeBehavior = (_, behavior) =>
behavior.snapshotAdapter(new SnapshotAdapter[Option[Vehicle]] {
def toJournal(state: Option[Vehicle]): Any = stateAdapter.toJournal(state)
def fromJournal(from: Any): Option[Vehicle] = stateAdapter.fromJournal(from)
})
)
)
.flatMap { case ((bookingRepository, _), (vehicleRepository, _)) =>
HttpServer(port, bookingRepository, vehicleRepository, cluster.isMemberUp)
}
}
}
deployEntity
is parameterized with the context F
and the various involved types: S
for entity state, E
for events, ID
for entity ID and Alg
& RepositoryAlg
for entity and repository algebras respectively (both higher-kinded type constructors).
Entity algebra Alg
must also be equipped with an instance of FunctorK
to support natural transformations and CommandRouter.
In order to bridge Akka’s implicit asynchronicity with the side-effect free context F
used for algebras, it requires Async from F
. This makes it possible to use the Dispatcher mechanism for running the command handling monadic chain synchronously from within the actor thread.
Logger
from log4cats
is also required as the library supports basic logging capabilities.
deployEntity
needs to be called upon application startup, before joining the cluster as the ClusterSharding
extension needs to know about the various entity types beforehand.
The equivalent method for durable entities is deployDurableEntity
.
Internals
Protocol
Thanks to the CommandProtocol instance, entity algebra calls can be “materialized” into concrete commands and replies which are carried in an internal protobuf binary format command.proto. ShardingCommandRouter takes care of delivering the commands to the right entity and returning the reply simply by using Akka’s ask
.
Deployer
Internally, deployEntity uses Akka EventSourcedBehavior DSL to configure the entity in the following way (for deployDurableEntity, this is DurableStateBehavior):
Command handler
- use CommandProtocol.server to decode the command and invoke the corresponding algebra logic, interpreted internally with
EntityT
. Interpretation of the monadic chain occurs, leading to one or more events and a return value. - hand in produced events to Akka’s
Effect.persist
. - trigger any side effects in Effector by interpreting it with
EffectorT
and running it synchronously withDispatcher
from withinthenRun
- encode the reply and feed it into
thenReply
Event handler
This is simply a synchronous run of EventApplier (using Dispatcher
). Left values are translated into thrown exceptions as Akka doesn’t give us other means to deal with event handling errors.
Recovery
Upon successful recovery, we log an info entry and run the effector, while we log a warning entry upon recovery failure.
The built-in behavior is further customizable via a customizeBehavior
function parameter that can be optionally passed into deployEntity
.
Since Akka does not allow mixed versions in a project, Akka dependencies of endless-runtime-akka
are marked a Provided
. This means that your application libraryDependencies
needs to directly include Akka as a dependency. The minimal supported Akka version is 2.6.17.