This tutorial picks up where the Quickstart left off. You
will add retrieval, hybrid search, provider formatting, token budgets,
custom steps, async support, query transformation, and diagnostics to your
pipeline. By the end you will have a solid understanding of every major
feature in anchor.
```python
from anchor import (
    ContextPipeline,
    QueryBundle,
    ContextItem,
    SourceType,
    DenseRetriever,
    InMemoryContextStore,
    InMemoryVectorStore,
    retriever_step,
)

# You provide the embedding function (any provider works)
def my_embed_fn(text: str) -> list[float]:
    # Replace with your actual embedding call
    # e.g., openai.embeddings.create(model="text-embedding-3-small", input=text)
    return [0.0] * 384

# Set up retriever
retriever = DenseRetriever(
    vector_store=InMemoryVectorStore(),
    context_store=InMemoryContextStore(),
    embed_fn=my_embed_fn,
)

# Index some documents
docs = [
    ContextItem(content="Python lists support .sort() and sorted().", source=SourceType.RETRIEVAL),
    ContextItem(content="Use lambda for custom sort keys.", source=SourceType.RETRIEVAL),
]
retriever.index(docs)

# Build pipeline with retrieval
pipeline = (
    ContextPipeline(max_tokens=4096)
    .add_step(retriever_step("search", retriever, top_k=5))
)

result = pipeline.build(QueryBundle(query_str="How to sort in Python?"))
print(f"Found {len(result.window.items)} context items")
print(f"Used {result.window.used_tokens}/{result.window.max_tokens} tokens")
```
> [!NOTE]
> **Bring your own embeddings**: anchor never calls an embedding provider directly. You supply the `embed_fn` and can use OpenAI, Cohere, local models, or anything else.
- `default_chat_budget(max_tokens)` -- Conversational apps. 60% of usable tokens go to conversation and memory.
- `default_rag_budget(max_tokens)` -- RAG-heavy apps. 40% of usable tokens go to retrieval results.
- `default_agent_budget(max_tokens)` -- Agentic apps. Includes a 15% tool allocation and balances across all sources.
> [!CAUTION]
> The `reserve_tokens` field (15% by default) is subtracted from `max_tokens` before any items are placed. Make sure your pipeline's `max_tokens` is large enough to leave room after the reservation.
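To get a feel for what the reservation leaves you, the arithmetic can be sketched in plain Python. The percentages come from the defaults described above; the exact rounding behavior is an assumption, so check the library for its actual calculation.

```python
# Sketch of the default reservation arithmetic (int() rounding is an
# assumption; the library may round differently).
max_tokens = 8192
reserve_tokens = int(max_tokens * 0.15)   # 1228 tokens held back
usable = max_tokens - reserve_tokens      # 6964 tokens left for placement

# Under default_chat_budget, 60% of the usable tokens go to
# conversation and memory:
chat_share = int(usable * 0.60)           # 4178 tokens

print(usable, chat_share)
```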
Instead of using `add_step()` with factory functions, you can register pipeline steps using the `@pipeline.step` decorator. This is especially convenient for custom post-processing logic:
```python
from anchor import ContextPipeline, ContextItem, QueryBundle

pipeline = ContextPipeline(max_tokens=8192)

@pipeline.step
def boost_recent(items: list[ContextItem], query: QueryBundle) -> list[ContextItem]:
    """Boost scores of recent items."""
    return [
        item.model_copy(update={"score": min(1.0, item.score * 1.5)})
        if item.metadata.get("recent")
        else item
        for item in items
    ]

@pipeline.step(name="quality-filter")
def remove_low_quality(items: list[ContextItem], query: QueryBundle) -> list[ContextItem]:
    """Filter out low-scoring items."""
    return [item for item in items if item.score > 0.3]

result = pipeline.build("How to sort in Python?")
```
Step functions must accept two arguments -- `items: list[ContextItem]` and `query: QueryBundle` -- and return a `list[ContextItem]`.
You can also pass `on_error="skip"` to gracefully skip a step if it raises:
```python
@pipeline.step(name="optional-enrichment", on_error="skip")
def enrich(items: list[ContextItem], query: QueryBundle) -> list[ContextItem]:
    """This step is skipped if it fails instead of crashing the pipeline."""
    return items
```
> [!NOTE]
> Passing an async function to `@pipeline.step` raises a `TypeError`. Use `@pipeline.async_step` for async functions (see below).
For pipelines that include async steps (e.g., database lookups, API calls), use `@pipeline.async_step` and call `abuild()` instead of `build()`:
```python
import asyncio

from anchor import ContextPipeline, ContextItem, SourceType, QueryBundle

pipeline = ContextPipeline(max_tokens=8192)

@pipeline.async_step
async def fetch_from_db(items: list[ContextItem], query: QueryBundle) -> list[ContextItem]:
    """Fetch relevant context from an async database."""
    # Replace with your actual async database call
    await asyncio.sleep(0)  # placeholder for async I/O
    new_items = [
        ContextItem(
            content="Retrieved from database",
            source=SourceType.RETRIEVAL,
            score=0.9,
        )
    ]
    return items + new_items

@pipeline.step
def filter_results(items: list[ContextItem], query: QueryBundle) -> list[ContextItem]:
    """Sync steps and async steps can be mixed in the same pipeline."""
    return [item for item in items if item.score > 0.5]

# Use abuild() instead of build() to run the async pipeline
result = asyncio.run(pipeline.abuild("What is context engineering?"))
```
> [!CAUTION]
> If your pipeline contains any async steps, you must use `abuild()`. Calling `build()` on a pipeline with async steps will raise an error.

You can also use `@pipeline.async_step` with keyword arguments such as `name` and `on_error="skip"`.
HyDE (Hypothetical Document Embeddings) generates a hypothetical answer and uses it as the retrieval query. The intuition: the embedding of a plausible answer lies closer in vector space to the real answer than the embedding of the question itself.
```python
from anchor import (
    ContextPipeline,
    HyDETransformer,
    query_transform_step,
    DenseRetriever,
    InMemoryVectorStore,
    InMemoryContextStore,
)

def generate_hypothetical(query: str) -> str:
    """Replace with your actual LLM call."""
    return f"A hypothetical answer to: {query}"

hyde = HyDETransformer(generate_fn=generate_hypothetical)

retriever = DenseRetriever(
    vector_store=InMemoryVectorStore(),
    context_store=InMemoryContextStore(),
    embed_fn=my_embed_fn,
)

pipeline = ContextPipeline(max_tokens=8192).add_step(
    query_transform_step("hyde-search", transformer=hyde, retriever=retriever, top_k=10)
)

result = pipeline.build("What causes memory leaks in Python?")
```
> [!NOTE]
> anchor never calls an LLM directly. You provide the generation function (`generate_fn`) and the transformers handle orchestration.
The diagnostics dictionary contains the following fields:

| Field | Type | Description |
| --- | --- | --- |
| `steps` | `list[StepDiagnostic]` | Per-step name, item count, and timing |
| `memory_items` | `int` | Number of items contributed by memory |
| `total_items_considered` | `int` | Total items before window assembly |
| `items_included` | `int` | Items that fit in the context window |
| `items_overflow` | `int` | Items that did not fit |
| `token_utilization` | `float` | Fraction of token budget used (0.0--1.0) |
| `token_usage_by_source` | `dict[str, int]` | Per-source token counts (when using budgets) |
| `budget_overflow_by_source` | `dict[str, int]` | Per-source overflow counts (when using budgets) |
| `shared_pool_usage` | `int` | Tokens used by non-allocated sources (when using budgets) |
| `skipped_steps` | `list[str]` | Steps that failed with `on_error="skip"` |
| `failed_step` | `str` | Step that caused a pipeline failure |
| `query_enriched` | `bool` | Whether query enrichment was applied |
> [!TIP]
> A `token_utilization` close to 1.0 means you are making good use of your context window. If it is consistently low, consider increasing `top_k` on your retriever steps or lowering `max_tokens`.
> [!CAUTION]
> If `items_overflow` is high, important context may be getting dropped. Consider increasing `max_tokens`, tuning per-source budgets, or filtering low-quality items earlier in the pipeline.
Every `ContextItem` has a `priority` field (1--10) that controls placement order. Higher-priority items are placed first and are never evicted in favor of lower-priority items.
| Priority | Source | Usage |
| --- | --- | --- |
| 10 | System prompts | Instructions, persona, rules |
| 8 | Persistent memory | Long-term facts from `MemoryManager.add_fact()` |
| 7 | Conversation memory | Recent chat turns |
| 5 | Retrieval (default) | RAG results from retrievers |
| 1--4 | Custom | Low-priority supplementary context |
> [!NOTE]
> The priority system works together with token budgets. Within a single source category, items are ordered by priority first, then by score.
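The ordering rule -- priority first, then score -- can be sketched in plain Python. This is a simplified model, not the library's actual placement code: the real assembly also accounts for token budgets and source categories.

```python
# Simplified model of placement order: priority descending, then score
# descending. Plain dicts stand in for ContextItem here.
items = [
    {"content": "RAG result",     "priority": 5,  "score": 0.92},
    {"content": "System prompt",  "priority": 10, "score": 0.50},
    {"content": "Chat turn",      "priority": 7,  "score": 0.80},
    {"content": "Better RAG hit", "priority": 5,  "score": 0.97},
]

placement_order = sorted(items, key=lambda i: (-i["priority"], -i["score"]))
print([i["content"] for i in placement_order])
```

Note that the system prompt is placed first despite having the lowest score: priority always outranks score.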