Observability
anchor provides built-in tracing, metrics collection, and cost tracking for pipeline execution. Every span, metric point, and cost entry is a Pydantic model -- fully typed and serialisable.
Tracing with Tracer
Tracer is the low-level API for creating traces and spans. A trace
groups all operations from a single pipeline execution; each operation is
represented as a span.
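
The following toy model shows the trace/span relationship in plain Python. `ToySpan` and `ToyTrace` are hypothetical illustrations, not anchor's classes, which are Pydantic models with more fields. The sketch times spans with `time.perf_counter`. How anchor timestamps spans is not specified on this page.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass
class ToySpan:
    """Hypothetical stand-in for a span: one timed operation."""

    name: str
    start: float = field(default_factory=time.perf_counter)
    end: float | None = None

    @property
    def duration_ms(self) -> float:
        if self.end is None:
            raise ValueError("span has not been ended")
        return (self.end - self.start) * 1000.0


@dataclass
class ToyTrace:
    """Hypothetical stand-in for a trace: the spans of one pipeline run."""

    name: str
    spans: list[ToySpan] = field(default_factory=list)

    def start_span(self, name: str) -> ToySpan:
        span = ToySpan(name)
        self.spans.append(span)  # every span belongs to exactly one trace
        return span


trace = ToyTrace("my-pipeline")
span = trace.start_span("retrieval")
time.sleep(0.01)  # simulate retrieval work
span.end = time.perf_counter()
print(f"{span.name}: {span.duration_ms:.1f} ms ({len(trace.spans)} span in trace)")
```
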
```python
from anchor.observability import Tracer, SpanKind

tracer = Tracer()

# Start a trace
trace = tracer.start_trace("my-pipeline", attributes={"user": "demo"})

# Start a span within the trace
span = tracer.start_span(
    trace_id=trace.trace_id,
    name="retrieval",
    kind=SpanKind.RETRIEVAL,
    attributes={"top_k": 10},
)

# ... perform retrieval work ...

# End the span
ended_span = tracer.end_span(span, status="ok", attributes={"items": 5})
print(f"Span took {ended_span.duration_ms:.1f} ms")

# End the trace
ended_trace = tracer.end_trace(trace)
print(f"Trace took {ended_trace.total_duration_ms:.1f} ms")
```

SpanKind values
| Value | When to use |
|---|---|
| `SpanKind.PIPELINE` | Root pipeline span |
| `SpanKind.RETRIEVAL` | Retrieval / search operations |
| `SpanKind.RERANKING` | Reranking steps |
| `SpanKind.FORMATTING` | Output formatting |
| `SpanKind.MEMORY` | Memory / history operations |
| `SpanKind.INGESTION` | Indexing and ingestion |
| `SpanKind.QUERY_TRANSFORM` | Query rewriting / expansion |

> [!NOTE]
> `Tracer` is not thread-safe. Use one instance per thread, or synchronise access externally when tracing concurrently.
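
One way to follow the one-instance-per-thread advice is to use the standard library's `threading.local`. In this sketch `StandInTracer` is a hypothetical placeholder so the snippet runs on its own. In real code the factory would construct anchor's `Tracer()` instead.

```python
from __future__ import annotations

import threading


class StandInTracer:
    """Hypothetical placeholder; real code would construct anchor's Tracer()."""


_local = threading.local()


def get_tracer() -> StandInTracer:
    """Return this thread's tracer, creating it on first use."""
    tracer = getattr(_local, "tracer", None)
    if tracer is None:
        tracer = StandInTracer()
        _local.tracer = tracer
    return tracer


results: list[tuple[StandInTracer, StandInTracer]] = []
results_lock = threading.Lock()


def worker() -> None:
    # Two lookups in the same thread should hand back the same instance.
    pair = (get_tracer(), get_tracer())
    with results_lock:  # guard the shared results list
        results.append(pair)


threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

same_within_thread = all(a is b for a, b in results)
# `results` keeps every tracer alive, so id() values are unique here.
distinct_tracers = len({id(a) for a, _ in results})
print(same_within_thread, distinct_tracers)  # True 4
```

Each thread's spans then live in that thread's tracer. If you need one tracer shared across threads, the alternative is to guard every call to it with a lock.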
Automatic Pipeline Tracing
TracingCallback hooks into the pipeline lifecycle and creates spans
automatically for the overall execution and each individual step.
```python
from anchor.observability import (
    TracingCallback,
    InMemorySpanExporter,
    InMemoryMetricsCollector,
)

exporter = InMemorySpanExporter()
metrics = InMemoryMetricsCollector()

callback = TracingCallback(
    exporters=[exporter],
    metrics_collector=metrics,
)

# Attach to your pipeline (`pipeline` is an existing, configured pipeline)
pipeline.add_callback(callback)

# Run the pipeline -- spans are recorded automatically
result = pipeline.build("What is context engineering?")

# Inspect the trace
trace = callback.last_trace
print(f"Trace ID: {trace.trace_id}")
print(f"Duration: {trace.total_duration_ms:.1f} ms")
print(f"Spans: {len(trace.spans)}")

# Inspect individual spans
for span in exporter.get_spans():
    print(f"  {span.name}: {span.duration_ms:.1f} ms [{span.status}]")
```

> [!TIP]
> `TracingCallback` automatically infers `SpanKind` from the step name using heuristics (e.g. a step named `"rerank"` maps to `SpanKind.RERANKING`).
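
The exact rules are internal to `TracingCallback`. The sketch below shows one way a name-based heuristic of this kind could work. `Kind` is a local stand-in for `SpanKind`, so the snippet runs without anchor installed. The keyword list and the fallback to `PIPELINE` are assumptions, not the library's behaviour.

```python
from __future__ import annotations

from enum import Enum, auto


class Kind(Enum):
    """Local stand-in mirroring the SpanKind names in the table above."""

    PIPELINE = auto()
    RETRIEVAL = auto()
    RERANKING = auto()
    FORMATTING = auto()
    MEMORY = auto()
    INGESTION = auto()
    QUERY_TRANSFORM = auto()


# Ordered (keyword, kind) rules; the first keyword found in the name wins.
# These keywords are illustrative assumptions, not anchor's heuristics.
RULES: list[tuple[str, Kind]] = [
    ("rerank", Kind.RERANKING),
    ("rewrite", Kind.QUERY_TRANSFORM),
    ("expan", Kind.QUERY_TRANSFORM),
    ("retriev", Kind.RETRIEVAL),
    ("search", Kind.RETRIEVAL),
    ("format", Kind.FORMATTING),
    ("memory", Kind.MEMORY),
    ("history", Kind.MEMORY),
    ("ingest", Kind.INGESTION),
    ("index", Kind.INGESTION),
]


def infer_kind(step_name: str, default: Kind = Kind.PIPELINE) -> Kind:
    """Map a step name to a span kind by case-insensitive keyword matching."""
    name = step_name.lower()
    for keyword, kind in RULES:
        if keyword in name:
            return kind
    return default


print(infer_kind("rerank"), infer_kind("Dense-Retrieval"), infer_kind("custom_step"))
```

Because rules are checked in order, placing the more specific `rerank` before `retriev` classifies a step such as `rerank_retrieved` as reranking rather than retrieval.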
Span Exporters
Exporters receive a batch of completed spans and deliver them to a backend.
All exporters implement the SpanExporter protocol.
ConsoleSpanExporter
Logs spans as JSON via Python's logging module. Useful for development.
```python
import logging

from anchor.observability import ConsoleSpanExporter, TracingCallback

logging.basicConfig(level=logging.INFO)

exporter = ConsoleSpanExporter(log_level=logging.INFO)
callback = TracingCallback(exporters=[exporter])
```

InMemorySpanExporter
Stores spans in an in-memory list. Ideal for testing and debugging.
```python
from anchor.observability import InMemorySpanExporter

exporter = InMemorySpanExporter()

# After pipeline execution:
spans = exporter.get_spans()
exporter.clear()  # reset for the next run
```

FileSpanExporter
Appends spans as JSON-Lines to a file on disk.
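
Each line of the file is a standalone JSON object, so the output can be post-processed with the standard library alone. The sketch assumes each record has a `name` key, mirroring `span.name` used earlier on this page. The sample records and the helpers `load_spans` and `count_by_name` are hypothetical.

```python
from __future__ import annotations

import json
import os
import tempfile
from collections import Counter
from pathlib import Path


def load_spans(path: str | Path) -> list[dict]:
    """Parse a JSON-Lines file: one JSON object per non-empty line."""
    spans = []
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:  # skip blank lines, e.g. a trailing newline
                spans.append(json.loads(line))
    return spans


def count_by_name(spans: list[dict]) -> Counter:
    # Assumption: each record has a "name" key, like span.name above.
    return Counter(span.get("name", "<unnamed>") for span in spans)


# Hypothetical sample records standing in for a real traces.jsonl file.
sample = [{"name": "retrieval"}, {"name": "rerank"}, {"name": "retrieval"}]
with tempfile.NamedTemporaryFile(
    "w", suffix=".jsonl", delete=False, encoding="utf-8"
) as tmp:
    tmp.write("\n".join(json.dumps(record) for record in sample) + "\n")
    tmp_path = tmp.name

counts = count_by_name(load_spans(tmp_path))
os.remove(tmp_path)
print(counts.most_common())  # [('retrieval', 2), ('rerank', 1)]
```

Because the exporter appends, spans from successive runs accumulate in the same file unless you rotate or delete it between runs.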
```python
from anchor.observability import FileSpanExporter, TracingCallback

exporter = FileSpanExporter(path="traces.jsonl")
callback = TracingCallback(exporters=[exporter])
```

OTLPSpanExporter
Exports spans to an OpenTelemetry collector via OTLP/HTTP. Requires the
`otlp` extra:

```shell
pip install "astro-anchor[otlp]"
```

```python
from anchor.observability import OTLPSpanExporter, TracingCallback

exporter = OTLPSpanExporter(
    endpoint="http://localhost:4318",
    service_name="my-app",
    headers={"Authorization": "Bearer ..."},
)
callback = TracingCallback(exporters=[exporter])

# Shut down cleanly when done
exporter.shutdown()
```

> [!CAUTION]
> `OTLPSpanExporter` raises `ImportError` at construction if the `opentelemetry-exporter-otlp-proto-http` and `opentelemetry-sdk` packages are not installed.
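
The failure only surfaces when the exporter is constructed. An application that treats OTLP as optional can therefore check for the packages up front. This minimal sketch uses the standard library's `importlib.util.find_spec`. The module names are the import paths of the two packages named above, and `module_available` and `otlp_ready` are hypothetical helpers.

```python
from __future__ import annotations

import importlib.util


def module_available(name: str) -> bool:
    """True if `name` is importable. For dotted names, find_spec imports
    the parent package first; the module itself is not imported."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised for dotted names whose parent package is missing.
        return False


# Import paths of opentelemetry-sdk and opentelemetry-exporter-otlp-proto-http.
OTLP_MODULES = ("opentelemetry.sdk", "opentelemetry.exporter.otlp.proto.http")
otlp_ready = all(module_available(m) for m in OTLP_MODULES)
print(f"OTLP export available: {otlp_ready}")
```

When `otlp_ready` is true you could construct `OTLPSpanExporter`, and fall back to `ConsoleSpanExporter` otherwise. Wrapping the constructor in `try`/`except ImportError` achieves the same result.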
Metrics Collection
Metrics collectors record MetricPoint values. TracingCallback records
step durations and pipeline build times automatically when a collector is
configured.
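
`TracingCallback` measures the steps it wraps. For other work, such as a sub-step you want to measure separately, you can time it yourself. This sketch defines a hypothetical `timed` context manager that appends a metric-point-like dict to a list. With anchor, the append would presumably become a `collector.record(MetricPoint(...))` call like the ones in the `InMemoryMetricsCollector` example. That substitution is an assumption.

```python
from __future__ import annotations

import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def timed(name: str, sink: list[dict], **tags: str) -> Iterator[None]:
    """Time the enclosed block and append a metric-point-like dict to sink."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs even if the block raises, so failed work is still measured.
        elapsed_ms = (time.perf_counter() - start) * 1000.0
        # With anchor this line would presumably become:
        # collector.record(MetricPoint(name=name, value=elapsed_ms, tags=tags))
        sink.append({"name": name, "value": elapsed_ms, "tags": tags})


points: list[dict] = []
with timed("step.duration_ms", points, step="custom_filter"):
    time.sleep(0.005)  # simulate some work
print(points[0]["name"], f"{points[0]['value']:.1f} ms", points[0]["tags"])
```
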
InMemoryMetricsCollector
Stores metrics in memory with summary statistics.
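
`get_summary` returns `avg`, `p95`, `min`, and `max`, as the example that follows shows. This sketch computes those statistics from raw values. It uses the nearest-rank definition of the 95th percentile, which is an assumption: the collector may interpolate instead, so its `p95` can differ on small samples. `summarize` is a hypothetical helper.

```python
from __future__ import annotations


def summarize(values: list[float]) -> dict[str, float]:
    """avg/min/max plus a nearest-rank p95; returns {} when there is no data."""
    if not values:
        return {}
    ordered = sorted(values)
    n = len(ordered)
    # Nearest-rank p95: the ceil(0.95 * n)-th smallest value, computed with
    # integer ceiling division to avoid floating-point rounding surprises.
    rank = -(-95 * n // 100)
    return {
        "avg": sum(ordered) / n,
        "min": ordered[0],
        "max": ordered[-1],
        "p95": ordered[rank - 1],
    }


summary = summarize([12.5, 8.3])
print(f"avg: {summary['avg']:.1f}  p95: {summary['p95']:.1f}")
```

With only two points (12.5 and 8.3 ms, as in the example that follows), this sketch reports avg 10.4, min 8.3, and max 12.5. Its p95 equals the maximum, 12.5.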
```python
from anchor.observability import InMemoryMetricsCollector, MetricPoint

collector = InMemoryMetricsCollector()

# Record manually
collector.record(MetricPoint(name="step.duration_ms", value=12.5, tags={"step": "retrieval"}))
collector.record(MetricPoint(name="step.duration_ms", value=8.3, tags={"step": "rerank"}))

# Query metrics
all_step_durations = collector.get_metrics("step.duration_ms")
print(f"Recorded {len(all_step_durations)} duration metrics")

# Get summary statistics
summary = collector.get_summary("step.duration_ms")
print(f"  avg: {summary['avg']:.1f} ms")
print(f"  p95: {summary['p95']:.1f} ms")
print(f"  min: {summary['min']:.1f} ms")
print(f"  max: {summary['max']:.1f} ms")

collector.clear()
```

LoggingMetricsCollector
Emits each metric as a structured JSON log message immediately on record.
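
The exact JSON layout that `LoggingMetricsCollector` writes is not documented on this page. This sketch assumes a flat `name`/`value`/`tags` object matching the `MetricPoint` fields used above. It shows the log-on-record pattern with the standard `logging` and `json` modules. `log_metric` and the `metrics_sketch` logger name are hypothetical.

```python
from __future__ import annotations

import json
import logging

logger = logging.getLogger("metrics_sketch")  # hypothetical logger name


def log_metric(name: str, value: float, tags: dict[str, str] | None = None,
               level: int = logging.INFO) -> str:
    """Serialise one metric as JSON, log it immediately, return the message."""
    # sort_keys gives a stable field order, which helps grepping and diffing.
    message = json.dumps({"name": name, "value": value, "tags": tags or {}},
                         sort_keys=True)
    logger.log(level, message)
    return message


logging.basicConfig(level=logging.INFO)
msg = log_metric("step.duration_ms", 12.5, {"step": "retrieval"})
```

Because each metric is written as soon as it is recorded, this pattern keeps nothing buffered. The OTLP exporter below, by contrast, exposes an explicit `flush()`.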
```python
import logging

from anchor.observability import LoggingMetricsCollector, TracingCallback

logging.basicConfig(level=logging.INFO)

collector = LoggingMetricsCollector(log_level=logging.INFO)
callback = TracingCallback(metrics_collector=collector)
```

OTLPMetricsExporter
Exports metrics to an OpenTelemetry collector via OTLP/HTTP. Requires the
otlp extra.
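
The example that follows ends with explicit `flush()` and `shutdown()` calls. If the pipeline raises before reaching them, they never run, and buffered metrics may not be exported. This sketch defines a hypothetical `exporter_lifecycle` context manager that runs both calls in `finally` blocks. `RecordingExporter` is a stand-in that only records which methods were called.

```python
from __future__ import annotations

from contextlib import contextmanager
from typing import Iterator, Protocol


class FlushableExporter(Protocol):
    def flush(self) -> None: ...
    def shutdown(self) -> None: ...


@contextmanager
def exporter_lifecycle(exporter: FlushableExporter) -> Iterator[FlushableExporter]:
    """Yield the exporter, then always flush and shut it down afterwards."""
    try:
        yield exporter
    finally:
        try:
            exporter.flush()
        finally:
            exporter.shutdown()  # runs even if flush() raises


class RecordingExporter:
    """Hypothetical stand-in that records which lifecycle calls were made."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def flush(self) -> None:
        self.calls.append("flush")

    def shutdown(self) -> None:
        self.calls.append("shutdown")


demo = RecordingExporter()
try:
    with exporter_lifecycle(demo):
        raise RuntimeError("pipeline failed")  # simulate an error mid-run
except RuntimeError:
    pass
print(demo.calls)  # ['flush', 'shutdown']
```

With anchor, you could wrap the pipeline run in `with exporter_lifecycle(OTLPMetricsExporter(...)) as exporter:`. This relies only on the `flush()` and `shutdown()` methods shown in the example.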
```python
from anchor.observability import OTLPMetricsExporter, TracingCallback

exporter = OTLPMetricsExporter(
    endpoint="http://localhost:4318",
    service_name="my-app",
)
callback = TracingCallback(metrics_collector=exporter)

# Flush and shut down when done
exporter.flush()
exporter.shutdown()
```

Cost Tracking
CostTracker accumulates per-operation cost entries (tokens, USD) and
produces aggregated summaries. It is thread-safe.
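
The following simplified stand-in, `ToyCostTracker`, illustrates per-operation accumulation and thread safety. It is hypothetical, not anchor's implementation, and takes a single lock around every update. It assumes cost = input tokens × price per input token. For the entries in the example that follows, that gives 500 × 0.00002 = $0.01 for embedding plus 2000 × 0.00001 = $0.02 for reranking, or $0.03 in total.

```python
from __future__ import annotations

import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


class ToyCostTracker:
    """Hypothetical, simplified accumulator; not anchor's CostTracker."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.total_cost_usd = 0.0
        self.total_input_tokens = 0
        self.by_model: defaultdict[str, float] = defaultdict(float)
        self.by_operation: defaultdict[str, float] = defaultdict(float)

    def record(self, operation: str, model: str, input_tokens: int,
               cost_per_input_token: float) -> float:
        # Assumed pricing: tokens times the per-token price.
        cost = input_tokens * cost_per_input_token
        # A single lock around all updates keeps the totals consistent
        # when several threads record at once.
        with self._lock:
            self.total_cost_usd += cost
            self.total_input_tokens += input_tokens
            self.by_model[model] += cost
            self.by_operation[operation] += cost
        return cost


tracker = ToyCostTracker()
with ThreadPoolExecutor(max_workers=2) as pool:
    # The same two entries as the example that follows, recorded concurrently.
    futures = [
        pool.submit(tracker.record, "embedding", "text-embedding-3-small", 500, 0.00002),
        pool.submit(tracker.record, "rerank", "rerank-v3", 2000, 0.00001),
    ]
for future in futures:
    future.result()  # re-raise any exception from a worker thread
print(f"Total cost: ${tracker.total_cost_usd:.4f}")  # Total cost: $0.0300
```
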
```python
from anchor.observability import CostTracker

tracker = CostTracker()

# Record embedding cost
tracker.record(
    operation="embedding",
    model="text-embedding-3-small",
    input_tokens=500,
    cost_per_input_token=0.00002,
)

# Record reranking cost
tracker.record(
    operation="rerank",
    model="rerank-v3",
    input_tokens=2000,
    cost_per_input_token=0.00001,
)

# Get summary
summary = tracker.summary()
print(f"Total cost: ${summary.total_cost_usd:.4f}")
print(f"Input tokens: {summary.total_input_tokens}")
print(f"By model: {summary.by_model}")
print(f"By operation: {summary.by_operation}")

# Reset for next run
tracker.reset()
```

CostTrackingCallback
CostTrackingCallback is a pipeline callback that automatically records
cost entries when pipeline steps produce items with cost-related metadata.
```python
from anchor.observability import CostTracker, CostTrackingCallback

tracker = CostTracker()
cost_callback = CostTrackingCallback(tracker=tracker)

# Attach cost tracking (it can sit alongside a TracingCallback on the same pipeline)
pipeline.add_callback(cost_callback)

# After pipeline execution
summary = tracker.summary()
print(f"Pipeline cost: ${summary.total_cost_usd:.4f}")
```

The callback looks for these metadata keys on `ContextItem.metadata`:
| Key | Type | Description |
|---|---|---|
| `cost_model` | `str` | Model identifier (triggers recording) |
| `cost_input_tokens` | `int` | Input tokens consumed |
| `cost_output_tokens` | `int` | Output tokens produced |
| `cost_per_input_token` | `float` | USD per input token |
| `cost_per_output_token` | `float` | USD per output token |
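
The table lists the inputs but not how they combine. This sketch assumes cost is computed as tokens × per-token price for input and output separately, the same arithmetic as the `CostTracker.record` example above extended to output tokens. It also assumes that missing counts or prices count as zero. `item_cost` is a hypothetical helper, not part of anchor, and the metadata values are made up.

```python
from __future__ import annotations

from typing import Any


def item_cost(metadata: dict[str, Any]) -> float | None:
    """USD cost for one item, or None if it carries no cost_model key."""
    if "cost_model" not in metadata:
        return None  # assumption: without cost_model nothing is recorded
    # Missing token counts or prices are treated as 0 (assumption).
    input_cost = metadata.get("cost_input_tokens", 0) * metadata.get("cost_per_input_token", 0.0)
    output_cost = metadata.get("cost_output_tokens", 0) * metadata.get("cost_per_output_token", 0.0)
    return input_cost + output_cost


example_metadata = {
    "cost_model": "example-llm",  # hypothetical model identifier
    "cost_input_tokens": 1000,
    "cost_output_tokens": 200,
    "cost_per_input_token": 0.000001,
    "cost_per_output_token": 0.000002,
}
print(f"${item_cost(example_metadata):.4f}")  # $0.0014
```
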

> [!TIP]
> Combine `TracingCallback` and `CostTrackingCallback` on the same pipeline to get both performance traces and cost breakdowns.
Full Example: Tracing + Metrics + Cost
```python
from anchor.observability import (
    CostTracker,
    CostTrackingCallback,
    InMemoryMetricsCollector,
    InMemorySpanExporter,
    TracingCallback,
)

# Set up all observers
span_exporter = InMemorySpanExporter()
metrics_collector = InMemoryMetricsCollector()
cost_tracker = CostTracker()

tracing_cb = TracingCallback(
    exporters=[span_exporter],
    metrics_collector=metrics_collector,
)
cost_cb = CostTrackingCallback(tracker=cost_tracker)

# Attach both callbacks to an existing pipeline
pipeline.add_callback(tracing_cb)
pipeline.add_callback(cost_cb)

# Run the pipeline
result = pipeline.build("What is context engineering?")

# Inspect results
print("--- Spans ---")
for span in span_exporter.get_spans():
    print(f"  {span.name}: {span.duration_ms:.1f} ms")

print("--- Metrics ---")
summary = metrics_collector.get_summary("pipeline.build_time_ms")
if summary:
    print(f"  Build time avg: {summary['avg']:.1f} ms")

print("--- Cost ---")
cost_summary = cost_tracker.summary()
print(f"  Total: ${cost_summary.total_cost_usd:.4f}")
```

Next Steps
- Observability API Reference -- full class and method signatures
- Evaluation Guide -- measure retrieval and RAG quality
- Pipeline Guide -- build and configure context pipelines