Astro Intelligence

Retrieval API Reference

Retrieval API Reference

API reference for all retrieval classes, protocols, and utilities. For guides, see Retrieval and Advanced Retrieval.


Protocols

class Retriever(Protocol):
    def retrieve(self, query: QueryBundle, top_k: int = 10) -> list[ContextItem]: ...

class AsyncRetriever(Protocol):
    async def aretrieve(self, query: QueryBundle, top_k: int = 10) -> list[ContextItem]: ...

class Reranker(Protocol):
    def rerank(self, query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]: ...

class AsyncReranker(Protocol):
    async def arerank(self, query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]: ...

class TokenLevelEncoder(Protocol):
    def encode_tokens(self, text: str) -> list[list[float]]: ...

Core Retrievers

DenseRetriever

Embedding-based retrieval via a VectorStore backend.

DenseRetriever(
    vector_store: VectorStore,
    context_store: ContextStore,
    embed_fn: Callable[[str], list[float]] | None = None,
    tokenizer: Tokenizer | None = None,
)
ParameterTypeDefaultDescription
vector_storeVectorStore--Embedding storage and search backend.
context_storeContextStore--Resolves item IDs to ContextItem.
embed_fnCallable[[str], list[float]] | NoneNoneText embedding function. Required for index().
tokenizerTokenizer | NoneNoneToken counter. Defaults to built-in.
MethodSignatureDescription
index(items: list[ContextItem]) -> intIndex items. Returns count. Raises RetrieverError if embed_fn is None.
retrieve(query: QueryBundle, top_k: int = 10) -> list[ContextItem]Retrieve by similarity. Uses query.embedding if set, else calls embed_fn.

SparseRetriever

BM25-based retrieval. Requires rank-bm25.

SparseRetriever(
    tokenize_fn: Callable[[str], list[str]] | None = None,
    tokenizer: Tokenizer | None = None,
)
MethodSignatureDescription
index(items: list[ContextItem]) -> intBuild BM25 index. Raises RetrieverError if rank-bm25 missing.
retrieve(query: QueryBundle, top_k: int = 10) -> list[ContextItem]Retrieve by BM25 score (normalized to [0, 1]).

HybridRetriever

Combines multiple retrievers with Reciprocal Rank Fusion.

HybridRetriever(
    retrievers: list[Retriever],
    rrf_k: int = 60,
    weights: list[float] | None = None,
)
MethodSignatureDescription
retrieve(query: QueryBundle, top_k: int = 10) -> list[ContextItem]Fuse results with RRF. Skips failed sub-retrievers. Raises RetrieverError if all fail.

Rerankers

ScoreReranker

Simple reranker with process() interface (not the Reranker protocol).

ScoreReranker(score_fn: Callable[[str, str], float], top_k: int | None = None)
MethodSignatureDescription
process(items: list[ContextItem], query: QueryBundle | None = None) -> list[ContextItem]Rescore and sort. Returns input unchanged if query is None.

CrossEncoderReranker

Cross-encoder scoring. Implements Reranker.

CrossEncoderReranker(score_fn: Callable[[str, str], float], top_k: int = 10)
MethodSignatureDescription
rerank(query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]Score each item and return top-k.

CohereReranker

Batch reranking via API callback. Implements Reranker.

CohereReranker(
    rerank_fn: Callable[[str, list[str], int], list[tuple[int, float]]],
    top_k: int = 10,
)

The rerank_fn takes (query, documents, top_k) and returns (index, score) tuples.

MethodSignatureDescription
rerank(query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]Batch-rerank via callback.

FlashRankReranker

Local reranking via flashrank. Model is lazily loaded.

FlashRankReranker(model_name: str = "ms-marco-MiniLM-L-12-v2", top_k: int = 10)
MethodSignatureDescription
rerank(query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]Rerank using flashrank. Raises RetrieverError if not installed.

RoundRobinReranker

