Workflow Orchestrator

The WorkflowOrchestrator is the central component of the extrai library. It orchestrates the entire pipeline, from processing raw text documents to generating structured, database-ready SQLModel objects.

This component is responsible for:

  • Generating prompts for LLMs based on your data schema.

  • Interacting with one or more LLMs to perform data extraction.

  • Running a consensus algorithm to merge multiple LLM outputs.

  • Hydrating the final JSON data into structured SQLModel objects.

  • Persisting these objects to a database session.

Core Workflow

The typical workflow involves these steps:

  1. Initialization: You instantiate the WorkflowOrchestrator with your root data model (a SQLModel class) and at least one LLM client.

  2. Execution: You call either synthesize() or synthesize_and_save() with your input documents.

  3. Processing: The orchestrator sends the data to the LLM(s), gets back structured JSON, and runs the consensus process.

  4. Output: The final, clean data is returned as a list of hydrated SQLModel objects, ready for use in your application or already saved to the database.

Initialization and Configuration

The WorkflowOrchestrator constructor is where you configure its behavior. Internally, the orchestrator acts as a facade, coordinating components such as ExtractionPipeline and BatchPipeline behind a single, simple API.

import logging

from extrai.core import WorkflowOrchestrator
from your_models import DepartmentModel  # Your root SQLModel
from your_llm_client import llm_client  # An instance of a BaseLLMClient

# Initialize with default logger
orchestrator = WorkflowOrchestrator(
    root_sqlmodel_class=DepartmentModel,
    llm_client=llm_client,
    num_llm_revisions=3,
    consensus_threshold=0.51,
    # ... other parameters
)

# Or with a custom logger
logger = logging.getLogger("MyCustomLogger")
orchestrator_with_logger = WorkflowOrchestrator(
    root_sqlmodel_class=DepartmentModel,
    llm_client=llm_client,
    logger=logger
)

Here are the parameters you can use:

root_sqlmodel_class

The main SQLModel class that serves as the entry point for data extraction. The orchestrator automatically discovers all related SQLModel classes through its relationships.

  • Type: Type[SQLModel]

  • Example:

from your_models import DepartmentModel  # Your root SQLModel

# DepartmentModel has a relationship to EmployeeModel,
# so both will be part of the schema.
orchestrator = WorkflowOrchestrator(
    root_sqlmodel_class=DepartmentModel,
    llm_client=my_llm_client
)

llm_client

An instance or a list of instances of an LLM client that conforms to the BaseLLMClient interface. Providing a list of clients enhances reliability; the orchestrator will rotate through them for each revision.

  • Type: Union[BaseLLMClient, List[BaseLLMClient]]

  • Example:

# Single client
orchestrator = WorkflowOrchestrator(..., llm_client=client1)

# Multiple clients for resilience
orchestrator = WorkflowOrchestrator(..., llm_client=[client1, client2])

counting_llm_client

An optional LLM client instance specifically for the entity counting phase. This allows you to use a cheaper or faster model for the initial count, while reserving the main llm_client for the detailed extraction.

  • Type: Optional[BaseLLMClient]

  • Default: None (uses llm_client)
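
  • Example (a sketch; fast_client and powerful_client stand in for two configured BaseLLMClient instances):

# Use a cheaper, faster model for the counting pass only
orchestrator = WorkflowOrchestrator(
    ...,
    llm_client=powerful_client,
    counting_llm_client=fast_client
)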

num_llm_revisions

The total number of times the LLM will be asked to generate a JSON output for the given input. A higher number increases the chances of a reliable consensus but also increases costs and latency.

  • Type: int

  • Default: 3

  • Example:

# Request 5 different JSON outputs for the consensus process
orchestrator = WorkflowOrchestrator(..., num_llm_revisions=5)

max_validation_retries_per_revision

The maximum number of times the orchestrator will retry a single LLM revision whose output fails schema validation. This is useful for automatically correcting minor LLM formatting errors.

  • Type: int

  • Default: 2
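
  • Example:

# Allow up to 4 automatic retries per failed revision
orchestrator = WorkflowOrchestrator(..., max_validation_retries_per_revision=4)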

consensus_threshold

The percentage of agreement required for a data point to be included in the final consensus output. For example, a threshold of 0.51 means at least 51% of the LLM revisions must agree on a value.

  • Type: float

  • Default: 0.51

  • Example:

# Require a strict 75% agreement
orchestrator = WorkflowOrchestrator(..., consensus_threshold=0.75)

conflict_resolver

An optional function to resolve disagreements when the consensus threshold is not met for a specific field. If not provided, a default resolver is used, which typically omits the conflicting field.

  • Type: Callable

  • Default: default_conflict_resolver
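
  • Example (illustrative only; the callable signature shown here is an assumption, so check default_conflict_resolver for the exact interface before adapting it):

# Hypothetical resolver: naively keep the first candidate value
def prefer_first(field_name, candidate_values):
    return candidate_values[0]

orchestrator = WorkflowOrchestrator(..., conflict_resolver=prefer_first)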

analytics_collector

An optional instance for collecting detailed analytics. If not provided, a new WorkflowAnalyticsCollector instance is created automatically.

  • Type: Optional[WorkflowAnalyticsCollector]

  • Default: None

  • See also: the Analytics Collector documentation.
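
  • Example (a sketch; it assumes WorkflowAnalyticsCollector can be imported from extrai.core):

from extrai.core import WorkflowAnalyticsCollector  # import path assumed

collector = WorkflowAnalyticsCollector()
orchestrator = WorkflowOrchestrator(..., analytics_collector=collector)
# After a run, inspect `collector` for the gathered metrics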

use_hierarchical_extraction

If True, enables a more advanced extraction mode designed for deeply nested and complex data models. This mode breaks the extraction down into smaller, manageable parts, which can improve accuracy for complex schemas but may increase the number of LLM calls.

  • Type: bool

  • Default: False
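
  • Example:

# Enable hierarchical extraction for a deeply nested schema
orchestrator = WorkflowOrchestrator(..., use_hierarchical_extraction=True)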

logger

An optional logging.Logger instance. If not provided, a default logger is created.

  • Type: Optional[logging.Logger]

  • Default: None

Core Execution Methods

Once the orchestrator is configured, you can start processing documents using one of the two main methods: synthesize() and synthesize_and_save().

synthesize()

This method performs the full extraction and consensus pipeline and returns the hydrated SQLModel objects without persisting them to the database. This is useful if you need to perform additional validation or processing before saving.

hydrated_objects = await orchestrator.synthesize(
    input_strings=["Text document 1...", "Text document 2..."],
    db_session_for_hydration=db_session,  # Optional: for relationship resolution
    count_entities=True,                  # Optional: enable entity counting
    custom_counting_context="..."         # Optional: context for counting
)

Parameters:

  • input_strings (List[str]): A list of strings, where each string is a document to be processed.

  • db_session_for_hydration (Optional[Session]): An optional SQLAlchemy session. If provided, the hydrator will use it to resolve relationships. If not, a temporary in-memory session is created.

  • count_entities (bool, default False): If True, performs an initial pass to count entities before extraction.

  • custom_counting_context (str, optional): Custom instructions or context specifically for the entity counting phase.

  • extraction_example_json (str, optional): A JSON string that provides a few-shot example to the LLM, guiding it to produce a better-structured output. If not provided, the orchestrator will attempt to auto-generate one.

  • extraction_example_object (Optional[Union[SQLModel, List[SQLModel]]], optional): An existing SQLModel object or a list of them to be used as the few-shot example. This is an alternative to providing the example as a raw JSON string.

  • custom_extraction_process (str, optional): Custom, step-by-step instructions for the LLM on how to perform the extraction.

  • custom_extraction_guidelines (str, optional): A list of rules or guidelines for the LLM to follow.

  • custom_final_checklist (str, optional): A final checklist for the LLM to review before finalizing its output.
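
For example, a few-shot object and custom guidelines can be passed together in one call (a sketch; the DepartmentModel field shown is hypothetical):

hydrated_objects = await orchestrator.synthesize(
    input_strings=["Text document 1..."],
    extraction_example_object=DepartmentModel(name="R&D"),  # hypothetical field
    custom_extraction_guidelines="Only extract departments explicitly named in the text."
)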

synthesize_and_save()

This is the most common method for end-to-end processing. It calls synthesize() internally and then persists the resulting objects to the database within a single transaction. If any part of the process fails, it automatically rolls back the transaction.

# This will extract, hydrate, and save the objects to the DB
saved_objects = await orchestrator.synthesize_and_save(
    input_strings=["Order confirmation text..."],
    db_session=db_session
)

The parameters are the same as for synthesize(), except it requires a db_session to commit the transaction.
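
Because the transaction is rolled back automatically, error handling can stay simple (a sketch; the exact exception types depend on your LLM client and database):

try:
    saved_objects = await orchestrator.synthesize_and_save(
        input_strings=["Order confirmation text..."],
        db_session=db_session
    )
except Exception as exc:
    # Nothing was persisted; the transaction was rolled back
    print(f"Extraction failed: {exc}")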

synthesize_batch()

Submits an asynchronous batch job for extraction. This is ideal for large-scale processing or when using cheaper batch APIs.

# Non-blocking submission (returns str)
batch_id = await orchestrator.synthesize_batch(
    input_strings=["..."],
    db_session=db_session,
    wait_for_completion=False
)
print(f"Batch submitted: {batch_id}")

# Blocking until complete (returns BatchProcessResult)
result = await orchestrator.synthesize_batch(
    input_strings=["..."],
    db_session=db_session,
    wait_for_completion=True
)
print(f"Extraction complete: {len(result.hydrated_objects)} objects found.")

Parameters:

  • input_strings (List[str]): A list of strings to be processed.

  • db_session (Optional[Session]): A database session (SQLModel/SQLAlchemy) used for initial counting and potentially for result hydration.

  • wait_for_completion (bool, default False): If True, the method polls the batch status until completion (or error), handling multi-stage hierarchical batches automatically, and returns a BatchProcessResult containing the hydrated objects. If False, it submits the batch to the LLM provider and immediately returns the batch job ID as a str.

  • count_entities (bool, default False): If True, performs an initial pass to count entities before extraction.

  • custom_counting_context (str, optional): Custom instructions or context specifically for the entity counting phase.

  • … (other parameters similar to synthesize)
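
One non-blocking pattern is to submit the batch now and resume later with monitor_batch_job() (documented below), for example after a script restart:

# Submit without blocking
batch_id = await orchestrator.synthesize_batch(
    input_strings=["..."],
    db_session=db_session,
    wait_for_completion=False
)

# ...later, possibly from a different process
result = await orchestrator.monitor_batch_job(
    root_batch_id=batch_id,
    db_session=db_session
)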

create_continuation_batch()

Creates a new batch job that continues from a specific step of a previous batch. This is useful for retrying failed steps or extending a workflow without re-running earlier successful steps.

# Continue from step 2 (0-indexed) of a previous batch
new_batch_id = await orchestrator.create_continuation_batch(
    original_batch_id="prev_batch_id",
    db_session=db_session,
    start_from_step_index=2,
    wait_for_completion=True
)

Parameters:

  • original_batch_id (str): The ID of the batch to continue from.

  • start_from_step_index (int): The hierarchical step index to start the new batch from. Steps before this index will be copied from the original batch.

  • wait_for_completion (bool, default False): Same behavior as in synthesize_batch.

  • (Other parameters allow overriding configuration for the new batch)

monitor_batch_job()

Polls the status of an existing batch job until it completes. This method handles multi-stage workflows (like counting -> extraction, or hierarchical steps) by automatically detecting phase transitions and submitting subsequent jobs.

# Resume monitoring a batch (e.g., after script restart)
result = await orchestrator.monitor_batch_job(
    root_batch_id="existing_batch_id",
    db_session=db_session
)

Parameters:

  • root_batch_id (str): The ID of the batch to monitor.

  • poll_interval (int, default 60): Seconds to wait between status checks.
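
For slow-moving batch backends, the polling interval can be lengthened:

# Check the batch status every 5 minutes instead of every 60 seconds
result = await orchestrator.monitor_batch_job(
    root_batch_id="existing_batch_id",
    db_session=db_session,
    poll_interval=300
)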

get_batch_status()

Retrieves the current status of a batch job, checking with the LLM provider if necessary.

status = await orchestrator.get_batch_status("batch_id", db_session)

process_batch()

Processes a completed batch job. This downloads results, runs consensus, hydrates objects, and persists them to the database. It is typically called automatically by monitor_batch_job, but can be used manually for non-blocking workflows.

result = await orchestrator.process_batch("batch_id", db_session)

Concise Usage Example

This example provides a focused look at initializing and calling the orchestrator, assuming your models and database are already defined. For a full step-by-step guide, see Getting Started: A Step-by-Step Tutorial.

import asyncio
from sqlmodel import Session
from extrai.core import WorkflowOrchestrator

# Assume the following are already configured:
# - `YourRootModel`: Your top-level SQLModel class.
# - `your_llm_client`: An initialized LLM client.
# - `your_db_engine`: A SQLAlchemy engine.

# 1. Initialize the WorkflowOrchestrator
orchestrator = WorkflowOrchestrator(
    root_sqlmodel_class=YourRootModel,
    llm_client=your_llm_client,
    num_llm_revisions=3  # Request 3 revisions for consensus
)

# 2. Define the text to process
unstructured_text = "Some text containing data about a Company and its Employees..."

# 3. Run the extraction and save the results
async def run_extraction():
    with Session(your_db_engine) as session:
        saved_objects = await orchestrator.synthesize_and_save(
            [unstructured_text],
            db_session=session
        )
        print(f"Successfully extracted and saved {len(saved_objects)} objects.")

# Run the asynchronous function
asyncio.run(run_extraction())