Pular para conteúdo

Fluxo: Processamento de Mensagens

Visão Geral

Fluxo completo desde o recebimento de uma mensagem via webhook até o envio da resposta. O processamento é assíncrono via Redis Streams para garantir resiliência e escalabilidade.

Arquitetura de Processamento

┌─────────────────────────────────────────────────────────────────────────────┐
│  FASE SÍNCRONA — Endpoint (< 100ms)                                         │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  WhatsApp ──▶ Evolution API ──▶ POST /api/webhook/evolution                 │
│                                         │                                   │
│                                         ▼                                   │
│                              ┌─────────────────────┐                        │
│                              │  WebhookEndpoint    │                        │
│                              │  - Valida JSON      │                        │
│                              │  - Publica Stream   │                        │
│                              │  - Retorna 200 OK   │                        │
│                              └──────────┬──────────┘                        │
│                                         │                                   │
│                                         ▼                                   │
│                              ┌─────────────────────┐                        │
│                              │   Redis Stream      │                        │
│                              │ webhook:messages    │                        │
│                              └─────────────────────┘                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────────────┐
│  FASE ASSÍNCRONA — Background Service                                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                              ┌─────────────────────┐                        │
│                              │WebhookConsumerService│                        │
│                              │  (BackgroundService) │                        │
│                              └──────────┬──────────┘                        │
│                                         │                                   │
│                    ┌────────────────────┼────────────────────┐              │
│                    │                    │                    │              │
│                    ▼                    ▼                    ▼              │
│             ┌──────────┐        ┌──────────────┐      ┌──────────┐          │
│             │ Processa │        │   Verifica   │      │   Lê     │          │
│             │ Pendentes│        │ Idempotência │      │  Novas   │          │
│             │ (retry)  │        │   (Redis)    │      │ Mensagens│          │
│             └────┬─────┘        └──────┬───────┘      └────┬─────┘          │
│                  │                     │                   │                │
│                  └─────────────────────┴───────────────────┘                │
│                                        │                                    │
│                                        ▼                                    │
│                              ┌─────────────────────┐                        │
│                              │ProcessMessageHandler│                        │
│                              │     (MediatR)       │                        │
│                              └──────────┬──────────┘                        │
│                                         │                                   │
│                                         ▼                                   │
│                               ┌─────────────────┐                           │
│                               │  Processamento  │                           │
│                               │   de Negócio    │                           │
│                               │ (ver detalhes)  │                           │
│                               └─────────────────┘                           │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Redis Streams — Configuração

Parâmetro Valor Descrição
Stream webhook:messages Fila principal de webhooks
Consumer Group ciba-consumers Grupo de consumidores
Consumer Name consumer-1 Identificador do consumidor
Batch Size 10 Mensagens lidas por iteração
Polling Interval 500ms Intervalo quando não há mensagens
Max Retries 3 Tentativas antes de dead-letter
Dead Letter Stream webhook:dead-letter Mensagens com falha permanente

Idempotência

O sistema previne processamento duplicado usando Redis TTL:

┌─────────────────────────────────────────────────────────────────┐
│  Evento: messages.upsert                                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Extrai WhatsAppMessageId do payload                         │
│                                                                 │
│  2. Verifica chave Redis: webhook:processed:{messageId}         │
│                                                                 │
│  3. Se chave NÃO existe:                                        │
│     - Cria chave com TTL de 24 horas                            │
│     - Processa mensagem normalmente                             │
│                                                                 │
│  4. Se chave JÁ existe:                                         │
│     - Mensagem já foi processada                                │
│     - Faz ACK e pula processamento                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Cenários cobertos: - Evolution API envia o mesmo webhook múltiplas vezes - Retry do consumer processa a mesma mensagem - Mensagens duplicadas em rede instável


Retry e Dead-Letter

Fluxo de Retry

