uCheckeruChecker
15 мин чтения

Data pipeline для email-аналитики: от webhook до дашборда

ESP отдаёт события через webhook: доставки, открытия, клики, баунсы, отписки. Данные летят тысячами в минуту, и большинство команд складывают их в PostgreSQL, а потом жалуются, что дашборд тормозит. Проблема не в дашборде. Проблема в архитектуре. В этой статье - конкретная схема pipeline, которая справляется с миллионами событий и отдаёт графики за миллисекунды.


Общая архитектура: четыре компонента

Pipeline для email-аналитики состоит из четырёх звеньев. Каждое решает одну задачу, и замена любого звена не ломает остальные.

1. Webhook-приёмник. Принимает HTTP-запросы от ESP (SendGrid, Mailgun, Postmark, Amazon SES). Валидирует подпись, нормализует формат событий и складывает в очередь.

2. Очередь сообщений. Kafka, RabbitMQ или даже Redis Streams. Буфер между приёмом и обработкой. Если ClickHouse на секунду недоступен, события не теряются.

3. ETL-слой. Читает события из очереди, обогащает (добавляет campaign_id, резолвит geo по IP, вычисляет время до открытия), пишет батчами в хранилище.

4. Хранилище + визуализация. ClickHouse для аналитических запросов, Grafana для дашбордов. Альтернативы - BigQuery + Looker Studio или TimescaleDB + Metabase. Но по соотношению скорости, стоимости и гибкости ClickHouse + Grafana выигрывает для большинства объёмов.

ESP (SendGrid / Mailgun / SES)
  │
  ▼  HTTP POST (webhook)
┌──────────────────┐
│ Webhook Receiver │  ← валидация подписи, нормализация
└──────┬───────────┘
       │
       ▼  publish
┌──────────────────┐
│  Kafka / Redis   │  ← буфер, гарантия доставки
└──────┬───────────┘
       │
       ▼  consume + transform
┌──────────────────┐
│   ETL Worker     │  ← обогащение, батч-вставка
└──────┬───────────┘
       │
       ▼  INSERT batch
┌──────────────────┐
│   ClickHouse     │  ← колоночное хранилище
└──────┬───────────┘
       │
       ▼  SQL
┌──────────────────┐
│    Grafana       │  ← дашборды, алерты
└──────────────────┘

Разберём каждый компонент. С кодом.

Webhook-приёмник: приём и валидация событий

Каждый ESP шлёт webhook в своём формате. SendGrid отправляет массив событий одним POST-запросом. Mailgun присылает form-encoded данные с HMAC-подписью. Amazon SES использует SNS с подтверждением подписки. Первая задача приёмника - привести всё к единой схеме.

Минимальная структура нормализованного события:

interface EmailEvent {
  event_id: string;        // идемпотентный ключ
  event_type: string;      // delivered | opened | clicked | bounced | complained | unsubscribed
  email: string;           // адрес получателя
  campaign_id: string;     // идентификатор рассылки
  timestamp: number;       // unix timestamp события
  esp: string;             // sendgrid | mailgun | ses
  ip?: string;             // IP получателя (для geo)
  user_agent?: string;     // для определения клиента
  url?: string;            // для click-событий
  bounce_type?: string;    // hard | soft
  raw: object;             // оригинальный payload для дебага
}

Приёмник на Node.js с Fastify. Почему Fastify, а не Express: в бенчмарках на webhook-нагрузках Fastify обрабатывает в 2-3 раза больше запросов в секунду на одном ядре. Для endpoint, который принимает тысячи запросов в минуту, разница существенна.

import Fastify from 'fastify';
import crypto from 'crypto';
import { Kafka } from 'kafkajs';

const app = Fastify({ logger: true });
const kafka = new Kafka({ brokers: ['kafka:9092'] });
const producer = kafka.producer();
await producer.connect();

// Валидация подписи SendGrid
function verifySendGridSignature(publicKey, payload, signature, timestamp) {
  const data = timestamp + JSON.stringify(payload);
  const verifier = crypto.createVerify('SHA256');
  verifier.update(data);
  return verifier.verify(publicKey, signature, 'base64');
}

