Pular para conteúdo

Fluxo: Processamento de Mensagens

Visao Geral

Fluxo completo desde o recebimento de uma mensagem via webhook ate o envio da resposta. O processamento e dividido em duas fases: uma sincrona (IncomingMessagePipeline) que valida e enfileira a mensagem, e uma assincrona (AggregationPollingService + MessageAggregationPipeline) que agrega mensagens e gera a resposta IA. A fila de agregacao usa PostgreSQL (tabela pending_aggregations) para garantir ACID e coordenacao multi-instancia.

Arquitetura de Processamento

+---------------------------------------------------------------------------+
|  FASE SINCRONA --- WebhookEndpoint (< 100ms)                              |
+---------------------------------------------------------------------------+
|                                                                           |
|  WhatsApp --> Evolution API --> POST /api/webhook/evolution                |
|                                        |                                  |
|                                        v                                  |
|                             +---------------------+                       |
|                             |  WebhookEndpoint    |                       |
|                             |  - Parse JSON       |                       |
|                             |  - Despacha por     |                       |
|                             |    tipo de evento   |                       |
|                             |  - Retorna 200 OK   |                       |
|                             +----------+----------+                       |
|                                        |                                  |
|                        messages.upsert |                                  |
|                                        v                                  |
|                             +---------------------+                       |
|                             |IncomingMessagePipeline|                      |
|                             | (6 steps sincronos) |                       |
|                             +----------+----------+                       |
|                                        |                                  |
|                                        v                                  |
|                             +---------------------+                       |
|                             | pending_aggregations|                       |
|                             |   (PostgreSQL)      |                       |
|                             +---------------------+                       |
|                                                                           |
+---------------------------------------------------------------------------+
                                        |
                                        v
+---------------------------------------------------------------------------+
|  FASE ASSINCRONA --- AggregationPollingService (BackgroundService)         |
+---------------------------------------------------------------------------+
|                                                                           |
|                             +---------------------+                       |
|                             |AggregationPolling-  |                       |
|                             |Service (poll 500ms) |                       |
|                             +----------+----------+                       |
|                                        |                                  |
|                    Busca agregacoes     |                                  |
|                    com Status=Pending   |                                  |
|                    e ProcessAt <= now   |                                  |
|                    ou MaxWaitUntil<=now |                                  |
|                                        v                                  |
|                             +---------------------+                       |
|                             |MessageAggregation-  |                       |
|                             |Pipeline (20 steps)  |                       |
|                             +---------------------+                       |
|                                                                           |
+---------------------------------------------------------------------------+

IncomingMessagePipeline

Pipeline sincrono executado dentro do WebhookEndpoint. Opera sem autenticacao de tenant (webhooks sao anonimos). Executa apenas operacoes rapidas (leitura + INSERT).

# Step Descricao
1 ValidatePayloadStep Valida estrutura do payload JSON. Extrai lista de mensagens (data pode ser array ou objeto). Para o pipeline se nao houver mensagens
2 CheckIdempotencyStep Verifica duplicidade via Redis (chave msg:processed:{instanceName}:{messageId}, TTL 24h). Se ja processada, seta IsIdempotent = true e para. Se nova, marca como "processing"
3 ExtractMessageDataStep Extrai dados da primeira mensagem: telefone, tipo (Text, Image, Audio, Video, Document, Sticker), conteudo, info de midia. Para se tipo desconhecido ou se e reacao
4 LoadContextStep Carrega Instance + Agent do banco (com cache Redis). Filtra mensagens de grupo e fromMe. Configura parametros de agregacao (delay, maxWait, cooldown) via SystemConfig
5 ResolveConversationStep Busca conversa ativa existente. Se nao existe: tenta reabrir conversa resolvida recentemente (dentro de ReopenWindowMinutes), verifica blacklist/whitelist, cria nova conversa. Busca foto de perfil do contato
6 EnqueueAggregationStep Insere ou atualiza registro em pending_aggregations. Se existe agregacao Pending, adiciona mensagem ao JSON array e reseta ProcessAt. Se nao existe, cria nova (respeitando cooldown Redis). Salva no banco

Observacoes: - O endpoint sempre retorna 200 OK, mesmo em caso de erro no pipeline, para evitar que a Evolution API re-envie o webhook - Erros sao logados mas nao propagados ao caller - Mensagens de grupo (@g.us) e mensagens enviadas pelo bot (fromMe) sao ignoradas silenciosamente


Idempotencia

O sistema previne processamento duplicado via Redis com TTL:

1. Extrai WhatsAppMessageId do payload

2. Monta chave: msg:processed:{instanceName}:{messageId}
   (instanceName garante que a mesma mensagem em instancias
    diferentes e processada independentemente)

3. Se chave JA existe:
   - Mensagem ja foi processada
   - Para o pipeline (IsIdempotent = true)