┌──────────────────────────────────────────────────────────────────────────┐
│                                                                          │
│    Mensagem Nova          Processamento            Resultado             │
│    ──────────────         ─────────────            ─────────             │
│                                                                          │
│  ┌─────────────┐        ┌─────────────┐         ┌─────────────┐          │
│  │  Lê do      │───────▶│  Processa   │───────▶│  Sucesso    │─────┐    │
│  │  Stream     │        │  Handler    │         │  ACK        │     │    │
│  └─────────────┘        └──────┬──────┘         └─────────────┘     │    │
│                                │                                    │    │
│                                │ Falha                              │    │
│                                ▼                                    │    │
│                         ┌─────────────┐                             │    │
│                         │ NÃO faz ACK │                             │    │
│                         │ (pendente)  │                             │    │
│                         └──────┬──────┘                             │    │
│                                │                                    │    │
│                                ▼                                    │    │
│  ┌─────────────────────────────────────────────────────────────┐   │    │
│  │  Próximo Ciclo: Processa Pendentes                          │   │    │
│  ├─────────────────────────────────────────────────────────────┤   │    │
│  │                                                             │   │    │
│  │  DeliveryCount <= 3 ───▶ Tenta processar novamente ─────────┼───┘    │
│  │                                                             │        │
│  │  DeliveryCount > 3  ───▶ Move para dead-letter stream ──────┼───┐    │
│  │                          webhook:dead-letter                │   │    │
│  │                                                             │   │    │
│  └─────────────────────────────────────────────────────────────┘   │    │
│                                                                    │    │
│  ┌─────────────────────────────────────────────────────────────┐   │    │
│  │  Dead Letter Stream                                         │◀──┘    │
│  ├─────────────────────────────────────────────────────────────┤        │
│  │  Campos:                                                    │        │
│  │  - payload: JSON original                                   │        │
│  │  - original_id: ID do stream original                       │        │
│  │  - reason: "max_retries_exceeded"                           │        │
│  │  - timestamp: Unix timestamp                                │        │
│  │                                                             │        │
│  │  * Requer intervenção manual para reprocessamento           │        │
│  └─────────────────────────────────────────────────────────────┘        │
│                                                                         │
│  ┌─────────────────────────────────────────────────────────────┐        │
│  │  Mensagem Processada com Sucesso                            │◀───────┘
│  └─────────────────────────────────────────────────────────────┘
└──────────────────────────────────────────────────────────────────────────┘

Backoff

  • Falha no processamento: aguarda 2 segundos antes da próxima tentativa
  • Loop principal com erro: aguarda 5 segundos antes de continuar

Etapas do Processamento de Negócio

1. Recebimento do Webhook

O Evolution API envia webhook para POST /api/webhook/evolution:

{
  "event": "messages.upsert",
  "instance": "suporte-01",
  "data": {
    "key": {
      "remoteJid": "5511999998888@s.whatsapp.net",
      "fromMe": false,
      "id": "ABC123"
    },
    "pushName": "João Silva",
    "message": {
      "conversation": "Qual o horário de funcionamento?"
    }
  }
}

2. Identificação da Instância

var instance = await _db.WhatsAppInstances
    .IgnoreQueryFilters()
    .Include(i => i.Agent)
    .FirstOrDefaultAsync(i => i.Id == instanceId, ct);

Nota: O instance.Name na Evolution API corresponde ao Id (Guid) da instância no banco.

Se instância não encontrada → log warning e retorna.

3. Verificação de Blacklist/Whitelist

Apenas para novas conversas:

  1. Verifica se contato está na blacklist → bloqueia
  2. Se whitelist não está vazia, verifica se contato está nela → bloqueia se não estiver

Conversas existentes não são bloqueadas.

4. Busca/Criação de Conversa

var conversation = await _db.Conversations
    .IgnoreQueryFilters()
    .FirstOrDefaultAsync(
        c => c.InstanceId == instance.Id &&
             c.ContactPhone == contactPhone &&
             c.Status == ConversationStatus.Active,
        ct);

Se não existe, cria nova com: - Mode = StartsWithAI da instância (AI ou Manual) - Busca foto de perfil do contato - Usa pushName como nome do contato

5. Processamento de Mídia

Se mensagem contém mídia (imagem, áudio, vídeo, documento, sticker):

  1. Detecta tipo pela estrutura do webhook
  2. Baixa mídia via Evolution API
  3. Converte base64 → bytes
  4. Salva no Cloudflare R2
  5. Armazena URL no campo MediaUrl

6. Salvamento da Mensagem

var incomingMessage = new Message
{
    ConversationId = conversation.Id,
    Role = MessageRole.User,
    Type = messageType,
    Content = content,
    MediaUrl = mediaUrl,
    WhatsAppMessageId = whatsAppMessageId,
    Status = MessageStatus.Delivered
};

7. Verificação de Modo

Modo Ação
Manual Salva mensagem, notifica via SignalR, não processa IA
AI Continua para processamento com Claude

8. Montagem do Contexto

// Últimas 20 mensagens da conversa
var chatMessages = conversation.Messages
    .OrderByDescending(m => m.CreatedAt)
    .Take(MaxHistoryMessages)
    .OrderBy(m => m.CreatedAt)
    .Select(m => new ChatMessage(role, content))
    .ToList();

// Query rewriting: reformula mensagens anafóricas em queries autossuficientes
// Ex: "E o primeiro" → "Qual é o primeiro melhor refrigerante do mundo?"
var knowledgeQuery = await _queryRewrite.RewriteAsync(
    agent.Id,
    consolidatedContent,
    recentHistory,
    ct);

