Building an Enrichment Pipeline
Process a batch of contacts, enrich them from multiple data sources, review the results, and sync everything back to your CRM—all orchestrated by an agent.
Overview
Most CRMs are full of stale data—missing phone numbers, outdated titles, blank company fields. An enrichment pipeline fixes this at scale: feed in a list, let the agent enrich every record, then write the clean data back.
This guide shows how to build a production-grade pipeline in Python using ABM.dev’s batch enrichment API, with SSE streaming for real-time progress and automatic CRM writeback.
The Pipeline
// batch enrichment pipeline
Step 1: Upload Contacts
Start with a CSV of contacts. Each row needs at least an email address; a company domain is optional, but including it improves match rates.
# contacts.csv
email,company_domain,full_name
jane@acme.com,acme.com,Jane Smith
bob@globex.io,globex.io,Bob Chen
sara@initech.co,initech.co,Sara Johnson
# load contacts from CSV
import csv
def load_contacts(filepath: str) -> list[dict]:
    """Read a contacts CSV into a list of row dictionaries.

    The first row of the file is used as the header, so every returned dict
    is keyed by column name.

    Args:
        filepath: Path to the CSV file.

    Returns:
        One dict per data row, in file order.
    """
    # newline="" hands newline handling to the csv module, which keeps line
    # breaks inside quoted fields intact. "utf-8-sig" reads plain UTF-8 and
    # also drops a leading byte-order mark; without that, a BOM would make the
    # first header come back as "\ufeffemail" instead of "email".
    with open(filepath, newline="", encoding="utf-8-sig") as f:
        return list(csv.DictReader(f))
# Load the sample file once; the submission step below iterates over this list.
contacts = load_contacts("contacts.csv")
print(f"Loaded {len(contacts)} contacts")
Step 2: Queue Enrichments
Submit enrichments in batches with rate limiting. The API accepts up to 10 concurrent requests—queue the rest and process them as slots open.
# queue enrichments with rate limiting
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

