Astro Intelligence

Observability API Reference

Observability API Reference

All classes are importable from anchor.observability. For usage examples see the Observability Guide.


SpanKind

String enum representing the category of a traced operation.

from anchor.observability import SpanKind
ValueStringDescription
PIPELINE"pipeline"Root pipeline span
RETRIEVAL"retrieval"Retrieval / search
RERANKING"reranking"Reranking steps
FORMATTING"formatting"Output formatting
MEMORY"memory"Memory operations
INGESTION"ingestion"Indexing / ingestion
QUERY_TRANSFORM"query_transform"Query rewriting

Span

Immutable Pydantic model representing a single traced operation. Spans form a tree via parent_span_id.

from anchor.observability import Span
FieldTypeDefaultDescription
span_idstrauto UUIDUnique span identifier
parent_span_idstr | NoneNoneParent span for nesting
trace_idstr--Trace this span belongs to
namestr--Human-readable operation name
kindSpanKind--Category of operation
start_timedatetimenow (UTC)When the span started
end_timedatetime | NoneNoneWhen the span ended
duration_msfloat | NoneNoneDuration in milliseconds
statusstr"ok"Outcome: "ok" or "error"
attributesdict[str, Any]{}Arbitrary key-value metadata
eventslist[dict[str, Any]][]Timestamped event log

TraceRecord

Immutable Pydantic model grouping all spans from a single pipeline execution.

from anchor.observability import TraceRecord
FieldTypeDefaultDescription
trace_idstrauto UUIDUnique trace identifier
spanslist[Span][]Spans in this trace
start_timedatetime | NoneNoneTrace start time
end_timedatetime | NoneNoneTrace end time
total_duration_msfloat | NoneNoneTotal duration in ms
metadatadict[str, Any]{}Trace-level metadata

MetricPoint

Immutable Pydantic model for a single metric measurement.

from anchor.observability import MetricPoint
FieldTypeDefaultDescription
namestr--Metric name (e.g. "pipeline.build_time_ms")
valuefloat--Numeric measurement value
timestampdatetimenow (UTC)When the measurement was taken
tagsdict[str, str]{}Labels for filtering and grouping

Tracer

Creates and manages spans within traces.

from anchor.observability import Tracer

Constructor: Tracer() -- no parameters.

[!NOTE] Tracer is not thread-safe. Synchronise externally or use one instance per thread.start_trace(name, attributes=None) -> TraceRecord

ParameterTypeDefaultDescription
namestr--Human-readable trace name
attributesdict[str, Any] | NoneNoneMetadata to attach

end_trace(trace) -> TraceRecord -- finalises a trace, computes end_time and total_duration_ms, removes it from the active set.

start_span(trace_id, name, kind, parent_span_id=None, attributes=None) -> Span

ParameterTypeDefaultDescription
trace_idstr--Trace this span belongs to
namestr--Operation name
kindSpanKind--Category of operation
parent_span_idstr | NoneNoneParent span for nesting
attributesdict[str, Any] | NoneNoneMetadata to attach

end_span(span, status="ok", attributes=None) -> Span -- finalises a span with end_time, duration_ms, and merged attributes.

get_trace(trace_id) -> TraceRecord | None -- look up an active trace by ID.


TracingCallback

Pipeline callback that automatically traces execution via spans.

from anchor.observability import TracingCallback

Constructor:

TracingCallback(
    tracer: Tracer | None = None,
    exporters: list[SpanExporter] | None = None,
    metrics_collector: MetricsCollector | None = None,
)
ParameterTypeDefaultDescription
tracerTracer | NoneNoneCustom tracer; creates one if omitted
exporterslist[SpanExporter] | NoneNoneSpan exporters for completed spans
metrics_collectorMetricsCollector | NoneNoneCollector for timing metrics

Properties:

PropertyTypeDescription
tracerTracerUnderlying tracer instance
last_traceTraceRecord | NoneMost recently completed trace

Callback methods (called by the pipeline automatically):

  • on_pipeline_start(query: QueryBundle) -> None
  • on_step_start(step_name: str, items: list[ContextItem]) -> None
  • on_step_end(step_name: str, items: list[ContextItem], time_ms: float) -> None
  • on_step_error(step_name: str, error: Exception) -> None
  • on_pipeline_end(result: ContextResult) -> None

ConsoleSpanExporter

Exports spans as structured JSON to Python's logging system.

from anchor.observability import ConsoleSpanExporter

Constructor: ConsoleSpanExporter(log_level: int = logging.INFO)

export(spans: list[Span]) -> None -- logs each span as a JSON object.


InMemorySpanExporter

Stores spans in memory for testing and debugging.

from anchor.observability import InMemorySpanExporter

Constructor: InMemorySpanExporter()

export(spans: list[Span]) -> None -- appends spans to internal buffer.

get_spans() -> list[Span] -- returns a copy of all exported spans.

clear() -> None -- removes all stored spans.


FileSpanExporter

Writes spans as JSON-Lines to a file in append mode.

from anchor.observability import FileSpanExporter

Constructor: FileSpanExporter(path: str | Path)