// Нормализация событий SendGrid
function normalizeSendGrid(events) {
  return events.map(e => ({
    event_id: e.sg_event_id,
    event_type: mapEventType(e.event),  // bounce → bounced, open → opened
    email: e.email,
    campaign_id: e.marketing_campaign_id || e.asm_group_id || 'unknown',
    timestamp: e.timestamp,
    esp: 'sendgrid',
    ip: e.ip,
    user_agent: e.useragent,
    url: e.url,
    bounce_type: e.type === 'bounce' ? (e.status?.startsWith('5') ? 'hard' : 'soft') : undefined,
    raw: e,
  }));
}

app.post('/webhooks/sendgrid', async (req, reply) => {
  // 1. Проверяем подпись
  const signature = req.headers['x-twilio-email-event-webhook-signature'];
  const timestamp = req.headers['x-twilio-email-event-webhook-timestamp'];
  if (!verifySendGridSignature(process.env.SG_WEBHOOK_KEY, req.body, signature, timestamp)) {
    return reply.code(401).send({ error: 'invalid signature' });
  }

  // 2. Нормализуем
  const events = normalizeSendGrid(req.body);

  // 3. Отправляем в Kafka
  await producer.send({
    topic: 'email-events',
    messages: events.map(e => ({
      key: e.email,            // партиционирование по email
      value: JSON.stringify(e),
    })),
  });

  return reply.code(200).send({ accepted: events.length });
});

app.listen({ port: 3100, host: '0.0.0.0' });

Три момента, которые стоит проговорить отдельно.

Подпись обязательна. Без проверки подписи любой может отправить POST на ваш endpoint и загрязнить данные. Или хуже - триггернуть бизнес-логику фиктивными событиями. Каждый ESP предоставляет механизм подписи: SendGrid использует ECDSA, Mailgun - HMAC-SHA256, SES - SNS message signing. Не пропускайте этот шаг.

Идемпотентность через event_id. ESP может повторить доставку webhook (retry при таймауте). Без дедупликации одно открытие превратится в три. Поле event_id позволяет фильтровать дубли на этапе ETL.

Партиционирование по email. Все события одного получателя попадают в одну партицию Kafka. Это гарантирует порядок: delivered всегда раньше opened, opened раньше clicked. Без партиционирования события перемешиваются, и воронка ломается.

ETL: обогащение и батч-запись

Consumer читает события из Kafka, добавляет метаданные и вставляет в ClickHouse батчами. Батч-вставка критически важна: ClickHouse оптимизирован под большие INSERT, а одиночные вставки создают избыточное количество партов и замедляют мерджи. Минимальный батч - 1000 строк или раз в 5 секунд (что наступит раньше).

import { Kafka } from 'kafkajs';
import { createClient } from '@clickhouse/client';
import geoip from 'geoip-lite';

const kafka = new Kafka({ brokers: ['kafka:9092'] });
const consumer = kafka.consumer({ groupId: 'etl-email-events' });
const ch = createClient({ host: 'http://clickhouse:8123' });

const BATCH_SIZE = 1000;
const FLUSH_INTERVAL_MS = 5000;
let buffer = [];
let flushTimer = null;

async function flush() {
  if (buffer.length === 0) return;
  const batch = buffer.splice(0, buffer.length);

  await ch.insert({
    table: 'email_events',
    values: batch,
    format: 'JSONEachRow',
  });

  console.log(`Inserted ${batch.length} events into ClickHouse`);
}

function scheduleFlush() {
  if (flushTimer) clearTimeout(flushTimer);
  flushTimer = setTimeout(flush, FLUSH_INTERVAL_MS);
}

function enrichEvent(event) {
  // Geo-обогащение по IP
  const geo = event.ip ? geoip.lookup(event.ip) : null;

  return {
    ...event,
    country: geo?.country || 'unknown',
    city: geo?.city || 'unknown',
    // Время от отправки до открытия (секунды)
    time_to_open: event.event_type === 'opened' && event.sent_at
      ? event.timestamp - event.sent_at
      : null,
    // День недели и час (для STO-аналитики)
    day_of_week: new Date(event.timestamp * 1000).getUTCDay(),
    hour_of_day: new Date(event.timestamp * 1000).getUTCHours(),
    // Дата для партиционирования
    event_date: new Date(event.timestamp * 1000).toISOString().split('T')[0],
  };
}

