Bulk email validation via API: optimizing for large lists

You have 200,000 addresses, a campaign budget, and a deadline next week. Upload everything in one request? The API will time out. Send them one at a time? That takes days. Between those two extremes sits an engineering problem: how to run bulk validation through an API and finish in hours, not days.

Why the naive approach breaks down

The most common pattern we see from developers integrating validation for the first time is a loop that sends emails one by one through a single-address endpoint. It looks logical, and it works fine up to about a thousand addresses. At ten thousand, things get slow. At a hundred thousand, it collapses.

The math is straightforward. Each single-address request opens an HTTP connection, authenticates, and writes a row to the database. The overhead per address is tiny. Multiply by 100,000 and you get hours of network overhead alone, before any actual validation happens. Rate limits will also kick in, forcing pauses between requests.
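To put a rough number on that overhead, here is a back-of-the-envelope sketch. The 200 ms per-request cost is an assumed figure for illustration, not a measured uChecker value, and the function name is ours.

```python
def sequential_overhead_hours(n_addresses: int, per_request_seconds: float = 0.2) -> float:
    """Estimate pure request overhead for one-address-per-request validation.

    per_request_seconds is an ASSUMED round-trip cost (connection setup,
    authentication, bookkeeping), not a measured value.
    """
    return n_addresses * per_request_seconds / 3600


# 100,000 requests * 0.2 s = 20,000 s of overhead before rate-limit pauses
print(f"{sequential_overhead_hours(100_000):.1f} hours")
```

Plug in your own measured latency. The point is that the cost grows linearly with the list, while a bulk request pays it once per batch.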

The bulk endpoint solves the per-request overhead: one HTTP call, an array of addresses, one task on the server. But even then, sending a million addresses in a single request is a bad idea for three reasons: request body size, processing time, and lack of granularity when something goes wrong. If the request fails halfway through, you lose all progress and start over.

The right approach is batching: split the full list into manageable chunks, submit each as a separate bulk request, then collect results. Simple in theory. There are details worth getting right.

Step 1. Prepare the list

Local preprocessing before hitting the API saves credits and speeds up the whole run.

Deduplication comes first. In real-world databases, duplicates typically account for 3 to 15 percent of records. They arrive through multiple signup forms, CRM imports, and database merges after acquisitions. Each duplicate is a wasted credit on a repeated check.

Syntax filtering comes second. The API rejects addresses with invalid syntax and does not charge for them, but sending obviously broken strings still inflates request size and parse time. A quick client-side regex pass removes the obvious junk: strings without an @, strings containing whitespace, and strings with Cyrillic or other non-ASCII characters in the domain. The pattern below is ASCII-only, so relax it if you work with IDN addresses.

import re

def load_and_clean(filepath: str) -> list[str]:
    """Load emails from file, deduplicate and remove obvious junk."""
    with open(filepath) as f:
        raw = [line.strip().lower() for line in f if line.strip()]

    # Deduplicate preserving order
    seen = set()
    unique = []
    for email in raw:
        if email not in seen:
            seen.add(email)
            unique.append(email)

    # Drop strings that clearly aren't emails
    pattern = re.compile(r"^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$")
    cleaned = [e for e in unique if pattern.match(e)]

    removed = len(raw) - len(cleaned)
    print(f"Loaded {len(raw)}, after cleanup: {len(cleaned)} ({removed} removed)")
    return cleaned

On a 200K list, this preprocessing typically strips 10,000 to 20,000 rows. At any reasonable credit price, that is a noticeable saving.

Step 2. Batching and submission

Batch size is a trade-off. Too small and you have many HTTP requests, many tasks, harder tracking. Too large and processing takes longer, and a failure is more costly. Based on what our users see in practice, batches of 5,000 to 50,000 addresses work well. For lists up to 100K, 10K per batch is a reasonable default. For million-record databases, 50K batches make more sense.
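That rule of thumb fits in a small helper. This is a sketch only: the 1,000,000 cutoff for switching to 50K batches is our assumption, since the guidance above does not say where exactly between 100K and a million the switch should happen.

```python
import math


def pick_batch_size(total: int, large_list_threshold: int = 1_000_000) -> int:
    """Return 10K batches by default and 50K for very large lists.

    large_list_threshold is an ASSUMED cutoff; tune it for your own runs.
    """
    return 50_000 if total >= large_list_threshold else 10_000


def batch_count(total: int) -> int:
    """Number of bulk requests needed for a list of the given size."""
    return math.ceil(total / pick_batch_size(total))


for size in (10_000, 200_000, 1_000_000):
    print(size, pick_batch_size(size), batch_count(size))
```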

Submitting via curl to understand the mechanics:

# Submit a batch from file (first 10,000 addresses).
# jq builds the whole JSON body and curl reads it from stdin (@-), which
# avoids shell argument-length limits on a payload of this size.
head -n 10000 emails_clean.txt | \
  jq -R -s -c '{emails: (split("\n") | map(select(. != "")))}' | \
  curl -X POST https://api.uchecker.net/api/v1/validate/bulk \
    -H "x-api-key: $UCHECKER_API_KEY" \
    -H "Content-Type: application/json" \
    --data-binary @-

# Response:
# {"success":true,"task_id":501,"status":"queued",
#  "valid_emails":9987,"invalid_emails":13,
#  "credits_used":9987,"credits_remaining":90013}

The same loop in Python, with a full submission cycle:

import requests
import os
import time

API_KEY = os.environ["UCHECKER_API_KEY"]
BASE = "https://api.uchecker.net"
BATCH_SIZE = 10_000

def chunk(lst: list, size: int):
    """Split list into chunks of given size."""
    for i in range(0, len(lst), size):
        yield lst[i : i + size]

def submit_batches(emails: list[str]) -> list[int]:
    """Submit all batches and return list of task IDs."""
    task_ids = []
    batches = list(chunk(emails, BATCH_SIZE))

    for i, batch in enumerate(batches):
        resp = requests.post(
            f"{BASE}/api/v1/validate/bulk",
            headers={"x-api-key": API_KEY},
            json={"emails": batch},
            timeout=60,  # fail instead of hanging forever on a stalled connection
        )
        resp.raise_for_status()
        data = resp.json()
        task_id = data["task_id"]
        task_ids.append(task_id)
        print(
            f"Batch {i+1}/{len(batches)}: task_id={task_id}, "
            f"queued={data['valid_emails']}, skipped={data['invalid_emails']}"
        )
        # Small pause between submissions to avoid rate limits
        time.sleep(1)

    return task_ids

Note the one-second pause between requests. For small volumes it is optional, but when you are firing off dozens of batches in a row it reduces the chance of the server returning 429 Too Many Requests.
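The valid_emails and invalid_emails counts that submit_batches prints are worth checking per batch rather than just logging. The sketch below flags a batch when syntax rejects exceed 5%, the rule of thumb used later in this article. The helper names are ours; the two field names come from the sample response above.

```python
def syntax_reject_ratio(submit_response: dict) -> float:
    """Share of a submitted batch that the API skipped for invalid syntax."""
    valid = submit_response["valid_emails"]
    invalid = submit_response["invalid_emails"]
    total = valid + invalid
    return invalid / total if total else 0.0


def needs_source_review(submit_response: dict, threshold: float = 0.05) -> bool:
    """True when the reject share suggests a data-collection problem."""
    return syntax_reject_ratio(submit_response) > threshold


sample = {"valid_emails": 9987, "invalid_emails": 13}
print(f"{syntax_reject_ratio(sample):.2%}", needs_source_review(sample))
```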

Step 3. Polling with exponential backoff

Validation is asynchronous. After submitting a batch, the server returns a task_id and queues the work. To get results, you poll the status via GET at /api/v1/tasks/{taskId}.

Fixed-interval polling works but wastes requests. A 50K-address task will be in processing for several minutes regardless, so checking every five seconds for the first two minutes accomplishes nothing. Exponential backoff starts with a short interval and lengthens it with each iteration, cutting API load significantly.

def poll_task(task_id: int, timeout: int = 600) -> dict:
    """Poll task with exponential backoff. Returns task data on completion."""
    interval = 5      # start at 5 seconds
    max_interval = 60  # cap at 60 seconds
    elapsed = 0

    while elapsed < timeout:
        resp = requests.get(
            f"{BASE}/api/v1/tasks/{task_id}",
            headers={"x-api-key": API_KEY},
            timeout=30,
        )
        resp.raise_for_status()  # surface HTTP errors instead of a KeyError below
        data = resp.json()
        status = data["status"]
        progress = data.get("progress_percent", 0)
        print(f"  Task {task_id}: {status} ({progress}%)")

        if status == "completed":
            return data
        if status == "failed":
            raise RuntimeError(f"Task {task_id} failed")

        time.sleep(interval)
        elapsed += interval
        interval = min(interval * 1.5, max_interval)

    raise TimeoutError(f"Task {task_id} not completed within {timeout}s")

A multiplier of 1.5 is a good balance. The sequence goes 5, 7.5, 11, 17, 25, 38, 57, 60, 60... seconds. A 10K-address batch typically finishes in 3 to 5 minutes. A 50K batch takes 10 to 15, so raise timeout above the 600-second default when polling batches that large.
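To see what the backoff saves, the schedule can be reproduced without touching the API. This sketch mirrors the arithmetic in poll_task (same start, multiplier, and cap) and counts how many status checks fit into a given timeout. The helper names are ours.

```python
from itertools import islice


def backoff_intervals(start: float = 5, factor: float = 1.5, cap: float = 60):
    """Yield the sleep intervals poll_task uses: start, start*factor, ... up to cap."""
    interval = start
    while True:
        yield interval
        interval = min(interval * factor, cap)


def polls_within(timeout: float, **kwargs) -> int:
    """Count the status checks made before the timeout expires."""
    elapsed, polls = 0.0, 0
    for interval in backoff_intervals(**kwargs):
        if elapsed >= timeout:
            break
        polls += 1
        elapsed += interval
    return polls


print(list(islice(backoff_intervals(), 4)))  # [5, 7.5, 11.25, 16.875]
print(polls_within(600))                     # backoff schedule
print(polls_within(600, factor=1))           # fixed 5-second interval
```

Over a 600-second window the backoff schedule makes 15 requests where a fixed 5-second interval makes 120.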

Step 4. Polling multiple tasks in parallel

When you have ten or twenty batches, polling them sequentially throws away time. Tasks run in parallel on the server, so you can poll them in parallel too. Python's concurrent.futures makes this easy:

from concurrent.futures import ThreadPoolExecutor, as_completed

def poll_all_tasks(task_ids: list[int], max_workers: int = 5) -> list[dict]:
    """Poll multiple tasks in parallel."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(poll_task, tid): tid for tid in task_ids}
        for future in as_completed(futures):
            tid = futures[future]
            try:
                data = future.result()
                results.append(data)
                print(f"Task {tid} completed")
            except Exception as e:
                print(f"Task {tid} error: {e}")
    return results

Five workers is a reasonable ceiling: enough parallelism, but not enough to hammer the API. Because the tasks themselves run concurrently on the server, total polling time with twenty batches stays close to the duration of the slowest batch, not the sum of all of them.

Step 5. Collecting and aggregating results

Once all tasks finish, you need to consolidate them into a single list. The API returns results in two formats: JSON for programmatic use and CSV for spreadsheet imports. For automation, JSON is easier to work with.

def fetch_results(task_id: int) -> list[dict]:
    """Fetch validation results for a completed task."""
    resp = requests.get(
        f"{BASE}/api/v1/tasks/{task_id}/results",
        headers={"x-api-key": API_KEY},
        params={"format": "json"},
    )
    resp.raise_for_status()
    return resp.json()["data"]

def aggregate_results(task_ids: list[int]) -> dict:
    """Fetch and combine results from all tasks."""
    good, bad = [], []
    for tid in task_ids:
        results = fetch_results(tid)
        for r in results:
            if r["validation_result"] == "good":
                good.append(r["email"])
            else:
                bad.append(r["email"])
    return {"good": good, "bad": bad}

# Usage
all_results = aggregate_results(task_ids)
print(f"Total valid: {len(all_results['good'])}")
print(f"Total invalid: {len(all_results['bad'])}")

You end up with two lists. Valid addresses go to your campaign. Invalid ones go to quarantine or get deleted. Some users store the invalid addresses with their rejection reason (mailbox_not_found, domain_error, disposable) for later analysis. It is a quick way to trace which acquisition sources are feeding garbage into your database.
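A per-source breakdown of rejection reasons might look like the sketch below. It rests on two assumptions: that validation_result carries the rejection reason for rows that are not "good" (the article does not show the full result schema), and that you can supply source_of, a hypothetical mapping from address to acquisition source taken from your own CRM.

```python
from collections import Counter


def rejection_report(results: list, source_of: dict) -> dict:
    """Tally rejection reasons per acquisition source.

    ASSUMPTION: r["validation_result"] holds the reason for non-"good" rows.
    source_of is a hypothetical email -> source mapping you provide.
    """
    report = {}
    for r in results:
        reason = r["validation_result"]
        if reason == "good":
            continue
        source = source_of.get(r["email"], "unknown")
        report.setdefault(source, Counter())[reason] += 1
    return report


rows = [
    {"email": "a@example.com", "validation_result": "good"},
    {"email": "b@example.com", "validation_result": "mailbox_not_found"},
    {"email": "c@example.org", "validation_result": "disposable"},
]
print(rejection_report(rows, {"b@example.com": "webinar", "c@example.org": "popup"}))
```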

Error handling and retry logic

At scale, failures are not edge cases. Networks hiccup, servers return 500 under load, tasks fail because a specific mail server is unresponsive. Code that does not handle errors will break on a 100K run. It is only a question of when.

Three layers of protection cover most scenarios. First: HTTP-level retry. If the API returns 429 or a 5xx, wait a few seconds and try again. Second: task-level retry. If a task lands in failed status, create a new one with the same addresses. Third: checkpoints. Write each task_id to disk as you submit, so a script crash does not force you to re-submit batches that are already queued.

import json
from pathlib import Path
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

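# HTTP-level retry: handles 429, 500, 502, 503, 504.
# Use session.get / session.post in place of requests.get / requests.post
# in the earlier functions so they pick up this retry policy.
# By default urllib3 retries only idempotent methods (GET, HEAD, ...) on
# these status codes. Retrying POST submissions means opting in via
# allowed_methods, at the risk of queuing the same batch twice.
session = requests.Session()
retry_strategy = Retry(
    total=4,
    backoff_factor=2,       # exponential backoff; exact delays depend on the urllib3 version
    status_forcelist=[429, 500, 502, 503, 504],
)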
session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

CHECKPOINT_FILE = Path("checkpoint.json")

def save_checkpoint(task_ids: list[int], batch_index: int):
    """Save progress so we can resume after a crash."""
    CHECKPOINT_FILE.write_text(json.dumps({
        "task_ids": task_ids,
        "next_batch": batch_index,
    }))

def load_checkpoint() -> dict | None:
    """Load previous progress if exists."""
    if CHECKPOINT_FILE.exists():
        return json.loads(CHECKPOINT_FILE.read_text())
    return None

The checkpoint file is blunt but reliable. If the script dies after submitting batch seven of twenty, the next run reads checkpoint.json and picks up from batch eight. The task_ids from already-submitted batches are saved, so results can be collected once those tasks finish.
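A submission loop that actually resumes from the checkpoint could be sketched like this. To keep the example self-contained, the HTTP call is passed in as submit_one, a hypothetical callable that posts one batch and returns its task_id. The checkpoint layout matches save_checkpoint above.

```python
import json
from pathlib import Path
from typing import Callable, List


def submit_with_resume(
    batches: List[List[str]],
    submit_one: Callable[[List[str]], int],
    checkpoint: Path,
) -> List[int]:
    """Submit batches in order, skipping those recorded in the checkpoint."""
    state = {"task_ids": [], "next_batch": 0}
    if checkpoint.exists():
        state = json.loads(checkpoint.read_text())

    task_ids = list(state["task_ids"])
    for index in range(state["next_batch"], len(batches)):
        task_ids.append(submit_one(batches[index]))
        # Persist after every batch so a crash loses at most the one in flight
        checkpoint.write_text(
            json.dumps({"task_ids": task_ids, "next_batch": index + 1})
        )
    return task_ids
```

One gap remains: if the process dies after the API accepts a batch but before the checkpoint is written, that batch is submitted again on the next run. Delete the checkpoint file once results are collected so the next list starts clean.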

Code without retry logic is a demo script. Code with retry logic is production. The difference shows up at ten thousand addresses.
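The second layer, task-level retry, can be sketched the same way. Here submit_one and wait_for are hypothetical callables standing in for a single-batch submit and for poll_task, which raises RuntimeError when a task ends in failed status. Whether a failed task refunds its credits is not covered in this article, so check that before enabling automatic resubmission.

```python
from typing import Callable, List


def run_with_task_retry(
    batch: List[str],
    submit_one: Callable[[List[str]], int],
    wait_for: Callable[[int], dict],
    max_attempts: int = 3,
) -> dict:
    """Submit a batch and resubmit it if the task ends in a failed state."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        task_id = submit_one(batch)
        try:
            return wait_for(task_id)
        except RuntimeError as exc:
            # Only failures are retried. A TimeoutError is not a RuntimeError,
            # so a slow task that may still finish is never resubmitted here.
            last_error = exc
            print(f"Attempt {attempt}/{max_attempts} failed for task {task_id}")
    raise RuntimeError(f"Batch failed after {max_attempts} attempts") from last_error
```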

Webhooks instead of polling

Polling works, but it has real costs: constant HTTP traffic, a process that must stay running, and a gap between task completion and your script noticing. Webhooks remove all three problems. Pass a webhook_url when submitting a batch and the server will POST to that URL when the task finishes.

# Submit with webhook notification
curl -X POST https://api.uchecker.net/api/v1/validate/bulk \
  -H "x-api-key: $UCHECKER_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "emails": ["alice@company.com", "bob@example.org"],
    "webhook_url": "https://your-server.com/hooks/uchecker"
  }'

Webhooks fit naturally into server-side applications that already expose HTTP endpoints. Instead of a polling loop, you register a handler that fires on completion. This pairs well with task queues like Celery, Sidekiq, or Bull: submit the validation job from one worker, handle the result in another.

In practice, webhooks and polling often run together. The webhook is the primary notification path; polling at a 30 to 60 second interval runs as a fallback in case the webhook does not arrive (network glitch, your server temporarily down). That combination handles most failure modes without extra complexity.
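The receiving side can stay small. The sketch below is framework-agnostic: it takes the raw request body and a set of task IDs you are still waiting on. The payload shape is an assumption (a JSON object with task_id and status fields, mirroring the submit response), because the article does not show what the server actually posts. Check it against a real delivery before relying on it.

```python
import json
from typing import Optional, Set


def handle_webhook(body: bytes, pending: Set[int]) -> Optional[int]:
    """Return the task_id that just completed, or None if there is nothing to do.

    ASSUMED payload: {"task_id": <int>, "status": "completed", ...}
    """
    try:
        payload = json.loads(body)
    except ValueError:  # malformed JSON or undecodable bytes
        return None
    if not isinstance(payload, dict):
        return None

    task_id = payload.get("task_id")
    if not isinstance(task_id, int) or task_id not in pending:
        return None  # unknown task or duplicate delivery
    if payload.get("status") != "completed":
        return None  # leave it pending; fallback polling will surface failures

    pending.discard(task_id)
    return task_id
```

Anything still in pending after the fallback interval gets polled with poll_task, so a lost delivery costs a delay and no data.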

Check your balance before starting

A common mistake: submitting twenty batches and discovering that credits ran out after fourteen. The remaining six return 403 Insufficient credits while you are waiting for results. Check the balance first and compare it against the address count.

def check_balance() -> int:
    """Return current credit balance."""
    resp = session.get(
        f"{BASE}/api/v1/account/balance",
        headers={"x-api-key": API_KEY},
    )
    resp.raise_for_status()
    return resp.json()["credits_remaining"]

def validate_large_list(filepath: str):
    """Full pipeline: clean, check balance, submit, poll, aggregate."""
    emails = load_and_clean(filepath)

    balance = check_balance()
    if balance < len(emails):
        print(f"Insufficient credits: have {balance}, need {len(emails)}")
        print("Top up at https://app.uchecker.net")
        return

    print(f"Balance OK: {balance} credits for {len(emails)} emails")

    task_ids = submit_batches(emails)
    poll_all_tasks(task_ids)
    results = aggregate_results(task_ids)

    # Save results
    Path("valid_emails.txt").write_text("\n".join(results["good"]))
    Path("invalid_emails.txt").write_text("\n".join(results["bad"]))
    print(f"Done: {len(results['good'])} valid, {len(results['bad'])} invalid")

If there are not enough credits, the script reports that before submitting a single batch, not halfway through a run.

Performance: what to expect

Throughput depends on batch size, server load, and how responsive the target mail servers are. A gmail.com address resolves in a fraction of a second. An address on a slow corporate server can take several seconds. That variance is why time estimates are ranges, not fixed numbers.

Approximate figures for the uChecker API:

List size    Batches (at 10K each)    Typical time
10,000       1                        3–5 min
50,000       5                        10–15 min
200,000      20                       30–50 min
1,000,000    100                      2–4 hours

These times assume batches process in parallel on the server side. Submitting batches sequentially (one at a time, waiting for each to finish) multiplies total time by the number of batches. If speed matters at high volumes, contact support — per-account limits can be increased.

Full pipeline in one script

Putting it all together. A script you can run from the terminal or cron, passing the path to your address file:

#!/usr/bin/env python3
"""Bulk email validation via uChecker API.

Usage: python validate_bulk.py emails.txt
"""
import sys
from pathlib import Path

def main():
    if len(sys.argv) < 2:
        print("Usage: python validate_bulk.py <emails_file>")
        sys.exit(1)

    filepath = sys.argv[1]
    if not Path(filepath).exists():
        print(f"File not found: {filepath}")
        sys.exit(1)

    validate_large_list(filepath)

if __name__ == "__main__":
    main()

The script calls validate_large_list, which in turn relies on load_and_clean, check_balance, submit_batches, poll_all_tasks, and aggregate_results from the examples above. In a real project they live in one module, with settings pulled from environment variables.

For recurring validation — weekly or monthly hygiene runs on a growing database — this script runs on cron. Add a report to Slack or email and you have fully automated list hygiene that runs without anyone watching it.

Common mistakes with bulk API

These are the failure patterns we see most often from API users running bulk validation.

Not checking balance first. The script submits batches until credits run out. Some tasks complete, others get rejected. Results are partial and figuring out which addresses were never checked is harder than it sounds.

Not saving task_ids. The script crashes, task_ids are gone. The tasks on the server complete, but you cannot retrieve the results because you do not know which IDs to query. A checkpoint file fixes this.

Too-aggressive polling. Checking status every second across twenty tasks is twenty requests per second. The rate limiter responds with 429, which slows everything down further. Exponential backoff handles this automatically.

Ignoring invalid_details. The API returns the list of addresses that failed syntax checks — those are not charged and not validated. If that count is over 5% of a batch, the source data has a problem worth investigating. Bad addresses are entering the database at collection time.

Re-validating the same addresses repeatedly. Without result caching, every run spends credits on addresses checked yesterday. Email validity does not change overnight. Store results in a database with a checked_at timestamp and only send new or stale addresses to the API.
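The filtering step for that last point is a few lines. In this sketch the cache is a plain dict from address to its checked_at timestamp; in practice that would be a database lookup. The 30-day freshness window is an assumed default, not a uChecker recommendation.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


def addresses_to_check(
    emails: List[str],
    cache: Dict[str, datetime],
    max_age_days: int = 30,
    now: Optional[datetime] = None,
) -> List[str]:
    """Return only the addresses that are new or whose last check is stale.

    cache maps email -> checked_at (timezone-aware). max_age_days is an
    ASSUMED freshness window; pick one that matches your send cadence.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_age_days)
    return [e for e in emails if e not in cache or cache[e] < cutoff]
```

Feed the returned list into submit_batches, then write the fresh results back with the current timestamp.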

A good API integration is not the one that runs fast. It is the one that runs predictably: any volume, any error condition, without losing data.

Try bulk validation in uChecker — free credits on signup, an API key in 30 seconds, and your first batch running in under five minutes.
