Pipeline Pattern¶
Padrão para fluxos complexos com múltiplas etapas sequenciais.
Quando Usar¶
- Fluxos com 3+ etapas que modificam estado compartilhado
- Lógica complexa que precisa ser testada em partes
- Operações que podem crescer (novos steps no futuro)
Não usar para: operações simples, CRUD básico, queries.
Estrutura Base¶
Localização: Ciba.Api/_Shared/Pipelines/
// IPipelineContext.cs
public interface IPipelineContext
{
List<string> Errors { get; }
bool ShouldStop { get; set; }
}
// IPipelineStep.cs
public interface IPipelineStep<TContext> where TContext : IPipelineContext
{
Task ExecuteAsync(TContext context, CancellationToken ct = default);
}
// IPipeline.cs — apenas execução, sem AddStep
public interface IPipeline<TContext> where TContext : IPipelineContext
{
Task ExecuteAsync(TContext context, CancellationToken ct = default);
}
// Pipeline.cs — AddStep está na classe concreta (retorna Pipeline<TContext>, não a interface)
public class Pipeline<TContext> : IPipeline<TContext> where TContext : IPipelineContext
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<Pipeline<TContext>> _logger;
private readonly PerformanceMonitorOptions _options;
private readonly List<Type> _stepTypes = [];
public Pipeline(
IServiceProvider serviceProvider,
ILogger<Pipeline<TContext>> logger,
PerformanceMonitorOptions options)
{
_serviceProvider = serviceProvider;
_logger = logger;
_options = options;
}
public Pipeline<TContext> AddStep<TStep>() where TStep : IPipelineStep<TContext>
{
_stepTypes.Add(typeof(TStep));
return this;
}
public async Task ExecuteAsync(TContext context, CancellationToken ct = default)
{
var pipelineName = typeof(TContext).Name.Replace("Context", "Pipeline");
foreach (var stepType in _stepTypes)
{
if (context.ShouldStop)
break;
ct.ThrowIfCancellationRequested();
var step = (IPipelineStep<TContext>)_serviceProvider.GetRequiredService(stepType);
var stepSw = Stopwatch.StartNew();
try
{
await step.ExecuteAsync(context, ct);
stepSw.Stop();
// Log com thresholds configuráveis (SlowMs → Information, WarningMs → Warning)
}
catch (OperationCanceledException) { throw; }
catch (Exception ex)
{
stepSw.Stop();
context.Errors.Add($"Step {stepType.Name} failed: {ex.Message}");
context.ShouldStop = true;
}
}
// Log total do pipeline + warning se context.Errors.Count > 0
}
}
Detalhes do ExecuteAsync:
- Verifica context.ShouldStop antes de cada step (permite interrupção limpa via context.Stop())
- Captura exceções em context.Errors e seta context.ShouldStop = true no erro
- Mede tempo de cada step e do pipeline total com Stopwatch
- Logging com thresholds configuráveis via PerformanceMonitorOptions (SlowMs, WarningMs)
- OperationCanceledException é re-lançada (não capturada como erro)
Organização de Pastas¶
Ciba.Api/
└── Features/
└── {Recurso}/
└── {Ação}/
└── Pipelines/
└── {NomePipeline}/
├── {NomePipeline}Context.cs
├── Step1.cs
├── Step2.cs
└── Step3.cs
Exemplo Prático¶
Contexto¶
public class ProcessarMensagemContext : IPipelineContext
{
// IPipelineContext (obrigatório)
public List<string> Errors { get; } = [];
public bool ShouldStop { get; set; }
// Inputs (imutáveis)
public required MensagemRecebida Mensagem { get; init; }
public required Guid TenantId { get; init; }
// Outputs (modificados pelos steps)
public Conversa? Conversa { get; set; }
public List<string> RespostasIA { get; set; } = [];
public bool MensagemEnviada { get; set; }
// Acumuladores (padrão para listas produzidas por um step e consumidas por outro)
public List<Message> SystemMessages { get; set; } = [];
public bool TemErros => Errors.Count > 0;
// Atalho para interromper o pipeline
public void Stop() => ShouldStop = true;
}
Step¶
public class ValidarConversaStep(IConversaRepository repo)
: IPipelineStep<ProcessarMensagemContext>
{
public async Task ExecuteAsync(ProcessarMensagemContext ctx, CancellationToken ct)
{
var conversa = await repo.ObterPorContatoAsync(ctx.Mensagem.ContatoId, ct);
if (conversa is null)
{
ctx.Errors.Add("Conversa não encontrada");
return;
}
ctx.Conversa = conversa;
}
}
Handler¶
// Nota: injeta Pipeline<T> (classe concreta), não IPipeline<T>,
// porque AddStep<TStep>() está na classe concreta.
public class ProcessarMensagemHandler(Pipeline<ProcessarMensagemContext> pipeline)
: IRequestHandler<ProcessarMensagemCommand, Result>
{
public async Task<Result> Handle(ProcessarMensagemCommand cmd, CancellationToken ct)
{
var ctx = new ProcessarMensagemContext
{
Mensagem = cmd.Mensagem,
TenantId = cmd.TenantId
};
await pipeline
.AddStep<ValidarConversaStep>()
.AddStep<EnriquecerContextoStep>()
.AddStep<GerarRespostaIAStep>()
.AddStep<EnviarMensagemStep>()
.ExecuteAsync(ctx, ct);
return ctx.TemErros
? Result.Failure(ctx.Errors)
: Result.Success();
}
}
Registro de DI¶
Em PipelineServiceCollectionExtensions.cs:
// Pipeline genérico (registra tanto a interface quanto a classe concreta)
services.AddTransient(typeof(IPipeline<>), typeof(Pipeline<>));
services.AddTransient(typeof(Pipeline<>));
// Steps (via Scrutor)
services.Scan(scan => scan
.FromAssemblyOf<Program>()
.AddClasses(c => c.AssignableTo(typeof(IPipelineStep<>)))
.AsSelf()
.WithTransientLifetime());
Requer:
dotnet add package Scrutor
Regras¶
- Um step = uma responsabilidade — não misture validação com envio de email
- Early return — se condição não se aplica, retorne sem fazer nada
- Context carrega tudo — inputs são
init, outputs sãoset - Steps não conhecem outros steps — comunicam-se apenas via contexto
- Testável isoladamente — cada step pode ser testado unitariamente
- Acumuladores no contexto — para dados que steps diferentes produzem e um step final consome (ex:
SystemMessagesacumuladas por vários steps e persistidas emPersistMessagesStep) - Screening e guardrails — steps de classificação de intent (pré-IA) e validação de output (pós-IA) são adicionados como steps normais no pipeline. Ver intent-classification.md