Aggregate Pattern – API Architecture for general purpose

In Domain-Driven Design, an aggregate is a cluster of domain-specific objects that can be treated as a single Unit of Work. Take a transactional account and its transactions: the transactions are separate objects from the account, but it is beneficial to manage both in a single Unit of Work because a change to a transaction causes the account balance to update. Because of this strong relationship, an account together with its transactions can be considered an aggregate.

An aggregate has a root and a transactional boundary. Anything outside the aggregate accesses it through its root, and the aggregate executes within a single Unit of Work of its own. This makes the root the only member accessible from outside the aggregate, while the boundary defines an area of consistency.
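To make the idea of a root and a consistency boundary concrete, here is a minimal sketch based on the account example. The Account and Transaction classes and the rule that the balance may not go negative are assumptions made for illustration; they are not the classes used later in this post.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical child object. Its constructor is internal, so code outside
// this assembly has to go through the root to create one.
public sealed class Transaction
{
    public decimal Amount { get; }

    internal Transaction(decimal amount)
    {
        Amount = amount;
    }
}

// Hypothetical aggregate root: the only public entry point to the cluster.
public sealed class Account
{
    private readonly List<Transaction> _transactions = new List<Transaction>();

    public int Id { get; }
    public decimal Balance { get; private set; }

    // Read-only view, so callers cannot add transactions behind the root's back.
    public IReadOnlyList<Transaction> Transactions => _transactions.AsReadOnly();

    public Account(int id)
    {
        Id = id;
    }

    // The transaction list and the balance change together, which keeps
    // the balance equal to the sum of the transaction amounts.
    public void AddTransaction(decimal amount)
    {
        // Assumed business invariant, for illustration only.
        if (Balance + amount < 0)
            throw new InvalidOperationException("Balance cannot go negative.");

        _transactions.Add(new Transaction(amount));
        Balance += amount;
    }
}
```

Because every change goes through AddTransaction, a caller never sees a transaction without the matching balance update.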

Rules for an aggregate:

  1. Protect business invariants
  2. Reference aggregates by identity only
  3. Eventual Consistency
  4. Design primitive aggregates

The examples we are discussing in this post are based on the Execution Pipeline and Concrete Handlers previously discussed. The execution pipeline serves as a generic root to the aggregates whereas the concrete handlers represent the domain-specific objects for the aggregate.

private void SaveTransaction()
{
    ApiPostController
        .Create<SaveAccountTransactionRequest, SaveAccountTransactionResponse>()
        .SetOnSuccess((rsp) =>
        {
            TransactionId = rsp.Result.Id;
            Mapper.Map(rsp.Result, _selectedAccountTransaction.Result);
            OnSaveComplete.InvokeAsync(rsp.Result);
            ToastMessage(
                new MessageDto(
                    "M1",
                    "Transaction saved",
                    MessageDtoType.Success));
        })
        .SetOnFail((rsp) => { Messages.AddRange(rsp.Messages); })
        .Post(
            Mapper
                .Map<SaveAccountTransactionRequest>
                (_selectedAccountTransaction.Result)
        );
}

private void GetTransaction(int fromAccountId, int transactionId)
{
    //Prevent reloading already selected transaction
    if (_selectedAccountTransaction.Result.FromAccountId == fromAccountId
        && _selectedAccountTransaction.Result.Id == transactionId)
        return;

    ApiPostController
        .Create<GetAccountTransactionRequest, GetAccountTransactionResponse>()
        .SetOnSuccess((rsp) => { })
        .SetOnComplete((rsp) => { _selectedAccountTransaction = rsp; })
        .Post(new GetAccountTransactionRequest
        {
            Id = transactionId,
            FromAccountId = fromAccountId
        });
}

The two methods above come from a Blazor UI application that calls our API through a class called ApiPostController. The Create method on the ApiPostController lets us define the root aggregate we wish to execute on the API. Next, we can define callback functions for successful, failed, or completed responses from the API. The last method, Post, accepts the request DTO, sends the request to the API, and handles the API response. The API accepts the request DTO and creates an Execution Pipeline to execute and handle the requested aggregate.
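The callback flow described above can be sketched roughly as follows. This is not the actual ApiPostController: ApiCall, ApiResponse, and the injected send delegate are hypothetical names, and the rule that a response without messages counts as a success is an assumption made so the sketch runs without HTTP.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical response envelope; the real response DTOs are not shown in this post.
public class ApiResponse<TResult>
{
    public TResult Result { get; set; }
    public List<string> Messages { get; } = new List<string>();

    // Assumption: a response without messages counts as a success.
    public bool Success => Messages.Count == 0;
}

// Hypothetical stand-in for the fluent object returned by Create<,>().
public class ApiCall<TRequest, TResult>
{
    private readonly Func<TRequest, ApiResponse<TResult>> _send;
    private Action<ApiResponse<TResult>> _onSuccess = _ => { };
    private Action<ApiResponse<TResult>> _onFail = _ => { };
    private Action<ApiResponse<TResult>> _onComplete = _ => { };

    // The transport is injected as a delegate instead of a real HTTP client.
    public ApiCall(Func<TRequest, ApiResponse<TResult>> send)
    {
        _send = send;
    }

    public ApiCall<TRequest, TResult> SetOnSuccess(Action<ApiResponse<TResult>> callback)
    {
        _onSuccess = callback;
        return this;
    }

    public ApiCall<TRequest, TResult> SetOnFail(Action<ApiResponse<TResult>> callback)
    {
        _onFail = callback;
        return this;
    }

    public ApiCall<TRequest, TResult> SetOnComplete(Action<ApiResponse<TResult>> callback)
    {
        _onComplete = callback;
        return this;
    }

    // Sends the request, then routes the response to the matching callbacks.
    public void Post(TRequest request)
    {
        var response = _send(request);

        if (response.Success)
            _onSuccess(response);
        else
            _onFail(response);

        // The completion callback runs for both outcomes.
        _onComplete(response);
    }
}
```

Each setter returns the same instance, which is what allows the chained style used in SaveTransaction and GetTransaction.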

We have two aggregates to GET and SAVE a transaction, as well as a third aggregate to RECALCULATE the account. Earlier we mentioned that a change in a transaction results in a change to the account, so the two can be considered a single aggregate. In our example, however, they are separate, and we will look at how to reuse aggregates in a single Unit of Work, creating a cluster of domain-specific aggregates.

We can create an aggregate representing a cluster of domain-specific objects, or we can combine aggregates to form a cluster of aggregates in a single Unit of Work. This gives us freedom of choice, reusability, and flexibility, and we can do it while still conforming to the rules mentioned above. Note that an AccountTransaction is the parent and contains two properties, ToAccountId and FromAccountId, that reference the accounts by Id only.

public class AccountTransaction : IEntity, IEntityBasicAudit
{
    public int Id { get; set; }

    public int FromAccountId { get; set; }
    public int ToAccountId { get; set; }
    public int AccountTransactionTypeId { get; set; }
    public AccountTransactionType AccountTransactionType { get; set; }
    public int Units { get; set; }
    public decimal BasePrice { get; set; }
    public decimal Amount { get; set; }
    public decimal Tax { get; set; }
    public decimal TaxRate { get; set; }
    public decimal Fees { get; set; }
    public decimal Net { get; set; }
    public DateTime Date { get; set; }
    public DateTime CreatedDate { get; set; }
    public string CreatedBy { get; set; }
    public DateTime ModifiedDate { get; set; }
    public string ModifiedBy { get; set; }
}

The aggregate consists of a minimum of three objects: an Authorization Handler, a Validation Handler, and one or more Worker Handlers, all grouped together under a folder. All handlers (domain-specific objects) in an aggregate share a private context and are executed in an isolated boundary. The order in which the worker handlers are executed can be controlled through the IoC registration; by default, it is alphabetical.

builder
    .RegisterType<SaveAccountTransactionWorkerHandler>()
    .AsSelf()
    .AsImplementedInterfaces();
builder
    .RegisterType<SaveAccountTransactionWorkerRecalcHandler>()
    .AsSelf()
    .AsImplementedInterfaces();
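The default alphabetical ordering and the shared context can be pictured with the following sketch. IWorkerHandler, HandlerRunner, and the two shortened handler names are hypothetical, and the list of strings merely stands in for the private context; the real pipeline is not shown in this post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical handler contract; the list stands in for the shared private context.
public interface IWorkerHandler
{
    void Execute(List<string> context);
}

public class SaveWorkerHandler : IWorkerHandler
{
    public void Execute(List<string> context) => context.Add("save");
}

public class SaveWorkerRecalcHandler : IWorkerHandler
{
    public void Execute(List<string> context) => context.Add("recalc");
}

public static class HandlerRunner
{
    // Runs the handlers ordered by type name, all writing to one shared context.
    public static List<string> Run(IEnumerable<IWorkerHandler> handlers)
    {
        var context = new List<string>();

        foreach (var handler in handlers.OrderBy(h => h.GetType().Name, StringComparer.Ordinal))
            handler.Execute(context);

        return context;
    }
}
```

With this naming, "SaveWorkerHandler" sorts before "SaveWorkerRecalcHandler", so the save step runs before the recalculation regardless of the order in which the handlers are supplied.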

The SaveAccountTransactionWorkerHandler is a primitive domain-specific object with a single responsibility, persisting the AccountTransaction entity.

public class SaveAccountTransactionWorkerHandler : CommandHandler
    <SaveAccountTransactionRequest, AccountTransaction, SaveAccountTransacti
{
    public SaveAccountTransactionWorkerHandler(
        IMapper mapper,
        IRepository repository,
        ILifetimeScope scope,
        IWorkerContext<SaveAccountTransactionResponse> context
    ) : base(mapper, repository, scope, context)
    {
    }

    protected override void GetEntity()
    {
        Entity = Request.Id > 0
            ? Repository.GetOne<AccountTransaction>(a => a.Id == Request.Id)
            : new AccountTransaction();
    }

    protected override void ValidateEntity()
    {
        if (Request.Id > 0 && Entity.Id == 0)
            Context
                .AddMessage(
                    "E4",
                    $"Account Transaction for ID {Request.Id}");
    }

    protected override void Execute()
    {
        Mapper.Map(Request, Entity);

        Entity.Amount = Entity.BasePrice * Entity.Units;
        Entity.Tax = Entity.TaxRate * (Entity.Amount - Entity.Fees);
        Entity.Net = Entity.Amount - Entity.Fees - Entity.Tax;

        Repository.Save(Entity);
        Repository.SaveChanges();

        Context.Result = Mapper.Map<SaveAccountTransactionResponse>(Entity);
    }
}
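As a worked example of the three calculations in Execute, the sketch below applies the same formulas to illustrative values. TransactionMath is a hypothetical helper, and the input numbers are made up for the example.

```csharp
using System;

// Hypothetical helper that repeats the formulas from Execute on plain values.
public static class TransactionMath
{
    public static (decimal Amount, decimal Tax, decimal Net) Calculate(
        int units, decimal basePrice, decimal taxRate, decimal fees)
    {
        var amount = basePrice * units;       // gross amount
        var tax = taxRate * (amount - fees);  // tax is charged after fees are deducted
        var net = amount - fees - tax;        // what remains after fees and tax

        return (amount, tax, net);
    }
}
```

For 10 units at a base price of 5.00, a tax rate of 0.10, and fees of 2.00, this gives an Amount of 50.00, a Tax of 4.80, and a Net of 43.20.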

After this handler executes successfully, the SaveAccountTransactionWorkerRecalcHandler runs. This handler is responsible for calling the RecalculatePortfolioAccount aggregate seen in the folder structure above.

//synchronously
protected override void Execute()
{
	var recalculateResponse = _pipeline
		.Execute
		<RecalculatePortfolioAccountRequest,
			RecalculatePortfolioAccountResponse>
		(
			new RecalculatePortfolioAccountRequest()
			{
				FromAccountId = Request.FromAccountId,
				ToAccountId = Request.ToAccountId
			}
		);

	Mapper.Map(recalculateResponse.Result, Context.Result);
	Context.Messages.AddRange(recalculateResponse.Messages);
}

//asynchronously
protected void ExecuteAsync()
{
	Context
		.RaiseAggregate
		<RecalculatePortfolioAccountRequest,
			RecalculatePortfolioAccountResponse>
		(
			new RecalculatePortfolioAccountRequest()
			{
				FromAccountId = Request.FromAccountId,
				ToAccountId = Request.ToAccountId
			}
		);
}

Within a single aggregate, more specifically within a worker handler, we can call other aggregates synchronously or asynchronously. When another aggregate is executed synchronously, it runs in the same boundary as the calling aggregate. The ExecuteAsync method demonstrates how we would call the aggregate asynchronously. The ability to call aggregates by identity only, synchronously or asynchronously, from within other aggregates or from clients such as our UI and Event Listeners, gives us the flexibility to orchestrate the execution of isolated modules in several different ways.

The synchronous call to the RecalculatePortfolioAccount aggregate, made from the SaveAccountTransactionWorkerRecalcHandler, uses an execution pipeline (_pipeline.Execute<>) similar to the one mentioned before. It executes immediately and returns an instance of RecalculatePortfolioAccountResponse. Because we decided to handle saving a transaction and recalculating the accounts in the same boundary and aggregate, we map the recalculation response to the SaveAccountTransactionResponse, which is then returned to the calling client. Had we chosen the asynchronous call, the response would not be mapped.

The asynchronous call relies on an event-driven architecture in which a listener listens for aggregate events and forwards each request to the API, in the same manner that the Blazor UI uses the ApiPostController.
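The difference between the two call styles can be sketched as follows. Everything here is hypothetical: RecalcRequest, RecalcResponse, Pipeline, and AggregateEvents are simplified stand-ins, and the listener calls the pipeline directly instead of forwarding the request to the API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical request/response pair standing in for the recalculation DTOs.
public class RecalcRequest
{
    public int FromAccountId { get; set; }
    public int ToAccountId { get; set; }
}

public class RecalcResponse
{
    public string Result { get; set; }
}

// Hypothetical pipeline: executes immediately and hands the response back to the caller.
public class Pipeline
{
    public RecalcResponse Execute(RecalcRequest request)
    {
        return new RecalcResponse
        {
            Result = "recalculated " + request.FromAccountId + " and " + request.ToAccountId
        };
    }
}

// Hypothetical event queue: raising an aggregate only records the request.
public class AggregateEvents
{
    private readonly Queue<RecalcRequest> _pending = new Queue<RecalcRequest>();

    public int PendingCount => _pending.Count;

    // The caller gets no response back, so there is nothing to map.
    public void RaiseAggregate(RecalcRequest request) => _pending.Enqueue(request);

    // Stand-in for the listener: at some later point it executes each queued request.
    public List<RecalcResponse> Listen(Pipeline pipeline)
    {
        var responses = new List<RecalcResponse>();

        while (_pending.Count > 0)
            responses.Add(pipeline.Execute(_pending.Dequeue()));

        return responses;
    }
}
```

In the synchronous case the caller holds the response and can map it onto its own result. In the asynchronous case the request sits in the queue until the listener picks it up, which is why no mapping takes place in the calling handler.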

On the roadmap, we plan to extend the ApiPostController with the ability to swap out API source URLs at run time for microservice communication. The Authorization Handler will also be extended so that it can restrict the execution of an aggregate to internal use only, meaning the aggregate can only be executed from within another aggregate and not directly from the API.

References