
Bulk Enrichment

Enrich existing entities at scale

Learn how to enrich large datasets efficiently, including matching strategies, handling partial matches, and considerations for processing high-volume data.

Prerequisites

Before starting, ensure you have a Linkt API key and a CSV file containing the entities you want to enrich.

Understanding Bulk Enrichment

Bulk enrichment adds data to entities that already exist in your sheets or are being imported from CSV files. Unlike discovery searches, enrichment focuses on gathering additional information about known entities.

Enrichment vs Discovery

| Aspect | Discovery | Enrichment |
| --- | --- | --- |
| Entity source | Search criteria | CSV file or existing sheet |
| Primary goal | Find new entities | Add data to known entities |
| Entity matching | N/A | By name, domain, or identifier |
| Typical use | Lead generation | Data enhancement |

Enrichment via ICP Configuration

Define enrichment fields in your ICP's entity target description:

curl -X POST "https://api.linkt.ai/v1/icp" \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Company Enrichment",
    "description": "Enrich imported companies with additional data",
    "mode": "discovery",
    "entity_targets": [
      {
        "entity_type": "company",
        "root": true,
        "description": "## Enrichment Fields\n- primary_product: Main product or service offered\n- tech_stack: Key technologies and frameworks used\n- funding_status: Latest funding round and amount\n- employee_growth: Year-over-year headcount change\n- key_customers: Notable customer names\n- recent_news: Latest company announcements"
      }
    ]
  }'

Matching Strategies

When importing CSV data, Linkt matches rows to real-world entities using various strategies.

Match by Company Name

The primary column contains company names:

{
  "task_config": {
    "type": "ingest",
    "file_id": "507f1f77bcf86cd799439001",
    "primary_column": "company_name",
    "csv_entity_type": "company"
  }
}

CSV Example:

company_name,industry,location
Acme Corporation,Software,San Francisco
TechStartup Inc,SaaS,New York

Match by Domain

Include website domains for more accurate matching:

company_name,website,industry
Acme Corporation,acme.com,Software
TechStartup Inc,techstartup.io,SaaS

Linkt uses the domain to verify entity identity and improve match accuracy.

Match by LinkedIn URL

For person entities, LinkedIn URLs provide the most reliable matching:

full_name,linkedin_url,company
Sarah Chen,https://linkedin.com/in/sarachen,TechCorp
James Wilson,https://linkedin.com/in/jameswilson,DataFlow

Matching Best Practices

| Strategy | Accuracy | Best For |
| --- | --- | --- |
| Name + Domain | Highest | Company enrichment |
| LinkedIn URL | Highest | Person enrichment |
| Name only | Good | When domain unavailable |
| Name + Location | Good | Common company names |

Handling Partial Matches

Not every CSV row will match an entity cleanly. Check the run queue to see which rows matched and which were discarded, then decide how to handle the misses.

Match States

| State | Description | Action |
| --- | --- | --- |
| completed | Entity matched and enriched | Data saved to sheet |
| discarded | No match found | Skipped, logged in queue |
| processing | Currently being matched | In progress |

Monitoring Match Status

Check the run queue for match results:

curl -X GET "https://api.linkt.ai/v1/run/{run_id}/queue" \
  -H "x-api-key: your-api-key"

Response:

{
  "items": [
    {
      "row_data": {"company_name": "Acme Corp", "industry": "Software"},
      "state": "completed",
      "entity_id": "507f1f77bcf86cd799439015"
    },
    {
      "row_data": {"company_name": "Unknown Inc", "industry": "Tech"},
      "state": "discarded",
      "reason": "Entity not found"
    }
  ],
  "total": 150,
  "completed": 142,
  "discarded": 8
}
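
The totals in this response are enough to compute a match rate without inspecting every item. A minimal sketch, assuming the queue fields shown above and the API_KEY/BASE_URL constants used in the later Python examples:

import requests

API_KEY = "your-api-key"
BASE_URL = "https://api.linkt.ai/v1"

def summarize_match_rate(run_id):
    """Fetch the run queue and report how many rows matched."""
    queue = requests.get(
        f"{BASE_URL}/run/{run_id}/queue",
        headers={"x-api-key": API_KEY}
    ).json()

    total = queue.get("total", 0)
    completed = queue.get("completed", 0)
    discarded = queue.get("discarded", 0)
    rate = (completed / total * 100) if total else 0
    print(f"Matched {completed}/{total} rows ({rate:.1f}%), {discarded} discarded")
    return rate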

Reviewing Discarded Rows

Filter to see only discarded items:

curl -X GET "https://api.linkt.ai/v1/run/{run_id}/queue?state=discarded" \
  -H "x-api-key: your-api-key"

Improving Match Rates

  1. Clean data before import — Fix typos, standardize names (see the cleanup sketch after this list)
  2. Include domains — Add website column for companies
  3. Use LinkedIn URLs — Most reliable for people
  4. Remove duplicates — Dedupe CSV before upload
  5. Verify entity existence — Some companies may have been acquired or renamed
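
The cleanup sketch below applies points 1 and 4 and normalizes the website column from point 2 using pandas. The column names (company_name, website) follow the CSV examples earlier; adjust them to your file:

import pandas as pd

def clean_company_csv(in_path, out_path):
    """Normalize names, reduce websites to bare domains, and drop duplicates before upload."""
    df = pd.read_csv(in_path)

    # Standardize company names: trim whitespace, collapse repeated spaces
    df["company_name"] = (
        df["company_name"].astype(str).str.strip().str.replace(r"\s+", " ", regex=True)
    )

    # Reduce websites to bare domains so they match cleanly
    if "website" in df.columns:
        df["website"] = (
            df["website"].astype(str)
            .str.replace(r"^https?://", "", regex=True)
            .str.replace(r"^www\.", "", regex=True)
            .str.rstrip("/")
            .str.lower()
        )

    # Dedupe on the matching column, keeping the first occurrence
    df = df.drop_duplicates(subset=["company_name"], keep="first")
    df.to_csv(out_path, index=False)
    print(f"Wrote {len(df)} cleaned rows to {out_path}")

clean_company_csv("companies_raw.csv", "companies_clean.csv")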

Enrichment Data Handling

Understand how enrichment data is stored and merged.

Data Storage Pattern

Each enriched field follows the EntityAttribute structure:

{
  "field_name": {
    "value": "The enriched value",
    "references": ["https://source-url.com"],
    "created_at": "2025-01-06T10:00:00Z",
    "updated_at": "2025-01-06T10:00:00Z"
  }
}

Field Update Behavior

| Scenario | Behavior |
| --- | --- |
| New field | Created with value |
| Existing field, new value | Updated with new value |
| Existing field, null result | Original value preserved |
| Re-enrichment | Updates timestamps |
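
If you keep local copies of entity data, the rules in the table can be modeled in a few lines. This is an illustrative sketch of the behavior described above, not the service's own implementation:

from datetime import datetime, timezone

def apply_enrichment(entity_data, field_name, new_value, references=None):
    """Apply one enriched value to a local copy of entity data, following the table above."""
    now = datetime.now(timezone.utc).isoformat()
    current = entity_data.get(field_name)

    if new_value is None:
        # Existing field, null result: original value preserved
        return entity_data

    if current is None:
        # New field: created with value
        entity_data[field_name] = {
            "value": new_value,
            "references": references or [],
            "created_at": now,
            "updated_at": now,
        }
    else:
        # Existing field, new value / re-enrichment: value and updated_at refreshed
        current["value"] = new_value
        if references:
            current["references"] = references
        current["updated_at"] = now

    return entity_data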

Preserving Original Data

Original CSV data is preserved alongside enriched data:

{
  "name": {
    "value": "Acme Corporation",
    "references": ["csv_import"]
  },
  "website": {
    "value": "https://acme.com",
    "references": ["csv_import"]
  },
  "primary_product": {
    "value": "Enterprise CRM platform",
    "references": ["https://acme.com/products"]
  }
}
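
Because imported fields carry a csv_import reference while enriched fields cite source URLs, you can split the two when reading an entity back. A small sketch assuming the attribute shape shown above:

def split_by_source(entity_data):
    """Separate fields that came from the CSV import from fields added by enrichment."""
    original, enriched = {}, {}
    for field, attr in entity_data.items():
        if "csv_import" in attr.get("references", []):
            original[field] = attr.get("value")
        else:
            enriched[field] = attr.get("value")
    return original, enriched

# Using the structure shown above
original, enriched = split_by_source({
    "name": {"value": "Acme Corporation", "references": ["csv_import"]},
    "primary_product": {"value": "Enterprise CRM platform", "references": ["https://acme.com/products"]},
})
print(original)   # {'name': 'Acme Corporation'}
print(enriched)   # {'primary_product': 'Enterprise CRM platform'}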

Large Dataset Considerations

Handle high-volume enrichment efficiently.

Batch Size Guidelines

| Dataset Size | Approach | Notes |
| --- | --- | --- |
| 1-100 rows | Single import | Direct processing |
| 100-500 rows | Single import | Monitor queue progress |
| 500-1000 rows | Consider splitting | Better error handling |
| 1000+ rows | Multiple batches | Recommended |

Splitting Large Files

import pandas as pd
 
def split_csv(file_path, batch_size=500):
    """Split large CSV into smaller batches."""
    df = pd.read_csv(file_path)
    num_batches = (len(df) + batch_size - 1) // batch_size
 
    for i in range(num_batches):
        start = i * batch_size
        end = start + batch_size
        batch_df = df[start:end]
        batch_df.to_csv(f"batch_{i+1}.csv", index=False)
        print(f"Created batch_{i+1}.csv with {len(batch_df)} rows")
 
split_csv("large_dataset.csv", batch_size=500)

Parallel Processing

Process multiple batches concurrently:

import requests
import time
from concurrent.futures import ThreadPoolExecutor
 
API_KEY = "your-api-key"
BASE_URL = "https://api.linkt.ai/v1"
HEADERS = {"x-api-key": API_KEY, "Content-Type": "application/json"}
 
def process_batch(file_path, icp_id, sheet_id):
    """Process a single batch file."""
 
    # Upload file
    with open(file_path, "rb") as f:
        upload = requests.post(
            f"{BASE_URL}/files/upload",
            headers={"x-api-key": API_KEY},
            files={"file": f}
        ).json()
 
    # Create and execute task
    task = requests.post(
        f"{BASE_URL}/task",
        headers=HEADERS,
        json={
            "name": f"Enrich {file_path}",
            "flow_name": "ingest",
            "deployment_name": "ingest/v1",
            "sheet_id": sheet_id,
            "task_config": {
                "type": "ingest",
                "file_id": upload["file_id"],
                "primary_column": "company_name",
                "csv_entity_type": "company"
            }
        }
    ).json()
 
    run = requests.post(
        f"{BASE_URL}/task/{task['id']}/execute",
        headers=HEADERS,
        json={"icp_id": icp_id}
    ).json()
 
    # Wait for completion
    while True:
        status = requests.get(
            f"{BASE_URL}/run/{run['run_id']}",
            headers={"x-api-key": API_KEY}
        ).json()
 
        if status["status"] in ["COMPLETED", "FAILED", "CANCELED", "CRASHED"]:
            return {
                "file": file_path,
                "status": status["status"],
                "run_id": run["run_id"]
            }
 
        time.sleep(10)  # Poll every 10 seconds
 
# Process batches in parallel (limit concurrency)
# icp_id and sheet_id refer to an existing ICP and sheet created beforehand
batch_files = ["batch_1.csv", "batch_2.csv", "batch_3.csv"]
 
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(process_batch, f, icp_id, sheet_id)
        for f in batch_files
    ]
 
    for future in futures:
        result = future.result()
        print(f"{result['file']}: {result['status']}")

Rate Limiting

Be mindful of API rate limits:

import time
 
def process_with_rate_limit(items, delay_seconds=1):
    """Process items with rate limiting."""
    for i, item in enumerate(items):
        process_item(item)  # process_item stands in for your per-item API call
 
        # Add delay between requests
        if i < len(items) - 1:
            time.sleep(delay_seconds)

Progress Monitoring

Track progress across large imports:

def monitor_enrichment_progress(run_ids):
    """Monitor multiple runs and report progress."""
    while True:
        all_complete = True
        total_completed = 0
        total_items = 0
 
        for run_id in run_ids:
            status = requests.get(
                f"{BASE_URL}/run/{run_id}",
                headers={"x-api-key": API_KEY}
            ).json()
 
            if status["status"] not in ["COMPLETED", "FAILED", "CANCELED", "CRASHED"]:
                all_complete = False
 
            # Get queue progress
            queue = requests.get(
                f"{BASE_URL}/run/{run_id}/queue",
                headers={"x-api-key": API_KEY}
            ).json()
 
            total_completed += queue.get("completed", 0)
            total_items += queue.get("total", 0)
 
        progress = (total_completed / total_items * 100) if total_items > 0 else 0
        print(f"Progress: {total_completed}/{total_items} ({progress:.1f}%)")
 
        if all_complete:
            break
 
        time.sleep(10)
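
Usage is a one-liner once you have collected the run IDs from the execute responses (the constants and imports come from the parallel processing example above):

run_ids = ["run-id-1", "run-id-2", "run-id-3"]  # placeholders; collect these from each execute response
monitor_enrichment_progress(run_ids)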

Complete Example

Full bulk enrichment workflow:

import requests
import time
import pandas as pd
 
API_KEY = "your-api-key"
BASE_URL = "https://api.linkt.ai/v1"
HEADERS = {"x-api-key": API_KEY, "Content-Type": "application/json"}
 
def bulk_enrich(csv_path, enrichment_fields, batch_size=500):
    """
    Bulk enrich companies from a CSV file.
 
    Args:
        csv_path: Path to CSV file with company data
        enrichment_fields: List of fields to enrich
        batch_size: Rows per batch (default 500)
    """
 
    # Step 1: Create enrichment ICP
    print("Creating enrichment ICP...")
    fields_description = "\n".join([
        f"- {field['name']}: {field['description']}"
        for field in enrichment_fields
    ])
 
    icp = requests.post(
        f"{BASE_URL}/icp",
        headers=HEADERS,
        json={
            "name": "Bulk Enrichment",
            "description": "Enrich imported company data",
            "mode": "discovery",
            "entity_targets": [{
                "entity_type": "company",
                "root": True,
                "description": f"## Enrichment Fields\n{fields_description}"
            }]
        }
    ).json()
    icp_id = icp["id"]
 
    # Step 2: Create sheet
    print("Creating sheet...")
    sheet = requests.post(
        f"{BASE_URL}/sheet",
        headers=HEADERS,
        json={
            "name": "Enriched Companies",
            "icp_id": icp_id,
            "entity_type": "company"
        }
    ).json()
    sheet_id = sheet["id"]
 
    # Step 3: Split and process CSV
    df = pd.read_csv(csv_path)
    total_rows = len(df)
    num_batches = (total_rows + batch_size - 1) // batch_size
 
    print(f"Processing {total_rows} rows in {num_batches} batches...")
 
    run_ids = []
    for i in range(num_batches):
        start = i * batch_size
        end = min(start + batch_size, total_rows)
        batch_df = df[start:end]
 
        # Save batch to temp file
        batch_path = f"/tmp/batch_{i+1}.csv"
        batch_df.to_csv(batch_path, index=False)
 
        print(f"Processing batch {i+1}/{num_batches} ({len(batch_df)} rows)...")
 
        # Upload batch
        with open(batch_path, "rb") as f:
            upload = requests.post(
                f"{BASE_URL}/files/upload",
                headers={"x-api-key": API_KEY},
                files={"file": f}
            ).json()
 
        # Create task
        task = requests.post(
            f"{BASE_URL}/task",
            headers=HEADERS,
            json={
                "name": f"Enrich Batch {i+1}",
                "flow_name": "ingest",
                "deployment_name": "ingest/v1",
                "sheet_id": sheet_id,
                "task_config": {
                    "type": "ingest",
                    "file_id": upload["file_id"],
                    "primary_column": "company_name",
                    "csv_entity_type": "company"
                }
            }
        ).json()
 
        # Execute
        run = requests.post(
            f"{BASE_URL}/task/{task['id']}/execute",
            headers=HEADERS,
            json={"icp_id": icp_id}
        ).json()
 
        run_ids.append(run["run_id"])
 
        # Small delay between batches
        time.sleep(2)
 
    # Step 4: Wait for all batches
    print("\nWaiting for completion...")
    while True:
        all_complete = True
        completed_count = 0
 
        for run_id in run_ids:
            status = requests.get(
                f"{BASE_URL}/run/{run_id}",
                headers={"x-api-key": API_KEY}
            ).json()
 
            if status["status"] == "COMPLETED":
                completed_count += 1
            elif status["status"] in ["FAILED", "CANCELED", "CRASHED"]:
                completed_count += 1
                print(f"  Run {run_id} failed: {status.get('error')}")
            else:
                all_complete = False
 
        print(f"  Batches complete: {completed_count}/{len(run_ids)}")
 
        if all_complete:
            break
 
        time.sleep(10)
 
    # Step 5: Get results
    print("\nRetrieving enriched entities...")
    entities = requests.get(
        f"{BASE_URL}/sheet/{sheet_id}/entities",
        headers={"x-api-key": API_KEY},
        params={"page_size": 100}
    ).json()
 
    print(f"\nEnrichment complete!")
    print(f"  Total entities: {entities['total']}")
    print(f"  ICP ID: {icp_id}")
    print(f"  Sheet ID: {sheet_id}")
 
    return {
        "icp_id": icp_id,
        "sheet_id": sheet_id,
        "total_entities": entities["total"],
        "entities": entities["entities"]
    }
 
# Run bulk enrichment
result = bulk_enrich(
    csv_path="companies.csv",
    enrichment_fields=[
        {"name": "primary_product", "description": "Main product or service offered"},
        {"name": "tech_stack", "description": "Key technologies used"},
        {"name": "funding_status", "description": "Latest funding round and amount"},
        {"name": "employee_count", "description": "Current number of employees"},
        {"name": "key_customers", "description": "Notable customer names"}
    ],
    batch_size=500
)
 
# Print sample results
print("\nSample enriched data:")
for entity in result["entities"][:3]:
    name = entity["data"]["name"]["value"]
    product = entity["data"].get("primary_product", {}).get("value", "N/A")
    tech = entity["data"].get("tech_stack", {}).get("value", "N/A")
    print(f"\n  {name}")
    print(f"    Product: {product}")
    print(f"    Tech: {tech}")

Best Practices

Data Preparation

  1. Clean your CSV — Remove duplicates, fix obvious errors
  2. Include identifiers — Add domain or LinkedIn URL when available
  3. Standardize formats — Consistent company name formatting
  4. Validate encoding — Save the file as UTF-8

Enrichment Efficiency

  1. Focus on high-value fields — Don't request unnecessary data
  2. Batch appropriately — 500-row batches work well
  3. Monitor progress — Track queue completion rates
  4. Handle failures — Implement retry logic for transient errors (see the retry sketch after this list)
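
For point 4, a small wrapper with exponential backoff covers most transient failures (timeouts, 429 rate limits, 5xx responses). A sketch, not tied to any particular Linkt endpoint:

import time
import requests

def request_with_retries(method, url, max_attempts=4, backoff_seconds=2, **kwargs):
    """Retry a request on timeouts, rate limits (429), and server errors (5xx)."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.request(method, url, timeout=30, **kwargs)
        except requests.RequestException as exc:  # connection errors, timeouts
            last_error = str(exc)
        else:
            if response.status_code in (429, 500, 502, 503, 504):
                last_error = f"HTTP {response.status_code}"
            else:
                response.raise_for_status()  # surface non-retryable errors immediately
                return response
        if attempt < max_attempts:
            # Exponential backoff: 2s, 4s, 8s, ...
            time.sleep(backoff_seconds * (2 ** (attempt - 1)))
    raise RuntimeError(f"Request to {url} failed after {max_attempts} attempts: {last_error}")

# Example: poll a run with retries
# status = request_with_retries("GET", f"{BASE_URL}/run/{run_id}", headers={"x-api-key": API_KEY}).json()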

Quality Assurance

  1. Spot check results — Review a sample of enriched entities
  2. Track match rates — Monitor discarded rows
  3. Validate data quality — Check enrichment accuracy
  4. Export and backup — Save enriched data externally (see the backup sketch below)
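
For the export point, the same sheet entities endpoint used in the complete example can feed a local backup. A sketch; page_size is the only pagination parameter shown in this guide, so larger sheets may need additional paging per the API's pagination scheme:

import json
import requests

API_KEY = "your-api-key"
BASE_URL = "https://api.linkt.ai/v1"

def backup_sheet(sheet_id, out_path="enriched_backup.json"):
    """Save a snapshot of enriched entities to a local JSON file."""
    entities = requests.get(
        f"{BASE_URL}/sheet/{sheet_id}/entities",
        headers={"x-api-key": API_KEY},
        params={"page_size": 100}
    ).json()

    with open(out_path, "w") as f:
        json.dump(entities, f, indent=2)

    saved = len(entities.get("entities", []))
    print(f"Backed up {saved} of {entities.get('total', 0)} entities to {out_path}")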

Next Steps