await consumer.connect();
await consumer.subscribe({ topic: 'email-events', fromBeginning: false });

await consumer.run({
  eachMessage: async ({ message }) => {
    const event = JSON.parse(message.value.toString());
    const enriched = enrichEvent(event);
    buffer.push(enriched);

    if (buffer.length >= BATCH_SIZE) {
      await flush();
    } else {
      scheduleFlush();
    }
  },
});

Обогащение добавляет три категории данных. Гео-информация по IP - для отчётов по регионам. Временные метрики (time_to_open, day_of_week, hour_of_day) - для анализа поведения и оптимизации времени отправки. Дата события (event_date) - для партиционирования таблицы в ClickHouse.

Дедупликация по event_id происходит на уровне ClickHouse. Движок ReplacingMergeTree с полем event_id как ключом сортировки автоматически удаляет дубли при фоновых мерджах. Это надёжнее, чем фильтровать дубли в приложении: даже если ETL-воркер перезапустится и повторно обработает часть сообщений, ClickHouse разберётся сам.

ClickHouse: схема таблицы

ClickHouse - колоночная СУБД, спроектированная для аналитических запросов. Агрегация миллионов строк по нескольким колонкам - это её штатный режим. Запрос «open rate по кампаниям за последний месяц» выполняется за десятки миллисекунд, а не за секунды.

CREATE TABLE email_events (
    event_id       String,
    event_type     LowCardinality(String),  -- delivered, opened, clicked, bounced, ...
    email          String,
    campaign_id    String,
    timestamp      UInt32,
    esp            LowCardinality(String),
    ip             Nullable(String),
    user_agent     Nullable(String),
    url            Nullable(String),
    bounce_type    Nullable(String),
    country        LowCardinality(String),
    city           String,
    time_to_open   Nullable(UInt32),
    day_of_week    UInt8,
    hour_of_day    UInt8,
    event_date     Date
)
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (campaign_id, event_type, email, event_id)
TTL event_date + INTERVAL 12 MONTH
SETTINGS index_granularity = 8192;

Несколько решений, которые заложены в схему.

LowCardinality для полей с небольшим количеством уникальных значений (event_type, esp, country). ClickHouse хранит их как dictionary-encoded столбцы - это сжимает данные в 10-20 раз и ускоряет фильтрацию.

ORDER BY (campaign_id, event_type, email, event_id). Порядок ключей выбран под типичные запросы: «все события кампании X», «все открытия кампании X», «все события пользователя Y в кампании X». ClickHouse хранит данные отсортированными по этим колонкам, поэтому фильтрация по ним работает за O(log n) без дополнительных индексов.

ReplacingMergeTree с event_id в ключе сортировки. Дубли удаляются при фоновом мердже. Для аналитических запросов, которые допускают незначительную неточность, этого достаточно. Если нужна точная дедупликация в момент чтения, добавьте FINAL к SELECT или оберните в подзапрос с argMax.

TTL 12 месяцев. Данные старше года удаляются автоматически. Email-аналитика за прошлый год редко нужна для оперативных решений. Если нужно хранить дольше - поднимите лимит или настройте тирование на S3.

Материализованные представления: предагрегация

Дашборд, который каждый раз сканирует таблицу с сотнями миллионов строк, рано или поздно начнёт тормозить. Материализованные представления (materialized views) в ClickHouse решают это: они агрегируют данные при вставке и хранят результат в отдельной таблице. Запрос к дашборду читает тысячи строк вместо миллионов.