4. Se chave NAO existe:
   - Cria chave com valor "processing" e TTL de 24 horas
   - Continua processamento

Cenarios cobertos: - Evolution API envia o mesmo webhook multiplas vezes - Mensagens duplicadas em rede instavel - Mesma mensagem chegando via instancias diferentes (chave inclui instanceName)


Agregacao e Debouncing

Entidade PendingAggregation

A tabela pending_aggregations armazena mensagens pendentes de processamento:

Campo Tipo Descricao
Id Guid Identificador unico
ConversationId Guid Conversa associada
InstanceId Guid Instancia WhatsApp
AgentId Guid? Agente (pode ser null)
ContactPhone string Telefone do contato
MessagesJson string Array JSON com todas as mensagens pendentes
FirstMessageAt DateTime Timestamp da primeira mensagem
LastMessageAt DateTime Timestamp da ultima mensagem
ProcessAt DateTime Quando processar (debounce). Resetado a cada nova mensagem
MaxWaitUntil DateTime Deadline absoluto. NAO resetado com novas mensagens
Status AggregationStatus Pending ou Processing

Logica de Debouncing

Mensagem 1 chega (t=0s):
  -> Cria PendingAggregation
  -> ProcessAt = now + 5s (AggregationDelaySeconds)
  -> MaxWaitUntil = now + 30s (AggregationMaxWaitSeconds)

Mensagem 2 chega (t=3s):
  -> Adiciona ao MessagesJson existente
  -> ProcessAt = now + 5s (resetado para t=8s)
  -> MaxWaitUntil permanece t=30s (NAO resetado)

Mensagem 3 chega (t=6s):
  -> Adiciona ao MessagesJson existente
  -> ProcessAt = now + 5s (resetado para t=11s)
  -> MaxWaitUntil permanece t=30s

AggregationPollingService processa quando:
  -> ProcessAt <= now (debounce expirou) OU
  -> MaxWaitUntil <= now (deadline absoluto atingido)

Cooldown

Apos processar uma agregacao, um cooldown Redis (CacheKeys.AggregationCooldown(conversationId)) e definido por AggregationCooldownSeconds (10s). Se uma nova mensagem chega durante o cooldown, o ProcessAt e deslocado para apos o cooldown expirar + delay normal.

O AggregationPollingService seta o cooldown imediatamente ao marcar agregacoes como "Processing" (antes de iniciar o pipeline), fechando a janela de race condition onde mensagens chegando durante o processamento (~8-10s) criavam novas agregacoes sem cooldown. O CleanupAggregationStep renova o cooldown ao final do processamento.

Configuracao

Parametro Valor Constante
Delay (debounce) 5s SystemConfig.AggregationDelaySeconds
Max Wait (deadline) 30s SystemConfig.AggregationMaxWaitSeconds
Cooldown 10s SystemConfig.AggregationCooldownSeconds
Polling interval 500ms AppConfig.AggregationPollingIntervalMs
Batch size (polling) 10 Hardcoded em ProcessReadyAggregationsAsync

AggregationPollingService

Background service que faz polling na tabela pending_aggregations a cada 500ms.

Fluxo por ciclo:

  1. Busca ate 10 agregacoes com Status = Pending e ProcessAt <= now ou MaxWaitUntil <= now
  2. Marca todas como Processing (salva no banco) para evitar race conditions
  3. Seta cooldown Redis imediatamente para cada conversa
  4. Para cada agregacao, cria novo scope de DI e executa MessageAggregationPipeline
  5. Seta TenantId no ITenantProvider para o contexto do background processing

Tratamento de erros: Se o pipeline falha com excecao, a agregacao com falha e deletada da tabela para evitar retries infinitos. O erro e logado.


Pipeline de Agregacao de Mensagens

O AggregationPollingService executa o pipeline de steps na seguinte ordem:

