Example app
The endless example application is a small API for managing imaginary bookings for passenger trips from some origin to some destination, as well as tracking the positions and speeds of vehicles. It can be found in `endless-example` and can be run directly with `sbt run`.
API
It has a simple CRUD API for bookings and vehicles:
/** Builds the HTTP application exposing the booking and vehicle endpoints.
  *
  * Each route delegates to a handler that receives the matching repository; requests matching
  * none of the cases fall through to `orNotFound`.
  *
  * @param bookingRepository repository handed to the `/booking` handlers
  * @param vehicleRepository repository handed to the `/vehicle` handlers
  * @return the assembled `HttpApp`
  */
private def httpService(
    bookingRepository: BookingRepositoryAlg[IO],
    vehicleRepository: VehicleRepositoryAlg[IO]
): HttpApp[IO] = HttpRoutes
  .of[IO] {
    // Booking endpoints
    case req @ POST -> Root / "booking" =>
      postBooking(bookingRepository, req)
    case GET -> Root / "booking" / UUIDVar(id) =>
      getBooking(bookingRepository, id)
    case req @ PATCH -> Root / "booking" / UUIDVar(id) =>
      patchBooking(bookingRepository, req, id)
    case POST -> Root / "booking" / UUIDVar(id) / "cancel" =>
      cancelBooking(bookingRepository, id)

    // Vehicle endpoints: reads
    case GET -> Root / "vehicle" / UUIDVar(id) / "speed" =>
      getVehicleSpeed(vehicleRepository, id)
    case GET -> Root / "vehicle" / UUIDVar(id) / "position" =>
      getVehiclePosition(vehicleRepository, id)
    case GET -> Root / "vehicle" / UUIDVar(id) / "recoveryCount" =>
      getVehicleRecoveryCount(vehicleRepository, id)

    // Vehicle endpoints: writes
    case req @ POST -> Root / "vehicle" / UUIDVar(id) / "speed" =>
      setVehicleSpeed(vehicleRepository, id, req)
    case req @ POST -> Root / "vehicle" / UUIDVar(id) / "position" =>
      setVehiclePosition(vehicleRepository, id, req)
  }
  .orNotFound
Scaffolding
The application is assembled via calls to `deployEntity` (for bookings) and `deployDurableEntityF` (for vehicles); see runtime for more details.
/** Assembles the example application.
  *
  * Declares the implicit instances used by the deployment calls (cluster sharding, command
  * protocols, booking event applier, entity name providers, entity ID codecs and ask timeout),
  * creates a logger, deploys the booking and vehicle entities and finally serves the HTTP
  * service on `localhost:8080`.
  *
  * @param actorSystem actor system from which the `ClusterSharding` instance is obtained
  * @return an effect that creates the logger and yields the HTTP server as a `Resource`
  */
def apply(implicit actorSystem: ActorSystem[Nothing]): IO[Resource[IO, Server]] = {
  implicit val clusterSharding: ClusterSharding = ClusterSharding(actorSystem)
  implicit val bookingCommandProtocol: BookingCommandProtocol = new BookingCommandProtocol
  implicit val vehicleCommandProtocol: VehicleCommandProtocol = new VehicleCommandProtocol
  implicit val eventApplier: BookingEventApplier = new BookingEventApplier
  implicit val bookingEntityNameProvider: EntityNameProvider[BookingID] = () => "booking"
  implicit val vehicleEntityNameProvider: EntityNameProvider[VehicleID] = () => "vehicle"
  implicit val bookingIDEncoder: EntityIDCodec[BookingID] =
    EntityIDCodec(_.id.show, BookingID.fromString)
  implicit val vehicleIDEncoder: EntityIDCodec[VehicleID] =
    EntityIDCodec(_.id.show, VehicleID.fromString)
  implicit val askTimeout: Timeout = Timeout(10.seconds)

  Slf4jLogger.create[IO].map { implicit logger: Logger[IO] =>
    // Event-sourced booking entity.
    val bookingDeployment =
      deployEntity[IO, Booking, BookingEvent, BookingID, BookingAlg, BookingRepositoryAlg](
        BookingEntity(_),
        BookingRepository(_),
        { case (effector, _, _) => BookingEffector(effector) }
      )

    // Durable vehicle entity, with a custom snapshot adapter plugged into its behavior.
    val vehicleDeployment =
      deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
        VehicleEntity(_).pure[IO],
        VehicleRepository(_).pure[IO],
        { case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
        customizeBehavior = (_, behavior) => behavior.snapshotAdapter(new VehicleStateAdapter)
      )

    // Only the repositories are needed here; the second element of each pair is ignored.
    Resource
      .both(bookingDeployment, vehicleDeployment)
      .flatMap { case ((bookingRepository, _), (vehicleRepository, _)) =>
        BlazeServerBuilder[IO]
          .bindHttp(8080, "localhost")
          .withHttpApp(httpService(bookingRepository, vehicleRepository))
          .resource
      }
  }
}
Algebras
You might have spotted the two algebra types in the snippet above:
Repository
/** Repository algebra giving access to individual booking entities.
  *
  * @tparam F effect type in which the returned entity algebra operates
  */