// Busca semântica por blocos de conhecimento relevantes
var knowledgeBlocks = await _knowledgeRetrieval.GetRelevantKnowledgeAsync(
    agent.Id,
    knowledgeQuery,  // Usa a query reescrita (ou original se já autossuficiente)
    ct);

// System prompt otimizado (com fallback) + Knowledge blocks relevantes
var basePrompt = agent.OptimizedSystemPrompt ?? agent.SystemPrompt;
var fullSystemPrompt = basePrompt;
if (knowledgeBlocks.Count > 0)
{
    var knowledgeSection = string.Join("\n\n---\n\n", knowledgeBlocks);
    fullSystemPrompt = $"{basePrompt}\n\n## Base de Conhecimento\n\n{knowledgeSection}";
}

Query Rewriting (LLM): - Usa LLM leve (gpt-4o-mini) para reformular mensagens anafóricas em queries standalone - Resolve referências como "e o primeiro", "e no domingo?", "quanto custa esse?" usando histórico - Se a mensagem já é autossuficiente, retorna como está - Fallback gracioso: em caso de falha, usa a mensagem original - Configurável via KnowledgeRetrieval:QueryRewrite no appsettings - Veja knowledge-retrieval.md para detalhes

Otimização de Prompt: - Usa OptimizedSystemPrompt se disponível, senão fallback para SystemPrompt - Reduz consumo de tokens base em ~30-35% - Veja prompt-optimization.md para detalhes

Busca Semântica (pgvector): - Gera embedding da query (reescrita ou original) via OpenAI API - Busca os 5 blocos mais similares usando distância por cosseno - Reduz consumo de tokens em ~90% comparado a carregar todos os blocos

Veja knowledge-retrieval.md para detalhes.

9. Chamada à Claude API

var llmResponse = await _llm.GenerateResponseAsync(
    fullSystemPrompt,
    chatMessages,
    modelOverride,
    ct);

Se falhar → usa ErrorMessage do agente como fallback.

10. Salvamento da Resposta

var assistantMessage = new Message
{
    ConversationId = conversation.Id,
    Role = MessageRole.Assistant,
    Content = assistantResponse,
    InputTokens = inputTokens,
    OutputTokens = outputTokens,
    Status = MessageStatus.Pending
};

11. Envio via WhatsApp

var sendResult = await _whatsApp.SendTextMessageAsync(
    sessionId,
    contactPhone,
    assistantResponse,
    ct);

Atualiza status para Sent ou Failed.

12. Notificação SignalR

await _hubNotification.NotifyNewMessageAsync(tenantId, conversationId, message, ct);
await _hubNotification.NotifyConversationUpdatedAsync(tenantId, conversation, ct);

Pipeline de Agregação de Mensagens

Quando a agregação está habilitada, múltiplas mensagens recebidas em sequência são agrupadas antes de processar a resposta da IA. Isso evita respostas parciais quando o usuário envia várias mensagens seguidas.

O AggregationPollingService (background service) verifica periodicamente se há agregações prontas para processar e executa o pipeline de steps na seguinte ordem:

# Step Descrição
1 ValidateAggregationStep Valida dados da agregação e verifica horários de atendimento
2 SendTypingIndicatorStep Envia "digitando..." ao WhatsApp (se ShowTyping habilitado na instância)
3 ProcessMediaStep Processa mídia (imagens, áudios, etc.)
4 ConsolidateMessagesStep Consolida múltiplas mensagens em uma
5 LoadMessageHistoryStep Carrega histórico da conversa
6 GenerateAiResponseStep Gera resposta via LLM (executa sub-pipeline abaixo)
7 ApplySignatureStep Aplica assinatura na resposta (se configurado na instância)
8 ApplyResponseDelayStep Aplica delay antes do envio (se DelaySeconds > 0 na instância)
9 StopTypingIndicatorStep Para o indicador de digitação
10 SendWhatsAppResponseStep Envia resposta via WhatsApp
11 MarkMessagesAsReadStep Marca mensagens como lidas
12 PersistMessagesStep Persiste mensagens no banco
13 NotifySignalRStep Notifica clientes via SignalR
14 CleanupAggregationStep Limpa registro de agregação

Sub-pipeline de GenerateAiResponseStep:

O step 6 executa internamente um sub-pipeline com steps dedicados para geração da resposta IA:

# Sub-step Descrição
6.1 InitializeStepTrackingStep Inicializa rastreamento de etapas do agente
6.2 PrepareAiHistoryStep Prepara e limpa histórico de mensagens para o LLM
6.3 RewriteKnowledgeQueryStep Reescreve mensagem em query standalone via LLM (gpt-4o-mini)
6.4 InvokeAiServiceStep Chama AiResponseService com query reescrita para busca vetorial
6.5 ApplyAiDecisionsStep Aplica decisões da IA (transição de step, escalação, resolução)

Observações: - StopTypingIndicatorStep executa antes de SendWhatsAppResponseStep para que o indicador desapareça naturalmente - ApplySignatureStep adiciona texto de assinatura conforme a posição configurada (StartSameLine, StartNewLine, EndSameLine, EndNewLine) - ApplyResponseDelayStep adiciona um delay artificial (0-10s) para tornar as respostas mais naturais - Cada step pode interromper o pipeline chamando context.Stop() se necessário - ValidateAggregationStep verifica AiAvailabilityHoursJson da instância. Se fora do horário, agenda resposta diferida via TickerQ e define SkipAiResponse = true


Pipeline de Resposta Diferida (Deferred AI Response)

Quando a instância possui horários de atendimento configurados (AiAvailabilityHoursJson) e a mensagem chega fora desse horário, a resposta da IA é agendada para o próximo período disponível via TickerQ.

Fluxo:

ValidateAggregationStep (fora do horário)
    → IDeferredAiResponseScheduler.ScheduleAsync()
        → Calcula próximo horário permitido (timezone do tenant)
        → Cria TimeTickerEntity com função "SendDeferredAiResponse"
        → SkipAiResponse = true (mensagem é persistida sem resposta da IA)

Quando o job é executado (DeferredAiResponseFunction), o pipeline de steps é:

# Step Descrição
1 ValidateDeferredStep Carrega conversa, instância, agente e steps. Valida elegibilidade
2 CheckAvailabilityHoursStep Re-verifica se está dentro do horário. Se não, reagenda e para
3 LoadDeferredHistoryStep Carrega histórico e identifica mensagens não respondidas
4 PrepareDeferredAiHistoryStep Remove assinaturas do histórico para evitar cópia pela IA
5 InvokeDeferredAiStep Gera resposta via LLM
6 ApplyDeferredDecisionsStep Aplica decisões da IA (transição de step, escalação, resolução)
7 ApplyDeferredSignatureStep Aplica assinatura na resposta
8 SendDeferredResponseStep Envia resposta via WhatsApp
9 PersistDeferredMessageStep Persiste mensagem e registra uso de LLM
10 NotifyDeferredStep Notifica via SignalR

Observações: - O cancelamento de jobs existentes é feito antes de agendar novos (evita duplicatas) - Se a conversa mudar para modo manual ou for resolvida antes da execução, o job é ignorado - O timezone do tenant é usado para calcular horários locais


Eventos do Webhook

Evento Descrição Ação
messages.upsert Nova mensagem Processa mensagem
messages.update Status atualizado Atualiza status (Sent→Delivered→Read)
messages.reaction Reação a mensagem Adiciona/remove reação
connection.update Conexão alterada Atualiza status da instância

Status de Mensagem

Pending → Sent → Delivered → Read
         Failed

Atualizado via webhook messages.update.


Tratamento de Erros

Erro Tratamento
Instância não encontrada Log warning, ignora
Claude API falha Usa ErrorMessage do agente
Envio WhatsApp falha Status = Failed, log error
Download mídia falha Salva placeholder
Handler exception Não faz ACK, retry automático
Max retries exceeded Move para dead-letter stream

Monitoramento

Logs Estruturados

_logger.LogInformation("Message {MessageId} processed for conversation {ConversationId}",
    whatsAppMessageId, conversationId);

Métricas Importantes

  • Latência do endpoint: tempo de resposta do webhook (deve ser < 100ms)
  • Tamanho do stream: mensagens pendentes em webhook:messages
  • Dead letters: mensagens em webhook:dead-letter (devem ser investigadas)
  • Retry rate: frequência de mensagens que precisam retry

Comandos Redis para Debug

# Ver mensagens pendentes
redis-cli XPENDING webhook:messages ciba-consumers

# Ver tamanho do stream
redis-cli XLEN webhook:messages

# Ver dead letters
redis-cli XLEN webhook:dead-letter
redis-cli XRANGE webhook:dead-letter - + COUNT 10

# Ver chaves de idempotência
redis-cli KEYS webhook:processed:*

Performance

  • Endpoint responde em < 100ms (apenas publica no stream)
  • Histórico de conversa limitado a 20 mensagens
  • Batch de 10 mensagens por iteração do consumer
  • Idempotência com TTL de 24 horas
  • Mídia salva de forma assíncrona no R2
  • Notificações SignalR em background
  • Logs estruturados com scope para tracing