# Step Descricao
1 ValidateAggregationStep Carrega conversa + instancia + agent + steps + attachments. Verifica modo (Manual = skip AI). Verifica horarios de atendimento (fora do horario = agenda deferred via TickerQ). Parseia mensagens do JSON
2 SendTypingIndicatorStep Envia "digitando..." ao WhatsApp (se ShowTyping habilitado na instancia)
3 ProcessMediaStep Processa midia (download, upload R2, extracao OCR/transcricao). Limita extracoes AI aos N mais recentes (configuravel por instancia via MaxMediaPerMessage, padrao 3). Respeita flags AllowImageAnalysis/AllowAudioAnalysis da instancia. Se toda midia for bloqueada por politica e nao houver texto, seta SkipAiResponse = true + mensagem canned em AiMessages
4 ConsolidateMessagesStep Consolida multiplas mensagens em uma unica string
5 LoadMessageHistoryStep Carrega historico da conversa (exclui mensagens de sistema)
6 CheckSubscriptionCreditsStep Calcula custo estimado (1 base + 3 por midia com extracao bem-sucedida). Verifica creditos do tenant via HasAiCreditsAsync(requiredCredits). Se esgotados: escala para manual + mensagem de sistema
7 CheckAiResponseCapStep Verifica cap de respostas IA por conversa. Se atingido: escala para manual + mensagem de sistema
8 GenerateAiResponseStep Gera resposta via LLM (executa sub-pipeline abaixo). Se falhar: auto-escala silenciosamente
9 ConsumeSubscriptionCreditStep Consome context.EstimatedCreditCost creditos (variavel: 1 base + 3 por midia processada). Se balance=0: bulk-switch de todas as conversas IA do tenant
10 IncrementAiResponseCountStep Incrementa Conversation.AiResponseCount
11 ApplySignatureStep Aplica assinatura na primeira ou ultima mensagem de AiMessages conforme SignaturePosition (se configurado na instancia)
12 ApplyResponseDelayStep Aplica delay antes do envio (se DelaySeconds > 0 na instancia)
13 StopTypingIndicatorStep Para o indicador de digitacao
14 SendWhatsAppResponseStep Envia cada mensagem de AiMessages via WhatsApp com delay de OutgoingMessageDelayMs (500ms) entre elas. Nao checa SkipAiResponse — depende de AiMessages estar vazio para pular
15 SendAttachmentsStep Envia anexos relevantes do agente via WhatsApp
16 PersistMessagesStep Cria um registro Message por item em AiMessages, vinculando cada um ao WhatsAppMessageId correspondente por indice. Persiste mensagens de sistema com +100ms offset. Vincula credit transaction e LLM usage ao primeiro Message da resposta
17 MarkMessagesAsReadStep Marca mensagens como lidas
18 ScheduleFollowUpStep Agenda follow-up automatico via TickerQ
19 NotifySignalRStep Notifica clientes via SignalR
20 CleanupAggregationStep Limpa registro de agregacao e renova cooldown

Sub-pipeline de GenerateAiResponseStep:

O step 8 executa internamente um sub-pipeline com steps dedicados para geracao da resposta IA:

# Sub-step Descricao
8.1 InitializeStepTrackingStep Inicializa rastreamento de etapas do agente
8.2 PrepareAiHistoryStep Prepara e limpa historico de mensagens para o LLM
8.3 ClassifyIntentStep Classifica intent via Haiku (jailbreak -> bloqueia, FAQ -> Haiku, complex -> Sonnet)
8.4 RewriteKnowledgeQueryStep Reescreve mensagem em query standalone via LLM (pula se jailbreak/off-topic ou se agente nao tem embeddings)
8.5 InvokeAiServiceStep Chama AiResponseService com query reescrita. Retorna List<string> Messages que e atribuida a context.AiMessages (pula se jailbreak/off-topic)
8.6 ApplyAiDecisionsStep Aplica decisoes da IA (transicao de step, escalacao, resolucao)
8.7 ValidateOutputStep Valida resposta via guardrails rule-based (PII, promessas)

Observacoes: - StopTypingIndicatorStep executa antes de SendWhatsAppResponseStep para que o indicador desapareca naturalmente - ApplySignatureStep adiciona texto de assinatura na primeira mensagem (posicoes Start) ou ultima mensagem (posicoes End) de AiMessages - ApplyResponseDelayStep adiciona um delay artificial (0-10s) para tornar as respostas mais naturais - SendWhatsAppResponseStep nao checa SkipAiResponse — depende de AiMessages estar vazio. Isso permite que mensagens canned (ex: midia bloqueada) sejam enviadas mesmo quando a geracao de IA foi pulada - Cada step pode interromper o pipeline chamando context.Stop() se necessario - ValidateAggregationStep verifica AiAvailabilityHoursJson da instancia. Se fora do horario, agenda resposta diferida via TickerQ e define SkipAiResponse = true - Mensagens consecutivas de mesmo role sao mescladas com \n\n ao reconstruir o historico para o LLM (ver ConvertToChatMessages em AiResponseService)


Pipeline de Resposta Diferida (Deferred AI Response)

Quando a instancia possui horarios de atendimento configurados (AiAvailabilityHoursJson) e a mensagem chega fora desse horario, a resposta da IA e agendada para o proximo periodo disponivel via TickerQ.

Fluxo:

ValidateAggregationStep (fora do horario)
    -> IDeferredAiResponseScheduler.ScheduleAsync()
        -> Calcula proximo horario permitido (timezone do tenant)
        -> Cria TimeTickerEntity com funcao "SendDeferredAiResponse"
        -> SkipAiResponse = true (mensagem e persistida sem resposta da IA)

