Skip to content

Pipeline

Pipeline dataclass

Pipeline(parser: Parser, chunker: Chunker, embedder: Embedder, indexer: HybridIndex, generator: Generator, reranker: Reranker | None = None, verifier: Verifier | None = None, strictness: Strictness = 'balanced', top_k_retrieve: int = 20, top_k_rerank: int = 5)

Orchestrates the full verifiable RAG pipeline.

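Swap any component by passing a different implementation of the corresponding Protocol. The parser, chunker, embedder, indexer, and generator are required; there are no silent no-ops for them. The reranker and verifier are optional: without a reranker, ask() keeps the first top_k_rerank retrieved results, and without a verifier, ask() produces no verification results.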

from_yaml classmethod

from_yaml(path: str | Path) -> 'Pipeline'

Build a Pipeline from a YAML config. See verifiable_rag.config for schema.

Source code in src/verifiable_rag/pipeline.py
@classmethod
def from_yaml(cls, path: str | Path) -> "Pipeline":
    """Construct a Pipeline from the YAML config file at *path*.

    All of the work is delegated to
    ``verifiable_rag.config.load_pipeline_from_yaml``; see
    ``verifiable_rag.config`` for the config schema.

    NOTE(review): ``cls`` is not forwarded to the loader, so calling this
    on a subclass returns whatever the loader builds -- confirm that is
    intended.
    """
    # Imported at call time rather than at module import time.
    from verifiable_rag.config import load_pipeline_from_yaml as build_pipeline

    pipeline = build_pipeline(path)
    return pipeline

ingest

ingest(path: str | Path) -> Document

Parse, chunk, embed, and index a document.

Returns the parsed Document so callers can inspect sentence IDs.

Source code in src/verifiable_rag/pipeline.py
def ingest(self, path: str | Path) -> Document:
    """Ingest the document at *path* in one call.

    Runs ``prepare_ingest()`` (parse, chunk, embed) and then hands the
    result straight to ``commit_ingest()`` (index + document map).

    Returns:
        The parsed Document, so callers can inspect sentence IDs.
    """
    parsed_doc, chunk_list, vectors = self.prepare_ingest(path)
    # Nothing is written to the index until the preparation step has
    # returned successfully.
    self.commit_ingest(parsed_doc, chunk_list, vectors)
    return parsed_doc

prepare_ingest

prepare_ingest(path: str | Path) -> tuple[Document, list[Any], list[list[float]]]

Parse, chunk, and embed path without touching the shared index.

Split out from ingest() so the slow work (parse + embed) can run concurrently across threads while commit_ingest() is serialised behind a lock to keep the index consistent.

Source code in src/verifiable_rag/pipeline.py
def prepare_ingest(
    self, path: str | Path
) -> tuple[Document, list[Any], list[list[float]]]:
    """Run the index-free half of ingestion for *path*: parse, chunk, embed.

    This is the slow part of ``ingest()``. It is kept separate so several
    threads can prepare documents concurrently, while ``commit_ingest()``
    is serialised behind a lock to keep the index consistent.

    Returns:
        ``(document, chunks, embeddings)`` -- the three arguments that
        ``commit_ingest()`` accepts.
    """
    parsed = self.parser.parse(Path(path))
    pieces = self.chunker.chunk(parsed)

    # Embed the contextual preamble when the chunker attached one
    # (Contextual Retrieval recipe); otherwise the chunk's own text is used.
    from verifiable_rag.chunkers.contextual import embedding_text

    texts = list(map(embedding_text, pieces))
    vectors = self.embedder.embed(texts)
    return parsed, pieces, vectors

commit_ingest

commit_ingest(document: Document, chunks: list[Any], embeddings: list[list[float]]) -> None

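Write a prepared ingest to the shared index and the document map. The method takes no lock itself; concurrent callers are expected to serialise calls to it (see prepare_ingest).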

Source code in src/verifiable_rag/pipeline.py
def commit_ingest(
    self,
    document: Document,
    chunks: list[Any],
    embeddings: list[list[float]],
) -> None:
    """Write a prepared ingest into the shared index and the document map.

    Takes the three values produced by ``prepare_ingest()``.

    NOTE(review): no lock is taken in this method, so any atomicity has to
    come from the caller serialising calls to it -- confirm at call sites.
    """
    # The index is updated first; the document is only registered once
    # ``indexer.add`` has returned without raising.
    self.indexer.add(chunks, embeddings)
    doc_map = self._documents
    doc_map[document.doc_id] = document

ask

ask(query: str) -> Answer

Run the full pipeline for query and return a verified Answer.

Source code in src/verifiable_rag/pipeline.py
def ask(self, query: str) -> Answer:
    """Answer *query* end to end and return the resulting Answer.

    Steps, in order: embed the query, search the index for
    ``top_k_retrieve`` candidates, narrow them to ``top_k_rerank``,
    generate cited sentences, optionally verify them, and assemble the
    Answer via ``_build_answer``.
    """
    query_vec = self.embedder.embed_query(query)
    candidates = self.indexer.search(query, query_vec, k=self.top_k_retrieve)

    # Without a reranker, retrieval order is kept and the list is truncated.
    top_hits = (
        candidates[: self.top_k_rerank]
        if self.reranker is None
        else self.reranker.rerank(query, candidates, top_k=self.top_k_rerank)
    )

    # Collect the ingested documents behind the surviving hits; ids that
    # were never registered in the document map are skipped.
    known_docs = {}
    for doc_id in {hit.chunk.doc_id for hit in top_hits}:
        if doc_id in self._documents:
            known_docs[doc_id] = self._documents[doc_id]

    sentences = self.generator.generate(query, top_hits, known_docs)

    # A configured verifier always runs: strictness only decides whether a
    # low score leads to a REFUSAL, not whether scoring happens. That lets
    # loose mode emit NLI scores for analysis without filtering.
    checks = (
        []
        if self.verifier is None
        else self.verifier.verify(sentences, known_docs)
    )

    return self._build_answer(query, sentences, checks, top_hits)