uCheckeruChecker
Блог/Верификация
13 мин чтения

Bulk email validation через API: оптимизация для больших списков

У вас 200 тысяч адресов, бюджет на рассылку и дедлайн через неделю. Загрузить всё одним запросом? API вернёт таймаут. Отправлять по одному? Займёт месяц. Между этими крайностями - инженерная задача: как организовать массовую валидацию через API так, чтобы она завершилась за часы, а не за дни.

Почему наивный подход не работает

Самый частый сценарий, который мы видим у разработчиков, впервые интегрирующих валидацию: цикл, который отправляет email по одному через single-эндпоинт. Выглядит логично, работает - пока адресов не больше тысячи. На десяти тысячах начинаются проблемы. На ста тысячах - полный коллапс.

Арифметика простая. Single-запрос создаёт задачу, которую сервер ставит в очередь. Каждый запрос - отдельное HTTP-соединение, отдельная аутентификация, отдельная запись в базу. Накладные расходы на один адрес минимальны. Умножьте на сто тысяч - получите часы только на сетевой overhead, без учёта собственно валидации. А ещё есть rate limits, которые не дадут вам отправлять запросы без пауз.

Bulk-эндпоинт решает эту проблему: один HTTP-запрос, массив адресов, одна задача на сервере. Но и здесь есть ограничения. Передать миллион адресов одним запросом - плохая идея по трём причинам: размер тела запроса, время обработки и отсутствие гранулярности при ошибках. Если запрос упадёт на полпути, вы потеряете прогресс и начнёте заново.

Правильный подход - батчинг: разбить весь список на управляемые порции, отправить каждую отдельным bulk-запросом и собрать результаты. Звучит просто. На практике есть нюансы.

Шаг 1. Подготовка списка

Перед отправкой в API стоит провести локальную предобработку. Это экономит кредиты и ускоряет весь процесс.

Дедупликация - первое, что нужно сделать. В реальных базах дубликаты составляют от 3 до 15 процентов. Это адреса, попавшие через разные формы, импорты из CRM, слияния баз после покупки бизнеса. Каждый дубликат - потраченный кредит на повторную проверку.

Синтаксическая фильтрация - второе. API сам отсеивает адреса с невалидным синтаксисом и не тарифицирует их, но отправлять заведомо битые строки означает увеличивать размер запроса и время парсинга. Быстрая regex-проверка на клиенте убирает очевидный мусор: строки без @, пробелы, кириллицу в домене (если вы не работаете с IDN).

import re

def load_and_clean(filepath: str) -> list[str]:
    """Load emails from file, deduplicate and remove obvious junk."""
    with open(filepath) as f:
        raw = [line.strip().lower() for line in f if line.strip()]

    # Deduplicate preserving order
    seen = set()
    unique = []
    for email in raw:
        if email not in seen:
            seen.add(email)
            unique.append(email)

    # Drop strings that clearly aren't emails
    pattern = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")
    cleaned = [e for e in unique if pattern.match(e)]

    removed = len(raw) - len(cleaned)
    print(f"Loaded {len(raw)}, after cleanup: {len(cleaned)} ({removed} removed)")
    return cleaned

На базе в 200K адресов такая предобработка обычно убирает 10-20 тысяч строк. При стоимости кредита это заметная экономия.

Шаг 2. Батчинг и отправка

Оптимальный размер батча - это компромисс. Слишком маленький - много HTTP-запросов, много задач, сложнее отслеживать. Слишком большой - дольше обрабатывается, больнее терять при ошибке. По опыту наших пользователей, хорошо работают батчи от 5 до 50 тысяч адресов. Для списков до 100K достаточно 10K на батч. Для миллионных баз разумнее делать батчи по 50K.

Отправка через curl для понимания механики:

# Отправить батч из файла (первые 10000 адресов)
head -n 10000 emails_clean.txt | jq -R -s 'split("\n") | map(select(. != ""))' | \
  curl -X POST https://api.uchecker.net/api/v1/validate/bulk \
    -H "x-api-key: $UCHECKER_API_KEY" \
    -H "Content-Type: application/json" \
    -d "{\"emails\": $(cat)}"

# Ответ:
# {"success":true,"task_id":501,"status":"queued",
#  "valid_emails":9987,"invalid_emails":13,
#  "credits_used":9987,"credits_remaining":90013}

На Python с полным циклом отправки:

import requests
import os
import time

API_KEY = os.environ["UCHECKER_API_KEY"]
BASE = "https://api.uchecker.net"
BATCH_SIZE = 10_000