Round-robin merge and score-based re-sort.

RoundRobinReranker(top_k: int = 10)
MethodSignatureDescription
rerank(query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]Re-sort by existing score descending.
rerank_multiple(query: QueryBundle, result_sets: list[list[ContextItem]], top_k: int | None = None) -> list[ContextItem]Merge result sets round-robin, deduplicating by ID.

RerankerPipeline

Chains multiple rerankers sequentially. Final top_k applied at the end.

RerankerPipeline(rerankers: list[Reranker], top_k: int = 10)
MethodSignatureDescription
rerank(query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]Pass items through each reranker in sequence.

Async Retrievers

AsyncDenseRetriever

Async embedding-based retriever with cosine similarity.

AsyncDenseRetriever(
    embed_fn: Callable[[str], Awaitable[list[float]]],
    similarity_fn: Callable[[list[float], list[float]], float] | None = None,
)
MethodSignatureDescription
index(items: list[ContextItem]) -> NoneStore pre-embedded items (need "embedding" in metadata).
aindexasync (items: list[ContextItem]) -> NoneEmbed and store items via embed_fn.
aretrieveasync (query: QueryBundle, top_k: int = 10) -> list[ContextItem]Retrieve by cosine similarity.

AsyncHybridRetriever

Concurrent fan-out with RRF fusion via asyncio.gather.

AsyncHybridRetriever(
    retrievers: list[AsyncDenseRetriever],
    weights: list[float] | None = None,
    k: int = 60,
)
MethodSignatureDescription
aretrieveasync (query: QueryBundle, top_k: int = 10) -> list[ContextItem]Fan out to all retrievers and fuse with RRF.

AsyncCrossEncoderReranker

Async cross-encoder scoring. Scores all items concurrently.

AsyncCrossEncoderReranker(score_fn: Callable[[str, str], Awaitable[float]])
MethodSignatureDescription
arerankasync (query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]Score concurrently and return top-k.

AsyncCohereReranker

Async batch reranker via callback.

AsyncCohereReranker(rerank_fn: Callable[[str, list[str], int], Awaitable[list[int]]])
MethodSignatureDescription
arerankasync (query: QueryBundle, items: list[ContextItem], top_k: int = 10) -> list[ContextItem]Batch-rerank via async callback.

Query Routing

KeywordRouter

Routes queries by keyword matching. First match wins.

KeywordRouter(routes: dict[str, list[str]], default: str, *, case_sensitive: bool = False)
MethodSignatureDescription
route(query: QueryBundle) -> strReturn route name for first keyword match, or default.

CallbackRouter

Routes using a user-provided callback.

CallbackRouter(callback: Callable[[QueryBundle], str | None], default: str = "default")
MethodSignatureDescription
route(query: QueryBundle) -> strCall callback. Returns default if callback returns None.

MetadataRouter

Routes based on a metadata field in the query.

MetadataRouter(metadata_key: str = "route", default: str = "default")
MethodSignatureDescription
route(query: QueryBundle) -> strRead query.metadata[metadata_key]. Returns default if missing.

RoutedRetriever

Delegates to named retrievers based on routing.

RoutedRetriever(
    router: KeywordRouter | CallbackRouter | MetadataRouter,
    retrievers: dict[str, Retriever],
    default_retriever: str | None = None,
)
MethodSignatureDescription
retrieve(query: QueryBundle, top_k: int = 10) -> list[ContextItem]Route and delegate. Raises RetrieverError if route unknown and no default.

Cross-Modal

CrossModalEncoder

Encodes content from multiple modalities into a shared vector space.

CrossModalEncoder(encoders: dict[str, Callable[[Any], list[float]]])
MemberSignatureDescription
encode(content: Any, modality: str) -> list[float]Encode using named modality. Raises ValueError if unknown.
modalities@property -> list[str]Sorted list of registered modality names.

SharedSpaceRetriever

