Pipeline API Reference

API reference for the pipeline module -- the orchestration layer of anchor.

ContextPipeline

The main orchestrator that assembles context from multiple sources into a token-aware, priority-ranked context window.

from anchor import ContextPipeline

class ContextPipeline:
    def __init__(
        self,
        max_tokens: int = 8192,
        tokenizer: Tokenizer | None = None,
        budget: TokenBudget | None = None,
    ) -> None: ...

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `max_tokens` | `int` | `8192` | Maximum token budget for the context window. Must be positive. |
| `tokenizer` | `Tokenizer \| None` | `None` | Custom tokenizer. Falls back to the built-in `TiktokenCounter`. |
| `budget` | `TokenBudget \| None` | `None` | Optional token budget for fine-grained per-source allocation. |

Raises: ValueError if max_tokens <= 0.

Properties

| Property | Type | Description |
| --- | --- | --- |
| `max_tokens` | `int` | The maximum token budget for the context window. |
| `formatter` | `Formatter` | The current output formatter. |
| `steps` | `list[PipelineStep]` | A copy of the registered pipeline steps. |
| `system_items` | `list[ContextItem]` | A copy of the registered system items. |
| `budget` | `TokenBudget \| None` | The optional token budget. |

Methods

add_step(step) -> ContextPipeline

Add a pipeline step. Returns self for chaining.

def add_step(self, step: PipelineStep) -> ContextPipeline: ...

with_memory(memory) -> ContextPipeline

Attach a memory provider. Any object satisfying the MemoryProvider protocol (i.e. having a get_context_items() -> list[ContextItem] method) is accepted.

def with_memory(self, memory: MemoryProvider) -> ContextPipeline: ...
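
As a sketch of this duck-typed contract, any object exposing `get_context_items()` qualifies; the `ContextItem` below is a simplified stand-in for anchor's real class, which has more fields:

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass
class ContextItem:  # simplified stand-in for anchor's ContextItem
    content: str
    priority: int = 0


@runtime_checkable
class MemoryProvider(Protocol):
    def get_context_items(self) -> list[ContextItem]: ...


class RecentTurnsMemory:
    """Toy provider that replays stored turns as context items."""

    def __init__(self) -> None:
        self._turns: list[str] = []

    def remember(self, text: str) -> None:
        self._turns.append(text)

    def get_context_items(self) -> list[ContextItem]:
        return [ContextItem(content=t) for t in self._turns]
```

Because the protocol is structural, no inheritance from anchor is required; the class above passes an `isinstance` check against the runtime-checkable protocol.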

with_budget(budget) -> ContextPipeline

Attach a TokenBudget for fine-grained allocation.

def with_budget(self, budget: TokenBudget) -> ContextPipeline: ...

with_formatter(formatter) -> ContextPipeline

Set the output formatter.

def with_formatter(self, formatter: Formatter) -> ContextPipeline: ...

add_system_prompt(content, priority=10) -> ContextPipeline

Add a system prompt as a high-priority context item with source=SourceType.SYSTEM.

def add_system_prompt(self, content: str, priority: int = 10) -> ContextPipeline: ...

add_callback(callback) -> ContextPipeline

Register an event callback for pipeline observability.

def add_callback(self, callback: PipelineCallback) -> ContextPipeline: ...

with_query_enricher(enricher) -> ContextPipeline

Attach a query enricher for memory-aware query expansion. The enricher is called after memory items are collected but before pipeline steps execute.

def with_query_enricher(self, enricher: ContextQueryEnricher) -> ContextPipeline: ...

step(fn=None, *, name=None, on_error="raise")

Decorator to register a synchronous function as a pipeline step. Usable with or without arguments: @pipeline.step or @pipeline.step(name="x", on_error="skip").

Raises: TypeError if the function is async (use async_step instead).
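
The dual calling convention can be sketched in plain Python; the `register` internals here are hypothetical (anchor's real decorator also appends a `PipelineStep` to the pipeline), but the bare-vs-parameterized dispatch is the standard pattern:

```python
import inspect


def step(fn=None, *, name=None, on_error="raise"):
    """Sketch of a decorator usable bare (@step) or parameterized (@step(...))."""

    def register(func):
        if inspect.iscoroutinefunction(func):
            raise TypeError("async functions must use async_step")
        # Stash metadata on the function for illustration only.
        func._step_name = name or func.__name__
        func._on_error = on_error
        return func

    if fn is not None:
        return register(fn)   # bare form: @step
    return register           # parameterized form: @step(name=..., on_error=...)
```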

async_step(fn=None, *, name=None, on_error="raise")

Decorator to register an async function as a pipeline step. Same usage pattern as step.

Raises: TypeError if the function is not async.

build(query) -> ContextResult

Execute the full pipeline synchronously and return assembled context.

def build(self, query: str | QueryBundle) -> ContextResult: ...

Accepts either a plain string (auto-wrapped in QueryBundle) or a QueryBundle.
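
The string auto-wrapping amounts to a normalization like the following; `QueryBundle` here is a schematic stand-in (the field name `query_str` is an assumption, and anchor's class may carry more data):

```python
from dataclasses import dataclass


@dataclass
class QueryBundle:  # schematic stand-in for anchor's QueryBundle
    query_str: str


def normalize_query(query: "str | QueryBundle") -> QueryBundle:
    """Wrap bare strings so downstream steps always see a QueryBundle."""
    return query if isinstance(query, QueryBundle) else QueryBundle(query_str=query)
```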

abuild(query) -> ContextResult

Execute the full pipeline asynchronously. Supports both sync and async steps: sync steps are called directly, async steps are awaited.

async def abuild(self, query: str | QueryBundle) -> ContextResult: ...

PipelineStep

A single composable step in the context pipeline. This is a dataclass.

from anchor import PipelineStep

@dataclass(slots=True)
class PipelineStep:
    name: str
    fn: SyncStepFn | AsyncStepFn
    is_async: bool = False
    on_error: Literal["raise", "skip"] = "raise"
    metadata: dict[str, Any] = field(default_factory=dict)

Fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Human-readable name for diagnostics. |
| `fn` | `SyncStepFn \| AsyncStepFn` | (required) | The callable implementing the step logic. |
| `is_async` | `bool` | `False` | Whether `fn` is an async function. |
| `on_error` | `"raise" \| "skip"` | `"raise"` | Error handling policy. |
| `metadata` | `dict[str, Any]` | `{}` | Arbitrary metadata. |

Methods

execute(items, query) -> list[ContextItem]

Execute the step synchronously. Raises TypeError if the step is async.

aexecute(items, query) -> list[ContextItem]

Execute the step asynchronously. Works for both sync and async step functions.
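
The dual dispatch can be sketched with `inspect.iscoroutinefunction` (an assumed mechanism; anchor may key off the `is_async` flag instead):

```python
import asyncio
import inspect


async def aexecute(fn, items, query):
    """Sketch: await async step functions, call sync ones directly."""
    if inspect.iscoroutinefunction(fn):
        return await fn(items, query)
    return fn(items, query)


def sync_upper(items, query):
    return [s.upper() for s in items]


async def async_tag(items, query):
    return [f"{s}!" for s in items]
```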


Factory Functions

retriever_step(name, retriever, top_k=10) -> PipelineStep

Create a step from a Retriever protocol implementation. Appends retrieved items to the current list.

from anchor import retriever_step
step = retriever_step("search", my_retriever, top_k=5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `retriever` | `Retriever` | (required) | Object with a `retrieve(query, top_k)` method. |
| `top_k` | `int` | `10` | Maximum items to retrieve. |
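
A minimal object satisfying the protocol's shape might look like this; real retrievers return `ContextItem` objects and may receive a query bundle, but plain strings are used here for brevity:

```python
class KeywordRetriever:
    """Toy retriever: case-insensitive substring match over an in-memory corpus."""

    def __init__(self, docs: list[str]) -> None:
        self.docs = docs

    def retrieve(self, query: str, top_k: int = 10) -> list[str]:
        hits = [d for d in self.docs if query.lower() in d.lower()]
        return hits[:top_k]
```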

async_retriever_step(name, retriever, top_k=10) -> PipelineStep

Async variant. Wraps an AsyncRetriever (must have aretrieve(query, top_k)).

from anchor import async_retriever_step
step = async_retriever_step("async-search", my_async_retriever, top_k=5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `retriever` | `AsyncRetriever` | (required) | Object with an `aretrieve(query, top_k)` method. |
| `top_k` | `int` | `10` | Maximum items to retrieve. |

filter_step(name, predicate) -> PipelineStep

Create a step that filters items by a predicate function. Items where the predicate returns False are removed.

from anchor import filter_step
step = filter_step("score-gate", lambda item: item.score > 0.5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `predicate` | `Callable[[ContextItem], bool]` | (required) | Returns `True` to keep an item. |

postprocessor_step(name, processor) -> PipelineStep

Create a step from a PostProcessor protocol implementation.

from anchor import postprocessor_step
step = postprocessor_step("dedup", my_deduplicator)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `processor` | `PostProcessor` | (required) | Object with a `process(items, query)` method. |

async_postprocessor_step(name, processor) -> PipelineStep

Async variant. Wraps an AsyncPostProcessor (must have aprocess(items, query)).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `processor` | `AsyncPostProcessor` | (required) | Object with an `aprocess(items, query)` method. |

reranker_step(name, reranker, top_k=10) -> PipelineStep

Create a step from a Reranker protocol implementation. The reranker scores and returns the top-k most relevant items.

from anchor import reranker_step
step = reranker_step("rerank", my_reranker, top_k=3)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `reranker` | `Reranker` | (required) | Object with a `rerank(query, items, top_k)` method. |
| `top_k` | `int` | `10` | Maximum items the reranker should return. |

async_reranker_step(name, reranker, top_k=10) -> PipelineStep

Async variant. Wraps an AsyncReranker (must have arerank(query, items, top_k)).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `reranker` | `AsyncReranker` | (required) | Object with an `arerank(query, items, top_k)` method. |
| `top_k` | `int` | `10` | Maximum items the reranker should return. |

query_transform_step(name, transformer, retriever, top_k=10) -> PipelineStep

Create a step that expands the query into multiple variants, retrieves for each, and merges results using Reciprocal Rank Fusion (RRF). New items are deduplicated by ID.

from anchor import query_transform_step
step = query_transform_step("multi-query", my_transformer, my_retriever, top_k=5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `transformer` | `QueryTransformer` | (required) | Expands a single query into multiple queries. |
| `retriever` | `Retriever` | (required) | Retriever to run against each expanded query. |
| `top_k` | `int` | `10` | Maximum items to retrieve per query variant. |
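
The RRF merge itself can be sketched as follows; `k=60` is the conventional smoothing constant from the original RRF paper, and anchor's exact constant is not documented here:

```python
def rrf_merge(ranked_lists: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=lambda d: scores[d], reverse=True)
```

Items retrieved by several query variants accumulate score from each list, so they rank above single-list hits.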

classified_retriever_step(name, classifier, retrievers, default=None, top_k=10) -> PipelineStep

Create a step that classifies the query and routes to the appropriate retriever.

from anchor import classified_retriever_step
step = classified_retriever_step(
    "router",
    classifier=my_classifier,
    retrievers={"technical": tech_retriever, "general": general_retriever},
    default="general",
    top_k=5,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `name` | `str` | (required) | Step name for diagnostics. |
| `classifier` | `QueryClassifier` | (required) | Object with a `classify(query) -> str` method. |
| `retrievers` | `dict[str, Retriever]` | (required) | Mapping from class label to retriever. |
| `default` | `str \| None` | `None` | Fallback key when the label is not found. |
| `top_k` | `int` | `10` | Maximum items to retrieve. |

Raises: RetrieverError if the label has no matching retriever and no default.
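
The routing rule reduces to a lookup with fallback, roughly as below; a `KeyError` stands in for anchor's `RetrieverError`:

```python
def route(label: str, retrievers: dict, default: "str | None" = None):
    """Sketch of label routing: exact match, then default, then error."""
    if label in retrievers:
        return retrievers[label]
    if default is not None and default in retrievers:
        return retrievers[default]
    raise KeyError(f"no retriever registered for label {label!r}")
```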

auto_promotion_step(extractor, store, consolidator=None, name="auto_promotion", on_error="skip") -> PipelineStep

Create a step that extracts and stores memories from context. This is a side-effect-only step that returns items unchanged.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `extractor` | `MemoryExtractor` | (required) | Extracts `MemoryEntry` objects from conversation turns. |
| `store` | `MemoryEntryStore` | (required) | Persistence backend for memory entries. |
| `consolidator` | `MemoryConsolidator \| None` | `None` | Optional deduplication against existing entries. |
| `name` | `str` | `"auto_promotion"` | Step name for diagnostics. |
| `on_error` | `"raise" \| "skip"` | `"skip"` | Error handling policy. |
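
Assuming the documented pass-through behavior, the step's side-effect-only shape is roughly:

```python
def auto_promotion(items: list, query, *, extractor, store) -> list:
    """Sketch: persist extracted memories, then return items unchanged."""
    for entry in extractor(items):
        store.append(entry)
    return items  # the context flowing through the pipeline is untouched
```

Here `extractor` and `store` are hypothetical callables standing in for anchor's `MemoryExtractor` and `MemoryEntryStore`.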

graph_retrieval_step(graph, store, entity_extractor, ...) -> PipelineStep

Create a step that retrieves memory entries linked to graph entities via BFS traversal.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `graph` | `SimpleGraphMemory` | (required) | Graph memory instance to traverse. |
| `store` | `MemoryEntryStore` | (required) | Store holding `MemoryEntry` objects. |
| `entity_extractor` | `Callable[[str], list[str]]` | (required) | Maps a query string to entity IDs. |
| `max_depth` | `int` | `2` | Maximum BFS traversal depth. |
| `max_items` | `int` | `5` | Maximum `ContextItem` objects to return. |
| `name` | `str` | `"graph_retrieval"` | Step name for diagnostics. |
| `on_error` | `"raise" \| "skip"` | `"skip"` | Error handling policy. |
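
The depth-bounded traversal can be sketched over a plain adjacency mapping (a stand-in for `SimpleGraphMemory`'s internal structure, which is not documented here):

```python
from collections import deque


def bfs_entities(adjacency: dict, start_ids: list, max_depth: int = 2) -> list:
    """Depth-bounded BFS over an adjacency mapping, returned in visit order."""
    seen = set(start_ids)
    frontier = deque((eid, 0) for eid in start_ids)
    visited = list(start_ids)
    while frontier:
        node, depth = frontier.popleft()
        if depth >= max_depth:
            continue  # do not expand past the depth bound
        for neighbor in adjacency.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                visited.append(neighbor)
                frontier.append((neighbor, depth + 1))
    return visited
```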

create_eviction_promoter(extractor, store, consolidator=None) -> Callable

Create an on_evict callback that promotes evicted conversation turns to long-term memory. Designed for SlidingWindowMemory(on_evict=...).

from anchor import create_eviction_promoter, SlidingWindowMemory

promoter = create_eviction_promoter(extractor, store, consolidator)
memory = SlidingWindowMemory(max_tokens=4096, on_evict=promoter)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `extractor` | `MemoryExtractor` | (required) | Extracts `MemoryEntry` objects from turns. |
| `store` | `MemoryEntryStore` | (required) | Persistence backend. |
| `consolidator` | `MemoryConsolidator \| None` | `None` | Optional deduplication. |

Returns: A callable with signature (list[ConversationTurn]) -> None.

> [!NOTE]
> Errors inside the eviction promoter are logged but never propagated, to prevent crashing the memory pipeline.
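
That log-but-never-raise contract can be sketched as follows, with hypothetical `extract` and `store` callables standing in for the real extractor and store:

```python
import logging


def make_eviction_promoter(extract, store):
    """Sketch of the error contract: failures are logged, never raised."""
    log = logging.getLogger("anchor.eviction")

    def on_evict(turns: list) -> None:
        try:
            for entry in extract(turns):
                store.append(entry)
        except Exception:
            log.exception("memory promotion failed; continuing")

    return on_evict
```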


MemoryContextEnricher

Enriches queries by appending recent conversation context. Helps retrieval steps find documents relevant to the ongoing conversation, not just the literal query.

from anchor import MemoryContextEnricher

MemoryContextEnricher(
    max_items: int = 5,
    template: str = "{query}\n\nConversation context: {context}",
)

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `max_items` | `int` | `5` | Maximum number of recent memory items to include. Must be positive. |
| `template` | `str` | `"{query}\n\nConversation context: {context}"` | Format string with `{query}` and `{context}` placeholders. |

Usage:

enricher = MemoryContextEnricher(max_items=3)
pipeline = ContextPipeline(max_tokens=8192).with_query_enricher(enricher)

Satisfies the ContextQueryEnricher protocol. When attached via with_query_enricher(), it receives memory items and appends a summary to the query before retrieval steps execute.
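
The enrichment amounts to formatting recent item texts into the template, roughly as below; the ordering (most recent last) and the join separator are assumptions:

```python
def enrich_query(query: str, memory_texts: list[str], max_items: int = 5,
                 template: str = "{query}\n\nConversation context: {context}") -> str:
    """Sketch: append the most recent memory texts to the query."""
    recent = memory_texts[-max_items:]
    if not recent:
        return query  # nothing to add; retrieval sees the original query
    return template.format(query=query, context=" ".join(recent))
```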


PipelineCallback

A runtime-checkable protocol for pipeline event callbacks. All methods are optional -- implement only the ones you need.

from anchor import PipelineCallback

class PipelineCallback(Protocol):
    def on_pipeline_start(self, query: QueryBundle) -> None: ...
    def on_step_start(self, step_name: str, items: list[ContextItem]) -> None: ...
    def on_step_end(self, step_name: str, items: list[ContextItem], time_ms: float) -> None: ...
    def on_step_error(self, step_name: str, error: Exception) -> None: ...
    def on_pipeline_end(self, result: ContextResult) -> None: ...

| Method | Called When |
| --- | --- |
| `on_pipeline_start` | Pipeline execution begins. |
| `on_step_start` | A step is about to execute. |
| `on_step_end` | A step completed successfully. |
| `on_step_error` | A step raised an exception. |
| `on_pipeline_end` | Pipeline execution completed. |
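
Because the protocol is structural and every hook is optional, a callback can implement only what it needs. For instance, a timing collector (hypothetical class, not part of anchor):

```python
class TimingCollector:
    """Implements only on_step_end and on_step_error; other hooks are omitted."""

    def __init__(self) -> None:
        self.timings: dict[str, float] = {}
        self.errors: list[str] = []

    def on_step_end(self, step_name: str, items: list, time_ms: float) -> None:
        self.timings[step_name] = time_ms

    def on_step_error(self, step_name: str, error: Exception) -> None:
        self.errors.append(f"{step_name}: {error}")
```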
