Data pipeline для email-аналитики: от webhook до дашборда
ESP отдаёт события через webhook: доставки, открытия, клики, баунсы, отписки. Данные летят тысячами в минуту, и большинство команд складывают их в PostgreSQL, а потом жалуются, что дашборд тормозит. Проблема не в дашборде. Проблема в архитектуре. В этой статье - конкретная схема pipeline, которая справляется с миллионами событий и отдаёт графики за миллисекунды.
Общая архитектура: четыре компонента
Pipeline для email-аналитики состоит из четырёх звеньев. Каждое решает одну задачу, и замена любого звена не ломает остальные.
1. Webhook-приёмник. Принимает HTTP-запросы от ESP (SendGrid, Mailgun, Postmark, Amazon SES). Валидирует подпись, нормализует формат событий и складывает в очередь.
2. Очередь сообщений. Kafka, RabbitMQ или даже Redis Streams. Буфер между приёмом и обработкой. Если ClickHouse на секунду недоступен, события не теряются.
3. ETL-слой. Читает события из очереди, обогащает (добавляет campaign_id, резолвит geo по IP, вычисляет время до открытия), пишет батчами в хранилище.
4. Хранилище + визуализация. ClickHouse для аналитических запросов, Grafana для дашбордов. Альтернативы - BigQuery + Looker Studio или TimescaleDB + Metabase. Но по соотношению скорости, стоимости и гибкости ClickHouse + Grafana выигрывает для большинства объёмов.
ESP (SendGrid / Mailgun / SES)
│
▼ HTTP POST (webhook)
┌──────────────────┐
│ Webhook Receiver │ ← валидация подписи, нормализация
└──────┬───────────┘
│
▼ publish
┌──────────────────┐
│ Kafka / Redis │ ← буфер, гарантия доставки
└──────┬───────────┘
│
▼ consume + transform
┌──────────────────┐
│ ETL Worker │ ← обогащение, батч-вставка
└──────┬───────────┘
│
▼ INSERT batch
┌──────────────────┐
│ ClickHouse │ ← колоночное хранилище
└──────┬───────────┘
│
▼ SQL
┌──────────────────┐
│ Grafana │ ← дашборды, алерты
└──────────────────┘Разберём каждый компонент. С кодом.
Webhook-приёмник: приём и валидация событий
Каждый ESP шлёт webhook в своём формате. SendGrid отправляет массив событий одним POST-запросом. Mailgun присылает form-encoded данные с HMAC-подписью. Amazon SES использует SNS с подтверждением подписки. Первая задача приёмника - привести всё к единой схеме.
Минимальная структура нормализованного события:
interface EmailEvent {
event_id: string; // идемпотентный ключ
event_type: string; // delivered | opened | clicked | bounced | complained | unsubscribed
email: string; // адрес получателя
campaign_id: string; // идентификатор рассылки
timestamp: number; // unix timestamp события
esp: string; // sendgrid | mailgun | ses
ip?: string; // IP получателя (для geo)
user_agent?: string; // для определения клиента
url?: string; // для click-событий
bounce_type?: string; // hard | soft
raw: object; // оригинальный payload для дебага
}Приёмник на Node.js с Fastify. Почему Fastify, а не Express: в бенчмарках на webhook-нагрузках Fastify обрабатывает в 2-3 раза больше запросов в секунду на одном ядре. Для endpoint, который принимает тысячи запросов в минуту, разница существенна.
import Fastify from 'fastify';
import crypto from 'crypto';
import { Kafka } from 'kafkajs';
const app = Fastify({ logger: true });
const kafka = new Kafka({ brokers: ['kafka:9092'] });
const producer = kafka.producer();
await producer.connect();
// Валидация подписи SendGrid
function verifySendGridSignature(publicKey, payload, signature, timestamp) {
const data = timestamp + JSON.stringify(payload);
const verifier = crypto.createVerify('SHA256');
verifier.update(data);
return verifier.verify(publicKey, signature, 'base64');
}
// Нормализация событий SendGrid
function normalizeSendGrid(events) {
return events.map(e => ({
event_id: e.sg_event_id,
event_type: mapEventType(e.event), // bounce → bounced, open → opened
email: e.email,
campaign_id: e.marketing_campaign_id || e.asm_group_id || 'unknown',
timestamp: e.timestamp,
esp: 'sendgrid',
ip: e.ip,
user_agent: e.useragent,
url: e.url,
bounce_type: e.type === 'bounce' ? (e.status?.startsWith('5') ? 'hard' : 'soft') : undefined,
raw: e,
}));
}
app.post('/webhooks/sendgrid', async (req, reply) => {
// 1. Проверяем подпись
const signature = req.headers['x-twilio-email-event-webhook-signature'];
const timestamp = req.headers['x-twilio-email-event-webhook-timestamp'];
if (!verifySendGridSignature(process.env.SG_WEBHOOK_KEY, req.body, signature, timestamp)) {
return reply.code(401).send({ error: 'invalid signature' });
}
// 2. Нормализуем
const events = normalizeSendGrid(req.body);
// 3. Отправляем в Kafka
await producer.send({
topic: 'email-events',
messages: events.map(e => ({
key: e.email, // партиционирование по email
value: JSON.stringify(e),
})),
});
return reply.code(200).send({ accepted: events.length });
});
app.listen({ port: 3100, host: '0.0.0.0' });Три момента, которые стоит проговорить отдельно.
Подпись обязательна. Без проверки подписи любой может отправить POST на ваш endpoint и загрязнить данные. Или хуже - триггернуть бизнес-логику фиктивными событиями. Каждый ESP предоставляет механизм подписи: SendGrid использует ECDSA, Mailgun - HMAC-SHA256, SES - SNS message signing. Не пропускайте этот шаг.
Идемпотентность через event_id. ESP может повторить доставку webhook (retry при таймауте). Без дедупликации одно открытие превратится в три. Поле event_id позволяет фильтровать дубли на этапе ETL.
Партиционирование по email. Все события одного получателя попадают в одну партицию Kafka. Это гарантирует порядок: delivered всегда раньше opened, opened раньше clicked. Без партиционирования события перемешиваются, и воронка ломается.
ETL: обогащение и батч-запись
Consumer читает события из Kafka, добавляет метаданные и вставляет в ClickHouse батчами. Батч-вставка критически важна: ClickHouse оптимизирован под большие INSERT, а одиночные вставки создают избыточное количество партов и замедляют мерджи. Минимальный батч - 1000 строк или раз в 5 секунд (что наступит раньше).
import { Kafka } from 'kafkajs';
import { createClient } from '@clickhouse/client';
import geoip from 'geoip-lite';
const kafka = new Kafka({ brokers: ['kafka:9092'] });
const consumer = kafka.consumer({ groupId: 'etl-email-events' });
const ch = createClient({ host: 'http://clickhouse:8123' });
const BATCH_SIZE = 1000;
const FLUSH_INTERVAL_MS = 5000;
let buffer = [];
let flushTimer = null;
async function flush() {
if (buffer.length === 0) return;
const batch = buffer.splice(0, buffer.length);
await ch.insert({
table: 'email_events',
values: batch,
format: 'JSONEachRow',
});
console.log(`Inserted ${batch.length} events into ClickHouse`);
}
function scheduleFlush() {
if (flushTimer) clearTimeout(flushTimer);
flushTimer = setTimeout(flush, FLUSH_INTERVAL_MS);
}
function enrichEvent(event) {
// Geo-обогащение по IP
const geo = event.ip ? geoip.lookup(event.ip) : null;
return {
...event,
country: geo?.country || 'unknown',
city: geo?.city || 'unknown',
// Время от отправки до открытия (секунды)
time_to_open: event.event_type === 'opened' && event.sent_at
? event.timestamp - event.sent_at
: null,
// День недели и час (для STO-аналитики)
day_of_week: new Date(event.timestamp * 1000).getUTCDay(),
hour_of_day: new Date(event.timestamp * 1000).getUTCHours(),
// Дата для партиционирования
event_date: new Date(event.timestamp * 1000).toISOString().split('T')[0],
};
}
await consumer.connect();
await consumer.subscribe({ topic: 'email-events', fromBeginning: false });
await consumer.run({
eachMessage: async ({ message }) => {
const event = JSON.parse(message.value.toString());
const enriched = enrichEvent(event);
buffer.push(enriched);
if (buffer.length >= BATCH_SIZE) {
await flush();
} else {
scheduleFlush();
}
},
});Обогащение добавляет три категории данных. Гео-информация по IP - для отчётов по регионам. Временные метрики (time_to_open, day_of_week, hour_of_day) - для анализа поведения и оптимизации времени отправки. Дата события (event_date) - для партиционирования таблицы в ClickHouse.
Дедупликация по event_id происходит на уровне ClickHouse. Движок ReplacingMergeTree с полем event_id как ключом сортировки автоматически удаляет дубли при фоновых мерджах. Это надёжнее, чем фильтровать дубли в приложении: даже если ETL-воркер перезапустится и повторно обработает часть сообщений, ClickHouse разберётся сам.
ClickHouse: схема таблицы
ClickHouse - колоночная СУБД, спроектированная для аналитических запросов. Агрегация миллионов строк по нескольким колонкам - это её штатный режим. Запрос «open rate по кампаниям за последний месяц» выполняется за десятки миллисекунд, а не за секунды.
CREATE TABLE email_events (
event_id String,
event_type LowCardinality(String), -- delivered, opened, clicked, bounced, ...
email String,
campaign_id String,
timestamp UInt32,
esp LowCardinality(String),
ip Nullable(String),
user_agent Nullable(String),
url Nullable(String),
bounce_type Nullable(String),
country LowCardinality(String),
city String,
time_to_open Nullable(UInt32),
day_of_week UInt8,
hour_of_day UInt8,
event_date Date
)
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (campaign_id, event_type, email, event_id)
TTL event_date + INTERVAL 12 MONTH
SETTINGS index_granularity = 8192;Несколько решений, которые заложены в схему.
LowCardinality для полей с небольшим количеством уникальных значений (event_type, esp, country). ClickHouse хранит их как dictionary-encoded столбцы - это сжимает данные в 10-20 раз и ускоряет фильтрацию.
ORDER BY (campaign_id, event_type, email, event_id). Порядок ключей выбран под типичные запросы: «все события кампании X», «все открытия кампании X», «все события пользователя Y в кампании X». ClickHouse хранит данные отсортированными по этим колонкам, поэтому фильтрация по ним работает за O(log n) без дополнительных индексов.
ReplacingMergeTree с event_id в ключе сортировки. Дубли удаляются при фоновом мердже. Для аналитических запросов, которые допускают незначительную неточность, этого достаточно. Если нужна точная дедупликация в момент чтения, добавьте FINAL к SELECT или оберните в подзапрос с argMax.
TTL 12 месяцев. Данные старше года удаляются автоматически. Email-аналитика за прошлый год редко нужна для оперативных решений. Если нужно хранить дольше - поднимите лимит или настройте тирование на S3.
Материализованные представления: предагрегация
Дашборд, который каждый раз сканирует таблицу с сотнями миллионов строк, рано или поздно начнёт тормозить. Материализованные представления (materialized views) в ClickHouse решают это: они агрегируют данные при вставке и хранят результат в отдельной таблице. Запрос к дашборду читает тысячи строк вместо миллионов.
-- Агрегат: метрики по кампаниям за день
CREATE MATERIALIZED VIEW campaign_daily_mv
TO campaign_daily_stats
AS SELECT
event_date,
campaign_id,
countIf(event_type = 'delivered') AS delivered,
countIf(event_type = 'opened') AS opened,
countIf(event_type = 'clicked') AS clicked,
countIf(event_type = 'bounced') AS bounced,
countIf(event_type = 'complained') AS complained,
countIf(event_type = 'unsubscribed') AS unsubscribed,
-- Метрики
round(countIf(event_type = 'opened') / countIf(event_type = 'delivered') * 100, 2)
AS open_rate,
round(countIf(event_type = 'clicked') / countIf(event_type = 'opened') * 100, 2)
AS click_rate,
round(countIf(event_type = 'bounced') / (countIf(event_type = 'delivered') + countIf(event_type = 'bounced')) * 100, 2)
AS bounce_rate
FROM email_events
GROUP BY event_date, campaign_id;
-- Целевая таблица для агрегата
CREATE TABLE campaign_daily_stats (
event_date Date,
campaign_id String,
delivered UInt64,
opened UInt64,
clicked UInt64,
bounced UInt64,
complained UInt64,
unsubscribed UInt64,
open_rate Float32,
click_rate Float32,
bounce_rate Float32
)
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, campaign_id);SummingMergeTree суммирует числовые колонки при мердже одинаковых ключей. Каждый батч вставки в email_events автоматически обновляет campaign_daily_stats. Grafana читает эту таблицу, а не основную - и отвечает мгновенно.
Для более сложных агрегатов (уникальные получатели, медиана time_to_open) используйте AggregatingMergeTree с функциями uniqState и quantileState. Это сложнее, но позволяет точно считать уников без полного скана.
Grafana: дашборд за 30 минут
Grafana подключается к ClickHouse через официальный плагин (grafana-clickhouse-datasource). После подключения - обычные SQL-запросы в каждой панели.
Минимальный набор панелей для email-дашборда:
Open rate по дням - линейный график. Показывает тренд за последние 30 дней. Падение на 5+ процентных пунктов - сигнал проблем с доставляемостью.
Bounce rate по кампаниям - таблица с сортировкой. Кампании с bounce rate выше 3% требуют внимания: скорее всего, в сегменте есть устаревшие адреса.
Тепловая карта открытий - hour_of_day vs day_of_week. Показывает, когда подписчики открывают письма. Данные для оптимизации времени отправки.
Воронка по кампании - delivered / opened / clicked. Быстрая диагностика: если delivered высокий, а opened низкий - проблема в теме письма. Если opened нормальный, а clicked нет - проблема в контенте или CTA.
-- Запрос для панели "Open rate по дням"
SELECT
event_date AS time,
round(open_rate, 1) AS "Open Rate %"
FROM campaign_daily_stats
WHERE
campaign_id = '$campaign' -- переменная Grafana
AND event_date >= today() - 30
ORDER BY event_date;
-- Запрос для тепловой карты открытий
SELECT
day_of_week,
hour_of_day,
count() AS opens
FROM email_events
WHERE
event_type = 'opened'
AND event_date >= today() - 90
GROUP BY day_of_week, hour_of_day
ORDER BY day_of_week, hour_of_day;Grafana поддерживает алерты. Настройте оповещение при bounce rate выше 5% за последние 24 часа - это порог, после которого ESP начинают ограничивать отправку. Раннее оповещение даёт время приостановить рассылку и разобраться.
Где здесь валидация email
Pipeline показывает, что произошло. Валидация определяет, что произойдёт. Если в кампанию попадают невалидные адреса, pipeline честно покажет bounce rate 15% и падение open rate. Но к этому моменту вред уже нанесён: репутация домена пострадала, ESP понизил приоритет, а вы потратили деньги на отправку в никуда.
Валидация работает до pipeline - на этапе формирования сегмента. Перед запуском кампании пропускайте список через проверку. Адреса с высоким риском (несуществующие, одноразовые, спам-ловушки) убираются заранее. Pipeline получает чистые события, метрики отражают реальную картину, а не шум от мёртвых адресов.
Автоматизировать просто: добавьте шаг валидации в CI/CD рассылки или в cron, который проверяет базу раз в неделю. API принимает список, возвращает статус каждого адреса. Адреса со статусом undeliverable исключаются из следующей кампании.
# Пример: проверка списка перед рассылкой
import requests
API_KEY = "your_api_key"
API_URL = "https://api.uchecker.net/api/v1/validate/single"
def validate_email(email: str) -> dict:
resp = requests.post(
API_URL,
headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
json={"email": email},
)
return resp.json()
def clean_segment(emails: list[str]) -> list[str]:
clean = []
for email in emails:
result = validate_email(email)
if result.get("result") == "deliverable":
clean.append(email)
else:
print(f"Excluded: {email} — {result.get('result')}")
return clean
# Для больших списков используйте bulk API вместо single
# POST /api/v1/validate/bulkРезультат: bounce rate держится ниже 2%, дашборд показывает чистые метрики, а ESP не понижает приоритет домена.
Деплой: сколько стоит и где запускать
Для базы до 500 000 подписчиков и 3-5 рассылок в неделю (примерно 10-20 млн событий в месяц) хватает скромной конфигурации:
Webhook receiver + ETL worker: один VPS с 2 vCPU, 4 GB RAM. Два Node.js-процесса. $15-20/мес.
Kafka: managed Kafka (Upstash, Confluent Cloud) на бесплатном или начальном плане. Или Redis Streams на том же сервере, если не хочется усложнять.
ClickHouse: ClickHouse Cloud (от $50/мес), Altinity.Cloud или self-hosted на VPS с 4 vCPU, 16 GB RAM. Для 20 млн строк в месяц с TTL год (около 240 млн строк максимум) 16 GB достаточно.
Grafana: Grafana Cloud (бесплатный план до 10 000 метрик) или self-hosted на том же сервере с ClickHouse.
Итого: $70-100 в месяц для production-ready pipeline. Сравните со стоимостью SaaS-аналитики для email: Litmus, Validity, Everest берут $500-2000/мес за сопоставимый набор метрик. Свой pipeline дешевле и гибче - но требует инженерного времени на настройку и поддержку.
Типичные ошибки
Вставка по одной строке. ClickHouse создаёт новый парт при каждом INSERT. Тысяча одиночных вставок в секунду приведёт к тысячам партов, фоновые мерджи не успеют, сервер упадёт с ошибкой «too many parts». Батчите. Минимум 1000 строк на INSERT.
Отсутствие буфера. Webhook-приёмник пишет напрямую в ClickHouse. ESP прислал 5000 событий за секунду (нормальная ситуация при отправке 100K+ писем). ClickHouse не успел - часть событий потеряна. Очередь решает проблему: приёмник работает быстро, обработка идёт в своём темпе.
Хранение raw payload. Сырой JSON от ESP полезен для дебага, но занимает 80% места. Храните raw в отдельной таблице с коротким TTL (7-14 дней) или не храните совсем после стабилизации pipeline.
Игнорирование идемпотентности. ESP повторяет webhook при timeout. Без дедупликации open rate раздувается на 5-15%. Метрики искажены, решения на их основе ошибочны.
Собираем всё вместе
Pipeline для email-аналитики - не rocket science. Четыре компонента, каждый делает одну вещь. Webhook-приёмник принимает и нормализует события. Очередь буферизирует. ETL обогащает и батчит. ClickHouse хранит и агрегирует. Grafana визуализирует и алертит.
Но pipeline отвечает только на вопрос «что произошло». Вопрос «как сделать так, чтобы метрики были хорошими» решается на уровне до отправки. Чистая база, валидированные адреса, отсутствие спам-ловушек и одноразовых ящиков - это входные условия, при которых pipeline показывает здоровую картину. Без них дашборд превращается в визуализацию проблем.
Хороший дашборд не улучшит метрики. Он покажет правду. А правда зависит от того, какие адреса вы отправляете.
Перед запуском pipeline - проверьте базу. uChecker покажет, сколько невалидных адресов в списке, до того как они испортят статистику.