-- Агрегат: метрики по кампаниям за день
CREATE MATERIALIZED VIEW campaign_daily_mv
TO campaign_daily_stats
AS SELECT
    event_date,
    campaign_id,
    countIf(event_type = 'delivered')    AS delivered,
    countIf(event_type = 'opened')       AS opened,
    countIf(event_type = 'clicked')      AS clicked,
    countIf(event_type = 'bounced')      AS bounced,
    countIf(event_type = 'complained')   AS complained,
    countIf(event_type = 'unsubscribed') AS unsubscribed,
    -- Метрики
    round(countIf(event_type = 'opened') / countIf(event_type = 'delivered') * 100, 2)
        AS open_rate,
    round(countIf(event_type = 'clicked') / countIf(event_type = 'opened') * 100, 2)
        AS click_rate,
    round(countIf(event_type = 'bounced') / (countIf(event_type = 'delivered') + countIf(event_type = 'bounced')) * 100, 2)
        AS bounce_rate
FROM email_events
GROUP BY event_date, campaign_id;

-- Целевая таблица для агрегата
CREATE TABLE campaign_daily_stats (
    event_date   Date,
    campaign_id  String,
    delivered    UInt64,
    opened       UInt64,
    clicked      UInt64,
    bounced      UInt64,
    complained   UInt64,
    unsubscribed UInt64,
    open_rate    Float32,
    click_rate   Float32,
    bounce_rate  Float32
)
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, campaign_id);

SummingMergeTree суммирует числовые колонки при мердже одинаковых ключей. Каждый батч вставки в email_events автоматически обновляет campaign_daily_stats. Grafana читает эту таблицу, а не основную - и отвечает мгновенно.

Для более сложных агрегатов (уникальные получатели, медиана time_to_open) используйте AggregatingMergeTree с функциями uniqState и quantileState. Это сложнее, но позволяет точно считать уников без полного скана.

Grafana: дашборд за 30 минут

Grafana подключается к ClickHouse через официальный плагин (grafana-clickhouse-datasource). После подключения - обычные SQL-запросы в каждой панели.

Минимальный набор панелей для email-дашборда:

Open rate по дням - линейный график. Показывает тренд за последние 30 дней. Падение на 5+ процентных пунктов - сигнал проблем с доставляемостью.

Bounce rate по кампаниям - таблица с сортировкой. Кампании с bounce rate выше 3% требуют внимания: скорее всего, в сегменте есть устаревшие адреса.

Тепловая карта открытий - hour_of_day vs day_of_week. Показывает, когда подписчики открывают письма. Данные для оптимизации времени отправки.

Воронка по кампании - delivered / opened / clicked. Быстрая диагностика: если delivered высокий, а opened низкий - проблема в теме письма. Если opened нормальный, а clicked нет - проблема в контенте или CTA.

-- Запрос для панели "Open rate по дням"
SELECT
    event_date AS time,
    round(open_rate, 1) AS "Open Rate %"
FROM campaign_daily_stats
WHERE
    campaign_id = '$campaign'        -- переменная Grafana
    AND event_date >= today() - 30
ORDER BY event_date;

-- Запрос для тепловой карты открытий
SELECT
    day_of_week,
    hour_of_day,
    count() AS opens
FROM email_events
WHERE
    event_type = 'opened'
    AND event_date >= today() - 90
GROUP BY day_of_week, hour_of_day
ORDER BY day_of_week, hour_of_day;

Grafana поддерживает алерты. Настройте оповещение при bounce rate выше 5% за последние 24 часа - это порог, после которого ESP начинают ограничивать отправку. Раннее оповещение даёт время приостановить рассылку и разобраться.

Где здесь валидация email

Pipeline показывает, что произошло. Валидация определяет, что произойдёт. Если в кампанию попадают невалидные адреса, pipeline честно покажет bounce rate 15% и падение open rate. Но к этому моменту вред уже нанесён: репутация домена пострадала, ESP понизил приоритет, а вы потратили деньги на отправку в никуда.

Валидация работает до pipeline - на этапе формирования сегмента. Перед запуском кампании пропускайте список через проверку. Адреса с высоким риском (несуществующие, одноразовые, спам-ловушки) убираются заранее. Pipeline получает чистые события, метрики отражают реальную картину, а не шум от мёртвых адресов.

Автоматизировать просто: добавьте шаг валидации в CI/CD рассылки или в cron, который проверяет базу раз в неделю. API принимает список, возвращает статус каждого адреса. Адреса со статусом undeliverable исключаются из следующей кампании.

