Analytics Collector
The WorkflowAnalyticsCollector is a dedicated component for collecting and storing detailed metrics and events during the execution of an extraction workflow. Its report shows how reliably each stage of the process ran, from LLM calls and output parsing through consensus and hydration.
While it is used automatically by the WorkflowOrchestrator, it can also be used as a standalone tool to track metrics in any custom workflow.
Core Concepts
The collector is designed to track several key areas of the extraction pipeline:
LLM Interaction Health: Monitors API call failures and errors in processing LLM outputs.
Consensus Quality: Gathers detailed statistics from each consensus run to determine the level of agreement between LLM revisions.
Custom Events: Allows for logging of custom events to trace specific steps or occurrences in your workflow.
Error Reporting: Provides a structured way to log errors that occur during the process.
Initialization
The collector can be initialized without any parameters, but you can also provide a custom logger.
import logging
from extrai.core import WorkflowAnalyticsCollector
# Create a new collector instance
analytics_collector = WorkflowAnalyticsCollector()
# Or with a custom logger
logger = logging.getLogger("MyCustomLogger")
analytics_collector_with_logger = WorkflowAnalyticsCollector(logger=logger)
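If you pass a custom logger, it usually helps to configure it first. The following is a minimal sketch that uses only the standard logging module. The helper name build_analytics_logger, the handler, the level, and the format string are illustrative choices, and how the collector uses the logger internally is not covered on this page.

```python
import logging
import sys


def build_analytics_logger(name: str = "MyCustomLogger") -> logging.Logger:
    """Return a logger that writes INFO-and-above records to stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Avoid stacking duplicate handlers if this function is called twice.
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)
    return logger


logger = build_analytics_logger()
# The configured logger can then be passed to the collector as shown above:
# analytics_collector = WorkflowAnalyticsCollector(logger=logger)
```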
Recording Metrics
The collector provides several methods to record specific events throughout the workflow.
LLM Interaction
These methods track the health of LLM API calls and output processing.
# To record a successful API call to the LLM
analytics_collector.record_llm_api_call_success()
# To record a failed API call to the LLM
analytics_collector.record_llm_api_call_failure()
# To record an error when the LLM output is not valid JSON
analytics_collector.record_llm_output_parse_error()
# To record an error when the LLM's JSON output fails schema validation
analytics_collector.record_llm_output_validation_error()
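In a custom workflow, each of these methods maps naturally onto one stage of handling an LLM response. The sketch below shows one possible arrangement. CountingCollector is a hypothetical stand-in that only mirrors the four method names above so the example runs without the library, and call_and_record, require_name, and the ValueError convention for validation failures are assumptions made for illustration.

```python
import json
from collections import Counter
from typing import Any, Callable, Optional


class CountingCollector:
    """Hypothetical stand-in that mirrors the four recording methods above."""

    def __init__(self) -> None:
        self.counts = Counter()

    def record_llm_api_call_success(self) -> None:
        self.counts["api_success"] += 1

    def record_llm_api_call_failure(self) -> None:
        self.counts["api_failure"] += 1

    def record_llm_output_parse_error(self) -> None:
        self.counts["parse_error"] += 1

    def record_llm_output_validation_error(self) -> None:
        self.counts["validation_error"] += 1


def call_and_record(
    collector: Any,
    call_llm: Callable[[], str],
    validate: Callable[[Any], None],
) -> Optional[Any]:
    """Run one LLM call and record what happened at each stage."""
    try:
        raw = call_llm()
    except Exception:
        collector.record_llm_api_call_failure()
        return None
    collector.record_llm_api_call_success()

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        collector.record_llm_output_parse_error()
        return None

    try:
        validate(parsed)  # assumed to raise ValueError on a schema mismatch
    except ValueError:
        collector.record_llm_output_validation_error()
        return None
    return parsed


def require_name(obj: Any) -> None:
    """Toy validator: the output must be a JSON object with a 'name' key."""
    if not isinstance(obj, dict) or "name" not in obj:
        raise ValueError("missing 'name'")


collector = CountingCollector()
call_and_record(collector, lambda: '{"name": "ok"}', require_name)
call_and_record(collector, lambda: "not json", require_name)
call_and_record(collector, lambda: '{"other": 1}', require_name)
print(dict(collector.counts))
```

Any object exposing the same four methods, including a real collector instance, could be passed in place of the stand-in.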
Consensus Run Details
The record_consensus_run_details method logs the outcome of a consensus process. It takes a dictionary of metrics, usually generated by the JSONConsensus component.
consensus_stats = {
    "revisions_processed": 3,
    "unique_paths_considered": 50,
    "paths_agreed_by_threshold": 45,
    "paths_resolved_by_conflict_resolver": 3,
    "paths_omitted_due_to_no_consensus_or_resolver_omission": 2,
}
analytics_collector.record_consensus_run_details(consensus_stats)
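To make these counts easier to compare across runs, they can be turned into ratios. The sketch below assumes each ratio is the corresponding count divided by unique_paths_considered, which matches the figures in the example report later on this page. The collector's own aggregation is not shown here, and the function name consensus_ratios and its output keys are hypothetical.

```python
from typing import Dict


def consensus_ratios(stats: Dict[str, int]) -> Dict[str, float]:
    """Express each consensus outcome as a share of all paths considered."""
    total = stats["unique_paths_considered"]
    if total == 0:
        # Nothing was compared, so report zero rather than divide by zero.
        return {"agreement": 0.0, "resolved": 0.0, "omitted": 0.0}
    omitted_key = "paths_omitted_due_to_no_consensus_or_resolver_omission"
    return {
        "agreement": stats["paths_agreed_by_threshold"] / total,
        "resolved": stats["paths_resolved_by_conflict_resolver"] / total,
        "omitted": stats[omitted_key] / total,
    }


# Same values as the consensus_stats dictionary above.
consensus_stats = {
    "revisions_processed": 3,
    "unique_paths_considered": 50,
    "paths_agreed_by_threshold": 45,
    "paths_resolved_by_conflict_resolver": 3,
    "paths_omitted_due_to_no_consensus_or_resolver_omission": 2,
}
ratios = consensus_ratios(consensus_stats)
print(ratios)
```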
Custom Events and Errors
You can log your own custom events and structured errors for debugging or monitoring purposes.
# Record a custom event with optional details
analytics_collector.record_custom_event(
    "data_preprocessing_started",
    details={"document_count": 10}
)
# Record a structured workflow error
analytics_collector.record_workflow_error(
    error_type="DatabaseConnectionError",
    context="db_writer.save_objects",
    message="Failed to connect to the primary database."
)
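One way to avoid repeating the record_workflow_error call in every except block is a small context manager. This is a sketch under assumptions: record_errors is a hypothetical helper, ErrorLog is a stand-in exposing only record_workflow_error with the keyword arguments shown above, and re-raising after recording is a design choice you may not want.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List


class ErrorLog:
    """Hypothetical stand-in exposing only record_workflow_error."""

    def __init__(self) -> None:
        self.errors: List[Dict[str, str]] = []

    def record_workflow_error(
        self, error_type: str, context: str, message: str
    ) -> None:
        self.errors.append(
            {"error_type": error_type, "context": context, "message": message}
        )


@contextmanager
def record_errors(collector: Any, context: str) -> Iterator[None]:
    """Record any exception raised in the block, then let it propagate."""
    try:
        yield
    except Exception as exc:
        collector.record_workflow_error(
            error_type=type(exc).__name__,
            context=context,
            message=str(exc),
        )
        raise


log = ErrorLog()
try:
    with record_errors(log, "db_writer.save_objects"):
        raise ConnectionError("Failed to connect to the primary database.")
except ConnectionError:
    pass  # the error has already been recorded

print(log.errors)
```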
Hydration Metrics
These methods track the process of converting the final JSON into structured SQLModel objects.
# Record that 5 objects were successfully created
analytics_collector.record_hydration_success(count=5)
# Record a failure during the hydration process
analytics_collector.record_hydration_failure()
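In a custom hydration loop, the two calls might be combined as in the sketch below. It assumes that a failure is recorded once per item that cannot be built and that successes are reported as a single count at the end. The page does not state whether the orchestrator records failures per item or per run, so treat that as a choice made for this example. HydrationCounter, hydrate_all, and the build callable are hypothetical names.

```python
from typing import Any, Callable, Dict, Iterable, List


class HydrationCounter:
    """Hypothetical stand-in with the two hydration methods shown above."""

    def __init__(self) -> None:
        self.successes = 0
        self.failures = 0

    def record_hydration_success(self, count: int) -> None:
        self.successes += count

    def record_hydration_failure(self) -> None:
        self.failures += 1


def hydrate_all(
    collector: Any,
    rows: Iterable[Dict[str, Any]],
    build: Callable[[Dict[str, Any]], Any],
) -> List[Any]:
    """Build one object per row, recording failures as they happen."""
    objects = []
    for row in rows:
        try:
            objects.append(build(row))
        except (KeyError, TypeError, ValueError):
            collector.record_hydration_failure()
    if objects:
        collector.record_hydration_success(count=len(objects))
    return objects


counter = HydrationCounter()
rows = [{"id": 1}, {"id": 2}, {}]  # the last row is missing its "id"
objects = hydrate_all(counter, rows, build=lambda row: row["id"])
print(objects, counter.successes, counter.failures)
```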
Accessing the Report
The primary way to access the collected data is through the get_report() method, which returns a comprehensive dictionary.
Usage with WorkflowOrchestrator
The WorkflowOrchestrator automatically handles the collection of analytics. You can access the report after a workflow has been run.
import json
# After running orchestrator.synthesize() or orchestrator.synthesize_and_save()
report = orchestrator.analytics_collector.get_report()
print(json.dumps(report, indent=2))
Example Report Output
A report provides a detailed summary of the entire workflow.
Example JSON Report
{
  "llm_api_call_successes": 5,
  "llm_api_call_failures": 1,
  "llm_api_call_success_rate": 0.833,
  "llm_output_parse_errors": 2,
  "llm_output_validation_errors": 1,
  "total_invalid_parsing_errors": 3,
  "number_of_consensus_runs": 1,
  "hydrated_objects_successes": 10,
  "hydration_failures": 0,
  "average_path_agreement_ratio": 0.9,
  "average_paths_resolved_by_conflict_resolver_ratio": 0.06,
  "average_paths_omitted_ratio": 0.04,
  "all_consensus_run_details": [
    {
      "revisions_processed": 3,
      "unique_paths_considered": 50,
      "paths_agreed_by_threshold": 45,
      "paths_resolved_by_conflict_resolver": 3,
      "paths_omitted_due_to_no_consensus_or_resolver_omission": 2
    }
  ],
  "custom_events": [
    {
      "event_name": "data_preprocessing_started",
      "document_count": 10
    }
  ],
  "workflow_errors": [
    {
      "error_type": "DatabaseConnectionError",
      "context": "db_writer.save_objects",
      "message": "Failed to connect to the primary database."
    }
  ]
}
By analyzing this report, you can fine-tune your extraction process, monitor costs, and identify potential issues with your LLM prompts or data models.
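The derived fields in the example can be reproduced from the raw counters. The sketch below assumes the success rate is successes divided by total calls and that total_invalid_parsing_errors is the sum of parse and validation errors. Both assumptions agree with the example values above, but they are inferred from the output rather than taken from the collector's implementation, and derived_llm_metrics is a hypothetical helper.

```python
from typing import Any, Dict


def derived_llm_metrics(report: Dict[str, Any]) -> Dict[str, float]:
    """Recompute the derived LLM fields from the raw counters in a report."""
    successes = report["llm_api_call_successes"]
    failures = report["llm_api_call_failures"]
    calls = successes + failures
    return {
        # Guard against a report with no recorded calls.
        "llm_api_call_success_rate": successes / calls if calls else 0.0,
        "total_invalid_parsing_errors": (
            report["llm_output_parse_errors"]
            + report["llm_output_validation_errors"]
        ),
    }


example = {
    "llm_api_call_successes": 5,
    "llm_api_call_failures": 1,
    "llm_output_parse_errors": 2,
    "llm_output_validation_errors": 1,
}
metrics = derived_llm_metrics(example)
print(round(metrics["llm_api_call_success_rate"], 3))
print(metrics["total_invalid_parsing_errors"])
```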
Interpreting the Report
The individual fields of the report help you diagnose issues and optimize your extraction workflow. For example:
A high number of llm_output_parse_errors might indicate that your LLM is struggling to generate valid JSON, suggesting a need to refine your prompts, simplify your models, or activate hierarchical_extractor.
A low average_path_agreement_ratio in the consensus details can show that your LLM revisions are highly inconsistent, which often points to an ambiguous task. Writing a more precise prompt can help.
Monitoring custom events can help you trace the flow of data and pinpoint where errors occur.
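Checks like these can be automated against the report dictionary. The following sketch flags the first two conditions. The function name diagnose and both thresholds are hypothetical and should be tuned to your own workload. Only field names that appear in the example report are read.

```python
from typing import Any, Dict, List


def diagnose(
    report: Dict[str, Any],
    max_parse_errors: int = 1,
    min_agreement: float = 0.8,
) -> List[str]:
    """Return human-readable warnings for a report; empty if none apply."""
    warnings = []

    parse_errors = report.get("llm_output_parse_errors", 0)
    if parse_errors > max_parse_errors:
        warnings.append(
            f"{parse_errors} parse errors: consider refining prompts "
            "or simplifying models"
        )

    agreement = report.get("average_path_agreement_ratio")
    if agreement is not None and agreement < min_agreement:
        warnings.append(
            f"agreement ratio {agreement:.2f} is below {min_agreement:.2f}: "
            "the task may be ambiguous"
        )
    return warnings


sample = {"llm_output_parse_errors": 2, "average_path_agreement_ratio": 0.9}
for line in diagnose(sample):
    print(line)
```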
Resetting the Collector
If you need to reuse a collector instance for multiple, separate runs, you can clear all its stored data using the reset() method.
# Run the first workflow
await orchestrator.synthesize_and_save(...)
report1 = orchestrator.analytics_collector.get_report()
# Reset the collector before the next run
orchestrator.analytics_collector.reset()
# Run the second workflow
await orchestrator.synthesize_and_save(...)
report2 = orchestrator.analytics_collector.get_report()  # Contains data only from the second run
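When you keep several reports around, it can be safer to copy each one before resetting. The sketch below assumes nothing about whether get_report() returns fresh data or references to internal state. It simply takes a deep copy, which is correct either way. TinyCollector is a hypothetical stand-in written so that the difference is visible, and snapshot_and_reset is an illustrative helper name.

```python
import copy
from typing import Any, Dict, List


class TinyCollector:
    """Hypothetical stand-in whose report shares its internal event list."""

    def __init__(self) -> None:
        self._events: List[Dict[str, Any]] = []

    def record_custom_event(self, name: str) -> None:
        self._events.append({"event_name": name})

    def get_report(self) -> Dict[str, Any]:
        return {"custom_events": self._events}

    def reset(self) -> None:
        self._events.clear()


def snapshot_and_reset(collector: Any) -> Dict[str, Any]:
    """Return an independent copy of the report, then clear the collector."""
    report = copy.deepcopy(collector.get_report())
    collector.reset()
    return report


collector = TinyCollector()
collector.record_custom_event("run_1")
report1 = snapshot_and_reset(collector)

collector.record_custom_event("run_2")
report2 = snapshot_and_reset(collector)

print(report1["custom_events"], report2["custom_events"])
```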
For a full API reference, see the extrai.core package documentation.