trait BookingRepositoryAlg[F[_]] {

  /** Returns the entity algebra for the booking identified by `bookingID`. */
  def bookingFor(bookingID: BookingID): BookingAlg[F]
}
Here’s the sequence of operations happening behind the scenes when retrieving an instance of entity algebra:

Entity
/** Entity algebra listing the operations available on a single booking.
  *
  * Every operation yields its outcome in `F`, with the failure case on the left of `\/`.
  *
  * @tparam F effect type of the operations
  */
trait BookingAlg[F[_]] {

  /** Places a new booking, or reports `BookingAlreadyExists`. */
  def place(
      bookingID: BookingID,
      time: Instant,
      passengerCount: Int,
      origin: LatLon,
      destination: LatLon
  ): F[BookingAlreadyExists \/ Unit]

  /** Returns the booking, or `BookingUnknown`. */
  def get: F[BookingUnknown.type \/ Booking]

  /** Changes the origin of the booking, or reports `BookingUnknown`. */
  def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit]

  /** Changes the destination of the booking, or reports `BookingUnknown`. */
  def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit]

  /** Changes both origin and destination of the booking, or reports `BookingUnknown`. */
  def changeOriginAndDestination(
      newOrigin: LatLon,
      newDestination: LatLon
  ): F[BookingUnknown.type \/ Unit]

  /** Cancels the booking, or reports a `CancelError`. */
  def cancel: F[CancelError \/ Unit]

  /** Tells the booking whether capacity is available, or reports `BookingUnknown`. */
  def notifyCapacity(isAvailable: Boolean): F[BookingUnknown.type \/ Unit]
}
Implementations
Implementing the repository algebra is trivial using the `Repository` instance (injected by `deployEntity`):
/** [[BookingRepositoryAlg]] implemented by delegating to the supplied `Repository`.
  *
  * @param repository repository used to look up the entity algebra for a booking ID
  */
final case class BookingRepository[F[_]: Monad](repository: Repository[F, BookingID, BookingAlg])
    extends BookingRepositoryAlg[F] {

  /** Returns the entity algebra obtained from `repository.entityFor(bookingID)`. */
  def bookingFor(bookingID: BookingID): BookingAlg[F] = repository.entityFor(bookingID)
}
The entity algebra is implemented using the `Entity` typeclass instance (also injected by `deployEntity`):
/** [[BookingAlg]] implemented on top of an `Entity`: each operation inspects the current state
  * through the `ifKnown*` / `ifUnknown*` combinators and records changes with `entity.write`.
  *
  * @param entity entity instance used to read the state and to write events
  */
