Pipes and Filters Pattern – API Architecture for general purpose

According to Wikipedia, in software engineering, a pipeline consists of a chain of processing elements (processes, threads, coroutines, functions, etc.), arranged so that the output of each element is the input of the next; the name is by analogy to a physical pipeline.[3]

Pipes and Filters is an architectural design pattern in which a pipeline accepts a request and controls the order and execution of any number of filters, each responsible for one step in transforming the request into the desired output.

The pattern lets us design small, decoupled components whose input, output, and behavior are easy to understand. Filters can be plugged in and unplugged without affecting the other filters. Maintenance is isolated to individual components, which also makes them easier to reuse. A filter can also run as a separate task, in parallel with other filters.
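To make the idea concrete before the full implementation, here is a minimal sketch of the pattern with no dependency injection. All names in it (IFilter, TrimFilter, UpperCaseFilter, Pipeline) are hypothetical and chosen only for illustration; it is not the ExecutionPipeline described in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical filter contract: one input, one output of the same type.
public interface IFilter<T>
{
    T Process(T input);
}

// Two small, independent filters that know nothing about each other.
public sealed class TrimFilter : IFilter<string>
{
    public string Process(string input) => input.Trim();
}

public sealed class UpperCaseFilter : IFilter<string>
{
    public string Process(string input) => input.ToUpperInvariant();
}

// The pipeline only knows the order of the filters, not what they do.
public sealed class Pipeline<T>
{
    private readonly IReadOnlyList<IFilter<T>> _filters;

    public Pipeline(params IFilter<T>[] filters) => _filters = filters;

    // The output of each filter becomes the input of the next.
    public T Run(T input) =>
        _filters.Aggregate(input, (current, filter) => filter.Process(current));
}
```

Removing or reordering a filter only changes the argument list passed to the Pipeline constructor, which is the "plug and unplug" property described above.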

Implementation

Register and configure the pipeline

// Primary pipeline: starts its own scope and runs the full filter chain in this order.
builder.RegisterType<ExecutionPipeline>()
	.As<IExecutionPipeline>()
	.AsSelf()
	.WithParameter("startNewScope", true)
	.WithParameter("types", new List<Type>() {
			typeof(AuthenticationFilter<,>),
			typeof(AuthorizationFilter<,>),
			typeof(FluentValidationFilter<,>),
			typeof(WorkerFilter<,>),
			typeof(EventFilter<,>),
		}).InstancePerDependency();

// Secondary pipeline: reuses the existing scope and runs a shorter filter chain.
builder.RegisterType<ExecutionPipeline>()
	.As<ISecondaryExecutionPipeline>()
	.WithParameter("startNewScope", false)
	.WithParameter("types", new List<Type>() {
			typeof(AuthorizationFilter<,>),
			typeof(FluentValidationFilter<,>),
			typeof(WorkerFilter<,>),
		}).InstancePerDependency();

ExecutionPipeline is registered twice, once as IExecutionPipeline and once as ISecondaryExecutionPipeline. Each registration is configured with the list of filters that the pipeline executes and manages, in the order given. Filters are responsible for resolving and executing the concrete handlers that apply the business logic. The secondary pipeline can be injected into a handler so that the handler can execute other handlers.

Using the pipeline

[Produces("application/json")]
[Route("api/[controller]")]
public class LeadController : Controller
{
	private readonly IExecutionPipeline _executionPipeline;

	public LeadController(IExecutionPipeline executionPipeline)
		=> _executionPipeline = executionPipeline;

	[HttpPost]
	[Route("[action]")]
	public IApiResponseContextDto<int> Create([FromBody] CreateLeadDto dto)
		=> _executionPipeline.Execute<CreateLeadDto, int>(dto);

	[HttpPost]
	[Route("[action]")]
	public IApiResponseContextDto<int> Update([FromBody] UpdateLeadDto dto)
		=> _executionPipeline.Execute<UpdateLeadDto, int>(dto);

	[HttpPost]
	[Route("[action]")]
	public IApiResponseContextDto<LeadDto> GetSingle([FromBody] GetLeadDto dto)
		=> _executionPipeline.Execute<GetLeadDto, LeadDto>(dto);

	[HttpPost]
	[Route("[action]")]
	public IApiResponseContextDto<List<LeadDto>> GetList([FromBody] GetLeadsDto dto)
		=> _executionPipeline.Execute<GetLeadsDto, List<LeadDto>>(dto);
}
public interface IApiResponseContextDto<TOut>
{
	TOut Result { get; set; }
	List<MessageDto> Messages { get; set; }
}

The IExecutionPipeline interface is injected into the controller’s constructor, and each controller method calls the interface's Execute method with different type arguments for <TRequestDto, TOut>. The Execute method returns an instance of IApiResponseContextDto<TOut> containing the response DTO (TOut) and any warning, information, or error messages.

Executing the Pipeline

public IApiResponseContextDto<TOut>
    Execute<TRequestDto, TOut>(TRequestDto requestDto)
{
    var scopeId = ResolveScope(out var scope);
    var context = scope.Resolve<IExecutionPipelineContext<TOut>>();
    try
    {
        foreach (var type in _types)
        {
            ((IExecutionPipelineFilter<TRequestDto, TOut>)
                    scope.Resolve(type.MakeGenericType
                        (new[] {typeof(TRequestDto), typeof(TOut)})))
                .Participate(requestDto);

            if (context.Messages.Any(a => a.Type == MessageDtoType.Error))
                return scope.Resolve<IMapper>().Map<IApiResponseContextDto<TOut>>(context);
        }

        return scope.Resolve<IMapper>().Map<IApiResponseContextDto<TOut>>(context);
    }
    catch (Exception e)
    {
        var key = $"{DateTime.Now:MMddyyyyHHmmss}" +
                        $"_{scope.Resolve<IApplicationUserContext>().Id}";
        _logger.Fatal(key, e);
        context.AddMessage("E2", key);
        return scope.Resolve<IMapper>().Map<IApiResponseContextDto<TOut>>(context);
    }
    finally
    {
        DisposeScope(scope, scopeId);
    }
}

The Execute method starts by calling ResolveScope, which creates a new scope if the startNewScope parameter was set to true in the IoC registration above. This scope functions as a unit of work for the execution pipeline. If startNewScope is false, the previously created scope is used. In this case, that is the scope created by the IExecutionPipeline implementation and passed to the ISecondaryExecutionPipeline, so the secondary pipeline resolves its instances from the parent's scope. This allows us to share objects like the IExecutionPipelineContext<TOut> between a parent pipeline and its child pipelines, while keeping parent pipelines isolated from each other.

Next, the pipeline loops through the filters set in the configuration and executes each one using the Participate method exposed on the IExecutionPipelineFilter interface. If a filter or one of its concrete handlers raises an error message, the pipeline exits early and returns the response. Otherwise, the pipeline returns the response when the loop completes.

In the case of an unhandled exception, the pipeline will catch the error. A key is generated and written to the log with the exception. The key is then added as an error message to the context and returned to the client inside the response. The key can be used to reference the error when reporting a bug.
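The control flow described above (run filters in order, stop at the first error message, turn an unhandled exception into a reference key) can be sketched without a container. This is a simplified illustration under stated assumptions: MiniPipeline, PipelineContext, Message, and MessageType are hypothetical names, filters are plain delegates rather than resolved types, the key is a GUID instead of the timestamp and user id used in the article, and the log call is replaced by a write to standard error.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum MessageType { Information, Warning, Error }

public sealed class Message
{
    public Message(MessageType type, string text) { Type = type; Text = text; }
    public MessageType Type { get; }
    public string Text { get; }
}

// Hypothetical stand-in for the shared pipeline context.
public sealed class PipelineContext<TOut>
{
    public TOut Result { get; set; }
    public List<Message> Messages { get; } = new List<Message>();
}

public static class MiniPipeline
{
    public static PipelineContext<TOut> Execute<TRequest, TOut>(
        TRequest request,
        IEnumerable<Action<TRequest, PipelineContext<TOut>>> filters)
    {
        var context = new PipelineContext<TOut>();
        try
        {
            foreach (var filter in filters)
            {
                filter(request, context);

                // Stop at the first filter that reports an error.
                if (context.Messages.Any(m => m.Type == MessageType.Error))
                    break;
            }
        }
        catch (Exception e)
        {
            // Assumption: a GUID serves as the reference key in this sketch.
            var key = Guid.NewGuid().ToString("N");
            Console.Error.WriteLine($"{key}: {e}");
            context.Messages.Add(new Message(MessageType.Error, key));
        }

        return context;
    }
}
```

Because the exception is converted into an error message, the caller always receives a context object and never has to distinguish between a validation failure and a crash when building the response.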

Filters

public class WorkerFilter<TRequestDto, TOut> : ExecutionPipelineFilter<TRequestDto, TOut>
{
    public WorkerFilter(ILifetimeScope scope) : base(scope)
    {
    }

    public override IExecutionPipelineFilter<TRequestDto, TOut>
        Participate(TRequestDto requestDto)
    {
        // Resolve every handler registered for this request type and
        // worker context, then run each of them against the request.
        Scope.Resolve<IEnumerable<IHandler<TRequestDto, IWorkerContext<TOut>>>>()
            .ToList()
            .ForEach(a => a.Handle(requestDto));

        return this;
    }
}


Filters are responsible for resolving concrete handlers and executing them. The constructor accepts the scope assigned in the pipeline as a parameter. The filter uses that scope to resolve the concrete handlers associated with the two generic types <TRequestDto, TOut> and the filter-specific interface IWorkerContext<TOut>. Each filter has its own context interface, which exposes a subset of the IExecutionPipelineContext<TOut> interface.
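One way to picture "a subset of the IExecutionPipelineContext<TOut> interface" is a single context object seen through several narrow interfaces. The sketch below is an assumption about the shape, not the article's actual definitions: IMessageSink and IValidationContext are hypothetical names, messages are plain strings instead of MessageDto, and the members shown on IWorkerContext<TOut> are inferred from how the handler later in this article uses Context.

```csharp
using System.Collections.Generic;

// Hypothetical shared base: anything that can report a message.
public interface IMessageSink
{
    void AddMessage(string code, string text);
}

// Hypothetical view for validation handlers: messages only, no result.
public interface IValidationContext : IMessageSink
{
}

// View for worker handlers: messages plus the result.
public interface IWorkerContext<TOut> : IMessageSink
{
    TOut Result { get; set; }
}

// Full view used by the pipeline itself.
public interface IExecutionPipelineContext<TOut> : IWorkerContext<TOut>, IValidationContext
{
    IReadOnlyList<string> Messages { get; }
}

// One object implements the full interface and therefore every narrow view.
public sealed class ExecutionPipelineContext<TOut> : IExecutionPipelineContext<TOut>
{
    private readonly List<string> _messages = new List<string>();

    public TOut Result { get; set; }
    public IReadOnlyList<string> Messages => _messages;

    public void AddMessage(string code, string text) => _messages.Add($"{code}: {text}");
}
```

With this shape, a validation handler cannot set the result by accident, while the pipeline still reads everything from the same instance when it maps the response.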

Building on top of the foundation

public class CreateLeadDtoWorkerHandler : CommandHandler<CreateLeadDto, Lead, int>
{
    public CreateLeadDtoWorkerHandler(
        IMapper mapper,
        IRepository repository,
        ILifetimeScope scope,
        IWorkerContext<int> context) : base(mapper, repository, scope, context)
    {
    }

    protected override void GetEntity()
    {
        Entity = new Lead();
    }

    protected override void ValidateEntity()
    {
        var any = Repository.GetIQueryable<LeadPersonalInformation>()
            .Any(a => a.Email.Equals(Dto.LeadPersonalInformation.Email,
                StringComparison.InvariantCultureIgnoreCase));
        if (any)
            Context.AddMessage("E1",
                $"{Dto.LeadPersonalInformation.Email} email already registered");
    }

    protected override void Execute()
    {
        Entity = Mapper.Map<Lead>(Dto);
        Repository.Save(Entity);
        Repository.SaveChanges();
        Context.Result = Entity.Id;
    }
}

The concrete handlers are built on a different layer of abstraction, discussed previously (SOLID Principles applied and Developers Perspective). The example above is built on the abstract CommandHandler, which controls the flow of the concrete method implementations GetEntity, ValidateEntity, and Execute, in that order.
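The fixed ordering of GetEntity, ValidateEntity, and Execute is an instance of the template method pattern. The following is a hypothetical, stripped-down sketch of such a base class, not the article's CommandHandler: the name CommandHandlerSketch, the Errors list, and the example subclass are invented for illustration, and skipping Execute after a validation error is an assumption that the article does not state.

```csharp
using System.Collections.Generic;

// Hypothetical base class: owns the order of the steps, not their content.
public abstract class CommandHandlerSketch<TDto>
{
    protected TDto Dto { get; private set; }
    public List<string> Errors { get; } = new List<string>();

    public void Handle(TDto dto)
    {
        Dto = dto;
        GetEntity();
        ValidateEntity();

        // Assumption: do not execute when validation reported errors.
        if (Errors.Count > 0) return;

        Execute();
    }

    protected abstract void GetEntity();
    protected abstract void ValidateEntity();
    protected abstract void Execute();
}

// Example subclass that records which steps ran.
public sealed class RejectEmptyNameHandler : CommandHandlerSketch<string>
{
    public List<string> Calls { get; } = new List<string>();

    protected override void GetEntity() => Calls.Add("GetEntity");

    protected override void ValidateEntity()
    {
        Calls.Add("ValidateEntity");
        if (string.IsNullOrWhiteSpace(Dto)) Errors.Add("E1: name is required");
    }

    protected override void Execute() => Calls.Add("Execute");
}
```

A concrete handler only fills in the three steps; it cannot reorder them, which keeps every handler built on the base class consistent.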

This framework allows us to build handlers like the one above in isolation and plug them into the framework automatically. We can extend it by adding handlers for requests like UpdateLeadDto, GetLeadDto, and GetLeadsDto, or any other request. Adding handlers has no effect on the framework or on existing handlers. The pipeline can be configured with any filters we wish, besides the ones used in this example. Both handlers and filters can be replaced without cascading effects on the rest of the application.

References