Astro Intelligence

Protocol-Based Architecture

anchor uses Python Protocols (PEP 544) to define all extension points. This page explains what protocols are, why anchor chose them over class inheritance, and how to implement your own.

What Are Protocols?

A Protocol is a way to declare an interface in Python using structural subtyping -- also known as "static duck typing." A class satisfies a protocol if it has the right methods with the right signatures. No base class or registration is needed.

from typing import Protocol, runtime_checkable

@runtime_checkable
class Retriever(Protocol):
    def retrieve(self, query, top_k=10):
        ...

Any object with a retrieve(query, top_k) method satisfies the Retriever protocol -- even if its class was written without any knowledge of the protocol definition.
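The minimal sketch below shows structural matching in plain Python. It uses a simplified stand-in for the Retriever protocol (plain strings instead of anchor's QueryBundle and ContextItem), and the names ListRetriever, NotARetriever, and fetch_context are hypothetical. Neither class inherits from the protocol, yet ListRetriever is accepted wherever a Retriever is expected.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Retriever(Protocol):
    # Simplified stand-in: anchor's real protocol takes a QueryBundle
    # and returns ContextItem objects.
    def retrieve(self, query: str, top_k: int = 10) -> list[str]:
        ...


class ListRetriever:
    # No base class, no registration: only a matching retrieve() method.
    def __init__(self, docs: list[str]) -> None:
        self._docs = docs

    def retrieve(self, query: str, top_k: int = 10) -> list[str]:
        # Return documents containing the query string, case-insensitively
        return [d for d in self._docs if query.lower() in d.lower()][:top_k]


class NotARetriever:
    def fetch(self, query: str) -> list[str]:  # wrong method name
        return []


def fetch_context(retriever: Retriever, query: str) -> list[str]:
    # Type checkers accept any argument whose shape matches Retriever.
    return retriever.retrieve(query, top_k=3)


docs = ListRetriever(["RAG combines retrieval with generation", "Python is great"])
print(fetch_context(docs, "rag"))              # ['RAG combines retrieval with generation']
print(isinstance(docs, Retriever))             # True
print(isinstance(NotARetriever(), Retriever))  # False
```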

[!NOTE] PEP 544: Protocols were added to the typing module in Python 3.8 by PEP 544. They are fully supported by mypy, pyright, and other type checkers.

Why Protocols Over Inheritance?

Many frameworks define interfaces with abstract base classes (ABCs). Compared with protocols, this approach has several drawbacks:

| Concern | Inheritance (ABC) | Protocol |
|---|---|---|
| Must import base class | yes | no |
| Must call super().__init__() | often | never |
| Works with third-party classes | no | yes |
| Runtime isinstance() checks | yes | yes (with @runtime_checkable) |
| IDE autocompletion | yes | yes |
| Type checker validation | yes | yes |
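To make the first and third rows concrete, the following sketch defines the same interface both ways (all class names are hypothetical; stdlib only). A class that never subclasses the ABC fails the ABC isinstance() check, while it passes the protocol check because it has the required method.

```python
from abc import ABC, abstractmethod
from typing import Protocol, runtime_checkable


class RetrieverABC(ABC):
    # Inheritance-based interface: implementers must subclass it.
    @abstractmethod
    def retrieve(self, query: str, top_k: int = 10) -> list[str]: ...


@runtime_checkable
class RetrieverProtocol(Protocol):
    # Structural interface: any object with retrieve() matches.
    def retrieve(self, query: str, top_k: int = 10) -> list[str]: ...


class ThirdPartySearch:
    # Imagine this ships in a library you cannot modify.
    def retrieve(self, query: str, top_k: int = 10) -> list[str]:
        return [f"result for {query}"][:top_k]


search = ThirdPartySearch()
print(isinstance(search, RetrieverABC))       # False: it never subclassed the ABC
print(isinstance(search, RetrieverProtocol))  # True: it has the right method
```

An ABC can still adopt such a class explicitly with RetrieverABC.register(ThirdPartySearch), but that is an extra step someone has to remember to take.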

With protocols, you can wrap any existing object -- a Pinecone client, a custom database class, a test stub -- without modifying its inheritance chain. If it has the right methods, it works.
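Often the third-party object has the right capability but different method names, and a thin adapter bridges the gap. In the sketch below, VendorClient and its search(text, limit) method stand in for some external SDK (hypothetical, not Pinecone's actual API), and the Retriever protocol is simplified to plain strings.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Retriever(Protocol):
    def retrieve(self, query: str, top_k: int = 10) -> list[str]: ...


class VendorClient:
    # Hypothetical external SDK: different method name and result shape.
    def __init__(self, corpus: list[str]) -> None:
        self._corpus = corpus

    def search(self, text: str, limit: int) -> list[dict]:
        hits = [doc for doc in self._corpus if text.lower() in doc.lower()]
        return [{"text": doc} for doc in hits[:limit]]


class VendorRetriever:
    # Adapter: wraps the client by composition, no subclassing involved.
    def __init__(self, client: VendorClient) -> None:
        self._client = client

    def retrieve(self, query: str, top_k: int = 10) -> list[str]:
        # Translate the protocol call into the vendor call and result shape
        return [hit["text"] for hit in self._client.search(query, limit=top_k)]


adapter = VendorRetriever(VendorClient(["Vector search", "Keyword search", "Caching"]))
print(isinstance(adapter, Retriever))       # True
print(adapter.retrieve("search", top_k=1))  # ['Vector search']
```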

Protocol Families

anchor defines protocols across eight families. Every protocol is @runtime_checkable, so you can use isinstance() checks at runtime.

Retrieval Protocols

For fetching and ranking context items.

| Protocol | Key Method | Description |
|---|---|---|
| Retriever | retrieve(query, top_k) | Synchronous retrieval |
| AsyncRetriever | aretrieve(query, top_k) | Async retrieval |
| Reranker | rerank(query, items, top_k) | Synchronous reranking |
| AsyncReranker | arerank(query, items, top_k) | Async reranking |
| PostProcessor | process(items, query) | Post-retrieval transformation |
| AsyncPostProcessor | aprocess(items, query) | Async post-processing |
| TokenLevelEncoder | encode_tokens(text) | Per-token embeddings (ColBERT-style) |
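As an illustration of the rerank(query, items, top_k) shape, here is a minimal sketch that reorders items by word overlap with the query. The Item dataclass is a hypothetical stand-in for anchor's ContextItem, and the query is a plain string rather than a QueryBundle.

```python
import re
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Item:
    # Hypothetical stand-in for ContextItem
    content: str
    score: float = 0.0


def _words(text: str) -> set[str]:
    return set(re.findall(r"\w+", text.lower()))


class OverlapReranker:
    """Matches a rerank(query, items, top_k) protocol shape."""

    def rerank(self, query: str, items: list[Item], top_k: int = 10) -> list[Item]:
        q = _words(query)
        if not q:
            return items[:top_k]
        # Re-score each item by the fraction of query words it contains
        rescored = [replace(it, score=len(q & _words(it.content)) / len(q)) for it in items]
        # sorted() is stable even with reverse=True, so ties keep retrieval order
        return sorted(rescored, key=lambda it: it.score, reverse=True)[:top_k]


items = [Item("Python is great"), Item("RAG pairs retrieval with generation")]
top = OverlapReranker().rerank("retrieval generation", items, top_k=1)
print(top[0].content)  # RAG pairs retrieval with generation
```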

Memory Protocols

For conversation history, persistent facts, and memory lifecycle.

| Protocol | Key Method | Description |
|---|---|---|
| MemoryProvider | get_context_items(priority) | Provides items to the pipeline |
| ConversationMemory | turns, to_context_items() | Conversation turn management |
| CompactionStrategy | compact(turns) | Summarize evicted turns |
| AsyncCompactionStrategy | compact(turns) | Async summarization |
| MemoryExtractor | extract(turns) | Extract structured facts from turns |
| AsyncMemoryExtractor | extract(turns) | Async fact extraction |
| MemoryConsolidator | consolidate(new, existing) | Merge/deduplicate memories |
| EvictionPolicy | select_for_eviction(turns, tokens_to_free) | Choose turns to evict |
| MemoryDecay | compute_retention(entry) | Score memory retention (0.0--1.0) |
| MemoryQueryEnricher | enrich(query, memory_items) | Augment query with memory context |
| RecencyScorer | score(index, total) | Compute recency weight |
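The sketch below implements the select_for_eviction(turns, tokens_to_free) shape as a simple oldest-first policy. The Turn dataclass and its token_count field are hypothetical; anchor's real turn model may differ.

```python
from dataclasses import dataclass


@dataclass
class Turn:
    # Hypothetical stand-in for a conversation turn
    role: str
    content: str
    token_count: int


class OldestFirstEviction:
    """Select the oldest turns until enough tokens would be freed."""

    def select_for_eviction(self, turns: list[Turn], tokens_to_free: int) -> list[Turn]:
        selected: list[Turn] = []
        freed = 0
        # Walk from oldest (index 0) to newest
        for turn in turns:
            if freed >= tokens_to_free:
                break
            selected.append(turn)
            freed += turn.token_count
        return selected


history = [
    Turn("user", "Hi", 5),
    Turn("assistant", "Hello! How can I help?", 12),
    Turn("user", "Explain protocols", 8),
]
evicted = OldestFirstEviction().select_for_eviction(history, tokens_to_free=10)
print([t.content for t in evicted])  # ['Hi', 'Hello! How can I help?']
```

The selected turns are the natural input for a CompactionStrategy, whose role in the table above is to summarize evicted turns.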

Storage Protocols

For persisting context items, vectors, documents, and memory entries.

| Protocol | Key Methods | Description |
|---|---|---|
| ContextStore | add, get, get_all, delete, clear | Context item persistence |
| VectorStore | add_embedding, search, delete | Vector similarity search |
| DocumentStore | add_document, get_document, list_documents, delete_document | Raw document storage |
| MemoryEntryStore | add, search, list_all, delete, clear | Memory entry persistence |
| GarbageCollectableStore | list_all_unfiltered, delete | Extends MemoryEntryStore for GC |
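For tests and prototypes, an in-memory dict is often enough to satisfy a storage protocol. The sketch below follows the ContextStore method names (add, get, get_all, delete, clear); the parameter and return types, and the id-keyed Item stand-in, are assumptions rather than anchor's exact signatures.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Item:
    # Hypothetical stand-in for ContextItem; assumes each item has an id
    id: str
    content: str


class InMemoryContextStore:
    def __init__(self) -> None:
        self._items: dict[str, Item] = {}

    def add(self, item: Item) -> None:
        # Re-adding an id overwrites the previous item
        self._items[item.id] = item

    def get(self, item_id: str) -> Optional[Item]:
        return self._items.get(item_id)

    def get_all(self) -> list[Item]:
        return list(self._items.values())

    def delete(self, item_id: str) -> bool:
        # Report whether anything was actually removed
        return self._items.pop(item_id, None) is not None

    def clear(self) -> None:
        self._items.clear()


store = InMemoryContextStore()
store.add(Item("a", "first"))
store.add(Item("b", "second"))
print(store.get("a"))        # Item(id='a', content='first')
print(store.delete("a"))     # True
print(len(store.get_all()))  # 1
```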

Observability Protocols

For tracing, metrics, and monitoring.

| Protocol | Key Methods | Description |
|---|---|---|
| SpanExporter | export(spans) | Export trace spans to backends |
| MetricsCollector | record(metric), flush() | Collect and flush metric points |
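A MetricsCollector buffers points with record() and hands them off in flush(). The minimal sketch below keeps points in a list and has flush() return and reset the buffer; the MetricPoint fields and the return value of flush() are assumptions, and a real collector would typically forward points to a backend instead.

```python
import time
from dataclasses import dataclass, field


@dataclass
class MetricPoint:
    # Hypothetical metric shape
    name: str
    value: float
    timestamp: float = field(default_factory=time.time)


class BufferingMetricsCollector:
    def __init__(self) -> None:
        self._buffer: list[MetricPoint] = []

    def record(self, metric: MetricPoint) -> None:
        self._buffer.append(metric)

    def flush(self) -> list[MetricPoint]:
        # Hand off everything recorded so far and start a fresh buffer
        flushed, self._buffer = self._buffer, []
        return flushed


collector = BufferingMetricsCollector()
collector.record(MetricPoint("retrieval.latency_ms", 42.0))
collector.record(MetricPoint("pipeline.tokens", 812.0))
batch = collector.flush()
print([p.name for p in batch])  # ['retrieval.latency_ms', 'pipeline.tokens']
print(collector.flush())        # []
```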

Query Protocols

For query transformation, classification, and routing.

| Protocol | Key Method | Description |
|---|---|---|
| QueryTransformer | transform(query) | Expand or rewrite queries |
| AsyncQueryTransformer | atransform(query) | Async query transformation |
| QueryClassifier | classify(query) | Assign a label to a query |
| QueryRouter | route(query) | Route to a named retriever |
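A QueryRouter returns the name of the retriever that should handle a query. The keyword-rule router below is a hypothetical sketch: the route names, the rules, and the plain-string query (instead of anchor's query type) are all assumptions.

```python
class KeywordRouter:
    """Route a query to a named retriever using simple keyword rules."""

    def __init__(self, rules: dict[str, list[str]], default: str) -> None:
        self._rules = rules      # route name -> trigger keywords
        self._default = default  # used when no rule matches

    def route(self, query: str) -> str:
        words = set(query.lower().split())
        # The first rule (in insertion order) with a matching keyword wins
        for name, keywords in self._rules.items():
            if words & {k.lower() for k in keywords}:
                return name
        return self._default


router = KeywordRouter(
    rules={"code": ["function", "error", "traceback"], "docs": ["how", "guide"]},
    default="general",
)
print(router.route("Why does this function fail"))  # code
print(router.route("What is RAG"))                  # general
```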

Ingestion Protocols

For document parsing and chunking.

| Protocol | Key Method | Description |
|---|---|---|
| Chunker | chunk(text, metadata) | Split text into chunks |
| DocumentParser | parse(source) | Extract text + metadata from files |
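Here is a Chunker sketch that splits text into fixed-size word windows with overlap. It assumes chunk(text, metadata) returns a list of strings and ignores metadata; anchor's actual return type may carry metadata per chunk. The chunk_size and overlap parameters are illustrative.

```python
from typing import Optional


class WordWindowChunker:
    """Split text into windows of chunk_size words that overlap by overlap words."""

    def __init__(self, chunk_size: int = 200, overlap: int = 20) -> None:
        if not 0 <= overlap < chunk_size:
            raise ValueError("overlap must be >= 0 and smaller than chunk_size")
        self._size = chunk_size
        self._step = chunk_size - overlap  # how far each window advances

    def chunk(self, text: str, metadata: Optional[dict] = None) -> list[str]:
        words = text.split()
        chunks = []
        for start in range(0, len(words), self._step):
            chunks.append(" ".join(words[start:start + self._size]))
            if start + self._size >= len(words):
                break  # this window already reaches the end of the text
        return chunks


chunker = WordWindowChunker(chunk_size=4, overlap=1)
print(chunker.chunk("a b c d e f g h i j"))
# ['a b c d', 'd e f g', 'g h i j']
```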

Evaluation Protocols

For assessing retrieval and generation quality.

| Protocol | Key Method | Description |
|---|---|---|
| RetrievalEvaluator | evaluate(retrieved, relevant, k) | Precision, recall, MRR, NDCG |
| RAGEvaluator | evaluate(query, answer, contexts, ground_truth) | Faithfulness, relevancy |
| HumanEvaluator | add_judgment, compute_agreement | Human-in-the-loop evaluation |
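The metrics listed for RetrievalEvaluator have standard definitions. The sketch below computes them for a single query with binary relevance (an item is either relevant or not), using string IDs; the dict return shape and the parameter types are assumptions, not anchor's exact API.

```python
import math


class SimpleRetrievalEvaluator:
    def evaluate(self, retrieved: list[str], relevant: set[str], k: int = 10) -> dict[str, float]:
        top = retrieved[:k]
        hits = [doc in relevant for doc in top]
        n_hits = sum(hits)
        # MRR for one query: reciprocal rank of the first relevant result
        first = next((i for i, h in enumerate(hits) if h), None)
        mrr = 1.0 / (first + 1) if first is not None else 0.0
        # DCG discounts a hit at 1-based rank r by log2(r + 1)
        dcg = sum(1.0 / math.log2(i + 2) for i, h in enumerate(hits) if h)
        # Ideal DCG: all relevant items ranked first
        ideal = sum(1.0 / math.log2(i + 2) for i in range(min(len(relevant), k)))
        return {
            "precision": n_hits / k,
            "recall": n_hits / len(relevant) if relevant else 0.0,
            "mrr": mrr,
            "ndcg": dcg / ideal if ideal > 0 else 0.0,
        }


scores = SimpleRetrievalEvaluator().evaluate(["d3", "d1", "d7"], relevant={"d1", "d2"}, k=3)
print(scores["precision"], scores["recall"], scores["mrr"])  # 0.3333333333333333 0.5 0.5
```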

Infrastructure Protocols

For caching and tokenization.

| Protocol | Key Methods | Description |
|---|---|---|
| Tokenizer | count_tokens(text), truncate_to_tokens(text, max_tokens) | Token counting and truncation |
| CacheBackend | get, set, invalidate, clear | Pipeline step caching |
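A whitespace tokenizer is a crude but dependency-free way to satisfy the Tokenizer shape in tests. Token counts from a real model tokenizer (BPE, for example) will differ, so treat this as a hypothetical stand-in rather than a production choice.

```python
class WhitespaceTokenizer:
    """Approximates tokens as whitespace-separated words."""

    def count_tokens(self, text: str) -> int:
        return len(text.split())

    def truncate_to_tokens(self, text: str, max_tokens: int) -> str:
        # Keep at most max_tokens words; this also normalizes internal spacing
        return " ".join(text.split()[:max(max_tokens, 0)])


tok = WhitespaceTokenizer()
print(tok.count_tokens("Protocols define extension points"))           # 4
print(tok.truncate_to_tokens("Protocols define extension points", 2))  # Protocols define
```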

Implementing a Protocol

To implement a protocol, just write a class with the matching methods. Here is a concrete example implementing the Retriever protocol:

import re

from anchor import ContextItem, SourceType
from anchor.models.query import QueryBundle


def _words(text: str) -> set[str]:
    # Lowercase and keep only word characters, so "RAG?" matches "RAG"
    return set(re.findall(r"\w+", text.lower()))


class KeywordRetriever:
    """A simple keyword-based retriever -- satisfies the Retriever protocol."""

    def __init__(self, documents: list[str]):
        self._docs = documents

    def retrieve(self, query: QueryBundle, top_k: int = 10) -> list[ContextItem]:
        query_words = _words(query.query_str)
        if not query_words:
            return []
        # Count how many query words each document contains
        scored = []
        for doc in self._docs:
            overlap = len(query_words & _words(doc))
            if overlap > 0:
                scored.append((doc, overlap))
        # Highest overlap first; the sort is stable, so ties keep document order
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [
            ContextItem(
                content=doc,
                source=SourceType.RETRIEVAL,
                # Fraction of query words matched, always in (0.0, 1.0]
                score=count / len(query_words),
            )
            for doc, count in scored[:top_k]
        ]

This class works with retriever_step() and ContextPipeline without importing any base class:

from anchor import ContextPipeline
from anchor.pipeline.step import retriever_step

retriever = KeywordRetriever(["Python is great", "RAG combines retrieval with generation"])
pipeline = ContextPipeline(max_tokens=8192).add_step(retriever_step("keyword", retriever))
result = pipeline.build("What is RAG?")

[!TIP] Runtime checking: All protocols are @runtime_checkable, so you can verify conformance at runtime:

from anchor.protocols import Retriever
assert isinstance(retriever, Retriever)  # passes: KeywordRetriever defines retrieve()
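Keep in mind that isinstance() against a @runtime_checkable protocol only verifies that the named methods exist; it does not compare parameters or return types. Signature mismatches are caught by static type checkers such as mypy or pyright, not by this runtime check. The self-contained sketch below (hypothetical class name, simplified protocol) shows a class that passes the runtime check despite a wrong signature.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class Retriever(Protocol):
    def retrieve(self, query: str, top_k: int = 10) -> list[str]: ...


class BrokenRetriever:
    # Wrong signature: no top_k parameter, returns a str instead of a list
    def retrieve(self, query: str) -> str:
        return query


print(isinstance(BrokenRetriever(), Retriever))  # True: only the method name is checked

try:
    BrokenRetriever().retrieve("q", top_k=5)
except TypeError as exc:
    # The mismatch only surfaces when the method is actually called
    print(f"TypeError: {exc}")
```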

Async Protocol Pairs

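Many protocols come in sync/async pairs. In the pairs below, the async variant uses a different method name (prefixed with a) to avoid ambiguity. The async memory protocols are the exception: AsyncCompactionStrategy and AsyncMemoryExtractor keep the method names compact() and extract().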

| Sync | Async | Sync Method | Async Method |
|---|---|---|---|
| Retriever | AsyncRetriever | retrieve() | aretrieve() |
| Reranker | AsyncReranker | rerank() | arerank() |
| PostProcessor | AsyncPostProcessor | process() | aprocess() |
| QueryTransformer | AsyncQueryTransformer | transform() | atransform() |

Use the sync variant with pipeline.build() and the async variant with pipeline.abuild().
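An async implementation differs only in the method name and the async def. The sketch below uses simplified stand-ins for both protocols (plain strings, hypothetical InMemoryAsyncRetriever) and shows that, because the method names differ, isinstance() can tell the two kinds apart. asyncio.sleep(0) stands in for real async I/O such as a network call.

```python
import asyncio
from typing import Protocol, runtime_checkable


@runtime_checkable
class Retriever(Protocol):
    def retrieve(self, query: str, top_k: int = 10) -> list[str]: ...


@runtime_checkable
class AsyncRetriever(Protocol):
    async def aretrieve(self, query: str, top_k: int = 10) -> list[str]: ...


class InMemoryAsyncRetriever:
    def __init__(self, docs: list[str]) -> None:
        self._docs = docs

    async def aretrieve(self, query: str, top_k: int = 10) -> list[str]:
        await asyncio.sleep(0)  # placeholder for a real awaitable, e.g. an HTTP call
        return [d for d in self._docs if query.lower() in d.lower()][:top_k]


r = InMemoryAsyncRetriever(["Async retrieval", "Sync retrieval", "Caching"])
print(isinstance(r, AsyncRetriever), isinstance(r, Retriever))  # True False
print(asyncio.run(r.aretrieve("retrieval", top_k=1)))           # ['Async retrieval']
```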

[!CAUTION] Sync steps in async pipelines: abuild() can run both sync and async steps; sync steps are simply called directly. build(), however, cannot run async steps and raises TypeError if it encounters one.
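The rule in the caution above can be modeled with a toy runner. This is a simplified illustration of the described behavior, not anchor's implementation: run_sync and run_async are hypothetical names, and steps are plain functions that take and return a list.

```python
import asyncio
import inspect
from typing import Any, Callable


def run_sync(steps: list[Callable[..., Any]], items: list[str]) -> list[str]:
    for step in steps:
        if inspect.iscoroutinefunction(step):
            # A sync runner has no event loop to await the coroutine on
            raise TypeError(f"async step {step.__name__!r} cannot run in a sync pipeline")
        items = step(items)
    return items


async def run_async(steps: list[Callable[..., Any]], items: list[str]) -> list[str]:
    for step in steps:
        if inspect.iscoroutinefunction(step):
            items = await step(items)
        else:
            items = step(items)  # sync steps are simply called directly
    return items


def dedupe(items: list[str]) -> list[str]:
    return list(dict.fromkeys(items))


async def fetch_more(items: list[str]) -> list[str]:
    await asyncio.sleep(0)
    return items + ["extra"]


print(asyncio.run(run_async([dedupe, fetch_more], ["a", "a", "b"])))  # ['a', 'b', 'extra']
try:
    run_sync([dedupe, fetch_more], ["a"])
except TypeError as exc:
    print(exc)  # async step 'fetch_more' cannot run in a sync pipeline
```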
