Example

The example app is a dummy API for managing bank accounts and transferring amounts between accounts. Accounts are implemented with sharded entities, and transfers with transactions.

The example app can be found in the example module and can be run directly with sbt run.

Stress-test

The module also contains a multi-jvm test that demonstrates the resilience of the system, by transferring amounts between 1000 accounts while restarting nodes.

API

The API has a simple CRUD for accounts and a dedicated endpoint to perform transfers between two accounts.

HttpRoutes.of[IO] {
    case GET -> Root / "account" / id / "balance" => server.balanceFor(AccountID(id))
    case POST -> Root / "account" / id            => server.open(AccountID(id))
    case POST -> Root / "account" / id / "deposit" / IntVar(amount) => server.deposit(AccountID(id), amount)
    case POST -> Root / "account" / id / "withdraw" / IntVar(amount) => server.withdraw(AccountID(id), amount)
    case POST -> Root / "account" / origin / "transfer" / "to" / destination / IntVar(amount) => server.transfer(AccountID(origin), AccountID(destination), PosAmount(amount))
  }
  .orNotFound

The HTTP server makes uses of the Accounts and Account algebras to satisfy the requests.

Algebras

Accounts

This trait represents the repository, i.e. the ability to retrieve a handle on a specific account and orchestrate transfers between accounts.

trait Accounts[F[_]] 
  def accountFor(name: AccountID): Account[F]
  def transfer(from: AccountID, to: AccountID, amount: PosAmount): F[TransferFailure \/ Unit]

Account

This trait represents the account entity, i.e. the ability to query and perform operations on a specific account.

trait Account[F[_]] 
  def open: F[AlreadyExists.type \/ Unit]
  def balance: F[Unknown.type \/ NonNegAmount]
  def deposit(amount: PosAmount): F[Unknown.type \/ PosAmount]
  def withdraw(amount: PosAmount): F[WithdrawFailure \/ NonNegAmount]

  def prepareOutgoingTransfer(id: TransferID, transfer: Transfer): F[WithdrawFailure \/ Unit]
  def prepareIncomingTransfer(id: TransferID, transfer: Transfer): F[IncomingTransferFailure \/ Unit]
  def commitTransfer(id: TransferID): F[TransferFailure \/ Unit]
  def abortTransfer(id: TransferID): F[TransferFailure \/ Unit]

The latter four methods are of particular interest as they are used by the transactional branch logic to carry out the various phases of the transfer.

Transfers

Transfers are implemented using an endless-transaction transaction coordinator.

Coordinator

A coordinator is created with TransferID as the transaction identifier type, AccountID as the branch identifier, Transfer as the query payload (the amount, origin, and destination), and TransferFailure as the error coproduct.

sourcetransactor.coordinator[TransferID, AccountID, Transfer, TransferFailure](
  "transfer",
  { accountID =>
    val account = sharding.entityFor(accountID)
    new TransferBranch(accountID, account)
  },
  Some(transferParameters.timeout)
)

The coordinator is used in the implementation of the transfer method in ShardedAccounts, i.e. the implementation of Accounts.

Transaction

The snippet below shows the logic: the code creates a transfer with two branches, one for the origin account, and the other for the destination account.

sourcedef transfer(from: AccountID, to: AccountID, amount: PosAmount): F[TransferFailure \/ Unit] =
  coordinator
    .create(TransferID.random, Transfer(from, to, amount), from, to)
    .asResource
    .use(_.pollForFinalStatus())
    .flatMap {
      case Status.Committed => ().asRight[TransferFailure].pure
      case Status.Aborted(reason) =>
        reason match {
          case AbortReason.Timeout =>
            EitherT.leftT(TransferFailure.Timeout: TransferFailure).value
          case AbortReason.Branches(reasons)    => EitherT.leftT(reasons.head).value
          case AbortReason.Client(Some(reason)) => EitherT.leftT(reason).value
          case AbortReason.Client(None) =>
            new Exception("Transaction aborted by client without justification")
              .raiseError[F, TransferFailure \/ Unit]
        }
      case Status.Failed(errors) =>
        Logger[F].error(show"Transaction failed: $errors") *> new Exception(
          "Transaction failed due to branch error"
        ).raiseError[F, TransferFailure \/ Unit]
    }

The transaction entity is sharded, so it can be running on a separate node. We therefore need to regularly check for completion so that we can respond to the API call (it’s synchronous here for the sake of simplicity).

To implement this polling operation, we use the pollForFinalStatus() built-in extension method (defined for Transaction): this method retrieves the status of the transaction at configurable intervals and semantically sleeps in between.

Branch

An account’s involvement in a transfer is described by TransferBranch, as exemplified below with the implementation of the prepare method:

sourceclass TransferBranch[F[_]: Logger](accountID: AccountID, account: Account[F])(implicit
    retryParameters: TransferParameters.BranchRetryParameters,
    temporal: Temporal[F]
) extends Branch[F, TransferID, Transfer, TransferFailure] {
  import temporal.*
  private implicit val onErrorRetryParameters: RetryParameters = retryParameters.onError

  def prepare(transferID: TransferID, transfer: Transfer): F[Branch.Vote[TransferFailure]] = {
    if (accountID === transfer.origin)
      Logger[F].debug(
        show"Preparing outgoing transfer $transferID: $transfer for account $accountID"
      ) >>
        account
          .prepareOutgoingTransfer(transferID, transfer)
          .onErrorRetryWithBackoff(
            Logger[F]
              .warn(_)(show"Error preparing outgoing transfer $transferID, retrying in a bit")
          )
          .onLeftRetryWithBackoff { case Account.PendingOutgoingTransfer =>
            Logger[F].warn(
              show"Account $accountID has a pending outgoing transfer, retrying in a bit"
            )
          }(retryParameters.onPendingTransfer)
          .flatMap {
            case Left(Account.Unknown) =>
              Branch.Vote.Abort(TransferFailure.AccountNotFound(accountID)).pure[F]
            case Left(InsufficientFunds(missing)) =>
              Branch.Vote.Abort(TransferFailure.InsufficientFunds(missing)).pure[F]
            case Left(Account.PendingOutgoingTransfer) =>
              Branch.Vote.Abort(TransferFailure.OtherPendingTransfer).pure[F]
            case Right(_) => Branch.Vote.Commit.pure[F]
          }
    else
      Logger[F].debug(show"Preparing incoming $transferID: $transfer for account $accountID") >>
        account
          .prepareIncomingTransfer(transferID, transfer)
          .onErrorRetryWithBackoff(
            Logger[F]
              .warn(_)(show"Error preparing incoming transfer $transferID, retrying in a bit")
          )
          .map {
            case Left(Account.Unknown) =>
              Branch.Vote.Abort(TransferFailure.AccountNotFound(accountID))
            case Right(_) => Branch.Vote.Commit
          }
  }