Cross-modal retriever in a shared embedding space.

SharedSpaceRetriever(
    encoder: CrossModalEncoder,
    query_modality: str = "text",
    similarity_fn: Callable[[list[float], list[float]], float] | None = None,
)
MethodSignatureDescription
index(items: list[ContextItem], modality: str | None = None) -> NoneEmbed and store items. Uses metadata["modality"] if modality is None.
retrieve(query: QueryBundle, top_k: int = 10) -> list[ContextItem]Retrieve by similarity in shared space.

Late Interaction

MaxSimScorer

ColBERT-style MaxSim scoring over per-token embeddings.

MaxSimScorer()
MethodSignatureDescription
score(query_tokens: list[list[float]], doc_tokens: list[list[float]]) -> floatSum of per-query-token maximum cosine similarities.

LateInteractionScorer

Configurable wrapper for token-level scoring. Defaults to MaxSim.

LateInteractionScorer(
    score_fn: Callable[[list[list[float]], list[list[float]]], float] | None = None,
)
MethodSignatureDescription
score(query_tokens: list[list[float]], doc_tokens: list[list[float]]) -> floatDelegate to configured scoring function.

LateInteractionRetriever

Two-stage retriever: first-stage candidate generation + token-level re-scoring.

LateInteractionRetriever(
    first_stage: Retriever,
    encoder: TokenLevelEncoder,
    scorer: LateInteractionScorer | None = None,
    first_stage_k: int = 100,
)
MethodSignatureDescription
retrieve(query: QueryBundle, top_k: int = 10) -> list[ContextItem]Generate candidates then re-score with token-level similarity.

Memory Retrieval

ScoredMemoryRetriever

Multi-signal retriever combining recency, relevance, and importance.

ScoredMemoryRetriever(
    store: MemoryEntryStore,
    embed_fn: Callable[[str], list[float]] | None = None,
    vector_store: VectorStore | None = None,
    decay: MemoryDecay | None = None,
    alpha: float = 0.3,
    beta: float = 0.5,
    gamma: float = 0.2,
)
ParameterTypeDefaultDescription
storeMemoryEntryStore--Backing store for entries.
embed_fnCallable | NoneNoneEmbedding function for relevance.
vector_storeVectorStore | NoneNoneVector index for relevance scoring.
decayMemoryDecay | NoneNoneCustom decay. Defaults to 7-day half-life.
alpha / beta / gammafloat0.3 / 0.5 / 0.2Recency / relevance / importance weights.
MethodSignatureDescription
retrieve(query: str, top_k: int = 5, *, user_id: str | None, memory_type: str | None) -> list[MemoryEntry]Retrieve by composite score with optional filters.
add_entry(entry: MemoryEntry) -> NoneAdd entry and optionally index embedding.
as_retriever() -> MemoryRetrieverAdapterReturn Retriever-protocol adapter for pipeline use.

MemoryRetrieverAdapter

Bridges ScoredMemoryRetriever to the Retriever protocol.

MemoryRetrieverAdapter(retriever: ScoredMemoryRetriever)
MethodSignatureDescription
retrieve(query: QueryBundle, top_k: int = 10) -> list[ContextItem]Convert MemoryEntry to ContextItem with source=MEMORY, priority=7.

Utility Functions

rrf_fuse

Standalone Reciprocal Rank Fusion for combining ranked lists.

def rrf_fuse(
    ranked_lists: list[list[ContextItem]],
    weights: list[float] | None = None,
    k: int = 60,
    top_k: int | None = None,
) -> list[ContextItem]: ...
ParameterTypeDefaultDescription
ranked_listslist[list[ContextItem]]--Ranked item lists.
weightslist[float] | NoneNonePer-list weights. Defaults to 1.0.
kint60RRF smoothing constant.
top_kint | NoneNoneMax items to return. None returns all.

Returns fused items with normalized scores and retrieval_method: "rrf" in metadata.

On this page