.. _analytics_collector:

Analytics Collector
===================

The ``WorkflowAnalyticsCollector`` is a dedicated component for collecting and storing detailed metrics and events during the execution of an extraction workflow. It provides crucial insights into the performance and reliability of the process.

While it is used automatically by the ``WorkflowOrchestrator``, it can also be used as a standalone tool to track metrics in any custom workflow.

Core Concepts
-------------

The collector is designed to track several key areas of the extraction pipeline:

- **LLM Interaction Health**: Monitors API call failures and errors in processing LLM outputs.
- **Consensus Quality**: Gathers detailed statistics from each consensus run to determine the level of agreement between LLM revisions.
- **Custom Events**: Allows logging of custom events to trace specific steps or occurrences in your workflow.
- **Error Reporting**: Provides a structured way to log errors that occur during the process.

Initialization
--------------

The collector can be initialized without any parameters, but you can also provide a custom logger.

.. code-block:: python

   import logging

   from extrai.core import WorkflowAnalyticsCollector

   # Create a new collector instance
   analytics_collector = WorkflowAnalyticsCollector()

   # Or with a custom logger
   logger = logging.getLogger("MyCustomLogger")
   analytics_collector_with_logger = WorkflowAnalyticsCollector(logger=logger)

Recording Metrics
-----------------

The collector provides several methods to record specific events throughout the workflow.

**LLM Interaction**

These methods track the health of LLM API calls and output processing.

.. code-block:: python

   # To record a successful API call to the LLM
   analytics_collector.record_llm_api_call_success()

   # To record a failed API call to the LLM
   analytics_collector.record_llm_api_call_failure()

   # To record an error when the LLM output is not valid JSON
   analytics_collector.record_llm_output_parse_error()

   # To record an error when the LLM's JSON output fails schema validation
   analytics_collector.record_llm_output_validation_error()

**Consensus Run Details**

This method logs the outcome of a consensus process. It takes a dictionary of metrics, usually generated by the ``JSONConsensus`` component.

.. code-block:: python

   consensus_stats = {
       "revisions_processed": 3,
       "unique_paths_considered": 50,
       "paths_agreed_by_threshold": 45,
       "paths_resolved_by_conflict_resolver": 3,
       "paths_omitted_due_to_no_consensus_or_resolver_omission": 2,
   }
   analytics_collector.record_consensus_run_details(consensus_stats)

**Custom Events and Errors**

You can log your own custom events and structured errors for debugging or monitoring purposes.

.. code-block:: python

   # Record a custom event with optional details
   analytics_collector.record_custom_event(
       "data_preprocessing_started", details={"document_count": 10}
   )

   # Record a structured workflow error
   analytics_collector.record_workflow_error(
       error_type="DatabaseConnectionError",
       context="db_writer.save_objects",
       message="Failed to connect to the primary database.",
   )

**Hydration Metrics**

These methods track the process of converting the final JSON into structured ``SQLModel`` objects.

.. code-block:: python

   # Record that 5 objects were successfully created
   analytics_collector.record_hydration_success(count=5)

   # Record a failure during the hydration process
   analytics_collector.record_hydration_failure()

Accessing the Report
--------------------

The primary way to access the collected data is through the ``get_report()`` method, which returns a comprehensive dictionary.
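The report's derived fields appear to be computed from the raw counters; for instance, in the example report below, ``llm_api_call_success_rate`` of 0.833 matches 5 successes out of 6 total calls. As an illustration, a small helper can recompute that rate from the counts alone. This is a hypothetical sketch, not part of the library: only the field names are taken from the example report, and the function name and rounding are assumptions.

.. code-block:: python

   # Illustrative sketch (not a library function): recompute the API success
   # rate from the raw counters in a report dictionary.
   def api_call_success_rate(report: dict) -> float:
       successes = report.get("llm_api_call_successes", 0)
       failures = report.get("llm_api_call_failures", 0)
       total = successes + failures
       return round(successes / total, 3) if total else 0.0

   report = {"llm_api_call_successes": 5, "llm_api_call_failures": 1}
   print(api_call_success_rate(report))  # 0.833

Recomputing derived metrics like this can be useful when aggregating reports from several runs, where the per-run ratios cannot simply be averaged.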
**Usage with WorkflowOrchestrator**

The ``WorkflowOrchestrator`` automatically handles the collection of analytics. You can access the report after a workflow has been run.

.. code-block:: python

   import json

   # After running orchestrator.synthesize() or orchestrator.synthesize_and_save()
   report = orchestrator.analytics_collector.get_report()
   print(json.dumps(report, indent=2))

**Example Report Output**

A report provides a detailed summary of the entire workflow.

.. admonition:: Example JSON Report
   :class: dropdown

   .. code-block:: json

      {
        "llm_api_call_successes": 5,
        "llm_api_call_failures": 1,
        "llm_api_call_success_rate": 0.833,
        "llm_output_parse_errors": 2,
        "llm_output_validation_errors": 1,
        "total_invalid_parsing_errors": 3,
        "number_of_consensus_runs": 1,
        "hydrated_objects_successes": 10,
        "hydration_failures": 0,
        "average_path_agreement_ratio": 0.9,
        "average_paths_resolved_by_conflict_resolver_ratio": 0.06,
        "average_paths_omitted_ratio": 0.04,
        "all_consensus_run_details": [
          {
            "revisions_processed": 3,
            "unique_paths_considered": 50,
            "paths_agreed_by_threshold": 45,
            "paths_resolved_by_conflict_resolver": 3,
            "paths_omitted_due_to_no_consensus_or_resolver_omission": 2
          }
        ],
        "custom_events": [
          {
            "event_name": "data_preprocessing_started",
            "document_count": 10
          }
        ],
        "workflow_errors": [
          {
            "error_type": "DatabaseConnectionError",
            "context": "db_writer.save_objects",
            "message": "Failed to connect to the primary database."
          }
        ]
      }

Interpreting the Report
-----------------------

By analyzing the reports from this collector, you can fine-tune your extraction process, monitor costs, and identify potential issues with your LLM prompts or data models. For example:

- A high number of ``llm_output_parse_errors`` may indicate that your LLM is struggling to generate valid JSON, suggesting a need to refine your prompts, simplify your models, or enable the ``hierarchical_extractor``.
- A low ``average_path_agreement_ratio`` in the consensus details shows that your LLM revisions are highly inconsistent, often pointing to an ambiguous task; a more precise prompt can help.
- Monitoring custom events can help you trace the flow of data and pinpoint where errors occur.

Resetting the Collector
-----------------------

If you need to reuse a collector instance for multiple, separate runs, you can clear all of its stored data with the ``reset()`` method.

.. code-block:: python

   # Run the first workflow
   await orchestrator.synthesize_and_save(...)
   report1 = orchestrator.analytics_collector.get_report()

   # Reset the collector before the next run
   orchestrator.analytics_collector.reset()

   # Run the second workflow
   await orchestrator.synthesize_and_save(...)
   report2 = orchestrator.analytics_collector.get_report()  # Contains data only from the second run

For a full API reference, see the :doc:`api/extrai.core` documentation.
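The interpretation guidance above can be automated: a small post-processing helper can flag reports that warrant attention before a workflow is promoted or rerun. This is an illustrative sketch only; the ``report_warnings`` helper, its thresholds, and the warning messages are assumptions and not part of the library, while the report field names come from the example report in this document.

.. code-block:: python

   # Illustrative sketch (not a library function): scan a report dictionary
   # for the warning signs discussed in "Interpreting the Report".
   def report_warnings(report: dict, max_parse_errors: int = 1,
                       min_agreement_ratio: float = 0.8) -> list[str]:
       warnings = []
       if report.get("llm_output_parse_errors", 0) > max_parse_errors:
           warnings.append("High parse-error count: consider refining prompts.")
       if report.get("average_path_agreement_ratio", 1.0) < min_agreement_ratio:
           warnings.append("Low path agreement: revisions are inconsistent.")
       return warnings

   report = {"llm_output_parse_errors": 2, "average_path_agreement_ratio": 0.9}
   print(report_warnings(report))
   # ['High parse-error count: consider refining prompts.']

Such a check pairs naturally with ``reset()`` in multi-run setups: inspect the warnings for each run's report before resetting the collector for the next one.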