Fluxo: Processamento de Mensagens¶
Visão Geral¶
Fluxo completo desde o recebimento de uma mensagem via webhook até o envio da resposta. O processamento é assíncrono via Redis Streams para garantir resiliência e escalabilidade.
Arquitetura de Processamento¶
┌─────────────────────────────────────────────────────────────────────────────┐
│ FASE SÍNCRONA — Endpoint (< 100ms) │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ WhatsApp ──▶ Evolution API ──▶ POST /api/webhook/evolution │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ WebhookEndpoint │ │
│ │ - Valida JSON │ │
│ │ - Publica Stream │ │
│ │ - Retorna 200 OK │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Redis Stream │ │
│ │ webhook:messages │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ FASE ASSÍNCRONA — Background Service │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────┐ │
│ │WebhookConsumerService│ │
│ │ (BackgroundService) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ Processa │ │ Verifica │ │ Lê │ │
│ │ Pendentes│ │ Idempotência │ │ Novas │ │
│ │ (retry) │ │ (Redis) │ │ Mensagens│ │
│ └────┬─────┘ └──────┬───────┘ └────┬─────┘ │
│ │ │ │ │
│ └─────────────────────┴───────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ProcessMessageHandler│ │
│ │ (MediatR) │ │
│ └──────────┬──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Processamento │ │
│ │ de Negócio │ │
│ │ (ver detalhes) │ │
│ └─────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘
Redis Streams — Configuração¶
| Parâmetro | Valor | Descrição |
|---|---|---|
| Stream | webhook:messages |
Fila principal de webhooks |
| Consumer Group | ciba-consumers |
Grupo de consumidores |
| Consumer Name | consumer-1 |
Identificador do consumidor |
| Batch Size | 10 | Mensagens lidas por iteração |
| Polling Interval | 500ms | Intervalo quando não há mensagens |
| Max Retries | 3 | Tentativas antes de dead-letter |
| Dead Letter Stream | webhook:dead-letter |
Mensagens com falha permanente |
Idempotência¶
O sistema previne processamento duplicado usando Redis TTL:
┌─────────────────────────────────────────────────────────────────┐
│ Evento: messages.upsert │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Extrai WhatsAppMessageId do payload │
│ │
│ 2. Verifica chave Redis: webhook:processed:{messageId} │
│ │
│ 3. Se chave NÃO existe: │
│ - Cria chave com TTL de 24 horas │
│ - Processa mensagem normalmente │
│ │
│ 4. Se chave JÁ existe: │
│ - Mensagem já foi processada │
│ - Faz ACK e pula processamento │
│ │
└─────────────────────────────────────────────────────────────────┘
Cenários cobertos: - Evolution API envia o mesmo webhook múltiplas vezes - Retry do consumer processa a mesma mensagem - Mensagens duplicadas em rede instável
Retry e Dead-Letter¶
Fluxo de Retry¶
┌──────────────────────────────────────────────────────────────────────────┐
│ │
│ Mensagem Nova Processamento Resultado │
│ ────────────── ───────────── ───────── │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Lê do │───────▶│ Processa │───────▶│ Sucesso │─────┐ │
│ │ Stream │ │ Handler │ │ ACK │ │ │
│ └─────────────┘ └──────┬──────┘ └─────────────┘ │ │
│ │ │ │
│ │ Falha │ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ NÃO faz ACK │ │ │
│ │ (pendente) │ │ │
│ └──────┬──────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ Próximo Ciclo: Processa Pendentes │ │ │
│ ├─────────────────────────────────────────────────────────────┤ │ │
│ │ │ │ │
│ │ DeliveryCount <= 3 ───▶ Tenta processar novamente ─────────┼───┘ │
│ │ │ │
│ │ DeliveryCount > 3 ───▶ Move para dead-letter stream ──────┼───┐ │
│ │ webhook:dead-letter │ │ │
│ │ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │ │
│ │ │
│ ┌─────────────────────────────────────────────────────────────┐ │ │
│ │ Dead Letter Stream │◀──┘ │
│ ├─────────────────────────────────────────────────────────────┤ │
│ │ Campos: │ │
│ │ - payload: JSON original │ │
│ │ - original_id: ID do stream original │ │
│ │ - reason: "max_retries_exceeded" │ │
│ │ - timestamp: Unix timestamp │ │
│ │ │ │
│ │ * Requer intervenção manual para reprocessamento │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Mensagem Processada com Sucesso │◀───────┘
│ └─────────────────────────────────────────────────────────────┘
│
└──────────────────────────────────────────────────────────────────────────┘
Backoff¶
- Falha no processamento: aguarda 2 segundos antes da próxima tentativa
- Loop principal com erro: aguarda 5 segundos antes de continuar
Etapas do Processamento de Negócio¶
1. Recebimento do Webhook¶
O Evolution API envia webhook para POST /api/webhook/evolution:
{
"event": "messages.upsert",
"instance": "suporte-01",
"data": {
"key": {
"remoteJid": "5511999998888@s.whatsapp.net",
"fromMe": false,
"id": "ABC123"
},
"pushName": "João Silva",
"message": {
"conversation": "Qual o horário de funcionamento?"
}
}
}
2. Identificação da Instância¶
var instance = await _db.WhatsAppInstances
.IgnoreQueryFilters()
.Include(i => i.Agent)
.FirstOrDefaultAsync(i => i.Id == instanceId, ct);
Nota: O
instance.Namena Evolution API corresponde aoId(Guid) da instância no banco.
Se instância não encontrada → log warning e retorna.
3. Verificação de Blacklist/Whitelist¶
Apenas para novas conversas:
- Verifica se contato está na blacklist → bloqueia
- Se whitelist não está vazia, verifica se contato está nela → bloqueia se não estiver
Conversas existentes não são bloqueadas.
4. Busca/Criação de Conversa¶
var conversation = await _db.Conversations
.IgnoreQueryFilters()
.FirstOrDefaultAsync(
c => c.InstanceId == instance.Id &&
c.ContactPhone == contactPhone &&
c.Status == ConversationStatus.Active,
ct);
Se não existe, cria nova com:
- Mode = StartsWithAI da instância (AI ou Manual)
- Busca foto de perfil do contato
- Usa pushName como nome do contato
5. Processamento de Mídia¶
Se mensagem contém mídia (imagem, áudio, vídeo, documento, sticker):
- Detecta tipo pela estrutura do webhook
- Baixa mídia via Evolution API
- Converte base64 → bytes
- Salva no Cloudflare R2
- Armazena URL no campo
MediaUrl
6. Salvamento da Mensagem¶
var incomingMessage = new Message
{
ConversationId = conversation.Id,
Role = MessageRole.User,
Type = messageType,
Content = content,
MediaUrl = mediaUrl,
WhatsAppMessageId = whatsAppMessageId,
Status = MessageStatus.Delivered
};
7. Verificação de Modo¶
| Modo | Ação |
|---|---|
| Manual | Salva mensagem, notifica via SignalR, não processa IA |
| AI | Continua para processamento com Claude |
8. Montagem do Contexto¶
// Últimas 20 mensagens da conversa
var chatMessages = conversation.Messages
.OrderByDescending(m => m.CreatedAt)
.Take(MaxHistoryMessages)
.OrderBy(m => m.CreatedAt)
.Select(m => new ChatMessage(role, content))
.ToList();
// Query rewriting: reformula mensagens anafóricas em queries autossuficientes
// Ex: "E o primeiro" → "Qual é o primeiro melhor refrigerante do mundo?"
var knowledgeQuery = await _queryRewrite.RewriteAsync(
agent.Id,
consolidatedContent,
recentHistory,
ct);
// Busca semântica por blocos de conhecimento relevantes
var knowledgeBlocks = await _knowledgeRetrieval.GetRelevantKnowledgeAsync(
agent.Id,
knowledgeQuery, // Usa a query reescrita (ou original se já autossuficiente)
ct);
// System prompt otimizado (com fallback) + Knowledge blocks relevantes
var basePrompt = agent.OptimizedSystemPrompt ?? agent.SystemPrompt;
var fullSystemPrompt = basePrompt;
if (knowledgeBlocks.Count > 0)
{
var knowledgeSection = string.Join("\n\n---\n\n", knowledgeBlocks);
fullSystemPrompt = $"{basePrompt}\n\n## Base de Conhecimento\n\n{knowledgeSection}";
}
Query Rewriting (LLM):
- Usa LLM leve (gpt-4o-mini) para reformular mensagens anafóricas em queries standalone
- Resolve referências como "e o primeiro", "e no domingo?", "quanto custa esse?" usando histórico
- Se a mensagem já é autossuficiente, retorna como está
- Fallback gracioso: em caso de falha, usa a mensagem original
- Configurável via KnowledgeRetrieval:QueryRewrite no appsettings
- Veja knowledge-retrieval.md para detalhes
Otimização de Prompt:
- Usa OptimizedSystemPrompt se disponível, senão fallback para SystemPrompt
- Reduz consumo de tokens base em ~30-35%
- Veja prompt-optimization.md para detalhes
Busca Semântica (pgvector): - Gera embedding da query (reescrita ou original) via OpenAI API - Busca os 5 blocos mais similares usando distância por cosseno - Reduz consumo de tokens em ~90% comparado a carregar todos os blocos
Veja knowledge-retrieval.md para detalhes.
9. Chamada à Claude API¶
var llmResponse = await _llm.GenerateResponseAsync(
fullSystemPrompt,
chatMessages,
modelOverride,
ct);
Se falhar → usa ErrorMessage do agente como fallback.
10. Salvamento da Resposta¶
var assistantMessage = new Message
{
ConversationId = conversation.Id,
Role = MessageRole.Assistant,
Content = assistantResponse,
InputTokens = inputTokens,
OutputTokens = outputTokens,
Status = MessageStatus.Pending
};
11. Envio via WhatsApp¶
var sendResult = await _whatsApp.SendTextMessageAsync(
sessionId,
contactPhone,
assistantResponse,
ct);
Atualiza status para Sent ou Failed.
12. Notificação SignalR¶
await _hubNotification.NotifyNewMessageAsync(tenantId, conversationId, message, ct);
await _hubNotification.NotifyConversationUpdatedAsync(tenantId, conversation, ct);
Pipeline de Agregação de Mensagens¶
Quando a agregação está habilitada, múltiplas mensagens recebidas em sequência são agrupadas antes de processar a resposta da IA. Isso evita respostas parciais quando o usuário envia várias mensagens seguidas.
O AggregationPollingService (background service) verifica periodicamente se há agregações prontas para processar e executa o pipeline de steps na seguinte ordem:
| # | Step | Descrição |
|---|---|---|
| 1 | ValidateAggregationStep |
Valida dados da agregação e verifica horários de atendimento |
| 2 | SendTypingIndicatorStep |
Envia "digitando..." ao WhatsApp (se ShowTyping habilitado na instância) |
| 3 | ProcessMediaStep |
Processa mídia (imagens, áudios, etc.) |
| 4 | ConsolidateMessagesStep |
Consolida múltiplas mensagens em uma |
| 5 | LoadMessageHistoryStep |
Carrega histórico da conversa |
| 6 | GenerateAiResponseStep |
Gera resposta via LLM (executa sub-pipeline abaixo) |
| 7 | ApplySignatureStep |
Aplica assinatura na resposta (se configurado na instância) |
| 8 | ApplyResponseDelayStep |
Aplica delay antes do envio (se DelaySeconds > 0 na instância) |
| 9 | StopTypingIndicatorStep |
Para o indicador de digitação |
| 10 | SendWhatsAppResponseStep |
Envia resposta via WhatsApp |
| 11 | MarkMessagesAsReadStep |
Marca mensagens como lidas |
| 12 | PersistMessagesStep |
Persiste mensagens no banco |
| 13 | NotifySignalRStep |
Notifica clientes via SignalR |
| 14 | CleanupAggregationStep |
Limpa registro de agregação |
Sub-pipeline de GenerateAiResponseStep:
O step 6 executa internamente um sub-pipeline com steps dedicados para geração da resposta IA:
| # | Sub-step | Descrição |
|---|---|---|
| 6.1 | InitializeStepTrackingStep |
Inicializa rastreamento de etapas do agente |
| 6.2 | PrepareAiHistoryStep |
Prepara e limpa histórico de mensagens para o LLM |
| 6.3 | RewriteKnowledgeQueryStep |
Reescreve mensagem em query standalone via LLM (gpt-4o-mini) |
| 6.4 | InvokeAiServiceStep |
Chama AiResponseService com query reescrita para busca vetorial |
| 6.5 | ApplyAiDecisionsStep |
Aplica decisões da IA (transição de step, escalação, resolução) |
Observações:
- StopTypingIndicatorStep executa antes de SendWhatsAppResponseStep para que o indicador desapareça naturalmente
- ApplySignatureStep adiciona texto de assinatura conforme a posição configurada (StartSameLine, StartNewLine, EndSameLine, EndNewLine)
- ApplyResponseDelayStep adiciona um delay artificial (0-10s) para tornar as respostas mais naturais
- Cada step pode interromper o pipeline chamando context.Stop() se necessário
- ValidateAggregationStep verifica AiAvailabilityHoursJson da instância. Se fora do horário, agenda resposta diferida via TickerQ e define SkipAiResponse = true
Pipeline de Resposta Diferida (Deferred AI Response)¶
Quando a instância possui horários de atendimento configurados (AiAvailabilityHoursJson) e a mensagem chega fora desse horário, a resposta da IA é agendada para o próximo período disponível via TickerQ.
Fluxo:
ValidateAggregationStep (fora do horário)
→ IDeferredAiResponseScheduler.ScheduleAsync()
→ Calcula próximo horário permitido (timezone do tenant)
→ Cria TimeTickerEntity com função "SendDeferredAiResponse"
→ SkipAiResponse = true (mensagem é persistida sem resposta da IA)
Quando o job é executado (DeferredAiResponseFunction), o pipeline de steps é:
| # | Step | Descrição |
|---|---|---|
| 1 | ValidateDeferredStep |
Carrega conversa, instância, agente e steps. Valida elegibilidade |
| 2 | CheckAvailabilityHoursStep |
Re-verifica se está dentro do horário. Se não, reagenda e para |
| 3 | LoadDeferredHistoryStep |
Carrega histórico e identifica mensagens não respondidas |
| 4 | PrepareDeferredAiHistoryStep |
Remove assinaturas do histórico para evitar cópia pela IA |
| 5 | InvokeDeferredAiStep |
Gera resposta via LLM |
| 6 | ApplyDeferredDecisionsStep |
Aplica decisões da IA (transição de step, escalação, resolução) |
| 7 | ApplyDeferredSignatureStep |
Aplica assinatura na resposta |
| 8 | SendDeferredResponseStep |
Envia resposta via WhatsApp |
| 9 | PersistDeferredMessageStep |
Persiste mensagem e registra uso de LLM |
| 10 | NotifyDeferredStep |
Notifica via SignalR |
Observações: - O cancelamento de jobs existentes é feito antes de agendar novos (evita duplicatas) - Se a conversa mudar para modo manual ou for resolvida antes da execução, o job é ignorado - O timezone do tenant é usado para calcular horários locais
Eventos do Webhook¶
| Evento | Descrição | Ação |
|---|---|---|
| messages.upsert | Nova mensagem | Processa mensagem |
| messages.update | Status atualizado | Atualiza status (Sent→Delivered→Read) |
| messages.reaction | Reação a mensagem | Adiciona/remove reação |
| connection.update | Conexão alterada | Atualiza status da instância |
Status de Mensagem¶
Atualizado via webhook messages.update.
Tratamento de Erros¶
| Erro | Tratamento |
|---|---|
| Instância não encontrada | Log warning, ignora |
| Claude API falha | Usa ErrorMessage do agente |
| Envio WhatsApp falha | Status = Failed, log error |
| Download mídia falha | Salva placeholder |
| Handler exception | Não faz ACK, retry automático |
| Max retries exceeded | Move para dead-letter stream |
Monitoramento¶
Logs Estruturados¶
_logger.LogInformation("Message {MessageId} processed for conversation {ConversationId}",
whatsAppMessageId, conversationId);
Métricas Importantes¶
- Latência do endpoint: tempo de resposta do webhook (deve ser < 100ms)
- Tamanho do stream: mensagens pendentes em
webhook:messages - Dead letters: mensagens em
webhook:dead-letter(devem ser investigadas) - Retry rate: frequência de mensagens que precisam retry
Comandos Redis para Debug¶
# Ver mensagens pendentes
redis-cli XPENDING webhook:messages ciba-consumers
# Ver tamanho do stream
redis-cli XLEN webhook:messages
# Ver dead letters
redis-cli XLEN webhook:dead-letter
redis-cli XRANGE webhook:dead-letter - + COUNT 10
# Ver chaves de idempotência
redis-cli KEYS webhook:processed:*
Performance¶
- Endpoint responde em < 100ms (apenas publica no stream)
- Histórico de conversa limitado a 20 mensagens
- Batch de 10 mensagens por iteração do consumer
- Idempotência com TTL de 24 horas
- Mídia salva de forma assíncrona no R2
- Notificações SignalR em background
- Logs estruturados com scope para tracing