def chunk(lst: list, size: int):
    """Split list into chunks of given size."""
    for i in range(0, len(lst), size):
        yield lst[i : i + size]

def submit_batches(emails: list[str]) -> list[int]:
    """Submit all batches and return list of task IDs."""
    task_ids = []
    batches = list(chunk(emails, BATCH_SIZE))

    for i, batch in enumerate(batches):
        resp = requests.post(
            f"{BASE}/api/v1/validate/bulk",
            headers={"x-api-key": API_KEY},
            json={"emails": batch},
        )
        resp.raise_for_status()
        data = resp.json()
        task_id = data["task_id"]
        task_ids.append(task_id)
        print(
            f"Batch {i+1}/{len(batches)}: task_id={task_id}, "
            f"queued={data['valid_emails']}, skipped={data['invalid_emails']}"
        )
        # Small pause between submissions to avoid rate limits
        time.sleep(1)

    return task_ids

Обратите внимание на паузу в одну секунду между запросами. Это не обязательно для малых объёмов, но при отправке десятков батчей подряд предотвращает ситуацию, когда сервер начнёт отвечать 429 Too Many Requests.

Шаг 3. Polling с экспоненциальным backoff

Валидация - асинхронный процесс. После отправки батча сервер возвращает task_id и ставит задачу в очередь. Для получения результатов нужно периодически проверять статус через GET-запрос к /api/v1/tasks/{taskId}.

Наивный polling с фиксированным интервалом работает, но неэффективен. Если задача обрабатывает 50K адресов, первые несколько минут статус гарантированно будет processing. Зачем опрашивать каждые 5 секунд? Экспоненциальный backoff начинает с короткого интервала и увеличивает его с каждой итерацией, снижая нагрузку на API.

def poll_task(task_id: int, timeout: int = 600) -> dict:
    """Poll task with exponential backoff. Returns task data on completion."""
    interval = 5      # start at 5 seconds
    max_interval = 60  # cap at 60 seconds
    elapsed = 0

    while elapsed < timeout:
        resp = requests.get(
            f"{BASE}/api/v1/tasks/{task_id}",
            headers={"x-api-key": API_KEY},
        )
        data = resp.json()
        status = data["status"]
        progress = data.get("progress_percent", 0)
        print(f"  Task {task_id}: {status} ({progress}%)")

        if status == "completed":
            return data
        if status == "failed":
            raise RuntimeError(f"Task {task_id} failed")

        time.sleep(interval)
        elapsed += interval
        interval = min(interval * 1.5, max_interval)

    raise TimeoutError(f"Task {task_id} not completed within {timeout}s")

Множитель 1.5 - хороший баланс. Интервалы получаются 5, 7.5, 11, 17, 25, 38, 57, 60, 60... Для батча в 10K адресов обычно хватает 3-5 минут. Для 50K - 10-15.

Шаг 4. Параллельный polling нескольких задач

Когда батчей десять или двадцать, последовательный polling каждого - потеря времени. Задачи обрабатываются на сервере параллельно, значит и опрашивать их можно одновременно. В Python для этого удобен concurrent.futures:

from concurrent.futures import ThreadPoolExecutor, as_completed

