Example app
Endless example application is a small API for managing imaginary bookings for passenger trips from some origin to some destination, as well as tracking positions and speeds of vehicles. It can be found in endless-example
and can be run directly: sbt run
.
API
It has a simple CRUD API for bookings and vehicles:
source HttpRoutes
.of[IO] {
case req @ POST -> Root / "booking" => postBooking(bookingRepository, req)
case GET -> Root / "booking" / UUIDVar(id) => getBooking(bookingRepository, id)
case req @ PATCH -> Root / "booking" / UUIDVar(id) =>
patchBooking(bookingRepository, req, id)
case POST -> Root / "booking" / UUIDVar(id) / "cancel" =>
cancelBooking(bookingRepository, id)
case GET -> Root / "vehicle" / UUIDVar(id) / "speed" =>
getVehicleSpeed(vehicleRepository, id)
case GET -> Root / "vehicle" / UUIDVar(id) / "position" =>
getVehiclePosition(vehicleRepository, id)
case GET -> Root / "vehicle" / UUIDVar(id) / "recoveryCount" =>
getVehicleRecoveryCount(vehicleRepository, id)
case req @ POST -> Root / "vehicle" / UUIDVar(id) / "speed" =>
setVehicleSpeed(vehicleRepository, id, req)
case req @ POST -> Root / "vehicle" / UUIDVar(id) / "position" =>
setVehiclePosition(vehicleRepository, id, req)
case GET -> Root / "health" =>
isUp.flatMap {
case true => Ok("OK")
case false => ServiceUnavailable("Cluster member is down")
}
}
.orNotFound
)
Scaffolding
The application is assembled via calls to deployRepository
(for bookings) and deployDurableEntity
(for vehicles) (see runtime for more details)
Akka and Pekko runtimes essentially have the same API, so we’ll use Pekko for the example:
sourceResource
.eval(Slf4jLogger.create[IO])
.flatMap { implicit logger: Logger[IO] =>
PekkoCluster.managedResource[IO](actorSystem).flatMap {
implicit cluster: PekkoCluster[IO] =>
implicit val eventSourcingDeploymentParameters
: PekkoDeploymentParameters[IO, Booking, BookingEvent] =
PekkoDeploymentParameters[IO, Booking, BookingEvent](
customizeBehavior = (_, behavior) =>
behavior.eventAdapter(
new EventAdapter[
BookingEvent,
endless.example.proto.booking.events.BookingEvent
] {
def toJournal(
e: BookingEvent
): endless.example.proto.booking.events.BookingEvent =
eventAdapter.toJournal(e)
def manifest(event: BookingEvent): String = event.getClass.getName
def fromJournal(
p: endless.example.proto.booking.events.BookingEvent,
manifest: String
): EventSeq[BookingEvent] = EventSeq.single(eventAdapter.fromJournal(p))
}
)
)
implicit val durableDeploymentParameters
: PekkoDurableDeploymentParameters[IO, Vehicle] =
PekkoDurableDeploymentParameters[IO, Vehicle](
customizeBehavior = (_, behavior) =>
behavior.snapshotAdapter(new SnapshotAdapter[Option[Vehicle]] {
def toJournal(state: Option[Vehicle]): Any = stateAdapter.toJournal(state)
def fromJournal(from: Any): Option[Vehicle] = stateAdapter.fromJournal(from)
})
)
Resource
.both(
deployRepository[
IO,
BookingID,
Booking,
BookingEvent,
BookingAlg,
BookingsAlg
](
RepositoryInterpreter.lift(ShardedBookings(_)),
BehaviorInterpreter.lift(BookingEntityBehavior(_)),
SideEffectInterpreter.lift { case (_, _) => new BookingSideEffect() }
),
deployDurableRepository[IO, VehicleID, Vehicle, VehicleAlg, VehiclesAlg](
RepositoryInterpreter.lift(ShardedVehicles(_)),
DurableBehaviorInterpreter.lift(VehicleEntityBehavior(_)),
SideEffectInterpreter.lift { case (_, _) => new VehicleSideEffect() }
)
)
.flatMap { case (bookingDeployment, vehicleDeployment) =>
HttpServer(
port,
bookingDeployment.repository,
vehicleDeployment.repository,
cluster.isMemberUp
)
}
}
}
Algebras
You might have spotted the two algebra types in the snippet above:
Repository
sourcetrait BookingsAlg[F[_]] {
def bookingFor(bookingID: BookingID): BookingAlg[F]
}
Here’s the sequence of operations happening behind the scenes when retrieving an instance of entity algebra:
Entity
sourcetrait BookingAlg[F[_]] {
def place(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): F[BookingAlreadyExists \/ Unit]
def get: F[BookingUnknown.type \/ Booking]
def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit]
def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit]
def changeOriginAndDestination(
newOrigin: LatLon,
newDestination: LatLon
): F[BookingUnknown.type \/ Unit]
def cancel: F[CancelError \/ Unit]
def notifyCapacity(isAvailable: Boolean): F[BookingUnknown.type \/ Unit]
}
Implementations
Implementation of the repository algebra is trivial using Sharding
instance (injected by deployRepository
):
sourcefinal case class ShardedBookings[F[_]: Monad](sharding: Sharding[F, BookingID, BookingAlg])
extends BookingsAlg[F] {
def bookingFor(bookingID: BookingID): BookingAlg[F] = sharding.entityFor(bookingID)
}
Implementation of behavior algebra is done using the Entity
typeclass instance (also injected by deployRepository
):
sourcefinal case class BookingEntityBehavior[F[_]: Logger: Clock](
entity: Entity[F, Booking, BookingEvent]
) extends BookingAlg[F] {
import entity.*
def place(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): F[BookingAlreadyExists \/ Unit] =
ifUnknownF(
Logger[F].info(show"Creating booking with ID $bookingID") >> write(
BookingPlaced(bookingID, time, origin, destination, passengerCount)
)
)(_ => BookingAlreadyExists(bookingID))
def get: F[BookingUnknown.type \/ Booking] = ifKnown(identity)(BookingUnknown)
def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit] =
ifKnownF(booking =>
if (booking.origin =!= newOrigin) entity.write(OriginChanged(newOrigin)) else ().pure
)(BookingUnknown)
def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit] =
ifKnownF(booking =>
if (booking.destination =!= newDestination) entity.write(DestinationChanged(newDestination))
else ().pure
)(BookingUnknown)
def changeOriginAndDestination(
newOrigin: LatLon,
newDestination: LatLon
): F[BookingUnknown.type \/ Unit] = changeOrigin(newOrigin) >> changeDestination(newDestination)
def cancel: F[CancelError \/ Unit] =
ifKnownT[CancelError, Unit](booking =>
booking.status match {
case Status.Accepted | Status.Pending =>
EitherT.liftF(
(Clock[F].realTimeInstant >>= (timestamp =>
Logger[F]
.info(show"Cancelling booking with ID ${booking.id} at ${timestamp.toString}")
)) >> entity.write(BookingCancelled)
)
case Status.Cancelled => EitherT.pure(())
case Status.Rejected => EitherT.leftT[F, Unit](BookingAlg.BookingWasRejected(booking.id))
}
)(
BookingUnknown
)
def notifyCapacity(isAvailable: Boolean): F[BookingAlg.BookingUnknown.type \/ Unit] =
ifKnownF(_.status match {
case Status.Pending =>
if (isAvailable) entity.write(BookingAccepted) else entity.write(BookingRejected)
case _ => ().pure
})(
BookingUnknown
)
}
Event handling
In this simple example, events essentially set fields in the state:
sourceclass BookingEventApplier extends EventApplier[Booking, BookingEvent] {
def apply(state: Option[Booking], event: BookingEvent): String \/ Option[Booking] =
(event match {
case BookingPlaced(bookingID, time, origin, destination, passengerCount) =>
state
.toLeft(Booking(bookingID, time, origin, destination, passengerCount))
.leftMap(_ => "Booking already exists")
case OriginChanged(newOrigin) =>
state
.toRight("Attempt to change unknown booking")
.map(_.copy(origin = newOrigin))
case DestinationChanged(newDestination) =>
state
.toRight("Attempt to change unknown booking")
.map(_.copy(destination = newDestination))
case BookingAccepted =>
state
.toRight("Attempt to accept unknown booking")
.map(_.copy(status = Booking.Status.Accepted))
case BookingRejected =>
state
.toRight("Attempt to reject unknown booking")
.map(_.copy(status = Booking.Status.Rejected))
case BookingCancelled =>
state
.toRight("Attempt to cancel unknown booking")
.map(_.copy(status = Booking.Status.Cancelled))
}).map(Option(_))
}
Protocol
Command and reply encoding/decoding on client and server side is implemented with a subclass of CommandProtocol
, in this case ProtobufCommandProtocol
as we’ll be using protobuf. On the client side, we translate invocations into instances of OutgoingCommand
and make use of the sendCommand
helper function:
sourceclass BookingCommandProtocol extends ProtobufCommandProtocol[BookingID, BookingAlg] {
override def clientFor[F[_]](
id: BookingID
)(implicit sender: CommandSender[F, BookingID]): BookingAlg[F] =
new BookingAlg[F] {
def place(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): F[BookingAlreadyExists \/ Unit] =
sendCommand[F, BookingCommand, replies.PlaceBookingReply, BookingAlreadyExists \/ Unit](
id,
BookingCommand.of(
Command.PlaceBookingV1(
PlaceBookingV1(
proto.BookingID(bookingID.show),
Timestamp.of(time.getEpochSecond, time.getNano),
passengerCount,
proto.LatLonV1(origin.lat, origin.lon),
proto.LatLonV1(destination.lat, destination.lon)
)
)
),
{
case replies
.PlaceBookingReply(replies.PlaceBookingReply.Reply.AlreadyExists(booking), _) =>
Left(BookingAlreadyExists(BookingID(UUID.fromString(booking.bookingId.value))))
case replies.PlaceBookingReply(replies.PlaceBookingReply.Reply.Unit(_), _) => Right(())
case replies.PlaceBookingReply(replies.PlaceBookingReply.Reply.Empty, _) =>
throw new UnexpectedReplyException
}
)
// ...
On the server side, we decode instances of IncomingCommand
and make use of handleCommand
to trigger relevant entity logic and encode the reply:
sourceoverride def server[F[_]]: Decoder[IncomingCommand[F, BookingAlg]] =
ProtobufDecoder[BookingCommand].map(_.command match {
case Command.Empty => throw new UnexpectedCommandException
case Command.PlaceBookingV1(
PlaceBookingV1(bookingID, time, passengerCount, origin, destination, _)
) =>
handleCommand[F, replies.PlaceBookingReply, BookingAlreadyExists \/ Unit](
_.place(
BookingID(UUID.fromString(bookingID.value)),
Instant.ofEpochSecond(time.seconds, time.nanos),
passengerCount,
LatLon(origin.lat, origin.lon),
LatLon(destination.lat, destination.lon)
),
{
case Left(bookingAlreadyExists) =>
replies.PlaceBookingReply(
replies.PlaceBookingReply.Reply.AlreadyExists(
replies.BookingAlreadyExistsV1(
proto.BookingID(bookingAlreadyExists.bookingID.show)
)
)
)
case Right(_) =>
replies.PlaceBookingReply(replies.PlaceBookingReply.Reply.Unit(UnitReply()))
}
)
Here’s an illustration of the chain of interactions taking place when placing a booking, both from the client and the server side:
Side-effects
We describe the availability process as well as entity passivation using Effector
:
sourceclass BookingSideEffect[F[_]: Logger: Monad]()(implicit
availabilityAlg: AvailabilityAlg[F]
) extends SideEffect[F, Booking, BookingAlg] {
def apply(trigger: Trigger, effector: Effector[F, Booking, BookingAlg]): F[Unit] = {
import effector.*
val availabilityProcess: Booking => F[Unit] = booking =>
(booking.status, trigger) match {
case (Status.Pending, Trigger.AfterRecovery | Trigger.AfterPersistence) =>
(availabilityAlg.isCapacityAvailable(
booking.time,
booking.passengerCount
) >>= self.notifyCapacity).void
case _ => ().pure
}
val handlePassivation: Booking => F[Unit] = {
_.status match {
case Status.Pending => Applicative[F].unit
case Status.Accepted => enablePassivation(passivationDelay)
case Status.Rejected => enablePassivation()
case Status.Cancelled => enablePassivation()
}
}
ifKnown(booking => Logger[F].info(show"State is now $booking")) >> ifKnown(
availabilityProcess
) >> ifKnown(handlePassivation)
}
private val passivationDelay = 1.hour
}
Testing
Unit testing for entity algebra implementation, event handling and effector benefits from to the parametric nature of F
:
sourceclass BookingEntityBehaviorSuite
extends munit.CatsEffectSuite
with munit.ScalaCheckEffectSuite
with Generators {
implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
private val bookingAlg = BookingEntityBehavior(EntityT.instance[IO, Booking, BookingEvent])
private implicit val eventApplier: BookingEventApplier = new BookingEventApplier
test("place booking") {
forAllF { (booking: Booking) =>
bookingAlg
.place(
booking.id,
booking.time,
booking.passengerCount,
booking.origin,
booking.destination
)
.run(None)
.map {
case Right((events, _)) =>
assertEquals(
events,
Chain(
BookingPlaced(
booking.id,
booking.time,
booking.origin,
booking.destination,
booking.passengerCount
)
)
)
case Left(error) => fail(error)
}
.flatMap(_ => assertIOBoolean(logger.logged.map(_.nonEmpty)))
}
}
test("change origin and destination") {
forAllF { (booking: Booking, newOrigin: LatLon, newDestination: LatLon) =>
bookingAlg
.changeOriginAndDestination(newOrigin, newDestination)
.run(Some(booking))
.map {
case Right((events, _)) =>
assertEquals(
events,
Chain[BookingEvent](OriginChanged(newOrigin), DestinationChanged(newDestination))
)
case _ => fail("unexpected")
}
}
}
sourceclass BookingEventApplierSuite extends munit.ScalaCheckSuite with Generators {
property("booking placed when unknown") {
forAll { (booking: Booking) =>
val fold = new BookingEventApplier()(
None,
BookingPlaced(
booking.id,
booking.time,
booking.origin,
booking.destination,
booking.passengerCount
)
)
assertEquals(fold, Right(Some(booking)))
}
}
property("booking placed when known") {
forAll { (booking: Booking) =>
val fold = new BookingEventApplier()(
Some(booking),
BookingPlaced(
booking.id,
booking.time,
booking.origin,
booking.destination,
booking.passengerCount
)
)
assert(fold.isLeft)
}
}
property("origin changed when known") {
forAll { (booking: Booking, newOrigin: LatLon) =>
val fold = new BookingEventApplier()(Some(booking), OriginChanged(newOrigin))
assertEquals(fold.toOption.flatMap(_.map(_.origin)), Option(newOrigin))
}
}
property("origin changed when unknown") {
forAll { (newOrigin: LatLon) =>
val fold = new BookingEventApplier()(None, OriginChanged(newOrigin))
assert(fold.isLeft)
}
}
property("destination changed when known") {
forAll { (booking: Booking, newDestination: LatLon) =>
val fold = new BookingEventApplier()(Some(booking), DestinationChanged(newDestination))
assertEquals(fold.toOption.flatMap(_.map(_.destination)), Option(newDestination))
}
}
property("destination changed when unknown") {
forAll { (newDestination: LatLon) =>
val fold = new BookingEventApplier()(None, DestinationChanged(newDestination))
assert(fold.isLeft)
}
}
property("booking accepted when known") {
forAll { (booking: Booking) =>
val fold = new BookingEventApplier()(Some(booking), BookingAccepted)
assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Accepted))
}
}
test("booking accepted when unknown") {
val fold = new BookingEventApplier()(None, BookingAccepted)
assert(fold.isLeft)
}
property("booking rejected when known") {
forAll { (booking: Booking) =>
val fold = new BookingEventApplier()(Some(booking), BookingRejected)
assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Rejected))
}
}
test("booking rejected when unknown") {
val fold = new BookingEventApplier()(None, BookingRejected)
assert(fold.isLeft)
}
property("booking cancelled when known") {
forAll { (booking: Booking) =>
val fold = new BookingEventApplier()(Some(booking), BookingCancelled)
assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Cancelled))
}
}
test("booking cancelled when unknown") {
val fold = new BookingEventApplier()(None, BookingCancelled)
assert(fold.isLeft)
}
}
sourceclass BookingSideEffectSuite
extends munit.CatsEffectSuite
with munit.ScalaCheckEffectSuite
with Generators {
implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
implicit private def availabilityAlg: AvailabilityAlg[IO] = (_: Instant, _: Int) => IO(true)
test("some state generates logs after persistence") {
forAllF { (booking: Booking) =>
val acceptedBooking = booking.copy(status = Booking.Status.Accepted)
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(acceptedBooking)
)
_ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector)
_ <- assertIO(logger.logged.map(_.map(_.message).last), show"State is now $acceptedBooking")
} yield ()
}
}
test("some state passivates after one hour") {
forAllF { (booking: Booking, trigger: Trigger) =>
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(booking.copy(status = Booking.Status.Accepted))
)
_ <- new BookingSideEffect().apply(trigger, effector)
_ <- assertIO(effector.passivationState, Effector.PassivationState.After(1.hour))
} yield ()
}
}
test("passivates immediately when cancelled") {
forAllF { (booking: Booking, trigger: Trigger) =>
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {},
Some(booking.copy(status = Booking.Status.Cancelled))
)
_ <- new BookingSideEffect().apply(trigger, effector)
_ <- assertIO(effector.passivationState, PassivationState.After(Duration.Zero))
} yield ()
}
}
test("notifies availability when pending and does not passivate") {
forAllF { (booking: Booking) =>
for {
effector <- Effector.apply[IO, Booking, BookingAlg](
new SelfEntity {
override def notifyCapacity(
isAvailable: Boolean
): IO[BookingAlg.BookingUnknown.type \/ Unit] = {
assert(isAvailable)
IO.pure(().asRight)
}
},
Some(booking.copy(status = Booking.Status.Pending))
)
_ <- new BookingSideEffect().apply(Trigger.AfterPersistence, effector)
} yield ()
}
}
trait SelfEntity extends BookingAlg[IO] {
lazy val raiseError = IO.raiseError(new RuntimeException("should not be called"))
override def place(
bookingID: Booking.BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): IO[BookingAlg.BookingAlreadyExists \/ Unit] = raiseError
override def get: IO[BookingAlg.BookingUnknown.type \/ Booking] = raiseError
override def changeOrigin(newOrigin: LatLon): IO[BookingAlg.BookingUnknown.type \/ Unit] =
raiseError
override def changeDestination(
newDestination: LatLon
): IO[BookingAlg.BookingUnknown.type \/ Unit] = raiseError
override def changeOriginAndDestination(
newOrigin: LatLon,
newDestination: LatLon
): IO[BookingAlg.BookingUnknown.type \/ Unit] = raiseError
override def cancel: IO[BookingAlg.CancelError \/ Unit] = raiseError
override def notifyCapacity(isAvailable: Boolean): IO[BookingAlg.BookingUnknown.type \/ Unit] =
raiseError
}
}
Command protocol can be also be covered in isolation with synchronous round-trip tests:
sourceclass BookingCommandProtocolSuite extends munit.ScalaCheckSuite with Generators {
test("place booking") {
forAll { (booking: Booking, reply: BookingAlg.BookingAlreadyExists \/ Unit) =>
implicit val sender: CommandSender[Id, Booking.BookingID] = CommandSender.local(
protocol,
new TestBookingAlg {
override def place(
bookingID: Booking.BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): Id[BookingAlg.BookingAlreadyExists \/ Unit] = reply
}
)
val actualReply = protocol
.clientFor(booking.id)
.place(
booking.id,
booking.time,
booking.passengerCount,
booking.origin,
booking.destination
)
assertEquals(actualReply, reply)
}
}
Component and integration tests using Akka or Pekko testkits are also advisable and work as usual, see PekkoExampleAppSuite.