Building an Enrichment Pipeline

Process a batch of contacts, enrich them from multiple data sources, review the results, and sync everything back to your CRM—all orchestrated by an agent.

Overview

Most CRMs are full of stale data—missing phone numbers, outdated titles, blank company fields. An enrichment pipeline fixes this at scale: feed in a list, let the agent enrich every record, then write the clean data back.

This guide shows how to build a production-grade pipeline in Python using ABM.dev’s batch enrichment API, with SSE streaming for real-time progress and automatic CRM writeback.

The Pipeline

// batch enrichment pipeline

CSV / API Input → Queue Enrichments → Stream Progress → Review Results → Writeback to CRM

Step 1: Upload Contacts

Start with a CSV of contacts. Each row needs at least an email address and optionally a company domain for higher match rates.

# contacts.csv
email,company_domain,full_name
jane@acme.com,acme.com,Jane Smith
bob@globex.io,globex.io,Bob Chen
sara@initech.co,initech.co,Sara Johnson

# load contacts from CSV
import csv

def load_contacts(filepath: str) -> list[dict]:
    # newline="" lets the csv module handle line endings itself
    with open(filepath, newline="", encoding="utf-8") as f:
        return list(csv.DictReader(f))

contacts = load_contacts("contacts.csv")
print(f"Loaded {len(contacts)} contacts")
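Because each row needs at least an email address, it can help to drop unusable rows before queueing anything. The following is a minimal sketch, not part of ABM.dev's API: `clean_contacts` is a hypothetical helper, and the rule it applies (an `@` with text on both sides, deduplicated case-insensitively) is only a rough assumption about what counts as a usable email.

```python
def clean_contacts(rows: list[dict]) -> list[dict]:
    """Drop rows without a plausible email and remove duplicates.

    Hypothetical helper: the validity rule is a rough assumption,
    not a full email-address validator.
    """
    seen = set()
    cleaned = []
    for row in rows:
        email = (row.get("email") or "").strip().lower()
        local, _, domain = email.partition("@")
        if not local or not domain:
            continue  # missing or malformed email: skip the row
        if email in seen:
            continue  # duplicate of an earlier row
        seen.add(email)
        cleaned.append({**row, "email": email})
    return cleaned


rows = [
    {"email": "jane@acme.com", "company_domain": "acme.com"},
    {"email": "JANE@acme.com", "company_domain": "acme.com"},
    {"email": "", "company_domain": "globex.io"},
]
cleaned = clean_contacts(rows)
print(f"Kept {len(cleaned)} of {len(rows)} contacts")
```

Running this on the output of `load_contacts` before Step 2 avoids spending requests on rows that cannot match.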

Step 2: Queue Enrichments

Submit enrichments in batches with rate limiting. The API accepts up to 10 concurrent requests—queue the rest and process them as slots open.

# queue enrichments with rate limiting
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

API_KEY = "abm_live_..."
BASE = "https://api.abm.dev/v1"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

def submit_enrichment(contact: dict) -> dict:
    response = requests.post(
        f"{BASE}/enrichments",
        headers=HEADERS,
        json={
            "email": contact["email"],
            "company_domain": contact.get("company_domain", ""),
        },
        timeout=30,  # avoid hanging a worker on a stalled connection
    )
    response.raise_for_status()
    return response.json()

# at most 10 requests in flight; the pool queues the rest until a worker is free
enrichment_ids = []
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = {pool.submit(submit_enrichment, c): c for c in contacts}
    for future in as_completed(futures):
        result = future.result()  # re-raises any error from submit_enrichment
        enrichment_ids.append(result["id"])
        print(f"Queued: {result['id']}")

print(f"Queued {len(enrichment_ids)} enrichments")

Step 3: Monitor Progress

Use server-sent events to stream enrichment progress in real time. Each event tells you when an enrichment completes or fails.

# stream progress via SSE
import json
import sseclient  # SSEClient(response).events() is the sseclient-py interface

# BASE and HEADERS are defined in Step 2.
# Note: the Step 2 snippet does not produce a batch_id; pass in the ID of the batch you created.
def monitor_enrichments(batch_id: str):
    response = requests.get(
        f"{BASE}/enrichments/stream?batch_id={batch_id}",
        headers=HEADERS,
        stream=True,
    )
    response.raise_for_status()

    client = sseclient.SSEClient(response)
    completed = 0

    for event in client.events():
        if event.event == "enrichment.completed":
            completed += 1
            data = json.loads(event.data)
            print(f"[{completed}] {data['email']} enriched")
        elif event.event == "enrichment.failed":
            data = json.loads(event.data)
            print(f"FAILED: {data['email']}: {data['error']}")
        elif event.event == "batch.done":
            print("Batch complete!")
            break
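For readers unfamiliar with the wire format, the sketch below shows roughly how a server-sent event stream is framed: `event:` and `data:` lines, with a blank line ending each event. It is a simplified illustration, not a replacement for `sseclient`. `parse_sse` is a hypothetical helper and the sample payloads are invented; only the event names come from the example above.

```python
import json
from typing import Iterable, Iterator


def _field_value(line: str, prefix: str) -> str:
    """Return the text after `prefix`, minus one optional leading space."""
    value = line[len(prefix):]
    return value[1:] if value.startswith(" ") else value


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, str]]:
    """Yield (event_name, data) pairs from raw SSE text lines.

    Simplified: handles only the `event` and `data` fields and ignores
    comments, `id`, and `retry`.
    """
    event, data = "message", []  # "message" is the default SSE event type
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far.
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith("event:"):
            event = _field_value(line, "event:")
        elif line.startswith("data:"):
            data.append(_field_value(line, "data:"))


# Sample stream; the payload fields are made up for illustration.
sample = [
    "event: enrichment.completed",
    'data: {"email": "jane@acme.com"}',
    "",
    "event: batch.done",
    "data: {}",
    "",
]
events = list(parse_sse(sample))
for name, payload in events:
    print(name, json.loads(payload))
```

In practice the client library does this parsing for you; the sketch only shows why each event arrives with a name and a JSON string to decode.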

Step 4: Review Results

Fetch each enrichment result and filter by confidence score. Only write back data you trust.

# review and filter by confidence
def review_enrichments(enrichment_ids: list, min_confidence: float = 0.8) -> list:
    high_quality = []

    for eid in enrichment_ids:
        r = requests.get(f"{BASE}/enrichments/{eid}", headers=HEADERS)
        r.raise_for_status()  # fail loudly rather than parse an error body
        data = r.json()

        if data["confidence"] >= min_confidence:
            high_quality.append(data)
            print(f"PASS {data['email']} (confidence: {data['confidence']})")
        else:
            print(f"SKIP {data['email']} (confidence: {data['confidence']})")

    print(f"{len(high_quality)}/{len(enrichment_ids)} passed quality check")
    return high_quality

Step 5: Writeback to CRM

Push enriched data back to your CRM. The writeback endpoint maps enrichment fields to your CRM’s schema automatically.

# writeback enriched data to CRM
def writeback_to_crm(enrichments: list) -> dict:
    results = {"success": 0, "failed": 0}

    for enrichment in enrichments:
        try:
            r = requests.post(
                f"{BASE}/enrichments/{enrichment['id']}/writeback",
                headers=HEADERS,
                json={"destination": "salesforce"},
            )
            r.raise_for_status()
            results["success"] += 1
        except requests.HTTPError as e:
            results["failed"] += 1
            print(f"Writeback failed for {enrichment['email']}: {e}")

    print(f"Writeback done: {results['success']} ok, {results['failed']} failed")
    return results

Production Tips

Retry Logic

Wrap API calls in exponential backoff. A 429 means you hit the rate limit—wait and retry. A 5xx means a transient failure—retry up to 3 times.

# exponential backoff retry wrapper
import time
from functools import wraps

import requests

def with_retry(max_retries=3, base_delay=1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.HTTPError as e:
                    if e.response.status_code == 429:
                        # rate limited: the wait doubles on every attempt
                        delay = base_delay * (2 ** attempt)
                        print(f"Rate limited, waiting {delay}s")
                        time.sleep(delay)
                    elif e.response.status_code >= 500:
                        # transient server error: fixed pause, then retry
                        time.sleep(base_delay)
                    else:
                        # other 4xx errors will not succeed on retry
                        raise
            raise RuntimeError(f"Failed after {max_retries} retries")
        return wrapper
    return decorator
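To make the backoff schedule concrete, this sketch computes the waits that `base_delay * (2 ** attempt)` produces. `backoff_delays` is a hypothetical helper for illustration, and the `max_delay` cap is an assumption, not something the wrapper above implements.

```python
def backoff_delays(max_retries: int = 3, base_delay: float = 1.0,
                   max_delay: float = 30.0) -> list[float]:
    """Return the wait (in seconds) before each retry, doubling each time."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]


delays = backoff_delays()
print(delays)  # [1.0, 2.0, 4.0]
```

With the defaults, three rate-limited attempts wait 1, 2, and 4 seconds. A cap of this kind keeps the wait bounded if `max_retries` is raised.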

Error Handling

Log every failure with the enrichment ID and email for easy debugging. Store failed IDs in a dead-letter queue and reprocess them after the main batch completes.
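One way to sketch the dead-letter pattern described here: run the main batch, set failures aside with their error, then reprocess them afterwards. Everything named below (`run_with_dead_letter`, `flaky_process`, the sample IDs, the in-memory list standing in for a queue) is hypothetical; a production setup would likely persist failures somewhere durable.

```python
import logging
from typing import Callable

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("enrichment")


def run_with_dead_letter(items: list[dict],
                         process: Callable[[dict], None]) -> list[dict]:
    """Process every item; return the ones that raised, tagged with the error."""
    dead_letter = []
    for item in items:
        try:
            process(item)
        except Exception as exc:  # broad on purpose: one failure must not stop the batch
            log.error("Failed id=%s email=%s: %s",
                      item.get("id"), item.get("email"), exc)
            dead_letter.append({**item, "error": str(exc)})
    return dead_letter


# Stand-in for a real API call: fails the first time it sees a "flaky" item.
_seen = set()

def flaky_process(item: dict) -> None:
    if item.get("flaky") and item["id"] not in _seen:
        _seen.add(item["id"])
        raise RuntimeError("transient failure")


# Made-up IDs for illustration only.
batch = [
    {"id": "enr_1", "email": "jane@acme.com"},
    {"id": "enr_2", "email": "bob@globex.io", "flaky": True},
]
failed = run_with_dead_letter(batch, flaky_process)         # main pass
still_failed = run_with_dead_letter(failed, flaky_process)  # reprocess pass
print(f"{len(failed)} failed, {len(still_failed)} still failing after retry")
```

Anything left in `still_failed` after the second pass probably needs a human to look at it rather than another automatic retry.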

Webhook Notifications

For large batches (1,000+ contacts), register a webhook instead of holding a stream open or polling. You’ll get a POST to your endpoint when the batch finishes, with a summary of results.

Ready to enrich your contact database? Start with a small batch and scale up once you’ve verified the quality.