def poll_all_tasks(task_ids: list[int], max_workers: int = 5) -> list[dict]:
    """Poll multiple tasks in parallel."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(poll_task, tid): tid for tid in task_ids}
        for future in as_completed(futures):
            tid = futures[future]
            try:
                data = future.result()
                results.append(data)
                print(f"Task {tid} completed")
            except Exception as e:
                print(f"Task {tid} error: {e}")
    return results

Параметр max_workers ограничивает количество одновременных потоков. Пять - разумное значение: достаточно для параллельности, но не перегружает API. Для двадцати батчей полный цикл polling займёт столько же, сколько обработка самого медленного батча, а не сумму всех.

Шаг 5. Сбор и агрегация результатов

Когда все задачи завершены, результаты нужно собрать в единый список. API возвращает результаты в двух форматах: JSON для программной обработки и CSV для импорта в таблицы. Для автоматизации JSON удобнее.

def fetch_results(task_id: int) -> list[dict]:
    """Fetch validation results for a completed task."""
    resp = requests.get(
        f"{BASE}/api/v1/tasks/{task_id}/results",
        headers={"x-api-key": API_KEY},
        params={"format": "json"},
    )
    resp.raise_for_status()
    return resp.json()["data"]

def aggregate_results(task_ids: list[int]) -> dict:
    """Fetch and combine results from all tasks."""
    good, bad = [], []
    for tid in task_ids:
        results = fetch_results(tid)
        for r in results:
            if r["validation_result"] == "good":
                good.append(r["email"])
            else:
                bad.append(r["email"])
    return {"good": good, "bad": bad}

# Usage
all_results = aggregate_results(task_ids)
print(f"Total valid: {len(all_results['good'])}")
print(f"Total invalid: {len(all_results['bad'])}")

На выходе - два списка. Валидные адреса идут в рассылку. Невалидные - в карантин или удаляются. Некоторые наши пользователи сохраняют невалидные с причиной отказа (mailbox_not_found, domain_error, disposable) для аналитики: это помогает понять, откуда в базу попадают плохие адреса.

Обработка ошибок и retry-логика

На больших объёмах сбои неизбежны. Сеть моргнёт, сервер вернёт 500 под нагрузкой, задача упадёт из-за проблем с конкретным почтовым сервером. Код, который не обрабатывает ошибки, на 100K адресов гарантированно сломается.

Три уровня защиты. Первый - retry на уровне HTTP-запроса. Если API вернул 429 или 5xx, повторить через несколько секунд. Второй - retry на уровне задачи. Если задача перешла в статус failed, создать новую с теми же адресами. Третий - чекпоинты: сохранять task_id каждого батча на диск, чтобы при падении скрипта не отправлять батчи повторно.

import json
from pathlib import Path
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# HTTP-level retry: handles 429, 500, 502, 503, 504
session = requests.Session()
retry_strategy = Retry(
    total=4,
    backoff_factor=2,       # 2s, 4s, 8s, 16s
    status_forcelist=[429, 500, 502, 503, 504],
)
session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

CHECKPOINT_FILE = Path("checkpoint.json")

def save_checkpoint(task_ids: list[int], batch_index: int):
    """Save progress so we can resume after a crash."""
    CHECKPOINT_FILE.write_text(json.dumps({
        "task_ids": task_ids,
        "next_batch": batch_index,
    }))

def load_checkpoint() -> dict | None:
    """Load previous progress if exists."""
    if CHECKPOINT_FILE.exists():
        return json.loads(CHECKPOINT_FILE.read_text())
    return None

Чекпоинт-файл - грубый, но надёжный механизм. Если скрипт упал после отправки седьмого батча из двадцати, при перезапуске он прочитает checkpoint.json и продолжит с восьмого. Task_id уже отправленных батчей сохранены - результаты можно будет собрать, когда задачи завершатся.

Код без retry-логики - это демо-скрипт. Код с retry-логикой - это продакшн. Разница проявляется на десяти тысячах адресов.

Webhook вместо polling

Polling работает, но имеет недостатки: постоянные HTTP-запросы, необходимость держать процесс запущенным, задержка между завершением задачи и обнаружением этого факта. Альтернатива - webhook. При создании задачи передайте webhook_url, и сервер отправит POST-запрос на этот URL, когда задача завершится.

# Отправка с webhook-уведомлением
curl -X POST https://api.uchecker.net/api/v1/validate/bulk \
  -H "x-api-key: $UCHECKER_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "emails": ["alice@company.com", "bob@example.org"],
    "webhook_url": "https://your-server.com/hooks/uchecker"
  }'

Webhook-подход особенно удобен в серверных приложениях, где есть входящий HTTP-эндпоинт. Вместо длинного цикла polling вы регистрируете обработчик, который срабатывает по факту готовности. Это позволяет, например, запускать валидацию из очереди задач (Celery, Sidekiq, Bull) и обрабатывать результат в отдельном воркере.

На практике webhook и polling часто комбинируют. Webhook - основной канал уведомления, polling - запасной, на случай если webhook не дошёл (сетевой сбой, ваш сервер был временно недоступен). Запустили polling с большим интервалом в 30-60 секунд как fallback, и система работает надёжно в любых условиях.

Проверка баланса перед запуском

Частая ошибка: отправить двадцать батчей и обнаружить, что кредитов хватило только на четырнадцать. Остальные шесть вернут 403 Insufficient credits, а вы уже ждёте результатов. Решение - проверять баланс до начала и сравнивать с количеством адресов.

def check_balance() -> int:
    """Return current credit balance."""
    resp = session.get(
        f"{BASE}/api/v1/account/balance",
        headers={"x-api-key": API_KEY},
    )
    resp.raise_for_status()
    return resp.json()["credits_remaining"]

def validate_large_list(filepath: str):
    """Full pipeline: clean, check balance, submit, poll, aggregate."""
    emails = load_and_clean(filepath)

    balance = check_balance()
    if balance < len(emails):
        print(f"Insufficient credits: have {balance}, need {len(emails)}")
        print(f"Top up at https://app.uchecker.net")
        return

    print(f"Balance OK: {balance} credits for {len(emails)} emails")

    task_ids = submit_batches(emails)
    poll_all_tasks(task_ids)
    results = aggregate_results(task_ids)

    # Save results
    Path("valid_emails.txt").write_text("\n".join(results["good"]))
    Path("invalid_emails.txt").write_text("\n".join(results["bad"]))
    print(f"Done: {len(results['good'])} valid, {len(results['bad'])} invalid")

Если кредитов не хватает, скрипт сообщит об этом до начала обработки, а не на полпути.

Производительность: что ожидать

Скорость валидации зависит от нескольких факторов: размер батча, загрузка сервера, отзывчивость почтовых серверов проверяемых доменов. Адрес на gmail.com проверяется за доли секунды. Адрес на корпоративном сервере, который отвечает медленно - может занять несколько секунд.

Ориентировочные цифры для uChecker API:

Размер спискаБатчей (по 10K)Среднее время
10 00013-5 мин
50 000510-15 мин
200 0002030-50 мин
1 000 0001002-4 часа

Время указано при параллельной обработке батчей на стороне сервера. Последовательная отправка увеличит общее время пропорционально количеству батчей. Если для вас критична скорость на больших объёмах - напишите в поддержку, мы можем увеличить лимиты для вашего аккаунта.

Полный pipeline одним скриптом

Собираем всё вместе. Скрипт, который можно запустить из терминала или cron, передав путь к файлу со списком:

#!/usr/bin/env python3
"""Bulk email validation via uChecker API.

