# Briefcase AI: full documentation for AI assistants

> Infrastructure for governing the decisions your AI systems make: enforce controls before an action runs, capture the full context behind every decision, and keep a complete, reproducible record you can verify later. Open-source Python SDK with a Rust core.

Auto-generated from the docs at https://briefcaseai.io. A link index is at https://briefcaseai.io/llms.txt

========================================================================
# Start Here
========================================================================

## Why Briefcase

Source: https://briefcaseai.io/getting-started/why-briefcase/

> Briefcase is infrastructure for governing the decisions your AI systems make: enforce controls before an action runs, capture the full context behind every decision, and keep a complete, reproducible record you can verify later.

AI systems don't just produce text; they **make decisions that trigger actions**: routing a support ticket, approving a request, choosing a tool, escalating to a human. When one of those decisions is wrong, "the model did it" is not an answer anyone can act on.

Briefcase is **infrastructure for governing those decisions**. It sits around the decision points in your application and gives you three things that are otherwise impossible to reconstruct after the fact:

- **Controls before action**: Evaluate whether an action is allowed **before** it runs; evaluation is deny-by-default, composable, and side-effect-free.
- **Full context, captured**: Every decision is recorded with its inputs, outputs, model parameters, evidence, and the data it depended on.
- **A record you can verify**: Replay decisions, reconstruct exactly what was known at the time, and seal it into a tamper-evident bundle.
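To make "deny-by-default, composable, and side-effect-free" concrete, here is a minimal sketch in plain Python. It is not Briefcase's guardrail API: `Verdict`, `evaluate`, `known_queue`, and `confident_enough` are hypothetical names invented for illustration, and the 0.8 threshold is an arbitrary assumption.

```python
from dataclasses import dataclass
from typing import Callable, Mapping, Sequence

# Hypothetical types for illustration only; not Briefcase's guardrail API.
Action = Mapping[str, object]
Rule = Callable[[Action], bool]  # pure predicate: True means "this rule allows it"


@dataclass(frozen=True)
class Verdict:
    allowed: bool
    reason: str


def evaluate(action: Action, rules: Sequence[Rule]) -> Verdict:
    """Deny by default: allow only when rules exist and every one passes."""
    if not rules:
        return Verdict(False, "no rules configured")
    for rule in rules:
        if not rule(action):
            return Verdict(False, f"denied by {rule.__name__}")
    return Verdict(True, "all rules passed")


def known_queue(action: Action) -> bool:
    # Only route to queues we explicitly recognise.
    return action.get("queue") in {"billing", "refunds", "account_access"}


def confident_enough(action: Action) -> bool:
    # Assumed threshold; a missing confidence counts as 0.0 and is denied.
    confidence = action.get("confidence", 0.0)
    return isinstance(confidence, (int, float)) and confidence >= 0.8


rules = [known_queue, confident_enough]
print(evaluate({"queue": "billing", "confidence": 0.93}, rules))
print(evaluate({"queue": "billing", "confidence": 0.41}, rules))
print(evaluate({"queue": "billing", "confidence": 0.93}, []))
```

Because each rule is a pure function of the proposed action, rules compose by simple listing, and evaluating them changes nothing until the caller acts on the verdict.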
## The questions Briefcase lets you answer

When a decision is challenged (by a teammate, an incident review, or a customer), you need to answer, precisely and after the fact:

- **What did the system decide, and what did it see?** The inputs, outputs, and confidence behind the call.
- **What rule governed it?** The exact policy version that was in effect at the decision's moment, not today's policy.
- **Did the controls run first?** Proof that a guardrail evaluated the action before anything happened.
- **What did we know at the time?** The evidence and external data as they were then, with corrections appended, never overwritten.
- **Can we reproduce it?** A deterministic replay that compares the original output against a fresh run.

> Built for accountability
>
> Briefcase is designed for teams where accountability, traceability, and operational control are non-negotiable, where "we think it was fine" has to become "here is the record, and here is the proof it wasn't tampered with."

## How it works: five acts

Briefcase organizes around the lifecycle of a single decision. The rest of these docs follow the same five acts, and a single running example threads through all of them: a **support-ticket triage agent**. Each ticket it handles produces two decisions you'll see throughout: it **classifies** the ticket (the `classify_ticket` call in most examples) and **routes** it to a queue. Both are decisions Briefcase captures, governs, and can replay.

```mermaid
graph LR
    A["Capture<br/>record inputs, outputs,<br/>context, evidence"] --> B["Control<br/>enforce guardrails &<br/>versioned policy"]
    B --> C["Store & Query<br/>durable, append-only,<br/>queryable trail"]
    C --> D["Replay & Verify<br/>re-run, compare,<br/>detect drift"]
    D --> E["Prove<br/>reconstruct as-of &<br/>seal an audit bundle"]
```

> Diagram description
>
> A left-to-right flow of five stages: Capture (record inputs, outputs, context, and evidence) → Control (enforce guardrails and a versioned policy before the action) → Store & Query (a durable, append-only, queryable trail) → Replay & Verify (re-run a decision, compare outputs, detect drift) → Prove (reconstruct what was known at a past time and seal a tamper-evident bundle).

| Act | What you do | Key building blocks |
| --- | --- | --- |
| **Capture** | Record every decision with full context | `@capture`, `DecisionSnapshot`, exporters, PII sanitization |
| **Control** | Enforce controls before the action runs | Guardrails, routing, versioned routing policy, validation |
| **Store & Query** | Keep a durable, queryable, append-only trail | Storage adapters, bitemporal storage, external data, RAG versioning |
| **Replay & Verify** | Re-run and check decisions hold up | Deterministic replay, drift detection, audit bundles |
| **Prove** | Reconstruct and verify after the fact | As-of reconstruction, `ExaminerBundle` |

## Who Briefcase is for

- I use an AI coding assistant: Let your AI editor add Briefcase for you by pointing it at the docs or giving it the MCP tools. Start with AI-Assisted Setup. (/getting-started/ai-assisted-setup/)
- Engineers: Instrument a decision point in minutes, send records anywhere, and replay to catch regressions. Start with the Quickstart. (/getting-started/quickstart/)
- Platform & governance leads: Define controls that run before actions, route through versioned policies, and prove which rule was in effect. Start with Guardrails. (/advanced/guardrails/)
- Reproducibility & audit reviewers: Reconstruct past decisions exactly and verify a sealed, tamper-evident record. Start with Audit Bundles. (/advanced/compliance-bundles/)

## Where it runs

Briefcase is an open-source Python SDK (with a Rust core) that wraps the decision points in code you already have.
It is independent of model, vendor, and framework: bring your own LLM calls and storage. Install the base package with `pip install briefcase-ai`; optional capabilities are installed as extras.

## Next steps

- Quickstart: Record, persist, and replay your first decision in about 5 minutes. (/getting-started/quickstart/)
- Core Concepts: The object model behind every decision (snapshots, inputs, outputs, evidence). (/getting-started/core-concepts/)
- Audit a Decision End-to-End: Follow one decision from capture all the way to a verifiable sealed record. (/guides/audit-a-decision/)

## AI-Assisted Setup

Source: https://briefcaseai.io/getting-started/ai-assisted-setup/

> The fastest way to add Briefcase to a project: let your AI coding assistant do it. Point it at the docs, or give it Briefcase's tools over MCP.

The fastest way to instrument decisions with Briefcase is to let the AI assistant you already code with do it for you. You don't have to memorize the API; give your assistant the docs (or Briefcase's own tools) and ask it to add capture, controls, and replay to a function.

> Two ways, use either or both
>
> **Point your assistant at the docs** so it answers from accurate, current Briefcase material. **Add the MCP server** so it can run Briefcase tools (redact PII, estimate cost, check drift) right inside your editor. They compose: docs for knowledge, MCP for actions.

_Point your assistant at the docs_

Every page on this site is also published as machine-readable text an assistant can ingest:

- **`https://briefcaseai.io/llms.txt`**: an index of the docs (titles + links), grouped by the five acts. Best as a map.
- **`https://briefcaseai.io/llms-full.txt`**: every page concatenated as plain text. Best for "read this, then help me."

Both files are generated from the docs on every build, so they always match what's published here.

1. ### Give your assistant the docs

   In Cursor, Claude Code, Copilot, or any assistant that can read a URL, paste the link (or add it to the project's context / docs settings):

   ```text
   Read https://briefcaseai.io/llms-full.txt — that's the Briefcase AI SDK.
   ```

2. ### Ask it to instrument a function

   ```text
   Using Briefcase, add @capture and observe() to classify_ticket() in app/triage.py so every classification is recorded, then show me how to read the records back.
   ```

3. ### Review what it wrote

   The assistant should produce the canonical pattern: `briefcase.observe(...)` plus `@briefcase.capture(...)`. Confirm it matches the [Quickstart](/getting-started/quickstart/).

`llms.txt` is an emerging convention for exposing docs to AI tools. Even assistants without a built-in "docs" setting can usually read a pasted URL.

_Give your assistant Briefcase_

The Briefcase MCP server exposes a small set of **read-only** tools your assistant can call directly: redact PII, estimate model cost, analyze drift, and look up usage. It also serves a `briefcase://llms-full.txt` resource so the assistant can read the guide in-editor.

1. ### Install the server

   ```bash
   pip install "briefcase-ai[mcp]"
   ```

   The `[mcp]` extra installs `mcp>=1.2`. The server runs over stdio with the `briefcase-mcp` command (or `python -m briefcase.mcp`).

2. ### Register it with your assistant

   Point your MCP-capable client at the `briefcase-mcp` command. The config file differs per tool, but the shape is the same:

   ```json
   {
     "mcpServers": {
       "briefcase": { "command": "briefcase-mcp" }
     }
   }
   ```

3. ### Use the tools from your editor

   Ask the assistant things like:

   ```text
   Use the briefcase tools to redact PII from this support ticket, then estimate the cost of classifying it with claude-haiku-4-5.
   ```

   The assistant calls `sanitize_text` and `estimate_cost`; the latter now supports a `rate_card` (e.g. `"bedrock:batch"`) and returns a `cache_cost`, added in [v3.2.1](/resources/changelog/).
The server exposes `sanitize_text`, `estimate_cost`, `analyze_drift`, and `how_to`. See the [MCP Server](/integrations/mcp/) reference for each tool's inputs and outputs.

## Prefer to do it by hand?

The [Quickstart](/getting-started/quickstart/) walks the same path manually (record, persist, and replay a decision) in about 5 minutes.

## Where this fits

- Quickstart: The manual path to record, persist, and replay a decision. (/getting-started/quickstart/)
- MCP Server: Full reference for the tools and resource your assistant can call. (/integrations/mcp/)
- Why Briefcase: What Briefcase governs, and the five-act lifecycle. (/getting-started/why-briefcase/)
- What's New: oci-bai artifact graph launch, cost rate cards, and latest model pricing. (/getting-started/whats-new/)

## Quickstart

Source: https://briefcaseai.io/getting-started/quickstart/

> Record, persist, and replay your first decision in about 5 minutes.

> Fastest: set up with your AI assistant
>
> The quickest way to add Briefcase is to let your AI editor do it: point it at the docs or give it Briefcase's tools over MCP. See [AI-Assisted Setup](/getting-started/ai-assisted-setup/). Prefer to do it by hand? Continue below.

## Install

```bash
pip install briefcase-ai
```

## Pick your path

Briefcase records decisions two ways. They serve two different outcomes; pick based on whether the decision needs to outlive the current process.

> Pick your path
>
> Reach for **Live Observability** when you just want to watch decisions as they happen (local debugging, a notebook, a test). Reach for **Persistent Decisions** the moment you'll need to load, replay, or audit a decision *after* the process ends; that's the foundation for the rest of the journey.

| Path | Outcome | API | When to use |
| --- | --- | --- | --- |
| **Live Observability** | Watch decisions as they happen | `@capture` + `observe()` | Local debugging, notebooks, low-overhead logging |
| **Persistent Decisions** | Reload, replay & audit later | `DecisionSnapshot` + a backend | Anything you'll need to reproduce, verify, or govern |

_@capture (Live Observability)_

- Lightweight: wraps a function and logs its inputs, outputs, and timing.
- Handed to an exporter (console, file, or memory), not persisted to a backend.
- Best for low-overhead logging and live observability.

_DecisionSnapshot (Persistent Decisions)_

- Structured: you build the record field by field with typed inputs and outputs.
- Persisted to a storage backend and reloadable by ID.
- Replayable, so you can re-run and compare against the original output.

See [Core Concepts](/getting-started/core-concepts/) for how each record is structured.

## Record, persist, and replay

1. ### Record a decision (Live Observability)

   The `@capture` decorator records every call (inputs, outputs, and timing) and hands the lightweight record to an exporter. But `@capture` alone has nowhere to send what it records: `briefcase.observe()` is the one call that wires up that exporter. Pass `"memory"` to collect records in a list, `"console"` to print them to stderr, or a `.jsonl` path to append them to a file. Because `@capture` exports in a background thread by default, pass `async_capture=False` when you want the record available synchronously, for example to read it back right after the call.

   ```python
   import briefcase

   mem = briefcase.observe("memory")  # send captured records to memory

   @briefcase.capture(decision_type="ticket-classification", async_capture=False)
   def classify_ticket(text: str) -> str:
       # call your model here
       return "billing"

   classify_ticket("My invoice is wrong")
   print(mem.records[0])
   ```

   > Note
   >
   > See [the stock and custom exporters](/features/exporters/) for everything `observe()` can target.

   `@capture` is for low-overhead logging. To persist a structured decision that you can reload and replay, build a `DecisionSnapshot` and store it with a backend.

2. ### Persist a snapshot (Persistent Decisions)

   ```python
   from briefcase import (
       DecisionSnapshot,
       Input,
       ModelParameters,
       Output,
       init,
   )
   from briefcase.storage import SqliteBackend

   init()  # start the native runtime

   decision = DecisionSnapshot("classify_ticket")
   decision.add_input(Input("ticket_text", "My invoice is wrong", "string"))

   params = ModelParameters("gpt-4o-mini")
   params.with_provider("openai")
   params.with_parameter("temperature", 0.0)
   decision.with_model_parameters(params)

   output = Output("category", "billing", "string")
   output.with_confidence(0.93)
   decision.add_output(output)
   decision.with_execution_time(12.0)

   backend = SqliteBackend.in_memory()  # or SqliteBackend("./decisions.db")
   decision_id = backend.save_decision(decision)
   print(f"Recorded decision {decision_id}")
   ```

   `save_decision` returns the snapshot ID. Use a file path instead of `in_memory()` to keep decisions across runs.

3. ### Replay a decision

   Re-run a stored decision and compare the result against the original output.

   ```python
   from briefcase.replay import ReplayEngine

   engine = ReplayEngine(backend)
   result = engine.replay(decision_id, "strict")

   print("status:", result.status)
   print("outputs match:", result.outputs_match)
   print("execution time (ms):", result.execution_time_ms)
   ```

   `ReplayResult` exposes `.status`, `.outputs_match`, `.replay_output`, `.execution_time_ms`, and `.policy_violations`.
   Valid replay modes are `"strict"` and `"tolerant"`.

## What's next

You've captured, persisted, and replayed a decision. The journey continues with controlling what runs before a decision, auditing one after the fact, and choosing where persistent decisions live.

- Core Concepts: The data model behind every recorded decision. (/getting-started/core-concepts/)
- Guardrails: Control & route. Enforce rules before a decision runs. (/advanced/guardrails/)
- Audit a decision: Replay & verify. Reconstruct why a decision happened. (/guides/audit-a-decision/)
- Storage Adapters: Store & query. Pick where persistent decisions live. (/features/storage-adapters/)

## Core Concepts

Source: https://briefcaseai.io/getting-started/core-concepts/

> The mental model behind Briefcase: why a decision, its inputs and outputs, its environment, and its grouping are recorded as distinct, reproducible records.

> When to read this
>
> Read this if you want the mental model before the feature pages. If you'd rather record something first and pick up the types as you go, start with the [Quickstart](/getting-started/quickstart/) and circle back here.

## The mental model

To make a past AI decision **reproducible and accountable**, you have to capture more than the answer: you need what went in, what the model was, and the environment it ran in. Briefcase keeps these as distinct records so each can be stored, queried, replayed, and verified on its own.

Briefcase records decisions at two levels:

- **`@capture`** is the lightweight path: it wraps a function and emits a plain dict to an exporter. Best for live observability.
- **`DecisionSnapshot`** is the native, persistent record described below: a structured snapshot you build, store, reload, and replay. This is the thing you audit and reproduce later.

The rest of this page walks the persistent types using the running `classify_ticket` triage example.
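Before turning to the persistent types, the lightweight path can be pictured with a plain-Python stand-in. This is only a sketch of the idea of "wrap a function and emit a dict", not how `@capture` is implemented: `capture_sketch`, the `records` list, and every dictionary key below are assumptions made for illustration.

```python
import functools
import time
import uuid
from typing import Any, Callable, Dict, List

# Illustrative only: a stand-in for the idea behind @capture, not its implementation.
records: List[Dict[str, Any]] = []  # plays the role of a "memory" exporter


def capture_sketch(decision_type: str) -> Callable:
    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            started = time.perf_counter()
            result = fn(*args, **kwargs)
            # Record one plain dict per call (key names are hypothetical).
            records.append({
                "decision_id": str(uuid.uuid4()),
                "decision_type": decision_type,
                "function_name": fn.__name__,
                "inputs": {"args": list(args), "kwargs": dict(kwargs)},
                "outputs": result,
                "execution_time_ms": (time.perf_counter() - started) * 1000.0,
            })
            return result
        return wrapper
    return decorator


@capture_sketch(decision_type="ticket-classification")
def classify_ticket(text: str) -> str:
    return "billing"  # placeholder for a real model call


classify_ticket("My invoice is wrong")
print(records[0]["decision_type"], records[0]["outputs"])
```

A dict like this is enough to watch decisions as they happen, but it has no typed fields and no backend, which is the gap the persistent types fill.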

## DecisionSnapshot

A `DecisionSnapshot` is the immutable, point-in-time account of a **single** AI decision: the thing you audit, replay, and verify later. Everything else in Briefcase exists to enrich or store this record.

It holds:

- **inputs**: a list of `Input` values sent to the model
- **outputs**: a list of `Output` values the model returned
- **ModelParameters**: model name, provider, and per-parameter settings
- **ExecutionContext**: the runtime environment the decision ran in
- **execution_time_ms**: how long the call took

| Field | What it is | Why it matters |
| --- | --- | --- |
| `inputs` | The `Input` values sent to the model | A replay needs the same inputs to reproduce the outcome |
| `outputs` | The `Output` values the model returned | The result under audit, and the baseline a replay is compared against |
| `model_parameters` | Model name, provider, per-call settings | Explains *why* the output looked the way it did; a parameter change can be attributed |
| `execution_context` | The runtime environment | So a replay runs somewhere comparable and differences trace to a real change |
| `execution_time_ms` | How long the call took | A performance baseline you can compare a replay against |
| `tags` | Key/value labels (e.g. `environment`) | Lets you query and group decisions later via `SnapshotQuery` |

```python
from briefcase import (
    DecisionSnapshot,
    Input,
    ModelParameters,
    Output,
    init,
)
from briefcase.storage import SqliteBackend

init()
backend = SqliteBackend("./decisions.db")

decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("ticket_text", "My invoice is wrong", "string"))

params = ModelParameters("gpt-4o-mini")
params.with_provider("openai")
params.with_parameter("temperature", 0.0)
decision.with_model_parameters(params)

decision.add_output(Output("category", "billing", "string").with_confidence(0.93))
decision.with_execution_time(12.0)

decision_id = backend.save_decision(decision)

loaded = backend.load_decision(decision_id)
print(loaded.function_name)
print([(i.name, i.value) for i in loaded.inputs])
print([(o.name, o.value, o.confidence) for o in loaded.outputs])
print(loaded.execution_time_ms)
```

`load_decision` returns a `DecisionSnapshot`. Inputs and outputs are lists, so read them as `.inputs` and `.outputs`.

## Decision Flow

```mermaid
graph LR
    A["DecisionSnapshot built"] --> B["save_decision"]
    B --> C["Storage Backend"]
    C --> D["load_decision / query"]
    D --> E["Replay / Audit"]
```

> Diagram description
>
> A left-to-right flow with five stages: a built DecisionSnapshot is passed to save_decision, which writes to a storage backend; the backend is read back through load_decision or a query, and the loaded decision feeds replay or audit.

1. Build a `DecisionSnapshot` with inputs, outputs, and parameters
2. Persist it with `backend.save_decision(decision)`
3. The backend stores it and returns the decision ID
4. Reload it later with `backend.load_decision(decision_id)`
5. Replay or audit the stored decision

## Input and Output

`Input(name, value, data_type)` and `Output(name, value, data_type)` are typed wrappers around a single named value. `data_type` is a string describing the value (for example `"string"` or `"json"`).

```python
from briefcase import Input, Output

prompt = Input("prompt", "Summarize this article", "string")
answer = Output("summary", "A short summary.", "string")
answer.with_confidence(0.88)  # returns the Output, so it chains
```

`Output.with_confidence(confidence)` attaches a confidence score, readable as `output.confidence`.

## ModelParameters

`ModelParameters(model_name)` captures the model configuration at call time.

```python
from briefcase import ModelParameters

params = ModelParameters("gpt-4o-mini")
params.with_provider("openai")
params.with_parameter("temperature", 0.0)
params.with_parameter("max_tokens", 256)

print(params.model_name, params.provider, params.parameters)
```

Read back the configuration via `.model_name`, `.provider`, and the `.parameters` dict.

## ExecutionContext

The same inputs can produce a different answer on a different runtime version or seed. `ExecutionContext` records the environment a decision ran in **so a replay can run in a comparable one** and any difference is attributable to a real change rather than a moved goalpost. It captures the runtime version, resolved dependencies, the random seed, and relevant environment variables. It does not carry timing; use `DecisionSnapshot.execution_time_ms` for that.

```python
from briefcase import ExecutionContext

ctx = ExecutionContext()
ctx.with_runtime_version("3.11.0")
ctx.with_dependency("torch", "2.1.0")
ctx.with_random_seed(42)
ctx.with_env_var("ENVIRONMENT", "production")

print(ctx.runtime_version)        # "3.11.0"
print(ctx.dependencies)           # {"torch": "2.1.0"}
print(ctx.random_seed)            # 42
print(ctx.environment_variables)  # {"ENVIRONMENT": "production"}
```

## Snapshot

A single decision rarely tells the whole story; a request or session usually produces several. `Snapshot(snapshot_type)` groups related decisions **so you can store, load, and reason about them as one unit**, for example every decision made in one support request or session.
Add decisions with `add_decision` and persist the group with `backend.save`.

```python
from briefcase import DecisionSnapshot, Input, Output, Snapshot, init
from briefcase.storage import SqliteBackend

init()
backend = SqliteBackend.in_memory()

session = Snapshot("session")

decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("ticket_text", "Where is my refund?", "string"))
decision.add_output(Output("category", "refunds", "string").with_confidence(0.9))
session.add_decision(decision)

snapshot_id = backend.save(session)

loaded = backend.load(snapshot_id)
print(loaded.snapshot_type, len(loaded.decisions))
```

`backend.load` returns a `Snapshot`; read its grouped decisions from `.decisions`.

## Where this fits

These types are the vocabulary for the whole journey. Now put them to work, or go deeper on the recording API that produces them.

- Quickstart: Put these abstractions to work by recording, persisting, and replaying a decision. (/getting-started/quickstart/)
- Decision Recording: Go deeper on capturing inputs, outputs, and parameters. (/features/decision-recording/)
- Exporters: Emit lightweight @capture records to the console, a file, or memory. (/features/exporters/)

## Installation

Source: https://briefcaseai.io/getting-started/installation/

> Install Briefcase AI, then add only the extras you need, grouped by what you're trying to do, with no surprise dependencies.

Briefcase ships as `briefcase-ai`, a Python SDK over a Rust core. Install the base package, then add extras only when a concrete need appears.

## Base Install

_pip_

```bash
pip install briefcase-ai
```

_uv_

```bash
uv add briefcase-ai
```

## What ships in the base package

The base install carries the whole core loop (capture a decision, store it, replay it, verify it) with no extras. The exports below are grouped by the act of the journey they belong to.

**Recording**: `capture` · `observe` · `setup` · `DecisionSnapshot` · `Snapshot` · `Input` · `Output` · `ModelParameters` · `ExecutionContext` · `HardwareMetadata`. Plus the stock exporters under `briefcase.exporters` and `enable_logging` / `get_logger`. Record each `classify_ticket` call and emit it to the console, a file, or memory.

**Store & query**: `init` · `init_with_config` · `is_initialized` · `BriefcaseConfig` · `SnapshotQuery`. Start the native runtime and query stored decisions. Cost types live in `briefcase.cost`, which is also base, with no extra.

**Replay & verify**: Wired through the runtime above. Persist a decision, then re-run it and confirm its record is intact. The replay engine itself is gated behind the `replay` extra below.

Cost tracking ships in the base package (`briefcase.cost`); there is **no** separate `cost` extra. The stock exporters (`ConsoleExporter`, `JSONLFileExporter`, `MemoryExporter`) are also base.

## Extras

Extras gate the import surface of optional submodules. Most install **nothing** and simply mark intent; only `otel`, `lakefs`, `bitemporal-iceberg`, and `mcp` pull in third-party dependencies. Install only what your deployment needs.

| Extra | What it adds | Recommended for |
| --- | --- | --- |
| `storage` | `SqliteBackend`, `BufferedBackend` | Persisting `classify_ticket` decisions to a SQLite file |
| `replay` | `ReplayEngine` | Re-running stored decisions to confirm a change reproduces them |
| `drift` | `DriftCalculator`, `DriftMetrics` | Measuring how consistent repeated decisions are |
| `validate` | `PromptValidationEngine` | Checking prompt references before a call runs |
| `guardrails` | `GuardrailEnv` framework | Allowing or denying an action before it executes |
| `routing` | `AgentRouter`, `PolicyRegistry` | Versioned, policy-based routing of decisions |
| `compliance` | `ExaminerBundle` | Building a tamper-evident, verifiable bundle for a decision |
| `bitemporal` | `BitemporalRecord`, in-memory store | Reconstructing any past state of recorded facts |
| `bitemporal-iceberg` | Iceberg-backed store *(installs pyiceberg, pyarrow)* | A scalable bitemporal store |
| `sanitize` | `Sanitizer` (redaction) | Stripping sensitive spans from inputs/outputs |
| `external` | `ExternalDataTracker` | Snapshotting external data a decision read |
| `rag` | `VersionedEmbeddingPipeline` | Versioning an embedding index for reproducible RAG |
| `lakefs` | lakeFS `VersionedClient` *(installs lakefs)* | Reading versioned files with their commit SHA |
| `vcs` | VCS client base protocol | Implementing a custom versioned data source |
| `otel` | OpenTelemetry helpers *(installs opentelemetry)* | Correlating decisions with existing traces |
| `correlation` | Multi-agent workflow tracing | Correlating decisions across agents in one workflow |
| `events` | `BriefcaseEvent` emitter | Emitting events on low confidence or drift |
| `mcp` | `briefcase-mcp` server *(installs mcp)* | Exposing SDK tools to MCP clients |
| `all` | Everything above | Evaluation or local development |
| `dev` | Test and lint tooling | Contributing to Briefcase |

## Recommended path for the quickstart

1. Install the base package; it covers recording and inspection, which is all the [Quickstart](/getting-started/quickstart/) needs to begin.

   ```bash
   pip install briefcase-ai
   ```

2. Add `storage` and `replay` for the Quickstart's persist-and-replay steps.

   ```bash
   pip install "briefcase-ai[replay,storage]"
   ```

3. Add further extras later, only when a need appears, for example `otel` for tracing or `guardrails` to gate actions.

## Install everything

```bash
pip install "briefcase-ai[all]"
```

## Requirements

- **Python 3.9+**: that's the only requirement. Wheels are precompiled, so no Rust toolchain is needed unless you [build the Rust core](/sdk/rust/) yourself.

## Where this fits

Installation is step zero of the journey. Next, capture and inspect a decision, then learn the data model behind it.

- Quickstart: Record, persist, and replay your first decision in about 5 minutes. (/getting-started/quickstart/)
- Core Concepts: The DecisionSnapshot, ExecutionContext, and Snapshot data model. (/getting-started/core-concepts/)

## What's New

Source: https://briefcaseai.io/getting-started/whats-new/

> The 3.3.0 release launches oci-bai, an artifact graph for models and fine-tunes, plus the 3.2.x cost rate cards, prompt-cache billing, and latest model pricing.

`briefcase-ai` **v3.3.0** launches **oci-bai**, an artifact graph for tracking every model, fine-tune, dataset, and runtime you push; the recent **3.2.x** line made cost estimates match how you actually buy inference. Both are summarized below, newest first.

The decision, replay, and cost APIs are fully backward compatible: every new parameter is keyword-only, so existing calls behave identically. See the full [Changelog](/resources/changelog/) for details.

## 3.3.0: oci-bai artifact graph

**oci-bai** tracks every image you push through an OCI-compatible gateway in a single artifact graph, with lineage, provenance, deduplication, and search built in.
Push with any OCI tool (`docker push`, `crane`); the graph builds itself.

```bash
docker tag my-model:latest localhost:8080/my-repo:v1
docker push localhost:8080/my-repo:v1

oci-bai --repo my-repo log v1
oci-bai --repo my-repo diff base v1 --depth package
oci-bai search "format==safetensors cuda>=12.4"
```

**Why it matters:** every fine-tune and dataset version is tracked, searchable, and linked to its parent, with no manual bookkeeping. Weight-sharing metrics tell you whether a push was a re-tag, a partial fine-tune, or a full retrain.

oci-bai is in **private beta**; contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access. The full documentation lives at **[oci.briefcaseai.io](https://oci.briefcaseai.io)**. For an overview of the CLI and capabilities, see [Artifact Graph & Evaluate](/evaluate/runs/).

## 3.2.x: Cost & pricing

### Price any platform with rate cards

`CostCalculator.estimate_cost` takes an optional `rate_card` (a forgiving `platform × tier × modifiers` string) so that an estimate reflects the platform and tier you actually run on, not just first-party list price.

```python
from briefcase.cost import CostCalculator

calc = CostCalculator()

# First-party standard pricing (unchanged default)
standard = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000)

# Same call, priced for the AWS Bedrock batch tier
batch = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000, rate_card="bedrock:batch")

print(standard.total_cost, batch.total_cost)
print(calc.get_available_rate_cards())
```

| Part | Values | Effect |
| --- | --- | --- |
| Platform | `first_party` · `bedrock` · `vertex` · `azure` | Selects the provider's price sheet |
| Tier | `standard` · `batch` · `cached` · `priority` · `flex` | `batch`/`flex` ≈ 0.5×; `priority` is a premium |
| Modifiers | `regional` · `us` · `fast` | Regional/residency add ~10% |

**Why it matters:** the same `classify_ticket` workload costs very differently on Bedrock batch versus first-party standard.
Rate cards let you compare the real number before you ship a platform or tier change. ### Prompt-cache billing Anthropic prompt caching changes the math: cache reads are billed at a fraction of the input rate. `estimate_cost` accepts cache-token counts and exposes a `cache_cost` on the estimate. ```python estimate = calc.estimate_cost( "claude-opus-4-8", input_tokens=0, output_tokens=1_000, cache_read_tokens=100_000, # also: cache_write_5m_tokens, cache_write_1h_tokens ) print(estimate.cache_cost, estimate.total_cost) ``` **Why it matters:** a cache-heavy agent's bill is dominated by cache reads at 0.1× input — now your estimate reflects that instead of overcounting. ### Latest model pricing The default pricing table covers the current frontier: Anthropic Claude 4.x (`claude-opus-4-8`, `claude-sonnet-4-6`, `claude-haiku-4-5`, …), OpenAI GPT-5.x (`gpt-5.5`, `gpt-5.4-mini`, …), and Google Gemini (`gemini-3.1-pro`, `gemini-2.5-flash`, …). Every previously priced model is retained. **Why it matters:** you can estimate and compare today's models without hand-maintaining a price sheet. ### Wider Python support A single stable-ABI wheel per platform installs on **Python 3.9–3.13**, and the source distribution bundles its license files. **Why it matters:** `pip install briefcase-ai` works across more environments without building from source. ## Explore - oci-bai Quick Start — Push your first tracked image, explore the commit graph, and run your first search. (https://oci.briefcaseai.io/getting-started) - Artifact Graph Overview — How oci-bai fits into the Briefcase platform. (/evaluate/runs/) - Cost Tracking — Rate cards, prompt-cache billing, model comparison, and budgets in depth. (/features/cost-tracking/) - Changelog — The full release history. 
(/resources/changelog/)

========================================================================
# Capture
========================================================================

## Decision Recording
Source: https://briefcaseai.io/features/decision-recording/

> Capture every AI decision — inputs, outputs, model, and a content fingerprint — so it can be replayed, audited, and proven later.

Decision recording captures the complete context behind every decision your AI system makes, so it can be replayed, audited, and verified later.

> When you'd reach for this
>
> A customer disputes how your triage agent routed their ticket three weeks ago. Without a record, you can only guess what the model saw and why. With decision recording, every `classify_ticket` call already kept its exact inputs, the label it returned, the model used, and a content fingerprint — so you can pull up that one decision and explain it.

## Why persist a decision

A decision that vanishes the moment it runs can't be replayed, audited, or proven. Recording it turns a fleeting model call into a durable, verifiable record:

- **Replay** — re-run the exact inputs later to check whether behavior changed.
- **Audit** — answer "what did the agent decide, and on what basis?" months later.
- **Prove** — the `fingerprint()` content hash makes tampering detectable.

There are two ways to record. The lightweight `@capture` decorator records a dict per call and hands it to an exporter. The native `DecisionSnapshot` builds a structured record you can persist and replay.

## How recording flows

1. **Capture** — `@capture` wraps `classify_ticket` and records its inputs, outputs, timing, and type for each call.
2. **Export** — the recorded dict is handed to an exporter (console, a `.jsonl` file, or your own).
3. **Persist** — for storage and replay, build a native `DecisionSnapshot` and save it to a backend.
4. **Replay & verify** — later, load the snapshot, re-run it, and compare its `fingerprint()`.
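
The comparison in step 4 depends on a content hash that is identical for identical content and different for anything else. The sketch below shows that idea using only the standard library. It is an illustration, not Briefcase's `fingerprint()` algorithm: `toy_fingerprint`, the SHA-256 choice, and the canonical-JSON encoding are all assumptions made for the example.

```python
import hashlib
import json


def toy_fingerprint(inputs, outputs, model_parameters):
    """Hypothetical helper: hash a canonical JSON encoding of the content."""
    payload = {
        "inputs": inputs,
        "outputs": outputs,
        "model_parameters": model_parameters,
    }
    # sort_keys + fixed separators give one byte string per logical content
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


params = {"temperature": 0.0}
original = toy_fingerprint({"text": "Reset my password"}, {"category": "account_access"}, params)
replayed = toy_fingerprint({"text": "Reset my password"}, {"category": "account_access"}, params)
tampered = toy_fingerprint({"text": "Reset my password"}, {"category": "billing"}, params)

print(original == replayed)  # same content, same digest
print(original == tampered)  # changed output, different digest
```

Storing the digest next to the record is what lets a later replay or audit notice that either the record or the behavior changed.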
```mermaid
flowchart LR
    A["classify_ticket()"] -->|"@capture"| B["recorded dict"]
    B --> C[Exporter]
    A -->|"native API"| D[DecisionSnapshot]
    D --> E[Backend]
    E --> F["Replay & Verify"]
```

> Diagram description
>
> A `classify_ticket` call can be recorded two ways. The `@capture` decorator produces a lightweight dict that flows to an exporter for streaming and inspection. The native `DecisionSnapshot` is a structured record that is saved to a storage backend, which is what a later replay-and-verify step loads from.

## Record a decision with @capture

The simplest way to record a decision is the `@capture` decorator. Pass an `exporter` to send each record somewhere:

```python
from briefcase import capture
from briefcase.exporters import BaseExporter


class CollectingExporter(BaseExporter):
    def __init__(self):
        self.records = []

    async def export(self, decision):
        self.records.append(decision)
        return True

    async def flush(self): ...

    async def close(self): ...


exporter = CollectingExporter()


@capture(decision_type="classification", context_version="v1",
         exporter=exporter, async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "account_access"


classify_ticket("Reset my password")
print(exporter.records[0])
```

The decorator wraps the call, records a dict (decision id, inputs, outputs, timing, `decision_type`, `context_version`), and exports it through the exporter you pass. It does not persist a native `DecisionSnapshot` on its own; use the native objects below when you need storage or replay.
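
To make those mechanics concrete, here is a minimal, standard-library-only sketch of what a capture-style decorator does: wrap the call, time it, build a dict, and hand the dict to an exporter. It is not Briefcase's implementation. `toy_capture` and `ListExporter` are hypothetical names, the export is synchronous, and the record keys only mirror the fields named in the paragraph above.

```python
import functools
import time
import uuid
from datetime import datetime, timezone


class ListExporter:
    """Hypothetical stand-in exporter: keeps every record in a list."""

    def __init__(self):
        self.records = []

    def export(self, record):
        self.records.append(record)
        return True


def toy_capture(decision_type, context_version, exporter, max_chars=1000):
    """Illustrative decorator, not the real @capture."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            t0 = time.perf_counter()
            record = {
                "decision_id": str(uuid.uuid4()),
                "decision_type": decision_type,
                "context_version": context_version,
                "function_name": func.__name__,
                "inputs": repr((args, kwargs))[:max_chars],
                "started_at": datetime.now(timezone.utc).isoformat(),
            }
            try:
                result = func(*args, **kwargs)
                record["outputs"] = repr(result)[:max_chars]
                return result
            except Exception as exc:
                record["error"] = repr(exc)  # recorded only when the call raises
                raise
            finally:
                # runs on success and on failure, so every call is exported
                record["execution_time_ms"] = (time.perf_counter() - t0) * 1000.0
                exporter.export(record)

        return wrapper

    return decorator


exporter = ListExporter()


@toy_capture("classification", "v1", exporter)
def classify_ticket(text: str) -> str:
    return "account_access"  # placeholder for a model call


classify_ticket("Reset my password")
print(exporter.records[0]["function_name"], exporter.records[0]["outputs"])
```

The `finally` block is the design point worth noting: the record is exported whether the wrapped function returns or raises, so failed decisions are still visible.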
### @capture parameters

| Parameter | Default | Description |
|-----------|---------|-------------|
| `decision_type` | `None` | Label for the kind of decision recorded |
| `context_version` | `None` | Version tag for the surrounding context or prompt |
| `max_input_chars` | `1000` | Truncate recorded inputs to this length |
| `max_output_chars` | `1000` | Truncate recorded outputs to this length |
| `exporter` | `None` | Exporter that receives each recorded dict |
| `async_capture` | `True` | Export off the calling thread |

`@capture` works with or without arguments:

```python
from briefcase import capture


@capture
def classify(text: str) -> str:
    # call your model here
    return "billing"
```

## Emit records

`@capture` records a decision but has nowhere to send it until you configure an exporter. `briefcase.observe()` wires one up in a single line and returns it.

```python
import briefcase

mem = briefcase.observe("memory")  # or "console", or a "*.jsonl" path


@briefcase.capture(decision_type="classification", async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "account_access"


classify_ticket("Reset my password")
print(mem.records[0])
```

`@capture` exports in a background thread by default. Pass `async_capture=False` when you want the record available synchronously — for example, to read `MemoryExporter.records` right after the call. The per-call `exporter=` argument shown above overrides the global one set by `observe()`. See [Exporters](/features/exporters/) for the stock exporters and how to write a custom one.
## Build a native DecisionSnapshot

When you need storage or replay, build a structured `DecisionSnapshot`:

```python
from briefcase import DecisionSnapshot, Input, Output, ModelParameters

decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "Reset my password", "string"))

params = ModelParameters("your-model")
params.with_provider("your-provider")
params.with_parameter("temperature", 0.0)
decision.with_model_parameters(params)

output = Output("category", "account_access", "string")
output.with_confidence(0.92)
decision.add_output(output)

decision.with_execution_time(12.5)
decision.with_module("triage_service")
decision.add_tag("environment", "production")

print(decision.function_name)
print(decision.fingerprint())
```

### Fingerprints make a record verifiable

`fingerprint()` returns a stable hash over inputs, outputs, and model parameters. Store it alongside the record; recompute it later to detect when the same decision produces a different result — this is what makes a decision tamper-evident and replay-checkable.

```python
digest = decision.fingerprint()  # stable content hash

# later, on a loaded snapshot:
assert loaded.fingerprint() == digest  # unchanged
```

## Key classes

- `@capture` — decorator that records a dict and exports it
- `DecisionSnapshot` — structured record you can persist and replay; exposes `fingerprint()`
- `Input` / `Output` — typed wrappers; `Output.with_confidence(score)` attaches a confidence value
- `ModelParameters` — model name, provider, and per-call parameters
- `Snapshot` — groups multiple decisions; `add_decision(decision)` appends to it

| Field | Description | Why it matters |
|-------|-------------|----------------|
| `function_name` | The recorded function | Identifies which decision this is |
| `inputs` | Typed inputs | The exact inputs a replay re-runs against |
| `outputs` | Typed outputs | What the agent actually decided |
| `tags` | Arbitrary key/value tags | Carries your own context (e.g. environment, queue) |
| `execution_time_ms` | How long the call took | Anchors performance over time |
| `fingerprint()` | Content hash | Makes the record tamper-evident and verifiable |

## @capture vs DecisionSnapshot vs persisted storage

Three layers, each for a different need — pick by what you're trying to do.

| Use this | When you want to... | Lifetime |
|----------|---------------------|----------|
| `@capture` decorator | Instrument a real function (like `classify_ticket`) with zero boilerplate and stream a lightweight record to an exporter | Per call |
| `DecisionSnapshot` | Build a structured record by hand — to persist, replay, or fingerprint it | In-memory object |
| Persisted backend (`SqliteBackend`) | Keep decisions durably so you can query, replay, and audit them weeks later | Durable |

`@capture` records a dict and exports it — it does not persist a native `DecisionSnapshot` on its own. For anything you'll need to query or replay later, build a `DecisionSnapshot` and save it to a backend.

## Persist a decision

Save a `DecisionSnapshot` to a storage backend so it can be queried or replayed later. This is the bridge from Capture into the Store & Query act.

```python
import briefcase
from briefcase import DecisionSnapshot, Input, Output
from briefcase.storage import SqliteBackend

briefcase.init()

decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "Reset my password", "string"))
decision.add_output(Output("category", "account_access", "string"))
decision.with_execution_time(12.5)

backend = SqliteBackend.in_memory()  # or SqliteBackend("decisions.db")
decision_id = backend.save_decision(decision)

restored = backend.load_decision(decision_id)
print(restored.function_name)
```

## Where this fits

- Next · Capture: Exporters — Send each recorded decision to the console, a file, or your own backend. (/features/exporters/)
- Then · Store & Query: Storage Adapters — Persist snapshots durably so you can query and replay them later.
(/features/storage-adapters/)

## Exporters
Source: https://briefcaseai.io/features/exporters/

> Stream captured decisions to the console, a file, memory, or your own external sink as they happen.

Exporters control where decision records go the moment they're captured — to the console, a file, memory, or a sink of your own.

> When you'd reach for this
>
> While iterating on the triage agent, you want to *see* each `classify_ticket` decision as it happens — so a `ConsoleExporter` prints them to your terminal. In CI, you instead want to assert on what was captured without touching disk, so a `MemoryExporter` holds the records for your test to inspect. Same `@capture` code, swapped exporter.

`@capture` records every call, but on its own it has nowhere to send the record. An exporter is about *streaming records out as they happen* (for inspection, tests, or forwarding). A [storage backend](/features/storage-adapters/) is about *durable persistence you query later*. Many setups use both.

## How exporting fits

1. **Capture** — `@capture` records a `classify_ticket` call as a lightweight dict.
2. **Wire an exporter** — `briefcase.observe()` configures the global exporter in one line (or pass `exporter=` to `@capture`).
3. **Land it** — the exporter writes the record where you pointed it: stderr, a `.jsonl` file, an in-memory list, or your own sink.

`observe`, `setup`, and all stock exporters ship in the base package — no extra required.

## Emit records in one line

`briefcase.observe()` configures the global exporter and returns it. After calling it, every `@capture` decision is sent to that exporter.

```python
import briefcase

mem = briefcase.observe("memory")


@briefcase.capture(decision_type="ticket-classification", async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "billing"


classify_ticket("My invoice is wrong")
print(mem.records[0])
```

`@capture` exports in a background thread by default.
Pass `async_capture=False` when you want the record to be available synchronously — for example, to read `MemoryExporter.records` right after the call.

### observe() shorthands

`briefcase.observe(exporter="console", *, level=None)` accepts either a `BaseExporter` instance or a shorthand string, and returns the configured exporter.

| Argument | Result |
|----------|--------|
| `"console"` (default) | `ConsoleExporter` — writes JSON lines to stderr |
| `"memory"` | `MemoryExporter` — collects records in `.records` |
| a path ending in `.jsonl` | `JSONLFileExporter` — appends to that file |
| a `BaseExporter` instance | used as-is |

`level=` (optional) also enables Briefcase logging at that level — the same as calling [`enable_logging()`](/sdk/python/#logging).

```python
import briefcase

# Each call replaces the global exporter.
briefcase.observe("console")                # JSON lines to stderr (default)
briefcase.observe("memory")                 # collect in memory
briefcase.observe("decisions.jsonl")        # append to a file
briefcase.observe("console", level="INFO")  # also turn on logging
```

`observe()` calls `setup(exporter=...)` under the hood, so `briefcase.setup(exporter=ConsoleExporter())` is equivalent to `briefcase.observe("console")`.

## Which stock exporter?

| Exporter | Sends records to... | Reach for it when |
|----------|---------------------|-------------------|
| `ConsoleExporter` | a stream (`sys.stderr` by default) | Developing or debugging and you want to watch decisions live |
| `JSONLFileExporter` | a `.jsonl` file (one record per line) | You want a durable, append-only local log you can grep or post-process |
| `MemoryExporter` | an in-memory list on `.records` | Tests and notebooks — capture decisions, then assert on them without I/O |

### ConsoleExporter

Writes each record as one line of JSON to a stream. The quickest way to confirm `@capture` is producing records.
```python
import sys

from briefcase import setup
from briefcase.exporters import ConsoleExporter

setup(exporter=ConsoleExporter(sys.stdout, pretty=True))
```

`ConsoleExporter(stream=None, *, pretty=False)` — `stream` defaults to `sys.stderr`; `pretty=True` indents the JSON.

### JSONLFileExporter

Appends records to a file as JSON Lines (one object per line). Durable, append-only, and thread-safe, so it is safe to share across the background export threads `@capture` spawns. Parent directories are created on demand.

```python
import briefcase

briefcase.observe("decisions.jsonl")  # or JSONLFileExporter("decisions.jsonl")
```

`JSONLFileExporter(path)` — `path` is a string or `pathlib.Path`.

### MemoryExporter

Collects records in a list on `.records`. Ideal for tests and notebooks where you want to read the captured decisions back.

```python
import briefcase

mem = briefcase.observe("memory")


@briefcase.capture(async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "billing"


classify_ticket("My invoice is wrong")
assert mem.records[0]["function_name"] == "classify_ticket"

mem.clear()  # drop all collected records
```

`MemoryExporter()` exposes `.records` (a list) and `.clear()`.

## Custom exporters: ship to an external sink

Subclass `BaseExporter` to forward decisions anywhere your stack already collects events — a log aggregator, a message queue, an analytics pipeline. You implement three async methods; register the instance with `observe()` (it returns it unchanged) or with `setup(exporter=...)`.

> When you'd reach for this
>
> Your team already routes operational events through an internal collector. Rather than build a second pipeline for triage decisions, a small custom exporter forwards each `classify_ticket` record to that same sink.
```python
from typing import Any

import briefcase
from briefcase.exporters import BaseExporter


class WebhookExporter(BaseExporter):
    async def export(self, decision: Any) -> bool:
        # ship `decision` (a dict) to your external sink here
        # e.g. post to a collector, enqueue, or forward to a log pipeline
        return True

    async def flush(self) -> None: ...

    async def close(self) -> None: ...


exporter = briefcase.observe(WebhookExporter())


@briefcase.capture(decision_type="classification", async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "account_access"


classify_ticket("Reset my password")
```

- `export(decision)` ships a single record; return `True` on success.
- `flush()` flushes any buffered records.
- `close()` releases resources.

For durable, queryable persistence rather than fire-and-forget streaming, use a [storage backend](/features/storage-adapters/) instead of (or alongside) a custom exporter.

## Record shape

Each record `@capture` hands to an exporter is a dict:

- `decision_id` — a UUID string
- `decision_type` — the value you passed, or the function qualified name
- `function_name`
- `inputs` / `outputs` — truncated reprs of the arguments and return value
- `started_at` / `ended_at` — ISO 8601 timestamps
- `execution_time_ms`
- `context_version` — present only when you pass it
- `error` — present only when the call raised

## Key symbols

- `briefcase.observe(exporter="console", *, level=None)` — configure and return the global exporter.
- `briefcase.exporters.ConsoleExporter` — JSON lines to a stream.
- `briefcase.exporters.JSONLFileExporter` — append JSON Lines to a file.
- `briefcase.exporters.MemoryExporter` — collect records in `.records`.
- `briefcase.exporters.BaseExporter` — base class for custom exporters.

## Where this fits

- Act 1 · Capture: Decision Recording — Where the records an exporter ships actually come from.
(/features/decision-recording/)
- Next · Store & Query: Storage Adapters — Persist decisions durably so you can query and replay them later. (/features/storage-adapters/)

## PII Sanitization
Source: https://briefcaseai.io/features/pii-sanitization/

> Minimize sensitive data — detect and redact it before a decision record is ever stored.

PII sanitization is data minimization for your records: detect sensitive data and redact it *before* a decision is recorded or stored.

> When you'd reach for this
>
> Support tickets routinely contain emails, phone numbers, and account details. You want a durable record of how each ticket was triaged — but you don't want raw personal data sitting in that record forever. Sanitizing on the way in keeps the decision auditable while keeping the sensitive payload out of storage.

The principle is **minimize before you store**: a record you never wrote sensitive data into is one you never have to scrub later.

## Install

```bash
pip install briefcase-ai[sanitize]
```

The `sanitize` extra installs no third-party dependencies; detection uses built-in regex patterns.

## Sanitize before capture

1. **Detect** — scan the incoming ticket text for known patterns (email, phone, and any custom patterns you register).
2. **Redact** — replace matches with a `[REDACTED_<TYPE>]` marker so the meaning survives but the sensitive value doesn't.
3. **Capture** — record the decision on the *sanitized* text, so the stored record never held raw PII.

```python
from briefcase.sanitize import Sanitizer

sanitizer = Sanitizer()

result = sanitizer.sanitize("Email me at jane.doe@example.com please")
print(result.sanitized)        # Email me at [REDACTED_EMAIL] please
print(result.redaction_count)  # 1
print(result.has_redactions)   # True

# feed result.sanitized into classify_ticket() so the captured record is clean
```

`sanitize()` returns a `SanitizationResult` with `.sanitized`, `.redactions`, `.redaction_count`, and `.has_redactions`.
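
For intuition about the detect-then-redact steps, the following sketch does the same kind of substitution with Python's `re` module alone. It is only an illustration: `toy_sanitize` and the two patterns are hypothetical and deliberately simple, and Briefcase's built-in patterns and result objects are likely stricter and richer than this.

```python
import re

# Illustrative patterns only; Briefcase's built-in patterns may differ.
PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+"),
    "phone": re.compile(r"\b\d{3}-\d{3}-\d{4}\b"),
}


def toy_sanitize(text):
    """Return (sanitized_text, redaction_count) using the patterns above."""
    count = 0
    for pii_type, pattern in PATTERNS.items():
        # subn returns the new string and how many matches were replaced
        text, replaced = pattern.subn(f"[REDACTED_{pii_type.upper()}]", text)
        count += replaced
    return text, count


clean, count = toy_sanitize("Email me at jane.doe@example.com or call 555-123-4567")
print(clean)  # Email me at [REDACTED_EMAIL] or call [REDACTED_PHONE]
print(count)  # 2
```

The marker keeps the type of the removed value, so a reader of the record can still tell that the ticket contained an email address and a phone number without seeing either.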
## Redaction markers

Each match is replaced with a `[REDACTED_<TYPE>]` marker. The built-in PII types and their markers:

| PII type | Marker |
|----------|--------|
| `email` | `[REDACTED_EMAIL]` |
| `phone` | `[REDACTED_PHONE]` |
| `credit_card` | `[REDACTED_CREDIT_CARD]` |
| `ssn` | `[REDACTED_SSN]` |
| `ip_address` | `[REDACTED_IP]` |
| `api_key` | `[REDACTED_API_KEY]` |

## Inspect redactions

Each entry in `result.redactions` is a `Redaction` with `.pii_type`, `.start_position`, `.end_position`, and `.original_length` (positions index into the original text).

```python
from briefcase.sanitize import Sanitizer

sanitizer = Sanitizer()
result = sanitizer.sanitize("Call 555-123-4567 or email jane.doe@example.com")

for redaction in result.redactions:
    print(redaction.pii_type, redaction.start_position, redaction.end_position)
# phone 5 21
# email 27 43
```

## Sanitize JSON

`sanitize_json()` walks a dict and redacts string values, returning a `SanitizationJsonResult` with `.sanitized` and `.redaction_count`. Useful for sanitizing a structured ticket payload before you record it.

```python
from briefcase.sanitize import Sanitizer

sanitizer = Sanitizer()

record = {
    "ticket_id": "TKT-4821",
    "contact_email": "jane.doe@example.com",
    "priority": 2,
}
result = sanitizer.sanitize_json(record)
print(result.sanitized)
# {'contact_email': '[REDACTED_EMAIL]', 'priority': 2, 'ticket_id': 'TKT-4821'}
print(result.redaction_count)  # 1
```

## Reject sensitive data in a guardrail

Sometimes you don't want to redact and continue — you want to *stop*. Use `contains_pii` (a fast boolean) or `analyze_pii` (a summary that doesn't modify the text) to refuse a payload before it's ever recorded.

> When you'd reach for this
>
> A policy says certain decisions must never be stored if they still carry raw identifiers. Before persisting a triage decision, check the inputs and reject the call if detection still finds PII — failing closed instead of writing a non-compliant record.
```python
from briefcase.sanitize import Sanitizer

sanitizer = Sanitizer()


def guard(text: str) -> None:
    if sanitizer.contains_pii(text):
        report = sanitizer.analyze_pii(text)  # summary for logging the reason
        raise ValueError(f"refusing to store record: PII detected ({report})")


guard("Email jane.doe@example.com")  # raises before classify_ticket is recorded
```

```python
report = sanitizer.analyze_pii("Email jane.doe@example.com and call 555-123-4567")
print(report)
# {'has_pii': True, 'total_matches': 2, 'unique_types': 2,
#  'detected_types': ['phone', 'email']}
```

| Method | Returns | Use it to... |
|--------|---------|--------------|
| `sanitize(text)` | `SanitizationResult` | Strip PII and keep going |
| `sanitize_json(data)` | `SanitizationJsonResult` | Strip PII from a structured payload |
| `contains_pii(text)` | `bool` | Cheaply gate a guardrail — proceed or reject |
| `analyze_pii(text)` | summary `dict` | Get the details (types, counts) for logging or decisions |

This pairs naturally with [Guardrails](/advanced/guardrails/), where you can run this check inside an `evaluate()` and return `DENY` when PII is still present.

## Custom patterns

Register your own patterns for identifiers specific to your domain — a ticket number scheme, an internal account ID format — with `add_pattern(name, regex)`. The marker uppercases the name, so `ticket_id` redacts to `[REDACTED_TICKET_ID]`. Registered patterns are picked up by `sanitize`, `contains_pii`, and `analyze_pii`.
```python
from briefcase.sanitize import Sanitizer

sanitizer = Sanitizer()
sanitizer.add_pattern("ticket_id", r"\bTKT-\d{4}\b")

result = sanitizer.sanitize("Ticket TKT-4821 was escalated")
print(result.sanitized)  # Ticket [REDACTED_TICKET_ID] was escalated
```

| Argument | Type | Description |
|----------|------|-------------|
| `name` | `str` | A label for the pattern; uppercased into the `[REDACTED_<NAME>]` marker and reported by `analyze_pii` |
| `pattern` | `str` | The regex to match and redact |

`remove_pattern(pattern_name)` removes a registered pattern again.

Choose pattern names that read clearly in an audit trail — they become the redaction marker and appear in `analyze_pii` summaries.

## Key classes

- `Sanitizer` — detects and redacts PII; `sanitize`, `sanitize_json`, `add_pattern`, `remove_pattern`, `contains_pii`, `analyze_pii`.
- `SanitizationResult` — `.sanitized`, `.redactions`, `.redaction_count`, `.has_redactions`.
- `Redaction` — `.pii_type`, `.start_position`, `.end_position`, `.original_length`.
- `SanitizationJsonResult` — `.sanitized`, `.redaction_count`.

## Where this fits

- Capture: Decision Recording — Capture the sanitized decision as a verifiable snapshot. (/features/decision-recording/)
- Next · Store & Query: Storage Adapters — Persist the sanitized snapshots to a durable, queryable backend. (/features/storage-adapters/)

========================================================================
# Control & Route
========================================================================

## Guardrails
Source: https://briefcaseai.io/advanced/guardrails/

> Enforce deny-by-default, fail-closed controls before an agent action runs.

A guardrail decides whether an agent may perform an action on a resource — and blocks it before the action runs.

> When you'd reach for this
>
> Your triage agent wants to write to `kb/internal-articles` or invoke an action on a customer's behalf.
Before that runs, you need a single, testable gate that says yes or no — and that defaults to *no* if anything goes wrong. Guardrails are that gate: a deny-by-default control evaluated on every request, so an unauthorized or error-state action never reaches your model or your tools.

The guardrail system is a framework, not a fixed set of classes: `GuardrailEnv` is a runtime-checkable protocol with a single core method, `evaluate(request)`. Any object that implements it — yours or a registered one — plugs into the wrappers, pipelines, and batch evaluators the same way.

## How it works

1. **Write a guardrail** — subclass `BaseGuardrailEnv` and implement a pure `evaluate(request)` that returns `ALLOW` or `DENY`.
2. **Compose** — chain guardrails in a `GuardrailPipeline` and stack `Wrapper`s (cache, timeout, audit) around any of them.
3. **Fail closed** — wrap the outermost layer in `DenyByDefaultWrapper` so any exception becomes `DENY`, then gate the action on `result.is_allowed`.

```mermaid
flowchart LR
    A["Agent action"] --> B["DenyByDefaultWrapper"]
    B --> C["GuardrailPipeline"]
    C -- "ALLOW" --> D["Action runs"]
    C -- "DENY" --> X["Action blocked"]
    B -- "exception" --> X
```

> Diagram description
>
> Every agent action is evaluated by a guardrail before it runs. The outermost `DenyByDefaultWrapper` calls the `GuardrailPipeline`. An `ALLOW` lets the action run; a `DENY` blocks it. If evaluation raises an exception, the wrapper catches it and the action is blocked — the system fails closed, never open.

## Install

```bash
pip install briefcase-ai[guardrails]
```

```python
from briefcase.guardrails import (
    BaseGuardrailEnv,
    EvalRequest,
    EvalResult,
    Effect,
)
```

## Write a Guardrail

Subclass `BaseGuardrailEnv` and implement `evaluate()`. It receives an `EvalRequest` and returns an `EvalResult` carrying an `Effect` (`ALLOW` or `DENY`).
```python
from briefcase.guardrails import BaseGuardrailEnv, EvalRequest, EvalResult, Effect


class TierGuardrail(BaseGuardrailEnv):
    """Allow access to a resource only for agents on the right tier."""

    _name = "tier-check"

    def __init__(self, allowed_tiers):
        self._allowed = set(allowed_tiers)

    def evaluate(self, request: EvalRequest) -> EvalResult:
        tier = request.context.get("tier")
        if tier in self._allowed:
            return EvalResult(
                effect=Effect.ALLOW,
                guardrail_name=self._name,
                reason=f"tier '{tier}' is permitted",
            )
        return EvalResult(
            effect=Effect.DENY,
            guardrail_name=self._name,
            reason=f"tier '{tier}' is not permitted",
        )


env = TierGuardrail(allowed_tiers={"standard", "premium"})

request = EvalRequest(
    agent="support-bot",
    action="invoke",
    resource="kb/internal-articles",
    context={"tier": "premium"},
)

result = env.evaluate(request)
print(result.is_allowed)  # True
print(result.effect)      # Effect.ALLOW
print(result.reason)      # "tier 'premium' is permitted"
```

> Keep `evaluate()` pure
>
> `evaluate()` must be deterministic and side-effect-free: the same request yields the same result, and it performs no I/O. Keep it fast.

### EvalRequest

```python
EvalRequest(
    agent="support-bot",              # who is acting
    action="invoke",                  # what they want to do
    resource="kb/internal-articles",  # what they want to act on
    context={"tier": "premium"},      # attributes the guardrail evaluates
    request_id=None,                  # optional correlation id
)
```

### EvalResult

| Field | Description |
|-------|-------------|
| `effect` | `Effect.ALLOW` or `Effect.DENY` |
| `guardrail_name` | Name of the guardrail that produced the result |
| `reason` | Human-readable explanation |
| `is_allowed` | `True` when `effect == Effect.ALLOW` |
| `policy_id` | Optional identifier of the policy applied |
| `lakefs_sha` | Optional commit the policy was loaded from |
| `eval_time_ms` | Evaluation time |

## Register and Instantiate

Register a guardrail by string id and instantiate it with `make()`, the same register/make split used by Gymnasium.
This lets callers construct guardrails without importing the implementation.

```python
from briefcase.guardrails import register, make

# entry_point is a "module:ClassName" string — in your code that is the import
# path of your guardrail, e.g. "myapp.guardrails:TierGuardrail".
register(
    id="tier-check-v1",
    entry_point=f"{__name__}:TierGuardrail",
    kwargs={"allowed_tiers": ["standard", "premium"]},
)

env = make("tier-check-v1")                            # uses the registered kwargs
env = make("tier-check-v1", allowed_tiers=["premium"])  # override per call
```

## Chain Guardrails with a Pipeline

1. **Define each check** as its own guardrail (resource allowlist, tier check, rate check) so each stays small and testable.
2. **Order them** in a `GuardrailPipeline`, cheapest and most restrictive first.
3. **Pick a mode** — `FIRST_DENY` short-circuits on the first `DENY` (the default and the cheapest); `ALL` and `MAJORITY` run every stage.

`GuardrailPipeline` evaluates a request through several guardrails in order. By default it short-circuits on the first `DENY`.
```python
from briefcase.guardrails import (
    BaseGuardrailEnv,
    EvalRequest,
    EvalResult,
    Effect,
    GuardrailPipeline,
    PipelineMode,
)


class ResourceAllowlist(BaseGuardrailEnv):
    _name = "resource-allowlist"

    def __init__(self, allowed):
        self._allowed = set(allowed)

    def evaluate(self, request: EvalRequest) -> EvalResult:
        ok = request.resource in self._allowed
        return EvalResult(
            effect=Effect.ALLOW if ok else Effect.DENY,
            guardrail_name=self._name,
            reason="resource permitted" if ok else "resource not allowlisted",
        )


# TierGuardrail is the class defined under "Write a Guardrail" above.
pipeline = GuardrailPipeline(
    stages=[
        ResourceAllowlist(allowed={"kb/internal-articles"}),
        TierGuardrail(allowed_tiers={"standard", "premium"}),
    ],
    mode=PipelineMode.FIRST_DENY,
)

request = EvalRequest(
    agent="support-bot",
    action="invoke",
    resource="kb/internal-articles",
    context={"tier": "premium"},
)

outcome = pipeline.evaluate(request)
print(outcome.is_allowed)               # True
print(len(outcome.individual_results))  # one result per stage that ran
```

`PipelineMode` options:

| Mode | Behavior |
|------|----------|
| `FIRST_DENY` | Stop on the first `DENY` (default) |
| `ALL` | Evaluate every stage; `DENY` if any stage denies |
| `MAJORITY` | Majority vote across stages |

```mermaid
flowchart LR
    A["EvalRequest"] --> B["ResourceAllowlist"]
    B -- "DENY" --> X["Block (short-circuit)"]
    B -- "ALLOW" --> C["TierGuardrail"]
    C -- "DENY" --> X
    C -- "ALLOW" --> D["Allow"]
```

> Diagram description
>
> An `EvalRequest` flows through two stages in order. `ResourceAllowlist` runs first: a `DENY` short-circuits straight to Block, while an `ALLOW` passes to `TierGuardrail`. `TierGuardrail` then either denies (also Block) or allows. Only when both stages allow does the request reach the final Allow outcome.

## Composable Wrappers

A wrapper *is* a `GuardrailEnv`, so wrappers stack around any guardrail.

> Fail closed: deny by default
>
> Use `DenyByDefaultWrapper` as the **outermost** layer.
It catches any exception raised during evaluation and returns `DENY`, so a bug, a timeout, or an unreachable policy store never results in accidental access. Combined with `TimeoutWrapper`'s `fallback_effect=Effect.DENY`, the whole stack denies under any failure condition rather than letting an action through.

```python
from briefcase.guardrails import (
    CacheWrapper,
    TimeoutWrapper,
    DenyByDefaultWrapper,
    Effect,
)

# Reuses TierGuardrail and `request` from the examples above.
env = DenyByDefaultWrapper(
    TimeoutWrapper(
        CacheWrapper(TierGuardrail(allowed_tiers={"premium"})),
        max_ms=10.0,
        fallback_effect=Effect.DENY,
    )
)

result = env.evaluate(request)
```

| Wrapper | Effect |
|---------|--------|
| `CacheWrapper` | Caches results with a TTL |
| `TimeoutWrapper` | Falls back (default `DENY`) if evaluation exceeds `max_ms` |
| `AuditWrapper` | Records every `(request, result)` for observability |
| `SamplingWrapper` | Evaluates a fraction of requests; allows the rest |
| `DenyByDefaultWrapper` | Catches exceptions and returns `DENY` |
| `ViolationModeWrapper` | Converts `DENY` to `ALLOW` for soft-deny workflows |

## Gate the Action (fail-closed)

A guardrail only governs an action if you actually gate on its result. Evaluate *before* the side effect runs, treat anything that is not an explicit `ALLOW` as a deny, and let the `DenyByDefaultWrapper` turn any unexpected exception into a block.

```python
from briefcase.guardrails import DenyByDefaultWrapper, EvalRequest

# Outermost layer fails closed: any exception inside becomes DENY.
# TierGuardrail is the class defined under "Write a Guardrail" above.
gate = DenyByDefaultWrapper(TierGuardrail(allowed_tiers={"premium"}))


def classify_ticket(ticket, *, agent="support-bot"):
    request = EvalRequest(
        agent=agent,
        action="invoke",
        resource="kb/internal-articles",
        context={"tier": ticket["tier"]},
    )
    try:
        result = gate.evaluate(request)
    except Exception:
        # Belt and suspenders: even if the gate itself raises, deny.
        raise PermissionError("guardrail evaluation failed; action blocked")

    if not result.is_allowed:
        raise PermissionError(f"action denied: {result.reason}")

    # Authorized — now it is safe to run the side effect.
    # return run_classification(ticket)  # call your model / tools here
```

> Never act on a missing ALLOW
>
> Branch on `result.is_allowed`, not on the absence of a deny. If evaluation is skipped, times out, or raises, the action must not run. Deny-by-default means *allow only when explicitly allowed*.

## Where this fits

Guardrails are the **Control** act of the journey: enforce the rule before the action runs. Once an action is authorized, route it; once it has run, capture it.

- Routing — Next: decide whether an authorized decision is handled automatically or escalated to human review. (/advanced/routing/)
- Decision Recording — Capture: record the full context behind every decision the agent makes. (/features/decision-recording/)

## Routing
Source: https://briefcaseai.io/advanced/routing/

> Route an AI decision between automatic handling and human review with a narrow, in-process gate.

The routing module decides what happens to a decision: handle it automatically or escalate it to human review. A router takes a decision context and returns a `RoutingDecision`.

> When you'd reach for this
>
> Your triage agent has classified a ticket and produced a confidence score. Some results are safe to act on automatically; low-confidence ones should go to a human. `BaseRouter` is the small, in-process gate that makes that auto-vs-human call inside your request path, with no policy store and no history to manage.

> For production audit trails, use Versioned Routing Policy
>
> `BaseRouter` does **not** version its logic and cannot tell you later which rule fired or which configuration was active on a past date.
> When the routing choice is governed by a policy that changes over time — and you need to reconstruct exactly which rule and which policy version produced a past decision — use [**Versioned Routing Policy**](/advanced/versioned-routing-policy/) instead. Reach for `BaseRouter` only when an in-process, non-versioned gate is enough.

## Simple vs. versioned routing

| | `BaseRouter` (this page) | [`AgentRouter`](/advanced/versioned-routing-policy/) (versioned) |
|---|---|---|
| Purpose | In-process auto-vs-human gate | Policy-governed choice with attribution |
| Logic lives in | Your subclass code | Versioned `PolicyVersion` rules |
| Call style | `async` (I/O-bound) | `sync` (pure, in-memory) |
| Versioned? | No | Yes — every version is an append |
| Reconstruct a past decision? | No | Yes — route `as_of` a date |
| Records which rule fired? | No | Yes — `matched_rule_id` |
| Backed by | Nothing | Bitemporal store |
| Reach for it when | A quick, non-audited gate is enough | You must prove which rule fired, when |

## Install

```bash
pip install briefcase-ai[routing]
```

```python
from briefcase.routing import BaseRouter, RoutingDecision
```

## Route a Decision

1. **Subclass `BaseRouter`** and implement the `route` coroutine.
2. **Read the decision context** — for triage, the classifier's confidence.
3. **Return a `RoutingDecision`** with an `action` (`"auto"` or `"human_review"`) and a `reason` you can attach to the decision record.

`BaseRouter` is an abstract base class with a single abstract coroutine, `route(decision_context) -> RoutingDecision`. The router is asynchronous because real routers usually call out to an external policy service or model.
```python
import asyncio
import time

from briefcase.routing import BaseRouter, RoutingDecision


class ConfidenceRouter(BaseRouter):
    """Route a support ticket to automatic handling or human review."""

    def __init__(self, auto_threshold: float = 0.85):
        self.auto_threshold = auto_threshold

    async def route(self, decision_context) -> RoutingDecision:
        start = time.perf_counter()
        confidence = decision_context.get("confidence", 0.0)
        if confidence >= self.auto_threshold:
            action = "auto"
            reason = f"confidence {confidence:.2f} >= {self.auto_threshold}"
        else:
            action = "human_review"
            reason = f"confidence {confidence:.2f} below threshold"
        eval_time_ms = (time.perf_counter() - start) * 1000
        return RoutingDecision(
            action=action,
            source="internal",
            eval_time_ms=eval_time_ms,
            reason=reason,
        )


async def main():
    router = ConfidenceRouter(auto_threshold=0.85)
    high = await router.route({"ticket_id": "T-1001", "confidence": 0.93})
    low = await router.route({"ticket_id": "T-1002", "confidence": 0.40})
    print(high.action, high.reason)  # auto ...
    print(low.action, low.reason)    # human_review ...


asyncio.run(main())
```

## RoutingDecision

`RoutingDecision` is a dataclass with four fields:

| Field          | Type             | Description                                              |
|----------------|------------------|----------------------------------------------------------|
| `action`       | `str`            | The routing outcome, e.g. `"auto"` or `"human_review"`.  |
| `source`       | `str`            | Where the decision came from, e.g. `"internal"`, `"opa"`.|
| `eval_time_ms` | `float`          | How long evaluation took, in milliseconds.               |
| `reason`       | `str` (optional) | Human-readable explanation of the outcome.               |

```mermaid
flowchart LR
    A["Decision context"] --> B["BaseRouter.route"]
    B --> C{"meets criteria?"}
    C -- yes --> D["action = auto"]
    C -- no --> E["action = human_review"]
    D & E --> F["RoutingDecision"]
```

> Diagram description
>
> A decision context enters `BaseRouter.route`, which checks whether it meets the criteria.
> If yes, the action becomes `auto`; if no, the action becomes `human_review`. Both branches converge into a single `RoutingDecision`.

## Choosing a Layer

`BaseRouter` is intentionally narrow: an in-process gate for the auto-versus-human question. It does not version its logic, and it cannot tell you later which rule fired or which configuration was active on a given date.

When the routing choice is governed by a policy that changes over time — and you need to reconstruct a past decision exactly — use the [versioned routing layer](/advanced/versioned-routing-policy/) instead, which adds `AgentRouter`, `PolicyRegistry`, and `PolicyVersion` for auditable, time-travel routing.

## Where this fits

Routing is part of the **Control** act: once an action is authorized by a [guardrail](/advanced/guardrails/), the router decides who handles it. For a production audit trail, route through the versioned layer.

- Guardrails — Previous: enforce deny-by-default controls before an action is allowed to run. (/advanced/guardrails/)
- Versioned Routing Policy — Next: route through versioned policies and reconstruct past decisions as-of a date. (/advanced/versioned-routing-policy/)

## Versioned Routing Policy

Source: https://briefcaseai.io/advanced/versioned-routing-policy/

> Reconstruct exactly which rule fired and which policy version was active on any past date.

Route agent decisions through versioned, time-travelable policies — so you can prove which rule fired on any past date.

> When you'd reach for this
>
> Six weeks ago your triage agent routed an enterprise ticket to the senior queue. Someone asks why. Since then the policy changed twice. To answer, you need the exact rule set that was active *that day* and the specific rule that fired — not today's policy. Versioned routing reconstructs that decision as-of its original date, every time.

When an agent's routing choice is governed by a policy that changes over time, recording the choice is not enough.
To reconstruct a past decision you need both the full policy that was in effect on the decision date and the specific rule that fired. A single-version policy store cannot answer that once the policy has changed.

The versioned routing layer stores every policy version in a bitemporal store. Publishing a new version is an append, never a mutation, so reading "the policy as of date X" returns exactly the rule set that was active then.

## How it works

1. **Define a `PolicyVersion`** — an ordered list of `PolicyRule`s plus a `default_choice`.
2. **Publish it** to a `PolicyRegistry` with a `valid_from` date; publishing is an append, so older versions are never overwritten.
3. **Route** a context through an `AgentRouter` — it selects the first matching rule and records `matched_rule_id`, `policy_version`, and `rationale`.
4. **Reconstruct as-of** a past date by passing `as_of_transaction_time` — the registry returns the rule set that was active then.

## Install

```bash
pip install briefcase-ai[routing]
```

```python
from briefcase.routing import (
    AgentRouter,
    PolicyRegistry,
    PolicyRule,
    PolicyVersion,
)
```

## Route a Ticket

This example routes a support ticket to a queue based on the ticket context, publishes a second policy version, and reconstructs the earlier decision as-of a past date.

```python
from datetime import datetime, timezone

from briefcase.routing import (
    AgentRouter,
    PolicyRegistry,
    PolicyRule,
    PolicyVersion,
)

# 1. Define a policy: route a support ticket to a queue by context.
policy_v1 = PolicyVersion(
    policy_id="ticket-routing",
    version="1.0.0",
    description="Route support tickets to a queue.",
    rules=[
        PolicyRule(
            rule_id="enterprise-priority",
            condition={"plan": "enterprise", "priority": "high"},
            choice="senior-queue",
            rationale="High-priority enterprise tickets go to the senior queue.",
        ),
        PolicyRule(
            rule_id="routine-lookup",
            condition={"category": {"in": ["faq", "status-check"]}},
            choice="self-serve",
            rationale="Routine lookups are deflected to self-serve.",
        ),
    ],
    default_choice="standard-queue",
)

# 2. Publish it to a versioned registry (bitemporal-backed by default).
# valid_from is when the policy takes effect; transaction_time is when
# the registry learned of it (defaults to now if omitted).
registry = PolicyRegistry()
registry.publish(
    policy_v1,
    valid_from=datetime(2026, 1, 1, tzinfo=timezone.utc),
    transaction_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
)

# 3. Route a request through the registry.
router = AgentRouter(
    registry,
    use_case="ticket-routing",
    policy_id="ticket-routing",
)
decision = router.route(
    {"plan": "enterprise", "priority": "high", "category": "billing"},
    evidence_refs=["tkt-9001"],
)
print(decision.selected)         # senior-queue
print(decision.policy_version)   # 1.0.0
print(decision.matched_rule_id)  # enterprise-priority
print(decision.rationale)        # High-priority enterprise tickets ...
print(decision.evidence_refs)    # ['tkt-9001']

# 4. Publish a newer version. Older decisions still reconstruct correctly.
policy_v2 = PolicyVersion(
    policy_id="ticket-routing",
    version="2.0.0",
    description="De-escalate enterprise high-priority to the standard queue.",
    rules=[
        PolicyRule(
            rule_id="enterprise-priority",
            condition={"plan": "enterprise", "priority": "high"},
            choice="standard-queue",
            rationale="Updated: enterprise high-priority now goes to the standard queue.",
        ),
    ],
    default_choice="standard-queue",
)
registry.publish(
    policy_v2,
    valid_from=datetime(2026, 4, 1, tzinfo=timezone.utc),
    transaction_time=datetime(2026, 4, 1, tzinfo=timezone.utc),
)

# Current routing uses v2.
current = router.route({"plan": "enterprise", "priority": "high"})
print(current.selected, current.policy_version)  # standard-queue 2.0.0

# Reconstruct the decision as it would have been made before v2 was published.
as_of = datetime(2026, 2, 1, tzinfo=timezone.utc)
historical = router.route(
    {"plan": "enterprise", "priority": "high"},
    as_of_transaction_time=as_of,
)
print(historical.selected, historical.policy_version)  # senior-queue 1.0.0

# Inspect the full version history.
print([v.version for v in registry.history("ticket-routing")])  # ['1.0.0', '2.0.0']
```

```mermaid
flowchart LR
    A["Context"] --> B["AgentRouter.route"]
    B --> C["PolicyRegistry.get(as_of)"]
    C --> D["PolicyVersion.select"]
    D --> E{"rule matches?"}
    E -- yes --> F["matched rule choice"]
    E -- no --> G["default_choice"]
    F & G --> H["AgentRoutingDecision"]
```

> Diagram description
>
> A context enters `AgentRouter.route`, which fetches the policy from `PolicyRegistry.get(as_of)` and runs `PolicyVersion.select`. If a rule matches, the matched rule's choice is used; if no rule matches, the policy's `default_choice` is used. Both branches produce an `AgentRoutingDecision`.

## PolicyRule

A `PolicyRule` is a single "if the context matches this condition, select this choice" rule. The `condition` is a small dict predicate evaluated against the routing context.
| Field       | Type             | Description                                          |
|-------------|------------------|------------------------------------------------------|
| `rule_id`   | `str`            | Stable identifier, recorded as `matched_rule_id`.    |
| `condition` | `dict`           | Predicate evaluated against the context.             |
| `choice`    | `str`            | The choice selected when the condition matches.      |
| `rationale` | `str` (optional) | Explanation copied into the decision record.         |

`condition` supports three forms:

| Syntax                          | Meaning     |
|---------------------------------|-------------|
| `{"field": value}`              | equality    |
| `{"field": {"in": [a, b]}}`     | membership  |
| `{"field": {"ne": value}}`      | inequality  |

All keys in a condition must match (logical AND).

> Misconfiguration fails loudly
>
> Unknown condition operators raise `KeyError` rather than silently evaluating to `False`, so a typo in a rule surfaces immediately instead of mis-routing.

Call `matches(context)` to test a rule directly:

```python
from briefcase.routing import PolicyRule, PolicyVersion

rule = PolicyRule(
    rule_id="routine-lookup",
    condition={"category": {"in": ["faq", "status-check"]}},
    choice="self-serve",
    rationale="Routine lookups are deflected to self-serve.",
)

print(rule.matches({"category": "faq"}))      # True
print(rule.matches({"category": "billing"}))  # False
```

## PolicyVersion

A `PolicyVersion` is an ordered list of rules. `select(context)` evaluates the rules in order and returns the first match. If no rule matches and `default_choice` is set, the default is returned; otherwise `choice` is `None` so the caller can fall back to human review.
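The matching and first-match semantics described above can be sketched without the library. This is a minimal, dependency-free illustration and not Briefcase's implementation: `matches` and `select_first` are hypothetical helper names, rules are plain dicts, and the only behavior assumed is what this page documents (equality, `in`, and `ne` conditions combined with AND, `KeyError` on an unknown operator, first match wins, then the default). Treating a missing context field as `None` is an extra assumption of the sketch.

```python
from typing import Optional


def matches(condition: dict, context: dict) -> bool:
    """Return True when every key in `condition` matches `context` (logical AND)."""
    for field, expected in condition.items():
        actual = context.get(field)  # assumption: a missing field reads as None
        if isinstance(expected, dict):
            # Operator form: {"in": [...]} or {"ne": value}.
            for op, operand in expected.items():
                if op == "in":
                    if actual not in operand:
                        return False
                elif op == "ne":
                    if actual == operand:
                        return False
                else:
                    # Fail loudly on a typo instead of silently mis-routing.
                    raise KeyError(f"unknown condition operator: {op!r}")
        elif actual != expected:
            # Plain value form: equality.
            return False
    return True


def select_first(rules: list, context: dict, default_choice: Optional[str] = None):
    """Return (choice, matched_rule_id) for the first rule that matches."""
    for rule in rules:
        if matches(rule["condition"], context):
            return rule["choice"], rule["rule_id"]
    return default_choice, None  # no rule fired: fall back to the default


rules = [
    {"rule_id": "enterprise-priority",
     "condition": {"plan": "enterprise", "priority": "high"},
     "choice": "senior-queue"},
    {"rule_id": "routine-lookup",
     "condition": {"category": {"in": ["faq", "status-check"]}},
     "choice": "self-serve"},
]

first = select_first(rules, {"plan": "enterprise", "priority": "high"}, "standard-queue")
fallback = select_first(rules, {"category": "billing"}, "standard-queue")
unrouted = select_first(rules, {"category": "billing"})

print(first)     # ('senior-queue', 'enterprise-priority')
print(fallback)  # ('standard-queue', None)
print(unrouted)  # (None, None)
```

Because rules are checked in order, putting the most specific rule first matters: a broad rule placed earlier would shadow every rule after it.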
```python
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1.0.0",
    rules=[rule],
    default_choice="standard-queue",
)

hit = policy.select({"category": "status-check"})
print(hit.choice, hit.matched_rule_id)    # self-serve routine-lookup

miss = policy.select({"category": "billing"})
print(miss.choice, miss.matched_rule_id)  # standard-queue None
```

`select` returns a `PolicyEvaluationResult` with `choice`, `matched_rule_id`, `policy_id`, `policy_version`, and `rationale`.

## PolicyRegistry

`PolicyRegistry(store=None)` is a versioned registry of policies. It defaults to an in-memory bitemporal store; pass any `BitemporalStore` for durable storage.

| Method | Returns | Description |
|--------|---------|-------------|
| `publish(policy, *, valid_from, transaction_time=None, source=...)` | `BitemporalRecord` | Append a new version. `valid_from` is a `datetime`. |
| `get(policy_id, *, as_of_transaction_time=None, as_of_valid_time=None)` | `PolicyVersion` | The version visible at the as-of point; latest if no clamp is given. |
| `history(policy_id)` | `list[PolicyVersion]` | Every published version, oldest first. |

`valid_from` records when a policy takes effect in the real world; `transaction_time` records when the registry learned of it (defaults to now). An as-of read clamps both, which is how a past decision reconstructs the rule set that was active on its decision date.

## AgentRouter

`AgentRouter` joins a registry to a use case and a policy, and produces an `AgentRoutingDecision` ready to attach to a Briefcase decision snapshot.

```python
router = AgentRouter(
    registry,
    use_case="ticket-routing",
    policy_id="ticket-routing",
    candidates_provider=None,  # optional: derive candidate choices from context
)
```

`route` is synchronous — policy evaluation is a pure, in-memory computation against the bitemporal store.
This differs from the I/O-bound, asynchronous [`BaseRouter`](/advanced/routing/); the two are independent abstractions.

| `route` parameter         | Description                                                           |
|---------------------------|-----------------------------------------------------------------------|
| `context`                 | The dict evaluated against the policy rules.                          |
| `evidence_refs`           | Record IDs of the bitemporal rows that informed the decision.         |
| `as_of_transaction_time`  | Reconstruct the decision using the policy active on a past date.      |

The returned `AgentRoutingDecision` carries the full attribution:

| Field             | Description                                                  |
|-------------------|--------------------------------------------------------------|
| `selected`        | The chosen option (may be `None` if no rule and no default). |
| `policy_version`  | The version that produced the choice.                        |
| `matched_rule_id` | The rule that fired, or `None` for the default.              |
| `rationale`       | Human-readable explanation from the matched rule.            |
| `evidence_refs`   | The evidence record IDs passed to `route`.                   |
| `candidates`      | The set of choices the policy could have selected from.      |

## Where this fits

Versioned routing is the **Control** act made reproducible: it records which rule fired so the decision can later be reconstructed and proven. It rests on the append-only store from the **Store & Query** act and feeds the tamper-evident bundles of the **Prove** act.

- Routing — Previous: the narrow, in-process auto-versus-human-review router. (/advanced/routing/)
- Bitemporal Storage — The append-only store that backs the registry and powers as-of reconstruction. (/advanced/bitemporal-storage/)
- Audit Bundles — Next: package a routing decision, its policy version, and evidence into a content-hashed, verifiable bundle. (/advanced/compliance-bundles/)

## Validation Engine

Source: https://briefcaseai.io/advanced/validation-engine/

> Block prompts whose references no longer resolve, before they reach a model.
The validation engine checks that the references in a prompt (document paths, section numbers, identifiers) actually resolve against a versioned knowledge base before the prompt reaches a model.

> When you'd reach for this
>
> Your triage agent builds a prompt that cites `handbook/onboarding.md` and `Section 4.2.3`. Last week someone moved the handbook and the section number changed. Without a check, the model answers confidently from a reference that no longer resolves. The validation engine catches that *before* the prompt runs and hands back structured remediation instead of a silent, wrong answer.

It is a pure framework: you supply an *extractor* that finds references and a *resolver* that checks them, and the engine orchestrates the layers and records the commit it validated against.

## How it works

1. **Extract** — your extractor finds candidate references in the prompt. No references means the prompt passes immediately.
2. **Resolve** — your resolver checks each reference against the versioned knowledge base and returns a `ValidationError` for anything that fails.
3. **Semantic (optional)** — if there are no errors, an optional `semantic_validator` can add warnings based on the prompt's meaning.
4. **Stamp** — the engine records the knowledge-base commit it validated against, so the result is reproducible.
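The four steps above can be sketched as a small, dependency-free pipeline. This is an illustration of the control flow only, not the engine's code: `Finding`, `Report`, and the standalone `validate` function are hypothetical stand-ins, the status mapping shown corresponds to what this page calls `"strict"` mode, and the point at which the commit is looked up is an assumption.

```python
from dataclasses import dataclass, field


@dataclass
class Finding:
    reference: str
    severity: str  # "error", or anything else (treated as a warning)


@dataclass
class Report:
    status: str
    references_checked: int
    commit: str
    errors: list = field(default_factory=list)
    warnings: list = field(default_factory=list)


def validate(prompt, extract, resolve_all, get_commit, semantic=None):
    commit = get_commit()                    # 4. Stamp: attached to every report
    references = extract(prompt)             # 1. Extract
    if not references:
        return Report("passed", 0, commit)   # nothing to check, pass immediately
    findings = resolve_all(references)       # 2. Resolve
    errors = [f for f in findings if f.severity == "error"]
    warnings = [f for f in findings if f.severity != "error"]
    if not errors and semantic is not None:  # 3. Semantic runs only when error-free
        warnings += semantic(prompt, references)
    status = "failed" if errors else ("warning" if warnings else "passed")
    return Report(status, len(references), commit, errors, warnings)


known = {"handbook/onboarding.md"}
report = validate(
    "See handbook/onboarding.md and handbook/missing.md",
    extract=lambda p: [w for w in p.split() if w.startswith("handbook/")],
    resolve_all=lambda refs: [Finding(r, "error") for r in refs if r not in known],
    get_commit=lambda: "demo0000",
)
print(report.status, report.references_checked)  # failed 2
print([f.reference for f in report.errors])      # ['handbook/missing.md']
```

Passing the extractor and resolver in as plain callables mirrors the pluggable design: the orchestration stays fixed while the reference syntax and the knowledge base are yours to define.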
## Install

```bash
pip install briefcase-ai[validate]
```

```python
from briefcase.validation import PromptValidationEngine
from briefcase.validation import ValidationError, ValidationErrorCode
```

## Validate a Prompt

```python
import re

from briefcase.validation import PromptValidationEngine
from briefcase.validation import ValidationError, ValidationErrorCode


class HandbookExtractor:
    """Finds ``handbook/*.md`` paths and ``Section X.Y`` references in a prompt."""

    _REF = re.compile(r"handbook/[\w/]+\.md|Section\s+[\d.]+")

    def extract(self, prompt: str) -> list:
        return self._REF.findall(prompt)


class KnowledgeBaseResolver:
    """Resolves references against an allowlist of known documents."""

    def __init__(self, known_references: set):
        self._known = known_references

    def resolve_all(self, references: list) -> list:
        errors = []
        for ref in references:
            if ref not in self._known:
                errors.append(
                    ValidationError(
                        code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                        message=f"Reference not found in knowledge base: {ref}",
                        reference=ref,
                        severity="error",
                        layer="resolution",
                        remediation="Add the document to the knowledge base or fix the reference.",
                    )
                )
        return errors


class DemoKnowledgeBase:
    """Stand-in for VersionedClient.get_commit() so the example runs offline."""

    def get_commit(self, repository: str, branch: str) -> str:
        return "demo0000000000000000000000000000000000000"


engine = PromptValidationEngine(
    extractor=HandbookExtractor(),
    resolver=KnowledgeBaseResolver(
        known_references={"handbook/onboarding.md", "Section 4.2.3"},
    ),
    lakefs_client=DemoKnowledgeBase(),
    repository="knowledge-base",
    branch="main",
    mode="strict",  # fail on errors
)

prompt = """
Follow the onboarding policy in handbook/onboarding.md and reference
Section 4.2.3 for account-setup steps. Also see handbook/missing.md.
"""

report = engine.validate(prompt)
print(report.status)              # "failed" — handbook/missing.md is unknown
print(report.references_checked)  # 3
print(report.lakefs_commit[:8])   # "demo0000"

if report.has_errors:
    for error in report.errors:
        print(error.reference, "->", error.message)
        print(" fix:", error.remediation)
```

In production, pass any versioned-data client (the bundled [`briefcase.integrations.lakefs.VersionedClient`](/integrations/lakefs/), or your own via the `vcs` protocol) as `lakefs_client`, to resolve references against a live, version-controlled knowledge base. The engine calls `lakefs_client.get_commit(repository, branch)` to stamp every report with the commit it validated against.

## Pluggable Protocols

You provide objects that satisfy two protocols: an extractor that finds references and a resolver that checks them. The engine ships no built-in extractors or resolvers and installs no third-party dependencies for them.

```python
# briefcase.validation exports these as runtime-checkable protocols; any object
# with the right method signature satisfies them (no base class required).
from typing import Protocol


# Extractor: find references in a prompt.
class Extractor(Protocol):
    def extract(self, prompt: str) -> list: ...


# Resolver: check each reference, return a list of ValidationError.
class Resolver(Protocol):
    def resolve_all(self, references: list) -> list: ...
```

A resolver returns `ValidationError` objects. Errors with `severity="error"` become `report.errors`; any other severity becomes `report.warnings`.

An optional third layer runs only when there are no errors: pass a `semantic_validator` with a `validate_semantic(prompt, references) -> list` method to attach warnings based on the meaning of the prompt.
```python
class KeywordSemanticValidator:
    def validate_semantic(self, prompt: str, references: list) -> list:
        return []  # return ValidationError warnings based on the prompt's meaning


engine = PromptValidationEngine(
    extractor=HandbookExtractor(),
    resolver=KnowledgeBaseResolver(known_references=set()),
    lakefs_client=DemoKnowledgeBase(),
    repository="knowledge-base",
    semantic_validator=KeywordSemanticValidator(),  # optional third layer
)
```

## Validation Layers

```mermaid
flowchart LR
    A["Prompt"] --> B["extractor.extract()"]
    B -- "no refs" --> P["status: passed"]
    B -- "refs found" --> C["resolver.resolve_all()"]
    C --> D{"errors?"}
    D -- "yes" --> F["status: failed"]
    D -- "no" --> E["semantic_validator (optional)"]
    E --> G["status: passed / warning"]
```

> Diagram description
>
> A prompt runs through `extractor.extract()`. If no references are found, the status is `passed`. If references are found, `resolver.resolve_all()` runs and the engine checks for errors. Any error yields status `failed`. With no errors, the optional `semantic_validator` runs and the status becomes `passed` or `warning`.

## ValidationReport

`validate()` returns a `ValidationReport`.

| Field | Description |
|-------|-------------|
| `status` | `"passed"`, `"warning"`, or `"failed"` |
| `errors` | List of `ValidationError` with `severity="error"` |
| `warnings` | List of `ValidationError` with other severities |
| `references_checked` | Number of references the extractor found |
| `validation_time_ms` | Wall-clock validation time |
| `lakefs_commit` | Commit SHA the validation ran against |
| `has_errors` | `True` when `errors` is non-empty |
| `has_warnings` | `True` when `warnings` is non-empty |

## ValidationError

Each error carries structured remediation context.
```python
ValidationError(
    code=ValidationErrorCode.REFERENCE_NOT_FOUND,
    message="Reference not found in knowledge base: handbook/missing.md",
    reference="handbook/missing.md",
    severity="error",
    layer="resolution",
    remediation="Add the document to the knowledge base or fix the reference.",
)
```

`ValidationErrorCode` is an enum of the conditions the engine reports:

| Code | Meaning |
|------|---------|
| `INVALID_SYNTAX` | Reference is malformed |
| `REFERENCE_NOT_FOUND` | Reference does not exist in the knowledge base |
| `REFERENCE_AMBIGUOUS` | Reference matches more than one document |
| `REFERENCE_GONE` | Reference existed but was removed |
| `VERSION_MISMATCH` | Reference resolves to an unexpected version |
| `SCHEMA_INVALID` | Resolved document fails schema checks |
| `LAKEFS_UNAVAILABLE` | The versioned knowledge base could not be reached |

## Validation Modes

The `mode` argument controls how `status` is derived.

| Mode | Errors | Warnings only | Clean |
|------|--------|---------------|-------|
| `"strict"` | `failed` | `warning` | `passed` |
| `"tolerant"` | `failed` | `passed` | `passed` |
| `"warn_only"` | `passed` | `passed` | `passed` |

## Where this fits

Validation is a **Control** act: it stops a prompt with stale references before it runs, and stamps the commit it checked against so the result is reproducible. It pairs naturally with versioned retrieval.

- Guardrails — Previous: enforce deny-by-default controls before an agent action runs. (/advanced/guardrails/)
- RAG Versioning — Pin retrieval to a versioned knowledge base so references resolve to a known commit. (/advanced/rag-versioning/)
- Reproducible RAG (guide) — End-to-end: validate references and pin retrieval so a run reproduces exactly. (/guides/reproducible-rag/)

========================================================================
# Store & Query
========================================================================

## Storage Adapters

Source: https://briefcaseai.io/features/storage-adapters/

> Choose where your decision records live — the backend that holds your audit trail.

A storage adapter is the backend that holds your audit trail — the durable home for every decision Briefcase captures, and the surface you query when someone asks why a decision was made.

> When you'd reach for this
>
> Your support-triage agent has been recording `classify_ticket` decisions for a month, and a reviewer now wants to pull every support-queue decision from last week. Those records have to live somewhere durable and queryable. The backend you pick decides whether that review is a one-line query against a file on disk or a lost cause because the records were only ever in memory.

## How it works

1. **Init** — call `briefcase.init()` once and construct a backend.
2. **Create** — build a `DecisionSnapshot` for each `classify_ticket` call.
3. **Save** — `save_decision()` persists the record and returns its id.
4. **Query** — pull records back with a `SnapshotQuery` when you need to review them.

```mermaid
flowchart LR
    A[classify_ticket] --> B[DecisionSnapshot]
    B --> C[SqliteBackend]
    C --> D[(Audit trail)]
    E[Reviewer query] --> C
```

> Diagram description
>
> A decision becomes a DecisionSnapshot, which is saved into the configured storage backend, building a durable audit trail. A reviewer's SnapshotQuery reads back from the same backend.

## Install

```bash
pip install briefcase-ai[storage]
```

The `storage` extra installs no third-party dependencies; the SQLite backend uses the runtime that ships with the package.

## Which backend should I use?
| Backend | Persistence | Reach for it when |
|---------|-------------|-------------------|
| `SqliteBackend.in_memory()` | No — gone on exit | Tests and local experiments, where no durable audit trail is needed |
| `SqliteBackend("path.db")` | Yes — a file on disk | A single node where you want a real, queryable audit trail |
| `BufferedBackend` | Yes — wraps another backend | High write volume, where batching `save_decision` calls matters |

`BufferedBackend` is not a separate store — it wraps a durable backend (such as `SqliteBackend`) and batches `save_decision` calls until the buffer fills, so you trade a small flush delay for less write pressure under load.

## Init -> create -> save -> query

1. **Init the runtime and backend**

   ```python
   import briefcase
   from briefcase.storage import SqliteBackend

   briefcase.init()  # start the native runtime once per process
   backend = SqliteBackend("decisions.db")
   ```

2. **Create a decision** for each `classify_ticket` call.

   ```python
   from briefcase import DecisionSnapshot, Input, Output

   decision = DecisionSnapshot("classify_ticket")
   decision.add_input(Input("text", "reset my password", "string"))

   output = Output("category", "account_access", "string")
   output.with_confidence(0.92)
   decision.add_output(output)
   decision.add_tag("queue", "support")
   ```

3. **Save** the record — `save_decision()` returns its id.

   ```python
   decision_id = backend.save_decision(decision)

   loaded = backend.load_decision(decision_id)
   print(loaded.function_name)  # classify_ticket
   ```

4. **Query the audit trail** with a `SnapshotQuery`.

   ```python
   from briefcase import SnapshotQuery

   results = backend.query(
       SnapshotQuery()
       .with_function_name("classify_ticket")
       .with_tag("queue", "support")
   )
   print(len(results))
   ```

Call `briefcase.init()` once, before using any backend; it starts the native runtime.
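The batching trade-off described for `BufferedBackend` in the backend comparison above can be illustrated with a small stand-alone wrapper. This is a conceptual sketch only: `BatchingWriter`, its `save` and `flush` methods, and the `write_batch` callable are hypothetical names chosen for the example, not part of the Briefcase API, and the source does not say how `BufferedBackend` flushes internally.

```python
class BatchingWriter:
    """Hypothetical write-batching wrapper; not Briefcase's BufferedBackend."""

    def __init__(self, write_batch, buffer_size=100):
        self._write_batch = write_batch  # callable that persists a list of records
        self._buffer_size = buffer_size
        self._pending = []

    def save(self, record):
        # Hold the record in memory and write only when the buffer is full.
        self._pending.append(record)
        if len(self._pending) >= self._buffer_size:
            self.flush()

    def flush(self):
        # Persist whatever is pending as one batch, then empty the buffer.
        if self._pending:
            self._write_batch(list(self._pending))
            self._pending.clear()


batches = []  # stands in for the durable store: one entry per batch written
writer = BatchingWriter(batches.append, buffer_size=3)
for i in range(7):
    writer.save({"id": i})

print([len(b) for b in batches])  # [3, 3]
writer.flush()                    # drain the remainder, e.g. at shutdown
print([len(b) for b in batches])  # [3, 3, 1]
```

In this sketch the seventh record exists only in memory until `flush()` runs, which is the "small flush delay" side of the trade: fewer writes under load, at the cost of a window in which the newest records are not yet durable.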
## Backends in detail

### In-memory (for tests)

`SqliteBackend.in_memory()` keeps data in memory — fast and ephemeral, the right choice for tests where you do not need records to survive the process.

```python
import briefcase
from briefcase.storage import SqliteBackend

briefcase.init()
backend = SqliteBackend.in_memory()
```

### File on disk (for a real audit trail)

`SqliteBackend(path)` writes to a file — a durable, queryable audit trail in one place, the workhorse for single-node deployments.

```python
import briefcase
from briefcase.storage import SqliteBackend

briefcase.init()
backend = SqliteBackend("decisions.db")
print(backend.health_check())  # True
```

### Buffered (for high volume)

`BufferedBackend` wraps a durable backend and batches `save_decision` calls until the buffer fills.

```python
import briefcase
from briefcase.storage import SqliteBackend, BufferedBackend
from briefcase import DecisionSnapshot, Input

briefcase.init()
backend = BufferedBackend(SqliteBackend("decisions.db"), buffer_size=100)

decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "update my address", "string"))
backend.save_decision(decision)
```

## A governance query: load decisions for review

The point of a durable backend is the review it enables. When a reviewer asks for last week's support-queue decisions, you answer with a tagged `SnapshotQuery` against the same store that captured them.

```python
import briefcase
from briefcase.storage import SqliteBackend
from briefcase import SnapshotQuery

briefcase.init()
backend = SqliteBackend("decisions.db")

# Pull every support-queue triage decision for review
query = (
    SnapshotQuery()
    .with_function_name("classify_ticket")
    .with_tag("queue", "support")
    .with_limit(50)
    .with_offset(0)
)

for decision in backend.query(query):
    # hand each record to a reviewer, or replay it to verify
    ...
```

`SnapshotQuery` supports `with_function_name`, `with_module_name`, `with_tag`, `with_limit`, and `with_offset`.
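When a review spans more records than one page, `with_limit` and `with_offset` can be driven by a simple paging loop. The sketch below is a generic helper and makes two assumptions that the source does not confirm: that `backend.query` returns a list-like page, and that results come back in a stable order across calls. `iter_pages` and `fake_fetch` are hypothetical names; the fake data source is there only so the example runs without a database.

```python
from typing import Callable, Iterator, List


def iter_pages(fetch_page: Callable[[int, int], List], page_size: int = 50) -> Iterator:
    """Yield records page by page until a short or empty page signals the end."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:
            return
        offset += page_size


# Stand-in data source. With a real backend, fetch_page would build the query
# documented above, for example:
#   backend.query(SnapshotQuery().with_function_name("classify_ticket")
#                 .with_limit(limit).with_offset(offset))
records = [f"decision-{i}" for i in range(120)]


def fake_fetch(limit: int, offset: int) -> List[str]:
    return records[offset:offset + limit]


collected = list(iter_pages(fake_fetch, page_size=50))
print(len(collected))  # 120
```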
From here a reviewer can [audit a decision](/guides/audit-a-decision/) or [replay](/features/replay/) it to reproduce exactly what happened.

## Snapshots: grouping multiple decisions

A `Snapshot` groups several decisions. `save()` stores the snapshot and returns its id; `load()` takes that id and returns the snapshot.

```python
import briefcase
from briefcase.storage import SqliteBackend
from briefcase import DecisionSnapshot, Input, Snapshot

briefcase.init()
backend = SqliteBackend.in_memory()

decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "where is my order", "string"))

session = Snapshot("session")
session.add_decision(decision)

snapshot_id = backend.save(session)
restored = backend.load(snapshot_id)
print(len(restored.decisions))  # 1
```

## The persistence interface

`SqliteBackend` exposes the full interface (`BufferedBackend` only buffers `save_decision` calls before flushing them to the backend it wraps):

```python
backend.save(snapshot)              # store a Snapshot, returns its id
backend.load(snapshot_id)           # load a Snapshot
backend.save_decision(decision)     # store a DecisionSnapshot, returns its id
backend.load_decision(decision_id)
backend.query(snapshot_query)       # run a SnapshotQuery
backend.delete(snapshot_id)
backend.health_check()
```

## Available backends

| Backend | Class | Description |
|---------|-------|-------------|
| SQLite | `SqliteBackend` | Local SQLite database (file or in-memory) |
| Buffered | `BufferedBackend` | Wraps a backend and batches writes |

> Scaling beyond a single node
>
> The open-source package ships these two backends, and `SqliteBackend` covers single-node deployments well. When you outgrow it — multiple writers, central retention, or shared query access — the path is a server-grade backend: S3, GCS, Azure Blob, and PostgreSQL backends are available in the enterprise build. Moving to one is a backend swap, not a change to your decision code.
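One way to keep a later move a "backend swap" is to have decision code depend on the small set of calls it needs rather than on a concrete class. The sketch below is an illustration under assumptions: `DecisionStore`, `DictStore`, and `record_and_reload` are hypothetical names, and only the `save_decision` / `load_decision` method names come from the persistence interface listed above.

```python
from typing import Protocol


class DecisionStore(Protocol):
    """Hypothetical minimal interface: only the two calls this code needs."""

    def save_decision(self, decision) -> str: ...
    def load_decision(self, decision_id: str): ...


class DictStore:
    """In-memory stand-in so the sketch runs without Briefcase installed."""

    def __init__(self):
        self._rows = {}

    def save_decision(self, decision) -> str:
        decision_id = f"d-{len(self._rows) + 1}"
        self._rows[decision_id] = decision
        return decision_id

    def load_decision(self, decision_id: str):
        return self._rows[decision_id]


def record_and_reload(store: DecisionStore, decision):
    """Decision code depends on the interface, not on a concrete backend."""
    return store.load_decision(store.save_decision(decision))


print(record_and_reload(DictStore(), {"function": "classify_ticket"}))
# {'function': 'classify_ticket'}
```

Any object with matching methods can be passed as `store`, so switching backends would touch only the line that constructs it.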
## Where this fits

Storage is the **Store & Query** act: the durable home for everything Capture produced, and the surface the later acts read from.

- Decision Recording: Where decision snapshots come from (the Capture act that fills this backend). (/features/decision-recording/)
- Deterministic Replay (next): load a record back out and reproduce exactly what happened. (/features/replay/)

## Bitemporal Storage
Source: https://briefcaseai.io/advanced/bitemporal-storage/

> Answer "what did we know at decision time?" even after a value is later corrected.

# Bitemporal Storage

Bitemporal storage tracks two independent time dimensions for every record: **valid time** (when a fact was true in the world) and **transaction time** (when the system learned about it), so a later correction never erases what you actually knew at decision time.

> When you'd reach for this
>
> Your support-triage agent escalates a ticket because the `max_upload_mb` config read `50` that morning. Two days later someone corrects that value, backdated because it had been wrong all along. When the escalation is questioned, you need to show what the agent actually saw on the day, not the corrected value. Bitemporal storage keeps both beliefs so "what did we know at decision time?" has an exact answer.

## Valid time vs transaction time

Traditional storage overwrites: when a value changes, the old value is lost and the record of what you believed disappears with it. Bitemporal storage separates the two clocks so a backdated correction and the original belief can coexist. Writes are append-only: a correction is a new record, not an edit.

| Dimension | Answers | Why it matters |
|-----------|---------|----------------|
| **Valid time** | When was this fact true in the world? | Lets a correction apply to a past date without rewriting history. |
| **Transaction time** | When did the system learn it? | Distinguishes "what we knew on May 2" from "what we know now", the key to a defensible audit. |

## How it works

1. **Record a fact** with a `valid_time` and a `transaction_time` via `BitemporalRecord.new`.
2. **Append a backdated correction** with `append_correction`: same valid time, a fresh transaction time, so both versions coexist.
3. **Reconstruct the past** by clamping reads through an `AsOfView` at the decision's transaction time.

```mermaid
flowchart LR
    A["BitemporalRecord.new<br/>(valid_time, transaction_time)"] --> B["store.append"]
    C["append_correction<br/>(new transaction_time)"] --> B
    B --> D["BitemporalStore<br/>(append-only)"]
    D --> E["store.latest / history"]
    D --> F["AsOfView(transaction_time)<br/>clamped read"]
    F --> G["reconstruct past belief"]
```

> Diagram description
>
> Both `BitemporalRecord.new` and `append_correction` write into the append-only `BitemporalStore`. Reads branch two ways: `store.latest` / `history` return current truth, while an `AsOfView` clamped to a transaction time reconstructs the past belief held at that instant.

## Install

```bash
pip install briefcase-ai[bitemporal]
```

```python
from briefcase.bitemporal import (
    BitemporalRecord,
    InMemoryBitemporalStore,
    AsOfView,
    append_correction,
)
```

## Record a Fact

```python
from datetime import datetime, timezone
from briefcase.bitemporal import (
    BitemporalRecord,
    InMemoryBitemporalStore,
)

store = InMemoryBitemporalStore()

# A feature-flag rollout percentage that was true in the real world at t0.
t0 = datetime(2026, 5, 1, tzinfo=timezone.utc)
learned_t0 = datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc)

record = BitemporalRecord.new(
    key="flag:new_checkout",
    valid_time=t0,
    value={"rollout_percent": 25},
    source="config_service",
    transaction_time=learned_t0,
)
store.append(record)

latest = store.latest("flag:new_checkout")
print(latest.value)  # {'rollout_percent': 25}
print(latest.content_hash()[:12])  # SHA-256 of the value payload
```

`valid_time` and `transaction_time` must be timezone-aware; `BitemporalRecord.new` raises `ValueError` otherwise. When `transaction_time` is omitted it defaults to now.

## Correct a Value and Reconstruct the Past

A correction shares the original `valid_time` but gets a fresh `transaction_time` and a `parent_record_id` back to the original. The old belief stays in the store.
```python
from datetime import datetime, timezone
from briefcase.bitemporal import (
    BitemporalRecord,
    InMemoryBitemporalStore,
    AsOfView,
    append_correction,
)

store = InMemoryBitemporalStore()

t0 = datetime(2026, 5, 1, tzinfo=timezone.utc)
learned_t0 = datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc)

original = BitemporalRecord.new(
    key="config:max_upload_mb",
    valid_time=t0,
    value=50,
    source="config_service",
    transaction_time=learned_t0,
)
store.append(original)

# A decision was made at this instant, reading what the system knew then.
decision_ts = datetime(2026, 5, 2, 12, 0, tzinfo=timezone.utc)

# Later, the config service corrects the same valid_time: it was 100, not 50.
learned_correction = datetime(2026, 5, 3, 8, 0, tzinfo=timezone.utc)
append_correction(
    store,
    original,
    corrected_value=100,
    source="config_service",
    transaction_time=learned_correction,
)

# Current truth reflects the correction.
print(store.latest("config:max_upload_mb").value)  # 100

# As-of the decision, the system had not yet learned the correction.
with AsOfView(store, transaction_time=decision_ts) as view:
    print(view.latest("config:max_upload_mb").value)  # 50

print(len(store.history("config:max_upload_mb")))  # 2: both beliefs kept
```

`append_correction` requires the correction's `transaction_time` to be strictly after the original's; it raises `ValueError` otherwise, so a correction can never silently fail to supersede.

## Clamp Reads with AsOfView

`AsOfView(store, transaction_time=...)` wraps any store and clamps every read to a historical instant. Application code keeps calling `latest(key)` / `as_of(key)` unchanged, and no post-instant information leaks in. The view is read-only; `append` raises.
```python
with AsOfView(store, transaction_time=decision_ts) as view:
    record = view.latest("config:max_upload_mb")
    rows = view.history("config:max_upload_mb")
    keys = view.keys()
```

Pass `valid_time=` as well to restrict to facts that were true at a past real-world moment, distinct from what the system had learned by then.

## Batch vs. Stream Ingestion

Both produce identical bitemporal output; they differ in `transaction_time` semantics. `batch_append` settles a whole batch at one shared instant; `stream_append` learns each record independently.

```python
from datetime import datetime, timezone
from briefcase.bitemporal import (
    BitemporalRecord,
    InMemoryBitemporalStore,
    batch_append,
    stream_append,
)

store = InMemoryBitemporalStore()
valid = datetime(2026, 5, 1, tzinfo=timezone.utc)

# batch_append: many records settle at one shared transaction_time.
settled_at = datetime(2026, 5, 1, 23, 0, tzinfo=timezone.utc)
batch = [
    BitemporalRecord.new(key="rate:US", valid_time=valid, value=0.07, source="rates_feed"),
    BitemporalRecord.new(key="rate:EU", valid_time=valid, value=0.19, source="rates_feed"),
]
batch_append(store, batch, transaction_time=settled_at)

# stream_append: each record is learned independently, at append time.
tick = BitemporalRecord.new(key="rate:US", valid_time=valid, value=0.075, source="rates_feed")
stream_append(store, tick)

print(store.latest("rate:US").value)  # 0.075
print(sorted(store.keys()))  # ['rate:EU', 'rate:US']
```

| Helper | When you'd reach for it |
|--------|-------------------------|
| `batch_append` | A whole batch becomes known at once; settle it at one shared `transaction_time`. |
| `stream_append` | Records arrive one at a time; each is learned independently at append time. |

## Durable Backends

`InMemoryBitemporalStore` is the reference implementation. For persistence across process restarts, use the SQLite backend. Append-only is enforced at the database layer via triggers.
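
To make trigger-enforced append-only storage and an as-of read concrete, here is a minimal sketch using only Python's built-in `sqlite3` module. It is not the schema or code of `SqliteBitemporalBackend`: the `facts` table, the trigger names, and `latest_as_of` are hypothetical, and timestamps are stored as ISO-8601 UTC strings purely so that string comparison orders them chronologically.

```python
import sqlite3
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE facts (
        key              TEXT NOT NULL,
        valid_time       TEXT NOT NULL,  -- when the fact was true in the world
        transaction_time TEXT NOT NULL,  -- when the system learned it
        value            TEXT NOT NULL
    );

    -- Reject any attempt to rewrite or remove history.
    CREATE TRIGGER facts_no_update BEFORE UPDATE ON facts
    BEGIN
        SELECT RAISE(ABORT, 'facts is append-only');
    END;

    CREATE TRIGGER facts_no_delete BEFORE DELETE ON facts
    BEGIN
        SELECT RAISE(ABORT, 'facts is append-only');
    END;
    """
)

# Original belief, learned on May 1.
conn.execute(
    "INSERT INTO facts VALUES (?, ?, ?, ?)",
    ("config:max_upload_mb", "2026-05-01T00:00:00Z", "2026-05-01T09:00:00Z", "50"),
)
# Backdated correction: same valid_time, later transaction_time.
conn.execute(
    "INSERT INTO facts VALUES (?, ?, ?, ?)",
    ("config:max_upload_mb", "2026-05-01T00:00:00Z", "2026-05-03T08:00:00Z", "100"),
)


def latest_as_of(key: str, as_of: str) -> Optional[str]:
    """Return the newest value the system had learned by `as_of`, if any."""
    row = conn.execute(
        "SELECT value FROM facts WHERE key = ? AND transaction_time <= ? "
        "ORDER BY transaction_time DESC LIMIT 1",
        (key, as_of),
    ).fetchone()
    return row[0] if row else None


print(latest_as_of("config:max_upload_mb", "2026-05-02T12:00:00Z"))  # belief at decision time
print(latest_as_of("config:max_upload_mb", "2026-05-04T00:00:00Z"))  # belief after the correction

try:
    conn.execute("UPDATE facts SET value = '999'")
    update_blocked = False
except sqlite3.DatabaseError:
    update_blocked = True  # the trigger aborted the statement
print(update_blocked)
```

Putting the rule in the database rather than in application code means every client of the file is held to it, including ad hoc scripts.
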
The SQLite backend ships in the base install and adds no third-party dependencies.

```python
from briefcase.bitemporal.backends import SqliteBitemporalBackend

backend = SqliteBitemporalBackend("evidence.db")
```

For multi-writer analytics, the Iceberg backend wraps `pyiceberg`:

```bash
pip install briefcase-ai[bitemporal-iceberg]
```

```python
from briefcase.bitemporal.backends import IcebergBitemporalBackend
```

All backends implement the same `BitemporalStore` protocol, so `AsOfView` and application code are backend-agnostic.

## Key Classes

| Symbol | Why it matters |
|--------|----------------|
| `BitemporalRecord` | Immutable record with `valid_time`, `transaction_time`, `value`, `source`; `new()` constructor, `content_hash()`. |
| `InMemoryBitemporalStore` | Reference store; `append` / `append_many` / `latest` / `history` / `as_of` / `keys`. |
| `AsOfView` | Read-only view clamped to `transaction_time` and/or `valid_time`; reconstructs a past belief. |
| `append_correction` | Appends a superseding record so the original belief is preserved, not overwritten. |
| `batch_append` / `stream_append` | Shared-instant vs. per-record ingestion. |
| `SqliteBitemporalBackend` / `IcebergBitemporalBackend` | Durable backends sharing the `BitemporalStore` protocol. |

## Where this fits

- Related: Versioned Routing Policy. The same backdated-correction model, applied to which rules were in force. (/advanced/versioned-routing-policy/)
- Next: Audit Bundles. Seal a decision together with the as-of-then evidence into a verifiable record. (/advanced/compliance-bundles/)

## External Data
Source: https://briefcaseai.io/advanced/external-data/

> Pin the exact upstream value a decision used, so it stays reproducible when the source changes.
# External Data Tracking

`ExternalDataTracker` records a hashed snapshot of every external fetch a decision relied on (an API response, a database query, a file), so the decision stays reproducible even after that source changes underneath you.

> When you'd reach for this
>
> Your support-triage agent classified a ticket as low priority because the `pricing_service` reported a customer's plan at one tier. Two weeks later that upstream record is corrected to a higher tier, and the routing looks wrong in hindsight. Without a snapshot you can't prove the agent acted on the value that was live at the time. Capture the upstream value with the decision and the answer is exact, not reconstructed.

## How it works

1. **Snapshot the fetch.** `track_api_call` hashes the response and stores it when the policy allows.
2. **Detect drift.** `detect_drift` compares the live value against the latest snapshot to see whether the source moved.
3. **Append a correction.** When the upstream value is fixed, `correct_snapshot` records the fix without erasing the original.

```mermaid
flowchart LR
    A["pricing_service fetch"] --> B["track_api_call()"]
    B --> C["snapshot + SHA-256 hash"]
    C --> D["classify_ticket decision"]
    E["live value, two weeks later"] --> F{"detect_drift"}
    C --> F
    F -- "changed" --> G["correct_snapshot<br/>original kept"]
```

> Diagram description
>
> The triage agent snapshots the pricing service response via `track_api_call`, which hashes it before the ticket is classified. Two weeks later `detect_drift` compares the live value against the stored snapshot and reports a change; `correct_snapshot` records the fix while leaving the original snapshot intact for audit.

## Install

```bash
pip install briefcase-ai[external]
```

```python
from briefcase.external import (
    ExternalDataTracker,
    SnapshotPolicy,
    SnapshotFrequency,
)
```

## Track an External Call

`track_api_call()` hashes the response, stores a snapshot when the policy allows, and reports whether the source drifted since the last snapshot.

```python
from briefcase.external import ExternalDataTracker

tracker = ExternalDataTracker()

response = {"items": [{"sku": "A-100", "price": 19.99}]}

result = tracker.track_api_call(
    api_name="pricing_service",
    endpoint="https://pricing.internal/v1/catalog",
    method="GET",
    response_data=response,
    record_count=1,
)

print(result["data_hash"][:12])   # SHA-256 of the response
print(result["snapshot_stored"])  # True: first snapshot is always stored
print(result["drift_detected"])   # False: nothing to compare against yet
```

The same shape applies to database queries and file fetches:

```python
tracker.track_db_query(
    db_system="postgres",
    db_name="catalog",
    query="SELECT sku, price FROM products",
    result_data=[{"sku": "A-100", "price": 19.99}],
    result_count=1,
    store_snapshot=True,
)

tracker.track_file_fetch(
    source_name="reference_rates",
    file_data=b"sku,price\nA-100,19.99\n",
    file_path="reference/rates.csv",
    record_count=1,
)
```

| Method | Use it for |
|--------|------------|
| `track_api_call` | An HTTP/API response the decision read. |
| `track_db_query` | A database query result the decision read. |
| `track_file_fetch` | A reference file the decision read. |

## Detect Drift

Compare current data against the latest stored snapshot for a source.
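
Conceptually, a hash-based comparison like this reduces to digesting a canonical form of each payload and checking whether the digest moved. The sketch below illustrates that idea with the standard library only. It is not the tracker's implementation: `payload_hash` and `has_drifted` are hypothetical helpers, and canonical JSON with sorted keys is an assumed serialization, since the docs do not say how `data_hash` is derived beyond it being SHA-256.

```python
import hashlib
import json
from typing import Any, Optional


def payload_hash(data: Any) -> str:
    """SHA-256 over a canonical JSON form, so key order does not affect the digest."""
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def has_drifted(baseline_hash: Optional[str], current: Any) -> Optional[bool]:
    """None when there is no baseline to compare against, else whether the digest moved."""
    if baseline_hash is None:
        return None
    return payload_hash(current) != baseline_hash


baseline = payload_hash({"items": [{"sku": "A-100", "price": 19.99}]})

print(has_drifted(None, {"items": []}))                                       # no baseline yet
print(has_drifted(baseline, {"items": [{"price": 19.99, "sku": "A-100"}]}))  # same data, keys reordered
print(has_drifted(baseline, {"items": [{"sku": "A-100", "price": 24.99}]}))  # price changed
```

Canonicalizing before hashing matters: without it, two semantically identical responses that differ only in key order would look like drift.
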
`detect_drift()` returns `None` when there is no prior snapshot.

```python
tracker.track_api_call(
    api_name="pricing_service",
    endpoint="https://pricing.internal/v1/catalog",
    method="GET",
    response_data={"items": [{"sku": "A-100", "price": 19.99}]},
)

report = tracker.detect_drift(
    "pricing_service",
    current_data={"items": [{"sku": "A-100", "price": 24.99}]},
)

print(report.has_changed)  # True
print(report.drift_score)  # 1.0
print(report.size_delta)   # byte difference vs the baseline snapshot
```

Compare two specific snapshots by id with `compare_snapshots()`:

```python
tracker = ExternalDataTracker()

first = tracker.track_api_call(
    api_name="inventory_api",
    endpoint="https://inv.internal/v1/stock",
    method="GET",
    response_data={"items": [{"sku": "A-100", "qty": 40}]},
)
second = tracker.track_api_call(
    api_name="inventory_api",
    endpoint="https://inv.internal/v1/stock",
    method="GET",
    response_data={"items": [{"sku": "A-100", "qty": 25}]},
)

report = tracker.compare_snapshots(first["snapshot_id"], second["snapshot_id"])
print(report.has_changed)  # True
```

## Snapshot Policy

A `SnapshotPolicy` controls when snapshots are taken and how long they are kept. Set a per-source policy, or pass `default_policy` to the tracker.
```python
from briefcase.external import (
    ExternalDataTracker,
    SnapshotPolicy,
    SnapshotFrequency,
)

tracker = ExternalDataTracker(
    default_policy=SnapshotPolicy(
        frequency=SnapshotFrequency.ON_CHANGE,
        retention_days=90,
        max_snapshots=100,
    )
)

tracker.set_policy(
    "pricing_service",
    SnapshotPolicy(frequency=SnapshotFrequency.EVERY_CALL),
)
```

| Field | Default | Description |
|-------|---------|-------------|
| `frequency` | `ON_CHANGE` | When to store a snapshot |
| `retention_days` | `90` | Days to retain snapshots (`0` = forever) |
| `change_threshold` | `0.0` | Minimum change to count as drift on `ON_CHANGE` |
| `max_snapshots` | `0` | Max snapshots per source (`0` = unlimited) |
| `compress` | `False` | Compress snapshot bodies before storage |

`SnapshotFrequency` values: `EVERY_CALL`, `ON_CHANGE`, `HOURLY`, `DAILY`, `WEEKLY`.

## Append a Correction

When a source returned bad data, append a correction instead of overwriting the snapshot. The correction keeps the parent's `valid_time` (when the data was true in the real world) but gets a fresh transaction time, so historical queries still see the original belief and later queries see the corrected value.

```python
original = tracker.track_api_call(
    api_name="pricing_service",
    endpoint="https://pricing.internal/v1/catalog",
    method="GET",
    response_data={"items": [{"sku": "A-100", "price": 1999.00}]},  # bad value
)

corrected = tracker.correct_snapshot(
    original["snapshot_id"],
    corrected_data={"items": [{"sku": "A-100", "price": 19.99}]},
    source="manual_review",
)

print(corrected.parent_snapshot_id == original["snapshot_id"])  # True
```

The original snapshot is never mutated; the correction records its lineage through `parent_snapshot_id`.

## Redact PII Before Storage

Pass a `sanitizer` (for example [`briefcase.sanitize.Sanitizer`](/features/pii-sanitization/)) and snapshot bodies are redacted before they are persisted to durable storage.
The `data_hash` is still computed over the original payload, so drift detection is unaffected.

```python
from briefcase.external import ExternalDataTracker
from briefcase.sanitize import Sanitizer

tracker = ExternalDataTracker(sanitizer=Sanitizer())
```

> Fails closed
>
> If the sanitizer raises, the tracker persists metadata only: raw, potentially PII-bearing data never reaches storage on the error path.

## Where this fits

- Related: RAG Versioning. The same snapshot-and-detect model, applied to a retrieval index. (/advanced/rag-versioning/)
- Guide: Reproducible RAG. Put snapshots and versioning together end to end. (/guides/reproducible-rag/)

## RAG Versioning
Source: https://briefcaseai.io/advanced/rag-versioning/

> Tie each retrieval to the exact corpus state, and catch a stale index before it informs a decision.

# RAG Versioning

RAG versioning records exactly which documents were embedded, with which model, at which source commit, so a retrieval can be tied back to the exact corpus state, and a stale index is caught before it answers anything.

> When you'd reach for this
>
> Your support-triage agent answers policy questions from a retrieval index built over a `support_kb`. The team revises a document and adds a new one, but the index isn't rebuilt, so the agent keeps citing the old version and routes a ticket on outdated guidance. A versioned manifest makes the drift detectable: `check_invalidation` tells you precisely what changed before you trust a retrieval.

## How it works

1. **Build a manifest.** `create_embedding_batch` then `create_manifest` fingerprint the indexed documents into an `EmbeddingManifest`.
2. **Check invalidation.** `check_invalidation` compares the current documents and model against the manifest to detect a stale index.
3. **Rebuild.** When sources changed, `rebuild_index` produces a fresh manifest so retrievals reflect the current corpus.
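
The fingerprint-and-compare idea behind the steps above can be sketched without the library. This is an illustration only, not how `VersionedEmbeddingPipeline` is implemented: `content_hash`, `fingerprint`, `Diff`, and `diff_against` are hypothetical names, documents are reduced to plain `id -> text` pairs, and model changes are left out.

```python
import hashlib
from typing import Dict, List, NamedTuple


def content_hash(text: str) -> str:
    """SHA-256 of a document's text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def fingerprint(docs: Dict[str, str]) -> Dict[str, str]:
    """Map each document id to the hash of its content at index time."""
    return {doc_id: content_hash(text) for doc_id, text in docs.items()}


class Diff(NamedTuple):
    added: List[str]
    changed: List[str]
    removed: List[str]

    @property
    def is_valid(self) -> bool:
        """The index is still valid only when nothing was added, changed, or removed."""
        return not (self.added or self.changed or self.removed)


def diff_against(manifest_hashes: Dict[str, str], current_docs: Dict[str, str]) -> Diff:
    """Compare stored hashes with the current documents."""
    current = fingerprint(current_docs)
    added = sorted(set(current) - set(manifest_hashes))
    removed = sorted(set(manifest_hashes) - set(current))
    changed = sorted(
        doc_id for doc_id in current
        if doc_id in manifest_hashes and current[doc_id] != manifest_hashes[doc_id]
    )
    return Diff(added, changed, removed)


indexed = fingerprint({
    "kb-1": "How to reset your password.",
    "kb-2": "Refund policy for digital goods.",
})

report = diff_against(indexed, {
    "kb-1": "How to reset your password (updated).",
    "kb-2": "Refund policy for digital goods.",
    "kb-3": "Two-factor authentication setup.",
})
print(report.is_valid, report.added, report.changed, report.removed)
```

Storing only hashes keeps the manifest small while still letting a later check say exactly which documents moved.
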
```mermaid
flowchart LR
    A["Documents"] --> B["create_embedding_batch()"]
    B --> C["create_manifest()"]
    C --> D["EmbeddingManifest"]
    E["Current documents + model"] --> F["check_invalidation()"]
    D --> F
    F --> G{"is_valid?"}
    G -- "no" --> H["rebuild_index()"]
    G -- "yes" --> I["Reuse index"]
```

> Diagram description
>
> Documents flow through `create_embedding_batch()` and `create_manifest()` to produce an `EmbeddingManifest`. `check_invalidation()` compares that manifest against the current documents and model: if it is no longer valid the index is rebuilt with `rebuild_index()`, otherwise the existing index is reused.

## Install

```bash
pip install briefcase-ai[rag]
```

The `rag` extra is pure Python and installs no third-party dependencies.

```python
from briefcase.rag import VersionedEmbeddingPipeline, Document
```

## Build an Index

A `Document` carries an id, content, and optional metadata. The pipeline's `embedding_model` is any object with an `embed(texts) -> list[list[float]]` method; optional `name` and `version` attributes are recorded on the manifest.
```python
from briefcase.rag import VersionedEmbeddingPipeline, Document


class HashingEmbedder:
    """Trivial deterministic embedder for the example."""

    name = "demo-embedder"
    version = "1.0"

    def embed(self, texts):
        return [[float(len(t) % 7), float(len(t) % 11)] for t in texts]


pipeline = VersionedEmbeddingPipeline(embedding_model=HashingEmbedder())

documents = [
    Document(id="kb-1", content="How to reset your password.", path="handbook/auth.md"),
    Document(id="kb-2", content="Refund policy for digital goods.", path="handbook/refunds.md"),
]

batch = pipeline.create_embedding_batch(documents)
manifest = pipeline.create_manifest("support_kb", [batch])

print(manifest.index_name)      # "support_kb"
print(manifest.document_count)  # 2
print(manifest.model)           # "demo-embedder"
print(manifest.status)          # "current"
```

`rebuild_index()` chains both steps when you just want a fresh manifest:

```python
manifest = pipeline.rebuild_index("support_kb", documents)
```

## Detect Staleness

`check_invalidation()` compares the latest manifest against the current document set and model, and returns an `InvalidationReport` describing what changed.

```python
# A document changed and one was added since the manifest was built.
current = [
    Document(id="kb-1", content="How to reset your password (updated)."),
    Document(id="kb-2", content="Refund policy for digital goods."),
    Document(id="kb-3", content="Two-factor authentication setup."),
]

report = pipeline.check_invalidation("support_kb", current)

print(report.is_valid)           # False
print(report.status)             # "stale_documents"
print(report.added_documents)    # ["kb-3"]
print(report.changed_documents)  # ["kb-1"]
print(report.removed_documents)  # []
print(report.model_changed)      # False
```

`status` is one of: `current`, `stale_documents`, `stale_model`, `stale_both`, `rebuilding`.
When the index is stale, rebuild it:

```python
if not report.is_valid:
    manifest = pipeline.rebuild_index("support_kb", current)
```

## EmbeddingManifest

The manifest is the versioning artifact. Persist it to compare future builds against it.

| Field | Description | Why it matters |
|-------|-------------|----------------|
| `manifest_id` | Unique id for this build | Names the corpus version a retrieval ran against. |
| `index_name` | Name of the index | Groups builds of the same index over time. |
| `model` / `model_version` | Model that produced the embeddings | A model change invalidates the index too. |
| `source_commit` | Source commit the documents came from | Ties the corpus to a versioned source. |
| `document_count` | Number of documents embedded | Quick sanity check on coverage. |
| `document_hashes` | `doc_id -> content_hash` at embed time | What `check_invalidation` compares against. |
| `status` | Current `ManifestStatus` value | `current` vs `stale_*`. |
| `manifest_hash` | Deterministic SHA-256 over the manifest content | Tamper-evident fingerprint of the whole build. |

```python
print(manifest.manifest_hash)  # integrity hash
serialized = manifest.to_json()
```

## Instrument Retrieval

`InstrumentedRetriever` captures version provenance on each retrieved document. It is a reference implementation: the base `retrieve()` returns placeholder results (and emits a `RuntimeWarning`) so the provenance shape is clear. Override `retrieve()` with a real vector-store query (returning your own `RetrievalResult` objects) to silence the warning and use it for real.

```python
from briefcase.rag import InstrumentedRetriever


class KbRetriever(InstrumentedRetriever):
    def retrieve(self, query, top_k=5, similarity_threshold=0.7):
        # Query your real vector store here, then wrap hits in RetrievalResult.
        return super().retrieve(query, top_k, similarity_threshold)


retriever = KbRetriever(
    vector_store=None,   # your vector store client
    lakefs_client=None,  # resolves document_version (commit SHA)
    repository="support_kb",
)

results = retriever.retrieve("how do I reset my password?", top_k=3)
for r in results:
    print(r.rank, r.document_id, r.score, r.document_version)
```

Each `RetrievalResult` carries `document_id`, `content`, `score`, `rank`, `document_version` (the commit SHA the document was read at), and `metadata`.

## Where this fits

- Related: External Data Tracking. The same versioning idea for non-RAG upstream values. (/advanced/external-data/)
- Related: Validation Engine. Validate the content a versioned retrieval returns before a decision. (/advanced/validation-engine/)
- Guide: Reproducible RAG. Wire manifests and snapshots into a reproducible pipeline. (/guides/reproducible-rag/)

========================================================================
# Replay & Verify
========================================================================

## Deterministic Replay
Source: https://briefcaseai.io/features/replay/

> Re-run a saved decision to prove it still produces the same output, and catch the run that doesn't.

Re-execute a saved decision against a `ReplayEngine` and compare the new output to the one you recorded, so that a model swap, a prompt edit, or a dependency bump can't silently change what your system decides.

> When you'd reach for this
>
> You ship a new version of the `classify_ticket` model and want to know, before it reaches users, whether it still routes the same tickets the same way. Replay the decisions you already recorded against the new build: if `outputs_match` flips to `False` on tickets that used to be stable, you have caught a regression instead of a customer noticing it. It is also how you demonstrate, after the fact, that a recorded decision is reproducible.

## How it works

1. **Persist** a `DecisionSnapshot` to a storage backend (this is the original you'll compare against).
2. **Replay** it through `ReplayEngine`, which re-executes the decision and compares the new output to the recorded one.
3. **Interpret** the `ReplayResult`: `outputs_match`, `status`, and `policy_violations` tell you whether the decision held and what to do next.

```mermaid
flowchart LR
    A["Recorded decision<br/>in storage"] --> B["ReplayEngine.replay"]
    B --> C{"outputs_match?"}
    C -->|True| D["Reproducible: ship"]
    C -->|False| E["Regression: investigate"]
```

> Diagram description
>
> A recorded decision in storage flows into `ReplayEngine.replay`. The engine checks whether the replayed output matches the original. If it matches, the decision is reproducible and safe to ship; if it does not match, you have a regression to investigate.

## Install

```bash
pip install briefcase-ai[replay]
```

```python
from briefcase.replay import ReplayEngine, ReplayPolicy, ReplayStats
```

## Persist, then replay

```python
import briefcase
from briefcase import DecisionSnapshot, Input, Output
from briefcase.storage import SqliteBackend
from briefcase.replay import ReplayEngine

briefcase.init()

decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "Reset my password", "string"))
decision.add_output(Output("category", "account_access", "string"))
decision.with_execution_time(12.5)

backend = SqliteBackend.in_memory()
decision_id = backend.save_decision(decision)

engine = ReplayEngine(backend)
result = engine.replay(decision_id, "strict")

print(result.status)
print(result.outputs_match)
print(result.replay_output)
print(result.execution_time_ms)
print(result.policy_violations)
```

`ReplayEngine(backend)` takes a storage backend. `replay(decision_id, mode)` takes the mode explicitly.

## Strict vs. tolerant

The mode decides how exactly the replayed output must match the original. Pick it from how deterministic the decision is supposed to be.

| Mode | Matches when | Reach for it when |
|------|--------------|-------------------|
| `"strict"` | The replayed output is identical to the original | The decision is meant to be deterministic: a fixed classifier, `temperature=0`, a routing rule. Any difference is a regression. |
| `"tolerant"` (default) | Minor differences are allowed | The output is free-form or sampled (a generated reply, a summary) where wording can vary but meaning should not. |

`"tolerant"` is the engine default. For per-field control (exact match on `category`, a similarity threshold on a free-text `summary`), use a `ReplayPolicy` instead of choosing one mode for the whole decision.

## Interpreting a ReplayResult

`ReplayResult` is what you act on. The table maps each field to the decision it should drive.

| Field | What it tells you | What to do |
|-------|-------------------|------------|
| `status` | `"success"` or a failure status for the replay | A non-success status means the replay itself couldn't complete (decision missing, load error); fix the harness before trusting the result. |
| `outputs_match` | `True` when the replayed output matches the original | `False` on a decision that used to be stable is a regression; investigate the change you just made. |
| `replay_output` | The output produced during this replay | Diff it against the original to see exactly what drifted. |
| `policy_violations` | List of policy rules the replay violated | Non-empty means a specific field broke its match rule; read it to know which field and why. |
| `execution_time_ms` | Replay execution time in milliseconds | A large swing can flag a performance regression even when the output still matches. |
| `original_snapshot` | The recorded decision being replayed | The baseline for the comparison and your audit reference. |

## Replay with a policy

A `ReplayPolicy` declares how each output field must match. Combine exact-match fields with similarity-threshold fields when one decision has both a structured label and free text.
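
The idea of judging fields by different rules can be sketched in plain Python before looking at the real API. Nothing here is Briefcase code: `check_fields` is a hypothetical helper, and `difflib.SequenceMatcher` is only a stand-in similarity measure, since the docs do not say how `ReplayPolicy` scores similarity.

```python
from difflib import SequenceMatcher
from typing import Dict, List


def check_fields(
    original: Dict[str, str],
    replayed: Dict[str, str],
    exact: List[str],
    similar: Dict[str, float],
) -> List[str]:
    """Return one violation message per field that breaks its match rule."""
    violations = []
    for field in exact:
        if original.get(field) != replayed.get(field):
            violations.append(f"{field}: exact match failed")
    for field, threshold in similar.items():
        ratio = SequenceMatcher(
            None, str(original.get(field, "")), str(replayed.get(field, ""))
        ).ratio()
        if ratio < threshold:
            violations.append(f"{field}: similarity {ratio:.2f} below {threshold}")
    return violations


recorded = {"category": "account_access", "summary": "User cannot log in and wants a password reset."}
misrouted = {"category": "billing", "summary": "User cannot log in and wants a password reset."}

print(check_fields(recorded, recorded, exact=["category"], similar={"summary": 0.95}))
print(check_fields(recorded, misrouted, exact=["category"], similar={"summary": 0.95}))
```

An empty list means every field satisfied its rule; a non-empty list names the field that broke, which mirrors the role `policy_violations` plays on a `ReplayResult`.
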
```python from briefcase.replay import ReplayPolicy policy = ReplayPolicy("output-consistency") policy.with_exact_match("category") policy.with_similarity_threshold("summary", 0.95) result = engine.replay_with_policy(decision_id, policy, "strict") print(result.status) print(result.policy_violations) ``` Here `category` must match exactly (a misroute is unacceptable) while `summary` only has to stay 95% similar (wording may vary). ## Replay in batches Verify a whole regression set at once instead of one decision at a time. ```python results = engine.replay_batch([decision_id], "strict", 4) for result in results: print(result.status, result.outputs_match) ``` `replay_batch(decision_ids, mode, max_concurrent)` replays many decisions concurrently, bounded by `max_concurrent`. ## Aggregate replay statistics ```python from briefcase.replay import ReplayStats stats = engine.get_replay_stats([decision_id]) print(stats.total_replays) print(stats.successful_replays) print(stats.exact_matches) print(stats.success_rate) print(stats.average_execution_time_ms) ``` `success_rate` across your regression set is the one number to watch release over release: a drop means more decisions changed than you expected. ## Key classes | Class | Why it matters | |-------|----------------| | `ReplayEngine` | Loads a persisted decision from a backend and re-executes it — the entry point for every replay. | | `ReplayResult` | Outcome of a single replay; the fields you act on to catch a regression. | | `ReplayPolicy` | Per-field match rules for `replay_with_policy` so structured and free-text fields can be judged differently. | | `ReplayStats` | Aggregate counts and rates across many replays — your release-over-release health signal. | ## Where this fits Replay is the start of the **Replay & Verify** act: re-run a stored decision, then measure how far it moved over time and prove the record is intact. - Storage Adapters — Previous: persist the decisions that replay re-executes. 
(/features/storage-adapters/)
- Drift Detection — Next: measure how consistent outputs stay across many runs. (/features/drift-detection/)
- Audit Bundles — Then: bundle a decision into a tamper-evident, verifiable record. (/advanced/compliance-bundles/)
- Decision Recording — Build the DecisionSnapshot objects replay compares against. (/features/decision-recording/)

## Drift Detection

Source: https://briefcaseai.io/features/drift-detection/

> Catch a model whose answers are quietly getting less consistent over time — before it changes what your system decides.

Measure how consistent a model's outputs are across repeated runs, so you can tell the difference between normal variation and a model that is quietly drifting.

> When you'd reach for this
>
> Two weeks ago your `classify_ticket` agent labeled the same five "reset my password" tickets `account_access` every time. Today, on the same tickets, it returns `account_access` three times and `billing` twice. Nothing in your code changed — but a provider-side model update, a prompt tweak, or a creeping context shift made the agent less consistent. Drift detection turns "it feels flakier lately" into a number you can alert on.

## How it works

A `DriftCalculator` takes a list of outputs sampled from the same prompt and returns `DriftMetrics`: a consistency score, an agreement rate, the consensus output, and the indices of any outliers. You feed it the outputs; it tells you how much they disagree.

```mermaid
flowchart LR
    A[Same prompt,<br/>many runs] --> B[Collect outputs]
    B --> C[DriftCalculator.calculate_drift]
    C --> D{consistency_score<br/>below threshold?}
    D -->|No| E[Stable — keep watching]
    D -->|Yes| F[Drifting — emit an event]
```

> Diagram description
>
> The same prompt is run many times and the outputs are collected into a list. `DriftCalculator.calculate_drift` measures them and produces a consistency score (1.0 = identical outputs, 0.0 = all different). If the score is at or above your threshold the outputs are stable and you keep watching; if it falls below, the model is drifting and you emit an event so something downstream can react.

## Install

```bash
pip install briefcase-ai[drift]
```

```python
from briefcase.drift import DriftCalculator, DriftMetrics
```

## Calculate drift

```python
from briefcase.drift import DriftCalculator

calculator = DriftCalculator()

outputs = ["account_access", "account_access", "billing", "account_access", "account_access"]
metrics = calculator.calculate_drift(outputs)

print(metrics.consistency_score)
print(metrics.agreement_rate)
print(metrics.drift_score)
print(metrics.consensus_output)
print(metrics.outliers)
print(metrics.get_status(calculator))
```

`calculate_drift(outputs)` accepts a list of outputs sampled from the same prompt and returns `DriftMetrics`. `get_status(calculator)` classifies the result (for example `"stable"` or `"drifting"`) using the calculator's threshold.

## A monitoring workflow

In production you don't measure once — you measure repeatedly and react when consistency slips. The recorded decisions you already store are the source of the outputs.

1. **Record multiple runs** of the same decision through [Decision Recording](/features/decision-recording/) over a sampling window (a day, a week).
2. **Extract the outputs** for the prompt you're watching into a plain list.
3. **Measure** them with `calculate_drift` and read `consistency_score` / `get_status`.
4. **If it crosses your threshold, emit an event** so something downstream — an on-call alert, a routing change — can respond. See [Multi-Agent & Events](/features/multi-agent/).
```python
import asyncio

from briefcase.drift import DriftCalculator
from briefcase.events import emit_drift_detected

calculator = DriftCalculator()

async def monitor(decision, outputs):
    metrics = calculator.calculate_drift(outputs)
    status = metrics.get_status(calculator)
    if status != "stable":
        # emit_drift_detected is a coroutine; await it in an async context
        await emit_drift_detected(decision, {"drift_score": metrics.drift_score})
    return status

# `decision` is the recorded classify_ticket decision; `outputs` are this window's labels
outputs = ["account_access", "account_access", "billing", "account_access", "account_access"]
asyncio.run(monitor({"id": "dec-1"}, outputs))
```

## Interpreting DriftMetrics

These scores are only useful if you know what action each implies.

| Field | What it tells you | What to do |
|-------|-------------------|------------|
| `consistency_score` | Overall consistency of the sampled outputs | Trend it window over window. A steady decline is the early warning, even before any single window looks bad. |
| `agreement_rate` | Fraction of outputs that match the consensus | A falling agreement rate means more runs are disagreeing — tighten the sampling and look at the outliers. |
| `drift_score` | How far the outputs diverge from one another | The value to put in an alert threshold and pass to `emit_drift_detected`. |
| `consensus_output` | The most common output across samples | What the model "usually" decides — your baseline for what changed. |
| `outliers` | Indices of outputs that disagree with the consensus | Index back into your list to read the exact runs that broke ranks. |

Call `metrics.get_status(calculator)` to turn the scores into a status label like `"stable"` or `"drifting"` using the calculator's threshold.

## Tune the similarity threshold

A stricter threshold makes near-matches count as disagreement, so small wording differences register as drift.
```python
from briefcase.drift import DriftCalculator

calculator = DriftCalculator()
calculator.with_similarity_threshold(0.95)

metrics = calculator.calculate_drift(["approve", "approve", "aprove"])
print(metrics.agreement_rate)
print(metrics.get_status(calculator))
```

## Compare outputs over time

Run the same calculator across successive sampling windows to watch consistency change — this is the two-week-drift scenario made concrete.

```python
from briefcase.drift import DriftCalculator

calculator = DriftCalculator()

windows = [
    ("week 1", ["account_access", "account_access", "billing", "account_access", "account_access"]),
    ("week 2", ["account_access", "billing", "billing", "account_access", "billing"]),
    ("week 3", ["billing", "billing", "account_access", "billing", "billing"]),
]

for label, outputs in windows:
    metrics = calculator.calculate_drift(outputs)
    print(label, metrics.consistency_score, metrics.get_status(calculator))
```

A consistency score that falls across the windows is exactly the signal to alert on.

## Key classes

| Class | Why it matters |
|-------|----------------|
| `DriftCalculator` | Computes drift over a list of outputs; `with_similarity_threshold(threshold)` tunes how strict matching is. |
| `DriftMetrics` | Consistency score, agreement rate, drift score, consensus output, and outlier indices — the numbers you alert and act on. |

## Where this fits

Drift detection sits in the **Replay & Verify** act: replay proves a single decision is reproducible, drift detection proves the model stays consistent across many — and when it doesn't, an event hands control back to your governance layer.

- Deterministic Replay — Previous: re-run one stored decision and prove it still matches. (/features/replay/)
- Decision Recording — Records the runs whose outputs you sample for drift. (/features/decision-recording/)
- Multi-Agent & Events — Emit a drift event so something downstream can react.
(/features/multi-agent/)
- Guardrails — Turn a drift signal into a control that gates the next action. (/advanced/guardrails/)

## Audit Bundles

Source: https://briefcaseai.io/advanced/compliance-bundles/

> Seal a decision, its evidence, and the policy behind it into one portable, tamper-evident artifact.

# Audit Bundles

An `ExaminerBundle` packages a routing decision, the bitemporal evidence that informed it, and the policy version that was in effect — sealed with a content hash so anyone can verify, independently, that nothing was altered.

> When you'd reach for this
>
> Months after your support-triage agent escalated a ticket, the outcome is challenged and you need to prove what happened. Pointing at a live database isn't enough — it has changed since, and no one can confirm it wasn't edited. An audit bundle freezes the decision, the evidence behind it, and the policy that was active into one artifact whose contents are verified by hash, so whoever reviews it doesn't have to take your word for it.

## How it works

1. **Gather the material.** A routing `decision`, the bitemporal `evidence` store it cited, and the `PolicyRegistry` it routed against.
2. **Build the bundle.** `ExaminerBundle.build` seals the decision, the referenced evidence, and the policy as-of the decision into a content-hashed artifact.
3. **Verify integrity.** `verify()` recomputes the hash and raises `BundleIntegrityError` if a byte changed.
4. **Transport and re-verify.** `to_json` / `from_json` move it anywhere; the recipient re-runs `verify()` to trust it independently.

```mermaid
flowchart LR
    A["AgentRouter.route<br/>AgentRoutingDecision"] --> D["ExaminerBundle.build"]
    B["InMemoryBitemporalStore<br/>evidence_refs"] --> D
    C["PolicyRegistry<br/>policy as-of decision"] --> D
    D --> E["content_hash<br/>(SHA-256)"]
    E --> F["to_json / from_json"]
    F --> G["verify()<br/>raises on tampering"]
```

> Diagram description
>
> `ExaminerBundle.build` combines three inputs — an `AgentRouter` decision, the referenced evidence from the bitemporal store, and the policy version as-of the decision. It seals them with a SHA-256 `content_hash`. The bundle serializes via `to_json` / `from_json`, and `verify()` recomputes the hash and raises on any tampering.

## Install

```bash
pip install briefcase-ai[compliance]
```

```python
from briefcase.compliance import ExaminerBundle, BundleIntegrityError
```

## Build a Bundle End to End

This ties together an `AgentRouter` decision, an `InMemoryBitemporalStore` of evidence, and a `PolicyRegistry`, then seals and verifies the result.

```python
from datetime import datetime, timezone

from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore
from briefcase.routing.policy import (
    PolicyRegistry, PolicyVersion, PolicyRule, AgentRouter,
)
from briefcase.compliance import ExaminerBundle, BundleIntegrityError

# 1. Evidence: a bitemporal store of the facts that inform routing.
evidence = InMemoryBitemporalStore()
tier_record = BitemporalRecord.new(
    key="ticket:tier",
    valid_time=datetime(2026, 5, 1, tzinfo=timezone.utc),
    value="gold",
    source="crm",
)
evidence.append(tier_record)

# 2. Policy: a versioned routing policy in a bitemporal-backed registry.
registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="support_triage",
    version="2026.05.01",
    rules=[
        PolicyRule(
            rule_id="gold-to-specialist",
            condition={"tier": "gold"},
            choice="specialist_queue",
            rationale="gold-tier tickets route to specialist agents",
        ),
    ],
    default_choice="general_queue",
)
registry.publish(policy, valid_from=datetime(2026, 5, 1, tzinfo=timezone.utc))

# 3. Decision: route a request, citing the evidence that informed it.
router = AgentRouter(
    registry,
    use_case="support_triage",
    policy_id="support_triage",
)
decision = router.route(
    {"tier": "gold"},
    evidence_refs=[tier_record.record_id],
)
print(decision.selected)  # specialist_queue

# 4. Bundle: seal decision + evidence + policy with a SHA-256 content hash.
bundle = ExaminerBundle.build(decision, evidence, registry)
bundle.verify()  # passes — internally consistent
print(bundle.content_hash[:20])  # sha256:...
```

`build` looks up the policy as-of the decision (`decision.decided_at` by default, overridable with `as_of_transaction_time=`) and pulls exactly the evidence records named in `decision.evidence_refs`. If a referenced record is missing from the store, `build` raises `BundleIntegrityError`. Evidence is sorted deterministically so the hash is stable.

## Transport and Verify

Serialize to JSON, send it anywhere, re-import, and re-check the hash. This is what makes a bundle tamper-evident in transit: the recipient re-verifies on their own machine, and any byte changed since the build breaks the content hash, so silent edits cannot pass.

```python
payload = bundle.to_json(indent=2)
restored = ExaminerBundle.from_json(payload)
restored.verify()  # ok — hash recomputed from contents matches
```

## Detect Tampering

The hash covers the decision, policy, evidence, and the as-of timestamp. Change any of them and `verify()` raises.

```python
restored.evidence[0]["value"] = "platinum"

try:
    restored.verify()
except BundleIntegrityError as exc:
    print("tamper detected:", exc)
```

> The guarantee is scoped
>
> The hash proves the bundle is internally consistent. Proving it reflects what production actually did requires storing it alongside an independently signed commit or write-once record — that step is outside this module.
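To make the tamper-evidence idea concrete, here is a minimal standard-library sketch of a content hash over canonical JSON. It is not `ExaminerBundle`'s implementation: the function name `content_hash`, the dictionary shapes, and the `record_id` sort key are assumptions chosen for illustration. Only the `sha256:` prefix and the deterministic evidence ordering come from the text above.

```python
import hashlib
import json


def content_hash(decision: dict, policy: dict, evidence: list, as_of: str) -> str:
    """Hash a canonical JSON encoding of the sealed material (illustrative only)."""
    payload = {
        "decision": decision,
        "policy": policy,
        # Sort evidence by a stable key so input order cannot change the hash.
        "evidence": sorted(evidence, key=lambda record: record["record_id"]),
        "as_of": as_of,
    }
    # sort_keys and fixed separators give the same bytes for the same content.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Hypothetical stand-ins for the decision, policy, and evidence records.
decision = {"selected": "specialist_queue", "evidence_refs": ["rec-1"]}
policy = {"policy_id": "support_triage", "version": "2026.05.01"}
evidence = [{"record_id": "rec-1", "key": "ticket:tier", "value": "gold"}]
as_of = "2026-05-01T00:00:00+00:00"

sealed = content_hash(decision, policy, evidence, as_of)

# Changing one evidence value produces a different digest.
tampered = [dict(evidence[0], value="platinum")]
print(content_hash(decision, policy, tampered, as_of) != sealed)  # True
```

Verification in this sketch is just recomputing the digest from the contents and comparing it with the stored one, which is why any edit to the decision, policy, evidence, or timestamp is detectable.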
## Key Classes

| Symbol | Why it matters |
|--------|----------------|
| `ExaminerBundle.build(decision, evidence_store, policy_registry, *, as_of_transaction_time=None, metadata=None)` | Seals the decision, its evidence, and the as-of policy into one artifact. |
| `ExaminerBundle.verify()` | Recomputes the hash; raises `BundleIntegrityError` on any change. |
| `ExaminerBundle.to_json()` / `from_json()` | Move the bundle between systems without losing the integrity check. |
| `ExaminerBundle.content_hash` | SHA-256 digest — the tamper-evidence anchor. |
| `BundleIntegrityError` | Raised by `build` (missing evidence) and `verify` (hash mismatch). |

Bundles are built from `AgentRoutingDecision` records (see [Versioned Routing Policy](/advanced/versioned-routing-policy/)) and `BitemporalRecord` evidence (see [Bitemporal Storage](/advanced/bitemporal-storage/)).

## Where this fits

- Related: Bitemporal Storage — The append-only store the bundle reads its as-of-then evidence from. (/advanced/bitemporal-storage/)
- Related: Versioned Routing Policy — How the policy version sealed into the bundle is published and resolved. (/advanced/versioned-routing-policy/)
- Guide: Audit a Decision — Walk a single decision from record to verified bundle. (/guides/audit-a-decision/)

========================================================================
# Operate
========================================================================

## Cost Tracking

Source: https://briefcaseai.io/features/cost-tracking/

> Estimate token costs across platforms and tiers, bill prompt caching, compare models, project monthly spend, and watch budgets — from the data your decisions already carry.

Estimate what a decision costs, price it for the platform and tier you actually run on, account for prompt caching, compare models, and check spend against a budget — all from the token counts your decisions already carry. The cost types ship in the **base package** (no extra).
> When you'd reach for this
>
> Your support-triage agent runs thousands of `classify_ticket` calls a day. Before you ship a model swap — or move the workload to a batch tier on another platform — you want the real number, and an alert before the monthly bill blows past budget.

## Install

```bash
pip install briefcase-ai
```

`briefcase.cost` is in the base package — no extra to install.

## Estimate a cost

`CostCalculator.estimate_cost()` takes a model name and token counts and returns a `CostEstimate` with separate input and output costs plus a total.

```python
from briefcase.cost import CostCalculator

calc = CostCalculator()

estimate = calc.estimate_cost("claude-haiku-4-5", input_tokens=1000, output_tokens=500)
print(estimate.total_cost)   # 0.0035
print(estimate.input_cost)   # 0.001
print(estimate.output_cost)  # 0.0025
print(estimate.currency)     # "USD"
```

## Price any platform: rate cards

By default, estimates use first-party standard list price. A **rate card** prices the same call for the platform, tier, and modifiers you actually run on. It is a forgiving `platform × tier × modifiers` string — pass it as the keyword-only `rate_card`.

```python
calc = CostCalculator()

# Same workload, two ways to buy it
standard = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000)
batch = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000, rate_card="bedrock:batch")

print(standard.total_cost)  # 3.75
print(batch.total_cost)     # 1.875 — batch tier on AWS Bedrock, ~0.5x

# List representative cards
print(calc.get_available_rate_cards())
# ['standard', 'batch', 'cached', 'priority', 'flex', 'first_party:fast',
#  'bedrock:standard', 'bedrock:batch', 'vertex:standard', 'azure:standard', ...]
```

| Part | Values | Effect |
| --- | --- | --- |
| **Platform** | `first_party` · `bedrock` · `vertex` · `azure` | Selects the provider's price sheet |
| **Tier** | `standard` · `batch` · `cached` · `priority` · `flex` | `batch` / `flex` ≈ 0.5×; `priority` is a premium |
| **Modifiers** | `regional` · `us` · `fast` | `regional` / `us` add ~10%; `fast` is a premium base rate |

Cards are order-independent and separator-tolerant, so `"bedrock:batch"`, `"batch + bedrock"`, and `"vertex / standard, us"` all parse. Omitting `rate_card` (or passing `"standard"`) keeps the previous first-party standard pricing.

> Long-context pricing
>
> Above ~200K input tokens, tiered long-context rates apply, so a large call's total is not a flat multiple of a small one. Always estimate with representative token counts.

## Prompt-cache billing

Prompt caching changes the math: cache reads are billed at a fraction of the input rate. Pass cache-token counts (all keyword-only) and read the `cache_cost` on the estimate.

```python
estimate = calc.estimate_cost(
    "claude-opus-4-8",
    input_tokens=0,
    output_tokens=1_000,
    cache_read_tokens=100_000,  # also: cache_write_5m_tokens, cache_write_1h_tokens
)
print(estimate.cache_cost)  # 0.05 — 100K cache reads at 0.1x of the input rate
print(estimate.total_cost)  # 0.075 — output + cache
```

**Why it matters:** a cache-heavy agent's bill is dominated by cache reads at 0.1× input. Counting those tokens at full input price overstates the cost.

## Compare models

`compare_models()` estimates the same workload across two models so you can see the difference before switching.

```python
comparison = calc.compare_models(
    "claude-haiku-4-5", "gpt-5.4-mini", input_tokens=1000, output_tokens=500
)
print(comparison["cheaper_model"])       # "gpt-5.4-mini"
print(comparison["savings"])             # 0.0005 — absolute, in USD
print(comparison["percent_difference"])  # 14.29
```

`compare_models()` also accepts a `rate_card` so you can compare like-for-like across tiers or platforms.
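The figures printed in the estimate, rate-card, cache, and comparison examples can be cross-checked by hand. The sketch below is plain arithmetic, not a call into `briefcase.cost`. The per-million-token rates are assumptions back-derived from the outputs shown in those examples rather than taken from a published price sheet, and the helper `token_cost` is hypothetical. It also ignores the long-context tiers mentioned in the callout.

```python
def token_cost(tokens: int, usd_per_million: float) -> float:
    """Cost in USD for `tokens` at a flat per-million-token rate."""
    return tokens * usd_per_million / 1_000_000


# Assumed rates (USD per million tokens), inferred from the printed outputs.
HAIKU_IN, HAIKU_OUT = 1.0, 5.0
OPUS_IN, OPUS_OUT = 5.0, 25.0

# Estimate a cost: 1000 input + 500 output tokens.
haiku_total = token_cost(1000, HAIKU_IN) + token_cost(500, HAIKU_OUT)

# Rate cards: the batch tier is roughly half of standard.
standard = token_cost(500_000, OPUS_IN) + token_cost(50_000, OPUS_OUT)
batch = standard * 0.5

# Prompt-cache billing: cache reads at 0.1x the input rate, plus output.
cache_cost = token_cost(100_000, OPUS_IN * 0.1)
cached_total = token_cost(1_000, OPUS_OUT) + cache_cost

# Compare models: savings relative to the more expensive total.
other_total = 0.003  # implied by the 0.0005 savings in the comparison example
savings = haiku_total - other_total
percent = savings / haiku_total * 100

print(round(haiku_total, 6))   # 0.0035
print(standard, batch)         # 3.75 1.875
print(round(cache_cost, 6))    # 0.05
print(round(cached_total, 6))  # 0.075
print(round(percent, 2))       # 14.29
```

The last line suggests that `percent_difference` is measured against the more expensive model's total, which is worth confirming against the library before relying on it.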
## Project monthly spend

`project_monthly_cost()` extrapolates a daily workload to a monthly estimate.

```python
monthly = calc.project_monthly_cost(
    "claude-haiku-4-5",
    daily_input_tokens=100_000,
    daily_output_tokens=50_000,
    days_per_month=30,
)
print(monthly)  # 10.5 — a float, the projected monthly total in USD
```

## Check a budget

`check_budget()` compares current spend to a budget and returns a `BudgetStatus` with an alert level you can act on.

```python
status = calc.check_budget(current_spend=85.0, budget_limit=100.0)
print(status.status)            # "warning"
print(status.percent_used)      # 85.0
print(status.remaining_budget)  # 15.0
print(status.alert_message)
```

> Turn a budget breach into an action
>
> Pair a `"critical"` or `"exceeded"` status with [`emit`](/features/multi-agent/) to fire an event, or with a [guardrail](/advanced/guardrails/) that denies further calls until spend resets.

## Supported models

The default pricing table covers the current frontier — Anthropic Claude 4.x, OpenAI GPT-5.x, and Google Gemini 2.5–3.x — alongside every previously priced model. See the [Changelog](/resources/changelog/) for the full list added in 3.2.1.

## How cost tracking fits

```mermaid
flowchart LR
    A["Decision record"] --> B["token counts"]
    B --> C["CostCalculator"]
    R["rate_card<br/>(platform × tier)"] --> C
    C --> D["CostEstimate<br/>(+ cache_cost)"]
    C --> E["BudgetStatus"]
```

> Diagram description
>
> A decision record provides token counts, which feed the CostCalculator together with an optional rate card (platform and tier). The calculator produces a CostEstimate — per-call input, output, and cache costs — and a BudgetStatus comparing spend against a budget.

## Key classes

| Class / method | Returns | Purpose |
| --- | --- | --- |
| `CostCalculator.estimate_cost(model, in, out, *, rate_card=None, cache_read_tokens=None, …)` | `CostEstimate` | Per-call cost, optionally for a platform/tier and with cache tokens |
| `CostCalculator.estimate_cost_from_text(model, text, est_out, *, rate_card=None)` | `CostEstimate` | Estimate from text instead of token counts |
| `CostCalculator.compare_models(a, b, in, out, *, rate_card=None)` | `dict` | Cost delta between two models (`cheaper_model`, `savings`, `percent_difference`) |
| `CostCalculator.project_monthly_cost(model, daily_in, daily_out, days, *, rate_card=None)` | `float` | Projected monthly total from daily volume |
| `CostCalculator.check_budget(spend, limit)` | `BudgetStatus` | Spend vs. budget with alert level |
| `CostCalculator.get_available_rate_cards()` | `list[str]` | Representative rate-card identifiers |
| `CostEstimate` | — | `input_cost`, `output_cost`, `cache_cost`, `total_cost`, `currency` |
| `BudgetStatus` | — | `status`, `percent_used`, `remaining_budget`, `alert_message` |

The `rate_card` and cache-token parameters are **keyword-only**; existing positional calls behave exactly as before.

## Where this fits

Cost Tracking is part of the **Operate** act: once decisions are flowing, watch what they cost.

- Drift Detection — Catch when model behavior shifts across repeated runs. (/features/drift-detection/)
- Multi-Agent & Events — Correlate decisions across a workflow and emit events.
(/features/multi-agent/)

## OpenTelemetry

Source: https://briefcaseai.io/features/opentelemetry/

> Put governed decisions on the same trace timeline as the rest of your stack.

Trace your decisions as OpenTelemetry spans so they sit on the same timeline as every other service in your stack.

> When you'd reach for this
>
> Your support-triage agent is one hop in a larger request: an API gateway, a retrieval service, then `classify_ticket`. When a ticket gets mis-routed you want to follow the whole request through your existing tracing backend and see exactly where the decision sat in that timeline. A Briefcase tracer puts the decision span inline with every other span, so you debug latency and routing in one view.

## Spans and decision records are complementary

These two layers answer different questions, and you usually want both.

| Layer | Answers | Lives where |
|-------|---------|-------------|
| OTel span (timeline) | *When* did it run, how long, in what order, alongside which other services | Your tracing backend |
| Briefcase decision record (governance context) | *Why* this output — inputs, outputs, confidence, timing, full reproducible context | Your [exporter](/features/exporters/) or [store](/features/storage-adapters/) |

The span is a lightweight timeline marker; the decision record is the deep governance context. Both flow out through Briefcase, so a span you find in your tracing UI can be matched to a decision record you can [replay](/features/replay/) and verify.

```mermaid
flowchart LR
    A[classify_ticket] --> B[Briefcase]
    B --> C[OTel span: timeline]
    B --> D[Decision record: governance context]
    C --> E[Tracing backend]
    D --> F[Exporter / store]
```

> Diagram description
>
> A decision flows into Briefcase, which emits two complementary outputs: an OpenTelemetry span that goes to your tracing backend, and a decision record that goes to your exporter or store. The span gives you the timeline; the record gives you the governance context behind the decision.
## Install

```bash
pip install briefcase-ai[otel]
```

The `otel` extra installs `opentelemetry-api` and `opentelemetry-sdk`.

## Without OTel vs with OTel

_Without OTel_

Decisions are still captured and exported — you can inspect them through an exporter. But there is no span on your distributed trace, so the decision is invisible to your tracing UI and you cannot see where it sat relative to upstream and downstream services.

```python
from briefcase import capture

@capture(decision_type="ticket_triage")
def classify_ticket(text):
    # call your model here
    return "account_access"

classify_ticket("reset my password")
```

_With OTel_

Open a span around the decision and it becomes part of the trace, correlated with the rest of your services. Spans flow through whatever OTel `TracerProvider` your application has configured.

```python
from briefcase import capture
from briefcase.otel import get_tracer

tracer = get_tracer("briefcase")

@capture(decision_type="ticket_triage")
def classify_ticket(text):
    with tracer.start_as_current_span("classify_ticket") as span:
        span.set_attribute("briefcase.decision_type", "ticket_triage")
        category = "account_access"  # call your model here
        span.set_attribute("briefcase.outcome", category)
        return category

classify_ticket("reset my password")
```

## How it works

1. **Get a tracer** — `get_tracer("briefcase")` returns a standard OpenTelemetry tracer.
2. **Open a span** around the decision and attach attributes that describe it.
3. **Correlate** — propagate trace context to downstream services so spans join one trace.
4. **Inspect** in your tracing backend, then match the decision span to the full decision record an exporter shipped.

## Get a tracer

`get_tracer()` returns a standard OpenTelemetry tracer. Use it to open spans around the work you want to trace.
```python
from briefcase.otel import get_tracer

tracer = get_tracer("briefcase")

with tracer.start_as_current_span("classify_ticket") as span:
    span.set_attribute("briefcase.decision_type", "ticket_triage")
    category = "account_access"
    span.set_attribute("briefcase.outcome", category)
```

`get_tracer(name="briefcase")` is the only public symbol in `briefcase.otel`.

## Semantic conventions

Briefcase ships span-attribute conventions under `briefcase.semantic_conventions`. Each submodule defines the attribute keys for one subsystem, so the attributes you emit are consistent across services instead of ad-hoc strings.

Two submodules you will reach for most on the triage path:

```python
from briefcase.semantic_conventions import workflow, rag

# Tag the retrieval span on the triage path with RAG attribute keys
with tracer.start_as_current_span("retrieve-ticket-history") as span:
    # use rag.* keys for the retrieval step, workflow.* keys for the workflow it runs in
    ...
```

The full set of submodules:

- `briefcase.semantic_conventions.lakefs`
- `briefcase.semantic_conventions.workflow`
- `briefcase.semantic_conventions.rag`
- `briefcase.semantic_conventions.external_data`
- `briefcase.semantic_conventions.cowork`
- `briefcase.semantic_conventions.agent_state`
- `briefcase.semantic_conventions.bitemporal`
- `briefcase.semantic_conventions.routing_policy`
- `briefcase.semantic_conventions.validation`

Import the module for the subsystem you are instrumenting and use its attribute keys when setting span attributes. The `workflow` keys line up with the [multi-agent correlation](/features/multi-agent/) surface.

## Ship decisions to an external observability sink

The span describes the work; the **decision record** carries the captured inputs, outputs, and timing. To forward those records into an external observability sink you already operate — a log aggregator, a message queue, an analytics pipeline — subclass `BaseExporter` and implement its three async methods.
> When you'd reach for this
>
> Your team already routes operational events through a central collector. Rather than build a second pipeline for triage decisions, a small custom exporter posts each `classify_ticket` record to that same sink, right alongside the spans your tracer emits.

```python
from typing import Any

from briefcase import setup, capture
from briefcase.exporters import BaseExporter

class SinkExporter(BaseExporter):
    async def export(self, decision: Any) -> bool:
        # ship the decision record to your external observability sink here
        # e.g. post to a collector, enqueue, or forward to a log pipeline
        return True

    async def flush(self) -> None:
        pass

    async def close(self) -> None:
        pass

setup(exporter=SinkExporter())

@capture(decision_type="ticket_triage")
def classify_ticket(text):
    # call your model here — span streams to your tracer, record streams to the sink
    return "account_access"

classify_ticket("reset my password")
```

For the stock exporters (`ConsoleExporter`, `JSONLFileExporter`, `MemoryExporter`) and the one-line `briefcase.observe()` setup, see [Exporters](/features/exporters/).

## Key symbols

- `briefcase.otel.get_tracer(name="briefcase")` — return an OpenTelemetry tracer.
- `briefcase.exporters.BaseExporter` — base class for custom exporters; implement `export`, `flush`, `close`.
- `briefcase.semantic_conventions.*` — attribute-key modules for each subsystem.

## Best practices

1. **Sample high-volume paths** — use sampling so a busy triage queue does not overwhelm your backend.
2. **Set resource attributes** — identify the service and environment so spans are easy to filter.
3. **Use the semantic-convention keys** — consistent attribute names make spans queryable across services.
4. **Pair spans with an exporter** — the span gives you the timeline, the exported record gives you the governance context.
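The first practice, sampling, is easiest to reason about when the keep-or-drop choice is deterministic per trace, so every service makes the same call for the same request. The sketch below illustrates that idea with the standard library only. It is not Briefcase's or OpenTelemetry's sampler: `keep_trace` and the `ticket-N` ids are hypothetical, and in a real deployment you would configure the sampler on your `TracerProvider` instead.

```python
import hashlib


def keep_trace(trace_id: str, ratio: float) -> bool:
    """Deterministically keep roughly `ratio` of traces, keyed on the trace id."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be between 0.0 and 1.0")
    # Hash the id so the decision does not depend on how ids are generated.
    digest = hashlib.sha256(trace_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big")  # uniform in 0 .. 2**64 - 1
    return bucket < ratio * 2**64


# Hypothetical ids standing in for a busy triage queue.
ids = [f"ticket-{n}" for n in range(10_000)]
kept = sum(keep_trace(trace_id, 0.10) for trace_id in ids)
print(kept / len(ids))  # close to 0.10
```

Because the decision depends only on the id, a trace that is kept upstream is also kept downstream, which avoids traces with missing spans.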
## Where this fits

OpenTelemetry is part of **operating** a governed system in production: the timeline view that sits next to your cost and event signals.

- Cost Tracking — Roll up the model spend that runs alongside these traced decisions. (/features/cost-tracking/)
- Multi-Agent & Events — Next: correlate decisions across agents and react to events in real time. (/features/multi-agent/)

## Multi-Agent & Events

Source: https://briefcaseai.io/features/multi-agent/

> Tie every decision in a multi-step agent pipeline to one workflow and react to them as they happen.

Stitch the decisions from a multi-step agent pipeline together under one workflow, and emit events you can react to as those decisions happen.

> When you'd reach for this
>
> Your support-triage pipeline is three agents: one retrieves prior tickets, one runs `classify_ticket`, one drafts a reply. When a customer escalates, you need to see all three decisions as one chain — not three unrelated records. Workflow correlation gives them a shared `workflow_id`, and the event surface lets an on-call alert fire the moment a classification comes back low-confidence.

## Why correlation matters

_Before correlation_

Each agent records its decision independently. Reviewing an escalation means hunting for three separate records and guessing which retrieve, classify, and draft belong to the same ticket. The chain of reasoning is real but invisible.

_After correlation_

All three decisions share one `workflow_id`. The steps trace as a single chain, so a reviewer can reconstruct exactly what context the classifier saw and what the drafter did with it — accountability across agent boundaries, not just within one agent.
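What a shared `workflow_id` buys a reviewer can be shown with a plain grouping step. This sketch uses only the standard library. The record dictionaries and their `step` and `output` fields are hypothetical stand-ins, not the shape of a Briefcase decision record.

```python
from collections import defaultdict

# Hypothetical records from two interleaved tickets.
records = [
    {"workflow_id": "wf-a", "step": "retrieve", "output": ["doc-12", "doc-44"]},
    {"workflow_id": "wf-b", "step": "retrieve", "output": ["doc-07"]},
    {"workflow_id": "wf-a", "step": "classify", "output": "account_access"},
    {"workflow_id": "wf-a", "step": "draft", "output": "reply-1"},
    {"workflow_id": "wf-b", "step": "classify", "output": "billing"},
]


def chains_by_workflow(records):
    """Group step names by workflow_id, preserving the recorded order."""
    chains = defaultdict(list)
    for record in records:
        chains[record["workflow_id"]].append(record["step"])
    return dict(chains)


chains = chains_by_workflow(records)
print(chains["wf-a"])  # ['retrieve', 'classify', 'draft']
print(chains["wf-b"])  # ['retrieve', 'classify']
```

Without the shared id, the same five records would have to be matched by timestamps or ticket text, which is the guesswork described under "Before correlation".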
## The two surfaces

| Surface | What it does | Reach for it when |
|---------|--------------|-------------------|
| Correlation (`briefcase.correlation`) | Groups decisions under a shared workflow and propagates context across boundaries | You want the pipeline to read as one accountable chain |
| Events (`briefcase.events`) | Emits typed signals (`BriefcaseEvent`) you can route as decisions happen | You want to react in real time — alert, page, or trigger follow-up |

## Install

```bash
pip install briefcase-ai[correlation]
pip install briefcase-ai[events]
```

Neither the `correlation` nor `events` extra installs third-party dependencies.

---

## Correlation

### How a workflow threads the pipeline

`briefcase_workflow(name, client)` is a context manager. Every agent registered inside it shares the same `workflow_id`, so the steps of a pipeline trace as one unit.

1. **Open a workflow** — `briefcase_workflow("support_pipeline", client)` gives every agent inside it one shared `workflow_id`.
2. **Retrieve** — the retrieval agent registers under that workflow.
3. **Classify** — `classify_ticket`'s agent registers under the same workflow.
4. **Decide** — the drafting agent registers under the same workflow.
5. **Review** — the three registered agents read back as one retrieve -> classify -> decide chain.
```python
from unittest.mock import Mock

from briefcase.correlation import briefcase_workflow


def retrieve(query):
    return ["doc-12", "doc-44"]


def classify_ticket(docs):
    # call your model here
    return "account_access"


def decide(category):
    return "route_to_support"


client = Mock()  # your Briefcase client; a Mock keeps this example self-contained

with briefcase_workflow("support_pipeline", client) as workflow:
    docs = retrieve("reset my password")
    workflow.register_agent("retriever-1", "retrieve")

    category = classify_ticket(docs)
    workflow.register_agent("classifier-1", "classify")

    action = decide(category)
    workflow.register_agent("decider-1", "decide")

    print(workflow.workflow_id)  # all three share this id
    print(action)  # route_to_support
```

### Agent registration

`workflow.register_agent(agent_id, agent_type)` records each agent in the workflow, so the chain knows who made which decision.

```python
workflow.register_agent("classifier-1", "classify")
```

### Reading the active workflow

`get_current_workflow()` returns the workflow bound to the current context, or `None` outside a workflow block — so you can read it anywhere inside the chain without threading it through call signatures.

```python
from unittest.mock import Mock

from briefcase.correlation import briefcase_workflow, get_current_workflow

client = Mock()

with briefcase_workflow("support_pipeline", client) as workflow:
    assert get_current_workflow() is workflow

assert get_current_workflow() is None
```

### Propagating context across process boundaries

When an agent lives in another service, carry the trace context with the request so the downstream decision joins the same workflow trace. Inject into outbound headers on the producer side; extract from inbound headers on the consumer side.

```python
from briefcase.correlation import (
    TraceContextCarrier,
    inject_trace_context,
    extract_trace_context,
)

# Producer service: inject the active trace context into outbound headers.
headers = inject_trace_context({})
# ... send headers to the downstream agent ...

# Consumer service: restore the trace context from inbound headers.
context = extract_trace_context(headers)

# TraceContextCarrier offers the same inject/extract pair as a class.
carrier = TraceContextCarrier()
outbound = carrier.inject()
TraceContextCarrier.extract(outbound)
```

`inject_trace_context()` reads the active span context; run under an [OpenTelemetry](/features/opentelemetry/) tracer for the headers to carry `traceparent`.

---

## Events

Events are typed signals you emit as decisions happen, so you can react in real time instead of polling the log. The emit functions are **coroutines** — `await` them inside an async context.

### Emitting an event

Construct a `BriefcaseEvent` and `await emit(...)`. The `idempotency_key` lets downstream consumers deduplicate retries.

```python
import asyncio

from briefcase.events import (
    BriefcaseEvent,
    emit,
    emit_low_confidence,
    emit_drift_detected,
)


class Decision:
    def __init__(self, decision_id: str):
        self.decision_id = decision_id


async def main() -> None:
    decision = Decision("dec-91f2")

    event = BriefcaseEvent(
        event_type="decision.recorded",
        decision_id=decision.decision_id,
        payload={"category": "billing", "confidence": 0.62},
        idempotency_key="dec-91f2:recorded",
    )
    await emit(event)

    # The classifier came back unsure — fires only when confidence is below threshold.
    await emit_low_confidence(decision, confidence=0.62, threshold=0.75)

    # A monitored decision drifted — fires when repeated runs disagree.
    await emit_drift_detected(decision, details={"agreement_rate": 0.4})


asyncio.run(main())
```

Set `webhook_url`, `webhook_secret`, `events`, or `event_bus` on `setup()` to route emitted events to a destination.

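On the receiving end, the `idempotency_key` is what makes a retried delivery safe to ignore. The sketch below is a minimal, hypothetical consumer and is not part of the Briefcase SDK: it assumes each delivered event arrives as a dict carrying an `idempotency_key` field (an assumption about the wire format), and `DedupingConsumer` keeps the keys it has seen in memory only.

```python
from typing import Callable


class DedupingConsumer:
    """Hypothetical consumer that handles each idempotency key at most once."""

    def __init__(self, handler: Callable[[dict], None]):
        self._handler = handler
        self._seen: set[str] = set()  # in-memory only; a real consumer would persist this

    def receive(self, event: dict) -> bool:
        """Handle the event once; return False when it is a duplicate delivery."""
        key = event["idempotency_key"]  # assumed field name on the delivered payload
        if key in self._seen:
            return False
        self._handler(event)
        self._seen.add(key)
        return True


handled: list[str] = []
consumer = DedupingConsumer(lambda e: handled.append(e["event_type"]))

delivery = {"event_type": "decision.recorded", "idempotency_key": "dec-91f2:recorded"}
first = consumer.receive(delivery)  # handled
retry = consumer.receive(delivery)  # same key, so it is skipped
```

Recording the key only after the handler returns means a handler that raises will be retried on the next delivery, which suits at-least-once transports.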
### Event functions

| Function | Fires for |
|----------|-----------|
| `emit(event)` | Any `BriefcaseEvent` you construct |
| `emit_low_confidence(decision, confidence, threshold)` | A decision below a confidence threshold |
| `emit_drift_detected(decision, details=None)` | Disagreement across repeated runs |

`emit_low_confidence` pairs naturally with the confidence score on a `classify_ticket` decision; `emit_drift_detected` is the live counterpart to [drift detection](/features/drift-detection/).

## Key symbols

- `briefcase_workflow(name, client)` — context manager yielding the workflow context.
- `workflow.workflow_id` / `workflow.register_agent(agent_id, agent_type)` — the shared id and agent registration.
- `get_current_workflow()` — the active workflow, or `None` outside a block.
- `inject_trace_context` / `extract_trace_context` / `TraceContextCarrier` — carry context across boundaries.
- `BriefcaseEvent`, `emit`, `emit_low_confidence`, `emit_drift_detected` — the event surface (coroutines).

## Best practices

1. **Use descriptive workflow names** — `support_pipeline` beats `wf-7` when you review later.
2. **Register every agent** — unregistered agents leave gaps in the chain.
3. **Propagate context across boundaries** — otherwise a remote agent starts its own trace.
4. **Emit events at decision points** — low confidence and drift are the signals worth acting on first.

## Where this fits

Correlation and events are part of **operating** a governed system: they keep multi-agent runs accountable and let you react as decisions happen.

- OpenTelemetry — The trace timeline that workflow context propagates across. (/features/opentelemetry/)
- Drift Detection — The monitoring behind emit_drift_detected — verify decisions hold over time. (/features/drift-detection/)

========================================================================
# Artifact Graph & Evaluate
========================================================================

## Overview
Source: https://briefcaseai.io/evaluate/runs/

> How oci-bai tracks every model, fine-tune, and dataset you push — the commit model, provenance, search, and how to compare versions.

**oci-bai** is an OCI-compatible artifact graph CLI and dashboard. Every image pushed through the gateway creates a **commit** in the graph — recording the manifest, files, derivation edges to parent images, and the full audit trail. The graph is the source of truth; the backing registry is just storage.

Full documentation: **[oci.briefcaseai.io](https://oci.briefcaseai.io)**

> Private beta
> oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access.

## The commit model

A commit is defined by four things:

| # | Part | What it is |
|---|------|-----------|
| 1 | **Ref** | A tag or digest that names this version (e.g. `v1`, `cartpole`) |
| 2 | **Family** | The repository the image belongs to (e.g. `rl-gym-env`) |
| 3 | **Derivation** | The parent commit(s) inferred from shared layers, or declared in a cohort push |
| 4 | **Manifest** | The full OCI manifest, files, and SBOM recorded at push time |

## Lifecycle

1. ### Push through the gateway

Tag and push with any OCI-compatible tool. Repositories are created automatically.

```bash
docker tag my-image:latest localhost:8080/my-repo:v1
docker push localhost:8080/my-repo:v1
```

2. ### Confirm the commit

```bash
oci-bai --repo my-repo log v1
```

3. ### Inspect provenance

```bash
oci-bai --repo my-repo provenance v2
oci-bai --repo my-repo whodepends base-v1
```

4. ### Compare versions

```bash
oci-bai --repo my-repo diff base v1 --depth package
oci-bai --repo my-repo diff base v1 --depth bench
```

5. ### Search the catalog

```bash
oci-bai search "format==safetensors cuda>=12.4"
oci-bai search "gymnasium==1.1.0 arch==arm64"
```

## Run modes for evaluation

Two commands support active evaluation:

| Command | What it does |
|---------|-------------|
| `oci-bai attach-bench ` | Attach a benchmark verdict to a commit |
| `oci-bai --repo diff --depth bench` | Compare verdict deltas between two versions |
| `oci-bai hunt ` | Host a verdictml HuntEnv episode against the candidate |

## Where this fits

- Quick start — Push your first image and run your first search in under five minutes. (/evaluate/quickstart/)
- CLI reference — Every oci-bai command and its flags. (/evaluate/cli/)
- Full docs — The complete oci-bai documentation at oci.briefcaseai.io. (https://oci.briefcaseai.io)

## Quick Start
Source: https://briefcaseai.io/evaluate/quickstart/

> Push your first tracked image, explore the commit graph, and run your first search in under five minutes.

Push your first tracked image in under five minutes. All you need is Docker (or `crane`) and a running oci-bai stack.

> Private beta
> oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access before following these steps.

## Prerequisites

- Stack running — `make up` (gateway on `:8080`)
- Docker or `crane` for pushing images
- The `oci-bai` CLI on your `PATH`

Repositories are created automatically on first push.

## Steps

1. ### Push an image through the gateway

Tag any image for the gateway and push. Derivation from shared layers is inferred automatically.

```bash
docker tag my-image:latest localhost:8080/my-repo:v1
docker push localhost:8080/my-repo:v1
```

Using `crane`? `crane push my-image.tar localhost:8080/my-repo:v1 --insecure`

2. ### Confirm the commit was recorded

```bash
oci-bai --repo my-repo log v1
```

You should see the commit id, manifest digest, and who pushed it.

3. ### Push a derived image

Push a second image built from the first. oci-bai links them automatically.

```bash
docker push localhost:8080/my-repo:v2
oci-bai --repo my-repo provenance v2
```

4. ### Browse in the dashboard

Open the dashboard and select **my-repo**.

- **Versions** — the full commit graph
- **Provenance** — derivation tree for any version
- **Compare** — diff any two versions

The hosted dashboard is in early preview. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) for access.

5. ### Search

```bash
oci-bai search "cuda>=12.4"
oci-bai search "format==safetensors"
oci-bai search "numpy==1.26.4 arch==arm64"
```

## Next steps

- Search — Full query syntax including model format filters. (/evaluate/cli/#search)
- Provenance & lineage — Derivation trees and the weight-sharing metric. (https://oci.briefcaseai.io/provenance)
- Full CLI reference — Every oci-bai command and its flags. (/evaluate/cli/)

## CLI Reference
Source: https://briefcaseai.io/evaluate/cli/

> Every oci-bai command and its flags.

Every `oci-bai` command and its flags. Run `oci-bai --help` for the installed version.

## Installed help output

```
Execution-native, OCI-compatible artifact-graph CLI (§14)

Usage: oci-bai [OPTIONS]

Commands:
  init          Initialize an image repo
  push          Push, CDC, commit, tag
  log           Commit DAG for this family
  op            Operation-log commands
  branch        List image families with divergent heads
  bookmark      Bookmark commands
  undo          Invert the last op-log entry
  fsck          Graph + chunk-store integrity
  gc            Mark-and-sweep GC
  diff          Diff two refs at a depth
  search        Search the catalog
  mount         Lazy-snapshotter mount plan
  show          Manifest + SBOM + provenance + size by depth
  explain       Derivation + composition + provenance + drift
  resolve       Path provenance
  provenance    Full derivation tree
  whodepends    Reverse derivation walk
  referrers     List referrers of a subject manifest
  lineage       Lineage commands
  compose       Environment assembly on the graph plane
  checkout      Sparse checkout
  cohort        Transactional N-image cohort push
  optimize      Emit Rationale records
  attach-bench  Attach a benchmark verdict
  hunt          Host a verdictml HuntEnv episode
  help          Print this message or the help of the given subcommand(s)

Options:
      --server   oci-jj-server gRPC endpoint
      --repo     Repository (image family) the command targets
  -h, --help     Print help
  -V, --version  Print version
```

## Global flags

| Flag | Description |
|------|-------------|
| `--server URL` | Server endpoint (default `http://127.0.0.1:50051`) |
| `--repo NAME` | Repository to target |
| `--help / -h` | Show help |
| `--version / -V` | Print version |

Set defaults in your shell: `export OCI_BAI_SERVER=http://localhost:50051` and `export OCI_BAI_REPO=my-repo`

## Example output

These examples use a small demo family named `rl-gym-env`.

```bash
oci-bai --repo rl-gym-env log cartpole
```

```
commit   ab7c49d0e6f3  cartpole
parent   f84a2d7e91b0  cuda-base
manifest sha256:2ef7a63bbd4c70f58fc4e7b42d228a42f0ca9288e3fa4b6f9ab173079a8f284d
builder  trainer@briefcase
message  cartpole policy image with safetensors checkpoint
```

```bash
oci-bai search "format==safetensors cuda>=12.4"
```

```
rank  repo        ref       format       manifest
1     rl-gym-env  acrobot   safetensors  sha256:dd21ce69ba84...
2     rl-gym-env  cartpole  safetensors  sha256:2ef7a63bbd4c...
```

```bash
oci-bai --repo rl-gym-env provenance acrobot
```

```
acrobot derives from cartpole
cartpole derives from cuda-base
cuda-base shared CUDA 12.4 runtime base
```

```bash
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth package
```

```
@@ packages
+ gymnasium 1.1.0
+ numpy 1.26.4
~ torch 2.3.1 -> 2.4.0
```

## log

```
oci-bai log
```

Commit history for a ref.

## diff

```
oci-bai diff [--depth ]
```

Compare two versions.

| `--depth` value | What it shows |
|-----------------|---------------|
| `bytes` | Byte-level diff |
| `file` | File-level diff (default) |
| `package` | Package changes |
| `semantic` | Config changes (entrypoint, CUDA, OS) |
| `imports` | Runtime imports when telemetry is available |
| `loaded-libs` | Loaded shared libraries when telemetry is available |
| `syscalls` | Syscall changes when telemetry is available |
| `bench` | Benchmark verdict deltas when attached |

```
Usage: oci-bai diff [OPTIONS]

Options:
      --depth   [default: file] [possible values: bytes, file, package, semantic, imports, loaded-libs, syscalls, bench]
      --server  oci-jj-server gRPC endpoint
      --repo    Repository (image family) the command targets
  -h, --help    Print help
```

## search

```
oci-bai search
```

Search the catalog. See the [Search guide at oci.briefcaseai.io](https://oci.briefcaseai.io/search) for the full syntax.

```
Usage: oci-bai search [OPTIONS]

Options:
      --semantic
      --server   oci-jj-server gRPC endpoint
      --repo     Repository (image family) the command targets
  -h, --help     Print help
```

**Built-in facets:** `cuda`, `python`, `os`, `arch`, `format`, `model`. Any name not in this list is treated as a package name.

```bash
oci-bai search "cuda>=12.4 format==safetensors arch==arm64"
oci-bai search "gymnasium==1.1.0"
oci-bai search "model==true"
```

## provenance

```
oci-bai provenance
```

Full derivation tree for a version.

```
Usage: oci-bai provenance [OPTIONS]

Options:
      --server  oci-jj-server gRPC endpoint
      --repo    Repository (image family) the command targets
  -h, --help    Print help
```

## whodepends

```
oci-bai whodepends
```

Every version that descends from this one.

## checkout

```
oci-bai checkout [--paths ] [--dest ]
```

Fetch specific files without pulling the full image.

| Flag | Description |
|------|-------------|
| `--paths a,b` | Comma-separated path prefixes to fetch |
| `--dest dir` | Materialize files here (requires server-side staging) |

## cohort push

```
oci-bai cohort push
```

Push multiple related images atomically. Shared content is uploaded once regardless of how many members reference it. Reads `cohort.json` from the directory.

```
Usage: oci-bai cohort push [OPTIONS]

Options:
      --server  oci-jj-server gRPC endpoint
      --repo    Repository (image family) the command targets
  -h, --help    Print help
```

Push each member's image through the gateway before running `cohort push`.

## undo

```
oci-bai undo [--op ]
```

Revert the last push, or a specific operation. History is append-only — undo is non-destructive.

## referrers

```
oci-bai referrers [--mine]
```

List referrers of an image. `--mine` shows your own private referrers.

## fsck

```
oci-bai fsck
```

Verify that every file in a version can be fully reconstructed from stored data.

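As a rough illustration of what "fully reconstructed from stored data" can mean in a content-addressed store, the sketch below checks that every chunk a file lists is present and still hashes to its own digest. The manifest shape, the `store` mapping, and the `check_file` helper are hypothetical; this is not oci-bai's implementation or its on-disk format.

```python
import hashlib


def digest(data: bytes) -> str:
    """Content address of a chunk: its SHA-256, prefixed the way OCI digests are."""
    return "sha256:" + hashlib.sha256(data).hexdigest()


def check_file(chunk_digests: list[str], store: dict[str, bytes]) -> list[str]:
    """Return the digests that are missing from the store or fail re-hashing."""
    problems = []
    for expected in chunk_digests:
        data = store.get(expected)
        if data is None or digest(data) != expected:
            problems.append(expected)
    return problems


chunks = [b"layer-a", b"layer-b"]
store = {digest(c): c for c in chunks}  # hypothetical chunk store
manifest = [digest(c) for c in chunks]  # hypothetical per-file chunk list

ok = check_file(manifest, store)        # every chunk present and intact
del store[manifest[1]]
broken = check_file(manifest, store)    # the second chunk is now missing
```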
## How It Works
Source: https://briefcaseai.io/evaluate/architecture/

> The gateway, artifact graph, and how pushes become searchable, versioned, deduplicated commits.

oci-bai has two pieces: a **gateway** and an **artifact graph server**. Every OCI push goes through the gateway; the server builds and serves the graph.

## The push pipeline

```
docker push localhost:8080/my-repo:v1
        │
        ▼
  OCI Gateway       ← any docker push / crane push
        │  records manifest, files, derivation edges
        ▼
  Artifact Graph    ← commit DAG, dedup store, SBOM index
        │  indexes packages, weight format, CUDA version
        ▼
  Search / CLI      ← oci-bai log / diff / search / provenance
```

1. **Gateway** receives the push, computes content-addressed chunks, and calls the graph server.
2. **Graph server** writes a commit: manifest digest, derivation links (inferred from shared layers or declared via cohort push), and the file tree.
3. **Indexers** run asynchronously: package extraction, model-weight detection (safetensors, GGUF), telemetry (imports, syscalls) when available.
4. **CLI** reads from the graph server over gRPC — no separate data store.

## Deduplication

When you push fifty fine-tunes of the same base model, only the novel content-addressed chunks are stored. The weight-sharing percentage the CLI and dashboard report is derived from how many chunks the candidate shares with its parent commit.

| Shared % | Meaning |
|----------|---------|
| ≥ 90% | Same checkpoint — re-uploaded or tag change |
| 50–90% | Partial fine-tune — adapter or head-only update |
| 5–50% | Significant retraining |
| < 5% | Full fine-tune — all weights retrained |

## Engine seam

The CLI speaks to the graph server over gRPC with server reflection — no generated stubs required. Set `OCI_BAI_SERVER` to point at a non-local instance.

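Returning to the weight-sharing bands in the Deduplication table, the sketch below shows one way such a percentage could be computed and labelled. It assumes the figure is the fraction of the candidate's chunks that also appear in the parent, and that a boundary value falls in the higher band; the source states neither, and `shared_percent` and `band` are hypothetical helpers, not oci-bai functions.

```python
def shared_percent(candidate: set[str], parent: set[str]) -> float:
    """Percentage of the candidate's chunk digests that the parent also holds."""
    if not candidate:
        return 0.0
    return 100.0 * len(candidate & parent) / len(candidate)


def band(pct: float) -> str:
    """Map a shared percentage onto the labels from the Deduplication table."""
    if pct >= 90:
        return "Same checkpoint"
    if pct >= 50:
        return "Partial fine-tune"
    if pct >= 5:
        return "Significant retraining"
    return "Full fine-tune"


parent = {f"chunk-{i}" for i in range(10)}         # chunk-0 .. chunk-9
candidate = {f"chunk-{i}" for i in range(4, 14)}   # chunk-4 .. chunk-13, 6 of 10 shared

pct = shared_percent(candidate, parent)
label = band(pct)
print(pct, label)
```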
## Delegation mapping

| CLI command | What runs |
|-------------|-----------|
| `oci-bai log ` | Graph server: commit DAG walk |
| `oci-bai diff --depth bench` | Graph server: benchmark verdict delta |
| `oci-bai provenance ` | Graph server: derivation tree |
| `oci-bai search ` | Graph server: catalog query |
| `oci-bai whodepends ` | Graph server: reverse derivation walk |
| `oci-bai attach-bench ` | Graph server: write benchmark verdict record |
| `oci-bai hunt ` | verdictml HuntEnv episode |

========================================================================
# Guides
========================================================================

## Audit a Decision End-to-End
Source: https://briefcaseai.io/guides/audit-a-decision/

> Follow one support-triage decision from the evidence it depended on, through the policy that governed it, to a sealed, tamper-evident record you can verify months later.

This guide threads the whole Briefcase lifecycle through a single decision: a triage agent routes a support ticket, and six months later someone asks you to **prove what happened**. By the end you will have a sealed artifact that reconstructs the decision, the evidence it used, and the exact policy version in effect — and verifies it was not altered.

> When you
> A decision is challenged in an incident review. You need to show the routing choice, the data it relied on, and the rule that governed it *as of the day it ran* — not today's data or today's policy.

Install the extras used here:

```bash
pip install briefcase-ai[routing,bitemporal,compliance]
```

1. ### Record the evidence the decision depends on

Evidence lives in an append-only bitemporal store. Each record carries **valid time** (when the fact was true) and **transaction time** (when you learned it), so corrections never overwrite history.

```python
from datetime import datetime, timezone

from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore

store = InMemoryBitemporalStore()
now = datetime.now(timezone.utc)

evidence = BitemporalRecord.new(
    key="config:max_retries",
    valid_time=now,
    value=3,
    source="config-service",
)
store.append(evidence)
```

2. ### Define the policy that governs the decision

A `PolicyVersion` is a set of rules published to a registry. Publishing is an **append**, so "the policy as of date X" always returns exactly the rules that were active then.

```python
from briefcase.routing import PolicyRegistry, PolicyVersion, PolicyRule

registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1",
    rules=[
        PolicyRule(
            rule_id="high-priority",
            condition={"priority": "high"},
            choice="senior-queue",
            rationale="High-priority tickets go to the senior queue.",
        ),
    ],
    default_choice="standard-queue",
)
registry.publish(policy, valid_from=now)
```

3. ### Make the routing decision

The `AgentRouter` evaluates the context against the active policy and returns a decision that references the evidence it used.

```python
from briefcase.routing import AgentRouter

router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"priority": "high"}, evidence_refs=[evidence.record_id])

print(decision.selected)         # "senior-queue"
print(decision.matched_rule_id)  # "high-priority"
print(decision.policy_version)   # "1"
```

4. ### Seal a tamper-evident bundle

An `ExaminerBundle` joins the decision, the bitemporal evidence, and the policy version in effect, then seals the whole thing with a SHA-256 content hash. `verify()` raises if a single byte changes.

```python
from briefcase.compliance import ExaminerBundle, BundleIntegrityError

bundle = ExaminerBundle.build(decision, store, registry)
print(bundle.content_hash)  # "sha256:..."
bundle.verify()             # raises BundleIntegrityError if tampered
```

5. ### Transport it and re-verify

The bundle serializes to JSON, so it can leave your system and be checked anywhere — the hash makes it self-validating.

```python
restored = ExaminerBundle.from_json(bundle.to_json(indent=2))
restored.verify()
```

6. ### Reconstruct what was known at the time

Months later, the underlying config may have changed. Because evidence is append-only, you can reconstruct the store **as of** the decision's moment and read exactly what it saw.

```python
from briefcase.bitemporal import AsOfView

view = AsOfView(store, transaction_time=now)
print(view.as_of("config:max_retries").value)  # 3, as it was at decision time
```

> Integrity is enforced, not assumed
> `verify()` is the whole point: a bundle that has been edited — even by accident in transit — raises `BundleIntegrityError`. Treat a bundle that fails to verify as untrusted.

## Where this fits

This guide stitches together four building blocks. Go deeper on each:

- Bitemporal Storage — Two time axes and append-only corrections — the evidence layer. (/advanced/bitemporal-storage/)
- Versioned Routing Policy — Time-travelable policies and the rule that fired. (/advanced/versioned-routing-policy/)
- Audit Bundles — Sealing and verifying the full record. (/advanced/compliance-bundles/)
- Govern Agent Actions — Add controls that run before the action, not just after. (/guides/govern-agent-actions/)

## Govern Agent Actions
Source: https://briefcaseai.io/guides/govern-agent-actions/

> Put controls in front of an agent so an action is evaluated before it runs — deny-by-default guardrails, fail-closed pipelines, and a versioned policy that decides where the work goes.

Recording a decision tells you what *did* happen. Governing one means deciding what is *allowed to* happen — **before** the action runs. This guide adds two controls to the support-triage agent: a guardrail that authorizes the action, and a versioned policy that decides where it goes.

> When you
> Your triage agent can invoke actions with real consequences — escalating, auto-resolving, assigning a queue. You want a control that can say "no" before any of that executes, and a routing decision you can later reconstruct.

```bash
pip install briefcase-ai[guardrails,routing]
```

1. ### Define a guardrail

A guardrail answers one question: may this agent perform this action on this resource? Subclass `BaseGuardrailEnv` and implement `evaluate`, returning an `EvalResult` with an `Effect`.

```python
from briefcase.guardrails import BaseGuardrailEnv, EvalRequest, EvalResult, Effect


class QueueGuardrail(BaseGuardrailEnv):
    @property
    def name(self) -> str:
        return "queue_access"

    @property
    def request_space(self):
        return {}

    def evaluate(self, request: EvalRequest) -> EvalResult:
        allowed = request.context.get("priority") == "high"
        return EvalResult(
            effect=Effect.ALLOW if allowed else Effect.DENY,
            guardrail_name=self.name,
            reason="priority check",
        )
```

2. ### Evaluate before acting

Build the request that describes the action, evaluate it, and only proceed if it is allowed.

```python
guardrail = QueueGuardrail()
request = EvalRequest(
    agent="triage-bot",
    action="route",
    resource="queue:senior",
    context={"priority": "high"},
)

result = guardrail.evaluate(request)
if result.is_allowed:
    ...  # perform the action
```

3. ### Compose a pipeline and fail closed

Real systems chain several guardrails. A `GuardrailPipeline` evaluates them in order and denies on the first denial (its default mode). Wrap the call so that **any error becomes a denial** — controls must fail closed, never open.

```python
from briefcase.guardrails import GuardrailPipeline

pipeline = GuardrailPipeline(stages=[guardrail])


def is_allowed(request: EvalRequest) -> bool:
    try:
        return pipeline.evaluate(request).is_allowed
    except Exception:
        return False  # fail closed: an error never grants access
```

> Fail closed by default
> A control that throws and is treated as "allow" is worse than no control. The pattern above denies on error; the Guardrails page also documents a built-in deny-by-default wrapper and other composable wrappers (caching, timeouts, auditing).

4. ### Route the allowed action through a versioned policy

Once an action is permitted, decide *where* it goes — using a policy you can reconstruct later. Publishing a policy version is append-only, so the rule that fired is always recoverable.

```python
from datetime import datetime, timezone

from briefcase.routing import PolicyRegistry, PolicyVersion, PolicyRule, AgentRouter

registry = PolicyRegistry()
registry.publish(
    PolicyVersion(
        policy_id="ticket-routing",
        version="1",
        rules=[PolicyRule(
            rule_id="high-priority",
            condition={"priority": "high"},
            choice="senior-queue",
            rationale="High-priority tickets go to the senior queue.",
        )],
        default_choice="standard-queue",
    ),
    valid_from=datetime.now(timezone.utc),
)

router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"priority": "high"})
print(decision.selected, decision.matched_rule_id, decision.policy_version)
```

> Controls before action — the whole point
> The order matters: **evaluate, then act**. Guardrails decide permission; routing decides destination. Both run before the agent does anything irreversible, and both leave a record you can replay and seal.

## Where this fits

- Guardrails — The full guardrail framework: pipelines, modes, and composable wrappers. (/advanced/guardrails/)
- Routing — The simple in-process router and when to graduate to versioned policy. (/advanced/routing/)
- Versioned Routing Policy — Time-travelable policies and reconstruction. (/advanced/versioned-routing-policy/)
- Audit a Decision End-to-End — Seal this decision into a verifiable record. (/guides/audit-a-decision/)

## Observe AI in Production
Source: https://briefcaseai.io/guides/observe-in-production/

> Wire up live observability for a running triage agent — emit every decision, watch token cost against a budget, measure output drift, trace with OpenTelemetry, and fire events when something looks off.

Once the triage agent is live, you need to *see* what it is doing — cheaply and continuously. This guide layers Briefcase's observability tools onto the same `classify_ticket` function: emit records, track spend, detect drift, trace, and alert.

> When you
> The agent is in production. You want a low-overhead stream of every decision, a guardrail on cost, an early warning when outputs start shifting, and traces that line up with the rest of your telemetry.

```bash
pip install briefcase-ai[drift,otel,events]
```

1. ### Emit every decision in one line

`observe()` wires up an exporter so captured decisions actually go somewhere. Use `"console"` in development, a `.jsonl` path for log shipping, or `"memory"` in tests.

```python
import briefcase

briefcase.observe("decisions.jsonl")  # append-only, thread-safe


@briefcase.capture(decision_type="ticket-classification")
def classify_ticket(text: str) -> str:
    # call your model here
    return "billing"
```

2. ### Watch cost against a budget

`CostCalculator` estimates per-call cost from token counts and checks spend against a limit. The cost types ship in the base package.

```python
from briefcase.cost import CostCalculator

calc = CostCalculator()
estimate = calc.estimate_cost("gpt-4o-mini", input_tokens=1000, output_tokens=200)
print(estimate.total_cost, estimate.currency)

budget = calc.check_budget(current_spend=85.0, budget_limit=100.0)
print(budget.status, budget.alert_message)  # e.g. "warning", "..."
```

3. ### Measure drift across repeated runs

Sample the same decision over time and ask how consistent it stays. A falling `consistency_score` is your signal that behavior is shifting.

```python
from briefcase.drift import DriftCalculator

calc = DriftCalculator().with_similarity_threshold(0.9)
metrics = calc.calculate_drift(["billing", "billing", "account", "billing"])

print(metrics.consistency_score, metrics.agreement_rate)
print(metrics.consensus_output, metrics.outliers)
```

4. ### Trace alongside your existing telemetry

`get_tracer()` returns a standard OpenTelemetry tracer. Spans describe the *timeline* of work; decision records carry the *governance context* — they are complementary and both flow to your collectors.

```python
from briefcase.otel import get_tracer

tracer = get_tracer("briefcase")
with tracer.start_as_current_span("classify_ticket"):
    classify_ticket("My invoice is wrong")
```

5. ### Fire events when something looks off

Turn signals into action. The emit helpers are coroutines — `await` them inside an async context — and are ideal for low-confidence outputs or detected drift.

```python
import asyncio

from briefcase.events import emit_low_confidence, emit_drift_detected


async def main():
    await emit_low_confidence({"id": "dec-1"}, confidence=0.4, threshold=0.7)
    await emit_drift_detected({"id": "dec-1"}, {"drift_score": 0.3})


asyncio.run(main())
```

> From signal to response
> These tools compose: cost and drift produce signals, events broadcast them, and your alerting reacts. Pair drift thresholds with `emit_drift_detected` so a shift in behavior pages the right person automatically.

## Where this fits

- Exporters — Stock and custom exporters, and the record shape they emit. (/features/exporters/)
- Cost Tracking — Estimates, model comparison, projections, and budgets. (/features/cost-tracking/)
- Drift Detection — What the drift metrics mean and how to tune them. (/features/drift-detection/)
- Multi-Agent & Events — Correlate decisions across a workflow and emit events. (/features/multi-agent/)

## Reproducible RAG
Source: https://briefcaseai.io/guides/reproducible-rag/

> Make a retrieval-augmented decision reproducible — version the embedding index, snapshot the external data it read, and validate that a prompt's references resolve before a model ever sees them.

When the triage agent answers from a knowledge base, the answer is only as reproducible as the **context behind it**: which documents, which embedding model, which version of an upstream source. This guide makes a retrieval-augmented decision reconstructable — so "why did it say that?" has an answer.

> When you
> A RAG answer is questioned, but the knowledge base has since changed. You need to know which document versions and which embedding model produced it — and to catch stale indexes before they serve wrong answers.

```bash
pip install briefcase-ai[rag,external,validate]
```

1. ### Version the embedding index

A `VersionedEmbeddingPipeline` records which documents and model produced an index in an atomic manifest, so you can detect when it goes stale.

```python
from briefcase.rag import VersionedEmbeddingPipeline, Document


class EmbeddingModel:
    def embed(self, texts):
        return [[0.1, 0.2, 0.3] for _ in texts]


pipeline = VersionedEmbeddingPipeline(embedding_model=EmbeddingModel())

documents = [
    Document(id="kb-1", content="Reset your password from settings.", metadata={"topic": "account"}),
]

batch = pipeline.create_embedding_batch(documents)
manifest = pipeline.create_manifest("faq-index", [batch])
print(manifest.index_name)
```

2. ### Detect when the index is stale

When documents or the model change, `check_invalidation` reports it — your cue to rebuild before serving.

```python
report = pipeline.check_invalidation("faq-index", documents)
print(report.is_valid)  # False once a document's content hash changes
```

3. ### Snapshot the external data a decision read

Agents also read sources you do not control. `ExternalDataTracker` hashes each fetch, detects drift against the last snapshot, and appends corrections without mutating history.

```python
from briefcase.external import ExternalDataTracker, SnapshotPolicy, SnapshotFrequency

tracker = ExternalDataTracker(
    default_policy=SnapshotPolicy(frequency=SnapshotFrequency.ON_CHANGE, retention_days=30),
)

result = tracker.track_api_call(
    api_name="product-catalog",
    endpoint="/products",
    method="GET",
    response_data={"items": [1, 2, 3]},
    record_count=3,
)
print(result["snapshot_id"], result["drift_detected"])
```

4. ### Validate references before the model runs

Before a prompt reaches a model, confirm its references actually resolve against a versioned knowledge base. You supply an extractor (finds references) and a resolver (checks them); the engine records the commit it validated against.

```python
import re

from briefcase.validation import PromptValidationEngine
from briefcase.validation.errors import ValidationError, ValidationErrorCode


class RegexExtractor:
    _REF = re.compile(r"[\w/]+\.md")

    def extract(self, prompt: str) -> list:
        return self._REF.findall(prompt)


class AllowlistResolver:
    def __init__(self, known):
        self._known = known

    def resolve_all(self, references):
        return [
            ValidationError(
                code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                message=f"Reference not found: {ref}",
                reference=ref,
                severity="error",
                layer="resolution",
                remediation="Add the document to the knowledge base.",
            )
            for ref in references
            if ref not in self._known
        ]


class DemoLakeFS:
    def get_commit(self, repository: str, branch: str) -> str:
        return "demo0000000000000000000000000000000000000"


engine = PromptValidationEngine(
    extractor=RegexExtractor(),
    resolver=AllowlistResolver({"kb/faq.md"}),
    lakefs_client=DemoLakeFS(),
    repository="knowledge-base",
    branch="main",
    mode="strict",
)

report = engine.validate("See kb/faq.md and kb/missing.md")
print(report.status, report.references_checked, report.has_errors)
```

> Reproducibility is the sum of its sources
> A RAG answer is reproducible only if *every* input is pinned: the document versions (manifest), the upstream data (snapshot), and the validated references (commit). Skip one and the trail has a gap.

## Where this fits

- RAG Versioning — Manifests, invalidation reports, and instrumented retrieval. (/advanced/rag-versioning/)
- External Data — Snapshot policies, drift detection, and append-only corrections. (/advanced/external-data/)
- Validation Engine — Extractors, resolvers, and the layered validation flow. (/advanced/validation-engine/)
- lakeFS — Capture commit SHAs for the data your agents read. (/integrations/lakefs/)

## Track & Compare Model Versions
Source: https://briefcaseai.io/guides/run-an-evaluation/

> Push a baseline and candidate through the gateway, compare them at every depth, and read the verdict scorecard.

This guide walks through a complete evaluation workflow using oci-bai: push a baseline and a candidate, compare them at the package and benchmark depths, and read the verdict back.

> Private beta
> oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access.

## What you need

- A running local stack (`make up`, `make seed`)
- The `oci-bai` CLI on your `PATH`
- Two image tags to compare (the guide uses the seeded `rl-gym-env` family)

## Workflow

1. ### Push the baseline

```bash
docker tag my-base:latest localhost:8080/rl-gym-env:cuda-base
docker push localhost:8080/rl-gym-env:cuda-base
oci-bai --repo rl-gym-env log cuda-base
```

2. ### Push the candidate

```bash
docker tag my-candidate:latest localhost:8080/rl-gym-env:cartpole
docker push localhost:8080/rl-gym-env:cartpole
oci-bai --repo rl-gym-env provenance cartpole
```

3. ### Compare at package depth

```bash
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth package
```

Output shows added, removed, and changed packages between the two versions.

4. ### Attach a benchmark verdict

```bash
oci-bai --repo rl-gym-env attach-bench cartpole
```

5.
### Compare at bench depth ```bash oci-bai --repo rl-gym-env diff cuda-base cartpole --depth bench ``` Output shows verdict deltas between the baseline and candidate. 6. ### Check impact ```bash oci-bai --repo rl-gym-env whodepends cuda-base ``` Shows every version that descends from the baseline — useful before promoting a base update. ## Key commands at a glance | Step | Command | |------|---------| | Push | `docker push localhost:8080/<repo>:<tag>` | | History | `oci-bai --repo <repo> log <tag>` | | Provenance | `oci-bai --repo <repo> provenance <tag>` | | Compare packages | `oci-bai --repo <repo> diff <base> <candidate> --depth package` | | Compare verdicts | `oci-bai --repo <repo> diff <base> <candidate> --depth bench` | | Find dependents | `oci-bai --repo <repo> whodepends <tag>` | | Search | `oci-bai search "format==safetensors cuda>=12.4"` | ======================================================================== # Integrations ======================================================================== ## MCP Server Source: https://briefcaseai.io/integrations/mcp/ > Expose Briefcase's safe, read-only SDK operations to MCP-capable tools. Run a Model Context Protocol (MCP) server that gives MCP-capable tools — Claude Code, Cursor, Codex — direct access to Briefcase operations: sanitize PII, estimate model cost, analyze output drift, and read the usage guide. The tools are read-only and wrap `briefcase.sanitize`, `briefcase.cost`, and `briefcase.drift`. > When you'd reach for this You're building a support-triage agent in an MCP-capable tool and want it to redact PII before text leaves your environment, sanity-check the cost of a model call, or compare outputs for drift — without wiring up the Python SDK directly. The MCP server lets the runtime call these Briefcase capabilities as plain tools. ## Install ```bash pip install briefcase-ai[mcp] ``` > Requires the mcp extra The `[mcp]` extra installs `mcp>=1.2`. Import from `briefcase.mcp`. 
## Run It Start the server over stdio with the console script: ```bash briefcase-mcp ``` Or run the module directly: ```bash python -m briefcase.mcp ``` ## Build the Server in Python `build_server()` returns a configured `FastMCP` instance, and `main()` runs it over stdio. ```python from briefcase.mcp import build_server server = build_server() print(server.name) # -> "briefcase" ``` ## Register in an MCP Client Point an MCP-capable client at the `briefcase-mcp` command. The exact config file differs per tool, but the shape is the same: ```json { "mcpServers": { "briefcase": { "command": "briefcase-mcp" } } } ``` ## Tools The server exposes four tools: | Tool | What it does | Act | | --- | --- | --- | | `sanitize_text` | Redact PII from text before it leaves your environment | Capture | | `estimate_cost` | Estimate the cost of a model call | Operate | | `analyze_drift` | Check a set of outputs for consistency and drift | Replay & Verify | | `how_to` | Retrieve Briefcase usage guidance | — | ### sanitize_text Redact PII (emails, phones, SSNs, cards, API keys, IPs) from text. Wraps `briefcase.sanitize.Sanitizer`. | Input | Type | |-------|------| | `text` | `str` | Returns `{ "sanitized": str, "redactions": list[str] }` — the redacted text and the PII types found. ```text sanitize_text("Contact me at jordan@example.com or 555-123-4567") -> {"sanitized": "Contact me at [REDACTED_EMAIL] or [REDACTED_PHONE]", "redactions": ["email", "phone"]} ``` ### estimate_cost Estimate the USD cost of an LLM call. Wraps `briefcase.cost.CostCalculator`. | Input | Type | | |-------|------|--| | `model` | `str` | | | `input_tokens` | `int` | | | `output_tokens` | `int` | | | `rate_card` | `str` (optional) | `platform × tier` pricing, e.g. `"bedrock:batch"` | Returns `{ "model": str, "rate_card": str, "input_cost": float, "output_cost": float, "cache_cost": float, "total_cost": float }`. 
```text estimate_cost("claude-haiku-4-5", 1000, 500) -> {"model": "claude-haiku-4-5", "rate_card": "standard", "input_cost": 0.001, "output_cost": 0.0025, "cache_cost": 0.0, "total_cost": 0.0035} ``` > New in 3.2.1 `estimate_cost` accepts an optional `rate_card` and returns `cache_cost`. See [Cost Tracking](/features/cost-tracking/) for rate cards and prompt-cache billing. ### analyze_drift Analyze a list of model outputs for consistency and drift. Wraps `briefcase.drift.DriftCalculator`. | Input | Type | |-------|------| | `outputs` | `list[str]` | Returns `{ "consistency_score": float, "agreement_rate": float, "consensus_output": str, "status": str }`. ```text analyze_drift(["billing", "billing", "shipping"]) -> {"consistency_score": 0.67, "agreement_rate": 0.67, "consensus_output": "billing", "status": "drifting"} ``` ### how_to Return Briefcase usage guidance. | Input | Type | |-------|------| | `topic` | `str` (optional) | Pass a topic keyword (e.g. `"export"`, `"sanitize"`, `"cost"`, `"logging"`) to get matching sections, or leave it empty for the full guide. ## Resource The server also exposes a resource, `briefcase://llms-full.txt`, which serves the bundled Briefcase usage guide for clients that read MCP resources. > Docs as machine-readable text This documentation site also publishes [`/llms.txt`](https://briefcaseai.io/llms.txt) (a curated index) and [`/llms-full.txt`](https://briefcaseai.io/llms-full.txt) (the core docs as plain text), so an assistant can ingest Briefcase even without the MCP server. See [AI-Assisted Setup](/getting-started/ai-assisted-setup/). ## Where this fits These tools surface Briefcase capabilities to any MCP client. To go deeper on what each one does in the full SDK: - PII Sanitization — How sanitize_text redacts sensitive data before it leaves your environment. (/features/pii-sanitization/) - Drift Detection — How analyze_drift compares outputs over time. 
(/features/drift-detection/) ## Next steps - Exporters — Send decisions from your own code to the same backends these tools read. (/features/exporters/) - Cost Tracking — The library behind the estimate_cost tool, with budget checks. (/features/cost-tracking/) ## lakeFS Source: https://briefcaseai.io/integrations/lakefs/ > Capture lakeFS commit SHAs for the versioned data your agents read. Track exactly which version of a policy document, taxonomy, or reference file an agent read, by capturing the lakeFS commit SHA on every object access. lakeFS is one bundled versioned-data source — if your data lives elsewhere, implement the same capture against any version-controlled store through the generic `vcs` protocol (`pip install briefcase-ai[vcs]`). > When you'd reach for this Your support-triage agent classifies tickets using a knowledge base that lives in a data lake, and that lake is updated daily. When you replay a decision weeks later, you need the exact data the agent read — not today's version. Capturing the lakeFS commit SHA at decision time lets a replay see the same data and reach the same conclusion. ## Install ```bash pip install briefcase-ai[lakefs] ``` The `[lakefs]` extra installs the `lakefs` package. Import from `briefcase.integrations.lakefs`. > Mock mode When the `lakefs` package or a live endpoint is unavailable, the client runs in mock mode so examples stay runnable. ## Track Reads with a Context Manager Open a `versioned_context` and every read inside it is tagged with the resolved commit SHA. 
```python from briefcase.integrations.lakefs import versioned_context from unittest.mock import Mock class MockBriefcaseClient: def __init__(self): self.config = { "lakefs_endpoint": "https://example.lakefscloud.io/api/v1", "lakefs_access_key": "your_access_key", "lakefs_secret_key": "your_secret_key", } client = MockBriefcaseClient() with versioned_context(client, "knowledge-base", "main") as lakefs: refund_policy = lakefs.read_object("docs/refund_policy.pdf") taxonomy = lakefs.read_object("config/category_taxonomy.json") print(f"Read refund policy: {len(refund_policy)} bytes") print(f"Read taxonomy: {len(taxonomy)} bytes") print(f"Commit SHA: {lakefs.get_commit()}") ``` ## Track Reads with a Decorator `@versioned` injects a `VersionedClient` as the `versioned_client` keyword argument. Pass your Briefcase client as `briefcase_client` when you call the function. ```python from briefcase.integrations.lakefs import versioned from unittest.mock import Mock class MockBriefcaseClient: def __init__(self): self.config = { "lakefs_endpoint": "https://example.lakefscloud.io/api/v1", "lakefs_access_key": "your_access_key", "lakefs_secret_key": "your_secret_key", } client = MockBriefcaseClient() @versioned(repository="knowledge-base", branch="main") def classify_ticket(ticket: dict, versioned_client=None) -> dict: policy = versioned_client.read_object("docs/refund_policy.pdf") taxonomy = versioned_client.read_object("config/category_taxonomy.json") return { "category": "billing", "commit_sha": versioned_client.get_commit(), "bytes_read": len(policy) + len(taxonomy), } ticket = {"id": "TKT-4471", "subject": "Refund request"} result = classify_ticket(ticket, briefcase_client=client) print(f"Category: {result['category']}") print(f"Commit SHA: {result['commit_sha']}") ``` ## Use the Client Directly Construct a `VersionedClient` when you need explicit control over reads, existence checks, and listings. 
```python from briefcase.integrations.lakefs import VersionedClient from unittest.mock import Mock class MockBriefcaseClient: def __init__(self): self.config = { "lakefs_endpoint": "https://example.lakefscloud.io/api/v1", "lakefs_access_key": "your_access_key", "lakefs_secret_key": "your_secret_key", } client = MockBriefcaseClient() versioned_client = VersionedClient( repository="knowledge-base", branch="main", briefcase_client=client, ) for path in ["docs/refund_policy.pdf", "docs/shipping_policy.pdf"]: if versioned_client.object_exists(path): content = versioned_client.read_object(path) print(f"Read {path}: {len(content)} bytes") objects = versioned_client.list_objects(prefix="docs/") print(f"Found {len(objects)} objects in docs/") print(f"Commit SHA: {versioned_client.get_commit()}") ``` ## VersionedClient Methods | Method | Returns | |--------|---------| | `read_object(path, return_metadata=False)` | Object bytes, optionally with metadata | | `upload_object(path, data, content_type=...)` | Writes bytes to the branch | | `list_objects(prefix="")` | Objects under a prefix | | `object_exists(path)` | `True` if the object is present | | `get_commit()` | The resolved commit SHA for this client | `VersionedClient(repository, branch, commit="latest", briefcase_client=None, ...)` resolves `commit="latest"` against the branch head; pin a SHA to read a fixed version. ## Where this fits Capturing a lakeFS commit SHA is part of the **Store & Query** act: pin exactly what your agents read so replays are reproducible. - External Data — Capture references to any external source — including data-lake commits — behind a decision. (/advanced/external-data/) - Reproducible RAG — Pin retrieval sources so a replayed RAG decision reads the same context. (/guides/reproducible-rag/) ## Next steps - Storage Adapters — Persist the decisions that captured these commit SHAs. 
(/features/storage-adapters/) - RAG Versioning — Pin and track the document versions feeding your retrieval pipeline. (/advanced/rag-versioning/) ======================================================================== # Reference ======================================================================== ## oci-bai Install & Compatibility Source: https://briefcaseai.io/reference/install-compat/ > Install the oci-bai CLI, bring up the local stack, and check the compatibility matrix. > Private beta oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access. ## Install the CLI Download the pre-built binary for your platform from the oci-bai releases page, or build from source: ```bash # From source (requires Rust toolchain) cargo install oci-bai ``` Verify: ```bash oci-bai --version ``` ## Bring up the local stack ```bash make up # gateway on :8080, graph server on :50051, Postgres+AGE, MinIO, registry make seed # create demo refs: rl-gym-env family with cuda-base and cartpole ``` To tear down: ```bash make down ``` ## Shell defaults ```bash export OCI_BAI_SERVER=http://localhost:50051 export OCI_BAI_REPO=my-repo ``` ## Compatibility matrix | Component | Version | |-----------|---------| | oci-bai CLI | 0.1.0 | | oci-jj-server API (min) | v1 | | oci-jj image tag | 0.1.0 | | verdictml | v0.1.0 | ## Web dashboard The hosted dashboard is in early preview. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) for access. ## Python SDK Source: https://briefcaseai.io/sdk/python/ > Complete guide to the Briefcase AI Python package. > This is the narrative guide This page walks through the SDK by example. For exhaustive class, method, and parameter signatures, see the [Python API reference](/api/python/). 
## Install ```bash pip install briefcase-ai ``` ## Import Paths ```python from briefcase import ( capture, observe, setup, init, init_with_config, is_initialized, enable_logging, set_log_level, disable_logging, get_logger, BriefcaseConfig, DecisionSnapshot, Snapshot, SnapshotQuery, Input, Output, ModelParameters, ExecutionContext, HardwareMetadata, ) from briefcase.cost import CostCalculator, CostEstimate, BudgetStatus from briefcase.drift import DriftCalculator, DriftMetrics from briefcase.sanitize import Sanitizer from briefcase.storage import SqliteBackend, BufferedBackend from briefcase.replay import ReplayEngine, ReplayPolicy, ReplayResult, ReplayStats from briefcase.validation import PromptValidationEngine, ValidationReport from briefcase.external import ExternalDataTracker, SnapshotPolicy, SnapshotFrequency from briefcase.routing import AgentRouter, PolicyRegistry, PolicyVersion, PolicyRule from briefcase.events import BriefcaseEvent, emit from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore, AsOfView from briefcase.compliance import ExaminerBundle from briefcase.exporters import BaseExporter ``` ## The @capture Decorator ```python from briefcase import capture @capture(decision_type="ticket-classification") def classify_ticket(text: str) -> str: return "billing" ``` `@capture` records a lightweight dict for every call — inputs, outputs, and timing — and hands it to an exporter. It does not persist a `DecisionSnapshot`; to store and replay structured decisions, build a `DecisionSnapshot` and use `SqliteBackend` (see [Core Concepts](/getting-started/core-concepts/)). ## Configuration `setup()` wires up exporters, routing, events, storage, and other components and returns a `BriefcaseConfig`. There is no `configure()` function. 
```python from briefcase import setup config = setup( exporter=None, router=None, webhook_url=None, storage=None, ) ``` Start the native runtime once with `init()`, or use `init_with_config()` instead to set worker threads. `BriefcaseConfig.get()` returns the active configuration. ```python from briefcase import init, is_initialized, BriefcaseConfig init() # start the runtime (use init_with_config(worker_threads=4) instead to size the pool) print(is_initialized()) config = BriefcaseConfig.get() ``` ## Logging > Silent by default The top-level `briefcase` logger has only a `NullHandler` attached, so importing the package emits nothing. Opt in when you want to see what the SDK is doing. These functions are in the base package — no extra required. ```python import briefcase # Opt in to briefcase logs on stderr (default level "INFO"). briefcase.enable_logging("DEBUG") # Change the level later without re-adding a handler. briefcase.set_log_level("WARNING") # Use the same logger tree in your own modules. log = briefcase.get_logger(__name__) log.warning("classification fell back to default category") # Turn briefcase logging back off and restore silence. briefcase.disable_logging() ``` `enable_logging(level="INFO", *, stream=None, fmt=None, datefmt=None)` returns the `briefcase` logger and adds a single `StreamHandler` (idempotent). Pass `stream=`, `fmt=`, or `datefmt=` to control where and how records are formatted. `set_log_level(level)` adjusts the level in place. `disable_logging()` removes the handler. `get_logger(__name__)` returns a child of the `briefcase` logger so your own modules inherit the same configuration. Set `BRIEFCASE_LOG_LEVEL` to enable logging automatically at import — useful for turning on diagnostics without touching code: ```bash BRIEFCASE_LOG_LEVEL=DEBUG python app.py ``` ## Extras Install only what you need. See [Installation](/getting-started/installation/) for the full extras table. 
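Because extras are optional, application code that depends on one may want to degrade gracefully when it is not installed. The sketch below is one way to do that using only the standard library. It assumes a missing optional module surfaces as an `ImportError`; `optional_import` is a hypothetical helper for illustration, not part of the SDK.

```python
import importlib
from types import ModuleType
from typing import Optional


def optional_import(module_name: str) -> Optional[ModuleType]:
    """Return the imported module, or None when it cannot be imported.

    Hypothetical helper: it treats any ImportError (including
    ModuleNotFoundError) as "feature unavailable".
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None


# Try an optional Briefcase submodule and fall back if it is absent.
replay = optional_import("briefcase.replay")
if replay is None:
    print("replay unavailable; install briefcase-ai[replay] to enable it")
```

Returning `None` keeps the decision about what to do next (skip the feature, warn, or fail) in the calling code rather than in the helper.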
## Lazy Imports Optional submodules import only when their backing code is available. Pure-Python extras (`replay`, `validate`, `correlation`, `external`) report a missing extra: ``` ImportError: briefcase.replay requires the 'replay' extra. Install it with: pip install briefcase-ai[replay] ``` Native-backed modules (`cost`, `drift`, `sanitize`, `storage`) instead ask you to reinstall or rebuild the native extension: ``` ImportError: briefcase.storage could not load the native extension. Reinstall the package (pip install --force-reinstall briefcase-ai) or rebuild from source with 'maturin develop'. ``` ## Next steps - Python API Reference — Full signatures for every public symbol, grouped by module. (/api/python/) - Exporters — Wire captured decisions to console, file, or a custom backend. (/features/exporters/) ## Python API Source: https://briefcaseai.io/api/python/ > Full API reference for the Briefcase AI Python SDK, grouped by module. Reference for the public symbols of `briefcase-ai` (v3.3.0). Signatures match the shipped SDK. Each section lists an install command, the import path, and one small runnable usage. > Looking for the narrative? This page is the exhaustive signature reference. For a worked walkthrough of the SDK, start with the [Python SDK guide](/sdk/python/). Install the base package: ```bash pip install briefcase-ai ``` Optional feature extras install only what they need. > Most extras add no dependencies Most extras (`replay`, `drift`, `sanitize`, `storage`, `validate`, `guardrails`, `rag`, `correlation`, `external`, `events`, `routing`, `vcs`, `bitemporal`, `compliance`) pull in no third-party dependencies and exist to document intent. Only `otel`, `lakefs`, `bitemporal-iceberg`, and `mcp` install external packages. ## briefcase ```bash pip install briefcase-ai ``` Top-level exports. 
### `capture()` ```python from briefcase import capture @capture(decision_type="classification") def classify_ticket(text: str) -> str: return "account_access" classify_ticket("reset my password") ``` ```python capture( fn=None, *, decision_type=None, context_version=None, max_input_chars=1000, max_output_chars=1000, exporter=None, async_capture=True, ) ``` The `@capture` decorator records a lightweight dict for each call and forwards it to an `exporter`. It does not itself persist a native `DecisionSnapshot`; for storage and replay use the native runtime objects below. ### `setup()` ```python from briefcase import setup config = setup( exporter=None, storage=None, guardrail_packs=None, ) ``` ```python setup( exporter=None, router=None, webhook_url=None, webhook_secret=None, events=None, event_bus=None, storage=None, guardrail_packs=None, ) -> BriefcaseConfig ``` ### `init()`, `init_with_config()`, `is_initialized()` ```python import briefcase briefcase.init() # start the native runtime print(briefcase.is_initialized()) ``` `init()` must be called once before using the native storage and replay layer. Use `init_with_config(worker_threads=2)` instead of `init()` to size the worker pool. The runtime can only be initialized once per process. ### `observe()` ```python import briefcase mem = briefcase.observe("memory") @briefcase.capture(async_capture=False) def classify_ticket(text: str) -> str: return "account_access" classify_ticket("reset my password") print(mem.records[0]["function_name"]) # "classify_ticket" ``` ```python observe(exporter="console", *, level=None) -> BaseExporter ``` Wires up decision export in one call. Without it, `@capture` records decisions but has nowhere to send them. `exporter` accepts a `BaseExporter` instance or a shorthand string: `"console"` (default, `ConsoleExporter`), `"memory"` (`MemoryExporter`), or a path ending in `.jsonl` (`JSONLFileExporter`). 
Returns the configured exporter, so a `MemoryExporter` can be inspected via `.records`. Pass `level=` to also enable logging at that level. `@capture` exports in a background thread by default, so use `@capture(async_capture=False)` when you want a record to appear synchronously (for example to read `MemoryExporter.records` right after the call). ### `enable_logging()`, `set_log_level()`, `disable_logging()`, `get_logger()` ```python import briefcase logger = briefcase.enable_logging("DEBUG") # opt-in; silent by default briefcase.set_log_level("INFO") module_logger = briefcase.get_logger("briefcase.app") briefcase.disable_logging() ``` ```python enable_logging(level="INFO", *, stream=None, fmt=None, datefmt=None) -> logging.Logger set_log_level(level) -> None disable_logging() -> None get_logger(name) -> logging.Logger ``` The library attaches only a `NullHandler` and emits nothing until you opt in. `enable_logging` idempotently adds a single `StreamHandler` (default `sys.stderr`) and returns the `briefcase` logger. Setting the environment variable `BRIEFCASE_LOG_LEVEL=DEBUG` enables logging automatically at import. 
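The opt-in behaviour described above follows a common standard-library pattern: the library attaches a `NullHandler`, and a helper adds exactly one real handler on request. The sketch below illustrates that pattern with plain `logging`. It is not Briefcase's implementation; the logger name `mylib` and the `_owned_by_sketch` marker are assumptions made for the example.

```python
import logging

LOGGER_NAME = "mylib"  # hypothetical library name, standing in for "briefcase"

# Library side: a NullHandler keeps the package silent until the app opts in.
logging.getLogger(LOGGER_NAME).addHandler(logging.NullHandler())


def enable_logging(level="INFO", *, stream=None):
    """Attach one StreamHandler to the library logger; safe to call repeatedly."""
    logger = logging.getLogger(LOGGER_NAME)
    logger.setLevel(level)
    already_added = any(
        getattr(handler, "_owned_by_sketch", False) for handler in logger.handlers
    )
    if not already_added:
        # StreamHandler(None) writes to sys.stderr.
        handler = logging.StreamHandler(stream)
        handler._owned_by_sketch = True  # marker so later calls can find it
        logger.addHandler(handler)
    return logger


def disable_logging():
    """Remove the handler added by enable_logging, restoring silence."""
    logger = logging.getLogger(LOGGER_NAME)
    for handler in list(logger.handlers):
        if getattr(handler, "_owned_by_sketch", False):
            logger.removeHandler(handler)
```

Calling `enable_logging` twice leaves a single handler in place, so records are not duplicated. In this sketch a second call only updates the level; it does not swap the stream.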
### `BriefcaseConfig` ```python from briefcase import BriefcaseConfig config = BriefcaseConfig.get() registry = config.guardrail_registry config.reset() ``` ### `DecisionSnapshot` ```python from briefcase import DecisionSnapshot, Input, Output, ModelParameters decision = DecisionSnapshot("classify_ticket") decision.add_input(Input("text", "reset my password", "string")) output = Output("category", "account_access", "string") output.with_confidence(0.92) decision.add_output(output) decision.with_execution_time(12.0) decision.with_module("triage_service") decision.add_tag("environment", "production") print(decision.function_name, decision.fingerprint()[:12]) ``` ```python DecisionSnapshot(function_name) .add_input(input) .add_output(output) .add_tag(key, value) .with_model_parameters(params) .with_execution_time(ms) .with_module(module) .with_agent(agent) .with_hardware(hardware) .with_error(error, error_type) .with_scorecard(scorecard) .fingerprint() # attributes: function_name, module_name, inputs, outputs, tags, execution_time_ms ``` ### `Snapshot` ```python from briefcase import Snapshot session = Snapshot("session") session.add_decision(decision) print(len(session.decisions)) ``` ### `SnapshotQuery` ```python from briefcase import SnapshotQuery query = SnapshotQuery() query.with_function_name("classify_ticket") query.with_tag("environment", "production") query.with_limit(50) query.with_offset(0) ``` ### `Input`, `Output` ```python from briefcase import Input, Output text_input = Input("text", "reset my password", "string") print(text_input.name, text_input.value, text_input.data_type) result = Output("category", "account_access", "string") result.with_confidence(0.92) print(result.confidence) ``` ### `ModelParameters` ```python from briefcase import ModelParameters params = ModelParameters("claude-3-haiku") params.with_provider("anthropic") params.with_parameter("temperature", 0.0) params.with_parameter("max_tokens", 256) ``` ### `ExecutionContext` ```python 
from briefcase import ExecutionContext context = ExecutionContext() context.with_runtime_version("3.11") context.with_dependency("transformers", "4.40.0") context.with_env_var("REGION", "us-east-1") context.with_random_seed(42) ``` ### `HardwareMetadata` ```python from briefcase import HardwareMetadata hardware = HardwareMetadata("gpu", "A10G") hardware.with_provider("aws") hardware.with_vram(24.0) ``` ## briefcase.storage ```bash pip install briefcase-ai[storage] ``` Two backends ship in the open-source package: `SqliteBackend` and `BufferedBackend`. The native runtime must be initialized first. ### `SqliteBackend` ```python import briefcase from briefcase import DecisionSnapshot, Input, Output, Snapshot, SnapshotQuery from briefcase.storage import SqliteBackend briefcase.init() backend = SqliteBackend.in_memory() # or SqliteBackend("decisions.db") decision = DecisionSnapshot("classify_ticket") decision.add_input(Input("text", "reset my password", "string")) decision.add_output(Output("category", "account_access", "string")) decision_id = backend.save_decision(decision) loaded = backend.load_decision(decision_id) session = Snapshot("session") session.add_decision(decision) snapshot_id = backend.save(session) backend.load(snapshot_id) backend.query(SnapshotQuery().with_function_name("classify_ticket")) backend.health_check() ``` ```python SqliteBackend(path) SqliteBackend.in_memory() .save(snapshot) -> snapshot_id .load(snapshot_id) .save_decision(decision) -> decision_id .load_decision(decision_id) .query(query) .delete(id) .health_check() ``` ### `BufferedBackend` ```python from briefcase.storage import BufferedBackend buffered = BufferedBackend(backend, buffer_size=100) buffered.save_decision(decision) ``` ## briefcase.replay ```bash pip install briefcase-ai[replay] ``` Re-executes stored decisions against a backend. Valid modes are `"strict"` and `"tolerant"` (the default). 
### `ReplayEngine` ```python import briefcase from briefcase import DecisionSnapshot, Input, Output from briefcase.storage import SqliteBackend from briefcase.replay import ReplayEngine briefcase.init() backend = SqliteBackend.in_memory() decision = DecisionSnapshot("classify_ticket") decision.add_input(Input("text", "reset my password", "string")) decision.add_output(Output("category", "account_access", "string")) decision_id = backend.save_decision(decision) engine = ReplayEngine(backend) result = engine.replay(decision_id, "strict") print(result.status, result.outputs_match, result.execution_time_ms) stats = engine.get_replay_stats([decision_id]) print(stats.total_replays, stats.success_rate) ``` ```python ReplayEngine(storage) .replay(snapshot_id, mode) .replay_batch(snapshot_ids, mode, max_concurrent) .replay_with_policy(snapshot_id, policy, mode) .validate(snapshot_id, policy) .get_replay_stats(snapshot_ids) .default_mode ``` ### `ReplayPolicy` ```python from briefcase.replay import ReplayPolicy policy = ReplayPolicy("output_match") policy.with_exact_match("category") policy.with_similarity_threshold("summary", 0.9) result = engine.replay_with_policy(decision_id, policy, "strict") print(result.status, result.policy_violations) ``` ### `ReplayResult` Returned by `replay` / `replay_with_policy`. Attributes: `status`, `outputs_match`, `replay_output`, `original_snapshot`, `execution_time_ms`, `policy_violations`, plus `to_dict()`. ### `ReplayStats` Returned by `get_replay_stats`. Attributes: `total_replays`, `successful_replays`, `failed_replays`, `exact_matches`, `mismatches`, `success_rate`, `average_execution_time_ms`, `total_execution_time_ms`, plus `to_dict()`. 
## briefcase.drift ```bash pip install briefcase-ai[drift] ``` ### `DriftCalculator` ```python from briefcase.drift import DriftCalculator calculator = DriftCalculator() calculator.with_similarity_threshold(0.9) metrics = calculator.calculate_drift(["billing", "billing", "account", "billing"]) print(metrics.consistency_score, metrics.agreement_rate, metrics.drift_score) print(metrics.consensus_output, metrics.outliers) print(metrics.get_status(calculator)) ``` ```python DriftCalculator() .calculate_drift(outputs) -> DriftMetrics .with_similarity_threshold(threshold) .similarity_threshold ``` ### `DriftMetrics` Returned by `calculate_drift`. Attributes: `consistency_score`, `agreement_rate`, `drift_score`, `consensus_output`, `consensus_confidence`, `outliers`, `total_samples`, plus `get_status(calculator)` and `to_dict()`. ## briefcase.cost ```bash pip install briefcase-ai ``` Cost types ship in the base package under `briefcase.cost` — there is no `cost` extra. ### `CostCalculator` ```python from briefcase.cost import CostCalculator calculator = CostCalculator() estimate = calculator.estimate_cost("claude-haiku-4-5", 1000, 500) print(estimate.total_cost, estimate.input_cost, estimate.output_cost) # rate_card (platform × tier) and cache tokens are keyword-only (3.2.1) batch = calculator.estimate_cost("claude-opus-4-8", 500_000, 50_000, rate_card="bedrock:batch") cached = calculator.estimate_cost("claude-opus-4-8", 0, 1000, cache_read_tokens=100_000) print(batch.total_cost, cached.cache_cost) print(calculator.get_available_rate_cards()) budget = calculator.check_budget(85.0, 100.0) print(budget.status, budget.percent_used, budget.remaining_budget, budget.alert_message) print(calculator.compare_models("claude-haiku-4-5", "gpt-5.4-mini", 1000, 500)) print(calculator.project_monthly_cost("claude-haiku-4-5", 5000, 2000, 30)) ``` ```python CostCalculator() .estimate_cost(model_name, input_tokens, output_tokens, *, rate_card=None, cache_read_tokens=None, 
cache_write_5m_tokens=None, cache_write_1h_tokens=None) -> CostEstimate .estimate_cost_from_text(model_name, input_text, estimated_output_tokens, *, rate_card=None) .estimate_tokens(text) .check_budget(current_spend, budget_limit) -> BudgetStatus .compare_models(model_a, model_b, input_tokens, output_tokens, *, rate_card=None) .project_monthly_cost(model_name, daily_input_tokens, daily_output_tokens, days_per_month, *, rate_card=None) .get_available_rate_cards() -> list[str] .get_available_models() .get_cheapest_model(min_context_window) .get_models_by_provider(provider) .get_models_under_cost(max_cost_per_1k) ``` A `rate_card` is a forgiving `platform × tier × modifiers` string (platforms `first_party` / `bedrock` / `vertex` / `azure`; tiers `standard` / `batch` / `cached` / `priority` / `flex`). Omit it for first-party standard pricing. ### `CostEstimate` Attributes: `model_name`, `input_tokens`, `output_tokens`, `input_cost`, `output_cost`, `cache_cost`, `total_cost`, `cost_per_token`, `currency`, plus `to_dict()`. ### `BudgetStatus` Attributes: `status`, `percent_used`, `remaining_budget`, `current_spend`, `budget_limit`, `alert_message`, plus `to_dict()`. 
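To make the attributes of `CostEstimate` and `BudgetStatus` concrete, here is a standalone sketch of the arithmetic such an estimate typically involves. It does not call the SDK. The per-million-token rates are hypothetical, picked so that 1,000 input and 500 output tokens cost $0.001 and $0.0025 as in the `estimate_cost` example in the MCP Server section; they are not taken from Briefcase's rate cards, and cache costs are left out.

```python
# Hypothetical USD rates per million tokens, for illustration only.
RATES_PER_MILLION = {"example-model": {"input": 1.00, "output": 5.00}}


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> dict:
    """Price a call as tokens * rate / 1,000,000 for each direction."""
    rates = RATES_PER_MILLION[model]
    input_cost = input_tokens * rates["input"] / 1_000_000
    output_cost = output_tokens * rates["output"] / 1_000_000
    return {
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": input_cost + output_cost,
    }


def budget_status(current_spend: float, budget_limit: float) -> dict:
    """Derive the share of budget used and the amount remaining."""
    return {
        "percent_used": current_spend / budget_limit * 100,
        "remaining_budget": budget_limit - current_spend,
    }


estimate = estimate_cost("example-model", 1000, 500)
print({name: round(value, 6) for name, value in estimate.items()})
print(budget_status(85.0, 100.0))
```

Rounding is applied only when printing, so the underlying floats stay unmodified for any later aggregation.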
## briefcase.sanitize

```bash
pip install briefcase-ai[sanitize]
```

### `Sanitizer`

```python
from briefcase.sanitize import Sanitizer

sanitizer = Sanitizer()
result = sanitizer.sanitize("Contact support@example.com or call 555-123-4567")
print(result.sanitized, result.redaction_count)
for redaction in result.redactions:
    print(redaction.pii_type, redaction.start_position, redaction.end_position)

print(sanitizer.contains_pii("support@example.com"))
print(sanitizer.analyze_pii("support@example.com"))

json_result = sanitizer.sanitize_json({"contact": "support@example.com"})
print(json_result.redaction_count)

sanitizer.add_pattern("ticket_id", r"\bTCK-\d{6}\b")
```

```python
Sanitizer()
    .sanitize(text) -> SanitizationResult
    .sanitize_json(data) -> SanitizationJsonResult
    .contains_pii(text)
    .analyze_pii(text)
    .add_pattern(name, pattern)
    .remove_pattern(pattern_name)
    .set_enabled(enabled)
```

### `SanitizationResult`

Attributes: `sanitized`, `redactions`, `redaction_count`, `has_redactions`, plus `to_dict()`.

### `Redaction`

Attributes: `pii_type`, `start_position`, `end_position`, `original_length`, plus `to_dict()`.

## briefcase.validation

```bash
pip install briefcase-ai[validate]
```

The validation engine is pluggable: supply an extractor (finds references in a prompt), a resolver (checks each reference), and a versioned client (records the commit the validation ran against).
### `PromptValidationEngine`

```python
import re

from briefcase.validation import PromptValidationEngine
from briefcase.validation.errors import ValidationError, ValidationErrorCode


class RegexExtractor:
    _REF = re.compile(r"[\w/]+\.md")

    def extract(self, prompt: str) -> list:
        return self._REF.findall(prompt)


class AllowlistResolver:
    def __init__(self, known: set):
        self._known = known

    def resolve_all(self, references: list) -> list:
        errors = []
        for ref in references:
            if ref not in self._known:
                errors.append(
                    ValidationError(
                        code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                        message=f"Reference not found: {ref}",
                        reference=ref,
                        severity="error",
                        layer="resolution",
                        remediation="Add the document to the knowledge base.",
                    )
                )
        return errors


class DemoLakeFS:
    def get_commit(self, repository: str, branch: str) -> str:
        return "demo0000000000000000000000000000000000000"


engine = PromptValidationEngine(
    extractor=RegexExtractor(),
    resolver=AllowlistResolver({"kb/faq.md"}),
    lakefs_client=DemoLakeFS(),
    repository="knowledge-base",
    branch="main",
    mode="strict",
)
report = engine.validate("See kb/faq.md and kb/missing.md")
print(report.status, report.references_checked, report.has_errors)
```

```python
PromptValidationEngine(
    extractor,
    resolver,
    lakefs_client,
    repository,
    branch="main",
    mode="strict",
    semantic_validator=None,
)
    .validate(prompt) -> ValidationReport
```

### `ValidationReport`

Attributes: `status`, `errors`, `warnings`, `references_checked`, `validation_time_ms`, `lakefs_commit`, `has_errors`, `has_warnings`, plus `to_dict()`.

### `ValidationError`

```python
ValidationError(
    code,  # ValidationErrorCode
    message,
    reference,
    severity,
    layer,
    remediation=None,
    metadata=None,
)
```

### `ValidationErrorCode`

Enum: `INVALID_SYNTAX`, `REFERENCE_AMBIGUOUS`, `REFERENCE_NOT_FOUND`, `REFERENCE_GONE`, `VERSION_MISMATCH`, `SCHEMA_INVALID`, `LAKEFS_UNAVAILABLE`.
### Pluggable protocols

`Extractor.extract(prompt) -> list`, `Resolver.resolve_all(references) -> list`, and `SemanticValidatorProtocol.validate_semantic(prompt, references) -> list`.

## briefcase.guardrails

```bash
pip install briefcase-ai[guardrails]
```

`GuardrailEnv` is a protocol. Subclass `BaseGuardrailEnv` and implement `evaluate`.

### `BaseGuardrailEnv`, `EvalRequest`, `EvalResult`, `Effect`

```python
from briefcase.guardrails import BaseGuardrailEnv, EvalRequest, EvalResult, Effect


class QueueGuardrail(BaseGuardrailEnv):
    @property
    def name(self) -> str:
        return "queue_access"

    @property
    def request_space(self):
        return {}

    def evaluate(self, request: EvalRequest) -> EvalResult:
        effect = Effect.ALLOW if request.context.get("priority") == "high" else Effect.DENY
        return EvalResult(effect=effect, guardrail_name=self.name, reason="priority check")


guardrail = QueueGuardrail()
request = EvalRequest(
    agent="triage-bot",
    action="route",
    resource="queue:billing",
    context={"priority": "high"},
)
result = guardrail.evaluate(request)
print(result.effect, result.is_allowed)
```

```python
EvalRequest(agent, action, resource, context={}, request_id=None)

EvalResult(effect, guardrail_name, reason=None, policy_id=None, lakefs_sha=None, eval_time_ms=0.0, metadata={})
    .is_allowed

Effect.ALLOW / Effect.DENY
```

### `make()`

```python
from briefcase.guardrails import make

# env = make("registered-guardrail-id", **kwargs)
```

### `GuardrailPipeline`

```python
from briefcase.guardrails import GuardrailPipeline

pipeline = GuardrailPipeline(stages=[guardrail])
pipeline_result = pipeline.evaluate(request)
print(pipeline.name, pipeline.check_compatibility())
```

```python
GuardrailPipeline(stages, mode=PipelineMode.FIRST_DENY, name="pipeline")
    .evaluate(request) -> PipelineResult
    .check_compatibility()
    .stages
```

## briefcase.rag

```bash
pip install briefcase-ai[rag]
```

Versions an embedding index so it can be invalidated and rebuilt when documents or the embedding model change.
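
One way to picture invalidation: record a content hash per document plus the embedding model name when the index is built, then compare both against the current state. The sketch below uses only the standard library. `build_manifest` and `is_stale` are hypothetical helpers, and hashing the UTF-8 content with SHA-256 is an assumption, not a description of how `VersionedEmbeddingPipeline` works internally.

```python
import hashlib


def content_hash(text):
    """SHA-256 hex digest of a document body (assumed hashing scheme)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def build_manifest(index_name, documents, model_name):
    """Record which documents (id -> content) and which model produced an index."""
    return {
        "index_name": index_name,
        "model_name": model_name,
        "hashes": {doc_id: content_hash(body) for doc_id, body in documents.items()},
    }


def is_stale(manifest, documents, model_name):
    """True when the model changed or any document was added, removed, or edited."""
    current = {doc_id: content_hash(body) for doc_id, body in documents.items()}
    return manifest["model_name"] != model_name or manifest["hashes"] != current


docs = {"doc-1": "Reset your password from settings."}
manifest = build_manifest("faq-index", docs, "embed-v1")
print(is_stale(manifest, docs, "embed-v1"))   # documents and model unchanged

docs["doc-1"] = "Reset your password from the account page."
print(is_stale(manifest, docs, "embed-v1"))   # one document was edited
```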
### `VersionedEmbeddingPipeline`, `Document`

```python
from briefcase.rag import VersionedEmbeddingPipeline, Document


class EmbeddingModel:
    def embed(self, texts):
        return [[0.1, 0.2, 0.3] for _ in texts]


pipeline = VersionedEmbeddingPipeline(embedding_model=EmbeddingModel())
documents = [
    Document(id="doc-1", content="Reset your password from settings.", metadata={"topic": "account"}),
]
print(documents[0].content_hash[:10])

batch = pipeline.create_embedding_batch(documents)
manifest = pipeline.create_manifest("faq-index", [batch])
report = pipeline.check_invalidation("faq-index", documents)
print(manifest.index_name, report.is_valid)
```

```python
VersionedEmbeddingPipeline(embedding_model=None, lakefs_client=None, repository=None, branch="main")
    .create_embedding_batch(documents, batch_id=None, source_commit=None)
    .create_manifest(index_name, batches, metadata=None)
    .check_invalidation(index_name, current_documents, ...)
    .rebuild_index(index_name, documents, source_commit=None, batch_id=None)
    .get_latest_manifest(index_name)
    .get_manifests(index_name, limit=None)

Document(id, content, metadata={}, path="")
    .content_hash
```

## briefcase.correlation

```bash
pip install briefcase-ai[correlation]
```

Correlates multiple agents executed within one workflow context.
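
A common way to make a "current workflow" visible to nested calls is a `contextvars.ContextVar` that a context manager sets and resets. The sketch below shows that general pattern with the standard library only. `workflow`, `current_workflow`, and the dict layout are hypothetical stand-ins, and nothing here is taken from the `briefcase.correlation` source.

```python
import contextvars
import uuid
from contextlib import contextmanager

# Holds the active workflow for the current thread or async task.
_current = contextvars.ContextVar("current_workflow", default=None)


@contextmanager
def workflow(name, workflow_id=None):
    ctx = {"name": name, "workflow_id": workflow_id or str(uuid.uuid4()), "agents": {}}
    token = _current.set(ctx)
    try:
        yield ctx
    finally:
        # Restore whatever workflow (if any) was active before this block.
        _current.reset(token)


def current_workflow():
    return _current.get()


with workflow("ticket-triage") as wf:
    wf["agents"]["agent-1"] = "classifier"
    wf["agents"]["agent-2"] = "responder"
    print(current_workflow() is wf)

print(current_workflow())
```

Resetting with the token rather than setting `None` keeps nested workflows well-behaved: leaving an inner block restores the outer one.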
### `briefcase_workflow`, `get_current_workflow`

```python
from unittest.mock import Mock

from briefcase.correlation import briefcase_workflow, get_current_workflow

client = Mock()

with briefcase_workflow("ticket-triage", client) as workflow:
    print(workflow.workflow_id)
    workflow.register_agent("agent-1", "classifier")
    workflow.register_agent("agent-2", "responder")
    print(get_current_workflow() is workflow)
```

```python
briefcase_workflow(workflow_name, briefcase_client, workflow_id=None)
    # yields BriefcaseWorkflowContext
    #   .workflow_id
    #   .register_agent(agent_id, agent_type)

get_current_workflow() -> Optional[BriefcaseWorkflowContext]
```

### Trace propagation

```python
from briefcase.correlation import (
    TraceContextCarrier,
    inject_trace_context,
    extract_trace_context,
)

headers = inject_trace_context({})
extract_trace_context(headers)
```

## briefcase.events

```bash
pip install briefcase-ai[events]
```

Emit functions are coroutines; `await` them inside an async context.

### `BriefcaseEvent`, `emit()`

```python
import asyncio

from briefcase.events import (
    BriefcaseEvent,
    emit,
    emit_low_confidence,
    emit_drift_detected,
)


async def main():
    event = BriefcaseEvent(
        event_type="low_confidence",
        decision_id="dec-1",
        payload={"confidence": 0.4},
    )
    await emit(event)
    await emit_low_confidence({"id": "dec-1"}, 0.4, 0.7)
    await emit_drift_detected({"id": "dec-1"}, {"drift_score": 0.3})


asyncio.run(main())
```

```python
BriefcaseEvent(event_type, decision_id, timestamp=..., payload={}, idempotency_key=...)

async emit(event)
async emit_low_confidence(decision, confidence, threshold)
async emit_drift_detected(decision, details=None)
```

## briefcase.external

```bash
pip install briefcase-ai[external]
```

Snapshots external data sources (API responses, database query results, file fetches) and detects drift between them.
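
Drift between two snapshots of an external source can be detected by comparing digests of a canonical serialization. The following is a minimal standard-library sketch of that idea under an `ON_CHANGE`-style policy, where a new snapshot is stored only when the data differs from the last one. `SnapshotLog`, `digest`, and the returned keys other than `drift_detected` are hypothetical, and the SDK's own `ExternalDataTracker` may detect drift differently.

```python
import hashlib
import json


def digest(data):
    """Hash a JSON-serializable value; sorted keys make the result order-independent."""
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class SnapshotLog:
    def __init__(self):
        self._snapshots = {}  # source name -> list of (digest, data)

    def track(self, source, data):
        history = self._snapshots.setdefault(source, [])
        new_digest = digest(data)
        # Drift means there is a previous snapshot and its digest differs.
        drift = bool(history) and history[-1][0] != new_digest
        # Store the first snapshot and every changed one; skip exact repeats.
        stored = not history or drift
        if stored:
            history.append((new_digest, data))
        return {"drift_detected": drift, "stored": stored, "snapshots": len(history)}


log = SnapshotLog()
print(log.track("product-catalog", {"items": [1, 2, 3]}))
print(log.track("product-catalog", {"items": [1, 2, 3]}))
print(log.track("product-catalog", {"items": [1, 2, 3, 4]}))
```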
### `ExternalDataTracker`, `SnapshotPolicy`

```python
from briefcase.external import (
    ExternalDataTracker,
    SnapshotPolicy,
    SnapshotFrequency,
)

tracker = ExternalDataTracker(
    default_policy=SnapshotPolicy(
        frequency=SnapshotFrequency.ON_CHANGE,
        retention_days=30,
    ),
)

result = tracker.track_api_call(
    api_name="product-catalog",
    endpoint="/products",
    method="GET",
    response_data={"items": [1, 2, 3]},
    record_count=3,
)
print(result["snapshot_id"], result["drift_detected"])

snapshot = tracker.get_latest_snapshot("product-catalog")
print(snapshot.source_name)
```

```python
ExternalDataTracker(lakefs_client=None, repository=None, branch="main", default_policy=None, sanitizer=None)
    .track_api_call(api_name, endpoint, method, response_data, ...)
    .track_db_query(db_system, db_name, query, result_data=None, ...)
    .track_file_fetch(source_name, file_data, file_path=None, ...)
    .detect_drift(source_name, current_data=None, ...)
    .compare_snapshots(snapshot_a_id, snapshot_b_id)
    .correct_snapshot(parent_snapshot_id, corrected_data, *, source=None, ...)

SnapshotPolicy(frequency=SnapshotFrequency.ON_CHANGE, retention_days=90, change_threshold=0.0, max_snapshots=0, compress=False)

SnapshotFrequency.EVERY_CALL / ON_CHANGE / HOURLY / DAILY / WEEKLY
```

## briefcase.routing

```bash
pip install briefcase-ai[routing]
```

The legacy `BaseRouter` interface and a newer policy-versioned routing layer.
### Legacy `BaseRouter`

```python
from briefcase.routing import BaseRouter, RoutingDecision


class StaticRouter(BaseRouter):
    def route(self, decision_context) -> RoutingDecision:
        return RoutingDecision(
            action="senior-agent",
            source="static",
            eval_time_ms=0.1,
            reason="default route",
        )


router = StaticRouter()
decision = router.route({"priority": "high"})
print(decision.action, decision.source)
```

### Policy layer: `PolicyRegistry`, `PolicyVersion`, `PolicyRule`, `AgentRouter`

```python
from datetime import datetime, timezone

from briefcase.routing import (
    PolicyRegistry,
    PolicyVersion,
    PolicyRule,
    AgentRouter,
)

registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1",
    rules=[
        PolicyRule(
            rule_id="high-priority",
            condition={"priority": "high"},
            choice="senior-agent",
            rationale="High priority tickets go to senior agents.",
        ),
    ],
    default_choice="general-agent",
)
registry.publish(policy, valid_from=datetime.now(timezone.utc))

router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"priority": "high"})
print(decision.selected, decision.matched_rule_id, decision.policy_version)
```

```python
PolicyRegistry(store=None)
    .publish(policy, *, valid_from, transaction_time=None, source="policy_registry")
    .get(policy_id, *, as_of_transaction_time=None, as_of_valid_time=None)
    .history(policy_id)

PolicyVersion(policy_id, version, rules, default_choice=None, description=None)
    .select(context) -> PolicyEvaluationResult

PolicyRule(rule_id, condition, choice, rationale=None)
    .matches(context) -> bool

AgentRouter(registry, *, use_case, policy_id, candidates_provider=None)
    .route(context, *, evidence_refs=None, as_of_transaction_time=None) -> AgentRoutingDecision
```

`AgentRoutingDecision` attributes: `decision_id`, `use_case`, `context`, `candidates`, `selected`, `policy_id`, `policy_version`, `matched_rule_id`, `evidence_refs`, `rationale`, `decided_at`, plus `to_dict()`.
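
The rule evaluation in the policy example above can be pictured as a first-match scan with a default. The sketch assumes a rule matches when every key in its `condition` equals the same key in the context. That matching rule, the first-match ordering, and the `select` helper are assumptions for illustration, not the documented behavior of `PolicyRule.matches` or `PolicyVersion.select`.

```python
def matches(condition, context):
    """Assumed semantics: every key/value pair in the condition must appear in the context."""
    return all(context.get(key) == value for key, value in condition.items())


def select(rules, context, default_choice=None):
    """Return (choice, matched_rule_id) for the first matching rule, else the default."""
    for rule in rules:
        if matches(rule["condition"], context):
            return rule["choice"], rule["rule_id"]
    return default_choice, None


rules = [
    {"rule_id": "high-priority", "condition": {"priority": "high"}, "choice": "senior-agent"},
]

print(select(rules, {"priority": "high", "tier": "gold"}, "general-agent"))
print(select(rules, {"priority": "low"}, "general-agent"))
```

Returning the matched rule id alongside the choice mirrors why `AgentRoutingDecision` carries `matched_rule_id`: the record should say which rule produced the route, not only the route itself.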
## briefcase.bitemporal

```bash
pip install briefcase-ai[bitemporal]
```

Append-only store that tracks both valid time (when a fact is true) and transaction time (when it was recorded), so any past state can be reconstructed. An Iceberg-backed store is available via `pip install briefcase-ai[bitemporal-iceberg]`.

### `BitemporalRecord`, `InMemoryBitemporalStore`

```python
from datetime import datetime, timezone

from briefcase.bitemporal import (
    BitemporalRecord,
    InMemoryBitemporalStore,
    AsOfView,
    append_correction,
)

store = InMemoryBitemporalStore()
now = datetime.now(timezone.utc)

record = BitemporalRecord.new(
    key="config:max_retries",
    valid_time=now,
    value=3,
    source="config-service",
)
store.append(record)
print(store.latest("config:max_retries").value, record.content_hash()[:12])

# Append-only correction (the original stays in history).
append_correction(store, record, 5, source="ops")
print(store.latest("config:max_retries").value)
print(len(store.history("config:max_retries")))

# Reconstruct the store as of a transaction time.
view = AsOfView(store, transaction_time=datetime.now(timezone.utc))
print(view.as_of("config:max_retries").value)
```

```python
BitemporalRecord.new(key, valid_time, value, source, *, transaction_time=None, decision=None,
                     source_trust_level=None, parent_record_id=None, metadata=None, record_id=None)
    .content_hash() -> str
    .record_id

InMemoryBitemporalStore()
    .append(record)
    .append_many(records)
    .latest(key)
    .history(key)
    .as_of(key, *, transaction_time=None, valid_time=None)
    .keys()

AsOfView(store, *, transaction_time=None, valid_time=None)

append_correction(store, original, corrected_value, *, source=None, ...)
batch_append(store, records, *, transaction_time=None)
stream_append(store, record)
```

## briefcase.compliance

```bash
pip install briefcase-ai[compliance]
```

Builds a tamper-evident bundle that reproduces a routing decision together with the policy version and evidence records in effect at the decision's transaction time.
Integrity is protected by a SHA-256 content hash; `verify()` raises if the bundle was altered.

### `ExaminerBundle`

```python
from datetime import datetime, timezone

from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore
from briefcase.routing import PolicyRegistry, PolicyVersion, PolicyRule, AgentRouter
from briefcase.compliance import ExaminerBundle, BundleIntegrityError

store = InMemoryBitemporalStore()
now = datetime.now(timezone.utc)
evidence = BitemporalRecord.new(
    key="config:max_retries",
    valid_time=now,
    value=3,
    source="config-service",
)
store.append(evidence)

registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1",
    rules=[PolicyRule(rule_id="gold-tier", condition={"tier": "gold"}, choice="priority-queue")],
    default_choice="standard-queue",
)
registry.publish(policy, valid_from=now)

router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"tier": "gold"}, evidence_refs=[evidence.record_id])

bundle = ExaminerBundle.build(decision, store, registry)
print(bundle.content_hash)  # "sha256:..."
bundle.verify()  # raises BundleIntegrityError if tampered

restored = ExaminerBundle.from_json(bundle.to_json(indent=2))
restored.verify()
```

```python
ExaminerBundle.build(decision, evidence_store, policy_registry, *, as_of_transaction_time=None, metadata=None) -> ExaminerBundle
    .verify()  # raises BundleIntegrityError
    .to_json(*, indent=None)
    .from_json(s)
    .to_dict() / .from_dict(d)
    .content_hash  # SHA-256
```

`evidence_refs` must contain the `record_id` of each evidence record in the store.

## briefcase.otel

```bash
pip install briefcase-ai[otel]
```

### `get_tracer()`

```python
from briefcase.otel import get_tracer

tracer = get_tracer("briefcase")
```

```python
get_tracer(name="briefcase")
```

## briefcase.exporters

```bash
pip install briefcase-ai
```

Stock exporters ship in the base package.
The fastest way to wire one up is `briefcase.observe(...)`; construct them directly when you need full control.

### `ConsoleExporter`, `JSONLFileExporter`, `MemoryExporter`

```python
import briefcase
from briefcase.exporters import ConsoleExporter, JSONLFileExporter, MemoryExporter

console = ConsoleExporter()  # JSON lines to stderr (default)
jsonl = JSONLFileExporter("runs.jsonl")  # append-only, thread-safe
memory = MemoryExporter()  # collect records in .records

briefcase.setup(exporter=memory)  # or briefcase.observe(memory)


@briefcase.capture(async_capture=False)
def classify_ticket(text: str) -> str:
    return "account_access"


classify_ticket("reset my password")
print(memory.records[0]["function_name"])  # "classify_ticket"
memory.clear()
```

```python
ConsoleExporter(stream=None, *, pretty=False)  # default stream: sys.stderr
JSONLFileExporter(path)
MemoryExporter()
    .records  # list of captured decision records
    .clear()
```

### `BaseExporter`

```python
from briefcase.exporters import BaseExporter


class LoggingExporter(BaseExporter):
    async def export(self, decision) -> bool:
        print(decision)
        return True

    async def flush(self) -> None:
        pass

    async def close(self) -> None:
        pass
```

```python
BaseExporter()
    async export(decision) -> bool
    async flush()
    async close()
```

## briefcase.mcp

```bash
pip install briefcase-ai[mcp]
```

Exposes safe SDK operations to MCP-capable clients (Cursor, Claude Code, Codex, Replit). Run with the `briefcase-mcp` console script or `python -m briefcase.mcp`. The `mcp` extra installs `mcp>=1.2`.
### `build_server()`, `main()`

```python
from briefcase.mcp import build_server, main

server = build_server()  # returns a FastMCP server
# main() is the entry point used by the briefcase-mcp console script
```

```python
build_server() -> FastMCP
main() -> None
```

Tools exposed to MCP clients:

```python
sanitize_text(text) -> {"sanitized", "redactions"}  # wraps briefcase.sanitize
estimate_cost(model, input_tokens, output_tokens)  # wraps briefcase.cost
    -> {"model", "input_cost", "output_cost", "total_cost"}
analyze_drift(outputs: list[str])  # wraps briefcase.drift
    -> {"consistency_score", "agreement_rate", "consensus_output", "status"}
how_to(topic="") -> str  # usage guidance
```

The server also exposes a `briefcase://llms-full.txt` resource with the full usage guide.

## briefcase.integrations.lakefs

```bash
pip install briefcase-ai[lakefs]
```

Wraps a lakeFS repository so file reads are captured with the commit SHA they were read at. Without the `lakefs` package installed, the client runs in mock mode.

### `VersionedClient`

```python
from unittest.mock import Mock

from briefcase.integrations.lakefs import VersionedClient

client = Mock()
versioned_client = VersionedClient(
    repository="knowledge-base",
    branch="main",
    briefcase_client=client,
)

if versioned_client.object_exists("config/defaults.json"):
    data = versioned_client.read_object("config/defaults.json")

versioned_client.list_objects(prefix="config/")
print(versioned_client.get_commit())
```

```python
VersionedClient(repository, branch, commit="latest", briefcase_client=None, ...)
    .read_object(path, return_metadata=False)
    .upload_object(path, data, content_type="application/octet-stream")
    .list_objects(prefix="")
    .object_exists(path)
    .get_commit()
```

### `versioned_context`, `versioned`

```python
from briefcase.integrations.lakefs import versioned_context, versioned


class BriefcaseClient:
    """Stand-in for a configured Briefcase client (mock mode without lakeFS)."""

    config = {
        "lakefs_endpoint": "https://lakefs.example.com/api/v1",
        "lakefs_access_key": "access-key",
        "lakefs_secret_key": "secret-key",
    }


client = BriefcaseClient()

# Context manager
with versioned_context(client, "knowledge-base", "main") as lakefs:
    config = lakefs.read_object("config/defaults.json")
    commit = lakefs.get_commit()


# Decorator: injects the client as `versioned_client`
@versioned(repository="knowledge-base", branch="main")
def load_config(versioned_client=None) -> dict:
    raw = versioned_client.read_object("config/defaults.json")
    return {"commit": versioned_client.get_commit()}


load_config(briefcase_client=client)
```

```python
versioned_context(briefcase_client, repository, branch="main", commit="latest", **kwargs)
versioned(repository, branch="main", commit="latest", client_param="versioned_client")
```

## Next steps

- Python SDK — Install, import paths, logging, and lazy-import behavior in prose. (/sdk/python/)
- Exporters — Where captured decisions go and how to write your own backend. (/features/exporters/)

## Glossary

Source: https://briefcaseai.io/reference/glossary/

> Definitions for the core vocabulary used across Briefcase — decisions, evidence, policies, bitemporal time, and verifiable bundles.

The terms below appear throughout the docs. They are grouped by the lifecycle act they belong to.

## Capture

**Decision** — A single choice an AI system makes that you want to govern: a classification, a route, an approval. In Briefcase it is recorded as a [`DecisionSnapshot`](/getting-started/core-concepts/).

**`DecisionSnapshot`** — The structured, persistent record of one decision: its inputs, outputs, model parameters, execution context, and timing. You build it, store it, reload it, and replay it.

**`@capture`** — A decorator that records a lightweight dict for every call to a function and hands it to an exporter. The quick path for live observability; it does not persist a `DecisionSnapshot` on its own.

**Input / Output** — Typed wrappers (`Input(name, value, data_type)`, `Output(...)`) around a single named value. An `Output` can carry a confidence score.

**`ModelParameters`** — The model configuration captured at call time: model name, provider, and per-parameter settings. Captured so you can tell when a model change caused output drift.

**`ExecutionContext`** — The runtime environment a decision ran in: runtime version, resolved dependencies, random seed, and environment variables. Captured so a [replay](/features/replay/) can run in a comparable environment.

**Exporter** — A sink that receives captured decision records — console, JSONL file, in-memory, or a custom backend. Wired up with [`observe()`](/features/exporters/).

**Fingerprint** — A stable hash of a decision's content (`DecisionSnapshot.fingerprint()`), used to compare and group decisions.

## Control

**Guardrail** — A control that decides whether an agent may perform an action on a resource, returning an `EvalResult`. Deny-by-default and side-effect-free. See [Guardrails](/advanced/guardrails/).

**`EvalRequest` / `EvalResult` / `Effect`** — A guardrail evaluates an `EvalRequest` (agent, action, resource, context) and returns an `EvalResult` whose `Effect` is `ALLOW` or `DENY`.

**Fail closed** — The principle that a control which errors must deny, never allow. An error should never grant access.

**Routing decision** — The choice of where a decision goes (e.g. which queue or handler), produced by a router. See [Routing](/advanced/routing/).

**Policy / `PolicyVersion` / `PolicyRule`** — A policy is a named set of rules. A `PolicyVersion` is an immutable, published snapshot of those rules; a `PolicyRule` maps a condition to a choice with a rationale. See [Versioned Routing Policy](/advanced/versioned-routing-policy/).

## Store & Query

**Storage backend** — A durable store for decisions and snapshots (e.g. `SqliteBackend`). Queryable via `SnapshotQuery`. See [Storage Adapters](/features/storage-adapters/).

**Valid time** — When a fact was true in the real world.

**Transaction time** — When the system learned about that fact and recorded it.

**Bitemporal record** — An append-only record carrying both valid time and transaction time, so corrections are new records rather than edits and any past state can be reconstructed. See [Bitemporal Storage](/advanced/bitemporal-storage/).

**As-of view** — A reconstruction of a store as it stood at a chosen transaction (or valid) time — what was known then, without look-ahead.

**Snapshot policy** — Controls how often an [external source](/advanced/external-data/) is snapshotted (`EVERY_CALL`, `ON_CHANGE`, `HOURLY`, `DAILY`, `WEEKLY`).

**Manifest** — In [RAG versioning](/advanced/rag-versioning/), the atomic record of which documents and embedding model produced an index, used to detect staleness.

## Replay & Verify

**Replay** — Re-executing a stored decision and comparing the result to the original. Modes are `strict` (exact) and `tolerant` (allows minor differences). See [Deterministic Replay](/features/replay/).

**Drift** — How much a model's outputs vary across repeated runs of the same decision. Measured by a consistency score and related metrics. See [Drift Detection](/features/drift-detection/).

**Audit bundle / `ExaminerBundle`** — A self-contained artifact that joins a decision, its bitemporal evidence, and the policy version in effect, sealed with a SHA-256 content hash. See [Audit Bundles](/advanced/compliance-bundles/).

**Content hash** — The SHA-256 hash that seals a bundle. `verify()` recomputes it and raises if a single byte changed.

> Still missing a term?
>
> If a term here is unfamiliar, start with [Core Concepts](/getting-started/core-concepts/) for the object model, or [Why Briefcase](/getting-started/why-briefcase/) for how the acts fit together.

========================================================================
# Resources
========================================================================

## Architecture

Source: https://briefcaseai.io/resources/architecture/

> System architecture of Briefcase AI.

## Overview

Briefcase AI is a layered system. The Python package you install (`briefcase-ai`) is the SDK surface — the `@capture` decorator, configuration, and pure-Python feature modules. Underneath, a PyO3 binding crate exposes a native extension module (`briefcase._native`) backed by a fast Rust core (~11K lines, the `briefcase-core` crate) that provides high-performance decision tracking, replay, drift, cost, sanitization, and SQLite storage.

```mermaid
graph TD
    A[Python package: briefcase-ai] --> B[PyO3 bindings: briefcase._native]
    B --> C[Rust core: briefcase-core]
    C --> D[SQLite storage backend]
```

> Diagram description
>
> A top-down stack of four layers. The Python package `briefcase-ai` calls the PyO3 bindings `briefcase._native`, which call the Rust core `briefcase-core`, which in turn reads and writes the SQLite storage backend.

## Rust Core (`briefcase-core`)

The core crate lives in `crates/briefcase-core` and exposes feature-gated modules:

- `models` - `DecisionSnapshot`, `Input`, `Output`, `ModelParameters`, and related types
- `storage` - the SQLite storage backend (`SqliteBackend`)
- `replay` - the deterministic replay engine
- `drift` - drift and consistency calculation
- `cost` - token-cost estimation and budget checks
- `sanitization` - PII detection and redaction

Feature flags (`recording`, `async`, `storage`, `replay`, `drift`, `sanitize`, `otel`, `tokens`, and others) control which modules compile. The Python extension is built with the full feature set.

## PyO3 Bindings (`briefcase._native`)

The `bindings/python` crate (`briefcase-python`, library name `briefcase_native`) uses [PyO3](https://pyo3.rs/) and [maturin](https://github.com/PyO3/maturin) to compile the Rust core into the `briefcase._native` extension module. The native-backed Python modules (`briefcase.cost`, `briefcase.drift`, `briefcase.sanitize`, `briefcase.storage`, `briefcase.replay`) import their classes from this extension.

## Python Package (`briefcase-ai`)

The Python layer adds:

- the `@capture` decorator, which records a lightweight decision dict and ships it through an exporter
- configuration via `setup()`, `init()`, and `BriefcaseConfig`
- pure-Python feature modules: validation, guardrails, RAG versioning, correlation, events, external-data tracking, routing, bitemporal primitives, and audit bundles
- an optional lakeFS integration (`briefcase.integrations.lakefs`) — one bundled versioned-data source; others plug in through the generic VCS protocol

## Capture and Replay Flow

The `@capture` decorator and the native runtime layer are separate paths. `@capture` records a dict and hands it to an exporter; persistence and replay use the native `DecisionSnapshot` objects directly.
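
The capture path can be pictured as an ordinary decorator that builds one dict per call and passes it to a sink. This is a simplified standard-library sketch, not the SDK's decorator: `capture_sketch`, every record field other than `function_name`, and the plain list standing in for an exporter are hypothetical.

```python
import functools
import time


def capture_sketch(sink):
    """Hypothetical decorator factory: append one record per call to `sink`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started = time.perf_counter()
            result = func(*args, **kwargs)
            # The record is built after the function returns, so the caller's
            # result is never altered by the capture step.
            sink.append({
                "function_name": func.__name__,
                "args": args,
                "kwargs": kwargs,
                "output": result,
                "duration_ms": (time.perf_counter() - started) * 1000.0,
            })
            return result
        return wrapper
    return decorator


records = []  # stands in for an exporter


@capture_sketch(records)
def classify_ticket(text):
    return "billing" if "invoice" in text.lower() else "account_access"


classify_ticket("My invoice is wrong")
print(records[0]["function_name"], records[0]["output"])
```

Nothing in this sketch persists or replays anything, which matches the separation described above: the dict goes to a sink, while durable snapshots are a different path.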

```mermaid
sequenceDiagram
    participant App
    participant Capture as capture decorator
    participant Exporter
    App->>Capture: call function
    Capture->>App: function result
    Capture->>Exporter: export decision dict
```

> Diagram description
>
> A sequence across App, the capture decorator, and the Exporter. The App calls the wrapped function through the capture decorator, which returns the result to the App and then exports the recorded decision dict to the Exporter.

```mermaid
sequenceDiagram
    participant App
    participant Storage as SqliteBackend
    participant Replay as ReplayEngine
    App->>Storage: save_decision(snapshot)
    Storage-->>App: decision_id
    App->>Replay: replay(decision_id, mode)
    Replay->>Storage: load snapshot
    Replay-->>App: ReplayResult
```

> Diagram description
>
> A sequence across App, the SqliteBackend, and the ReplayEngine. The App saves a snapshot to storage and receives a decision_id, then asks the ReplayEngine to replay that id; the engine loads the snapshot from storage and returns a ReplayResult to the App.

## Next steps

- Core Concepts — How snapshots, capture, and replay fit together in practice. (/getting-started/core-concepts/)
- Decision Recording — Record structured decisions that flow through these layers. (/features/decision-recording/)

## Examples

Source: https://briefcaseai.io/resources/examples/

> Example code and use cases for Briefcase AI.

## Capture a Decision

The `@capture` decorator records a lightweight decision dict and hands it to an exporter. It does not persist a native `DecisionSnapshot` itself.

```python
from briefcase import capture


@capture(decision_type="summarize")
def summarize(text: str) -> str:
    # Replace with your real model call, e.g. client.responses.create(...).
    return text[:280]


result = summarize("Long document text...")
```

## Configure an Exporter

`briefcase.observe()` wires up an exporter in one line and returns it, so `@capture` decisions are actually emitted.
Pass `"console"`, `"memory"`, a `*.jsonl` path, or a `BaseExporter` instance.

```python
import briefcase

mem = briefcase.observe("memory")  # or "console", or "decisions.jsonl"


@briefcase.capture(decision_type="summarize", async_capture=False)
def summarize(text: str) -> str:
    return text[:280]


summarize("Long document text...")
print(mem.records[0])
```

The stock exporters live in `briefcase.exporters`: `ConsoleExporter` (JSON lines to stderr), `JSONLFileExporter` (append to a file), and `MemoryExporter` (collect in `.records`). For full control, subclass `BaseExporter` and register it with `setup()` or pass it to `observe()`.

```python
from briefcase import setup, capture
from briefcase.exporters import BaseExporter


class PrintExporter(BaseExporter):
    async def export(self, decision) -> bool:
        print(decision)
        return True

    async def flush(self) -> None: ...

    async def close(self) -> None: ...


setup(exporter=PrintExporter())


@capture()
def classify(text: str) -> str:
    return "billing"
```

See [Exporters](/features/exporters/) for the full reference.

## Persist and Replay a Snapshot

The native runtime layer is separate from `@capture`. Call `init()` to start the native runtime, build a `DecisionSnapshot`, save it to a `SqliteBackend`, then replay it with the `ReplayEngine`.

```python
from briefcase import DecisionSnapshot, Input, Output, init
from briefcase.storage import SqliteBackend
from briefcase.replay import ReplayEngine

init()  # start the native runtime before persisting

# Record a classify_ticket decision from the support-triage agent.
decision = (
    DecisionSnapshot("classify_ticket")
    .with_module("support_service")
)
decision.add_input(Input("ticket_text", "My invoice is wrong", "string"))
decision.add_output(Output("category", "billing", "string").with_confidence(0.93))

# Persist it. SqliteBackend.in_memory() is handy for examples and tests.
storage = SqliteBackend.in_memory()
decision_id = storage.save_decision(decision)

# Replay the recorded decision against the stored snapshot.
# Modes: "strict" (exact match) or "tolerant" (the default).
engine = ReplayEngine(storage)
result = engine.replay(decision_id, "strict")
print(result.status, result.outputs_match)
```

## Measure Drift Across Outputs

`DriftCalculator.calculate_drift()` scores the consistency of a set of outputs and reports the consensus value and any outliers.

```python
from briefcase.drift import DriftCalculator

calculator = DriftCalculator()
outputs = ["billing", "billing", "account", "billing", "billing"]
metrics = calculator.calculate_drift(outputs)

print(f"Consistency: {metrics.consistency_score:.3f}")
print(f"Agreement: {metrics.agreement_rate:.3f}")
print(f"Consensus: {metrics.consensus_output}")
print(f"Status: {metrics.get_status(calculator)}")
```

## Estimate Cost and Check a Budget

```python
from briefcase.cost import CostCalculator

calculator = CostCalculator()
estimate = calculator.estimate_cost("gpt-4", 1000, 500)
print(f"Total: ${estimate.total_cost:.4f}")

status = calculator.check_budget(85.0, 100.0)
print(f"{status.percent_used:.1f}% used - {status.status}")
```

## Redact PII

```python
from briefcase.sanitize import Sanitizer

sanitizer = Sanitizer()
result = sanitizer.sanitize("Contact support at support@example.com")
print(result.sanitized)
print(f"{result.redaction_count} redaction(s)")
for redaction in result.redactions:
    print(redaction.pii_type, redaction.start_position, redaction.end_position)
```

## Correlate a Multi-Agent Workflow

`briefcase_workflow` is a context manager that links every agent that runs inside it under one workflow ID.
```python
from unittest.mock import Mock

from briefcase.correlation import briefcase_workflow

client = Mock()  # replace with a real Briefcase client

with briefcase_workflow("content_pipeline", client) as workflow:
    print(f"Workflow: {workflow.workflow_id}")
    workflow.register_agent("retriever", "retrieval")
    workflow.register_agent("summarizer", "generation")
    workflow.register_agent("reviewer", "moderation")
```

## Track and Compare Model Versions with oci-bai

Push images through the oci-bai gateway and use the CLI to inspect commits, compare versions, and search the catalog:

```bash
docker tag my-base:latest localhost:8080/rl-gym-env:cuda-base
docker push localhost:8080/rl-gym-env:cuda-base
docker tag my-candidate:latest localhost:8080/rl-gym-env:cartpole
docker push localhost:8080/rl-gym-env:cartpole

# Inspect and compare
oci-bai --repo rl-gym-env log cartpole
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth package
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth bench

# Search the catalog
oci-bai search "format==safetensors cuda>=12.4"
```

See the [Quick Start](/evaluate/quickstart/) for the end-to-end walkthrough and [CLI Reference](/evaluate/cli/) for every command and flag. oci-bai is in private beta — contact [support@briefcaseai.org](mailto:support@briefcaseai.org) for access.

## More Examples

See the [examples directory](https://github.com/briefcasebrain/briefcase-ai-sdk/tree/main/examples) for complete, runnable scripts covering basic usage, prompt validation, lakeFS versioning, and multi-agent correlation.

## Next steps

- Quickstart — Install Briefcase and run your first capture in a few minutes. (/getting-started/quickstart/)
- Exporters — Choose where the decisions in these examples are sent. (/features/exporters/)

## Changelog

Source: https://briefcaseai.io/resources/changelog/

> Release history for Briefcase AI.

The format is based on [Keep a Changelog](https://keepachangelog.com/), and this project follows [Semantic Versioning](https://semver.org/).

## [3.2.1] - 2026-05-30

### Added

- **Cost rate cards** (`briefcase.cost.CostCalculator.estimate_cost`): an optional keyword-only `rate_card` selects a `platform × tier × modifier` pricing scheme — platforms `first_party` / `bedrock` / `vertex` / `azure`, tiers `standard` / `batch` / `cached` / `priority` / `flex`, and modifiers for long-context tiered pricing, data residency (`us`, +10%), and fast-mode. Cards are forgiving strings such as `"batch"`, `"bedrock:batch"`, or `"first_party:fast"`; batch/flex are 0.5×, cache reads are 0.1× of input, and regional/residency add 10%. New keyword-only `cache_read_tokens` / `cache_write_5m_tokens` / `cache_write_1h_tokens` arguments bill prompt-cache usage, a `cache_cost` field is exposed on `CostEstimate`, and `get_available_rate_cards()` lists representative cards. Omitting `rate_card` (or passing `"standard"`) preserves the previous first-party standard pricing.
- **Latest model pricing**: added Anthropic Claude 4.x (`claude-opus-4-8` / `4-7` / `4-6` / `4-5` / `4-1`, `claude-sonnet-4-6` / `4-5`, `claude-haiku-4-5` / `3-5`), OpenAI GPT-5.x (`gpt-5.5`, `gpt-5.5-pro`, `gpt-5.4`, `gpt-5.4-mini`, `gpt-5.4-nano`, `gpt-5.4-pro`), and Google Gemini (`gemini-3.5-flash`, `gemini-3.1-pro`, `gemini-3.1-flash-lite`, `gemini-3-flash`, `gemini-2.5-pro` / `flash` / `flash-lite`) to the default pricing table. All previously available models are retained.

### Changed

- `CostCalculator.estimate_cost`, `estimate_cost_from_text`, and `project_monthly_cost` gained keyword-only `rate_card` (and, for `estimate_cost`, cache-token) parameters. The existing positional arguments and their `input_tokens` / `output_tokens` keyword names are unchanged, so existing calls behave identically.
- The MCP `estimate_cost` tool accepts an optional `rate_card` and returns a `cache_cost` field.

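
The rate-card multipliers listed for 3.2.1 can be checked by hand. The sketch below is standalone arithmetic, not the `CostCalculator` implementation: the per-token prices and the `estimate` helper are hypothetical, and it assumes the tier multiplier applies to input and output tokens, that cache reads bill at 0.1× the input price regardless of tier, and that the residency surcharge multiplies the total. The library may compose these factors differently.

```python
# Hypothetical prices in USD per million tokens; NOT Briefcase's pricing table.
INPUT_PRICE_PER_MTOK = 3.00
OUTPUT_PRICE_PER_MTOK = 15.00

# Multipliers quoted in the changelog entry.
TIER_MULTIPLIER = {"standard": 1.0, "batch": 0.5, "flex": 0.5}
CACHE_READ_MULTIPLIER = 0.1   # cache reads cost 0.1x the input price
RESIDENCY_MULTIPLIER = 1.10   # data residency adds 10%


def estimate(input_tokens, output_tokens, *, tier="standard",
             cache_read_tokens=0, residency=False):
    """Illustrative only: assumes the factors compose by multiplication."""
    tier_mult = TIER_MULTIPLIER[tier]
    input_cost = input_tokens / 1e6 * INPUT_PRICE_PER_MTOK * tier_mult
    output_cost = output_tokens / 1e6 * OUTPUT_PRICE_PER_MTOK * tier_mult
    cache_cost = (cache_read_tokens / 1e6
                  * INPUT_PRICE_PER_MTOK * CACHE_READ_MULTIPLIER)
    total = input_cost + output_cost + cache_cost
    if residency:
        total *= RESIDENCY_MULTIPLIER
    return {"input": input_cost, "output": output_cost,
            "cache": cache_cost, "total": total}


standard = estimate(1_000_000, 1_000_000)
batch = estimate(1_000_000, 1_000_000, tier="batch")
print(f"standard: ${standard['total']:.2f}  batch: ${batch['total']:.2f}")
```

Under these assumptions the batch total is half the standard total, which is the relationship the changelog describes for the `batch` and `flex` tiers.
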
### Fixed

- A single stable-ABI wheel per platform now installs on Python 3.9–3.13 (previously the prebuilt wheel was effectively 3.11-only).
- The source distribution now bundles `LICENSE` and `NOTICE`.

## [3.2.0] - 2026-05-30

### Added

- Stock exporters in the base package (`briefcase.exporters`): `ConsoleExporter` (JSON lines to stderr), `JSONLFileExporter` (append-only, thread-safe), and `MemoryExporter` (collects records in `.records`).
- One-line observability setup: `briefcase.observe(exporter="console", *, level=None)` wires the global exporter so `@capture` records are emitted, and returns the configured exporter for inspection.
- Centralized logging in the base package: top-level `enable_logging`, `set_log_level`, `disable_logging`, and `get_logger`. The library is silent by default (`NullHandler`); set `BRIEFCASE_LOG_LEVEL=DEBUG` to enable logging automatically at import.
- MCP server (`pip install briefcase-ai[mcp]`): the `briefcase-mcp` console script (or `python -m briefcase.mcp`) exposes `sanitize_text`, `estimate_cost`, `analyze_drift`, and `how_to` tools plus a `briefcase://llms-full.txt` resource to MCP-capable clients. `briefcase.mcp` exports `build_server()` and `main()`.
- LLM-friendly `llms.txt`, `llms-full.txt`, and `AGENTS.md` so coding assistants can discover the API surface.
- Bitemporal evidence primitives (`briefcase.bitemporal`): `BitemporalRecord`, the `BitemporalStore` protocol with in-memory, SQLite, and Iceberg backends, `AsOfView`, append-only corrections, and batch/stream ingest.
- Versioned routing policy (`briefcase.routing`): `PolicyRegistry`, `PolicyVersion`, `PolicyRule`, `AgentRouter`, and `AgentRoutingDecision`.
- Audit bundles (`briefcase.compliance`): `ExaminerBundle` with SHA-256 content-hash integrity and tamper detection.
- Top-level `briefcase.capture`, `briefcase.setup`, and `briefcase.BriefcaseConfig` re-exports for discoverability.
- `ExternalDataTracker(sanitizer=...)` to redact PII from external-data snapshots before they are persisted to durable storage.
- `scripts/check_imports.py` import-smoke test for the built wheel.

### Fixed

- `briefcase.cost`, `briefcase.drift`, and `briefcase.sanitize` now import from a clean source build. The native bindings were missing `add_class` registrations for `CostEstimate`, `BudgetStatus`, `DriftMetrics`, `Redaction`, `SanitizationResult`, and `SanitizationJsonResult`; `briefcase.cost` also imported a non-existent `BudgetAlert` type.
- `briefcase.rag` no longer fails to import on a spurious `pyarrow` requirement.
- Misleading `ImportError` messages on native-backed modules now point to reinstall/rebuild rather than no-op pip extras.
- Added the missing `Iterable` import to `scripts/version_sync.py`; the manifest now also tracks `bindings/python/Cargo.toml`.
- The flagship `examples/python-basic` and validation examples now run end-to-end.

### Security

- External-data snapshots can be redacted before persistence; redaction fails closed if it errors.
- Expanded PII detection: corrected the email regex and added GitHub, GitLab, Stripe, and Hugging Face API-key prefixes.
- Robust telemetry opt-out: `BRIEFCASE_TELEMETRY` now accepts `0`, `false`, `no`, and `off`.
- `source_name` is sanitized before use in storage object keys (path-traversal hardening).

### Changed

- Deduplicated the optional OpenTelemetry import into `briefcase._otel`.
- Extracted guardrail core data types into `briefcase.guardrails._types`.
- CI builds and tests across Python 3.9–3.13, runs the native binding tests, and import-smoke-tests the built wheel before publish.

## [3.0.0] - 2026-03-22

### Added

- Initial open-source release: decision tracking, deterministic replay, drift and cost calculation, PII sanitization, and SQLite storage, backed by a Rust core.

## FAQ

Source: https://briefcaseai.io/resources/faq/

> Frequently asked questions about Briefcase AI.

## What is Briefcase AI?

Briefcase AI is an open-source Python SDK for recording, replaying, and auditing AI decisions. It captures every input, output, and parameter of AI calls as immutable snapshots.

## What languages are supported?

You install and use Briefcase in Python — `pip install briefcase-ai`, no Rust toolchain required. The performance-critical core is written in Rust and ships precompiled inside the wheel. See [Build the Rust Core](/sdk/rust/) only if you want to contribute to that core.

## Which AI frameworks does it support?

The open-source SDK provides extensible framework protocols (guardrails, exporters, routers, event emitters). Pre-built integrations for LangChain, CrewAI, LlamaIndex, AutoGen, AG2, and OpenAI Agents are available in [Briefcase AI Enterprise](https://github.com/briefcasebrain/briefcase-ai-sdk-enterprise).

## Where are persistent decisions stored?

A `DecisionSnapshot` you persist goes to a **storage backend**: by default an in-memory SQLite database (non-persistent), or a SQLite file / custom backend you configure for production. See [Storage Adapters](/features/storage-adapters/).

## How does `@capture` emit decisions?

`@capture` is a separate, lighter path — it sends to an **exporter**, not a storage backend. It records a lightweight dict per call and emits it once you call `briefcase.observe(...)`. Pass `"console"` to write JSON lines to stderr, a path ending in `.jsonl` to append to a file, or `"memory"` to collect records in `MemoryExporter.records`. `@capture` exports in a background thread by default; use `@capture(async_capture=False)` to make a record available synchronously.

## Is there an MCP server?

Yes. Install the `mcp` extra (`pip install briefcase-ai[mcp]`) and run `briefcase-mcp` (or `python -m briefcase.mcp`). It exposes `sanitize_text`, `estimate_cost`, `analyze_drift`, and `how_to` tools to MCP-capable clients. See the [Python API Reference](/api/python/#briefcasemcp).

## Does it add latency?

The Rust core adds microseconds of overhead per decision. Storage write latency depends on your backend choice.

## Is it production-ready?

Briefcase AI is under active development. Check the [Changelog](/resources/changelog/) for the latest release status.

## How do I contribute?

See [Development](/contributing/development/) and [Code Standards](/contributing/code-standards/).

========================================================================
# Contributing
========================================================================

## Development

Source: https://briefcaseai.io/contributing/development/

> Set up a development environment for Briefcase AI.

## Prerequisites

- Rust 1.70+ (a recent stable toolchain; CI builds on `stable`)
- Python 3.9+
- [maturin](https://github.com/PyO3/maturin)

## Clone the Repository

```bash
git clone https://github.com/briefcasebrain/briefcase-ai-sdk.git
cd briefcase-ai-sdk
```

The repository is a Cargo workspace: the Rust core (`crates/briefcase-core`), the PyO3 bindings (`bindings/python`), and the Python package (`briefcase/`).

## Install Development Dependencies

```bash
pip install briefcase-ai[dev]
pip install maturin
```

## Build the Native Extension

`maturin develop` compiles the Rust core through the bindings and installs `briefcase._native` into the active environment:

```bash
maturin develop
```

## Build the Rust Core Directly

```bash
cargo build -p briefcase-core --locked
```

## Run the Test Suites

```bash
cargo test -p briefcase-core --locked   # Rust core
pytest tests/                           # Python facade (mocks the extension)
pytest bindings/python/tests/           # native binding tests (real extension)
```

See [Testing](/contributing/testing/) for details on the facade vs. native split.

## Development Workflow

1. Create a branch from `main`.
2. Make changes and add tests.
3. Run the formatters and linters (see [Code Standards](/contributing/code-standards/)).
4. Run the test suites.
5. Submit a pull request.
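
Step 4 of the workflow can be scripted. The sketch below is illustrative and not part of the repository: the `run_suites` and `dry_run` helpers are hypothetical names, and only the three commands come from the test-suite listing above. Each suite runs as its own process, which keeps the facade and native binding suites from being collected in a single `pytest` invocation.

```python
import subprocess

# Commands taken from "Run the Test Suites"; the suite labels are illustrative.
SUITES = [
    ("rust-core", ["cargo", "test", "-p", "briefcase-core", "--locked"]),
    ("python-facade", ["pytest", "tests/"]),
    ("native-bindings", ["pytest", "bindings/python/tests/"]),
]


def run_suites(suites=SUITES, runner=subprocess.run):
    """Run each suite in a separate process and return {label: exit_code}.

    Every suite runs even if an earlier one fails, so one pass reports
    all failures.
    """
    results = {}
    for label, command in suites:
        completed = runner(command, check=False)
        results[label] = completed.returncode
    return results


def dry_run(command, check=False):
    """Stand-in runner that prints the command instead of executing it."""
    print("would run:", " ".join(command))
    return subprocess.CompletedProcess(command, returncode=0)


# Dry run by default so the sketch is safe to execute anywhere.
results = run_suites(runner=dry_run)
failed = [label for label, code in results.items() if code != 0]
print("failed suites:", failed or "none")
```

Call `run_suites()` without the `runner` argument from the repository root to execute the real commands; that requires `cargo`, `pytest`, and a built native extension.
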
## Build the Rust Core

Source: https://briefcaseai.io/sdk/rust/

> For contributors — build the Briefcase AI Rust core and Python bindings from source.

Most users never need this page — `pip install briefcase-ai` ships precompiled wheels with no Rust toolchain required. This guide is for **contributors** who want to build the Rust core and Python bindings from source.

## Prerequisites

- A recent stable Rust toolchain (Rust 1.70+; CI builds on `stable`)
- Python 3.9+
- [maturin](https://github.com/PyO3/maturin)

The crates use Rust edition 2021. The `Cargo.toml` does not pin a minimum Rust version, so any current stable toolchain works.

## Clone

```bash
git clone https://github.com/briefcasebrain/briefcase-ai-sdk.git
cd briefcase-ai-sdk
```

The repository is a Cargo workspace with the core library (`crates/briefcase-core`) and the PyO3 bindings (`bindings/python`).

## Build the Rust Core

```bash
cargo build -p briefcase-core --locked
```

## Run Tests

```bash
cargo test -p briefcase-core --locked
```

## Build the Python Bindings

```bash
pip install maturin
maturin develop
```

This compiles the Rust core through the bindings and installs the `briefcase._native` extension into the active environment. Add `--release` for an optimized build:

```bash
maturin develop --release
```

See [Development](/contributing/development/) for the full contributor workflow.

## Next steps

- Development — The full contributor workflow: tooling, tests, and conventions. (/contributing/development/)

## Testing

Source: https://briefcaseai.io/contributing/testing/

> Run and write tests for Briefcase AI.

## Install Test Dependencies

```bash
pip install briefcase-ai[dev]
```

The `dev` extra installs `pytest`, `pytest-mock`, `pytest-asyncio`, `pytest-subtests`, `black`, `flake8`, and `mypy`.

## Build the Native Extension First

The Python facade tests mock `briefcase._native`, while the native binding tests run against the real extension.
Build it before running either suite:

```bash
maturin develop
```

## Run the Test Suites

```bash
pytest tests/                     # Python facade (mocks briefcase._native)
pytest bindings/python/tests/     # native binding tests (real extension)
python scripts/check_imports.py   # smoke-test that every submodule imports
```

The facade suite mocks `briefcase._native` while the native binding suite loads the real extension, so run them in separate processes (do not collect both in one `pytest` invocation).

## Run Rust Tests

```bash
cargo test -p briefcase-core --locked
```

## Run Specific Tests

```bash
cargo test -p briefcase-core test_snapshot_creation
pytest tests/ -k "capture"
```

## Continuous Integration

CI builds the native extension with `maturin` and runs the Python suites across Python 3.9, 3.10, 3.11, 3.12, and 3.13. A separate job builds, tests, and clippy-lints the Rust core.

## Writing Tests

### Rust

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_snapshot_creation() {
        let snapshot = DecisionSnapshot::new("ai_function")
            .add_input(Input::new("query", json!("hello"), "string"))
            .add_output(Output::new("response", json!("hi"), "string"));
        assert_eq!(snapshot.function_name, "ai_function");
    }
}
```

### Python

The facade test suite mocks `briefcase._native`, so import the public API and assert on behavior:

```python
from briefcase import capture

def test_capture_returns_result():
    @capture()
    def my_fn(x):
        return x * 2

    assert my_fn(5) == 10
```

## Code Standards

Source: https://briefcaseai.io/contributing/code-standards/

> Code style and conventions for Briefcase AI.

## Rust

- Format with `rustfmt` (configured in `.rustfmt.toml`, edition 2021)
- Run `cargo clippy` with warnings denied before committing
- All public APIs must have doc comments

```bash
cargo fmt --all -- --check
cargo clippy -p briefcase-core --locked -- -D warnings
```

## Python

- Follow PEP 8
- Use type annotations
- Format with `black`, lint with `flake8`, and type-check with `mypy`

Install the tooling with the `dev` extra:

```bash
pip install briefcase-ai[dev]
```

```bash
black briefcase/ tests/
flake8 briefcase/ tests/
mypy briefcase/
```

## Commit Messages

Use conventional commits:

```
feat: add drift detection threshold config
fix: handle empty snapshots in replay
docs: update storage adapter examples
```

## Pull Request Process

1. Branch from `main`
2. Write tests for new functionality
3. Ensure all tests pass
4. Request review from a maintainer

## Governance

Source: https://briefcaseai.io/contributing/governance/

> Project governance and decision-making.

## License

Briefcase AI is licensed under Apache-2.0. See [LICENSE](https://github.com/briefcasebrain/briefcase-ai-sdk/blob/main/LICENSE).

## Contributions

All contributions are welcome. Please read [Development](/contributing/development/) and [Code Standards](/contributing/code-standards/) before submitting a pull request.

## Maintainers

The project is maintained by the Briefcase Brain team. Maintainer decisions are made by consensus.

## Reporting Issues

File issues on [GitHub](https://github.com/briefcasebrain/briefcase-ai-sdk/issues).