Protocols API Reference
Protocols API Reference
anchor uses PEP 544 structural protocols throughout its architecture.
Any class with matching method signatures satisfies a protocol -- no
inheritance required. All protocols are @runtime_checkable.
All protocols are importable from anchor:
from anchor import Retriever, AsyncRetriever, PostProcessor, Reranker
# ... and all others listed belowRetrieval
Retriever
Synchronous retrieval of context items.
@runtime_checkable
class Retriever(Protocol):
def retrieve(self, query: QueryBundle, top_k: int = 10) -> list[ContextItem]: ...| Parameter | Type | Default | Description |
|---|---|---|---|
query | QueryBundle | required | Query text and metadata |
top_k | int | 10 | Maximum items to return |
Returns: List of ContextItem ranked by relevance (most relevant first).
AsyncRetriever
Asynchronous retrieval for non-blocking I/O.
@runtime_checkable
class AsyncRetriever(Protocol):
async def aretrieve(self, query: QueryBundle, top_k: int = 10) -> list[ContextItem]: ...Same parameters as Retriever. Used with ContextPipeline.abuild().
Post-Processing
PostProcessor
Synchronous transformation of retrieved context items (reranking, filtering, deduplication, PII removal, etc.).
@runtime_checkable
class PostProcessor(Protocol):
def process(
self, items: list[ContextItem], query: QueryBundle | None = None
) -> list[ContextItem]: ...| Parameter | Type | Default | Description |
|---|---|---|---|
items | list[ContextItem] | required | Items to post-process |
query | QueryBundle | None | None | Original query for query-aware transforms |
Returns: A new or modified list of ContextItem objects.
AsyncPostProcessor
Asynchronous post-processing (e.g., LLM-based reranking).
@runtime_checkable
class AsyncPostProcessor(Protocol):
async def aprocess(
self, items: list[ContextItem], query: QueryBundle | None = None
) -> list[ContextItem]: ...Query Transformation
QueryTransformer
Synchronous query transformation. Takes a single query and produces one or more derived queries for improved retrieval (expansion, decomposition, HyDE).
@runtime_checkable
class QueryTransformer(Protocol):
def transform(self, query: QueryBundle) -> list[QueryBundle]: ...| Parameter | Type | Description |
|---|---|---|
query | QueryBundle | The original query to transform |
Returns: A list of derived QueryBundle objects. Always at least one.
AsyncQueryTransformer
Asynchronous query transformation (e.g., LLM-based HyDE generation).
@runtime_checkable
class AsyncQueryTransformer(Protocol):
async def atransform(self, query: QueryBundle) -> list[QueryBundle]: ...Query Classification
QueryClassifier
Classifies a query and returns a string label for downstream routing.
@runtime_checkable
class QueryClassifier(Protocol):
def classify(self, query: QueryBundle) -> str: ...| Parameter | Type | Description |
|---|---|---|
query | QueryBundle | The query to classify |
Returns: A string label representing the query category.
QueryRouter
Routes queries to named retriever backends.
@runtime_checkable
class QueryRouter(Protocol):
def route(self, query: QueryBundle) -> str: ...| Parameter | Type | Description |
|---|---|---|
query | QueryBundle | The query to route |
Returns: The name/key of the target retriever.
Reranking
Reranker
Synchronous reranking of retrieved results.
@runtime_checkable
class Reranker(Protocol):
def rerank(
self, query: QueryBundle, items: list[ContextItem], top_k: int = 10
) -> list[ContextItem]: ...| Parameter | Type | Default | Description |
|---|---|---|---|
query | QueryBundle | required | The user query |
items | list[ContextItem] | required | Candidate items to rerank |
top_k | int | 10 | Maximum items to return |
Returns: List of ContextItem ranked by relevance (most relevant first).
AsyncReranker
Asynchronous reranking for cross-encoder inference or API calls.
@runtime_checkable
class AsyncReranker(Protocol):
async def arerank(
self, query: QueryBundle, items: list[ContextItem], top_k: int = 10
) -> list[ContextItem]: ...Late Interaction
TokenLevelEncoder
Encodes text into per-token embeddings for late interaction scoring (e.g., ColBERT MaxSim).
@runtime_checkable
class TokenLevelEncoder(Protocol):
def encode_tokens(self, text: str) -> list[list[float]]: ...| Parameter | Type | Description |
|---|---|---|
text | str | Text to encode into per-token embeddings |
Returns: A list of embeddings, one per token. Each embedding is a list of floats.
Memory
ConversationMemory
Read-side access to conversation memory. Both SlidingWindowMemory and
SummaryBufferMemory satisfy this protocol.
@runtime_checkable
class ConversationMemory(Protocol):
@property
def turns(self) -> list[ConversationTurn]: ...
@property
def total_tokens(self) -> int: ...
def to_context_items(self, priority: int = 7) -> list[ContextItem]: ...
def clear(self) -> None: ...MemoryProvider
Provides context items from memory. Used by ContextPipeline.with_memory().
MemoryManager is the canonical implementation.
@runtime_checkable
class MemoryProvider(Protocol):
def get_context_items(self, priority: int = 7) -> list[ContextItem]: ...CompactionStrategy
Compacts evicted conversation turns into a summary string.
@runtime_checkable
class CompactionStrategy(Protocol):
def compact(self, turns: list[ConversationTurn]) -> str: ...| Parameter | Type | Description |
|---|---|---|
turns | list[ConversationTurn] | Turns to compact (chronological, oldest first) |
Returns: A concise summary string.
AsyncCompactionStrategy
Async variant of CompactionStrategy.
@runtime_checkable
class AsyncCompactionStrategy(Protocol):
async def compact(self, turns: list[ConversationTurn]) -> str: ...MemoryExtractor
Extracts structured memories from conversation turns.
@runtime_checkable
class MemoryExtractor(Protocol):
def extract(self, turns: list[ConversationTurn]) -> list[MemoryEntry]: ...Returns: Zero or more MemoryEntry objects representing facts, preferences,
or other persistent knowledge.
AsyncMemoryExtractor
Async variant of MemoryExtractor.
@runtime_checkable
class AsyncMemoryExtractor(Protocol):
async def extract(self, turns: list[ConversationTurn]) -> list[MemoryEntry]: ...MemoryConsolidator
Decides how to merge new memories with existing ones.
@runtime_checkable
class MemoryConsolidator(Protocol):
def consolidate(
self, new_entries: list[MemoryEntry], existing: list[MemoryEntry]
) -> list[tuple[MemoryOperation, MemoryEntry | None]]: ...Returns: A list of (MemoryOperation, entry | None) tuples. Operations
are ADD, UPDATE, DELETE, or NONE.
EvictionPolicy
Decides which conversation turns to evict when memory is full.
@runtime_checkable
class EvictionPolicy(Protocol):
def select_for_eviction(
self, turns: list[ConversationTurn], tokens_to_free: int
) -> list[int]: ...| Parameter | Type | Description |
|---|---|---|
turns | list[ConversationTurn] | All turns currently held in memory |
tokens_to_free | int | Minimum tokens that must be freed |
Returns: List of zero-based indices identifying turns to evict.
MemoryDecay
Computes a retention score for a memory entry (0.0 = forget, 1.0 = retain).
@runtime_checkable
class MemoryDecay(Protocol):
def compute_retention(self, entry: MemoryEntry) -> float: ...Returns: Float from 0.0 (completely forgotten) to 1.0 (perfectly retained).
MemoryQueryEnricher
Enriches a query with memory context before retrieval.
@runtime_checkable
class MemoryQueryEnricher(Protocol):
def enrich(self, query: str, memory_items: list[MemoryEntry]) -> str: ...| Parameter | Type | Description |
|---|---|---|
query | str | The original user query |
memory_items | list[MemoryEntry] | Relevant memory entries |
Returns: An enriched query string.
[!NOTE]
QueryEnricheris a deprecated alias forMemoryQueryEnricher.
RecencyScorer
Computes recency scores for memory items.
@runtime_checkable
class RecencyScorer(Protocol):
def score(self, index: int, total: int) -> float: ...| Parameter | Type | Description |
|---|---|---|
index | int | Zero-based position (0 = oldest) |
total | int | Total number of items |
Returns: Float from 0.0 (oldest) to 1.0 (newest).
MemoryOperation
Enum of actions a MemoryConsolidator can prescribe.
class MemoryOperation(StrEnum):
ADD = "add"
UPDATE = "update"
DELETE = "delete"
NONE = "none"Storage
ContextStore
CRUD operations for ContextItem objects.
@runtime_checkable
class ContextStore(Protocol):
def add(self, item: ContextItem) -> None: ...
def get(self, item_id: str) -> ContextItem | None: ...
def get_all(self) -> list[ContextItem]: ...
def delete(self, item_id: str) -> bool: ...
def clear(self) -> None: ...VectorStore
Vector similarity search backend (wraps FAISS, Chroma, Qdrant, etc.).
@runtime_checkable
class VectorStore(Protocol):
def add_embedding(
self, item_id: str, embedding: list[float],
metadata: dict[str, Any] | None = None,
) -> None: ...
def search(
self, query_embedding: list[float], top_k: int = 10,
) -> list[tuple[str, float]]: ...
def delete(self, item_id: str) -> bool: ...search returns: List of (item_id, score) tuples by descending similarity.
DocumentStore
Storage for raw documents (pre-chunking / pre-indexing).
@runtime_checkable
class DocumentStore(Protocol):
def add_document(
self, doc_id: str, content: str,
metadata: dict[str, Any] | None = None,
) -> None: ...
def get_document(self, doc_id: str) -> str | None: ...
def list_documents(self) -> list[str]: ...
def delete_document(self, doc_id: str) -> bool: ...MemoryEntryStore
Persistent storage for MemoryEntry objects with CRUD and search.
@runtime_checkable
class MemoryEntryStore(Protocol):
def add(self, entry: MemoryEntry) -> None: ...
def search(self, query: str, top_k: int = 5) -> list[MemoryEntry]: ...
def list_all(self) -> list[MemoryEntry]: ...
def delete(self, entry_id: str) -> bool: ...
def clear(self) -> None: ...GarbageCollectableStore
Extends MemoryEntryStore with access to expired entries for garbage
collection.
@runtime_checkable
class GarbageCollectableStore(Protocol):
def list_all_unfiltered(self) -> list[MemoryEntry]: ...
def delete(self, entry_id: str) -> bool: ...Unlike MemoryEntryStore.list_all which may filter out expired entries,
list_all_unfiltered returns every entry so the garbage collector can
identify and prune them.
Evaluation
RetrievalEvaluator
Evaluates retrieval quality (precision, recall, MRR, NDCG).
@runtime_checkable
class RetrievalEvaluator(Protocol):
def evaluate(
self, retrieved: list[ContextItem], relevant: list[str], k: int = 10,
) -> RetrievalMetrics: ...| Parameter | Type | Default | Description |
|---|---|---|---|
retrieved | list[ContextItem] | required | Items returned by retriever, ranked |
relevant | list[str] | required | IDs of truly relevant documents |
k | int | 10 | Top results to consider |
RAGEvaluator
Evaluates RAG output quality (faithfulness, relevancy, precision, recall).
@runtime_checkable
class RAGEvaluator(Protocol):
def evaluate(
self, query: str, answer: str, contexts: list[str],
ground_truth: str | None = None,
) -> RAGMetrics: ...| Parameter | Type | Default | Description |
|---|---|---|---|
query | str | required | Original user query |
answer | str | required | Generated answer |
contexts | list[str] | required | Context strings fed to generator |
ground_truth | str | None | None | Reference answer for recall |
HumanEvaluator
Human-in-the-loop evaluation with inter-annotator agreement.
@runtime_checkable
class HumanEvaluator(Protocol):
def add_judgment(self, judgment: Any) -> None: ...
def compute_agreement(self) -> float: ...Observability
SpanExporter
Exports spans to external systems (stdout, file, OTLP, etc.).
@runtime_checkable
class SpanExporter(Protocol):
def export(self, spans: list[Span]) -> None: ...MetricsCollector
Collects and exports metrics.
@runtime_checkable
class MetricsCollector(Protocol):
def record(self, metric: MetricPoint) -> None: ...
def flush(self) -> None: ...Ingestion
Chunker
Splits text into smaller pieces for embedding and retrieval.
@runtime_checkable
class Chunker(Protocol):
def chunk(self, text: str, metadata: dict[str, Any] | None = None) -> list[str]: ...| Parameter | Type | Default | Description |
|---|---|---|---|
text | str | required | Full document text |
metadata | dict[str, Any] | None | None | Document-level metadata |
Returns: List of text chunks.
DocumentParser
Converts a file into plain text plus metadata.
@runtime_checkable
class DocumentParser(Protocol):
def parse(self, source: Path | bytes) -> tuple[str, dict[str, Any]]: ...
@property
def supported_extensions(self) -> list[str]: ...parse returns: (text, metadata) tuple.
Multimodal
ModalityEncoder
Encodes multi-modal content into text for embedding and retrieval.
@runtime_checkable
class ModalityEncoder(Protocol):
def encode(self, content: MultiModalContent) -> str: ...
@property
def supported_modalities(self) -> list[ModalityType]: ...TableExtractor
Extracts structured tables from documents.
@runtime_checkable
class TableExtractor(Protocol):
def extract_tables(self, source: Path | bytes) -> list[MultiModalContent]: ...Tokenization
Tokenizer
Token counting and truncation abstraction.
@runtime_checkable
class Tokenizer(Protocol):
def count_tokens(self, text: str) -> int: ...
def truncate_to_tokens(self, text: str, max_tokens: int) -> str: ...| Method | Description |
|---|---|
count_tokens(text) | Count tokens in a text string |
truncate_to_tokens(text, max_tokens) | Truncate text to at most max_tokens tokens |
Caching
CacheBackend
Backend for caching pipeline step results.
@runtime_checkable
class CacheBackend(Protocol):
def get(self, key: str) -> Any | None: ...
def set(self, key: str, value: Any, ttl: float | None = None) -> None: ...
def invalidate(self, key: str) -> None: ...
def clear(self) -> None: ...See Cache API Reference for the built-in implementation.
Implementing a Protocol
To implement any protocol, create a class with matching method signatures:
from anchor import Retriever, ContextItem, QueryBundle
class MyRetriever:
"""Custom retriever -- no inheritance needed."""
def retrieve(self, query: QueryBundle, top_k: int = 10) -> list[ContextItem]:
# Your retrieval logic here
return []
# Verify at runtime
assert isinstance(MyRetriever(), Retriever)[!TIP] All protocols are
@runtime_checkable, so you can useisinstance()checks at runtime for validation and debugging.
See Also
- Retrieval Guide -- using retrievers and rerankers
- Memory Guide -- memory extension points
- Ingestion Guide -- chunkers and parsers
- Cache Guide -- cache backends