Pular para conteúdo

Pipeline Pattern

Padrão para fluxos complexos com múltiplas etapas sequenciais.

Quando Usar

  • Fluxos com 3+ etapas que modificam estado compartilhado
  • Lógica complexa que precisa ser testada em partes
  • Operações que podem crescer (novos steps no futuro)

Não usar para: operações simples, CRUD básico, queries.


Estrutura Base

Localização: Ciba.Api/_Shared/Pipelines/

// IPipelineContext.cs
public interface IPipelineContext
{
    List<string> Errors { get; }
    bool ShouldStop { get; set; }
}

// IPipelineStep.cs
public interface IPipelineStep<TContext> where TContext : IPipelineContext
{
    Task ExecuteAsync(TContext context, CancellationToken ct = default);
}

// IPipeline.cs — apenas execução, sem AddStep
public interface IPipeline<TContext> where TContext : IPipelineContext
{
    Task ExecuteAsync(TContext context, CancellationToken ct = default);
}

// Pipeline.cs — AddStep está na classe concreta (retorna Pipeline<TContext>, não a interface)
public class Pipeline<TContext> : IPipeline<TContext> where TContext : IPipelineContext
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<Pipeline<TContext>> _logger;
    private readonly PerformanceMonitorOptions _options;
    private readonly List<Type> _stepTypes = [];

    public Pipeline(
        IServiceProvider serviceProvider,
        ILogger<Pipeline<TContext>> logger,
        PerformanceMonitorOptions options)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
        _options = options;
    }

    public Pipeline<TContext> AddStep<TStep>() where TStep : IPipelineStep<TContext>
    {
        _stepTypes.Add(typeof(TStep));
        return this;
    }

    public async Task ExecuteAsync(TContext context, CancellationToken ct = default)
    {
        var pipelineName = typeof(TContext).Name.Replace("Context", "Pipeline");

        foreach (var stepType in _stepTypes)
        {
            if (context.ShouldStop)
                break;

            ct.ThrowIfCancellationRequested();

            var step = (IPipelineStep<TContext>)_serviceProvider.GetRequiredService(stepType);
            var stepSw = Stopwatch.StartNew();

            try
            {
                await step.ExecuteAsync(context, ct);
                stepSw.Stop();
                // Log com thresholds configuráveis (SlowMs → Information, WarningMs → Warning)
            }
            catch (OperationCanceledException) { throw; }
            catch (Exception ex)
            {
                stepSw.Stop();
                context.Errors.Add($"Step {stepType.Name} failed: {ex.Message}");
                context.ShouldStop = true;
            }
        }

        // Log total do pipeline + warning se context.Errors.Count > 0
    }
}

Detalhes do ExecuteAsync: - Verifica context.ShouldStop antes de cada step (permite interrupção limpa via context.Stop()) - Captura exceções em context.Errors e seta context.ShouldStop = true no erro - Mede tempo de cada step e do pipeline total com Stopwatch - Logging com thresholds configuráveis via PerformanceMonitorOptions (SlowMs, WarningMs) - OperationCanceledException é re-lançada (não capturada como erro)


Organização de Pastas

Ciba.Api/
└── Features/
    └── {Recurso}/
        └── {Ação}/
            └── Pipelines/
                └── {NomePipeline}/
                    ├── {NomePipeline}Context.cs
                    ├── Step1.cs
                    ├── Step2.cs
                    └── Step3.cs

Exemplo Prático

Contexto

public class ProcessarMensagemContext : IPipelineContext
{
    // IPipelineContext (obrigatório)
    public List<string> Errors { get; } = [];
    public bool ShouldStop { get; set; }

    // Inputs (imutáveis)
    public required MensagemRecebida Mensagem { get; init; }
    public required Guid TenantId { get; init; }

    // Outputs (modificados pelos steps)
    public Conversa? Conversa { get; set; }
    public List<string> RespostasIA { get; set; } = [];
    public bool MensagemEnviada { get; set; }

    // Acumuladores (padrão para listas produzidas por um step e consumidas por outro)
    public List<Message> SystemMessages { get; set; } = [];
    public bool TemErros => Errors.Count > 0;

    // Atalho para interromper o pipeline
    public void Stop() => ShouldStop = true;
}

Step

public class ValidarConversaStep(IConversaRepository repo)
    : IPipelineStep<ProcessarMensagemContext>
{
    public async Task ExecuteAsync(ProcessarMensagemContext ctx, CancellationToken ct)
    {
        var conversa = await repo.ObterPorContatoAsync(ctx.Mensagem.ContatoId, ct);

        if (conversa is null)
        {
            ctx.Errors.Add("Conversa não encontrada");
            return;
        }

        ctx.Conversa = conversa;
    }
}

Handler

// Nota: injeta Pipeline<T> (classe concreta), não IPipeline<T>,
// porque AddStep<TStep>() está na classe concreta.
public class ProcessarMensagemHandler(Pipeline<ProcessarMensagemContext> pipeline)
    : IRequestHandler<ProcessarMensagemCommand, Result>
{
    public async Task<Result> Handle(ProcessarMensagemCommand cmd, CancellationToken ct)
    {
        var ctx = new ProcessarMensagemContext
        {
            Mensagem = cmd.Mensagem,
            TenantId = cmd.TenantId
        };

        await pipeline
            .AddStep<ValidarConversaStep>()
            .AddStep<EnriquecerContextoStep>()
            .AddStep<GerarRespostaIAStep>()
            .AddStep<EnviarMensagemStep>()
            .ExecuteAsync(ctx, ct);

        return ctx.TemErros
            ? Result.Failure(ctx.Errors)
            : Result.Success();
    }
}

Registro de DI

Em PipelineServiceCollectionExtensions.cs:

// Pipeline genérico (registra tanto a interface quanto a classe concreta)
services.AddTransient(typeof(IPipeline<>), typeof(Pipeline<>));
services.AddTransient(typeof(Pipeline<>));

// Steps (via Scrutor)
services.Scan(scan => scan
    .FromAssemblyOf<Program>()
    .AddClasses(c => c.AssignableTo(typeof(IPipelineStep<>)))
    .AsSelf()
    .WithTransientLifetime());

Requer: dotnet add package Scrutor


Regras

  1. Um step = uma responsabilidade — não misture validação com envio de email
  2. Early return — se condição não se aplica, retorne sem fazer nada
  3. Context carrega tudo — inputs são init, outputs são set
  4. Steps não conhecem outros steps — comunicam-se apenas via contexto
  5. Testável isoladamente — cada step pode ser testado unitariamente
  6. Acumuladores no contexto — para dados que steps diferentes produzem e um step final consome (ex: SystemMessages acumuladas por vários steps e persistidas em PersistMessagesStep)
  7. Screening e guardrails — steps de classificação de intent (pré-IA) e validação de output (pós-IA) são adicionados como steps normais no pipeline. Ver intent-classification.md