Effector
trait StateReader[F[_], S] {
  def read: F[Option[S]]
}
trait Passivator[F[_]] {
  def enablePassivation(after: FiniteDuration = Duration.Zero): F[Unit]
  def disablePassivation: F[Unit]
}
trait Self[F[_], Alg[_[_]]] {
  def self: F[Alg[F]]
}
trait Effector[F[_], S, Alg[_[_]]] extends StateReader[F, S] with Passivator[F] with Self[F, Alg]
Effector is a typeclass used to describe side effects occurring after event persistence and entity recovery.
Side effects are typically asynchronous operations such as Kafka writes, outgoing REST requests, and entity passivation (flushing out of memory). Effector is used in an Effector => F[Unit] function provided upon entity deployment (e.g. BookingEffector). In the provided Akka runtime, the resulting F[Unit] is executed in run & forget mode so that the command reply is not delayed by any lengthy side effect (Self can be used to notify the entity of the success or failure of asynchronous operations).
In the provided Akka runtime, read-only commands (commands that do not generate events) do not trigger side-effects, which corresponds to sound practice.
Defining an effector is entirely optional with the Akka runtime: pass (_, _) => EffectorT.unit in deployEntity to disable it.
EffectorT (the monad transformer used to interpret the effector algebra) supports cats-effect kernel typeclasses all the way down to Async, which makes it possible to take advantage of semantics such as timeouts, concurrency, etc.
State-derived side-effects
StateReader allows reading the updated entity state after event persistence or recovery.
Passivation
Passivator allows fine-grained control over passivation. In certain domains, entities can evolve into “dormant” states (e.g. after a BookingCancelled event) for which it is beneficial to trigger passivation, either immediately or after a certain delay. This enables proactive optimization of cluster resources.
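As a minimal sketch of state-driven passivation, the snippet below combines StateReader and Passivator in a single policy. It is standalone and does not use endless itself: the two traits are copied from the definitions above, while Booking, Status, MiniMonad (a stand-in for cats.Monad), Id, RecordingEffector and the one-hour delay are all hypothetical names and values chosen for illustration.

```scala
import scala.concurrent.duration._

object PassivationSketch {
  // Capabilities reproduced from the Effector definition so the sketch is standalone.
  trait StateReader[F[_], S] { def read: F[Option[S]] }
  trait Passivator[F[_]] {
    def enablePassivation(after: FiniteDuration = Duration.Zero): F[Unit]
    def disablePassivation: F[Unit]
  }

  // Hypothetical, simplified state (not the example application's model).
  sealed trait Status
  object Status {
    case object Pending extends Status
    case object Accepted extends Status
    case object Cancelled extends Status
  }
  final case class Booking(id: String, status: Status)

  // Minimal stand-in for a monad typeclass; real code would use cats.Monad.
  trait MiniMonad[F[_]] {
    def pure[A](a: A): F[A]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  // Policy: dormant entities leave memory at once, settled ones after a delay
  // (arbitrary value), pending ones stay resident.
  def passivationPolicy[F[_]](
      effector: StateReader[F, Booking] with Passivator[F]
  )(implicit F: MiniMonad[F]): F[Unit] =
    F.flatMap[Option[Booking], Unit](effector.read) {
      case Some(Booking(_, Status.Cancelled)) => effector.enablePassivation()
      case Some(Booking(_, Status.Accepted))  => effector.enablePassivation(1.hour)
      case Some(Booking(_, Status.Pending))   => effector.disablePassivation
      case None                               => F.pure(())
    }

  // Synchronous test interpreter that records passivation requests
  // (Some(delay) for enable, None for disable).
  type Id[A] = A
  implicit val idMonad: MiniMonad[Id] = new MiniMonad[Id] {
    def pure[A](a: A): Id[A] = a
    def flatMap[A, B](fa: Id[A])(f: A => Id[B]): Id[B] = f(fa)
  }

  final class RecordingEffector(state: Option[Booking])
      extends StateReader[Id, Booking] with Passivator[Id] {
    var calls: Vector[Option[FiniteDuration]] = Vector.empty
    def read: Id[Option[Booking]] = state
    def enablePassivation(after: FiniteDuration): Id[Unit] = calls = calls :+ Some(after)
    def disablePassivation: Id[Unit] = calls = calls :+ None
  }

  // Runs the policy against a given state and returns the recorded requests.
  def run(state: Option[Booking]): Vector[Option[FiniteDuration]] = {
    val effector = new RecordingEffector(state)
    passivationPolicy[Id](effector)
    effector.calls
  }
}
```

Because the policy only depends on the two capabilities it needs, it can be exercised with a recording interpreter such as the one above, without starting a cluster.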
Self & process definition
Self exposes the algebra of the entity within the effector context. This allows defining asynchronous processes that interact with the very same entity, typically to define entities acting as process managers (see below for more detail).
For most processes, at-least-once delivery guarantees are required. This can be achieved with a projection, but at the cost of some added latency. Actual latency depends on the database and event journal implementation used, as well as on the projection throughput. One must also make sure to distribute the projection across the cluster to avoid creating a central choke point. Even so, if a projector process gets stalled for some reason, this can create a cascade effect as events pending processing build up.
An effective alternative to using a projection is to track process completion in the entity state itself. Launching asynchronous operations directly as a side-effect of an event has zero latency overhead and also the added advantage that the process launches within the node of the entity which triggered it, thus benefiting from inherent distribution.
By enabling remember-entities, we can achieve guaranteed at-least-once completion of asynchronous processes, because the effector runs right after recovery (thus withstanding node crashes and shard rebalancing).
endless makes it easy to implement this pattern with Self. Here’s the recipe, as illustrated in the example application:
1. A BookingPlaced event gets persisted. At this point, the entity state represents a booking pending acceptance: Booking(..., status = Pending)
2. The effector function inspects the state and, in case of Pending status, asks a third-party service for availability and notifies the entity of the result:
val availabilityProcess: Booking => F[Unit] = booking =>
  booking.status match {
    case Status.Pending =>
      for {
        isAvailable <- availabilityAlg.isCapacityAvailable(booking.time, booking.passengerCount)
        entity      <- self
        _           <- entity.notifyCapacity(isAvailable)
      } yield ()
    case _ => ().pure
  }
3. BookingAccepted or BookingRejected events are persisted and the entity state is updated accordingly.
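To see why tracking completion in the state yields at-least-once behaviour, the recipe can be modelled with a deliberately simplified, synchronous in-memory sketch. This is not the endless runtime: the journal is a plain Vector, and applyEvent, recover, notifyCapacity, effector and the trimmed-down Booking model are hypothetical stand-ins introduced only for illustration.

```scala
object ProcessSketch {
  sealed trait Status
  case object Pending extends Status
  case object Accepted extends Status
  case object Rejected extends Status
  final case class Booking(passengerCount: Int, status: Status)

  sealed trait Event
  final case class BookingPlaced(passengerCount: Int) extends Event
  case object BookingAccepted extends Event
  case object BookingRejected extends Event

  // Event folding: how each persisted event updates the state.
  def applyEvent(state: Option[Booking], event: Event): Option[Booking] =
    event match {
      case BookingPlaced(n) => Some(Booking(n, Pending))
      case BookingAccepted  => state.map(_.copy(status = Accepted))
      case BookingRejected  => state.map(_.copy(status = Rejected))
    }

  // Recovery replays the journal from an empty state.
  def recover(journal: Vector[Event]): Option[Booking] =
    journal.foldLeft(Option.empty[Booking])(applyEvent)

  // Command handler: only a pending booking emits an event, so a duplicate
  // notification after completion is harmless.
  def notifyCapacity(state: Option[Booking], isAvailable: Boolean): Vector[Event] =
    state match {
      case Some(Booking(_, Pending)) =>
        Vector[Event](if (isAvailable) BookingAccepted else BookingRejected)
      case _ => Vector.empty[Event]
    }

  // Effector: runs after persistence and after recovery; it acts only while
  // the state still says the process is incomplete.
  def effector(state: Option[Booking], isCapacityAvailable: Int => Boolean): Vector[Event] =
    state match {
      case Some(Booking(n, Pending)) => notifyCapacity(state, isCapacityAvailable(n))
      case _                         => Vector.empty[Event]
    }
}
```

In this model, a crash right after BookingPlaced is persisted leaves a journal that recovers to a Pending booking, so the effector launches the availability check again. Once BookingAccepted or BookingRejected is in the journal, recovery yields a non-pending state and the effector does nothing.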