# Пример: проверка списка перед рассылкой
import requests

API_KEY = "your_api_key"
API_URL = "https://api.uchecker.net/api/v1/validate/single"

def validate_email(email: str) -> dict:
    resp = requests.post(
        API_URL,
        headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
        json={"email": email},
    )
    return resp.json()

def clean_segment(emails: list[str]) -> list[str]:
    clean = []
    for email in emails:
        result = validate_email(email)
        if result.get("result") == "deliverable":
            clean.append(email)
        else:
            print(f"Excluded: {email} — {result.get('result')}")
    return clean

# Для больших списков используйте bulk API вместо single
# POST /api/v1/validate/bulk

Результат: bounce rate держится ниже 2%, дашборд показывает чистые метрики, а ESP не понижает приоритет домена.

Деплой: сколько стоит и где запускать

Для базы до 500 000 подписчиков и 3-5 рассылок в неделю (примерно 10-20 млн событий в месяц) хватает скромной конфигурации:

Webhook receiver + ETL worker: один VPS с 2 vCPU, 4 GB RAM. Два Node.js-процесса. $15-20/мес.

Kafka: managed Kafka (Upstash, Confluent Cloud) на бесплатном или начальном плане. Или Redis Streams на том же сервере, если не хочется усложнять.

ClickHouse: ClickHouse Cloud (от $50/мес), Altinity.Cloud или self-hosted на VPS с 4 vCPU, 16 GB RAM. Для 20 млн строк в месяц с TTL год (около 240 млн строк максимум) 16 GB достаточно.

Grafana: Grafana Cloud (бесплатный план до 10 000 метрик) или self-hosted на том же сервере с ClickHouse.

Итого: $70-100 в месяц для production-ready pipeline. Сравните со стоимостью SaaS-аналитики для email: Litmus, Validity, Everest берут $500-2000/мес за сопоставимый набор метрик. Свой pipeline дешевле и гибче - но требует инженерного времени на настройку и поддержку.

Типичные ошибки

Вставка по одной строке. ClickHouse создаёт новый парт при каждом INSERT. Тысяча одиночных вставок в секунду приведёт к тысячам партов, фоновые мерджи не успеют, сервер упадёт с ошибкой «too many parts». Батчите. Минимум 1000 строк на INSERT.

Отсутствие буфера. Webhook-приёмник пишет напрямую в ClickHouse. ESP прислал 5000 событий за секунду (нормальная ситуация при отправке 100K+ писем). ClickHouse не успел - часть событий потеряна. Очередь решает проблему: приёмник работает быстро, обработка идёт в своём темпе.

Хранение raw payload. Сырой JSON от ESP полезен для дебага, но занимает 80% места. Храните raw в отдельной таблице с коротким TTL (7-14 дней) или не храните совсем после стабилизации pipeline.

Игнорирование идемпотентности. ESP повторяет webhook при timeout. Без дедупликации open rate раздувается на 5-15%. Метрики искажены, решения на их основе ошибочны.

Собираем всё вместе

Pipeline для email-аналитики - не rocket science. Четыре компонента, каждый делает одну вещь. Webhook-приёмник принимает и нормализует события. Очередь буферизирует. ETL обогащает и батчит. ClickHouse хранит и агрегирует. Grafana визуализирует и алертит.

Но pipeline отвечает только на вопрос «что произошло». Вопрос «как сделать так, чтобы метрики были хорошими» решается на уровне до отправки. Чистая база, валидированные адреса, отсутствие спам-ловушек и одноразовых ящиков - это входные условия, при которых pipeline показывает здоровую картину. Без них дашборд превращается в визуализацию проблем.

Хороший дашборд не улучшит метрики. Он покажет правду. А правда зависит от того, какие адреса вы отправляете.

Перед запуском pipeline - проверьте базу. uChecker покажет, сколько невалидных адресов в списке, до того как они испортят статистику.

email analytics pipelineаналитика email рассылокdata pipeline emailClickHouse emailwebhook ESPemail events ETLдашборд email метрикGrafana emailвалидация email