Analytics Collector

The WorkflowAnalyticsCollector is a dedicated component that collects and stores detailed metrics and events while an extraction workflow runs. Its reports show how the process performs and where it fails.

While it is used automatically by the WorkflowOrchestrator, it can also be used as a standalone tool to track metrics in any custom workflow.

Core Concepts

The collector is designed to track several key areas of the extraction pipeline:

  • LLM Interaction Health: Monitors API call failures and errors in processing LLM outputs.

  • Consensus Quality: Gathers detailed statistics from each consensus run to determine the level of agreement between LLM revisions.

  • Custom Events: Allows for logging of custom events to trace specific steps or occurrences in your workflow.

  • Error Reporting: Provides a structured way to log errors that occur during the process.

Initialization

The collector can be initialized without any parameters, but you can also provide a custom logger.

import logging
from extrai.core import WorkflowAnalyticsCollector

# Create a new collector instance
analytics_collector = WorkflowAnalyticsCollector()

# Or with a custom logger
logger = logging.getLogger("MyCustomLogger")
analytics_collector_with_logger = WorkflowAnalyticsCollector(logger=logger)

Recording Metrics

The collector provides several methods to record specific events throughout the workflow.

LLM Interaction

These methods track the health of LLM API calls and output processing.

# To record a successful API call to the LLM
analytics_collector.record_llm_api_call_success()

# To record a failed API call to the LLM
analytics_collector.record_llm_api_call_failure()

# To record an error when the LLM output is not valid JSON
analytics_collector.record_llm_output_parse_error()

# To record an error when the LLM's JSON output fails schema validation
analytics_collector.record_llm_output_validation_error()
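
When the collector is used standalone, these calls have to be placed by hand around your own LLM call. The following is a minimal sketch of one way to do that, not the orchestrator's actual logic. It uses a stand-in class (StubCollector) that only mirrors the four method names above so the example runs without the library; call_and_check, required_keys and the key-presence check are hypothetical placeholders for your own call and schema validation.

```python
import json


class StubCollector:
    """Stand-in with the four method names from this section (not the real class)."""

    def __init__(self):
        self.counts = {
            "api_success": 0,
            "api_failure": 0,
            "parse_error": 0,
            "validation_error": 0,
        }

    def record_llm_api_call_success(self):
        self.counts["api_success"] += 1

    def record_llm_api_call_failure(self):
        self.counts["api_failure"] += 1

    def record_llm_output_parse_error(self):
        self.counts["parse_error"] += 1

    def record_llm_output_validation_error(self):
        self.counts["validation_error"] += 1


def call_and_check(call_llm, collector, required_keys):
    """Call the LLM once, record the outcome, and return the parsed dict or None."""
    try:
        raw = call_llm()
    except Exception:
        # The API call itself failed (network error, rate limit, ...).
        collector.record_llm_api_call_failure()
        return None
    collector.record_llm_api_call_success()

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        # The call succeeded but the output is not valid JSON.
        collector.record_llm_output_parse_error()
        return None

    # Hypothetical validation step: a real workflow would check its own schema here.
    if not isinstance(data, dict) or not set(required_keys).issubset(data):
        collector.record_llm_output_validation_error()
        return None
    return data
```

With the real collector, the same function could be passed analytics_collector in place of the stub, since it relies only on the method names shown above.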

Consensus Run Details

This method is used to log the outcome of a consensus process. It takes a dictionary of metrics, usually generated by the JSONConsensus component.

consensus_stats = {
    "revisions_processed": 3,
    "unique_paths_considered": 50,
    "paths_agreed_by_threshold": 45,
    "paths_resolved_by_conflict_resolver": 3,
    "paths_omitted_due_to_no_consensus_or_resolver_omission": 2,
}

analytics_collector.record_consensus_run_details(consensus_stats)
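
To make the numbers in this dictionary concrete, the sketch below derives an agreement ratio for a single run from the keys shown above. The helper name path_agreement_ratio is hypothetical, and this is only an assumption about how such a ratio could be calculated; the collector's own aggregation may differ.

```python
def path_agreement_ratio(stats):
    """Share of considered paths that reached agreement by threshold, or None if no paths."""
    total = stats.get("unique_paths_considered", 0)
    if total == 0:
        return None
    return stats.get("paths_agreed_by_threshold", 0) / total


consensus_stats = {
    "revisions_processed": 3,
    "unique_paths_considered": 50,
    "paths_agreed_by_threshold": 45,
    "paths_resolved_by_conflict_resolver": 3,
    "paths_omitted_due_to_no_consensus_or_resolver_omission": 2,
}

ratio = path_agreement_ratio(consensus_stats)
print(f"{ratio:.2f}")  # 0.90 (45 of 50 paths agreed by threshold)
```

In this example the three outcome counts (45 agreed, 3 resolved, 2 omitted) add up to the 50 paths considered.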

Custom Events and Errors

You can log your own custom events and structured errors for debugging or monitoring purposes.

# Record a custom event with optional details
analytics_collector.record_custom_event(
    "data_preprocessing_started",
    details={"document_count": 10}
)

# Record a structured workflow error
analytics_collector.record_workflow_error(
    error_type="DatabaseConnectionError",
    context="db_writer.save_objects",
    message="Failed to connect to the primary database."
)

Hydration Metrics

These methods track the process of converting the final JSON into structured SQLModel objects.

# Record that 5 objects were successfully created
analytics_collector.record_hydration_success(count=5)

# Record a failure during the hydration process
analytics_collector.record_hydration_failure()

Accessing the Report

The primary way to access the collected data is through the get_report() method, which returns a comprehensive dictionary.

Usage with WorkflowOrchestrator

The WorkflowOrchestrator automatically handles the collection of analytics. You can access the report after a workflow has been run.

import json

# After running orchestrator.synthesize() or orchestrator.synthesize_and_save()
report = orchestrator.analytics_collector.get_report()

print(json.dumps(report, indent=2))

Example Report Output

A report provides a detailed summary of the entire workflow.

By analyzing this report, you can fine-tune your extraction process, monitor costs, and identify potential issues with your LLM prompts or data models.

Interpreting the Report

Individual fields in the report point to specific problems. For example:

  • A high llm_output_parse_errors count might indicate that your LLM is struggling to generate valid JSON. Consider refining your prompts, simplifying your models, or activating hierarchical_extractor.

  • A low average_path_agreement_ratio in the consensus details shows that your LLM revisions are highly inconsistent, which often points to an ambiguous task. A more precise prompt can help.

  • Monitoring custom events can help you trace the flow of data and pinpoint where errors occur.
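
The checks above can be automated with a small helper. This is a sketch only: it assumes that llm_output_parse_errors and average_path_agreement_ratio are readable as top-level keys of the report dictionary, which may not match the real layout, and the function name and thresholds are hypothetical values to adapt.

```python
def flag_report_issues(report, max_parse_errors=0, min_agreement=0.8):
    """Return a list of human-readable warnings for a report dictionary."""
    warnings = []

    # Assumed key location; adjust the lookup if the real report nests this value.
    parse_errors = report.get("llm_output_parse_errors", 0)
    if parse_errors > max_parse_errors:
        warnings.append(
            f"{parse_errors} parse errors: consider refining prompts or simplifying models"
        )

    # A missing ratio (no consensus runs recorded) is skipped rather than flagged.
    ratio = report.get("average_path_agreement_ratio")
    if ratio is not None and ratio < min_agreement:
        warnings.append(
            f"agreement ratio {ratio:.2f} is below {min_agreement:.2f}: the task may be ambiguous"
        )

    return warnings
```

Calling flag_report_issues(report) after a run returns an empty list when neither threshold is crossed.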

Resetting the Collector

If you need to reuse a collector instance for multiple, separate runs, you can clear all its stored data using the reset() method.

# These calls use await, so they must run inside an async function.

# Run the first workflow
await orchestrator.synthesize_and_save(...)
report1 = orchestrator.analytics_collector.get_report()

# Reset the collector before the next run
orchestrator.analytics_collector.reset()

# Run the second workflow
await orchestrator.synthesize_and_save(...)
report2 = orchestrator.analytics_collector.get_report()  # Contains data only from the second run

For a full API reference, see the extrai.core package documentation.