Usage: python validate_bulk.py emails.txt
"""
import sys
from pathlib import Path

def main():
    if len(sys.argv) < 2:
        print("Usage: python validate_bulk.py <emails_file>")
        sys.exit(1)

    filepath = sys.argv[1]
    if not Path(filepath).exists():
        print(f"File not found: {filepath}")
        sys.exit(1)

    validate_large_list(filepath)

if __name__ == "__main__":
    main()

Функции load_and_clean, submit_batches, poll_all_tasks и aggregate_results - из примеров выше. В реальном проекте они будут в одном модуле с настройками из переменных окружения.

Для регулярной валидации (еженедельная или ежемесячная проверка обновлённой базы) этот скрипт запускается по cron. Добавьте отправку отчёта в Slack или на email - и получите полностью автоматизированную гигиену списка, которая работает без участия человека.

Частые ошибки при работе с bulk API

За время работы с пользователями API мы собрали типичные проблемы, которые возникают при массовой валидации.

Не проверяют баланс заранее. Скрипт отправляет батчи, пока кредиты не кончатся. Часть задач в работе, часть отклонена. Результаты неполные, а разобраться, какие адреса остались непроверенными, сложнее, чем кажется.

Не сохраняют task_id. Скрипт упал, task_id потеряны. Задачи на сервере завершились, но забрать результаты невозможно - непонятно, какие ID запрашивать. Чекпоинт решает эту проблему.

Слишком агрессивный polling. Запрос статуса каждую секунду на двадцать задач одновременно - это двадцать запросов в секунду. Rate limiter начнёт отвечать 429, polling замедлится ещё сильнее. Экспоненциальный backoff решает это автоматически.

Игнорируют invalid_details. API возвращает список адресов, которые не прошли синтаксическую проверку. Эти адреса не тарифицируются и не валидируются. Если их много (больше 5% батча), стоит проверить источник данных - вероятно, в базу попадает мусор ещё на этапе сбора.

Повторно проверяют те же адреса. Если результаты не кешируются, при каждом запуске скрипт тратит кредиты на адреса, которые были проверены вчера. Валидность адреса не меняется за сутки. Храните результаты в базе данных с датой проверки и отправляйте на валидацию только новые или давно не проверенные адреса.

Хорошая интеграция с API валидации - не та, которая работает быстро. А та, которая работает предсказуемо: с любым объёмом, при любых ошибках, без потери данных.

Попробуйте bulk-валидацию в uChecker - бесплатные кредиты при регистрации, API-ключ за 30 секунд, первый батч через пять минут.

bulk email validationмассовая проверка email APIbatch email verificationоптимизация валидацииemail validation apiпроверка списка emailuchecker api