
Webhooks

Receive real-time HTTP notifications when workflows complete

Receive instant HTTP POST notifications when your workflows complete. Webhooks enable real-time integrations with CRMs, Slack, data pipelines, and other external systems without polling the API.

Overview

When a workflow run completes, Linkt can send an HTTP POST request to a URL you specify. The webhook payload contains details about the run, including timing, credits used, and the IDs of created resources.

Supported Workflows

Webhooks are available for all workflow types:

Workflow | Task Type | Use Case
Search | search | Discover new companies and contacts matching your ICP
Ingest | ingest | Import and enrich entities from CSV files
Signal | signal-topic, signal-csv, signal-sheet | Monitor for business signals and events

Use Cases

  • CRM Integration — Automatically sync discovered entities or detected signals to your CRM
  • Slack/Teams Alerts — Notify your team when workflows complete or signals are detected
  • Data Pipelines — Trigger downstream processing when new entities are enriched
  • Custom Dashboards — Push real-time updates to monitoring systems

Configuring Webhooks

Add the webhook_url field to your task configuration. The URL must use HTTPS.

Search Task with Webhook

Receive a notification when a search workflow completes:

curl -X POST "https://api.linkt.ai/v1/task" \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Enterprise SaaS Discovery",
    "flow_name": "search",
    "deployment_name": "main",
    "icp_id": "507f1f77bcf86cd799439001",
    "task_config": {
      "type": "search",
      "desired_contact_count": 3,
      "user_feedback": "",
      "webhook_url": "https://your-server.com/webhooks/linkt"
    }
  }'

Ingest Task with Webhook

Receive a notification when CSV enrichment completes:

curl -X POST "https://api.linkt.ai/v1/task" \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Q1 Leads Enrichment",
    "flow_name": "ingest",
    "deployment_name": "main",
    "icp_id": "507f1f77bcf86cd799439001",
    "task_config": {
      "type": "ingest",
      "file_id": "507f1f77bcf86cd799439030",
      "primary_column": "company_name",
      "csv_entity_type": "company",
      "webhook_url": "https://your-server.com/webhooks/linkt"
    }
  }'

Signal Task with Webhook

Receive a notification when signal monitoring completes:

curl -X POST "https://api.linkt.ai/v1/task" \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "AI SaaS Signal Monitor",
    "flow_name": "signal",
    "deployment_name": "main",
    "icp_id": "507f1f77bcf86cd799439001",
    "task_config": {
      "type": "signal-topic",
      "entity_type": "company",
      "topic_criteria": "AI/ML-focused SaaS companies with Series A+ funding",
      "signal_types": [
        {
          "type": "funding",
          "display": "Funding Rounds",
          "description": "New funding announcements"
        }
      ],
      "monitoring_frequency": "daily",
      "webhook_url": "https://your-server.com/webhooks/linkt"
    }
  }'

URL Requirements

Requirement | Description
HTTPS Required | Webhook URLs must use https://
No Localhost | Cannot use localhost, 127.0.0.1, or ::1
Public Endpoint | URL must be publicly accessible
30s Timeout | Endpoint must respond within 30 seconds
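
If you want to catch configuration mistakes before creating a task, a minimal client-side pre-check against these requirements could look like the sketch below (the validate_webhook_url helper is illustrative, not part of the Linkt API):

from urllib.parse import urlparse

def validate_webhook_url(url):
    """Raise ValueError if a webhook URL violates the requirements above."""
    parsed = urlparse(url)
    if parsed.scheme != "https":
        raise ValueError("Webhook URL must use https://")
    if parsed.hostname in ("localhost", "127.0.0.1", "::1"):
        raise ValueError("Webhook URL cannot point to localhost")

validate_webhook_url("https://your-server.com/webhooks/linkt")  # OK
validate_webhook_url("http://localhost:5000/webhooks/linkt")    # raises ValueError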

Event Types

Each workflow type has its own set of event types:

Search Events

Event Type | Description
run.search.completed | Search workflow completed successfully
run.search.failed | Search workflow failed with an error
run.search.crashed | Search workflow crashed unexpectedly
run.search.cancelled | Search workflow was cancelled

Ingest Events

Event Type | Description
run.ingest.completed | Ingest workflow completed successfully
run.ingest.failed | Ingest workflow failed with an error
run.ingest.crashed | Ingest workflow crashed unexpectedly
run.ingest.cancelled | Ingest workflow was cancelled

Signal Events

Event Type | Description
run.signal.completed | Signal workflow completed successfully
run.signal.failed | Signal workflow failed with an error
run.signal.crashed | Signal workflow crashed unexpectedly
run.signal.cancelled | Signal workflow was cancelled

Webhook Payload

When a workflow completes, Linkt sends a POST request with a JSON payload. The payload structure is consistent across all workflow types, with some flow-specific fields.

Common Payload Structure

{
  "event_type": "run.search.completed",
  "timestamp": "2025-01-07T14:30:00.000000+00:00",
  "data": {
    "run_id": "507f1f77bcf86cd799439003",
    "run_name": "search Run",
    "icp_name": "Enterprise SaaS Companies",
    "icp_id": "507f1f77bcf86cd799439001",
    "user_email": "user@example.com",
    "user_first_name": "Jane",
    "started_at": "2025-01-07T14:00:00.000000+00:00",
    "ended_at": "2025-01-07T14:30:00.000000+00:00",
    "duration_seconds": 1800.0,
    "duration_formatted": "30 minutes",
    "credits_used": 75.0,
    "error_message": null,
    "resources": {
      "entities_created": [
        "507f1f77bcf86cd799439020",
        "507f1f77bcf86cd799439021"
      ],
      "entities_updated": [],
      "signals_created": []
    }
  }
}

Common Fields

Envelope Fields

Field | Type | Description
event_type | string | Event type (e.g., run.search.completed)
timestamp | string | ISO 8601 timestamp when the event was generated
data | object | Event payload data

Data Fields

Field | Type | Description
run_id | string | Unique run identifier
run_name | string | Run display name
icp_name | string | Associated ICP name
icp_id | string | Associated ICP ID
user_email | string | User who created the task
user_first_name | string | User's first name
started_at | string | Run start timestamp (ISO 8601)
ended_at | string | Run end timestamp (ISO 8601)
duration_seconds | float | Total duration in seconds
duration_formatted | string | Human-readable duration (e.g., "9 minutes")
credits_used | float | Credits consumed by the run
error_message | string | Error details (for failed runs, otherwise null)
resources | object | Created/updated resource IDs

Resources Field

Field | Type | Description
entities_created | array | IDs of newly created entities
entities_updated | array | IDs of updated entities
signals_created | array | IDs of newly created signals (signal workflows only)

Search Workflow Payload

Search workflows create and update entities:

{
  "event_type": "run.search.completed",
  "timestamp": "2025-01-07T14:30:00.000000+00:00",
  "data": {
    "run_id": "507f1f77bcf86cd799439003",
    "run_name": "search Run",
    "icp_name": "Enterprise SaaS Companies",
    "icp_id": "507f1f77bcf86cd799439001",
    "user_email": "user@example.com",
    "user_first_name": "Jane",
    "started_at": "2025-01-07T14:00:00.000000+00:00",
    "ended_at": "2025-01-07T14:30:00.000000+00:00",
    "duration_seconds": 1800.0,
    "duration_formatted": "30 minutes",
    "credits_used": 75.0,
    "error_message": null,
    "resources": {
      "entities_created": [
        "507f1f77bcf86cd799439020",
        "507f1f77bcf86cd799439021",
        "507f1f77bcf86cd799439022"
      ],
      "entities_updated": [],
      "signals_created": []
    }
  }
}

Use the entity IDs in resources.entities_created to fetch full entity details via the API.
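
As a sketch, the handler below pulls the full record for each created entity after a run.search.completed event. The /v1/entity/{entity_id} path is a placeholder assumption — check the Entities API reference for the actual endpoint:

import requests

API_KEY = "your-api-key"
# Placeholder endpoint -- confirm the actual entity path in the API reference.
ENTITY_URL = "https://api.linkt.ai/v1/entity/{entity_id}"

def fetch_created_entities(data):
    """Fetch full details for every entity ID reported in the webhook payload."""
    entities = []
    for entity_id in data.get("resources", {}).get("entities_created", []):
        response = requests.get(
            ENTITY_URL.format(entity_id=entity_id),
            headers={"x-api-key": API_KEY},
            timeout=30,
        )
        response.raise_for_status()
        entities.append(response.json())
    return entities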

Ingest Workflow Payload

Ingest workflows primarily update existing entities:

{
  "event_type": "run.ingest.completed",
  "timestamp": "2025-01-07T14:30:00.000000+00:00",
  "data": {
    "run_id": "507f1f77bcf86cd799439003",
    "run_name": "ingest Run",
    "icp_name": "Q1 Leads Import",
    "icp_id": "507f1f77bcf86cd799439001",
    "user_email": "user@example.com",
    "user_first_name": "Jane",
    "started_at": "2025-01-07T14:00:00.000000+00:00",
    "ended_at": "2025-01-07T14:15:00.000000+00:00",
    "duration_seconds": 900.0,
    "duration_formatted": "15 minutes",
    "credits_used": 50.0,
    "error_message": null,
    "resources": {
      "entities_created": [
        "507f1f77bcf86cd799439030"
      ],
      "entities_updated": [
        "507f1f77bcf86cd799439031",
        "507f1f77bcf86cd799439032"
      ],
      "signals_created": []
    }
  }
}

Signal Workflow Payload

Signal workflows include additional signal-specific fields:

{
  "event_type": "run.signal.completed",
  "timestamp": "2025-01-07T14:30:00.000000+00:00",
  "data": {
    "run_id": "507f1f77bcf86cd799439003",
    "run_name": "signal Run",
    "icp_name": "AI SaaS Companies",
    "icp_id": "507f1f77bcf86cd799439001",
    "user_email": "user@example.com",
    "user_first_name": "Jane",
    "started_at": "2025-01-07T14:00:00.000000+00:00",
    "ended_at": "2025-01-07T14:30:00.000000+00:00",
    "duration_seconds": 1800.0,
    "duration_formatted": "30 minutes",
    "credits_used": 50.0,
    "error_message": null,
    "resources": {
      "entities_created": [],
      "entities_updated": [],
      "signals_created": [
        "507f1f77bcf86cd799439010",
        "507f1f77bcf86cd799439011",
        "507f1f77bcf86cd799439012"
      ]
    },
    "total_signals": 12,
    "signal_breakdown": {
      "funding": 5,
      "product_launch": 4,
      "leadership_change": 3
    }
  }
}

Signal-Specific Fields

Field | Type | Description
total_signals | integer | Total signals detected
signal_breakdown | object | Signal counts by type
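
As a sketch of the Slack alert use case mentioned earlier, these extra fields can be summarized and posted to a Slack incoming webhook (the SLACK_WEBHOOK_URL value is a placeholder for your own Slack webhook):

import requests

# Placeholder -- replace with your own Slack incoming-webhook URL.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def notify_slack(data):
    """Post a summary of a completed signal run to a Slack channel."""
    breakdown = ", ".join(
        f"{count} {signal_type}"
        for signal_type, count in data.get("signal_breakdown", {}).items()
    )
    text = (
        f"Signal run '{data.get('run_name')}' detected "
        f"{data.get('total_signals', 0)} signals ({breakdown})."
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": text}, timeout=10)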

Building Webhook Endpoints

Your webhook endpoint should accept POST requests and respond quickly.

Requirements

  1. Accept POST requests with Content-Type: application/json
  2. Respond with 2xx status within 30 seconds
  3. Handle retries idempotently (same webhook may be delivered multiple times)

Python Example (Flask)

from flask import Flask, request, jsonify
 
app = Flask(__name__)
 
@app.route('/webhooks/linkt', methods=['POST'])
def handle_webhook():
    """Handle Linkt webhook notifications."""
    payload = request.get_json()
    event_type = payload.get('event_type')
    data = payload.get('data', {})
 
    # Log the event
    print(f"Received {event_type}")
    print(f"  Run: {data.get('run_id')}")
    print(f"  Duration: {data.get('duration_formatted')}")
 
    # Route by workflow type
    if event_type.startswith('run.search.'):
        handle_search_event(event_type, data)
    elif event_type.startswith('run.ingest.'):
        handle_ingest_event(event_type, data)
    elif event_type.startswith('run.signal.'):
        handle_signal_event(event_type, data)
 
    # Respond quickly - do heavy processing async
    return jsonify({'status': 'received'}), 200
 
def handle_search_event(event_type, data):
    """Process search workflow events."""
    if event_type == 'run.search.completed':
        entity_ids = data.get('resources', {}).get('entities_created', [])
        print(f"Search discovered {len(entity_ids)} entities")
        # Sync to CRM, update dashboard, etc.
 
def handle_ingest_event(event_type, data):
    """Process ingest workflow events."""
    if event_type == 'run.ingest.completed':
        created = len(data.get('resources', {}).get('entities_created', []))
        updated = len(data.get('resources', {}).get('entities_updated', []))
        print(f"Ingest: {created} created, {updated} updated")
 
def handle_signal_event(event_type, data):
    """Process signal workflow events."""
    if event_type == 'run.signal.completed':
        total = data.get('total_signals', 0)
        breakdown = data.get('signal_breakdown', {})
        print(f"Detected {total} signals: {breakdown}")
        # Send Slack notification, update CRM, etc.
 
if __name__ == '__main__':
    app.run(port=5000)

Node.js Example (Express)

const express = require('express');
const app = express();
 
app.use(express.json());
 
app.post('/webhooks/linkt', (req, res) => {
  const { event_type, data, timestamp } = req.body;
 
  console.log(`Received ${event_type} at ${timestamp}`);
  console.log(`  Run: ${data.run_id}`);
  console.log(`  Duration: ${data.duration_formatted}`);
 
  // Route by workflow type
  if (event_type.startsWith('run.search.')) {
    handleSearchEvent(event_type, data);
  } else if (event_type.startsWith('run.ingest.')) {
    handleIngestEvent(event_type, data);
  } else if (event_type.startsWith('run.signal.')) {
    handleSignalEvent(event_type, data);
  }
 
  // Respond immediately
  res.status(200).json({ status: 'received' });
});
 
function handleSearchEvent(eventType, data) {
  if (eventType === 'run.search.completed') {
    const entityIds = data.resources?.entities_created || [];
    console.log(`Search discovered ${entityIds.length} entities`);
  }
}
 
function handleIngestEvent(eventType, data) {
  if (eventType === 'run.ingest.completed') {
    const created = (data.resources?.entities_created || []).length;
    const updated = (data.resources?.entities_updated || []).length;
    console.log(`Ingest: ${created} created, ${updated} updated`);
  }
}
 
function handleSignalEvent(eventType, data) {
  if (eventType === 'run.signal.completed') {
    console.log(`Detected ${data.total_signals} signals`);
    console.log(`  Breakdown: ${JSON.stringify(data.signal_breakdown)}`);
  }
}
 
app.listen(5000);

Retry Behavior

If your webhook endpoint is unavailable or returns an error, Linkt automatically retries delivery.

Retry Policy

Attempt | Delay | Cumulative Time
1 (initial) | 0s | 0s
2 | 2s | 2s
3 | 4s | 6s
4 | 8s | 14s
5 | 16s | 30s
6 | 32s | 62s

Retriable Errors

Error Type | Retried? | Description
5xx Server Errors | Yes | Server errors (500, 502, 503, etc.)
Timeout | Yes | No response within 30 seconds
Connection Error | Yes | Failed to connect to endpoint
4xx Client Errors | No | Bad request, unauthorized, not found
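
Because only server errors, timeouts, and connection failures are retried, one option (a sketch, not required behavior) is to return a 400 for payloads your handler cannot parse, so Linkt does not keep retrying a delivery that can never succeed:

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhooks/linkt', methods=['POST'])
def webhook():
    payload = request.get_json(silent=True)
    if not payload or 'event_type' not in payload:
        # A 4xx response tells Linkt not to retry this delivery
        return jsonify({'error': 'invalid payload'}), 400
    # Process the webhook...
    return jsonify({'status': 'received'}), 200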

Handling Duplicate Deliveries

Due to retries, your endpoint may receive the same webhook multiple times. Implement idempotent handling:

# Track processed webhooks by run_id + event_type
processed_events = set()
 
def handle_webhook(payload):
    event_key = f"{payload['data']['run_id']}:{payload['event_type']}"
 
    if event_key in processed_events:
        print(f"Skipping duplicate: {event_key}")
        return
 
    processed_events.add(event_key)
    # Process the webhook...
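
The in-memory set above is only for illustration: it is lost on restart and is not shared across workers. A sketch of the same check backed by Redis, assuming a local Redis instance and the redis-py package:

import redis

r = redis.Redis()  # assumes a local Redis instance

def is_duplicate(payload):
    """Return True if this run_id + event_type was already processed."""
    event_key = f"{payload['data']['run_id']}:{payload['event_type']}"
    # SET with nx=True returns None when the key already exists; keep keys for 7 days
    return r.set(f"linkt:webhook:{event_key}", 1, nx=True, ex=7 * 24 * 3600) is None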

Best Practices

Respond Quickly

Return a 2xx response immediately and process webhooks asynchronously:

from flask import Flask, request
from celery import Celery

app = Flask(__name__)
celery = Celery('tasks', broker='redis://localhost:6379/0')  # point at your broker
 
@app.route('/webhooks/linkt', methods=['POST'])
def webhook():
    payload = request.get_json()
 
    # Queue for async processing
    process_webhook.delay(payload)
 
    # Respond immediately
    return '', 200
 
@celery.task
def process_webhook(payload):
    # Heavy processing here
    pass

Log Everything

Log webhook payloads for debugging:

import logging
import json

from flask import Flask, request

app = Flask(__name__)
logger = logging.getLogger(__name__)
 
@app.route('/webhooks/linkt', methods=['POST'])
def webhook():
    payload = request.get_json()
 
    logger.info(f"Webhook received: {json.dumps(payload)}")
 
    # Process...
    return '', 200

Validate Payloads

Verify required fields exist before processing:

def validate_payload(payload):
    required = ['event_type', 'timestamp', 'data']
    for field in required:
        if field not in payload:
            raise ValueError(f"Missing required field: {field}")
 
    data = payload['data']
    if 'run_id' not in data:
        raise ValueError("Missing run_id in data")

Troubleshooting

Webhook Not Received

  1. Check URL is HTTPS — HTTP URLs are rejected
  2. Verify endpoint is public — Linkt cannot reach private networks
  3. Check firewall rules — Allow incoming POST requests
  4. Review server logs — Look for incoming requests

Receiving Duplicate Webhooks

Duplicates occur during retries. Implement idempotent handling using run_id as a deduplication key.

Timeouts

If your endpoint takes longer than 30 seconds:

  1. Respond immediately with 200
  2. Process webhook data asynchronously
  3. Use a task queue (Celery, RQ, Bull, etc.)