final case class BookingEntity[F[_]: Logger: Clock](entity: Entity[F, Booking, BookingEvent])
    extends BookingAlg[F] {
  import entity._

  /** Logs and writes `BookingPlaced` when no booking exists yet; otherwise returns
    * `BookingAlreadyExists`.
    */
  def place(
      bookingID: BookingID,
      time: Instant,
      passengerCount: Int,
      origin: LatLon,
      destination: LatLon
  ): F[BookingAlreadyExists \/ Unit] =
    ifUnknownF(
      Logger[F].info(show"Creating booking with ID $bookingID") >>
        entity.write(BookingPlaced(bookingID, time, origin, destination, passengerCount))
    )(_ => BookingAlreadyExists(bookingID))

  /** Returns the current booking, or `BookingUnknown` when there is none. */
  def get: F[BookingUnknown.type \/ Booking] = ifKnown(identity)(BookingUnknown)

  /** Writes `OriginChanged` only when the new origin differs from the current one. */
  def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit] =
    ifKnownF(booking =>
      if (booking.origin =!= newOrigin) entity.write(OriginChanged(newOrigin))
      else ().pure
    )(BookingUnknown)

  /** Writes `DestinationChanged` only when the new destination differs from the current one. */
  def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit] =
    ifKnownF(booking =>
      if (booking.destination =!= newDestination)
        entity.write(DestinationChanged(newDestination))
      else ().pure
    )(BookingUnknown)

  /** Runs `changeOrigin` followed by `changeDestination`; the returned value is the one
    * produced by `changeDestination`.
    */
  def changeOriginAndDestination(
      newOrigin: LatLon,
      newDestination: LatLon
  ): F[BookingUnknown.type \/ Unit] =
    changeOrigin(newOrigin) >> changeDestination(newDestination)

  /** Cancels the booking according to its status:
    *   - accepted or pending: logs the cancellation time and writes `BookingCancelled`
    *   - already cancelled: succeeds without writing anything
    *   - rejected: returns `BookingWasRejected`
    *
    * Returns `BookingUnknown` when there is no booking.
    */
  def cancel: F[CancelError \/ Unit] =
    ifKnownT[CancelError, Unit](booking =>
      booking.status match {
        case Status.Accepted | Status.Pending =>
          EitherT.liftF(
            (Clock[F].realTimeInstant >>= (timestamp =>
              Logger[F]
                .info(show"Cancelling booking with ID ${booking.id} at ${timestamp.toString}")
            )) >> entity.write(BookingCancelled)
          )
        case Status.Cancelled => EitherT.pure(())
        case Status.Rejected => EitherT.leftT[F, Unit](BookingAlg.BookingWasRejected(booking.id))
      }
    )(BookingUnknown)

  /** For a pending booking, writes `BookingAccepted` or `BookingRejected` depending on
    * `isAvailable`; any other status is left untouched.
    */
  def notifyCapacity(isAvailable: Boolean): F[BookingUnknown.type \/ Unit] =
    ifKnownF(_.status match {
      case Status.Pending =>
        if (isAvailable) entity.write(BookingAccepted) else entity.write(BookingRejected)
      case _ => ().pure
    })(BookingUnknown)
}
Event handling
In this simple example, events essentially set fields in the state:
/** Folds booking events into the booking state.
  *
  * `BookingPlaced` is only valid when there is no state yet; every other event requires an
  * existing booking and updates one of its fields.
  */
class BookingEventApplier extends EventApplier[Booking, BookingEvent] {

  /** Applies `event` to `state`.
    *
    * @param state current booking, or `None` when no booking exists yet
    * @param event event to fold into the state
    * @return the new state on the right, or an error message on the left when the event is not
    *         valid for the given state
    */
  def apply(state: Option[Booking], event: BookingEvent): String \/ Option[Booking] =
    (event match {
      case BookingPlaced(bookingID, time, origin, destination, passengerCount) =>
        // An existing state ends up on the left and is turned into the error message.
        state
          .toLeft(Booking(bookingID, time, origin, destination, passengerCount))
          .leftMap(_ => "Booking already exists")
      case OriginChanged(newOrigin) =>
        updateKnown(state, "change")(_.copy(origin = newOrigin))
      case DestinationChanged(newDestination) =>
        updateKnown(state, "change")(_.copy(destination = newDestination))
      case BookingAccepted =>
        updateKnown(state, "accept")(_.copy(status = Booking.Status.Accepted))
      case BookingRejected =>
        updateKnown(state, "reject")(_.copy(status = Booking.Status.Rejected))
      case BookingCancelled =>
        updateKnown(state, "cancel")(_.copy(status = Booking.Status.Cancelled))
    }).map(Option(_))

