LinktLinkt

Execution

Understanding task execution, runs, and workflow states

When you execute a task, Linkt creates a run that tracks the workflow from start to finish. This page explains how execution works, run states, output formats, and best practices for monitoring workflows.

Task to Run Relationship

Tasks and runs have a one-to-many relationship:

  • A Task is a reusable workflow template that defines what to do and how to do it
  • A Run is a single execution instance of a task
  • Each time you execute a task, a new run is created
  • Multiple runs can exist for the same task
Loading diagram...

Executing a Task

To execute a task, call the execute endpoint:

curl -X POST "https://api.linkt.ai/v1/task/{task_id}/execute" \
  -H "x-api-key: your-api-key" \
  -H "Content-Type: application/json" \
  -d '{
    "icp_id": "507f1f77bcf86cd799439011",
    "parameters": {}
  }'

Response:

{
  "run_id": "507f1f77bcf86cd799439014",
  "flow_run_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "SCHEDULED"
}

Use the returned run_id to monitor execution progress.

Run States

Runs progress through a state machine from creation to completion (or failure):

StatusDescriptionTerminal?
SCHEDULEDRun created, queued for executionNo
PENDINGRun preparing, resources being allocatedNo
RUNNINGWorkflow actively executingNo
COMPLETEDWorkflow finished successfullyYes
FAILEDWorkflow encountered an errorYes
CANCELEDRun was manually canceledYes
CRASHEDRun crashed unexpectedlyYes
PAUSEDRun is paused (rare)No

State Flow

Loading diagram...

Most runs follow the happy path: SCHEDULED → PENDING → RUNNING → COMPLETED.

Run Structure

A complete run object contains:

{
  "id": "507f1f77bcf86cd799439014",
  "task_id": "507f1f77bcf86cd799439013",
  "icp_id": "507f1f77bcf86cd799439011",
  "task_type": "search",
  "flow_run_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "COMPLETED",
  "input": {
    "icp_id": "507f1f77bcf86cd799439011"
  },
  "output": {
    "version": "1.0",
    "credits": {
      "total_credits": 150.0,
      "breakdown": {
        "search_company": 100.0,
        "enrich_company": 50.0
      }
    },
    "resources": {
      "entities_created": ["entity_id_1", "entity_id_2"],
      "entities_updated": [],
      "signals_created": []
    },
    "run_time": 45.2
  },
  "error": null,
  "created_at": "2025-01-05T12:00:15Z",
  "updated_at": "2025-01-05T12:01:00Z"
}

Fields

FieldTypeDescription
idstringUnique run identifier
task_idstringParent task ID
icp_idstringICP used for this execution (optional)
task_typestringWorkflow type: search, signal, profile, ingest
flow_run_idstringInternal workflow engine ID
statusstringCurrent run state
inputobjectParameters passed at execution time
outputobjectStructured output (see below)
errorstringError message if failed (null otherwise)
created_atdatetimeWhen the run was created
updated_atdatetimeWhen the run was last updated

Structured Output

When a run completes, the output field contains structured results with credit usage and resource tracking:

{
  "version": "1.0",
  "credits": {
    "total_credits": 150.0,
    "breakdown": {
      "search_company": 100.0,
      "enrich_company": 50.0
    }
  },
  "resources": {
    "entities_created": ["507f1f77bcf86cd799439015", "507f1f77bcf86cd799439016"],
    "entities_updated": ["507f1f77bcf86cd799439017"],
    "signals_created": []
  },
  "run_time": 45.2
}

Credit Usage

The credits object tracks consumption:

FieldTypeDescription
total_creditsnumberTotal credits consumed by this run
breakdownobjectCredits by action type (e.g., search_company, enrich_person)

Common action types in breakdown:

  • search_company — Company discovery searches
  • search_person — Person discovery searches
  • enrich_company — Company data enrichment
  • enrich_person — Person data enrichment

Resource Tracking

The resources object tracks what was created or modified:

FieldTypeDescription
entities_createdarrayIDs of newly created entities
entities_updatedarrayIDs of entities that were updated
signals_createdarrayIDs of newly created signals

Use these IDs to retrieve the actual data from the entities or signals endpoints.

Run Time

The run_time field contains the total execution duration in seconds.

Polling for Status

To monitor a run, poll the run endpoint until it reaches a terminal state:

import time
import requests
 
