Fluxo: Processamento de Mensagens¶
Visao Geral¶
Fluxo completo desde o recebimento de uma mensagem via webhook ate o envio da resposta. O processamento e dividido em duas fases: uma sincrona (IncomingMessagePipeline) que valida e enfileira a mensagem, e uma assincrona (AggregationPollingService + MessageAggregationPipeline) que agrega mensagens e gera a resposta IA. A fila de agregacao usa PostgreSQL (tabela pending_aggregations) para garantir ACID e coordenacao multi-instancia.
Arquitetura de Processamento¶
+---------------------------------------------------------------------------+
| FASE SINCRONA --- WebhookEndpoint (< 100ms) |
+---------------------------------------------------------------------------+
| |
| WhatsApp --> Evolution API --> POST /api/webhook/evolution |
| | |
| v |
| +---------------------+ |
| | WebhookEndpoint | |
| | - Parse JSON | |
| | - Despacha por | |
| | tipo de evento | |
| | - Retorna 200 OK | |
| +----------+----------+ |
| | |
| messages.upsert | |
| v |
| +---------------------+ |
| |IncomingMessagePipeline| |
| | (6 steps sincronos) | |
| +----------+----------+ |
| | |
| v |
| +---------------------+ |
| | pending_aggregations| |
| | (PostgreSQL) | |
| +---------------------+ |
| |
+---------------------------------------------------------------------------+
|
v
+---------------------------------------------------------------------------+
| FASE ASSINCRONA --- AggregationPollingService (BackgroundService) |
+---------------------------------------------------------------------------+
| |
| +---------------------+ |
| |AggregationPolling- | |
| |Service (poll 500ms) | |
| +----------+----------+ |
| | |
| Busca agregacoes | |
| com Status=Pending | |
| e ProcessAt <= now | |
| ou MaxWaitUntil<=now | |
| v |
| +---------------------+ |
| |MessageAggregation- | |
| |Pipeline (20 steps) | |
| +---------------------+ |
| |
+---------------------------------------------------------------------------+
IncomingMessagePipeline¶
Pipeline sincrono executado dentro do WebhookEndpoint. Opera sem autenticacao de tenant (webhooks sao anonimos). Executa apenas operacoes rapidas (leitura + INSERT).
| # | Step | Descricao |
|---|---|---|
| 1 | ValidatePayloadStep |
Valida estrutura do payload JSON. Extrai lista de mensagens (data pode ser array ou objeto). Para o pipeline se nao houver mensagens |
| 2 | CheckIdempotencyStep |
Verifica duplicidade via Redis (chave msg:processed:{instanceName}:{messageId}, TTL 24h). Se ja processada, seta IsIdempotent = true e para. Se nova, marca como "processing" |
| 3 | ExtractMessageDataStep |
Extrai dados da primeira mensagem: telefone, tipo (Text, Image, Audio, Video, Document, Sticker), conteudo, info de midia. Para se tipo desconhecido ou se e reacao |
| 4 | LoadContextStep |
Carrega Instance + Agent do banco (com cache Redis). Filtra mensagens de grupo e fromMe. Configura parametros de agregacao (delay, maxWait, cooldown) via SystemConfig |
| 5 | ResolveConversationStep |
Busca conversa ativa existente. Se nao existe: tenta reabrir conversa resolvida recentemente (dentro de ReopenWindowMinutes), verifica blacklist/whitelist, cria nova conversa. Busca foto de perfil do contato |
| 6 | EnqueueAggregationStep |
Insere ou atualiza registro em pending_aggregations. Se existe agregacao Pending, adiciona mensagem ao JSON array e reseta ProcessAt. Se nao existe, cria nova (respeitando cooldown Redis). Salva no banco |
Observacoes:
- O endpoint sempre retorna 200 OK, mesmo em caso de erro no pipeline, para evitar que a Evolution API re-envie o webhook
- Erros sao logados mas nao propagados ao caller
- Mensagens de grupo (@g.us) e mensagens enviadas pelo bot (fromMe) sao ignoradas silenciosamente
Idempotencia¶
O sistema previne processamento duplicado via Redis com TTL:
1. Extrai WhatsAppMessageId do payload
2. Monta chave: msg:processed:{instanceName}:{messageId}
(instanceName garante que a mesma mensagem em instancias
diferentes e processada independentemente)
3. Se chave JA existe:
- Mensagem ja foi processada
- Para o pipeline (IsIdempotent = true)
4. Se chave NAO existe:
- Cria chave com valor "processing" e TTL de 24 horas
- Continua processamento
Cenarios cobertos: - Evolution API envia o mesmo webhook multiplas vezes - Mensagens duplicadas em rede instavel - Mesma mensagem chegando via instancias diferentes (chave inclui instanceName)
Agregacao e Debouncing¶
Entidade PendingAggregation¶
A tabela pending_aggregations armazena mensagens pendentes de processamento:
| Campo | Tipo | Descricao |
|---|---|---|
Id |
Guid | Identificador unico |
ConversationId |
Guid | Conversa associada |
InstanceId |
Guid | Instancia WhatsApp |
AgentId |
Guid? | Agente (pode ser null) |
ContactPhone |
string | Telefone do contato |
MessagesJson |
string | Array JSON com todas as mensagens pendentes |
FirstMessageAt |
DateTime | Timestamp da primeira mensagem |
LastMessageAt |
DateTime | Timestamp da ultima mensagem |
ProcessAt |
DateTime | Quando processar (debounce). Resetado a cada nova mensagem |
MaxWaitUntil |
DateTime | Deadline absoluto. NAO resetado com novas mensagens |
Status |
AggregationStatus | Pending ou Processing |
Logica de Debouncing¶
Mensagem 1 chega (t=0s):
-> Cria PendingAggregation
-> ProcessAt = now + 5s (AggregationDelaySeconds)
-> MaxWaitUntil = now + 30s (AggregationMaxWaitSeconds)
Mensagem 2 chega (t=3s):
-> Adiciona ao MessagesJson existente
-> ProcessAt = now + 5s (resetado para t=8s)
-> MaxWaitUntil permanece t=30s (NAO resetado)
Mensagem 3 chega (t=6s):
-> Adiciona ao MessagesJson existente
-> ProcessAt = now + 5s (resetado para t=11s)
-> MaxWaitUntil permanece t=30s
AggregationPollingService processa quando:
-> ProcessAt <= now (debounce expirou) OU
-> MaxWaitUntil <= now (deadline absoluto atingido)
Cooldown¶
Apos processar uma agregacao, um cooldown Redis (CacheKeys.AggregationCooldown(conversationId)) e definido por AggregationCooldownSeconds (10s). Se uma nova mensagem chega durante o cooldown, o ProcessAt e deslocado para apos o cooldown expirar + delay normal.
O AggregationPollingService seta o cooldown imediatamente ao marcar agregacoes como "Processing" (antes de iniciar o pipeline), fechando a janela de race condition onde mensagens chegando durante o processamento (~8-10s) criavam novas agregacoes sem cooldown. O CleanupAggregationStep renova o cooldown ao final do processamento.
Configuracao¶
| Parametro | Valor | Constante |
|---|---|---|
| Delay (debounce) | 5s | SystemConfig.AggregationDelaySeconds |
| Max Wait (deadline) | 30s | SystemConfig.AggregationMaxWaitSeconds |
| Cooldown | 10s | SystemConfig.AggregationCooldownSeconds |
| Polling interval | 500ms | AppConfig.AggregationPollingIntervalMs |
| Batch size (polling) | 10 | Hardcoded em ProcessReadyAggregationsAsync |
AggregationPollingService¶
Background service que faz polling na tabela pending_aggregations a cada 500ms.
Fluxo por ciclo:
- Busca ate 10 agregacoes com
Status = PendingeProcessAt <= nowouMaxWaitUntil <= now - Marca todas como
Processing(salva no banco) para evitar race conditions - Seta cooldown Redis imediatamente para cada conversa
- Para cada agregacao, cria novo scope de DI e executa
MessageAggregationPipeline - Seta
TenantIdnoITenantProviderpara o contexto do background processing
Tratamento de erros: Se o pipeline falha com excecao, a agregacao com falha e deletada da tabela para evitar retries infinitos. O erro e logado.
Pipeline de Agregacao de Mensagens¶
O AggregationPollingService executa o pipeline de steps na seguinte ordem:
| # | Step | Descricao |
|---|---|---|
| 1 | ValidateAggregationStep |
Carrega conversa + instancia + agent + steps + attachments. Verifica modo (Manual = skip AI). Verifica horarios de atendimento (fora do horario = agenda deferred via TickerQ). Parseia mensagens do JSON |
| 2 | SendTypingIndicatorStep |
Envia "digitando..." ao WhatsApp (se ShowTyping habilitado na instancia) |
| 3 | ProcessMediaStep |
Processa midia (download, upload R2, extracao OCR/transcricao). Limita extracoes AI aos N mais recentes (configuravel por instancia via MaxMediaPerMessage, padrao 3). Respeita flags AllowImageAnalysis/AllowAudioAnalysis da instancia. Se toda midia for bloqueada por politica e nao houver texto, seta SkipAiResponse = true + mensagem canned em AiMessages |
| 4 | ConsolidateMessagesStep |
Consolida multiplas mensagens em uma unica string |
| 5 | LoadMessageHistoryStep |
Carrega historico da conversa (exclui mensagens de sistema) |
| 6 | CheckSubscriptionCreditsStep |
Calcula custo estimado (1 base + 3 por midia com extracao bem-sucedida). Verifica creditos do tenant via HasAiCreditsAsync(requiredCredits). Se esgotados: escala para manual + mensagem de sistema |
| 7 | CheckAiResponseCapStep |
Verifica cap de respostas IA por conversa. Se atingido: escala para manual + mensagem de sistema |
| 8 | GenerateAiResponseStep |
Gera resposta via LLM (executa sub-pipeline abaixo). Se falhar: auto-escala silenciosamente |
| 9 | ConsumeSubscriptionCreditStep |
Consome context.EstimatedCreditCost creditos (variavel: 1 base + 3 por midia processada). Se balance=0: bulk-switch de todas as conversas IA do tenant |
| 10 | IncrementAiResponseCountStep |
Incrementa Conversation.AiResponseCount |
| 11 | ApplySignatureStep |
Aplica assinatura na primeira ou ultima mensagem de AiMessages conforme SignaturePosition (se configurado na instancia) |
| 12 | ApplyResponseDelayStep |
Aplica delay antes do envio (se DelaySeconds > 0 na instancia) |
| 13 | StopTypingIndicatorStep |
Para o indicador de digitacao |
| 14 | SendWhatsAppResponseStep |
Envia cada mensagem de AiMessages via WhatsApp com delay de OutgoingMessageDelayMs (500ms) entre elas. Nao checa SkipAiResponse — depende de AiMessages estar vazio para pular |
| 15 | SendAttachmentsStep |
Envia anexos relevantes do agente via WhatsApp |
| 16 | PersistMessagesStep |
Cria um registro Message por item em AiMessages, vinculando cada um ao WhatsAppMessageId correspondente por indice. Persiste mensagens de sistema com +100ms offset. Vincula credit transaction e LLM usage ao primeiro Message da resposta |
| 17 | MarkMessagesAsReadStep |
Marca mensagens como lidas |
| 18 | ScheduleFollowUpStep |
Agenda follow-up automatico via TickerQ |
| 19 | NotifySignalRStep |
Notifica clientes via SignalR |
| 20 | CleanupAggregationStep |
Limpa registro de agregacao e renova cooldown |
Sub-pipeline de GenerateAiResponseStep:
O step 8 executa internamente um sub-pipeline com steps dedicados para geracao da resposta IA:
| # | Sub-step | Descricao |
|---|---|---|
| 8.1 | InitializeStepTrackingStep |
Inicializa rastreamento de etapas do agente |
| 8.2 | PrepareAiHistoryStep |
Prepara e limpa historico de mensagens para o LLM |
| 8.3 | ClassifyIntentStep |
Classifica intent via Haiku (jailbreak -> bloqueia, FAQ -> Haiku, complex -> Sonnet) |
| 8.4 | RewriteKnowledgeQueryStep |
Reescreve mensagem em query standalone via LLM (pula se jailbreak/off-topic ou se agente nao tem embeddings) |
| 8.5 | InvokeAiServiceStep |
Chama AiResponseService com query reescrita. Retorna List<string> Messages que e atribuida a context.AiMessages (pula se jailbreak/off-topic) |
| 8.6 | ApplyAiDecisionsStep |
Aplica decisoes da IA (transicao de step, escalacao, resolucao) |
| 8.7 | ValidateOutputStep |
Valida resposta via guardrails rule-based (PII, promessas) |
Observacoes:
- StopTypingIndicatorStep executa antes de SendWhatsAppResponseStep para que o indicador desapareca naturalmente
- ApplySignatureStep adiciona texto de assinatura na primeira mensagem (posicoes Start) ou ultima mensagem (posicoes End) de AiMessages
- ApplyResponseDelayStep adiciona um delay artificial (0-10s) para tornar as respostas mais naturais
- SendWhatsAppResponseStep nao checa SkipAiResponse — depende de AiMessages estar vazio. Isso permite que mensagens canned (ex: midia bloqueada) sejam enviadas mesmo quando a geracao de IA foi pulada
- Cada step pode interromper o pipeline chamando context.Stop() se necessario
- ValidateAggregationStep verifica AiAvailabilityHoursJson da instancia. Se fora do horario, agenda resposta diferida via TickerQ e define SkipAiResponse = true
- Mensagens consecutivas de mesmo role sao mescladas com \n\n ao reconstruir o historico para o LLM (ver ConvertToChatMessages em AiResponseService)
Pipeline de Resposta Diferida (Deferred AI Response)¶
Quando a instancia possui horarios de atendimento configurados (AiAvailabilityHoursJson) e a mensagem chega fora desse horario, a resposta da IA e agendada para o proximo periodo disponivel via TickerQ.
Fluxo:
ValidateAggregationStep (fora do horario)
-> IDeferredAiResponseScheduler.ScheduleAsync()
-> Calcula proximo horario permitido (timezone do tenant)
-> Cria TimeTickerEntity com funcao "SendDeferredAiResponse"
-> SkipAiResponse = true (mensagem e persistida sem resposta da IA)
Quando o job e executado (DeferredAiResponseFunction), o pipeline de steps e:
| # | Step | Descricao |
|---|---|---|
| 1 | ValidateDeferredStep |
Carrega conversa, instancia, agente e steps. Valida elegibilidade |
| 2 | CheckAvailabilityHoursStep |
Re-verifica se esta dentro do horario. Se nao, reagenda e para |
| 3 | LoadDeferredHistoryStep |
Carrega historico e identifica mensagens nao respondidas |
| 4 | CheckDeferredSubscriptionCreditsStep |
Verifica creditos de IA do tenant |
| 5 | CheckDeferredAiResponseCapStep |
Verifica cap de respostas IA por conversa |
| 6 | PrepareDeferredAiHistoryStep |
Remove assinaturas do historico para evitar copia pela IA |
| 7 | ClassifyDeferredIntentStep |
Classifica intent via Haiku (jailbreak -> bloqueia, FAQ -> Haiku) |
| 8 | InvokeDeferredAiStep |
Gera resposta via LLM. Retorna List<string> Messages atribuida a context.AiMessages (pula se jailbreak/off-topic) |
| 9 | ConsumeDeferredSubscriptionCreditStep |
Consome credito |
| 10 | IncrementDeferredAiResponseCountStep |
Incrementa contador |
| 11 | ApplyDeferredDecisionsStep |
Aplica decisoes da IA |
| 12 | ValidateDeferredOutputStep |
Valida cada mensagem de AiMessages via guardrails rule-based (PII, promessas) |
| 13 | ApplyDeferredSignatureStep |
Aplica assinatura na primeira ou ultima mensagem de AiMessages conforme SignaturePosition |
| 14 | SendDeferredResponseStep |
Envia cada mensagem de AiMessages via WhatsApp com delay de OutgoingMessageDelayMs (500ms) entre elas |
| 15 | PersistDeferredMessageStep |
Cria um registro Message por item em AiMessages, vinculando WhatsAppMessageId por indice. Registra uso de LLM |
| 16 | NotifyDeferredStep |
Notifica via SignalR |
Observacoes: - O cancelamento de jobs existentes e feito antes de agendar novos (evita duplicatas) - Se a conversa mudar para modo manual ou for resolvida antes da execucao, o job e ignorado - O timezone do tenant e usado para calcular horarios locais
Eventos do Webhook¶
| Evento | Descricao | Pipeline |
|---|---|---|
messages.upsert |
Nova mensagem | IncomingMessagePipeline |
messages.update |
Status atualizado | MessageStatusPipeline (Sent -> Delivered -> Read) |
messages.reaction |
Reacao a mensagem | MessageReactionPipeline |
connection.update |
Conexao alterada | ConnectionUpdatePipeline |
presence.update |
Presenca | Ignorado (noop) |
Status de Mensagem¶
Atualizado via webhook messages.update.
Tratamento de Erros¶
| Erro | Tratamento |
|---|---|
| Payload JSON invalido | Retorna 400 Bad Request |
| Instancia nao encontrada | Log warning, para o pipeline |
| Mensagem duplicada (idempotencia) | Para o pipeline silenciosamente |
| Contato na blacklist | Para o pipeline, log informativo |
| Tipo de mensagem desconhecido | Para o pipeline silenciosamente |
| Claude API falha | Auto-escala para manual silenciosamente (sem envio ao WhatsApp) |
| Envio WhatsApp falha | Status = Failed, log error |
| Download/processamento de midia falha | Continua com proxima midia |
| Pipeline de agregacao falha (excecao) | Agregacao e deletada do banco, erro logado |
| Erro no loop do AggregationPollingService | Log error, aguarda proximo ciclo (500ms) |
Nota: Nao ha mecanismo de retry automatico nem dead-letter queue. Agregacoes que falham sao deletadas para evitar retries infinitos. Os dados da mensagem original estao logados e podem ser recuperados manualmente se necessario.
Monitoramento¶
Logs Estruturados¶
_logger.LogInformation("Processing aggregation {AggregationId} for conversation {ConversationId} with {MessageCount} messages",
aggregation.Id, aggregation.ConversationId, messageCount);
Activity tracing via System.Diagnostics.Activity com tags ciba.conversationId e ciba.whatsAppMessageIds.
Metricas Importantes¶
- Latencia do endpoint: tempo de resposta do webhook (deve ser < 100ms, apenas INSERT/UPDATE)
- Agregacoes pendentes: registros com
Status = Pendingna tabelapending_aggregations - Erros de pipeline: logs de nivel Error no
AggregationPollingService - Idempotencia: chaves Redis com prefixo
msg:processed:(TTL 24h)
Performance¶
- Endpoint responde em < 100ms (apenas valida, insere/atualiza no banco)
- Historico de conversa limitado a 20 mensagens (
AppConfig.MaxHistoryMessages) - Batch de 10 agregacoes por ciclo de polling
- Idempotencia com TTL de 24 horas (Redis)
- Midia salva no R2, extracao limitada a 5 itens por batch
- Cooldown Redis previne processamento excessivo da mesma conversa
- Notificacoes SignalR em background
- Logs estruturados com scope para tracing