Example app
Endless example application is a small API for managing imaginary bookings for passenger trips from some origin to some destination, as well as tracking positions and speeds of vehicles. It can be found in endless-example
and can be run directly: sbt run
.
API
It has a simple CRUD API for bookings and vehicles:
source HttpRoutes
.of[IO] {
case req @ POST -> Root / "booking" => postBooking(bookingRepository, req)
case GET -> Root / "booking" / UUIDVar(id) => getBooking(bookingRepository, id)
case req @ PATCH -> Root / "booking" / UUIDVar(id) =>
patchBooking(bookingRepository, req, id)
case POST -> Root / "booking" / UUIDVar(id) / "cancel" =>
cancelBooking(bookingRepository, id)
case GET -> Root / "vehicle" / UUIDVar(id) / "speed" =>
getVehicleSpeed(vehicleRepository, id)
case GET -> Root / "vehicle" / UUIDVar(id) / "position" =>
getVehiclePosition(vehicleRepository, id)
case GET -> Root / "vehicle" / UUIDVar(id) / "recoveryCount" =>
getVehicleRecoveryCount(vehicleRepository, id)
case req @ POST -> Root / "vehicle" / UUIDVar(id) / "speed" =>
setVehicleSpeed(vehicleRepository, id, req)
case req @ POST -> Root / "vehicle" / UUIDVar(id) / "position" =>
setVehiclePosition(vehicleRepository, id, req)
case GET -> Root / "health" =>
isUp.flatMap {
case true => Ok("OK")
case false => ServiceUnavailable("Cluster member is down")
}
}
.orNotFound
)
Scaffolding
The application is assembled via calls to deployEntity
(for bookings) and deployDurableEntity
(for vehicles) (see runtime for more details)
Akka and Pekko runtimes essentially have the same API, so we’ll use Pekko for the example:
sourceResource
.eval(Slf4jLogger.create[IO])
.flatMap { implicit logger: Logger[IO] =>
PekkoCluster.managedResource[IO](actorSystem).flatMap {
implicit cluster: PekkoCluster[IO] =>
Resource
.both(
deployEntity[
IO,
Booking,
BookingEvent,
BookingID,
BookingAlg,
BookingRepositoryAlg
](
BookingEntity(_),
BookingRepository(_),
{ case (effector, _, _) => BookingEffector(effector) },
customizeBehavior = (_, behavior) =>
behavior.eventAdapter(
new EventAdapter[
BookingEvent,
endless.example.proto.booking.events.BookingEvent
] {
def toJournal(e: BookingEvent)
: endless.example.proto.booking.events.BookingEvent =
eventAdapter.toJournal(e)
def manifest(event: BookingEvent): String = event.getClass.getName
def fromJournal(
p: endless.example.proto.booking.events.BookingEvent,
manifest: String
): EventSeq[BookingEvent] = EventSeq.single(eventAdapter.fromJournal(p))
}
)
),
deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
VehicleEntity(_).pure[IO],
VehicleRepository(_).pure[IO],
{ case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
customizeBehavior = (_, behavior) =>
behavior.snapshotAdapter(new SnapshotAdapter[Option[Vehicle]] {
def toJournal(state: Option[Vehicle]): Any = stateAdapter.toJournal(state)
def fromJournal(from: Any): Option[Vehicle] = stateAdapter.fromJournal(from)
})
)
)
.flatMap { case ((bookingRepository, _), (vehicleRepository, _)) =>
HttpServer(port, bookingRepository, vehicleRepository, cluster.isMemberUp)
}
}
}
Algebras
You might have spotted the two algebra types in the snippet above:
Repository
sourcetrait BookingRepositoryAlg[F[_]] {
def bookingFor(bookingID: BookingID): BookingAlg[F]
}
Here’s the sequence of operations happening behind the scenes when retrieving an instance of entity algebra:

Entity
sourcetrait BookingAlg[F[_]] {
def place(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): F[BookingAlreadyExists \/ Unit]
def get: F[BookingUnknown.type \/ Booking]
def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit]
def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit]
def changeOriginAndDestination(
newOrigin: LatLon,
newDestination: LatLon
): F[BookingUnknown.type \/ Unit]
def cancel: F[CancelError \/ Unit]
def notifyCapacity(isAvailable: Boolean): F[BookingUnknown.type \/ Unit]
}
Implementations
Implementation of the repository algebra is trivial using Repository
instance (injected by deployEntity
):
sourcefinal case class BookingRepository[F[_]: Monad](repository: Repository[F, BookingID, BookingAlg])
extends BookingRepositoryAlg[F] {
def bookingFor(bookingID: BookingID): BookingAlg[F] = repository.entityFor(bookingID)
}
Implementation of entity algebra is done using the Entity
typeclass instance (also injected by deployEntity
):
sourcefinal case class BookingEntity[F[_]: Logger: Clock](entity: Entity[F, Booking, BookingEvent])
extends BookingAlg[F] {
import entity._
def place(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): F[BookingAlreadyExists \/ Unit] =
ifUnknownF(
Logger[F].info(show"Creating booking with ID $bookingID") >> write(
BookingPlaced(bookingID, time, origin, destination, passengerCount)
)
)(_ => BookingAlreadyExists(bookingID))
def get: F[BookingUnknown.type \/ Booking] = ifKnown(identity)(BookingUnknown)
def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit] =
ifKnownF(booking =>
if (booking.origin =!= newOrigin) entity.write(OriginChanged(newOrigin)) else ().pure
)(BookingUnknown)
def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit] =
ifKnownF(booking =>
if (booking.destination =!= newDestination) entity.write(DestinationChanged(newDestination))
else ().pure
)(BookingUnknown)
def changeOriginAndDestination(
newOrigin: LatLon,
newDestination: LatLon
): F[BookingUnknown.type \/ Unit] = changeOrigin(newOrigin) >> changeDestination(newDestination)
def cancel: F[CancelError \/ Unit] =
ifKnownT[CancelError, Unit](booking =>
booking.status match {
case Status.Accepted | Status.Pending =>
EitherT.liftF(
(Clock[F].realTimeInstant >>= (timestamp =>
Logger[F]
.info(show"Cancelling booking with ID ${booking.id} at ${timestamp.toString}")
)) >> entity.write(BookingCancelled)
)
case Status.Cancelled => EitherT.pure(())
case Status.Rejected => EitherT.leftT[F, Unit](BookingAlg.BookingWasRejected(booking.id))
}
)(
BookingUnknown
)
def notifyCapacity(isAvailable: Boolean): F[BookingAlg.BookingUnknown.type \/ Unit] =
ifKnownF(_.status match {
case Status.Pending =>
if (isAvailable) entity.write(BookingAccepted) else entity.write(BookingRejected)
case _ => ().pure
})(
BookingUnknown
)
}
Event handling
In this simple example, events essentially set fields in the state:
sourceclass BookingEventApplier extends EventApplier[Booking, BookingEvent] {
def apply(state: Option[Booking], event: BookingEvent): String \/ Option[Booking] =
(event match {
case BookingPlaced(bookingID, time, origin, destination, passengerCount) =>
state
.toLeft(Booking(bookingID, time, origin, destination, passengerCount))
.leftMap(_ => "Booking already exists")
case OriginChanged(newOrigin) =>
state
.toRight("Attempt to change unknown booking")
.map(_.copy(origin = newOrigin))
case DestinationChanged(newDestination) =>
state
.toRight("Attempt to change unknown booking")
.map(_.copy(destination = newDestination))
case BookingAccepted =>
state
.toRight("Attempt to accept unknown booking")
.map(_.copy(status = Booking.Status.Accepted))
case BookingRejected =>
state
.toRight("Attempt to reject unknown booking")
.map(_.copy(status = Booking.Status.Rejected))
case BookingCancelled =>
state
.toRight("Attempt to cancel unknown booking")
.map(_.copy(status = Booking.Status.Cancelled))
}).map(Option(_))
}
Protocol
Command and reply encoding/decoding on client and server side is done by interpreting the entity algebra with IncomingCommand
and OutgoingCommand
contexts respectively:
sourceclass BookingCommandProtocol extends ProtobufCommandProtocol[BookingAlg] {
override def client: BookingAlg[OutgoingCommand[*]] =
new BookingAlg[OutgoingCommand[*]] {
def place(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): OutgoingCommand[BookingAlreadyExists \/ Unit] =
outgoingCommand[BookingCommand, replies.PlaceBookingReply, BookingAlreadyExists \/ Unit](
BookingCommand.of(
Command.PlaceBookingV1(
PlaceBookingV1(
proto.BookingID(bookingID.id.toString),
Timestamp.of(time.getEpochSecond, time.getNano),
passengerCount,
proto.LatLonV1(origin.lat, origin.lon),
proto.LatLonV1(destination.lat, destination.lon)
)
)
),
{
case replies
.PlaceBookingReply(replies.PlaceBookingReply.Reply.AlreadyExists(booking), _) =>
Left(BookingAlreadyExists(BookingID(UUID.fromString(booking.bookingId.value))))
case replies.PlaceBookingReply(replies.PlaceBookingReply.Reply.Unit(_), _) => Right(())
case replies.PlaceBookingReply(replies.PlaceBookingReply.Reply.Empty, _) =>
throw new UnexpectedReplyException
}
)
// ...
sourceoverride def server[F[_]]: Decoder[IncomingCommand[F, BookingAlg]] =
ProtobufDecoder[BookingCommand].map(_.command match {
case Command.Empty => throw new UnexpectedCommandException
case Command.PlaceBookingV1(
PlaceBookingV1(bookingID, time, passengerCount, origin, destination, _)
) =>
incomingCommand[F, replies.PlaceBookingReply, BookingAlreadyExists \/ Unit](
_.place(
BookingID(UUID.fromString(bookingID.value)),
Instant.ofEpochSecond(time.seconds, time.nanos),
passengerCount,
LatLon(origin.lat, origin.lon),
LatLon(destination.lat, destination.lon)
),
{
case Left(bookingAlreadyExists) =>
replies.PlaceBookingReply(
replies.PlaceBookingReply.Reply.AlreadyExists(
replies.BookingAlreadyExistsV1(
proto.BookingID(bookingAlreadyExists.bookingID.id.toString)
)
)
)
case Right(_) =>
replies.PlaceBookingReply(replies.PlaceBookingReply.Reply.Unit(UnitReply()))
}
)
Here’s an illustration of the chain of interactions taking place when placing a booking, both from the client and the server side:


Side-effects
We describe availability process as well as explicit entity passivation using Effector
:
sourceobject BookingEffector {
def apply[F[_]: Logger: Monad](
effector: Effector[F, Booking, BookingAlg]
)(implicit availabilityAlg: AvailabilityAlg[F]): F[Unit] = {
import effector._
val availabilityProcess: Booking => F[Unit] = booking =>
booking.status match {
case Status.Pending =>
for {
isAvailable <- availabilityAlg.isCapacityAvailable(booking.time, booking.passengerCount)
entity <- self
_ <- entity.notifyCapacity(isAvailable)
} yield ()
case _ => ().pure
}
val handlePassivation: Booking => F[Unit] = {
_.status match {
case Status.Pending => Applicative[F].unit
case Status.Accepted => enablePassivation(passivationDelay)
case Status.Rejected => enablePassivation()
case Status.Cancelled => enablePassivation()
}
}
ifKnown(booking => Logger[F].info(show"State is now $booking")) >> ifKnown(
availabilityProcess
) >> ifKnown(handlePassivation)
}
private val passivationDelay = 1.hour
}
Testing
Unit testing for entity algebra implementation, event handling and effector is easy thanks to the parametric nature of F
:
sourceclass BookingEntitySuite
extends munit.CatsEffectSuite
with munit.ScalaCheckEffectSuite
with Generators {
implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
private val bookingAlg = BookingEntity(EntityT.instance[IO, Booking, BookingEvent])
private implicit val eventApplier: BookingEventApplier = new BookingEventApplier
test("place booking") {
forAllF { booking: Booking =>
bookingAlg
.place(
booking.id,
booking.time,
booking.passengerCount,
booking.origin,
booking.destination
)
.run(None)
.map {
case Right((events, _)) =>
assertEquals(
events,
Chain(
BookingPlaced(
booking.id,
booking.time,
booking.origin,
booking.destination,
booking.passengerCount
)
)
)
case Left(error) => fail(error)
}
.flatMap(_ => assertIOBoolean(logger.logged.map(_.nonEmpty)))
}
}
test("change origin and destination") {
forAllF { (booking: Booking, newOrigin: LatLon, newDestination: LatLon) =>
bookingAlg
.changeOriginAndDestination(newOrigin, newDestination)
.run(Some(booking))
.map {
case Right((events, _)) =>
assertEquals(
events,
Chain[BookingEvent](OriginChanged(newOrigin), DestinationChanged(newDestination))
)
case _ => fail("unexpected")
}
}
}
sourceclass BookingEventApplierSuite extends munit.ScalaCheckSuite with Generators {
property("booking placed when unknown") {
forAll { booking: Booking =>
val fold = new BookingEventApplier()(
None,
BookingPlaced(
booking.id,
booking.time,
booking.origin,
booking.destination,
booking.passengerCount
)
)
assertEquals(fold, Right(Some(booking)))
}
}
property("booking placed when known") {
forAll { booking: Booking =>
val fold = new BookingEventApplier()(
Some(booking),
BookingPlaced(
booking.id,
booking.time,
booking.origin,
booking.destination,
booking.passengerCount
)
)
assert(fold.isLeft)
}
}
property("origin changed when known") {
forAll { (booking: Booking, newOrigin: LatLon) =>
val fold = new BookingEventApplier()(Some(booking), OriginChanged(newOrigin))
assertEquals(fold.toOption.flatMap(_.map(_.origin)), Option(newOrigin))
}
}
property("origin changed when unknown") {
forAll { newOrigin: LatLon =>
val fold = new BookingEventApplier()(None, OriginChanged(newOrigin))
assert(fold.isLeft)
}
}
property("destination changed when known") {
forAll { (booking: Booking, newDestination: LatLon) =>
val fold = new BookingEventApplier()(Some(booking), DestinationChanged(newDestination))
assertEquals(fold.toOption.flatMap(_.map(_.destination)), Option(newDestination))
}
}
property("destination changed when unknown") {
forAll { newDestination: LatLon =>
val fold = new BookingEventApplier()(None, DestinationChanged(newDestination))
assert(fold.isLeft)
}
}
property("booking accepted when known") {
forAll { booking: Booking =>
val fold = new BookingEventApplier()(Some(booking), BookingAccepted)
assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Accepted))
}
}
property("booking accepted when unknown") {
val fold = new BookingEventApplier()(None, BookingAccepted)
assert(fold.isLeft)
}
property("booking rejected when known") {
forAll { booking: Booking =>
val fold = new BookingEventApplier()(Some(booking), BookingRejected)
assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Rejected))
}
}
property("booking rejected when unknown") {
val fold = new BookingEventApplier()(None, BookingRejected)
assert(fold.isLeft)
}
property("booking cancelled when known") {
forAll { booking: Booking =>
val fold = new BookingEventApplier()(Some(booking), BookingCancelled)
assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Cancelled))
}
}
property("booking cancelled when unknown") {
val fold = new BookingEventApplier()(None, BookingCancelled)
assert(fold.isLeft)
}
}
sourceclass BookingEffectorSuite
extends munit.CatsEffectSuite
with munit.ScalaCheckEffectSuite
with Generators {
implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
implicit private def availabilityAlg[F[_]: Applicative]: AvailabilityAlg[F] =
(_: Instant, _: Int) => Applicative[F].pure(true)
private val effector = BookingEffector(EffectorT.instance[IO, Booking, BookingAlg])
test("some state log") {
forAllF { booking: Booking =>
val acceptedBooking = booking.copy(status = Booking.Status.Accepted)
effector
.runA(Some(acceptedBooking), new SelfEntity {})
.flatMap(_ =>
assertIO(logger.logged.map(_.map(_.message).last), show"State is now $acceptedBooking")
)
}
}
test("some state passivate after one hour") {
forAllF { booking: Booking =>
assertIO(
effector.runS(Some(booking.copy(status = Booking.Status.Accepted)), new SelfEntity {}),
PassivationState.After(1.hour)
)
}
}
test("passivate immediately when cancelled") {
forAllF { booking: Booking =>
assertIO(
effector.runS(Some(booking.copy(status = Booking.Status.Cancelled)), new SelfEntity {}),
PassivationState.After(Duration.Zero)
)
}
}
test("notifies availability when pending and does not passivate") {
forAllF { booking: Booking =>
assertIO(
effector.runS(
Some(booking.copy(status = Booking.Status.Pending)),
new SelfEntity {
override def notifyCapacity(
isAvailable: Boolean
): IO[BookingAlg.BookingUnknown.type \/ Unit] = {
assert(isAvailable)
IO.pure(().asRight)
}
}
),
PassivationState.Unchanged
)
}
}
trait SelfEntity extends BookingAlg[IO] {
override def place(
bookingID: Booking.BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): IO[BookingAlg.BookingAlreadyExists \/ Unit] =
IO.raiseError(new RuntimeException("should not be called"))
override def get: IO[BookingAlg.BookingUnknown.type \/ Booking] =
IO.raiseError(new RuntimeException("should not be called"))
override def changeOrigin(newOrigin: LatLon): IO[BookingAlg.BookingUnknown.type \/ Unit] =
IO.raiseError(new RuntimeException("should not be called"))
override def changeDestination(
newDestination: LatLon
): IO[BookingAlg.BookingUnknown.type \/ Unit] =
IO.raiseError(new RuntimeException("should not be called"))
override def changeOriginAndDestination(
newOrigin: LatLon,
newDestination: LatLon
): IO[BookingAlg.BookingUnknown.type \/ Unit] =
IO.raiseError(new RuntimeException("should not be called"))
override def cancel: IO[BookingAlg.CancelError \/ Unit] =
IO.raiseError(new RuntimeException("should not be called"))
override def notifyCapacity(isAvailable: Boolean): IO[BookingAlg.BookingUnknown.type \/ Unit] =
IO.raiseError(new RuntimeException("should not be called"))
}
}
Command protocol can be also easily be covered with synchronous round-trip tests:
sourceclass BookingCommandProtocolSuite extends munit.ScalaCheckSuite with Generators {
val protocol = new BookingCommandProtocol
test("place booking") {
forAll { (booking: Booking, reply: BookingAlg.BookingAlreadyExists \/ Unit) =>
val outgoingCommand = protocol.client.place(
booking.id,
booking.time,
booking.passengerCount,
booking.origin,
booking.destination
)
val incomingCommand = protocol.server[Id].decode(outgoingCommand.payload)
val encodedReply = incomingCommand
.runWith(new TestBookingAlg {
override def place(
bookingID: Booking.BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): Id[BookingAlg.BookingAlreadyExists \/ Unit] = reply
})
.map(incomingCommand.replyEncoder.encode(_))
assertEquals(outgoingCommand.replyDecoder.decode(encodedReply), reply)
}
}
Component and integration tests using Akka or Pekko testkit are also advisable and work as usual, see PekkoExampleAppSuite.