Pekko and Akka runtimes
Once required interpreters and typeclass instances have been defined, deploying an entity with Pekko or Akka boils down to a single call to deployRepository
. This requires an actor system and the cluster sharding extension in implicit scope, bundled in the type PekkoCluster
(or AkkaCluster
). The recommended pattern is to use the built-in managedResource
helper method to obtain an instance of this class, which wraps actor system creation and shutdown with a Resource.
How to initiate the distributed cluster
The entrypoint is available by importing endless.runtime.pekko.syntax.deploy.*
or endless.runtime.akka.syntax.deploy.*
and calling deployRepository
with the required parameters.
This function ties everything together and delivers a cats effect Resource with an instance of DeployedPekkoRepository in context F
bundling the RepositoryAlg
instance together with the ref to the shard region actor returned by the call to Pekko’s ClusterSharding.init.
The following snippet is the scaffolding for the library’s sample application (in its Pekko form), a simple API to manage vehicles and bookings:
sourceResource
.eval(Slf4jLogger.create[IO])
.flatMap { implicit logger: Logger[IO] =>
PekkoCluster.managedResource[IO](actorSystem).flatMap {
implicit cluster: PekkoCluster[IO] =>
implicit val eventSourcingDeploymentParameters
: PekkoDeploymentParameters[IO, Booking, BookingEvent] =
PekkoDeploymentParameters[IO, Booking, BookingEvent](
customizeBehavior = (_, behavior) =>
behavior.eventAdapter(
new EventAdapter[
BookingEvent,
endless.example.proto.booking.events.BookingEvent
] {
def toJournal(
e: BookingEvent
): endless.example.proto.booking.events.BookingEvent =
eventAdapter.toJournal(e)
def manifest(event: BookingEvent): String = event.getClass.getName
def fromJournal(
p: endless.example.proto.booking.events.BookingEvent,
manifest: String
): EventSeq[BookingEvent] = EventSeq.single(eventAdapter.fromJournal(p))
}
)
)
implicit val durableDeploymentParameters
: PekkoDurableDeploymentParameters[IO, Vehicle] =
PekkoDurableDeploymentParameters[IO, Vehicle](
customizeBehavior = (_, behavior) =>
behavior.snapshotAdapter(new SnapshotAdapter[Option[Vehicle]] {
def toJournal(state: Option[Vehicle]): Any = stateAdapter.toJournal(state)
def fromJournal(from: Any): Option[Vehicle] = stateAdapter.fromJournal(from)
})
)
Resource
.both(
deployRepository[
IO,
BookingID,
Booking,
BookingEvent,
BookingAlg,
BookingsAlg
](
RepositoryInterpreter.lift(ShardedBookings(_)),
BehaviorInterpreter.lift(BookingEntityBehavior(_)),
SideEffectInterpreter.lift { case (_, _) => new BookingSideEffect() }
),
deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg](
RepositoryInterpreter.lift(ShardedVehicles(_)),
DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)),
SideEffectInterpreter.lift { case (_, _) => new VehicleSideEffect() }
)
)
.flatMap { case (bookingDeployment, vehicleDeployment) =>
HttpServer(
port,
bookingDeployment.repository,
vehicleDeployment.repository,
cluster.isMemberUp
)
}
}
}
deployRepository
is parameterized with the context F
and the various involved types: S
for entity state, E
for events, ID
for entity ID and Alg
& RepositoryAlg
for entity and repository algebras respectively (both higher-kinded type constructors).
In order to bridge Pekko/Akka’s implicit asynchronicity with the side-effect free context F
used for algebras, it requires Async from F
. This makes it possible to use the Dispatcher mechanism for running the command handling monadic chain synchronously from within the actor thread.
Logger
from log4cats
is also required as the library supports basic logging capabilities.
deployRepository
needs to be called upon application startup, before joining the cluster as the ClusterSharding
extension needs to know about the various entity types beforehand.
The equivalent method for durable entities is deployDurableRepository
.
Internals
Protocol
Thanks to the CommandProtocol instance, entity algebra calls can be “materialized” into concrete commands and replies. These types are encoded to a binary payload, which is transported within an internal protobuf envelope command.proto.
ShardingCommandSender takes care of delivering the commands to the right entity and returning the reply simply by using Pekko/Akka’s ask
.
Deployer
Internally, deployRepository uses EventSourcedBehavior DSL to configure the entity in the following way (for deployDurableRepository, this is DurableStateBehavior):
Command handler
- use CommandProtocol.server to decode the command and invoke the corresponding algebra logic, interpreted internally with
EntityT
. Interpretation of the monadic chain occurs, leading to one or more events and a return value. - hand in produced events to Pekko/Akka’s
Effect.persist
. - trigger any side effects in Effector with a run-and-forget using
Dispatcher
from withinthenRun
- encode the reply and feed it into
thenReply
Event handler
This is simply a synchronous run of EventApplier (using Dispatcher
). Left values are translated into thrown exceptions as Pekko/Akka doesn’t give us other means to deal with event handling errors.
Recovery
Upon successful recovery, we log an info entry and run the effector, while we log a warning entry upon recovery failure.
The built-in behavior is further customizable via a customizeBehavior
function parameter that can be optionally passed into PekkoDeploymentParameters or AkkaDeploymentParameters.
Since Akka/Pekko does not allow mixed versions in a project, Akka/Pekko dependencies of endless-runtime-akka
and endless-runtime-pekko
respectively are marked a Provided
. This means that your application libraryDependencies
needs to directly include Akka or Pekko as a dependency. The minimal supported Akka version is 2.6.20, and Pekko version is 1.1.1.