def wait_for_completion(run_id, api_key, max_wait=1800, poll_interval=10):
    """
    Poll run status until completion or timeout.
 
    Args:
        run_id: The run ID to monitor
        api_key: Your API key
        max_wait: Maximum seconds to wait (default 30 minutes)
        poll_interval: Seconds between polls (default 10)
 
    Returns:
        The completed run object
 
    Raises:
        Exception if run fails, is canceled, or times out
    """
    headers = {"x-api-key": api_key}
    url = f"https://api.linkt.ai/v1/run/{run_id}"
 
    terminal_states = {"COMPLETED", "FAILED", "CANCELED", "CRASHED"}
    start = time.time()
 
    while time.time() - start < max_wait:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        run = response.json()
 
        status = run["status"]
 
        if status == "COMPLETED":
            return run
        elif status in terminal_states:
            error = run.get("error", "Unknown error")
            raise Exception(f"Run {status}: {error}")
 
        time.sleep(poll_interval)
 
    raise Exception(f"Timeout after {max_wait}s waiting for run {run_id}")

JavaScript Example

async function waitForCompletion(runId, apiKey, maxWait = 1800000) {
  // maxWait default: 30 minutes (in milliseconds)
  const terminalStates = new Set(['COMPLETED', 'FAILED', 'CANCELED', 'CRASHED']);
  const startTime = Date.now();
 
  while (Date.now() - startTime < maxWait) {
    const response = await fetch(`https://api.linkt.ai/v1/run/${runId}`, {
      headers: { 'x-api-key': apiKey }
    });
 
    const run = await response.json();
 
    if (run.status === 'COMPLETED') {
      return run;
    }
 
    if (terminalStates.has(run.status)) {
      throw new Error(`Run ${run.status}: ${run.error || 'Unknown error'}`);
    }
 
    await new Promise(resolve => setTimeout(resolve, 10000)); // Poll every 10 seconds
  }
 
  throw new Error(`Timeout waiting for run ${runId}`);
}

Canceling a Run

To cancel a running workflow:

curl -X POST "https://api.linkt.ai/v1/run/{run_id}/cancel" \
  -H "x-api-key: your-api-key"

Response: 204 No Content

Run Queue

For long-running workflows, you can monitor the processing queue to see entities being worked on:

curl -X GET "https://api.linkt.ai/v1/run/{run_id}/queue?limit=20" \
  -H "x-api-key: your-api-key"

Query Parameters:

ParameterDefaultDescription
offset0Starting position in queue
limit20Maximum entities to return (1-100)
stateFilter by state: queued, processing, completed, discarded
include_historyfalseInclude entities from all states

Entity Queue States:

StateDescription
queuedWaiting to be processed
processingCurrently being worked on
completedSuccessfully processed and saved
discardedSkipped (not qualified)

Error Handling

When a run fails, follow these steps:

1. Check Error Details

The run response includes error information:

{
  "status": "FAILED",
  "error": "Rate limit exceeded for provider X"
}

2. Common Failure Reasons

Error TypeCauseResolution
Rate limitToo many requests to external providersWait and retry
Invalid configurationICP or task misconfiguredReview settings
Sheet capacitySheet at entity limitIncrease limit or use new sheet
TimeoutWorkflow took too longReduce batch size
Provider errorExternal API unavailableRetry later

3. Retry Strategy

For transient errors, implement exponential backoff:

import time
 
def execute_with_retry(task_id, icp_id, api_key, max_retries=3):
    headers = {"x-api-key": api_key, "Content-Type": "application/json"}
    url = f"https://api.linkt.ai/v1/task/{task_id}/execute"
 
    for attempt in range(max_retries):
        # Execute task
        response = requests.post(url, headers=headers, json={"icp_id": icp_id})
        run = response.json()
 
        # Wait for completion
        try:
            return wait_for_completion(run["run_id"], api_key)
        except Exception as e:
            if "Rate limit" in str(e) and attempt < max_retries - 1:
                wait_time = (2 ** attempt) * 30  # 30s, 60s, 120s
                print(f"Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise

Expected Durations

Different workflow types have different typical durations:

Task TypeTypical DurationNotes
Search15-20+ minutesCan take longer depending on ICP complexity and desired entity count
CSV ingest~1 minute per row~15 rows takes ~15 minutes
Signal monitoringVariesDepends on monitoring_frequency configured in task

Listing Runs

Query runs with filtering:

curl -X GET "https://api.linkt.ai/v1/run?task_id={task_id}&status=COMPLETED" \
  -H "x-api-key: your-api-key"

Query Parameters:

ParameterDescription
task_idFilter by task
icp_idFilter by ICP
statusFilter by status
task_typeFilter by type: search, signal, profile, ingest
created_afterISO 8601 datetime
created_beforeISO 8601 datetime
pagePage number (default: 1)
page_sizeResults per page (default: 20, max: 100)
sort_bySort field
order-1 (descending) or 1 (ascending)

Next Steps

  • Tasks — Configure workflow templates
  • Runs — Run data structure reference
  • Quickstart — See execution in practice
  • API Reference — Complete run endpoint documentation