sift-kg Python juanceresa/sift-kg

From Documents to Knowledge Graph: The Sifter Pipeline

Walk the full execution path: CLI dispatch, LLM extraction, graph construction, resolution, and the merge step that mutates the graph.

7 stops ~25 min Verified 2026-05-04
What you will learn
  • Where each CLI verb (extract, build, resolve, apply-merges) lives in the source
  • How extraction chunks text and sends one combined prompt per chunk to the LLM, with chunks processed concurrently
  • The schema discovery step that lets sift run on documents in any domain without configuration
  • How build_graph composes pre-dedup, NetworkX node insertion, and post-processing
  • How find_merge_candidates batches same-type entities and bridges sync CLI to async LLM calls
  • How apply_merges actually rewrites edges in the NetworkX MultiDiGraph
  • How the library API in pipeline.py composes these steps for non-CLI use
Prerequisites
  • Python 3.11+ and a basic command of asyncio
  • Familiarity with the idea of a knowledge graph (entities and labeled edges)
  • No prior knowledge of NetworkX or LiteLLM required
1 / 7

CLI Entry — sift is a Typer App

src/sift_kg/cli.py:13

Every CLI verb is a Typer command in this one file; the rest of the codebase is what they call into.

The whole CLI surface is a single Typer app. Every verb (sift extract, sift build, sift resolve, sift review, sift apply-merges, sift narrate, sift view) is a function decorated with @app.command() later in this same file. Reading cli.py top to bottom is the fastest way to understand what sift can do.

Each command is a thin wrapper. It loads SiftConfig, resolves a domain (bundled name or YAML path), validates API keys for the chosen LLM, then dispatches into extract/, graph/, or resolve/. The console here is shared across all verbs for consistent Rich-styled output. The same logic is also exposed as run_* functions in pipeline.py for library users.

Key takeaway

One Typer app, one verb per pipeline step. Read cli.py first; everything else is a subsystem it dispatches into.

app = typer.Typer(
    name="sift",
    help="Document-to-knowledge-graph pipeline",
    add_completion=True,
    rich_markup_mode="rich",
)

console = Console()
2 / 7

Extracting Entities — One LLM Call Per Chunk

src/sift_kg/extract/extractor.py:238

Each text chunk becomes one combined prompt that returns entities and relations together.

The cost of running this pipeline scales linearly with LLM calls, and the obvious way to extract entities and relations is two calls per chunk, one for entities and one for relations. build_combined_prompt builds a single prompt that asks for both. The call count per chunk halves and consistency improves, because the model picks relations from the same entity list it just produced instead of re-identifying entities on a second pass.

Concurrency happens one level up in _aextract_from_text: chunks are dispatched through an asyncio.Semaphore that bounds parallelism. The doc_context parameter is a one-sentence document summary generated from the first chunk and prepended to every subsequent prompt, giving the LLM the document-level frame (who's speaking, what case, what subject) without re-paying for it on each chunk.

Key takeaway

One combined prompt per chunk, dispatched concurrently with an asyncio.Semaphore. Cheaper and more consistent than two-pass extraction.

async def _aextract_chunk(
    chunk: TextChunk,
    doc_id: str,
    llm: LLMClient,
    domain: DomainConfig,
    doc_context: str = "",
) -> ExtractionResult:
    """Extract entities and relations from a single chunk (async).

    Uses a combined prompt (1 LLM call) instead of separate entity + relation
    calls (2 LLM calls). Falls back to entity-only on parse failure.
    """
    prompt = build_combined_prompt(chunk.text, doc_id, domain, doc_context=doc_context)
    try:
        data = await llm.acall_json(prompt)
    except (RuntimeError, ValueError) as e:
        logger.warning(f"Extraction failed for {doc_id} chunk {chunk.chunk_index}: {e}")
        return ExtractionResult(source_document=doc_id, chunk_index=chunk.chunk_index)
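
The bounded-concurrency pattern described above can be sketched in a few lines. This is a minimal illustration, not the code in extractor.py: fake_llm_call and extract_all are hypothetical names, and the "LLM" is a stub that treats capitalized words as entities.

```python
import asyncio


async def fake_llm_call(chunk_text: str) -> dict:
    """Hypothetical stand-in for an async LLM call that returns parsed JSON."""
    await asyncio.sleep(0.01)  # simulate network latency
    entities = [word for word in chunk_text.split() if word.istitle()]
    return {"entities": entities, "relations": []}


async def extract_all(chunks: list[str], concurrency: int = 4) -> list[dict]:
    """Run one call per chunk with at most `concurrency` calls in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded(chunk_text: str) -> dict:
        # The semaphore blocks here once `concurrency` calls are running.
        async with semaphore:
            return await fake_llm_call(chunk_text)

    # gather preserves input order, so results line up with chunks.
    return await asyncio.gather(*(bounded(chunk) for chunk in chunks))


results = asyncio.run(
    extract_all(["Alice met Bob", "no names here", "Carol paid Dave"], concurrency=2)
)
```

Every chunk is scheduled immediately, but only `concurrency` of them hold the semaphore at once, which is what keeps a long document from opening hundreds of simultaneous requests.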
3 / 7

Schema Discovery — Designing Types From the Documents

src/sift_kg/extract/extractor.py:112

For schema-free domains, one extra LLM call samples the corpus and designs entity and relation types tailored to it.

Most knowledge graph tools force you to define your entity and relation types up front. sift-kg ships with bundled schemas (general, osint, academic) but defaults to schema-free. Before extraction begins, one LLM call reads a sample from the first chunk (its first 3,000 characters in the excerpt below) and designs types specific to the corpus. Instead of forcing every relation into a generic ASSOCIATED_WITH, the discovered schema produces specific types like FUNDED, TESTIFIED_AGAINST, or ENROLLED_AT.

The result is saved to output/discovered_domain.yaml and reused on every subsequent run, so types stay consistent across chunks and documents. The --force flag re-runs discovery. The discovered file is plain YAML; you can hand-edit it or copy it as a starting point for a custom domain.

Key takeaway

One extra LLM call before extraction designs entity and relation types for this corpus. The result is cached as YAML and reused on every subsequent run.

    # Schema discovery for schema-free domains
    if domain.schema_free and output_dir is not None:
        from sift_kg.domains.discovery import (
            discover_domain,
            load_discovered_domain,
            save_discovered_domain,
        )

        discovered_path = output_dir / "discovered_domain.yaml"
        cached = load_discovered_domain(discovered_path)
        if cached is not None and not force:
            logger.info(f"Using cached discovered schema ({len(cached.entity_types)} entity types)")
            domain = cached
        else:
            samples = [chunks[0].text[:3000]]
            try:
                domain = await discover_domain(samples, llm, domain.system_context or "")
                save_discovered_domain(domain, discovered_path)
                logger.info(f"Discovered schema: {len(domain.entity_types)} entity types, {len(domain.relation_types)} relation types")
            except (RuntimeError, ValueError) as e:
                logger.warning(f"Schema discovery failed, falling back to schema-free extraction: {e}")
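
The cache-or-discover control flow in the excerpt can be isolated as a small sketch. The assumptions are loud ones: load_or_discover and fake_discover are hypothetical names, JSON stands in for YAML so the example stays stdlib-only, and the discovery step is a stub rather than an LLM call.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable


def load_or_discover(cache_path: Path, discover: Callable[[], dict], force: bool = False) -> dict:
    """Return the cached schema if present; otherwise discover it and persist it."""
    if cache_path.exists() and not force:
        return json.loads(cache_path.read_text())
    schema = discover()
    cache_path.write_text(json.dumps(schema, indent=2))
    return schema


calls: list[int] = []


def fake_discover() -> dict:
    """Hypothetical stand-in for the discovery LLM call; counts invocations."""
    calls.append(1)
    return {"entity_types": ["PERSON", "FUND"], "relation_types": ["FUNDED"]}


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "discovered_domain.json"
    first = load_or_discover(path, fake_discover)               # discovers and saves
    second = load_or_discover(path, fake_discover)              # served from the cache
    third = load_or_discover(path, fake_discover, force=True)   # forced re-discovery
    discovery_calls = len(calls)
```

The second call never reaches the stub, which is the property that keeps types stable between runs; only force=True pays for discovery again.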
4 / 7

Building the Graph — Pre-Dedup First

src/sift_kg/graph/builder.py:89

Before any entity becomes a NetworkX node, deterministic pre-dedup collapses obvious near-duplicates.

Catching duplicates after they enter the graph means walking edges and rewriting endpoints. Catching them before is a hash table lookup. prededup_entities runs before any node is added, returning a map from (entity_type, original_name) to a canonical name. build_graph consults this map every time it adds an entity, so four spellings of Sam Bankman-Fried never become four separate nodes. This is layer one of the four-layer dedup model: automatic, with no LLM call or review step.

The loop that follows pre-creates entities for any domain that defines closed-vocabulary canonical_names (a finite list of departments, jurisdictions, or classifications). These canonical nodes are inserted with confidence=1.0 and attributes={"canonical": True}, so relations from extracted text always have somewhere to land. Extracted names that are not on the canonical list get retyped to a fallback type later in the same function.

Key takeaway

Pre-dedup runs before any node is added. The graph never sees the duplicates that deterministic normalization can collapse.

    # Entity name → ID lookup (for resolving relation endpoints)
    name_to_id: dict[str, str] = {}

    # Pre-dedup: merge near-identical entity names deterministically
    canonical_map = prededup_entities(extractions)

    # Pre-create canonical entities so relations always resolve
    for entity_type, (names, _fallback) in (domain_canonical_entities or {}).items():
        for name in names:
            eid = _make_entity_id(name, entity_type)
            kg.add_entity(
                entity_id=eid,
                entity_type=entity_type,
                name=name,
                confidence=1.0,
                source_documents=[],
                attributes={"canonical": True},
            )
            name_to_id[name.lower().strip()] = eid
            stats["canonical_created"] += 1
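
To make the shape of the canonical map concrete, here is a minimal sketch of deterministic pre-dedup. The normalization rules (strip accents, case, punctuation, and extra whitespace; first spelling seen wins) are assumptions chosen for illustration, and normalize and build_canonical_map are hypothetical names; the real prededup_entities may normalize differently.

```python
import re
import unicodedata


def normalize(name: str) -> str:
    """Hypothetical normalization key: no accents, case, punctuation, or extra spaces."""
    text = unicodedata.normalize("NFKD", name)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    text = re.sub(r"[^\w\s]", " ", text.casefold())
    return " ".join(text.split())


def build_canonical_map(entities: list[tuple[str, str]]) -> dict[tuple[str, str], str]:
    """Map (entity_type, original_name) -> canonical name; first spelling seen wins."""
    first_seen: dict[tuple[str, str], str] = {}
    canonical: dict[tuple[str, str], str] = {}
    for entity_type, name in entities:
        key = (entity_type, normalize(name))
        first_seen.setdefault(key, name)
        canonical[(entity_type, name)] = first_seen[key]
    return canonical


entities = [
    ("PERSON", "Sam Bankman-Fried"),
    ("PERSON", "sam bankman fried"),
    ("PERSON", "Sam  Bankman-Fried."),
    ("ORGANIZATION", "Sam Bankman-Fried"),  # same text, different type: kept separate
]
canonical_map = build_canonical_map(entities)
```

Because the key includes the entity type, identical strings under different types never collapse into one node, and each lookup while adding an entity is a single dictionary access.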
5 / 7

Resolving Duplicates — Sync CLI, Async LLM

src/sift_kg/resolve/resolver.py:62

The public sync entry point bridges into an async implementation that batches per-type LLM calls concurrently.

The CLI is synchronous because Typer commands return when their function returns. The LLM client is async because batches of same-typed entities run in parallel. find_merge_candidates is the bridge: a sync wrapper that calls asyncio.run on the async implementation. The same pattern appears in extractor.py.

The return type is a tuple: a MergeFile (entity merge proposals, all with status DRAFT) and a list of variant relations. The system_context string comes from the loaded domain and is injected into the LLM prompt, so the model can make better judgments about names specific to your field. concurrency defaults to 4 and use_embeddings defaults to False, so entities are batched alphabetically rather than by KMeans clustering on sentence embeddings.

Key takeaway

Sync CLI bridges to async LLM via asyncio.run. The result is a MergeFile of DRAFT proposals; nothing is merged yet.

def find_merge_candidates(
    kg: KnowledgeGraph,
    llm: LLMClient,
    entity_types: list[str] | None = None,
    concurrency: int = 4,
    use_embeddings: bool = False,
    system_context: str = "",
) -> tuple[MergeFile, list[RelationReviewEntry]]:
    """Find entities that likely refer to the same real-world thing.

    Args:
        kg: Knowledge graph with entities
        llm: LLM client for similarity judgments
        entity_types: Types to resolve (default: all types except DOCUMENT)
        concurrency: Max concurrent LLM calls
        use_embeddings: Use semantic clustering instead of alphabetical batching
        system_context: Domain context to help LLM understand entity names

    Returns:
        Tuple of (MergeFile with DRAFT proposals, list of variant relation proposals)
    """
    return asyncio.run(
        _afind_merge_candidates(kg, llm, entity_types, concurrency, use_embeddings, system_context)
    )
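
The sync-to-async bridge plus alphabetical batching can be sketched end to end. Everything here is illustrative: find_candidates, alphabetical_batches, and _ajudge_batch are hypothetical names, and the "LLM judgment" is a stub that pairs adjacent names sharing a first token.

```python
import asyncio


def alphabetical_batches(names: list[str], batch_size: int) -> list[list[str]]:
    """Sort case-insensitively, then slice into fixed-size batches."""
    ordered = sorted(names, key=str.casefold)
    return [ordered[i:i + batch_size] for i in range(0, len(ordered), batch_size)]


async def _ajudge_batch(batch: list[str]) -> list[tuple[str, str]]:
    """Hypothetical stand-in for the LLM: pair neighbors that share a first token."""
    await asyncio.sleep(0.01)  # simulate network latency
    return [
        (a, b)
        for a, b in zip(batch, batch[1:])
        if a.split()[0].casefold() == b.split()[0].casefold()
    ]


async def _afind_candidates(
    names: list[str], batch_size: int, concurrency: int
) -> list[tuple[str, str]]:
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded(batch: list[str]) -> list[tuple[str, str]]:
        async with semaphore:
            return await _ajudge_batch(batch)

    batches = alphabetical_batches(names, batch_size)
    per_batch = await asyncio.gather(*(bounded(b) for b in batches))
    return [pair for pairs in per_batch for pair in pairs]


def find_candidates(
    names: list[str], batch_size: int = 2, concurrency: int = 4
) -> list[tuple[str, str]]:
    """Sync entry point: owns the event loop so callers need no asyncio."""
    return asyncio.run(_afind_candidates(names, batch_size, concurrency))


proposals = find_candidates(
    ["Goldman Sachs", "Acme Corp", "goldman sachs group", "Acme Corporation", "Zeta LLC"]
)
```

One consequence of this pattern worth knowing: asyncio.run raises RuntimeError when called from a thread that already has a running event loop, so a sync wrapper like this suits a CLI but needs care inside an environment that runs its own loop.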
6 / 7

Applying Merges — Edge Rewriting in NetworkX

src/sift_kg/resolve/engine.py:60

Confirmed merges become a single pass that rewrites every edge through a member-to-canonical map and drops self-loops.

The merge problem looks like a node operation but the work is in the edges. Every relation that pointed at a member node has to point at the canonical instead, or the graph loses connectivity. valid_map is the member_id → canonical_id mapping built from every CONFIRMED proposal. The loop materializes the edge list first via list(...) because mutating the multigraph mid-iteration is unsafe, then rewrites each edge whose source or target was merged.

Two details matter. Self-loops: if both endpoints of an edge get merged into the same canonical node, the rewritten edge would point at itself, so it is dropped instead of added. Edge keys preserved: networkx.MultiDiGraph uses keys to disambiguate parallel edges of different types between the same pair of nodes, and the rewrite preserves them. Member nodes are removed in a second pass, since they have no remaining edges by then.

Key takeaway

Merges are not metadata flips. apply_merges rewrites every affected edge, drops self-loops, and removes the merged nodes from the NetworkX graph.

    # Rewrite edges
    edges_to_rewrite = list(kg.graph.edges(data=True, keys=True))
    for source, target, key, data in edges_to_rewrite:
        new_source = valid_map.get(source, source)
        new_target = valid_map.get(target, target)

        if new_source != source or new_target != target:
            # Remove old edge
            kg.graph.remove_edge(source, target, key=key)

            # Skip self-loops
            if new_source == new_target:
                stats["self_loops_removed"] += 1
                continue

            # Add rewritten edge
            kg.graph.add_edge(new_source, new_target, key=key, **data)

    # Remove merged nodes
    for member_id in valid_map:
        if kg.graph.has_node(member_id):
            kg.graph.remove_node(member_id)
            stats["nodes_removed"] += 1
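
The rewrite logic is easier to test in isolation without a graph library. The sketch below mirrors the excerpt using a plain list of (source, target, key, data) tuples in place of the NetworkX MultiDiGraph. The Proposal dataclass and its field names are assumptions made for illustration, not the MergeFile schema.

```python
from dataclasses import dataclass


@dataclass
class Proposal:
    """Hypothetical merge proposal; field names are illustrative only."""
    canonical_id: str
    member_ids: list[str]
    status: str  # "DRAFT" or "CONFIRMED"


def build_valid_map(proposals: list[Proposal]) -> dict[str, str]:
    """member_id -> canonical_id, taken from CONFIRMED proposals only."""
    return {
        member: p.canonical_id
        for p in proposals
        if p.status == "CONFIRMED"
        for member in p.member_ids
    }


def rewrite_edges(edges: list[tuple], valid_map: dict[str, str]) -> tuple[list[tuple], int]:
    """Redirect endpoints through valid_map; drop rewritten edges that become self-loops."""
    rewritten, self_loops = [], 0
    for source, target, key, data in edges:
        new_source = valid_map.get(source, source)
        new_target = valid_map.get(target, target)
        changed = new_source != source or new_target != target
        if changed and new_source == new_target:
            self_loops += 1  # both endpoints now name the same node
            continue
        rewritten.append((new_source, new_target, key, data))
    return rewritten, self_loops


proposals = [
    Proposal("person:sbf", ["person:sam_b"], "CONFIRMED"),
    Proposal("org:ftx", ["org:ftx_us"], "DRAFT"),  # unreviewed: must not be applied
]
edges = [
    ("person:sam_b", "org:ftx", "FOUNDED", {"confidence": 0.9}),
    ("person:sam_b", "person:sbf", "ALIAS_OF", {"confidence": 0.8}),
    ("org:ftx_us", "org:ftx", "SUBSIDIARY_OF", {"confidence": 0.7}),
]
valid_map = build_valid_map(proposals)
rewritten, self_loops = rewrite_edges(edges, valid_map)
```

The DRAFT proposal leaves its edge untouched, the ALIAS_OF edge disappears because both endpoints collapse into person:sbf, and the FOUNDED edge keeps its key and data while moving to the canonical node.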
7 / 7

The Library API — Pipeline as a Function

src/sift_kg/pipeline.py:374

The same pipeline is exposed as a single function for use from notebooks and applications.

Library callers do not get a CLI. They get functions, and the design choice is which functions to expose as a single chained call. run_pipeline chains the three steps that need no human input: extract, build, narrate. The docstring is explicit about what is missing: resolve and apply-merges deliberately stay out of the auto-pipeline because their whole point is human review.

That omission is the design statement. A pipeline function that runs extraction through narration is appropriate for exploration. A pipeline function that also auto-applied merges would silently bake LLM judgments into the output graph, which is exactly what the four-layer model is designed to prevent. The library user has to invoke run_resolve and run_apply_merges explicitly, the same boundary the CLI enforces with separate verbs.

Key takeaway

run_pipeline chains extract, build, and narrate. Resolve and apply-merges are deliberately excluded; they require human review.

def run_pipeline(
    doc_dir: Path,
    model: str,
    domain: DomainConfig,
    output_dir: Path,
    max_cost: float | None = None,
    include_narrative: bool = True,
) -> Path:
    """Run the full pipeline: extract → build → narrate.

    Skips resolve/apply-merges (those require human review).

    Args:
        doc_dir: Directory containing documents
        model: LLM model string
        domain: Domain configuration
        output_dir: Output directory for all artifacts
        max_cost: Budget cap in USD
        include_narrative: Whether to generate narrative at the end

    Returns:
        Path to output directory
    """
    run_extract(doc_dir, model, domain, output_dir, max_cost=max_cost)
    run_build(output_dir, domain)

    if include_narrative:
        system_context = domain.system_context or ""
        run_narrate(output_dir, model, system_context=system_context, max_cost=max_cost)