Example app

The endless example application is a small API for managing imaginary bookings of passenger trips from an origin to a destination, and for tracking the positions and speeds of vehicles. It lives in endless-example and can be run directly with sbt run.

API

It has a simple CRUD API for bookings and vehicles:

private def httpService(
    bookingRepository: BookingRepositoryAlg[IO],
    vehicleRepository: VehicleRepositoryAlg[IO]
): HttpApp[IO] = HttpRoutes
  .of[IO] {
    case req @ POST -> Root / "booking"                => postBooking(bookingRepository, req)
    case GET -> Root / "booking" / UUIDVar(id)         => getBooking(bookingRepository, id)
    case req @ PATCH -> Root / "booking" / UUIDVar(id) => patchBooking(bookingRepository, req, id)
    case POST -> Root / "booking" / UUIDVar(id) / "cancel" => cancelBooking(bookingRepository, id)
    case GET -> Root / "vehicle" / UUIDVar(id) / "speed" => getVehicleSpeed(vehicleRepository, id)
    case GET -> Root / "vehicle" / UUIDVar(id) / "position" =>
      getVehiclePosition(vehicleRepository, id)
    case GET -> Root / "vehicle" / UUIDVar(id) / "recoveryCount" =>
      getVehicleRecoveryCount(vehicleRepository, id)
    case req @ POST -> Root / "vehicle" / UUIDVar(id) / "speed" =>
      setVehicleSpeed(vehicleRepository, id, req)
    case req @ POST -> Root / "vehicle" / UUIDVar(id) / "position" =>
      setVehiclePosition(vehicleRepository, id, req)
  }
  .orNotFound
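
To exercise these routes from a client, requests could be built along the following lines. This is a minimal sketch using only the JDK's java.net.http API; the ExampleRequests object and its method names are hypothetical, and the host and port are taken from the bindHttp(8080, "localhost") call in the example app. Routes that take a request body are left out because their payload format is not shown here.

```scala
import java.net.URI
import java.net.http.HttpRequest
import java.util.UUID

// Hypothetical helper, not part of the example app
object ExampleRequests {
  // Assumes the server is bound to localhost:8080, as in the example app
  val baseUri = "http://localhost:8080"

  // GET /booking/{id}: fetch a booking by its UUID
  def getBooking(id: UUID): HttpRequest =
    HttpRequest.newBuilder(URI.create(s"$baseUri/booking/$id")).GET().build()

  // POST /booking/{id}/cancel: the route does not read a request body
  def cancelBooking(id: UUID): HttpRequest =
    HttpRequest
      .newBuilder(URI.create(s"$baseUri/booking/$id/cancel"))
      .POST(HttpRequest.BodyPublishers.noBody())
      .build()

  // GET /vehicle/{id}/speed: read the last known speed of a vehicle
  def getVehicleSpeed(id: UUID): HttpRequest =
    HttpRequest.newBuilder(URI.create(s"$baseUri/vehicle/$id/speed")).GET().build()
}
```

The requests are only constructed here; sending them requires a java.net.http.HttpClient and a running instance of the application.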

Scaffolding

The application is assembled via calls to deployEntity (for bookings) and deployDurableEntityF (for vehicles); see runtime for more details.

def apply(implicit actorSystem: ActorSystem[Nothing]): IO[Resource[IO, Server]] = {
  implicit val clusterSharding: ClusterSharding = ClusterSharding(actorSystem)
  implicit val bookingCommandProtocol: BookingCommandProtocol = new BookingCommandProtocol
  implicit val vehicleCommandProtocol: VehicleCommandProtocol = new VehicleCommandProtocol
  implicit val eventApplier: BookingEventApplier = new BookingEventApplier
  implicit val bookingEntityNameProvider: EntityNameProvider[BookingID] = () => "booking"
  implicit val vehicleEntityNameProvider: EntityNameProvider[VehicleID] = () => "vehicle"
  implicit val bookingIDEncoder: EntityIDCodec[BookingID] =
    EntityIDCodec(_.id.show, BookingID.fromString)
  implicit val vehicleIDEncoder: EntityIDCodec[VehicleID] =
    EntityIDCodec(_.id.show, VehicleID.fromString)
  implicit val askTimeout: Timeout = Timeout(10.seconds)

  Slf4jLogger
    .create[IO]
    .map { implicit logger: Logger[IO] =>
      Resource
        .both(
          deployEntity[IO, Booking, BookingEvent, BookingID, BookingAlg, BookingRepositoryAlg](
            BookingEntity(_),
            BookingRepository(_),
            { case (effector, _, _) => BookingEffector(effector) }
          ),
          deployDurableEntityF[IO, Vehicle, VehicleID, VehicleAlg, VehicleRepositoryAlg](
            VehicleEntity(_).pure[IO],
            VehicleRepository(_).pure[IO],
            { case (effector, _, _) => VehicleEffector.apply[IO](effector).map(_.apply) },
            customizeBehavior = (_, behavior) => behavior.snapshotAdapter(new VehicleStateAdapter)
          )
        )
        .map { case ((bookingRepository, _), (vehicleRepository, _)) =>
          httpService(bookingRepository, vehicleRepository)
        }
    }
    .map(
      _.flatMap(service =>
        BlazeServerBuilder[IO]
          .bindHttp(8080, "localhost")
          .withHttpApp(service)
          .resource
      )
    )
}

Algebras

You might have spotted the two algebra types in the snippet above:

Repository

trait BookingRepositoryAlg[F[_]] {
  def bookingFor(bookingID: BookingID): BookingAlg[F]
}

Here’s the sequence of operations happening behind the scenes when retrieving an instance of entity algebra:

Entity

trait BookingAlg[F[_]] {
  def place(
      bookingID: BookingID,
      time: Instant,
      passengerCount: Int,
      origin: LatLon,
      destination: LatLon
  ): F[BookingAlreadyExists \/ Unit]
  def get: F[BookingUnknown.type \/ Booking]
  def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit]
  def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit]
  def changeOriginAndDestination(
      newOrigin: LatLon,
      newDestination: LatLon
  ): F[BookingUnknown.type \/ Unit]
  def cancel: F[CancelError \/ Unit]
  def notifyCapacity(isAvailable: Boolean): F[BookingUnknown.type \/ Unit]
}

Implementations

Implementing the repository algebra is trivial using the Repository instance (injected by deployEntity):

final case class BookingRepository[F[_]: Monad](repository: Repository[F, BookingID, BookingAlg])
    extends BookingRepositoryAlg[F] {
  def bookingFor(bookingID: BookingID): BookingAlg[F] = repository.entityFor(bookingID)
}
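
To make the relationship between the two algebras concrete, here is a self-contained sketch of the same shape with an in-memory interpreter. Everything in it is a simplified assumption for illustration: the algebras are reduced to two methods, IDs are plain strings, the effect is the identity type, and a mutable map stands in for the cluster. It does not reflect how endless implements Repository.

```scala
object RepositorySketch {
  // Identity effect: F[A] is just A, so calls run synchronously
  type Id[A] = A

  // Reduced stand-in for the entity algebra (hypothetical, two methods only)
  trait BookingAlg[F[_]] {
    def place(passengerCount: Int): F[Either[String, Unit]]
    def get: F[Either[String, Int]]
  }

  // The repository hands out an entity algebra for a given ID
  trait BookingRepositoryAlg[F[_]] {
    def bookingFor(bookingID: String): BookingAlg[F]
  }

  // In-memory interpreter: the map plays the role of the sharded entities
  final class InMemoryRepository extends BookingRepositoryAlg[Id] {
    private val store = scala.collection.mutable.Map.empty[String, Int]

    def bookingFor(bookingID: String): BookingAlg[Id] = new BookingAlg[Id] {
      def place(passengerCount: Int): Id[Either[String, Unit]] =
        if (store.contains(bookingID)) Left(s"Booking $bookingID already exists")
        else {
          store.update(bookingID, passengerCount)
          Right(())
        }

      def get: Id[Either[String, Int]] =
        store.get(bookingID).toRight(s"Booking $bookingID is unknown")
    }
  }
}
```

Callers only ever see BookingRepositoryAlg and BookingAlg, so the same calling code can run against this interpreter or against the clustered one.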

The entity algebra is implemented using the Entity typeclass instance (also injected by deployEntity):

final case class BookingEntity[F[_]: Logger: Clock](entity: Entity[F, Booking, BookingEvent])
    extends BookingAlg[F] {
  import entity._

  def place(
      bookingID: BookingID,
      time: Instant,
      passengerCount: Int,
      origin: LatLon,
      destination: LatLon
  ): F[BookingAlreadyExists \/ Unit] =
    ifUnknownF(
      Logger[F].info(show"Creating booking with ID $bookingID") >> write(
        BookingPlaced(bookingID, time, origin, destination, passengerCount)
      )
    )(_ => BookingAlreadyExists(bookingID))

  def get: F[BookingUnknown.type \/ Booking] = ifKnown(identity)(BookingUnknown)

  def changeOrigin(newOrigin: LatLon): F[BookingUnknown.type \/ Unit] =
    ifKnownF(booking =>
      if (booking.origin =!= newOrigin) entity.write(OriginChanged(newOrigin)) else ().pure
    )(BookingUnknown)

  def changeDestination(newDestination: LatLon): F[BookingUnknown.type \/ Unit] =
    ifKnownF(booking =>
      if (booking.destination =!= newDestination) entity.write(DestinationChanged(newDestination))
      else ().pure
    )(BookingUnknown)

  def changeOriginAndDestination(
      newOrigin: LatLon,
      newDestination: LatLon
  ): F[BookingUnknown.type \/ Unit] = changeOrigin(newOrigin) >> changeDestination(newDestination)

  def cancel: F[CancelError \/ Unit] =
    ifKnownT[CancelError, Unit](booking =>
      booking.status match {
        case Status.Accepted | Status.Pending =>
          EitherT.liftF(
            (Clock[F].realTimeInstant >>= (timestamp =>
              Logger[F]
                .info(show"Cancelling booking with ID ${booking.id} at ${timestamp.toString}")
            )) >> entity.write(BookingCancelled)
          )
        case Status.Cancelled => EitherT.pure(())
        case Status.Rejected  => EitherT.leftT[F, Unit](BookingAlg.BookingWasRejected(booking.id))
      }
    )(
      BookingUnknown
    )

  def notifyCapacity(isAvailable: Boolean): F[BookingAlg.BookingUnknown.type \/ Unit] =
    ifKnownF(_.status match {
      case Status.Pending =>
        if (isAvailable) entity.write(BookingAccepted) else entity.write(BookingRejected)
      case _ => ().pure
    })(
      BookingUnknown
    )
}
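
The pattern behind these methods is that a command reads the current state, possibly writes events, and returns a reply. A stripped-down model of that idea, assuming a plain function type instead of the Entity typeclass, could look like the sketch below. The Handler type, the reduced Booking and Event types, and these versions of ifUnknown and ifKnown are hypothetical simplifications, not the library's definitions.

```scala
object EntitySketch {
  final case class Booking(id: String, origin: String)

  sealed trait Event
  final case class BookingPlaced(id: String, origin: String) extends Event
  final case class OriginChanged(newOrigin: String) extends Event

  // A command handler: given the current state, produce events and a reply
  type Handler[A] = Option[Booking] => (List[Event], A)

  // Emit the events only when the entity does not exist yet
  def ifUnknown[Err](events: List[Event])(whenKnown: Booking => Err): Handler[Either[Err, Unit]] = {
    case None          => (events, Right(()))
    case Some(booking) => (Nil, Left(whenKnown(booking)))
  }

  // Compute events from the existing state, or fail when there is none
  def ifKnown[Err](events: Booking => List[Event])(whenUnknown: => Err): Handler[Either[Err, Unit]] = {
    case Some(booking) => (events(booking), Right(()))
    case None          => (Nil, Left(whenUnknown))
  }

  def place(id: String, origin: String): Handler[Either[String, Unit]] =
    ifUnknown(List(BookingPlaced(id, origin)))(b => s"Booking ${b.id} already exists")

  // Like changeOrigin above: no event is written when nothing changes
  def changeOrigin(newOrigin: String): Handler[Either[String, Unit]] =
    ifKnown(b => if (b.origin != newOrigin) List(OriginChanged(newOrigin)) else Nil)(
      "Booking unknown"
    )
}
```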

Event handling

In this simple example, events essentially set fields in the state:

class BookingEventApplier extends EventApplier[Booking, BookingEvent] {
  def apply(state: Option[Booking], event: BookingEvent): String \/ Option[Booking] =
    (event match {
      case BookingPlaced(bookingID, time, origin, destination, passengerCount) =>
        state
          .toLeft(Booking(bookingID, time, origin, destination, passengerCount))
          .leftMap(_ => "Booking already exists")
      case OriginChanged(newOrigin) =>
        state
          .toRight("Attempt to change unknown booking")
          .map(_.copy(origin = newOrigin))
      case DestinationChanged(newDestination) =>
        state
          .toRight("Attempt to change unknown booking")
          .map(_.copy(destination = newDestination))
      case BookingAccepted =>
        state
          .toRight("Attempt to accept unknown booking")
          .map(_.copy(status = Booking.Status.Accepted))
      case BookingRejected =>
        state
          .toRight("Attempt to reject unknown booking")
          .map(_.copy(status = Booking.Status.Rejected))
      case BookingCancelled =>
        state
          .toRight("Attempt to cancel unknown booking")
          .map(_.copy(status = Booking.Status.Cancelled))
    }).map(Option(_))
}
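
Because the applier is a pure function from state and event to new state, replaying a journal amounts to a left fold that stops at the first invalid event. The sketch below illustrates this with standard-library types only; the reduced Booking and Event types and the replay helper are assumptions made for the example and use Either in place of the \/ alias.

```scala
object EventFoldSketch {
  final case class Booking(id: String, origin: String, status: String = "Pending")

  sealed trait Event
  final case class BookingPlaced(id: String, origin: String) extends Event
  final case class OriginChanged(newOrigin: String) extends Event
  case object BookingCancelled extends Event

  // Same shape as the applier above: invalid transitions yield a Left
  def applyEvent(state: Option[Booking], event: Event): Either[String, Option[Booking]] =
    event match {
      case BookingPlaced(id, origin) =>
        state match {
          case None    => Right(Option(Booking(id, origin)))
          case Some(_) => Left("Booking already exists")
        }
      case OriginChanged(newOrigin) =>
        state
          .toRight("Attempt to change unknown booking")
          .map(b => Option(b.copy(origin = newOrigin)))
      case BookingCancelled =>
        state
          .toRight("Attempt to cancel unknown booking")
          .map(b => Option(b.copy(status = "Cancelled")))
    }

  // Rebuild state from a journal; flatMap short-circuits on the first error
  def replay(events: List[Event]): Either[String, Option[Booking]] =
    events.foldLeft[Either[String, Option[Booking]]](Right(None)) { (acc, event) =>
      acc.flatMap(state => applyEvent(state, event))
    }
}
```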

Protocol

Command and reply encoding/decoding on the client and server side is done by interpreting the entity algebra with the OutgoingCommand and IncomingCommand contexts respectively:

class BookingCommandProtocol extends CirceCommandProtocol[BookingAlg] {
  override def client: BookingAlg[OutgoingCommand[*]] =
    new BookingAlg[OutgoingCommand[*]] {
      def place(
          bookingID: BookingID,
          time: Instant,
          passengerCount: Int,
          origin: LatLon,
          destination: LatLon
      ): OutgoingCommand[BookingAlreadyExists \/ Unit] =
        outgoingCommand[BookingCommand, BookingAlreadyExists \/ Unit](
          PlaceBooking(bookingID, time, passengerCount, origin, destination)
        )

      // ...

override def server[F[_]]: Decoder[IncomingCommand[F, BookingAlg]] =
  CirceDecoder(io.circe.Decoder[BookingCommand].map {
    case PlaceBooking(
          bookingID: BookingID,
          time: Instant,
          passengerCount: Int,
          origin: LatLon,
          destination: LatLon
        ) =>
      incomingCommand[F, BookingAlreadyExists \/ Unit](
        _.place(bookingID, time, passengerCount, origin, destination)
      )

Here’s an illustration of the chain of interactions taking place when placing a booking, both from the client and the server side:
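
In code, the round trip can be approximated by the following self-contained sketch. It is a deliberately simplified model and not the library's protocol: the OutgoingCommand and IncomingCommand types here are hypothetical stand-ins with reduced signatures, the algebra has a single method, and a pipe-separated string replaces the Circe encoding.

```scala
object ProtocolSketch {
  type Id[A] = A
  type Reply = Either[String, Unit]

  // Reduced entity algebra (hypothetical, single method)
  trait BookingAlg[F[_]] {
    def place(bookingID: String, passengerCount: Int): F[Reply]
  }

  // Client side: an encoded command plus a decoder for the eventual reply
  final case class OutgoingCommand[R](payload: String, decodeReply: String => R)

  // Server side: a decoded command that can run against the entity algebra
  trait IncomingCommand[F[_]] {
    def runWith(alg: BookingAlg[F]): F[Reply]
    def encodeReply(reply: Reply): String
  }

  private def encode(reply: Reply): String =
    reply.fold(err => s"error:$err", _ => "ok")

  private def decode(raw: String): Reply =
    if (raw == "ok") Right(()) else Left(raw.stripPrefix("error:"))

  // Interpreting the algebra in OutgoingCommand turns each call into a payload
  val client: BookingAlg[OutgoingCommand] = new BookingAlg[OutgoingCommand] {
    def place(bookingID: String, passengerCount: Int): OutgoingCommand[Reply] =
      OutgoingCommand(s"place|$bookingID|$passengerCount", raw => decode(raw))
  }

  // Decoding a payload yields a function over the algebra
  // (toy format: assumes the ID contains no '|' character)
  def server[F[_]](payload: String): IncomingCommand[F] =
    payload.split('|') match {
      case Array("place", id, count) =>
        new IncomingCommand[F] {
          def runWith(alg: BookingAlg[F]): F[Reply] = alg.place(id, count.toInt)
          def encodeReply(reply: Reply): String = encode(reply)
        }
      case _ => throw new IllegalArgumentException(s"Unknown command: $payload")
    }
}
```

The client never touches the entity directly: it produces a payload, the server decodes it into a call on the real algebra, and the reply travels back through the matching encoder and decoder.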

Side-effects

We describe the availability process as well as explicit entity passivation using Effector:

object BookingEffector {
  def apply[F[_]: Logger: Monad](
      effector: Effector[F, Booking, BookingAlg]
  )(implicit availabilityAlg: AvailabilityAlg[F]): F[Unit] = {
    import effector._

    val availabilityProcess: Booking => F[Unit] = booking =>
      booking.status match {
        case Status.Pending =>
          for {
            isAvailable <- availabilityAlg.isCapacityAvailable(booking.time, booking.passengerCount)
            entity <- self
            _ <- entity.notifyCapacity(isAvailable)
          } yield ()
        case _ => ().pure
      }

    val handlePassivation: Booking => F[Unit] = {
      _.status match {
        case Status.Pending   => Applicative[F].unit
        case Status.Accepted  => enablePassivation(passivationDelay)
        case Status.Rejected  => enablePassivation()
        case Status.Cancelled => enablePassivation()
      }
    }

    ifKnown(booking => Logger[F].info(show"State is now $booking")) >> ifKnown(
      availabilityProcess
    ) >> ifKnown(handlePassivation)
  }

  private val passivationDelay = 1.hour
}
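
The passivation rule in handlePassivation is a pure mapping from booking status to a passivation decision, which can be isolated as in the sketch below. The Passivation type is a hypothetical stand-in for the library's passivation state, and treating enablePassivation() without arguments as a zero delay is an assumption based on the effector tests later on this page.

```scala
import scala.concurrent.duration._

object PassivationSketch {
  sealed trait Status
  object Status {
    case object Pending extends Status
    case object Accepted extends Status
    case object Rejected extends Status
    case object Cancelled extends Status
  }

  // Hypothetical stand-in for the library's passivation state
  sealed trait Passivation
  case object Unchanged extends Passivation
  final case class After(delay: FiniteDuration) extends Passivation

  val passivationDelay: FiniteDuration = 1.hour

  def passivationFor(status: Status): Passivation = status match {
    // Still waiting for the availability check: keep the entity in memory
    case Status.Pending => Unchanged
    // Accepted bookings may still be changed, so passivate after a delay
    case Status.Accepted => After(passivationDelay)
    // Terminal states: nothing more to do, passivate right away
    case Status.Rejected | Status.Cancelled => After(Duration.Zero)
  }
}
```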

Testing

Unit testing the entity algebra implementation, event handling and effector is straightforward because all three are parametric in F:

class BookingEntitySuite
    extends munit.CatsEffectSuite
    with munit.ScalaCheckEffectSuite
    with Generators {
  implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
  private val bookingAlg = BookingEntity(EntityT.instance[IO, Booking, BookingEvent])
  private implicit val eventApplier: BookingEventApplier = new BookingEventApplier

  test("place booking") {
    forAllF { booking: Booking =>
      bookingAlg
        .place(
          booking.id,
          booking.time,
          booking.passengerCount,
          booking.origin,
          booking.destination
        )
        .run(None)
        .map {
          case Right((events, _)) =>
            assertEquals(
              events,
              Chain(
                BookingPlaced(
                  booking.id,
                  booking.time,
                  booking.origin,
                  booking.destination,
                  booking.passengerCount
                )
              )
            )
          case Left(error) => fail(error)
        }
        .flatMap(_ => assertIOBoolean(logger.logged.map(_.nonEmpty)))
    }
  }

  test("change origin and destination") {
    forAllF { (booking: Booking, newOrigin: LatLon, newDestination: LatLon) =>
      bookingAlg
        .changeOriginAndDestination(newOrigin, newDestination)
        .run(Some(booking))
        .map {
          case Right((events, _)) =>
            assertEquals(
              events,
              Chain[BookingEvent](OriginChanged(newOrigin), DestinationChanged(newDestination))
            )
          case _ => fail("unexpected")
        }
    }
  }

class BookingEventApplierSuite extends munit.ScalaCheckSuite with Generators {
  property("booking placed when unknown") {
    forAll { booking: Booking =>
      val fold = new BookingEventApplier()(
        None,
        BookingPlaced(
          booking.id,
          booking.time,
          booking.origin,
          booking.destination,
          booking.passengerCount
        )
      )
      assertEquals(fold, Right(Some(booking)))
    }
  }

  property("booking placed when known") {
    forAll { booking: Booking =>
      val fold = new BookingEventApplier()(
        Some(booking),
        BookingPlaced(
          booking.id,
          booking.time,
          booking.origin,
          booking.destination,
          booking.passengerCount
        )
      )
      assert(fold.isLeft)
    }
  }

  property("origin changed when known") {
    forAll { (booking: Booking, newOrigin: LatLon) =>
      val fold = new BookingEventApplier()(Some(booking), OriginChanged(newOrigin))
      assertEquals(fold.toOption.flatMap(_.map(_.origin)), Option(newOrigin))
    }
  }

  property("origin changed when unknown") {
    forAll { newOrigin: LatLon =>
      val fold = new BookingEventApplier()(None, OriginChanged(newOrigin))
      assert(fold.isLeft)
    }
  }

  property("destination changed when known") {
    forAll { (booking: Booking, newDestination: LatLon) =>
      val fold = new BookingEventApplier()(Some(booking), DestinationChanged(newDestination))
      assertEquals(fold.toOption.flatMap(_.map(_.destination)), Option(newDestination))
    }
  }

  property("destination changed when unknown") {
    forAll { newDestination: LatLon =>
      val fold = new BookingEventApplier()(None, DestinationChanged(newDestination))
      assert(fold.isLeft)
    }
  }

  property("booking accepted when known") {
    forAll { booking: Booking =>
      val fold = new BookingEventApplier()(Some(booking), BookingAccepted)
      assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Accepted))
    }
  }

  property("booking accepted when unknown") {
    val fold = new BookingEventApplier()(None, BookingAccepted)
    assert(fold.isLeft)

  }

  property("booking rejected when known") {
    forAll { booking: Booking =>
      val fold = new BookingEventApplier()(Some(booking), BookingRejected)
      assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Rejected))
    }
  }

  property("booking rejected when unknown") {
    val fold = new BookingEventApplier()(None, BookingRejected)
    assert(fold.isLeft)
  }

  property("booking cancelled when known") {
    forAll { booking: Booking =>
      val fold = new BookingEventApplier()(Some(booking), BookingCancelled)
      assertEquals(fold.toOption.flatMap(_.map(_.status)), Option(Booking.Status.Cancelled))
    }
  }

  property("booking cancelled when unknown") {
    val fold = new BookingEventApplier()(None, BookingCancelled)
    assert(fold.isLeft)
  }
}

class BookingEffectorSuite
    extends munit.CatsEffectSuite
    with munit.ScalaCheckEffectSuite
    with Generators {
  implicit private val logger: TestingLogger[IO] = TestingLogger.impl[IO]()
  implicit private def availabilityAlg[F[_]: Applicative]: AvailabilityAlg[F] =
    (_: Instant, _: Int) => Applicative[F].pure(true)
  private val effector = BookingEffector(EffectorT.instance[IO, Booking, BookingAlg])

  test("some state log") {
    forAllF { booking: Booking =>
      val acceptedBooking = booking.copy(status = Booking.Status.Accepted)
      effector
        .runA(Some(acceptedBooking), new SelfEntity {})
        .flatMap(_ =>
          assertIO(logger.logged.map(_.map(_.message).last), show"State is now $acceptedBooking")
        )
    }
  }

  test("some state passivate after one hour") {
    forAllF { booking: Booking =>
      assertIO(
        effector.runS(Some(booking.copy(status = Booking.Status.Accepted)), new SelfEntity {}),
        PassivationState.After(1.hour)
      )
    }
  }

  test("passivate immediately when cancelled") {
    forAllF { booking: Booking =>
      assertIO(
        effector.runS(Some(booking.copy(status = Booking.Status.Cancelled)), new SelfEntity {}),
        PassivationState.After(Duration.Zero)
      )
    }
  }

  test("notifies availability when pending and does not passivate") {
    forAllF { booking: Booking =>
      assertIO(
        effector.runS(
          Some(booking.copy(status = Booking.Status.Pending)),
          new SelfEntity {
            override def notifyCapacity(
                isAvailable: Boolean
            ): IO[BookingAlg.BookingUnknown.type \/ Unit] = {
              assert(isAvailable)
              IO.pure(().asRight)
            }
          }
        ),
        PassivationState.Unchanged
      )
    }
  }

  trait SelfEntity extends BookingAlg[IO] {
    override def place(
        bookingID: Booking.BookingID,
        time: Instant,
        passengerCount: Int,
        origin: LatLon,
        destination: LatLon
    ): IO[BookingAlg.BookingAlreadyExists \/ Unit] =
      IO.raiseError(new RuntimeException("should not be called"))

    override def get: IO[BookingAlg.BookingUnknown.type \/ Booking] =
      IO.raiseError(new RuntimeException("should not be called"))

    override def changeOrigin(newOrigin: LatLon): IO[BookingAlg.BookingUnknown.type \/ Unit] =
      IO.raiseError(new RuntimeException("should not be called"))

    override def changeDestination(
        newDestination: LatLon
    ): IO[BookingAlg.BookingUnknown.type \/ Unit] =
      IO.raiseError(new RuntimeException("should not be called"))

    override def changeOriginAndDestination(
        newOrigin: LatLon,
        newDestination: LatLon
    ): IO[BookingAlg.BookingUnknown.type \/ Unit] =
      IO.raiseError(new RuntimeException("should not be called"))

    override def cancel: IO[BookingAlg.CancelError \/ Unit] =
      IO.raiseError(new RuntimeException("should not be called"))

    override def notifyCapacity(isAvailable: Boolean): IO[BookingAlg.BookingUnknown.type \/ Unit] =
      IO.raiseError(new RuntimeException("should not be called"))
  }

}

The command protocol can also easily be covered with synchronous round-trip tests:

class BookingCommandProtocolSuite extends munit.ScalaCheckSuite with Generators {
  val protocol = new BookingCommandProtocol

  test("place booking") {
    forAll { (booking: Booking, reply: BookingAlg.BookingAlreadyExists \/ Unit) =>
      val outgoingCommand = protocol.client.place(
        booking.id,
        booking.time,
        booking.passengerCount,
        booking.origin,
        booking.destination
      )
      val incomingCommand = protocol.server[Id].decode(outgoingCommand.payload)
      val encodedReply = incomingCommand
        .runWith(new TestBookingAlg {
          override def place(
              bookingID: Booking.BookingID,
              time: Instant,
              passengerCount: Int,
              origin: LatLon,
              destination: LatLon
          ): Id[BookingAlg.BookingAlreadyExists \/ Unit] = reply
        })
        .map(incomingCommand.replyEncoder.encode(_))
      assertEquals(outgoingCommand.replyDecoder.decode(encodedReply), reply)
    }
  }

Component and integration tests using the Akka testkit are also advisable and work as usual; see ExampleAppSuite.