Quando o job e executado (DeferredAiResponseFunction), o pipeline de steps e:

# Step Descricao
1 ValidateDeferredStep Carrega conversa, instancia, agente e steps. Valida elegibilidade
2 CheckAvailabilityHoursStep Re-verifica se esta dentro do horario. Se nao, reagenda e para
3 LoadDeferredHistoryStep Carrega historico e identifica mensagens nao respondidas
4 CheckDeferredSubscriptionCreditsStep Verifica creditos de IA do tenant
5 CheckDeferredAiResponseCapStep Verifica cap de respostas IA por conversa
6 PrepareDeferredAiHistoryStep Remove assinaturas do historico para evitar copia pela IA
7 ClassifyDeferredIntentStep Classifica intent via Haiku (jailbreak -> bloqueia, FAQ -> Haiku)
8 InvokeDeferredAiStep Gera resposta via LLM. Retorna List<string> Messages atribuida a context.AiMessages (pula se jailbreak/off-topic)
9 ConsumeDeferredSubscriptionCreditStep Consome credito
10 IncrementDeferredAiResponseCountStep Incrementa contador
11 ApplyDeferredDecisionsStep Aplica decisoes da IA
12 ValidateDeferredOutputStep Valida cada mensagem de AiMessages via guardrails rule-based (PII, promessas)
13 ApplyDeferredSignatureStep Aplica assinatura na primeira ou ultima mensagem de AiMessages conforme SignaturePosition
14 SendDeferredResponseStep Envia cada mensagem de AiMessages via WhatsApp com delay de OutgoingMessageDelayMs (500ms) entre elas
15 PersistDeferredMessageStep Cria um registro Message por item em AiMessages, vinculando WhatsAppMessageId por indice. Registra uso de LLM
16 NotifyDeferredStep Notifica via SignalR

Observacoes: - O cancelamento de jobs existentes e feito antes de agendar novos (evita duplicatas) - Se a conversa mudar para modo manual ou for resolvida antes da execucao, o job e ignorado - O timezone do tenant e usado para calcular horarios locais


Eventos do Webhook

Evento Descricao Pipeline
messages.upsert Nova mensagem IncomingMessagePipeline
messages.update Status atualizado MessageStatusPipeline (Sent -> Delivered -> Read)
messages.reaction Reacao a mensagem MessageReactionPipeline
connection.update Conexao alterada ConnectionUpdatePipeline
presence.update Presenca Ignorado (noop)

Status de Mensagem

Pending -> Sent -> Delivered -> Read
            |
          Failed

Atualizado via webhook messages.update.


Tratamento de Erros

Erro Tratamento
Payload JSON invalido Retorna 400 Bad Request
Instancia nao encontrada Log warning, para o pipeline
Mensagem duplicada (idempotencia) Para o pipeline silenciosamente
Contato na blacklist Para o pipeline, log informativo
Tipo de mensagem desconhecido Para o pipeline silenciosamente
Claude API falha Auto-escala para manual silenciosamente (sem envio ao WhatsApp)
Envio WhatsApp falha Status = Failed, log error
Download/processamento de midia falha Continua com proxima midia
Pipeline de agregacao falha (excecao) Agregacao e deletada do banco, erro logado
Erro no loop do AggregationPollingService Log error, aguarda proximo ciclo (500ms)

Nota: Nao ha mecanismo de retry automatico nem dead-letter queue. Agregacoes que falham sao deletadas para evitar retries infinitos. Os dados da mensagem original estao logados e podem ser recuperados manualmente se necessario.


Monitoramento

Logs Estruturados

_logger.LogInformation("Processing aggregation {AggregationId} for conversation {ConversationId} with {MessageCount} messages",
    aggregation.Id, aggregation.ConversationId, messageCount);

Activity tracing via System.Diagnostics.Activity com tags ciba.conversationId e ciba.whatsAppMessageIds.

Metricas Importantes

  • Latencia do endpoint: tempo de resposta do webhook (deve ser < 100ms, apenas INSERT/UPDATE)
  • Agregacoes pendentes: registros com Status = Pending na tabela pending_aggregations
  • Erros de pipeline: logs de nivel Error no AggregationPollingService
  • Idempotencia: chaves Redis com prefixo msg:processed: (TTL 24h)

Performance

  • Endpoint responde em < 100ms (apenas valida, insere/atualiza no banco)
  • Historico de conversa limitado a 20 mensagens (AppConfig.MaxHistoryMessages)
  • Batch de 10 agregacoes por ciclo de polling
  • Idempotencia com TTL de 24 horas (Redis)
  • Midia salva no R2, extracao limitada a 5 itens por batch
  • Cooldown Redis previne processamento excessivo da mesma conversa
  • Notificacoes SignalR em background
  • Logs estruturados com scope para tracing