ParameterTypeDescription
pathstr | PathFile path to write to; parent directories must exist

export(spans: list[Span]) -> None -- appends one JSON object per span.


OTLPSpanExporter

Exports spans to an OpenTelemetry collector via OTLP/HTTP.

from anchor.observability import OTLPSpanExporter

[!CAUTION] Requires opentelemetry-exporter-otlp-proto-http and opentelemetry-sdk. Install with pip install astro-anchor[otlp].Constructor:

OTLPSpanExporter(
    endpoint: str = "http://localhost:4318",
    service_name: str = "anchor",
    headers: dict[str, str] | None = None,
)
ParameterTypeDefaultDescription
endpointstr"http://localhost:4318"OTLP collector URL
service_namestr"anchor"OTel resource service name
headersdict[str, str] | NoneNoneAuth headers

export(spans: list[Span]) -> None -- converts and exports spans.

export_record(record: TraceRecord) -> None -- exports all spans from a TraceRecord.

shutdown() -> None -- flushes pending spans and shuts down.


InMemoryMetricsCollector

Stores metric points in memory with summary statistics.

from anchor.observability import InMemoryMetricsCollector

Constructor: InMemoryMetricsCollector()

record(metric: MetricPoint) -> None -- stores a metric point.

flush() -> None -- no-op; metrics remain in memory.

get_metrics(name: str | None = None) -> list[MetricPoint] -- returns stored metrics, optionally filtered by name.

get_summary(name: str) -> dict[str, Any] -- returns {"min", "max", "avg", "count", "p50", "p95"} for the named metric. Empty dict if no matches.

clear() -> None -- removes all stored metrics.


LoggingMetricsCollector

Logs each metric as structured JSON via Python's logging module.

from anchor.observability import LoggingMetricsCollector

Constructor: LoggingMetricsCollector(log_level: int = logging.INFO)

record(metric: MetricPoint) -> None -- logs the metric immediately.

flush() -> None -- no-op.


OTLPMetricsExporter

Exports metrics to an OpenTelemetry collector via OTLP/HTTP.

from anchor.observability import OTLPMetricsExporter

[!CAUTION] Requires opentelemetry-exporter-otlp-proto-http and opentelemetry-sdk. Install with pip install astro-anchor[otlp].Constructor:

OTLPMetricsExporter(
    endpoint: str = "http://localhost:4318",
    service_name: str = "anchor",
    headers: dict[str, str] | None = None,
)
ParameterTypeDefaultDescription
endpointstr"http://localhost:4318"OTLP collector URL
service_namestr"anchor"OTel resource service name
headersdict[str, str] | NoneNoneAuth headers

record(metric: MetricPoint) -> None -- records as an OTel gauge. Raises TypeError if not a MetricPoint.

flush() -> None -- flushes buffered metrics.

shutdown() -> None -- shuts down the exporter.


CostEntry

Immutable Pydantic model for a single cost record.

from anchor.observability import CostEntry
FieldTypeDefaultDescription
operationstr--Operation type (e.g. "embedding")
modelstr--Model identifier
input_tokensint0Input tokens consumed
output_tokensint0Output tokens produced
cost_usdfloat0.0Total cost in USD
timestampdatetimenow (UTC)When the operation occurred
metadatadict[str, Any]{}Arbitrary metadata

CostSummary

Immutable Pydantic model for aggregated cost.

from anchor.observability import CostSummary
FieldTypeDefaultDescription
total_cost_usdfloat0.0Sum of all entry costs
total_input_tokensint0Sum of all input tokens
total_output_tokensint0Sum of all output tokens
entrieslist[CostEntry][]Individual cost entries
by_modeldict[str, float]{}Cost breakdown by model
by_operationdict[str, float]{}Cost breakdown by operation

CostTracker

Thread-safe tracker for accumulating cost entries.

from anchor.observability import CostTracker

Constructor: CostTracker() -- no parameters.

Property: entries -> list[CostEntry] -- copy of all recorded entries.

record(...) -> CostEntry

def record(
    self,
    operation: str,
    model: str,
    input_tokens: int = 0,
    output_tokens: int = 0,
    cost_per_input_token: float = 0.0,
    cost_per_output_token: float = 0.0,
    metadata: dict[str, Any] | None = None,
) -> CostEntry

Computes cost_usd = input_tokens * cost_per_input_token + output_tokens * cost_per_output_token.

summary() -> CostSummary -- aggregates all entries with per-model and per-operation breakdowns.

reset() -> None -- clears all recorded entries.


CostTrackingCallback

Pipeline callback that records cost entries from step execution metadata.

from anchor.observability import CostTrackingCallback

Constructor: CostTrackingCallback(tracker: CostTracker)

Records a cost entry on on_step_end when items contain cost_model in their metadata. Other callback methods are no-ops.

Expected metadata keys on ContextItem.metadata:

KeyTypeDescription
cost_modelstrModel identifier (triggers recording)
cost_input_tokensintInput tokens consumed
cost_output_tokensintOutput tokens produced
cost_per_input_tokenfloatUSD per input token
cost_per_output_tokenfloatUSD per output token

On this page