  /** Applies `update` to an existing booking, or fails with
    * "Attempt to <action> unknown booking" when there is none.
    */
  private def updateKnown(state: Option[Booking], action: String)(
      update: Booking => Booking
  ): Either[String, Booking] =
    state.toRight(s"Attempt to $action unknown booking").map(update)
}
Protocol
Command and reply encoding/decoding on the client and server side is done by interpreting the entity algebra with the `OutgoingCommand` and `IncomingCommand` contexts respectively:
sourceclass BookingCommandProtocol extends CirceCommandProtocol[BookingAlg] {
override def client: BookingAlg[OutgoingCommand[*]] =
new BookingAlg[OutgoingCommand[*]] {
def place(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): OutgoingCommand[BookingAlreadyExists \/ Unit] =
outgoingCommand[BookingCommand, BookingAlreadyExists \/ Unit](
PlaceBooking(bookingID, time, passengerCount, origin, destination)
)
// ...
sourceoverride def server[F[_]]: Decoder[IncomingCommand[F, BookingAlg]] =
CirceDecoder(io.circe.Decoder[BookingCommand].map {
case PlaceBooking(
bookingID: BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
) =>
incomingCommand[F, BookingAlreadyExists \/ Unit](
_.place(bookingID, time, passengerCount, origin, destination)
)
Here’s an illustration of the chain of interactions taking place when placing a booking, both from the client and the server side:


Side-effects
We describe the availability process as well as explicit entity passivation using `Effector`:
/** Side effects for bookings: logging of the state, the availability check for pending
  * bookings, and passivation.
  */
object BookingEffector {
  // Delay passed to `enablePassivation` for accepted bookings.
  private val passivationDelay = 1.hour

  /** Describes the side effects to run for the current state, in this order:
    *   1. log the state,
    *   1. for a pending booking, query capacity and notify the entity of the answer,
    *   1. enable passivation depending on the status.
    *
    * Each step is wrapped in `ifKnown`, i.e. it is given the booking as input.
    *
    * @param effector effector giving access to the state, the entity itself and passivation
    * @param availabilityAlg algebra used to check whether capacity is available
    */
  def apply[F[_]: Logger: Monad](
      effector: Effector[F, Booking, BookingAlg]
  )(implicit availabilityAlg: AvailabilityAlg[F]): F[Unit] = {
    import effector._

    val availabilityProcess: Booking => F[Unit] = booking =>
      booking.status match {
        case Status.Pending =>
          for {
            isAvailable <- availabilityAlg.isCapacityAvailable(booking.time, booking.passengerCount)
            entity <- self
            // The reply of the entity is deliberately discarded.
            _ <- entity.notifyCapacity(isAvailable)
          } yield ()
        case _ => ().pure
      }

    val handlePassivation: Booking => F[Unit] = {
      _.status match {
        // A pending booking leaves the passivation setting untouched.
        case Status.Pending => Applicative[F].unit
        case Status.Accepted => enablePassivation(passivationDelay)
        case Status.Rejected => enablePassivation()
        case Status.Cancelled => enablePassivation()
      }
    }

    ifKnown(booking => Logger[F].info(show"State is now $booking")) >>
      ifKnown(availabilityProcess) >>
      ifKnown(handlePassivation)
  }
}
Testing
Unit testing the entity algebra implementation, event handling and effector is easy thanks to the parametric nature of `F`:
sourceclass BookingEntitySuite
extends munit.CatsEffectSuite
with munit.ScalaCheckEffectSuite
with Generators {
implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
private val bookingAlg = BookingEntity(EntityT.instance[IO, Booking, BookingEvent])
private implicit val eventApplier: BookingEventApplier = new BookingEventApplier
test("place booking") {
forAllF { booking: Booking =>
bookingAlg
.place(
booking.id,
booking.time,
booking.passengerCount,
booking.origin,
booking.destination
)
.run(None)
.map {
case Right((events, _)) =>
assertEquals(
events,
Chain(
BookingPlaced(
booking.id,
booking.time,
booking.origin,
booking.destination,
booking.passengerCount
)
)
)
case Left(error) => fail(error)
}
.flatMap(_ => assertIOBoolean(logger.logged.map(_.nonEmpty)))
}
}
test("change origin and destination") {
forAllF { (booking: Booking, newOrigin: LatLon, newDestination: LatLon) =>
bookingAlg
.changeOriginAndDestination(newOrigin, newDestination)
.run(Some(booking))
.map {
case Right((events, _)) =>
assertEquals(
events,
Chain[BookingEvent](OriginChanged(newOrigin), DestinationChanged(newDestination))
)
case _ => fail("unexpected")
}
}
}
/** Property tests for [[BookingEventApplier]]: each event is applied both to an existing
  * booking and to an empty state.
  */
class BookingEventApplierSuite extends munit.ScalaCheckSuite with Generators {
  // The applier holds no state, so one instance is shared by all properties.
  private val applier = new BookingEventApplier

  /** `BookingPlaced` event carrying the fields of `booking`. */
  private def placed(booking: Booking): BookingEvent =
    BookingPlaced(
      booking.id,
      booking.time,
      booking.origin,
      booking.destination,
      booking.passengerCount
    )

  /** Status of the booking after applying `event` to `booking`, if the fold succeeded. */
  private def statusAfter(booking: Booking, event: BookingEvent) =
    applier(Some(booking), event).toOption.flatMap(_.map(_.status))

  property("booking placed when unknown") {
    forAll { booking: Booking =>
      assertEquals(applier(None, placed(booking)), Right(Some(booking)))
    }
  }

  property("booking placed when known") {
    forAll { booking: Booking =>
      assert(applier(Some(booking), placed(booking)).isLeft)
    }
  }

  property("origin changed when known") {
    forAll { (booking: Booking, newOrigin: LatLon) =>
      val fold = applier(Some(booking), OriginChanged(newOrigin))
      assertEquals(fold.toOption.flatMap(_.map(_.origin)), Option(newOrigin))
    }
  }

  property("origin changed when unknown") {
    forAll { newOrigin: LatLon =>
      assert(applier(None, OriginChanged(newOrigin)).isLeft)
    }
  }

  property("destination changed when known") {
    forAll { (booking: Booking, newDestination: LatLon) =>
      val fold = applier(Some(booking), DestinationChanged(newDestination))
      assertEquals(fold.toOption.flatMap(_.map(_.destination)), Option(newDestination))
    }
  }

  property("destination changed when unknown") {
    forAll { newDestination: LatLon =>
      assert(applier(None, DestinationChanged(newDestination)).isLeft)
    }
  }

  property("booking accepted when known") {
    forAll { booking: Booking =>
      assertEquals(statusAfter(booking, BookingAccepted), Option(Booking.Status.Accepted))
    }
  }

  property("booking accepted when unknown") {
    assert(applier(None, BookingAccepted).isLeft)
  }

  property("booking rejected when known") {
    forAll { booking: Booking =>
      assertEquals(statusAfter(booking, BookingRejected), Option(Booking.Status.Rejected))
    }
  }

  property("booking rejected when unknown") {
    assert(applier(None, BookingRejected).isLeft)
  }

  property("booking cancelled when known") {
    forAll { booking: Booking =>
      assertEquals(statusAfter(booking, BookingCancelled), Option(Booking.Status.Cancelled))
    }
  }

  property("booking cancelled when unknown") {
    assert(applier(None, BookingCancelled).isLeft)
  }
}
/** Tests for [[BookingEffector]], run against `EffectorT` with a testing logger and an
  * availability algebra that always reports capacity as available.
  */
class BookingEffectorSuite
    extends munit.CatsEffectSuite
    with munit.ScalaCheckEffectSuite
    with Generators {
  implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
  implicit private def availabilityAlg[F[_]: Applicative]: AvailabilityAlg[F] =
    (_: Instant, _: Int) => Applicative[F].pure(true)
  private val effector = BookingEffector(EffectorT.instance[IO, Booking, BookingAlg])

  test("some state log") {
    forAllF { booking: Booking =>
      val acceptedBooking = booking.copy(status = Booking.Status.Accepted)
      effector
        .runA(Some(acceptedBooking), new SelfEntity {})
        .flatMap(_ =>
          // lastOption: an empty log fails the assertion instead of throwing.
          assertIO(
            logger.logged.map(_.lastOption.map(_.message)),
            Some(show"State is now $acceptedBooking")
          )
        )
    }
  }

  test("some state passivate after one hour") {
    forAllF { booking: Booking =>
      assertIO(
        effector.runS(Some(booking.copy(status = Booking.Status.Accepted)), new SelfEntity {}),
        PassivationState.After(1.hour)
      )
    }
  }

  test("passivate immediately when cancelled") {
    forAllF { booking: Booking =>
      assertIO(
        effector.runS(Some(booking.copy(status = Booking.Status.Cancelled)), new SelfEntity {}),
        PassivationState.After(Duration.Zero)
      )
    }
  }

  test("notifies availability when pending and does not passivate") {
    forAllF { booking: Booking =>
      assertIO(
        effector.runS(
          Some(booking.copy(status = Booking.Status.Pending)),
          new SelfEntity {
            // The only entity call expected for a pending booking.
            override def notifyCapacity(
                isAvailable: Boolean
            ): IO[BookingAlg.BookingUnknown.type \/ Unit] = {
              assert(isAvailable)
              IO.pure(().asRight)
            }
          }
        ),
        PassivationState.Unchanged
      )
    }
  }

  /** Entity stub whose operations all fail; tests override the ones they expect to be called. */
  trait SelfEntity extends BookingAlg[IO] {
    private def unexpected[A]: IO[A] =
      IO.raiseError(new RuntimeException("should not be called"))

    override def place(
        bookingID: Booking.BookingID,
        time: Instant,
        passengerCount: Int,
        origin: LatLon,
        destination: LatLon
    ): IO[BookingAlg.BookingAlreadyExists \/ Unit] = unexpected

    override def get: IO[BookingAlg.BookingUnknown.type \/ Booking] = unexpected

    override def changeOrigin(newOrigin: LatLon): IO[BookingAlg.BookingUnknown.type \/ Unit] =
      unexpected

    override def changeDestination(
        newDestination: LatLon
    ): IO[BookingAlg.BookingUnknown.type \/ Unit] = unexpected

    override def changeOriginAndDestination(
        newOrigin: LatLon,
        newDestination: LatLon
    ): IO[BookingAlg.BookingUnknown.type \/ Unit] = unexpected

    override def cancel: IO[BookingAlg.CancelError \/ Unit] = unexpected

    override def notifyCapacity(isAvailable: Boolean): IO[BookingAlg.BookingUnknown.type \/ Unit] =
      unexpected
  }
}
The command protocol can also easily be covered with synchronous round-trip tests:
sourceclass BookingCommandProtocolSuite extends munit.ScalaCheckSuite with Generators {
val protocol = new BookingCommandProtocol
test("place booking") {
forAll { (booking: Booking, reply: BookingAlg.BookingAlreadyExists \/ Unit) =>
val outgoingCommand = protocol.client.place(
booking.id,
booking.time,
booking.passengerCount,
booking.origin,
booking.destination
)
val incomingCommand = protocol.server[Id].decode(outgoingCommand.payload)
val encodedReply = incomingCommand
.runWith(new TestBookingAlg {
override def place(
bookingID: Booking.BookingID,
time: Instant,
passengerCount: Int,
origin: LatLon,
destination: LatLon
): Id[BookingAlg.BookingAlreadyExists \/ Unit] = reply
})
.map(incomingCommand.replyEncoder.encode(_))
assertEquals(outgoingCommand.replyDecoder.decode(encodedReply), reply)
}
}
Component and integration tests using Akka TestKit are also advisable and work as usual; see `ExampleAppSuite`.