# Prefer the ABM_API_KEY environment variable so the real secret never has to
# be committed; the placeholder below is only used when the variable is unset.
API_KEY = os.environ.get("ABM_API_KEY", "abm_live_...")
BASE = "https://api.abm.dev/v1"
# Sent with every request: the key is passed as a bearer token.
HEADERS = {"Authorization": f"Bearer {API_KEY}"}
def submit_enrichment(contact: dict, timeout: float = 30.0) -> dict:
    """Queue a single contact for enrichment.

    Args:
        contact: Contact row. Must contain ``email``; ``company_domain`` is
            optional.
        timeout: Seconds to wait for the API before giving up. Without a
            timeout a stalled connection would block the calling thread
            indefinitely.

    Returns:
        The decoded JSON body of the API response.

    Raises:
        KeyError: If ``contact`` has no ``email`` key.
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    payload = {
        "email": contact["email"],
        # A missing key and a None value (e.g. from a short CSV row) are both
        # sent as an empty string rather than as JSON null.
        "company_domain": contact.get("company_domain") or "",
    }
    response = requests.post(
        f"{BASE}/enrichments",
        headers=HEADERS,
        json=payload,
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
# Submit every contact through a pool of 10 workers: at most 10 requests are
# in flight at once and the remaining submissions wait in the executor's queue.
enrichment_ids = []
failed_contacts = []  # contacts whose submission failed, kept for reprocessing
with ThreadPoolExecutor(max_workers=10) as pool:
    # Map each future back to its contact so failures can be reported by email.
    futures = {pool.submit(submit_enrichment, c): c for c in contacts}
    for future in as_completed(futures):
        contact = futures[future]
        try:
            result = future.result()
        except requests.RequestException as exc:
            # One failed submission must not abort the loop and discard the
            # IDs of everything else that was queued successfully.
            failed_contacts.append(contact)
            print(f"FAILED to queue {contact['email']}: {exc}")
            continue
        enrichment_ids.append(result["id"])
        print(f"Queued: {result['id']}")
print(f"Queued {len(enrichment_ids)} enrichments")
if failed_contacts:
    print(f"{len(failed_contacts)} submissions failed")
Step 3: Monitor Progress
Use server-sent events to stream enrichment progress in real time. Each event tells you when an enrichment completes or fails.
# stream progress via SSE
import json

import sseclient
def monitor_enrichments(batch_id: str) -> None:
    """Stream progress events for a batch and print them until it finishes.

    Prints one line per ``enrichment.completed`` or ``enrichment.failed``
    event and returns when a ``batch.done`` event arrives or the stream ends.
    Other event types are ignored.

    Args:
        batch_id: Identifier of the batch to follow. How it is obtained is
            not shown in this guide.

    Raises:
        requests.HTTPError: If the stream endpoint answers with a 4xx/5xx
            status, instead of silently iterating over an error body.
    """
    # The context manager closes the streamed connection on every exit path.
    with requests.get(
        f"{BASE}/enrichments/stream",
        # Passing the id via params lets requests URL-encode it.
        params={"batch_id": batch_id},
        headers=HEADERS,
        stream=True,
        # (connect, read): fail fast if the API is unreachable, but never time
        # out while waiting for the next event on an open stream.
        timeout=(10, None),
    ) as response:
        response.raise_for_status()
        client = sseclient.SSEClient(response)
        completed = 0
        for event in client.events():
            if event.event == "enrichment.completed":
                completed += 1
                data = json.loads(event.data)
                print(f"[{completed}] {data['email']} enriched")
            elif event.event == "enrichment.failed":
                data = json.loads(event.data)
                print(f"FAILED: {data['email']} — {data['error']}")
            elif event.event == "batch.done":
                print("Batch complete!")
                break
Step 4: Review Results
Fetch each enrichment result and filter by confidence score. Only write back data you trust.
# review and filter by confidence
def review_enrichments(
    enrichment_ids: list,
    min_confidence: float = 0.8,
    timeout: float = 30.0,
) -> list:
    """Fetch each enrichment and keep those at or above a confidence threshold.

    Prints a PASS/SKIP line per enrichment and a final summary line.

    Args:
        enrichment_ids: IDs of the enrichments to fetch.
        min_confidence: Lowest ``confidence`` value that still passes.
        timeout: Seconds to wait for each API response.

    Returns:
        The decoded result dicts whose ``confidence`` is at least
        ``min_confidence``, in input order.

    Raises:
        requests.HTTPError: If a fetch answers with a 4xx/5xx status.
        requests.Timeout: If a fetch gets no response within ``timeout``.
    """
    high_quality = []
    for eid in enrichment_ids:
        r = requests.get(
            f"{BASE}/enrichments/{eid}", headers=HEADERS, timeout=timeout
        )
        # Fail with the HTTP status here rather than with a KeyError on
        # "confidence" when the body is an error payload.
        r.raise_for_status()
        data = r.json()
        if data["confidence"] >= min_confidence:
            high_quality.append(data)
            print(f"PASS {data['email']} (confidence: {data['confidence']})")
        else:
            print(f"SKIP {data['email']} (confidence: {data['confidence']})")
    print(f"{len(high_quality)}/{len(enrichment_ids)} passed quality check")
    return high_quality
Step 5: Writeback to CRM
Push enriched data back to your CRM. The writeback endpoint maps enrichment fields to your CRM’s schema automatically.
# writeback enriched data to CRM
def writeback_to_crm(
    enrichments: list,
    destination: str = "salesforce",
    timeout: float = 30.0,
) -> dict:
    """Push each enrichment to the CRM and tally the outcomes.

    A failed writeback is counted and reported, and the loop continues with
    the next enrichment.

    Args:
        enrichments: Enrichment dicts; each must contain ``id`` and ``email``.
        destination: Value sent as the ``destination`` field of the request.
        timeout: Seconds to wait for each API response.

    Returns:
        ``{"success": <count>, "failed": <count>}``.
    """
    results = {"success": 0, "failed": 0}
    for enrichment in enrichments:
        try:
            r = requests.post(
                f"{BASE}/enrichments/{enrichment['id']}/writeback",
                headers=HEADERS,
                json={"destination": destination},
                timeout=timeout,
            )
            r.raise_for_status()
        except requests.RequestException as e:
            # RequestException covers HTTP error statuses as well as
            # connection errors and timeouts, so a network failure is counted
            # instead of aborting the batch and losing the tally.
            results["failed"] += 1
            print(f"Writeback failed for {enrichment['email']}: {e}")
        else:
            results["success"] += 1
    print(f"Writeback done: {results['success']} ok, {results['failed']} failed")
    return results
Production Tips
Retry Logic
Wrap API calls in exponential backoff. A 429 means you hit the rate limit—wait and retry. A 5xx means a transient failure—retry up to 3 times.
# exponential backoff retry wrapper
import time
from functools import wraps
def with_retry(max_retries=3, base_delay=1.0):
    """Build a decorator that retries a call on HTTP 429 and 5xx responses.

    The wrapped function is called up to ``max_retries`` times. A 429 waits
    ``base_delay * 2 ** attempt`` seconds before the next attempt; a 5xx
    waits ``base_delay`` seconds. Any other ``requests.HTTPError`` is
    re-raised immediately, and exceptions of other types are not caught.

    Args:
        max_retries: Maximum number of attempts.
        base_delay: Base wait in seconds between attempts.

    Returns:
        A decorator preserving the wrapped function's metadata.

    Raises:
        Exception: When every attempt failed with a retryable status. The
            last ``requests.HTTPError`` is attached as ``__cause__``.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.HTTPError as e:
                    status = e.response.status_code
                    if status == 429:
                        delay = base_delay * (2 ** attempt)
                    elif status >= 500:
                        delay = base_delay
                    else:
                        raise
                    last_error = e
                    # No attempt follows the last one, so waiting here would
                    # only delay the failure report.
                    if attempt == max_retries - 1:
                        break
                    if status == 429:
                        print(f"Rate limited, waiting {delay}s")
                    time.sleep(delay)
            # Chain the last HTTP error so the status and response stay
            # available to whoever handles the failure.
            raise Exception(f"Failed after {max_retries} retries") from last_error
        return wrapper
    return decorator
Error Handling
Log every failure with the enrichment ID and email for easy debugging. Store failed IDs in a dead-letter queue and reprocess them after the main batch completes.
Webhook Notifications
For large batches (1,000+ contacts), register a webhook instead of keeping an SSE stream open or polling for status. You’ll get a POST to your endpoint when the batch finishes, with a summary of results.
Ready to enrich your contact database? Start with a small batch and scale up once you’ve verified the quality.