# Briefcase AI — full documentation for AI assistants
> Infrastructure for governing the decisions your AI systems make: enforce controls before an action runs, capture the full context behind every decision, and keep a complete, reproducible record you can verify later. Open-source Python SDK with a Rust core.
Auto-generated from the docs at https://briefcaseai.io. A link index is at https://briefcaseai.io/llms.txt
========================================================================
# Start Here
========================================================================
## Why Briefcase
Source: https://briefcaseai.io/getting-started/why-briefcase/
> Briefcase is infrastructure for governing the decisions your AI systems make — enforce controls before an action runs, capture the full context behind every decision, and keep a complete, reproducible record you can verify later.
AI systems don't just produce text — they **make decisions that trigger actions**: routing a support ticket, approving a request, choosing a tool, escalating to a human. When one of those decisions is wrong, "the model did it" is not an answer anyone can act on.
Briefcase is **infrastructure for governing those decisions**. It sits around the decision points in your application and gives you three things that are otherwise impossible to reconstruct after the fact:
**Controls before action** —
Evaluate whether an action is allowed **before** it runs — deny-by-default, composable, and side-effect-free.
**Full context, captured** —
Every decision is recorded with its inputs, outputs, model parameters, evidence, and the data it depended on.
**A record you can verify** —
Replay decisions, reconstruct exactly what was known at the time, and seal it into a tamper-evident bundle.
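As a rough illustration of what "deny-by-default, composable, and side-effect-free" can mean in practice, here is a minimal sketch in plain Python. It does not use Briefcase's guardrail API: `Action`, `Rule`, `evaluate`, and the two example rules are hypothetical names invented for this example.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


# Hypothetical types for illustration only; this is not the Briefcase guardrail API.
@dataclass(frozen=True)
class Action:
    name: str
    queue: str


# A rule inspects an action and returns True (allow), False (deny),
# or None (no opinion). Rules only read the action; they change nothing.
Rule = Callable[[Action], Optional[bool]]


def only_known_queues(action: Action) -> Optional[bool]:
    return True if action.queue in {"billing", "refunds"} else None


def never_auto_close(action: Action) -> Optional[bool]:
    return False if action.name == "close_ticket" else None


def evaluate(action: Action, rules: List[Rule]) -> bool:
    """Allow only if at least one rule allows and no rule denies."""
    verdicts = [rule(action) for rule in rules]
    if any(v is False for v in verdicts):
        return False
    return any(v is True for v in verdicts)  # no explicit allow means deny


rules = [only_known_queues, never_auto_close]
print(evaluate(Action("route_ticket", "billing"), rules))
print(evaluate(Action("route_ticket", "unknown"), rules))
print(evaluate(Action("close_ticket", "billing"), rules))
```

Because each rule is a pure function, rules can be added or reordered without changing the outcome, and the check can run before the action without touching any state.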
## The questions Briefcase lets you answer
When a decision is challenged — by a teammate, an incident review, or a customer — you need to answer, precisely and after the fact:
- **What did the system decide, and what did it see?** The inputs, outputs, and confidence behind the call.
- **What rule governed it?** The exact policy version that was in effect at the decision's moment — not today's policy.
- **Did the controls run first?** Proof that a guardrail evaluated the action before anything happened.
- **What did we know at the time?** The evidence and external data as they were then — corrections appended, never overwritten.
- **Can we reproduce it?** A deterministic replay that compares the original output against a fresh run.
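The "corrections appended, never overwritten" idea can be sketched with a small append-only log. This is an assumption-laden illustration in plain Python, not Briefcase's storage model: the `Log` shape and the `as_of` helper are hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, List, Optional, Tuple

# Hypothetical append-only log of (recorded_at, key, value) rows.
# A correction is a new row; earlier rows are never edited or deleted.
Log = List[Tuple[datetime, str, Any]]


def as_of(log: Log, key: str, moment: datetime) -> Optional[Any]:
    """Return the latest value for `key` recorded at or before `moment`."""
    known = [(ts, value) for ts, k, value in log if k == key and ts <= moment]
    if not known:
        return None
    return max(known, key=lambda row: row[0])[1]


def utc(day: int) -> datetime:
    return datetime(2024, 1, day, tzinfo=timezone.utc)


log: Log = [
    (utc(1), "customer_tier", "standard"),
    (utc(10), "customer_tier", "premium"),  # correction appended later
]

print(as_of(log, "customer_tier", utc(5)))   # what was known on Jan 5
print(as_of(log, "customer_tier", utc(15)))  # what was known on Jan 15
```

Since nothing is overwritten, a question about an earlier moment is answered from the rows that existed then, not from today's corrected value.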
> Built for accountability
Briefcase is designed for teams where accountability, traceability, and operational control are non-negotiable — where "we think it was fine" has to become "here is the record, and here is the proof it wasn't tampered with."
## How it works: five acts
Briefcase organizes around the lifecycle of a single decision. The rest of these docs follow the same five acts, and a single running example threads through all of them: a **support-ticket triage agent**. Each ticket it handles produces two decisions you'll see throughout — it **classifies** the ticket (the `classify_ticket` call in most examples) and **routes** it to a queue. Both are decisions Briefcase captures, governs, and can replay.
```mermaid
graph LR
A["Capture
record inputs, outputs,
context, evidence"] --> B["Control
enforce guardrails &
versioned policy"]
B --> C["Store & Query
durable, append-only,
queryable trail"]
C --> D["Replay & Verify
re-run, compare,
detect drift"]
D --> E["Prove
reconstruct as-of &
seal an audit bundle"]
```
> Diagram description
A left-to-right flow of five stages: Capture (record inputs, outputs, context, and evidence) → Control (enforce guardrails and a versioned policy before the action) → Store & Query (a durable, append-only, queryable trail) → Replay & Verify (re-run a decision, compare outputs, detect drift) → Prove (reconstruct what was known at a past time and seal a tamper-evident bundle).
| Act | What you do | Key building blocks |
| --- | --- | --- |
| **Capture** | Record every decision with full context | `@capture`, `DecisionSnapshot`, exporters, PII sanitization |
| **Control** | Enforce controls before the action runs | Guardrails, routing, versioned routing policy, validation |
| **Store & Query** | Keep a durable, queryable, append-only trail | Storage adapters, bitemporal storage, external data, RAG versioning |
| **Replay & Verify** | Re-run and check decisions hold up | Deterministic replay, drift detection, audit bundles |
| **Prove** | Reconstruct and verify after the fact | As-of reconstruction, `ExaminerBundle` |
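To make the "detect drift" part of Replay & Verify more concrete, one simple way to measure how consistent repeated decisions are is the share of runs that agree with the most common output. The sketch below is plain Python under that assumption; `consistency` is a hypothetical helper, not the SDK's drift calculation.

```python
from collections import Counter
from typing import List, Tuple


def consistency(outputs: List[str]) -> Tuple[str, float]:
    """Return the most common output and the share of runs that produced it."""
    if not outputs:
        raise ValueError("need at least one output")
    label, count = Counter(outputs).most_common(1)[0]
    return label, count / len(outputs)


# Four repeated runs of the same classification; one disagrees.
runs = ["billing", "billing", "refunds", "billing"]
label, score = consistency(runs)
print(label, score)
```

A score of 1.0 means every run agreed; lower scores suggest the decision is sensitive to something that changed between runs.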
## Who Briefcase is for
- I use an AI coding assistant — Let your AI editor add Briefcase for you — point it at the docs or give it the MCP tools. Start with AI-Assisted Setup. (/getting-started/ai-assisted-setup/)
- Engineers — Instrument a decision point in minutes, send records anywhere, and replay to catch regressions. Start with the Quickstart. (/getting-started/quickstart/)
- Platform & governance leads — Define controls that run before actions, route through versioned policies, and prove which rule was in effect. Start with Guardrails. (/advanced/guardrails/)
- Reproducibility & audit reviewers — Reconstruct past decisions exactly and verify a sealed, tamper-evident record. Start with Audit Bundles. (/advanced/compliance-bundles/)
## Where it runs
Briefcase is an open-source Python SDK (with a Rust core) that wraps the decision points in code you already have. It is independent of model, vendor, and framework: bring your own LLM calls and storage. The base package is `pip install briefcase-ai`; optional capabilities are installed as extras.
## Next steps
- Quickstart — Record, persist, and replay your first decision in about 5 minutes. (/getting-started/quickstart/)
- Core Concepts — The object model behind every decision: snapshots, inputs, outputs, evidence. (/getting-started/core-concepts/)
- Audit a Decision End-to-End — Follow one decision from capture all the way to a verifiable sealed record. (/guides/audit-a-decision/)
## AI-Assisted Setup
Source: https://briefcaseai.io/getting-started/ai-assisted-setup/
> The fastest way to add Briefcase to a project — let your AI coding assistant do it. Point it at the docs, or give it Briefcase's tools over MCP.
The fastest way to instrument decisions with Briefcase is to let the AI assistant
you already code with do it for you. You don't have to memorize the API — give
your assistant the docs (or Briefcase's own tools) and ask it to add capture,
controls, and replay to a function.
> Two ways, use either or both
**Point your assistant at the docs** so it answers from accurate, current Briefcase
material. **Add the MCP server** so it can run Briefcase tools — redact PII,
estimate cost, check drift — right inside your editor. They compose: docs for
knowledge, MCP for actions.
_Point your assistant at the docs_
Every page on this site is also published as machine-readable text an assistant
can ingest:
- **`https://briefcaseai.io/llms.txt`** — an index of the docs (titles + links), grouped by the five acts. Best as a map.
- **`https://briefcaseai.io/llms-full.txt`** — every page concatenated as plain text. Best for "read this, then help me."
Both files are generated from the docs on every build, so they always match what's published here.
1. ### Give your assistant the docs
In Cursor, Claude Code, Copilot, or any assistant that can read a URL, paste
the link (or add it to the project's context / docs settings):
```text
Read https://briefcaseai.io/llms-full.txt — that's the Briefcase AI SDK.
```
2. ### Ask it to instrument a function
```text
Using Briefcase, add @capture and observe() to classify_ticket() in
app/triage.py so every classification is recorded, then show me how to read
the records back.
```
3. ### Review what it wrote
The assistant should produce the canonical pattern — `briefcase.observe(...)`
plus `@briefcase.capture(...)`. Confirm it matches the [Quickstart](/getting-started/quickstart/).
`llms.txt` is an emerging convention for exposing docs to AI tools. Even
assistants without a built-in "docs" setting can usually read a pasted URL.
_Give your assistant Briefcase_
The Briefcase MCP server exposes a small set of **read-only** tools your assistant
can call directly: redact PII, estimate model cost, analyze drift, and look up
usage. It also serves a `briefcase://llms-full.txt` resource so the assistant can
read the guide in-editor.
1. ### Install the server
```bash
pip install "briefcase-ai[mcp]"
```
The `[mcp]` extra installs `mcp>=1.2`. The server runs over stdio with the
`briefcase-mcp` command (or `python -m briefcase.mcp`).
2. ### Register it with your assistant
Point your MCP-capable client at the `briefcase-mcp` command. The config file
differs per tool, but the shape is the same:
```json
{
  "mcpServers": {
    "briefcase": {
      "command": "briefcase-mcp"
    }
  }
}
```
3. ### Use the tools from your editor
Ask the assistant things like:
```text
Use the briefcase tools to redact PII from this support ticket, then estimate
the cost of classifying it with claude-haiku-4-5.
```
The assistant calls `sanitize_text` and `estimate_cost` — the latter now
supports a `rate_card` (e.g. `"bedrock:batch"`) and returns a `cache_cost`,
added in [v3.2.1](/resources/changelog/).
The server exposes `sanitize_text`, `estimate_cost`, `analyze_drift`, and `how_to`.
See the [MCP Server](/integrations/mcp/) reference for each tool's inputs and
outputs.
## Prefer to do it by hand?
The [Quickstart](/getting-started/quickstart/) walks the same path manually —
record, persist, and replay a decision — in about 5 minutes.
## Where this fits
- Quickstart — The manual path: record, persist, and replay a decision. (/getting-started/quickstart/)
- MCP Server — Full reference for the tools and resource your assistant can call. (/integrations/mcp/)
- Why Briefcase — What Briefcase governs, and the five-act lifecycle. (/getting-started/why-briefcase/)
- What's New — oci-bai artifact graph launch, cost rate cards, and latest model pricing. (/getting-started/whats-new/)
## Quickstart
Source: https://briefcaseai.io/getting-started/quickstart/
> Record, persist, and replay your first decision in about 5 minutes.
> Fastest: set up with your AI assistant
The quickest way to add Briefcase is to let your AI editor do it — point it at the
docs or give it Briefcase's tools over MCP. See [AI-Assisted Setup](/getting-started/ai-assisted-setup/).
Prefer to do it by hand? Continue below.
## Install
```bash
pip install briefcase-ai
```
## Pick your path
Briefcase records decisions two ways. They serve two different outcomes — pick based on whether the decision needs to outlive the current process.
> Pick your path
Reach for **Live Observability** when you just want to watch decisions as they happen — local debugging, a notebook, a test. Reach for **Persistent Decisions** the moment you'll need to load, replay, or audit a decision *after* the process ends — that's the foundation for the rest of the journey.
| Path | Outcome | API | When to use |
| --- | --- | --- | --- |
| **Live Observability** | Watch decisions as they happen | `@capture` + `observe()` | Local debugging, notebooks, low-overhead logging |
| **Persistent Decisions** | Reload, replay & audit later | `DecisionSnapshot` + a backend | Anything you'll need to reproduce, verify, or govern |
_@capture (Live Observability)_
- Lightweight: wraps a function and logs its inputs, outputs, and timing.
- Handed to an exporter (console, file, or memory) — not persisted to a backend.
- Best for low-overhead logging and live observability.
_DecisionSnapshot (Persistent Decisions)_
- Structured: you build the record field by field with typed inputs and outputs.
- Persisted to a storage backend and reloadable by ID.
- Replayable, so you can re-run and compare against the original output.
See [Core Concepts](/getting-started/core-concepts/) for how each record is structured.
## Record, persist, and replay
1. ### Record a decision (Live Observability)
The `@capture` decorator records every call — inputs, outputs, and timing — and hands the lightweight record to an exporter. But `@capture` alone has nowhere to send what it records: `briefcase.observe()` is the one call that wires up that exporter. Pass `"memory"` to collect records in a list, `"console"` to print them to stderr, or a `.jsonl` path to append them to a file. Because `@capture` exports in a background thread by default, pass `async_capture=False` when you want the record available synchronously — for example to read it back right after the call.
```python
import briefcase
mem = briefcase.observe("memory") # send captured records to memory
@briefcase.capture(decision_type="ticket-classification", async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "billing"
classify_ticket("My invoice is wrong")
print(mem.records[0])
```
> Note
See [the stock and custom exporters](/features/exporters/) for everything `observe()` can target.
`@capture` is for low-overhead logging. To persist a structured decision that you can reload and replay, build a `DecisionSnapshot` and store it with a backend.
2. ### Persist a snapshot (Persistent Decisions)
```python
from briefcase import (
    DecisionSnapshot,
    Input,
    ModelParameters,
    Output,
    init,
)
from briefcase.storage import SqliteBackend
init() # start the native runtime
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("ticket_text", "My invoice is wrong", "string"))
params = ModelParameters("gpt-4o-mini")
params.with_provider("openai")
params.with_parameter("temperature", 0.0)
decision.with_model_parameters(params)
output = Output("category", "billing", "string")
output.with_confidence(0.93)
decision.add_output(output)
decision.with_execution_time(12.0)
backend = SqliteBackend.in_memory() # or SqliteBackend("./decisions.db")
decision_id = backend.save_decision(decision)
print(f"Recorded decision {decision_id}")
```
`save_decision` returns the snapshot ID. Use a file path instead of `in_memory()` to keep decisions across runs.
3. ### Replay a decision
Re-run a stored decision and compare the result against the original output.
```python
from briefcase.replay import ReplayEngine
engine = ReplayEngine(backend)
result = engine.replay(decision_id, "strict")
print("status:", result.status)
print("outputs match:", result.outputs_match)
print("execution time (ms):", result.execution_time_ms)
```
`ReplayResult` exposes `.status`, `.outputs_match`, `.replay_output`, `.execution_time_ms`, and `.policy_violations`. Valid replay modes are `"strict"` and `"tolerant"`.
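This page does not spell out how the two replay modes compare outputs. Purely as an assumption, the sketch below treats `"strict"` as exact equality and `"tolerant"` as allowing small numeric differences and case or whitespace differences in strings. `outputs_match` here is a hypothetical standalone function, and the SDK's actual rules may differ.

```python
import math
from typing import Any


def outputs_match(original: Any, replayed: Any, mode: str = "strict",
                  rel_tol: float = 1e-6) -> bool:
    """Compare an original output with a replayed one (illustrative semantics only)."""
    if mode == "strict":
        return original == replayed
    if mode != "tolerant":
        raise ValueError(f"unknown mode: {mode!r}")
    # Assumed tolerant rules: close-enough numbers, normalized strings.
    if isinstance(original, (int, float)) and isinstance(replayed, (int, float)):
        return math.isclose(original, replayed, rel_tol=rel_tol)
    if isinstance(original, str) and isinstance(replayed, str):
        return original.strip().lower() == replayed.strip().lower()
    return original == replayed


print(outputs_match("billing", " Billing ", "strict"))
print(outputs_match("billing", " Billing ", "tolerant"))
```

Whatever the real rules are, the design question is the same: a strict comparison catches every change, while a tolerant one avoids flagging differences that do not alter the decision.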
## What's next
You've captured, persisted, and replayed a decision. The journey continues with controlling what runs before a decision, auditing one after the fact, and choosing where persistent decisions live.
- Core Concepts — The data model behind every recorded decision. (/getting-started/core-concepts/)
- Guardrails — Control & route: enforce rules before a decision runs. (/advanced/guardrails/)
- Audit a decision — Replay & verify: reconstruct why a decision happened. (/guides/audit-a-decision/)
- Storage Adapters — Store & query: pick where persistent decisions live. (/features/storage-adapters/)
## Core Concepts
Source: https://briefcaseai.io/getting-started/core-concepts/
> The mental model behind Briefcase — why a decision, its inputs and outputs, its environment, and its grouping are recorded as distinct, reproducible records.
> When to read this
Read this if you want the mental model before the feature pages. If you'd rather record something first and pick up the types as you go, start with the [Quickstart](/getting-started/quickstart/) and circle back here.
## The mental model
To make a past AI decision **reproducible and accountable**, you have to capture more than the answer — you need what went in, what the model was, and the environment it ran in. Briefcase keeps these as distinct records so each can be stored, queried, replayed, and verified on its own.
Briefcase records decisions at two levels:
- **`@capture`** is the lightweight path: it wraps a function and emits a plain dict to an exporter. Best for live observability.
- **`DecisionSnapshot`** is the native, persistent record described below — a structured snapshot you build, store, reload, and replay. This is the thing you audit and reproduce later.
The rest of this page walks the persistent types using the running `classify_ticket` triage example.
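For intuition about the lightweight path, the following sketch shows the general decorator pattern: wrap a function, time it, and hand a plain dict to a sink. It is written from scratch in standard-library Python. `capture_sketch`, the `sink` list, and the dict's field names are hypothetical and are not the record format `@capture` actually emits.

```python
import functools
import time
from typing import Any, Callable, Dict, List


def capture_sketch(decision_type: str, sink: List[Dict[str, Any]]) -> Callable:
    """Hypothetical stand-in: wrap a function and append one dict per call to `sink`."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            result = func(*args, **kwargs)
            sink.append({
                "decision_type": decision_type,
                "function": func.__name__,
                "inputs": {"args": list(args), "kwargs": dict(kwargs)},
                "output": result,
                "duration_ms": (time.perf_counter() - start) * 1000.0,
            })
            return result
        return wrapper
    return decorator


records: List[Dict[str, Any]] = []  # plays the role of an in-memory exporter


@capture_sketch("ticket-classification", records)
def classify_ticket(text: str) -> str:
    return "billing"  # placeholder for a model call


classify_ticket("My invoice is wrong")
print(records[0]["function"], records[0]["output"])
```

A dict like this is easy to log but carries no typed structure, which is the gap the persistent `DecisionSnapshot` types fill.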
## DecisionSnapshot
A `DecisionSnapshot` is the immutable, point-in-time account of a **single** AI decision — the thing you audit, replay, and verify later. Everything else in Briefcase exists to enrich or store this record. It holds:
- **inputs** — a list of `Input` values sent to the model
- **outputs** — a list of `Output` values the model returned
- **ModelParameters** — model name, provider, and per-parameter settings
- **ExecutionContext** — the runtime environment the decision ran in
- **execution_time_ms** — how long the call took
| Field | What it is | Why it matters |
| --- | --- | --- |
| `inputs` | The `Input` values sent to the model | A replay needs the same inputs to reproduce the outcome |
| `outputs` | The `Output` values the model returned | The result under audit, and the baseline a replay is compared against |
| `model_parameters` | Model name, provider, per-call settings | Explains *why* the output looked the way it did; a parameter change can be attributed |
| `execution_context` | The runtime environment | So a replay runs somewhere comparable and differences trace to a real change |
| `execution_time_ms` | How long the call took | A performance baseline you can compare a replay against |
| `tags` | Key/value labels (e.g. `environment`) | Lets you query and group decisions later via `SnapshotQuery` |
```python
from briefcase import (
    DecisionSnapshot,
    Input,
    ModelParameters,
    Output,
    init,
)
from briefcase.storage import SqliteBackend
init()
backend = SqliteBackend("./decisions.db")
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("ticket_text", "My invoice is wrong", "string"))
params = ModelParameters("gpt-4o-mini")
params.with_provider("openai")
params.with_parameter("temperature", 0.0)
decision.with_model_parameters(params)
decision.add_output(Output("category", "billing", "string").with_confidence(0.93))
decision.with_execution_time(12.0)
decision_id = backend.save_decision(decision)
loaded = backend.load_decision(decision_id)
print(loaded.function_name)
print([(i.name, i.value) for i in loaded.inputs])
print([(o.name, o.value, o.confidence) for o in loaded.outputs])
print(loaded.execution_time_ms)
```
`load_decision` returns a `DecisionSnapshot`. Inputs and outputs are lists, so read them as `.inputs` and `.outputs`.
## Decision Flow
```mermaid
graph LR
A["DecisionSnapshot built"] --> B["save_decision"]
B --> C["Storage Backend"]
C --> D["load_decision / query"]
D --> E["Replay / Audit"]
```
> Diagram description
A left-to-right flow with five stages: a built DecisionSnapshot is passed to save_decision, which writes to a storage backend; the backend is read back through load_decision or a query, and the loaded decision feeds replay or audit.
1. Build a `DecisionSnapshot` with inputs, outputs, and parameters
2. Persist it with `backend.save_decision(decision)`
3. The backend stores it and returns the decision ID
4. Reload it later with `backend.load_decision(decision_id)`
5. Replay or audit the stored decision
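The save-then-load round trip in these steps can be pictured with the standard-library `sqlite3` module. This is only a sketch of the flow, assuming a decision is stored as a JSON blob keyed by a generated ID. The table layout and helper functions are hypothetical and do not reflect how `SqliteBackend` stores data.

```python
import json
import sqlite3
import uuid
from typing import Any, Dict


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a store with one table of JSON-encoded decisions."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS decisions (id TEXT PRIMARY KEY, body TEXT NOT NULL)"
    )
    return conn


def save_decision(conn: sqlite3.Connection, decision: Dict[str, Any]) -> str:
    """Persist the decision and return its newly generated ID."""
    decision_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO decisions (id, body) VALUES (?, ?)",
        (decision_id, json.dumps(decision)),
    )
    conn.commit()
    return decision_id


def load_decision(conn: sqlite3.Connection, decision_id: str) -> Dict[str, Any]:
    """Reload a decision by ID, or raise KeyError if it was never stored."""
    row = conn.execute(
        "SELECT body FROM decisions WHERE id = ?", (decision_id,)
    ).fetchone()
    if row is None:
        raise KeyError(decision_id)
    return json.loads(row[0])


conn = open_store()
decision = {
    "function_name": "classify_ticket",
    "inputs": [{"name": "ticket_text", "value": "My invoice is wrong"}],
    "outputs": [{"name": "category", "value": "billing", "confidence": 0.93}],
}
decision_id = save_decision(conn, decision)
print(load_decision(conn, decision_id) == decision)
```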
## Input and Output
`Input(name, value, data_type)` and `Output(name, value, data_type)` are typed wrappers around a single named value. `data_type` is a string describing the value (for example `"string"` or `"json"`).
```python
from briefcase import Input, Output
prompt = Input("prompt", "Summarize this article", "string")
answer = Output("summary", "A short summary.", "string")
answer.with_confidence(0.88) # returns the Output, so it chains
```
`Output.with_confidence(confidence)` attaches a confidence score, readable as `output.confidence`.
## ModelParameters
`ModelParameters(model_name)` captures the model configuration at call time.
```python
from briefcase import ModelParameters
params = ModelParameters("gpt-4o-mini")
params.with_provider("openai")
params.with_parameter("temperature", 0.0)
params.with_parameter("max_tokens", 256)
print(params.model_name, params.provider, params.parameters)
```
Read back the configuration via `.model_name`, `.provider`, and the `.parameters` dict.
## ExecutionContext
The same inputs can produce a different answer on a different runtime version or seed. `ExecutionContext` records the environment a decision ran in — **so a replay can run in a comparable one** and any difference is attributable to a real change rather than a moved goalpost. It captures the runtime version, resolved dependencies, the random seed, and relevant environment variables. It does not carry timing — use `DecisionSnapshot.execution_time_ms` for that.
```python
from briefcase import ExecutionContext
ctx = ExecutionContext()
ctx.with_runtime_version("3.11.0")
ctx.with_dependency("torch", "2.1.0")
ctx.with_random_seed(42)
ctx.with_env_var("ENVIRONMENT", "production")
print(ctx.runtime_version) # "3.11.0"
print(ctx.dependencies) # {"torch": "2.1.0"}
print(ctx.random_seed) # 42
print(ctx.environment_variables) # {"ENVIRONMENT": "production"}
```
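The values in the example above are hard-coded. One way to gather real ones is with the standard library, as in this sketch. `runtime_facts` is a hypothetical helper, and passing its results to `with_runtime_version`, `with_dependency`, and `with_env_var` is left to the caller.

```python
import os
import platform
from importlib import metadata
from typing import Dict, Iterable, Optional


def runtime_facts(packages: Iterable[str], env_vars: Iterable[str]) -> Dict[str, object]:
    """Collect the interpreter version, selected package versions, and allowlisted env vars."""
    deps: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            deps[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            deps[name] = None  # package is not installed in this environment
    return {
        "runtime_version": platform.python_version(),
        "dependencies": deps,
        # Only variables named explicitly are read, so secrets are not swept up.
        "environment_variables": {k: os.environ[k] for k in env_vars if k in os.environ},
    }


facts = runtime_facts(["definitely-not-installed-xyz"], ["PATH"])
print(facts["runtime_version"], facts["dependencies"])
```

Using an explicit allowlist for environment variables keeps credentials and tokens out of the recorded context.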
## Snapshot
A single decision rarely tells the whole story — a request or session usually produces several. `Snapshot(snapshot_type)` groups related decisions **so you can store, load, and reason about them as one unit** — for example, every decision made in one support request or session. Add decisions with `add_decision` and persist the group with `backend.save`.
```python
from briefcase import DecisionSnapshot, Input, Output, Snapshot, init
from briefcase.storage import SqliteBackend
init()
backend = SqliteBackend.in_memory()
session = Snapshot("session")
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("ticket_text", "Where is my refund?", "string"))
decision.add_output(Output("category", "refunds", "string").with_confidence(0.9))
session.add_decision(decision)
snapshot_id = backend.save(session)
loaded = backend.load(snapshot_id)
print(loaded.snapshot_type, len(loaded.decisions))
```
`backend.load` returns a `Snapshot`; read its grouped decisions from `.decisions`.
## Where this fits
These types are the vocabulary for the whole journey. Now put them to work, or go deeper on the recording API that produces them.
- Quickstart — Put these abstractions to work: record, persist, and replay a decision. (/getting-started/quickstart/)
- Decision Recording — Go deeper on capturing inputs, outputs, and parameters. (/features/decision-recording/)
- Exporters — Emit lightweight @capture records to the console, a file, or memory. (/features/exporters/)
## Installation
Source: https://briefcaseai.io/getting-started/installation/
> Install Briefcase AI, then add only the extras you need — grouped by what you're trying to do, with no surprise dependencies.
Briefcase ships as `briefcase-ai` — a Python SDK over a Rust core. Install the base package, then add extras only when a concrete need appears.
## Base Install
_pip_
```bash
pip install briefcase-ai
```
_uv_
```bash
uv add briefcase-ai
```
## What ships in the base package
The base install carries the whole core loop — capture a decision, store it, replay it, verify it — with no extras. The exports below are grouped by the act of the journey they belong to.
**Recording** —
`capture` · `observe` · `setup` · `DecisionSnapshot` · `Snapshot` · `Input` · `Output` · `ModelParameters` · `ExecutionContext` · `HardwareMetadata`
Plus the stock exporters under `briefcase.exporters` and `enable_logging` / `get_logger`. Record each `classify_ticket` call and emit it to the console, a file, or memory.
**Store & query** —
`init` · `init_with_config` · `is_initialized` · `BriefcaseConfig` · `SnapshotQuery`
Start the native runtime and query stored decisions. Cost types live in `briefcase.cost` — also base, no extra.
**Replay & verify** —
Wired through the runtime above
Persist a decision, then re-run it and confirm its record is intact. The replay engine itself is gated behind the `replay` extra below.
Cost tracking ships in the base package (`briefcase.cost`) — there is **no** separate `cost` extra. The stock exporters (`ConsoleExporter`, `JSONLFileExporter`, `MemoryExporter`) are also base.
## Extras
Extras gate the import surface of optional submodules. Most install **nothing** and simply mark intent — only `otel`, `lakefs`, `bitemporal-iceberg`, and `mcp` pull in third-party dependencies. Install only what your deployment needs.
| Extra | What it adds | Recommended for |
| --- | --- | --- |
| `storage` | `SqliteBackend`, `BufferedBackend` | Persisting `classify_ticket` decisions to a SQLite file |
| `replay` | `ReplayEngine` | Re-running stored decisions to confirm a change reproduces them |
| `drift` | `DriftCalculator`, `DriftMetrics` | Measuring how consistent repeated decisions are |
| `validate` | `PromptValidationEngine` | Checking prompt references before a call runs |
| `guardrails` | `GuardrailEnv` framework | Allowing or denying an action before it executes |
| `routing` | `AgentRouter`, `PolicyRegistry` | Versioned, policy-based routing of decisions |
| `compliance` | `ExaminerBundle` | Building a tamper-evident, verifiable bundle for a decision |
| `bitemporal` | `BitemporalRecord`, in-memory store | Reconstructing any past state of recorded facts |
| `bitemporal-iceberg` | Iceberg-backed store *(installs pyiceberg, pyarrow)* | A scalable bitemporal store |
| `sanitize` | `Sanitizer` (redaction) | Stripping sensitive spans from inputs/outputs |
| `external` | `ExternalDataTracker` | Snapshotting external data a decision read |
| `rag` | `VersionedEmbeddingPipeline` | Versioning an embedding index for reproducible RAG |
| `lakefs` | lakeFS `VersionedClient` *(installs lakefs)* | Reading versioned files with their commit SHA |
| `vcs` | VCS client base protocol | Implementing a custom versioned data source |
| `otel` | OpenTelemetry helpers *(installs opentelemetry)* | Correlating decisions with existing traces |
| `correlation` | Multi-agent workflow tracing | Correlating decisions across agents in one workflow |
| `events` | `BriefcaseEvent` emitter | Emitting events on low confidence or drift |
| `mcp` | `briefcase-mcp` server *(installs mcp)* | Exposing SDK tools to MCP clients |
| `all` | Everything above | Evaluation or local development |
| `dev` | Test and lint tooling | Contributing to Briefcase |
## Recommended path for the quickstart
1. Install the base package — it covers recording and inspection, which is all the [Quickstart](/getting-started/quickstart/) needs to begin.
```bash
pip install briefcase-ai
```
2. Add `storage` and `replay` for the Quickstart's persist-and-replay steps.
```bash
pip install "briefcase-ai[replay,storage]"
```
3. Add further extras later, only when a need appears — for example `otel` for tracing or `guardrails` to gate actions.
## Install everything
```bash
pip install "briefcase-ai[all]"
```
## Requirements
- **Python 3.9+** — that's the only requirement. Wheels are precompiled, so no
Rust toolchain is needed unless you [build the Rust core](/sdk/rust/) yourself.
## Where this fits
Installation is step zero of the journey. Next, capture and inspect a decision, then learn the data model behind it.
- Quickstart — Record, persist, and replay your first decision in about 5 minutes. (/getting-started/quickstart/)
- Core Concepts — The DecisionSnapshot, ExecutionContext, and Snapshot data model. (/getting-started/core-concepts/)
## What's New
Source: https://briefcaseai.io/getting-started/whats-new/
> The 3.3.0 release launches oci-bai — an artifact graph for models and fine-tunes — plus the 3.2.x cost rate cards, prompt-cache billing, and latest model pricing.
`briefcase-ai` **v3.3.0** launches **oci-bai**, an artifact graph for tracking every model,
fine-tune, dataset, and runtime you push; the recent **3.2.x** line made cost estimates match
how you actually buy inference. Both are summarized below, newest first. The decision, replay,
and cost APIs are fully backward compatible — every new parameter is keyword-only, so existing
calls behave identically. See the full [Changelog](/resources/changelog/) for details.
## 3.3.0 — oci-bai artifact graph
**oci-bai** tracks every image you push through an OCI-compatible gateway in a single artifact
graph — with lineage, provenance, deduplication, and search built in. Push with any OCI tool
(`docker push`, `crane`); the graph builds itself.
```bash
docker tag my-model:latest localhost:8080/my-repo:v1
docker push localhost:8080/my-repo:v1
oci-bai --repo my-repo log v1
oci-bai --repo my-repo diff base v1 --depth package
oci-bai search "format==safetensors cuda>=12.4"
```
**Why it matters:** every fine-tune and dataset version is tracked, searchable, and linked to
its parent — no manual bookkeeping. Weight-sharing metrics tell you whether a push was a
re-tag, a partial fine-tune, or a full retrain.
oci-bai is in **private beta** — contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access.
The full documentation lives at **[oci.briefcaseai.io](https://oci.briefcaseai.io)**. For an
overview of the CLI and capabilities, see [Artifact Graph & Evaluate](/evaluate/runs/).
## 3.2.x — Cost & pricing
### Price any platform with rate cards
`CostCalculator.estimate_cost` takes an optional `rate_card` — a forgiving
`platform × tier × modifiers` string — so an estimate reflects the platform and
tier you actually run on, not just first-party list price.
```python
from briefcase.cost import CostCalculator
calc = CostCalculator()
# First-party standard pricing (unchanged default)
standard = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000)
# Same call, priced for the AWS Bedrock batch tier
batch = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000, rate_card="bedrock:batch")
print(standard.total_cost, batch.total_cost)
print(calc.get_available_rate_cards())
```
| Part | Values | Effect |
| --- | --- | --- |
| Platform | `first_party` · `bedrock` · `vertex` · `azure` | Selects the provider's price sheet |
| Tier | `standard` · `batch` · `cached` · `priority` · `flex` | `batch`/`flex` ≈ 0.5×; `priority` is a premium |
| Modifiers | `regional` · `us` · `fast` | Regional/residency add ~10% |
**Why it matters:** the same `classify_ticket` workload costs very differently on
Bedrock batch versus first-party standard. Rate cards let you compare the real
number before you ship a platform or tier change.
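To show how a `platform × tier × modifiers` string might decompose, here is a small sketch that turns a rate card into an approximate factor relative to the chosen platform's own price sheet. It is not how `CostCalculator` parses or prices anything. The colon separator is inferred from the `"bedrock:batch"` example, `standard` is assumed to be the 1.0 baseline, and only the figures stated in the table above are included (`batch`/`flex` at about 0.5×, `regional` at about +10%).

```python
from typing import Dict, Tuple

PLATFORMS = {"first_party", "bedrock", "vertex", "azure"}
# Only figures stated on this page; other tiers and modifiers have no number here.
TIER_FACTOR: Dict[str, float] = {"standard": 1.0, "batch": 0.5, "flex": 0.5}
MODIFIER_FACTOR: Dict[str, float] = {"regional": 1.10}


def approximate_factor(rate_card: str) -> Tuple[str, float]:
    """Split a rate-card string and multiply the factors of its known parts."""
    parts = [p.strip().lower() for p in rate_card.split(":") if p.strip()]
    if not parts or parts[0] not in PLATFORMS:
        raise ValueError(f"unknown platform in rate card: {rate_card!r}")
    platform_name, rest = parts[0], parts[1:]
    factor = 1.0
    for part in rest:
        if part in TIER_FACTOR:
            factor *= TIER_FACTOR[part]
        elif part in MODIFIER_FACTOR:
            factor *= MODIFIER_FACTOR[part]
        else:
            raise ValueError(f"no factor stated for {part!r}")
    return platform_name, factor


print(approximate_factor("bedrock:batch"))
```

For real numbers, rely on `estimate_cost` with a `rate_card` argument; the sketch only illustrates why the same token counts can price differently across tiers.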
### Prompt-cache billing
Anthropic prompt caching changes the math: cache reads are billed at a fraction
of the input rate. `estimate_cost` accepts cache-token counts and exposes a
`cache_cost` on the estimate.
```python
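from briefcase.cost import CostCalculator

calc = CostCalculator()

# Cache-heavy call: no fresh input tokens, 100k tokens read from the prompt cache
estimate = calc.estimate_cost(
    "claude-opus-4-8",
    input_tokens=0,
    output_tokens=1_000,
    cache_read_tokens=100_000,  # also: cache_write_5m_tokens, cache_write_1h_tokens
)
print(estimate.cache_cost, estimate.total_cost)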
```
**Why it matters:** a cache-heavy agent's bill is dominated by cache reads at
0.1× input — now your estimate reflects that instead of overcounting.
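The effect of the 0.1× cache-read rate is easiest to see with arithmetic. The sketch below uses made-up prices (10 and 50 currency units per million input and output tokens) purely for illustration; they are not real model prices, and `estimate_sketch` is a hypothetical function, not the SDK's calculation.

```python
from typing import Dict


def estimate_sketch(input_tokens: int, output_tokens: int, cache_read_tokens: int,
                    input_per_mtok: float, output_per_mtok: float,
                    cache_read_factor: float = 0.1) -> Dict[str, float]:
    """Price a call where cache reads are billed at a fraction of the input rate."""
    cache_cost = cache_read_tokens / 1_000_000 * input_per_mtok * cache_read_factor
    total = (input_tokens / 1_000_000 * input_per_mtok
             + output_tokens / 1_000_000 * output_per_mtok
             + cache_cost)
    return {"cache_cost": cache_cost, "total_cost": total}


# Hypothetical prices: 10 per million input tokens, 50 per million output tokens.
with_cache = estimate_sketch(0, 1_000, 100_000, input_per_mtok=10.0, output_per_mtok=50.0)
# The same tokens priced as if cache reads were ordinary input tokens.
naive = estimate_sketch(100_000, 1_000, 0, input_per_mtok=10.0, output_per_mtok=50.0)
print(with_cache["total_cost"], naive["total_cost"])
```

With these made-up prices the cache-aware total is a small fraction of the naive one, which is the overcounting a cache-unaware estimate would introduce.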
### Latest model pricing
The default pricing table covers the current frontier: Anthropic Claude 4.x
(`claude-opus-4-8`, `claude-sonnet-4-6`, `claude-haiku-4-5`, …), OpenAI GPT-5.x
(`gpt-5.5`, `gpt-5.4-mini`, …), and Google Gemini (`gemini-3.1-pro`,
`gemini-2.5-flash`, …). Every previously priced model is retained.
**Why it matters:** you can estimate and compare today's models without
hand-maintaining a price sheet.
### Wider Python support
A single stable-ABI wheel per platform installs on **Python 3.9–3.13**, and
the source distribution bundles its license files. **Why it matters:** `pip
install briefcase-ai` works across more environments without building from source.
## Explore
- oci-bai Quick Start — Push your first tracked image, explore the commit graph, and run your first search. (https://oci.briefcaseai.io/getting-started)
- Artifact Graph Overview — How oci-bai fits into the Briefcase platform. (/evaluate/runs/)
- Cost Tracking — Rate cards, prompt-cache billing, model comparison, and budgets in depth. (/features/cost-tracking/)
- Changelog — The full release history. (/resources/changelog/)
========================================================================
# Capture
========================================================================
## Decision Recording
Source: https://briefcaseai.io/features/decision-recording/
> Capture every AI decision — inputs, outputs, model, and a content fingerprint — so it can be replayed, audited, and proven later.
Decision recording captures the complete context behind every decision your AI system makes, so it can be replayed, audited, and verified later.
> When you'd reach for this
A customer disputes how your triage agent routed their ticket three weeks ago. Without a record, you can only guess what the model saw and why. With decision recording, every `classify_ticket` call already kept its exact inputs, the label it returned, the model used, and a content fingerprint — so you can pull up that one decision and explain it.
## Why persist a decision
A decision that vanishes the moment it runs can't be replayed, audited, or proven. Recording it turns a fleeting model call into a durable, verifiable record:
- **Replay** — re-run the exact inputs later to check whether behavior changed.
- **Audit** — answer "what did the agent decide, and on what basis?" months later.
- **Prove** — the `fingerprint()` content hash makes tampering detectable.
There are two ways to record. The lightweight `@capture` decorator records a dict per call and hands it to an exporter. The native `DecisionSnapshot` builds a structured record you can persist and replay.
## How recording flows
1. **Capture** — `@capture` wraps `classify_ticket` and records its inputs, outputs, timing, and type for each call.
2. **Export** — the recorded dict is handed to an exporter (console, a `.jsonl` file, or your own).
3. **Persist** — for storage and replay, build a native `DecisionSnapshot` and save it to a backend.
4. **Replay & verify** — later, load the snapshot, re-run it, and compare its `fingerprint()`.
```mermaid
flowchart LR
A["classify_ticket()"] -->|"@capture"| B["recorded dict"]
B --> C[Exporter]
A -->|"native API"| D[DecisionSnapshot]
D --> E[Backend]
E --> F["Replay & Verify"]
```
> Diagram description
A `classify_ticket` call can be recorded two ways. The `@capture` decorator produces a lightweight dict that flows to an exporter for streaming and inspection. The native `DecisionSnapshot` is a structured record that is saved to a storage backend, which is what a later replay-and-verify step loads from.
## Record a decision with @capture
The simplest way to record a decision is the `@capture` decorator. Pass an `exporter` to send each record somewhere:
```python
from briefcase import capture
from briefcase.exporters import BaseExporter

class CollectingExporter(BaseExporter):
    def __init__(self):
        self.records = []

    async def export(self, decision):
        self.records.append(decision)
        return True

    async def flush(self):
        ...

    async def close(self):
        ...


exporter = CollectingExporter()


@capture(decision_type="classification", context_version="v1",
         exporter=exporter, async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "account_access"


classify_ticket("Reset my password")
print(exporter.records[0])
```
The decorator wraps the call, records a dict (decision id, inputs, outputs,
timing, `decision_type`, `context_version`), and exports it through the
exporter you pass. It does not persist a native `DecisionSnapshot` on its own;
use the native objects below when you need storage or replay.
### @capture parameters
| Parameter | Default | Description |
|-----------|---------|-------------|
| `decision_type` | `None` | Label for the kind of decision recorded |
| `context_version` | `None` | Version tag for the surrounding context or prompt |
| `max_input_chars` | `1000` | Truncate recorded inputs to this length |
| `max_output_chars` | `1000` | Truncate recorded outputs to this length |
| `exporter` | `None` | Exporter that receives each recorded dict |
| `async_capture` | `True` | Export off the calling thread |
`@capture` works with or without arguments:
```python
from briefcase import capture
@capture
def classify(text: str) -> str:
    # call your model here
    return "billing"
```
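For readers curious how one decorator can support both `@capture` and `@capture(...)`, here is a minimal stdlib sketch of the general pattern. `record_call` and `RECORDS` are hypothetical names, and this is not Briefcase's implementation; it only illustrates detecting whether the decorator received the function directly.

```python
import functools
import time

RECORDS = []  # hypothetical in-memory sink standing in for an exporter


def record_call(func=None, *, decision_type=None, max_input_chars=1000):
    """Decorator usable as @record_call or @record_call(decision_type=...)."""

    def decorate(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            started = time.perf_counter()
            result = fn(*args, **kwargs)
            RECORDS.append({
                "function_name": fn.__name__,
                "decision_type": decision_type or fn.__name__,
                "inputs": repr((args, kwargs))[:max_input_chars],
                "outputs": repr(result),
                "execution_time_ms": (time.perf_counter() - started) * 1000,
            })
            return result
        return wrapper

    # Bare usage passes the function itself; parameterized usage passes None.
    return decorate(func) if func is not None else decorate


@record_call
def classify(text: str) -> str:
    return "billing"


@record_call(decision_type="classification")
def classify_ticket(text: str) -> str:
    return "account_access"


classify("My invoice is wrong")
classify_ticket("Reset my password")
print([r["decision_type"] for r in RECORDS])
```

Making every option keyword-only is what lets the decorator tell the two call styles apart: the only positional argument it can ever receive is the decorated function.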
## Emit records
`@capture` records a decision but has nowhere to send it until you configure an
exporter. `briefcase.observe()` wires one up in a single line and returns it.
```python
import briefcase
mem = briefcase.observe("memory") # or "console", or a "*.jsonl" path
@briefcase.capture(decision_type="classification", async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "account_access"
classify_ticket("Reset my password")
print(mem.records[0])
```
`@capture` exports in a background thread by default. Pass `async_capture=False`
when you want the record available synchronously — for example, to read
`MemoryExporter.records` right after the call.
The per-call `exporter=` argument shown above overrides the global one set by
`observe()`. See [Exporters](/features/exporters/) for the stock exporters and
how to write a custom one.
## Build a native DecisionSnapshot
When you need storage or replay, build a structured `DecisionSnapshot`:
```python
from briefcase import DecisionSnapshot, Input, Output, ModelParameters
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "Reset my password", "string"))
params = ModelParameters("your-model")
params.with_provider("your-provider")
params.with_parameter("temperature", 0.0)
decision.with_model_parameters(params)
output = Output("category", "account_access", "string")
output.with_confidence(0.92)
decision.add_output(output)
decision.with_execution_time(12.5)
decision.with_module("triage_service")
decision.add_tag("environment", "production")
print(decision.function_name)
print(decision.fingerprint())
```
### Fingerprints make a record verifiable
`fingerprint()` returns a stable hash over inputs, outputs, and model
parameters. Store it alongside the record; recompute it later to detect when the
same decision produces a different result — this is what makes a decision
tamper-evident and replay-checkable.
```python
digest = decision.fingerprint() # stable content hash
# later, on a loaded snapshot:
assert loaded.fingerprint() == digest # unchanged
```
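The idea behind a content fingerprint can be sketched with the standard library. The source does not say which hash or encoding `fingerprint()` uses, so the SHA-256 over canonical JSON below is an assumption, and `content_fingerprint` is a hypothetical helper rather than part of the SDK.

```python
import hashlib
import json


def content_fingerprint(inputs, outputs, model_parameters):
    """Hash a canonical JSON encoding so equal content gives an equal digest."""
    payload = {
        "inputs": inputs,
        "outputs": outputs,
        "model_parameters": model_parameters,
    }
    # sort_keys and fixed separators make the encoding independent of dict order.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


original = content_fingerprint(
    {"text": "Reset my password"},
    {"category": "account_access"},
    {"model": "your-model", "temperature": 0.0},
)
same_content = content_fingerprint(
    {"text": "Reset my password"},
    {"category": "account_access"},
    {"temperature": 0.0, "model": "your-model"},  # key order differs
)
tampered = content_fingerprint(
    {"text": "Reset my password"},
    {"category": "billing"},  # output changed
    {"model": "your-model", "temperature": 0.0},
)
print(original == same_content, original == tampered)
```

Canonicalizing before hashing is the important step: without it, two records with identical content but different key order would produce different digests and look tampered with.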
## Key classes
- `@capture` — decorator that records a dict and exports it
- `DecisionSnapshot` — structured record you can persist and replay; exposes `fingerprint()`
- `Input` / `Output` — typed wrappers; `Output.with_confidence(score)` attaches a confidence value
- `ModelParameters` — model name, provider, and per-call parameters
- `Snapshot` — groups multiple decisions; `add_decision(decision)` appends to it
| Field | Description | Why it matters |
|-------|-------------|----------------|
| `function_name` | The recorded function | Identifies which decision this is |
| `inputs` | Typed inputs | The exact inputs a replay re-runs against |
| `outputs` | Typed outputs | What the agent actually decided |
| `tags` | Arbitrary key/value tags | Carries your own context (e.g. environment, queue) |
| `execution_time_ms` | How long the call took | Anchors performance over time |
| `fingerprint()` | Content hash | Makes the record tamper-evident and verifiable |
## @capture vs DecisionSnapshot vs persisted storage
Three layers, each for a different need — pick by what you're trying to do.
| Use this | When you want to... | Lifetime |
|----------|---------------------|----------|
| `@capture` decorator | Instrument a real function (like `classify_ticket`) with zero boilerplate and stream a lightweight record to an exporter | Per call |
| `DecisionSnapshot` | Build a structured record by hand — to persist, replay, or fingerprint it | In-memory object |
| Persisted backend (`SqliteBackend`) | Keep decisions durably so you can query, replay, and audit them weeks later | Durable |
`@capture` records a dict and exports it — it does not persist a native
`DecisionSnapshot` on its own. For anything you'll need to query or replay
later, build a `DecisionSnapshot` and save it to a backend.
## Persist a decision
Save a `DecisionSnapshot` to a storage backend so it can be queried or replayed
later. This is the bridge from Capture into the Store & Query act.
```python
import briefcase
from briefcase import DecisionSnapshot, Input, Output
from briefcase.storage import SqliteBackend
briefcase.init()
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "Reset my password", "string"))
decision.add_output(Output("category", "account_access", "string"))
decision.with_execution_time(12.5)
backend = SqliteBackend.in_memory() # or SqliteBackend("decisions.db")
decision_id = backend.save_decision(decision)
restored = backend.load_decision(decision_id)
print(restored.function_name)
```
## Where this fits
- Next · Capture: Exporters — Send each recorded decision to the console, a file, or your own backend. (/features/exporters/)
- Then · Store & Query: Storage Adapters — Persist snapshots durably so you can query and replay them later. (/features/storage-adapters/)
## Exporters
Source: https://briefcaseai.io/features/exporters/
> Stream captured decisions to the console, a file, memory, or your own external sink as they happen.
Exporters control where decision records go the moment they're captured — to the console, a file, memory, or a sink of your own.
> When you'd reach for this
While iterating on the triage agent, you want to *see* each `classify_ticket` decision as it happens — so a `ConsoleExporter` prints them to your terminal. In CI, you instead want to assert on what was captured without touching disk, so a `MemoryExporter` holds the records for your test to inspect. Same `@capture` code, swapped exporter.
`@capture` records every call, but on its own it has nowhere to send the record. An exporter is about *streaming records out as they happen* (for inspection, tests, or forwarding). A [storage backend](/features/storage-adapters/) is about *durable persistence you query later*. Many setups use both.
## How exporting fits
1. **Capture** — `@capture` records a `classify_ticket` call as a lightweight dict.
2. **Wire an exporter** — `briefcase.observe()` configures the global exporter in one line (or pass `exporter=` to `@capture`).
3. **Land it** — the exporter writes the record where you pointed it: stderr, a `.jsonl` file, an in-memory list, or your own sink.
`observe`, `setup`, and all stock exporters ship in the base package — no extra required.
## Emit records in one line
`briefcase.observe()` configures the global exporter and returns it. After
calling it, every `@capture` decision is sent to that exporter.
```python
import briefcase
mem = briefcase.observe("memory")
@briefcase.capture(decision_type="ticket-classification", async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "billing"
classify_ticket("My invoice is wrong")
print(mem.records[0])
```
`@capture` exports in a background thread by default. Pass
`async_capture=False` when you want the record to be available synchronously —
for example, to read `MemoryExporter.records` right after the call.
### observe() shorthands
`briefcase.observe(exporter="console", *, level=None)` accepts either a
`BaseExporter` instance or a shorthand string, and returns the configured
exporter.
| Argument | Result |
|----------|--------|
| `"console"` (default) | `ConsoleExporter` — writes JSON lines to stderr |
| `"memory"` | `MemoryExporter` — collects records in `.records` |
| a path ending in `.jsonl` | `JSONLFileExporter` — appends to that file |
| a `BaseExporter` instance | used as-is |
`level=` (optional) also enables Briefcase logging at that level — the same as
calling [`enable_logging()`](/sdk/python/#logging).
```python
import briefcase
# Each call replaces the global exporter.
briefcase.observe("console") # JSON lines to stderr (default)
briefcase.observe("memory") # collect in memory
briefcase.observe("decisions.jsonl") # append to a file
briefcase.observe("console", level="INFO") # also turn on logging
```
`observe()` calls `setup(exporter=...)` under the hood, so
`briefcase.setup(exporter=ConsoleExporter())` is equivalent to
`briefcase.observe("console")`.
## Which stock exporter?
| Exporter | Sends records to... | Reach for it when |
|----------|---------------------|-------------------|
| `ConsoleExporter` | a stream (`sys.stderr` by default) | Developing or debugging and you want to watch decisions live |
| `JSONLFileExporter` | a `.jsonl` file (one record per line) | You want a durable, append-only local log you can grep or post-process |
| `MemoryExporter` | an in-memory list on `.records` | Tests and notebooks — capture decisions, then assert on them without I/O |
### ConsoleExporter
Writes each record as one line of JSON to a stream. The quickest way to confirm
`@capture` is producing records.
```python
import sys
from briefcase import setup
from briefcase.exporters import ConsoleExporter
setup(exporter=ConsoleExporter(sys.stdout, pretty=True))
```
`ConsoleExporter(stream=None, *, pretty=False)` — `stream` defaults to
`sys.stderr`; `pretty=True` indents the JSON.
### JSONLFileExporter
Appends records to a file as JSON Lines (one object per line). Durable,
append-only, and thread-safe, so it is safe to share across the background
export threads `@capture` spawns. Parent directories are created on demand.
```python
import briefcase
briefcase.observe("decisions.jsonl") # or JSONLFileExporter("decisions.jsonl")
```
`JSONLFileExporter(path)` — `path` is a string or `pathlib.Path`.
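Because the file is plain JSON Lines, it can be post-processed with the standard library alone. The sketch below writes a few hand-made sample records to a temporary file and reads them back; the field names follow the record shape documented on this page, while the `route_ticket` record and the values are invented for illustration.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path

# Hand-written sample records standing in for real exporter output.
sample_records = [
    {"decision_type": "classification", "function_name": "classify_ticket",
     "execution_time_ms": 12.5},
    {"decision_type": "classification", "function_name": "classify_ticket",
     "execution_time_ms": 9.0},
    {"decision_type": "routing", "function_name": "route_ticket",
     "execution_time_ms": 3.2},
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "decisions.jsonl"
    # JSON Lines: one JSON object per line.
    path.write_text(
        "".join(json.dumps(r) + "\n" for r in sample_records), encoding="utf-8"
    )

    # Read it back line by line, skipping any blank lines.
    with path.open(encoding="utf-8") as handle:
        records = [json.loads(line) for line in handle if line.strip()]

counts = Counter(r["function_name"] for r in records)
print(counts)
```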
### MemoryExporter
Collects records in a list on `.records`. Ideal for tests and notebooks where
you want to read the captured decisions back.
```python
import briefcase
mem = briefcase.observe("memory")
@briefcase.capture(async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "billing"
classify_ticket("My invoice is wrong")
assert mem.records[0]["function_name"] == "classify_ticket"
mem.clear() # drop all collected records
```
`MemoryExporter()` exposes `.records` (a list) and `.clear()`.
## Custom exporters: ship to an external sink
Subclass `BaseExporter` to forward decisions anywhere your stack already
collects events — a log aggregator, a message queue, an analytics pipeline. You
implement three async methods; register the instance with `observe()` (it returns
it unchanged) or with `setup(exporter=...)`.
> When you'd reach for this
Your team already routes operational events through an internal collector. Rather than build a second pipeline for triage decisions, a small custom exporter forwards each `classify_ticket` record to that same sink.
```python
from typing import Any
import briefcase
from briefcase.exporters import BaseExporter

class WebhookExporter(BaseExporter):
    async def export(self, decision: Any) -> bool:
        # ship `decision` (a dict) to your external sink here
        # e.g. post to a collector, enqueue, or forward to a log pipeline
        return True

    async def flush(self) -> None:
        ...

    async def close(self) -> None:
        ...


exporter = briefcase.observe(WebhookExporter())


@briefcase.capture(decision_type="classification", async_capture=False)
def classify_ticket(text: str) -> str:
    # call your model here
    return "account_access"

classify_ticket("Reset my password")
```
- `export(decision)` ships a single record; return `True` on success.
- `flush()` flushes any buffered records.
- `close()` releases resources.
For durable, queryable persistence rather than fire-and-forget streaming, use a
[storage backend](/features/storage-adapters/) instead of (or alongside) a
custom exporter.
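The three methods matter most when an exporter buffers. The sketch below shows one way the `export` / `flush` / `close` contract could fit together for batching. It is duck-typed so it runs without the SDK installed; in real use the class would subclass `BaseExporter`. `BufferingExporter`, `send_batch`, and `batch_size` are hypothetical names.

```python
import asyncio


class BufferingExporter:  # in real use: subclass briefcase.exporters.BaseExporter
    """Collect records and hand them to a sink in batches."""

    def __init__(self, send_batch, batch_size=2):
        self._send_batch = send_batch  # hypothetical callable that ships a list
        self._batch_size = batch_size
        self._buffer = []

    async def export(self, decision) -> bool:
        self._buffer.append(decision)
        if len(self._buffer) >= self._batch_size:
            await self.flush()
        return True

    async def flush(self) -> None:
        if self._buffer:
            batch, self._buffer = self._buffer, []
            self._send_batch(batch)

    async def close(self) -> None:
        await self.flush()  # do not drop records still in the buffer


shipped = []  # stands in for the external sink


async def demo():
    exporter = BufferingExporter(shipped.append, batch_size=2)
    for i in range(3):
        await exporter.export({"decision_id": str(i)})
    await exporter.close()


asyncio.run(demo())
print([len(batch) for batch in shipped])
```

Flushing inside `close()` is the design choice to note: a buffering exporter that only releases resources on close would silently lose the last partial batch.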
## Record shape
Each record `@capture` hands to an exporter is a dict:
- `decision_id` — a UUID string
- `decision_type` — the value you passed, or the function qualified name
- `function_name`
- `inputs` / `outputs` — truncated reprs of the arguments and return value
- `started_at` / `ended_at` — ISO 8601 timestamps
- `execution_time_ms`
- `context_version` — present only when you pass it
- `error` — present only when the call raised
## Key symbols
- `briefcase.observe(exporter="console", *, level=None)` — configure and return the global exporter.
- `briefcase.exporters.ConsoleExporter` — JSON lines to a stream.
- `briefcase.exporters.JSONLFileExporter` — append JSON Lines to a file.
- `briefcase.exporters.MemoryExporter` — collect records in `.records`.
- `briefcase.exporters.BaseExporter` — base class for custom exporters.
## Where this fits
- Act 1 · Capture: Decision Recording — Where the records an exporter ships actually come from. (/features/decision-recording/)
- Next · Store & Query: Storage Adapters — Persist decisions durably so you can query and replay them later. (/features/storage-adapters/)
## PII Sanitization
Source: https://briefcaseai.io/features/pii-sanitization/
> Minimize sensitive data — detect and redact it before a decision record is ever stored.
PII sanitization is data minimization for your records: detect sensitive data and redact it *before* a decision is recorded or stored.
> When you'd reach for this
Support tickets routinely contain emails, phone numbers, and account details. You want a durable record of how each ticket was triaged — but you don't want raw personal data sitting in that record forever. Sanitizing on the way in keeps the decision auditable while keeping the sensitive payload out of storage.
The principle is **minimize before you store**: a record you never wrote sensitive data into is one you never have to scrub later.
## Install
```bash
pip install briefcase-ai[sanitize]
```
The `sanitize` extra installs no third-party dependencies; detection uses built-in regex patterns.
## Sanitize before capture
1. **Detect** — scan the incoming ticket text for known patterns (email, phone, and any custom patterns you register).
2. **Redact** — replace matches with a `[REDACTED_<TYPE>]` marker so the meaning survives but the sensitive value doesn't.
3. **Capture** — record the decision on the *sanitized* text, so the stored record never held raw PII.
```python
from briefcase.sanitize import Sanitizer
sanitizer = Sanitizer()
result = sanitizer.sanitize("Email me at jane.doe@example.com please")
print(result.sanitized) # Email me at [REDACTED_EMAIL] please
print(result.redaction_count) # 1
print(result.has_redactions) # True
# feed result.sanitized into classify_ticket() so the captured record is clean
```
`sanitize()` returns a `SanitizationResult` with `.sanitized`, `.redactions`, `.redaction_count`, and `.has_redactions`.
## Redaction markers
Each match is replaced with a `[REDACTED_<TYPE>]` marker. The built-in PII types and their markers:
| PII type | Marker |
|----------|--------|
| `email` | `[REDACTED_EMAIL]` |
| `phone` | `[REDACTED_PHONE]` |
| `credit_card` | `[REDACTED_CREDIT_CARD]` |
| `ssn` | `[REDACTED_SSN]` |
| `ip_address` | `[REDACTED_IP]` |
| `api_key` | `[REDACTED_API_KEY]` |
## Inspect redactions
Each entry in `result.redactions` is a `Redaction` with `.pii_type`, `.start_position`, `.end_position`, and `.original_length` (positions index into the original text).
```python
from briefcase.sanitize import Sanitizer
sanitizer = Sanitizer()
result = sanitizer.sanitize("Call 555-123-4567 or email jane.doe@example.com")
for redaction in result.redactions:
    print(redaction.pii_type, redaction.start_position, redaction.end_position)
# phone 5 21
# email 27 43
```
## Sanitize JSON
`sanitize_json()` walks a dict and redacts string values, returning a `SanitizationJsonResult` with `.sanitized` and `.redaction_count`. Useful for sanitizing a structured ticket payload before you record it.
```python
from briefcase.sanitize import Sanitizer
sanitizer = Sanitizer()
record = {
"ticket_id": "TKT-4821",
"contact_email": "jane.doe@example.com",
"priority": 2,
}
result = sanitizer.sanitize_json(record)
print(result.sanitized)
# {'contact_email': '[REDACTED_EMAIL]', 'priority': 2, 'ticket_id': 'TKT-4821'}
print(result.redaction_count) # 1
```
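The walk-and-redact idea can be sketched without the SDK. The version below is an illustration of the technique, not `sanitize_json` itself: `redact_strings` is a hypothetical helper, the email regex is deliberately simple, and descending into lists is an assumption of this sketch.

```python
import re

# Deliberately simple email pattern for illustration; real detectors are stricter.
EMAIL = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def redact_strings(value):
    """Return (sanitized_copy, redaction_count) for nested dicts, lists, strings."""
    if isinstance(value, str):
        return EMAIL.subn("[REDACTED_EMAIL]", value)
    if isinstance(value, dict):
        out, total = {}, 0
        for key, item in value.items():
            out[key], count = redact_strings(item)
            total += count
        return out, total
    if isinstance(value, list):
        pairs = [redact_strings(item) for item in value]
        return [p[0] for p in pairs], sum(p[1] for p in pairs)
    return value, 0  # numbers, booleans, None pass through unchanged


record = {
    "ticket_id": "TKT-4821",
    "contact_email": "jane.doe@example.com",
    "priority": 2,
    "notes": ["cc: ops@example.com"],
}
sanitized, count = redact_strings(record)
print(sanitized, count)
```

The helper builds a new structure instead of mutating its argument, so the original payload stays available to the caller until it is deliberately discarded.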
## Reject sensitive data in a guardrail
Sometimes you don't want to redact and continue — you want to *stop*. Use
`contains_pii` (a fast boolean) or `analyze_pii` (a summary that doesn't modify
the text) to refuse a payload before it's ever recorded.
> When you'd reach for this
A policy says certain decisions must never be stored if they still carry raw identifiers. Before persisting a triage decision, check the inputs and reject the call if detection still finds PII — failing closed instead of writing a non-compliant record.
```python
from briefcase.sanitize import Sanitizer
sanitizer = Sanitizer()
def guard(text: str) -> None:
    if sanitizer.contains_pii(text):
        report = sanitizer.analyze_pii(text)  # summary for logging the reason
        raise ValueError(f"refusing to store record: PII detected ({report})")

guard("Email jane.doe@example.com") # raises before classify_ticket is recorded
```
```python
report = sanitizer.analyze_pii("Email jane.doe@example.com and call 555-123-4567")
print(report)
# {'has_pii': True, 'total_matches': 2, 'unique_types': 2,
# 'detected_types': ['phone', 'email']}
```
| Method | Returns | Use it to... |
|--------|---------|--------------|
| `sanitize(text)` | `SanitizationResult` | Strip PII and keep going |
| `sanitize_json(data)` | `SanitizationJsonResult` | Strip PII from a structured payload |
| `contains_pii(text)` | `bool` | Cheaply gate a guardrail — proceed or reject |
| `analyze_pii(text)` | summary `dict` | Get the details (types, counts) for logging or decisions |
This pairs naturally with [Guardrails](/advanced/guardrails/), where you can run
this check inside an `evaluate()` and return `DENY` when PII is still present.
## Custom patterns
Register your own patterns for identifiers specific to your domain — a ticket
number scheme, an internal account ID format — with `add_pattern(name, regex)`.
The marker uppercases the name, so `ticket_id` redacts to `[REDACTED_TICKET_ID]`.
Registered patterns are picked up by `sanitize`, `contains_pii`, and
`analyze_pii`.
```python
from briefcase.sanitize import Sanitizer
sanitizer = Sanitizer()
sanitizer.add_pattern("ticket_id", r"\bTKT-\d{4}\b")
result = sanitizer.sanitize("Ticket TKT-4821 was escalated")
print(result.sanitized) # Ticket [REDACTED_TICKET_ID] was escalated
```
| Argument | Type | Description |
|----------|------|-------------|
| `name` | `str` | A label for the pattern; uppercased into the `[REDACTED_<NAME>]` marker and reported by `analyze_pii` |
| `pattern` | `str` | The regex to match and redact |
`remove_pattern(pattern_name)` removes a registered pattern again.
Choose pattern names that read clearly in an audit trail — they become the
redaction marker and appear in `analyze_pii` summaries.
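How a pattern name turns into a marker can be shown with the `re` module alone. `PatternRedactor` below is a hypothetical stand-in written for this sketch; it mirrors the `add_pattern` / `remove_pattern` names from the text but is not the SDK's `Sanitizer`.

```python
import re


class PatternRedactor:
    """Hypothetical stand-in showing name -> marker redaction with the re module."""

    def __init__(self):
        self._patterns = {}

    def add_pattern(self, name, regex):
        self._patterns[name] = re.compile(regex)

    def remove_pattern(self, name):
        self._patterns.pop(name, None)

    def sanitize(self, text):
        for name, pattern in self._patterns.items():
            # The marker is derived from the uppercased pattern name.
            text = pattern.sub(f"[REDACTED_{name.upper()}]", text)
        return text


redactor = PatternRedactor()
redactor.add_pattern("ticket_id", r"\bTKT-\d{4}\b")
redacted = redactor.sanitize("Ticket TKT-4821 was escalated")
print(redacted)

redactor.remove_pattern("ticket_id")
untouched = redactor.sanitize("Ticket TKT-4821 was escalated")
print(untouched)
```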
## Key classes
- `Sanitizer` — detects and redacts PII; `sanitize`, `sanitize_json`, `add_pattern`, `remove_pattern`, `contains_pii`, `analyze_pii`.
- `SanitizationResult` — `.sanitized`, `.redactions`, `.redaction_count`, `.has_redactions`.
- `Redaction` — `.pii_type`, `.start_position`, `.end_position`, `.original_length`.
- `SanitizationJsonResult` — `.sanitized`, `.redaction_count`.
## Where this fits
- Capture: Decision Recording — Capture the sanitized decision as a verifiable snapshot. (/features/decision-recording/)
- Next · Store & Query: Storage Adapters — Persist the sanitized snapshots to a durable, queryable backend. (/features/storage-adapters/)
========================================================================
# Control & Route
========================================================================
## Guardrails
Source: https://briefcaseai.io/advanced/guardrails/
> Enforce deny-by-default, fail-closed controls before an agent action runs.
A guardrail decides whether an agent may perform an action on a resource — and
blocks it before the action runs.
> When you'd reach for this
Your triage agent wants to write to `kb/internal-articles` or invoke an action
on a customer's behalf. Before that runs, you need a single, testable gate that
says yes or no — and that defaults to *no* if anything goes wrong. Guardrails
are that gate: a deny-by-default control evaluated on every request, so an
unauthorized or error-state action never reaches your model or your tools.
The guardrail system is a framework, not a fixed set of classes: `GuardrailEnv`
is a runtime-checkable protocol with a single core method, `evaluate(request)`.
Any object that implements it — yours or a registered one — plugs into the
wrappers, pipelines, and batch evaluators the same way.
## How it works
1. **Write a guardrail** — subclass `BaseGuardrailEnv` and implement a pure
`evaluate(request)` that returns `ALLOW` or `DENY`.
2. **Compose** — chain guardrails in a `GuardrailPipeline` and stack
`Wrapper`s (cache, timeout, audit) around any of them.
3. **Fail closed** — wrap the outermost layer in `DenyByDefaultWrapper` so any
exception becomes `DENY`, then gate the action on `result.is_allowed`.
```mermaid
flowchart LR
A["Agent action"] --> B["DenyByDefaultWrapper"]
B --> C["GuardrailPipeline"]
C -- "ALLOW" --> D["Action runs"]
C -- "DENY" --> X["Action blocked"]
B -- "exception" --> X
```
> Diagram description
Every agent action is evaluated by a guardrail before it runs. The outermost
`DenyByDefaultWrapper` calls the `GuardrailPipeline`. An `ALLOW` lets the action
run; a `DENY` blocks it. If evaluation raises an exception, the wrapper catches
it and the action is blocked — the system fails closed, never open.
## Install
```bash
pip install briefcase-ai[guardrails]
```
```python
from briefcase.guardrails import (
    BaseGuardrailEnv,
    EvalRequest,
    EvalResult,
    Effect,
)
```
## Write a Guardrail
Subclass `BaseGuardrailEnv` and implement `evaluate()`. It receives an
`EvalRequest` and returns an `EvalResult` carrying an `Effect` (`ALLOW` or
`DENY`).
```python
from briefcase.guardrails import BaseGuardrailEnv, EvalRequest, EvalResult, Effect


class TierGuardrail(BaseGuardrailEnv):
    """Allow access to a resource only for agents on the right tier."""

    _name = "tier-check"

    def __init__(self, allowed_tiers):
        self._allowed = set(allowed_tiers)

    def evaluate(self, request: EvalRequest) -> EvalResult:
        tier = request.context.get("tier")
        if tier in self._allowed:
            return EvalResult(
                effect=Effect.ALLOW,
                guardrail_name=self._name,
                reason=f"tier '{tier}' is permitted",
            )
        return EvalResult(
            effect=Effect.DENY,
            guardrail_name=self._name,
            reason=f"tier '{tier}' is not permitted",
        )


env = TierGuardrail(allowed_tiers={"standard", "premium"})
request = EvalRequest(
    agent="support-bot",
    action="invoke",
    resource="kb/internal-articles",
    context={"tier": "premium"},
)
result = env.evaluate(request)
print(result.is_allowed) # True
print(result.effect) # Effect.ALLOW
print(result.reason) # "tier 'premium' is permitted"
```
> Keep `evaluate()` pure
`evaluate()` must be deterministic and side-effect-free: the same request yields
the same result, and it performs no I/O. Keep it fast.
### EvalRequest
```python
EvalRequest(
    agent="support-bot",               # who is acting
    action="invoke",                   # what they want to do
    resource="kb/internal-articles",   # what they want to act on
    context={"tier": "premium"},       # attributes the guardrail evaluates
    request_id=None,                   # optional correlation id
)
```
### EvalResult
| Field | Description |
|-------|-------------|
| `effect` | `Effect.ALLOW` or `Effect.DENY` |
| `guardrail_name` | Name of the guardrail that produced the result |
| `reason` | Human-readable explanation |
| `is_allowed` | `True` when `effect == Effect.ALLOW` |
| `policy_id` | Optional identifier of the policy applied |
| `lakefs_sha` | Optional commit the policy was loaded from |
| `eval_time_ms` | Evaluation time |
## Register and Instantiate
Register a guardrail by string id and instantiate it with `make()`, the same
register/make split used by Gymnasium. This lets callers construct guardrails
without importing the implementation.
```python
from briefcase.guardrails import register, make
# entry_point is a "module:ClassName" string — in your code that is the import
# path of your guardrail, e.g. "myapp.guardrails:TierGuardrail".
register(
    id="tier-check-v1",
    entry_point=f"{__name__}:TierGuardrail",
    kwargs={"allowed_tiers": ["standard", "premium"]},
)
env = make("tier-check-v1") # uses the registered kwargs
env = make("tier-check-v1", allowed_tiers=["premium"]) # override per call
```
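The register/make split itself is a small pattern. The following sketch shows one way a `"module:ClassName"` entry point could be resolved lazily with `importlib`; the `_REGISTRY` dict and both functions are written for this example and are not Briefcase's implementation. A standard-library class is the target so the sketch runs on its own.

```python
import importlib

_REGISTRY = {}  # hypothetical registry: id -> (entry_point, default kwargs)


def register(id, entry_point, kwargs=None):
    """Store the import path and defaults; nothing is imported yet."""
    _REGISTRY[id] = (entry_point, dict(kwargs or {}))


def make(id, **overrides):
    """Import the target on demand and construct it."""
    entry_point, defaults = _REGISTRY[id]
    module_name, _, class_name = entry_point.partition(":")
    cls = getattr(importlib.import_module(module_name), class_name)
    return cls(**{**defaults, **overrides})  # per-call kwargs win over defaults


# datetime.timedelta stands in for a guardrail class.
register("half-hour", "datetime:timedelta", kwargs={"minutes": 30})
default_instance = make("half-hour")
overridden_instance = make("half-hour", minutes=45)
print(default_instance, overridden_instance)
```

Deferring the import to `make()` is what lets callers refer to a guardrail by id without importing its module up front.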
## Chain Guardrails with a Pipeline
1. **Define each check** as its own guardrail (resource allowlist, tier check,
rate check) so each stays small and testable.
2. **Order them** in a `GuardrailPipeline`, cheapest and most restrictive first.
3. **Pick a mode** — `FIRST_DENY` short-circuits on the first `DENY` (the
default and the cheapest); `ALL` and `MAJORITY` run every stage.
`GuardrailPipeline` evaluates a request through several guardrails in order. By
default it short-circuits on the first `DENY`.
```python
from briefcase.guardrails import (
    BaseGuardrailEnv, EvalRequest, EvalResult, Effect,
    GuardrailPipeline, PipelineMode,
)


class ResourceAllowlist(BaseGuardrailEnv):
    _name = "resource-allowlist"

    def __init__(self, allowed):
        self._allowed = set(allowed)

    def evaluate(self, request: EvalRequest) -> EvalResult:
        ok = request.resource in self._allowed
        return EvalResult(
            effect=Effect.ALLOW if ok else Effect.DENY,
            guardrail_name=self._name,
            reason="resource permitted" if ok else "resource not allowlisted",
        )


# TierGuardrail is the class defined under "Write a Guardrail".
pipeline = GuardrailPipeline(
    stages=[
        ResourceAllowlist(allowed={"kb/internal-articles"}),
        TierGuardrail(allowed_tiers={"standard", "premium"}),
    ],
    mode=PipelineMode.FIRST_DENY,
)
request = EvalRequest(
    agent="support-bot",
    action="invoke",
    resource="kb/internal-articles",
    context={"tier": "premium"},
)
outcome = pipeline.evaluate(request)
print(outcome.is_allowed) # True
print(len(outcome.individual_results)) # one result per stage that ran
```
`PipelineMode` options:
| Mode | Behavior |
|------|----------|
| `FIRST_DENY` | Stop on the first `DENY` (default) |
| `ALL` | Evaluate every stage; `DENY` if any stage denies |
| `MAJORITY` | Majority vote across stages |
```mermaid
flowchart LR
A["EvalRequest"] --> B["ResourceAllowlist"]
B -- "DENY" --> X["Block (short-circuit)"]
B -- "ALLOW" --> C["TierGuardrail"]
C -- "DENY" --> X
C -- "ALLOW" --> D["Allow"]
```
> Diagram description
An `EvalRequest` flows through two stages in order. `ResourceAllowlist` runs first: a `DENY` short-circuits straight to Block, while an `ALLOW` passes to `TierGuardrail`. `TierGuardrail` then either denies (also Block) or allows. Only when both stages allow does the request reach the final Allow outcome.
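To see how the three modes differ on the same request, here is a self-contained sketch of the aggregation logic. It models each stage as a plain callable returning `True` or `False`; `Mode`, `run_stages`, and the agent check are hypothetical, and treating a `MAJORITY` tie as a deny is an assumption (the source does not specify tie handling).

```python
from enum import Enum


class Mode(Enum):  # hypothetical mirror of the three documented modes
    FIRST_DENY = "first_deny"
    ALL = "all"
    MAJORITY = "majority"


def run_stages(stages, request, mode=Mode.FIRST_DENY):
    """Each stage is a callable returning True (allow) or False (deny).

    Returns (allowed, individual_results).
    """
    results = []
    for stage in stages:
        results.append(stage(request))
        if mode is Mode.FIRST_DENY and not results[-1]:
            return False, results  # short-circuit: later stages never run
    if mode is Mode.MAJORITY:
        # Assumption: a tie counts as deny, in keeping with fail-closed behavior.
        return sum(results) > len(results) / 2, results
    return all(results), results  # FIRST_DENY with no denies, or ALL


stages = [
    lambda r: r["resource"] == "kb/internal-articles",  # resource allowlist
    lambda r: r["tier"] in {"standard", "premium"},     # tier check
    lambda r: r["agent"] == "support-bot",              # hypothetical agent check
]
request = {"agent": "support-bot", "resource": "kb/other", "tier": "premium"}

outcomes = {mode.name: run_stages(stages, request, mode) for mode in Mode}
for name, outcome in outcomes.items():
    print(name, outcome)
```

With one denying stage out of three, `FIRST_DENY` and `ALL` both block the request, but only `ALL` reports every stage's result, and `MAJORITY` lets the request through. That last case is why a majority vote is a poor fit for hard authorization rules.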
## Composable Wrappers
A wrapper *is* a `GuardrailEnv`, so wrappers stack around any guardrail.
> Fail closed: deny by default
Use `DenyByDefaultWrapper` as the **outermost** layer. It catches any exception
raised during evaluation and returns `DENY`, so a bug, a timeout, or an
unreachable policy store never results in accidental access. Combined with
`TimeoutWrapper`'s `fallback_effect=Effect.DENY`, the whole stack denies under
any failure condition rather than letting an action through.
```python
from briefcase.guardrails import (
    CacheWrapper, TimeoutWrapper, DenyByDefaultWrapper, Effect,
)

env = DenyByDefaultWrapper(
    TimeoutWrapper(
        CacheWrapper(TierGuardrail(allowed_tiers={"premium"})),
        max_ms=10.0,
        fallback_effect=Effect.DENY,
    )
)
result = env.evaluate(request)
```
| Wrapper | Effect |
|---------|--------|
| `CacheWrapper` | Caches results with a TTL |
| `TimeoutWrapper` | Falls back (default `DENY`) if evaluation exceeds `max_ms` |
| `AuditWrapper` | Records every `(request, result)` for observability |
| `SamplingWrapper` | Evaluates a fraction of requests; allows the rest |
| `DenyByDefaultWrapper` | Catches exceptions and returns `DENY` |
| `ViolationModeWrapper` | Converts `DENY` to `ALLOW` for soft-deny workflows |
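To make the wrapper pattern concrete, here is a minimal standalone sketch of the fail-closed idea: a wrapper exposes the same `evaluate` method as the object it wraps and turns any exception into an explicit deny. `SketchResult`, `TierCheck`, and `FailClosed` are hypothetical names invented for illustration; they are not Briefcase classes and do not reflect how `DenyByDefaultWrapper` is implemented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchResult:
    """Hypothetical result type; not Briefcase's."""
    is_allowed: bool
    reason: str = ""


class TierCheck:
    """Hypothetical inner guardrail: allows only the listed tiers."""

    def __init__(self, allowed_tiers):
        self.allowed_tiers = set(allowed_tiers)

    def evaluate(self, request: dict) -> SketchResult:
        tier = request["tier"]  # raises KeyError if the tier is missing
        if tier in self.allowed_tiers:
            return SketchResult(True, f"tier {tier!r} allowed")
        return SketchResult(False, f"tier {tier!r} not allowed")


class FailClosed:
    """Wrapper with the same evaluate() shape as the object it wraps."""

    def __init__(self, inner):
        self.inner = inner

    def evaluate(self, request: dict) -> SketchResult:
        try:
            return self.inner.evaluate(request)
        except Exception as exc:
            # Any failure inside the stack becomes an explicit deny.
            return SketchResult(False, f"evaluation failed: {type(exc).__name__}")


gate = FailClosed(TierCheck({"premium"}))
print(gate.evaluate({"tier": "premium"}).is_allowed)  # True
print(gate.evaluate({"tier": "free"}).is_allowed)     # False
print(gate.evaluate({}).reason)                       # evaluation failed: KeyError
```

Because the wrapper and the wrapped object share one method shape, wrappers nest in any order; placing the fail-closed layer outermost means it also covers failures raised by the wrappers beneath it.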
## Gate the Action (fail-closed)
A guardrail only governs an action if you actually gate on its result. Evaluate
*before* the side effect runs, treat anything that is not an explicit `ALLOW` as
a deny, and let the `DenyByDefaultWrapper` turn any unexpected exception into a
block.
```python
from briefcase.guardrails import DenyByDefaultWrapper, EvalRequest

# Outermost layer fails closed: any exception inside becomes DENY.
gate = DenyByDefaultWrapper(TierGuardrail(allowed_tiers={"premium"}))


def classify_ticket(ticket, *, agent="support-bot"):
    request = EvalRequest(
        agent=agent,
        action="invoke",
        resource="kb/internal-articles",
        context={"tier": ticket["tier"]},
    )
    try:
        result = gate.evaluate(request)
    except Exception as exc:
        # Belt and suspenders: even if the gate itself raises, deny.
        raise PermissionError("guardrail evaluation failed; action blocked") from exc
    if not result.is_allowed:
        raise PermissionError(f"action denied: {result.reason}")
    # Authorized: now it is safe to run the side effect.
    # return run_classification(ticket)  # call your model / tools here
```
> Never act on a missing ALLOW
Branch on `result.is_allowed`, not on the absence of a deny. If evaluation is
skipped, times out, or raises, the action must not run. Deny-by-default means
*allow only when explicitly allowed*.
## Where this fits
Guardrails are the **Control** act of the journey: enforce the rule before the
action runs. Once an action is authorized, route it; once it has run, capture it.
- Routing — Next: decide whether an authorized decision is handled automatically or escalated to human review. (/advanced/routing/)
- Decision Recording — Capture: record the full context behind every decision the agent makes. (/features/decision-recording/)
## Routing
Source: https://briefcaseai.io/advanced/routing/
> Route an AI decision between automatic handling and human review with a narrow, in-process gate.
The routing module decides what happens to a decision: handle it automatically
or escalate it to human review. A router takes a decision context and returns a
`RoutingDecision`.
> When you'd reach for this
Your triage agent has classified a ticket and produced a confidence score. Some
results are safe to act on automatically; low-confidence ones should go to a
human. `BaseRouter` is the small, in-process gate that makes that auto-vs-human
call inside your request path, with no policy store and no history to manage.
> For production audit trails, use Versioned Routing Policy
`BaseRouter` does **not** version its logic and cannot tell you later which rule
fired or which configuration was active on a past date. When the routing choice
is governed by a policy that changes over time — and you need to reconstruct
exactly which rule and which policy version produced a past decision — use
[**Versioned Routing Policy**](/advanced/versioned-routing-policy/) instead.
Reach for `BaseRouter` only when an in-process, non-versioned gate is enough.
## Simple vs. versioned routing
| | `BaseRouter` (this page) | [`AgentRouter`](/advanced/versioned-routing-policy/) (versioned) |
|---|---|---|
| Purpose | In-process auto-vs-human gate | Policy-governed choice with attribution |
| Logic lives in | Your subclass code | Versioned `PolicyVersion` rules |
| Call style | `async` (I/O-bound) | `sync` (pure, in-memory) |
| Versioned? | No | Yes — every version is an append |
| Reconstruct a past decision? | No | Yes — route `as_of` a date |
| Records which rule fired? | No | Yes — `matched_rule_id` |
| Backed by | Nothing | Bitemporal store |
| Reach for it when | A quick, non-audited gate is enough | You must prove which rule fired, when |
## Install
```bash
pip install briefcase-ai[routing]
```
```python
from briefcase.routing import BaseRouter, RoutingDecision
```
## Route a Decision
1. **Subclass `BaseRouter`** and implement the `route` coroutine.
2. **Read the decision context** — for triage, the classifier's confidence.
3. **Return a `RoutingDecision`** with an `action` (`"auto"` or
`"human_review"`) and a `reason` you can attach to the decision record.
`BaseRouter` is an abstract base class with a single abstract coroutine,
`route(decision_context) -> RoutingDecision`. Subclass it and implement `route`.
The router is asynchronous because real routers usually call out to an external
policy service or model.
```python
import asyncio
import time

from briefcase.routing import BaseRouter, RoutingDecision


class ConfidenceRouter(BaseRouter):
    """Route a support ticket to automatic handling or human review."""

    def __init__(self, auto_threshold: float = 0.85):
        self.auto_threshold = auto_threshold

    async def route(self, decision_context) -> RoutingDecision:
        start = time.perf_counter()
        confidence = decision_context.get("confidence", 0.0)
        if confidence >= self.auto_threshold:
            action = "auto"
            reason = f"confidence {confidence:.2f} >= {self.auto_threshold}"
        else:
            action = "human_review"
            reason = f"confidence {confidence:.2f} below threshold"
        eval_time_ms = (time.perf_counter() - start) * 1000
        return RoutingDecision(
            action=action,
            source="internal",
            eval_time_ms=eval_time_ms,
            reason=reason,
        )


async def main():
    router = ConfidenceRouter(auto_threshold=0.85)
    high = await router.route({"ticket_id": "T-1001", "confidence": 0.93})
    low = await router.route({"ticket_id": "T-1002", "confidence": 0.40})
    print(high.action, high.reason)  # auto ...
    print(low.action, low.reason)  # human_review ...


asyncio.run(main())
```
## RoutingDecision
`RoutingDecision` is a dataclass with four fields:
| Field | Type | Description |
|----------------|------------------|----------------------------------------------------------|
| `action` | `str` | The routing outcome, e.g. `"auto"` or `"human_review"`. |
| `source` | `str` | Where the decision came from, e.g. `"internal"`, `"opa"`.|
| `eval_time_ms` | `float` | How long evaluation took, in milliseconds. |
| `reason` | `str` (optional) | Human-readable explanation of the outcome. |
```mermaid
flowchart LR
A["Decision context"] --> B["BaseRouter.route"]
B --> C{"meets criteria?"}
C -- yes --> D["action = auto"]
C -- no --> E["action = human_review"]
D & E --> F["RoutingDecision"]
```
> Diagram description
A decision context enters `BaseRouter.route`, which checks whether it meets the criteria. If yes, the action becomes `auto`; if no, the action becomes `human_review`. Both branches converge into a single `RoutingDecision`.
## Choosing a Layer
`BaseRouter` is intentionally narrow: an in-process gate for the auto-versus-human
question. It does not version its logic, and it cannot tell you later which rule
fired or which configuration was active on a given date.
When the routing choice is governed by a policy that changes over time — and you
need to reconstruct a past decision exactly — use the [versioned routing
layer](/advanced/versioned-routing-policy/) instead, which adds `AgentRouter`,
`PolicyRegistry`, and `PolicyVersion` for auditable, time-travel routing.
## Where this fits
Routing is part of the **Control** act: once an action is authorized by a
[guardrail](/advanced/guardrails/), the router decides who handles it. For a
production audit trail, route through the versioned layer.
- Guardrails — Previous: enforce deny-by-default controls before an action is allowed to run. (/advanced/guardrails/)
- Versioned Routing Policy — Next: route through versioned policies and reconstruct past decisions as-of a date. (/advanced/versioned-routing-policy/)
## Versioned Routing Policy
Source: https://briefcaseai.io/advanced/versioned-routing-policy/
> Reconstruct exactly which rule fired and which policy version was active on any past date.
Route agent decisions through versioned, time-travelable policies — so you can
prove which rule fired on any past date.
> When you'd reach for this
Six weeks ago your triage agent routed an enterprise ticket to the senior
queue. Someone asks why. Since then the policy changed twice. To answer,
you need the exact rule set that was active *that day* and the specific rule that
fired — not today's policy. Versioned routing reconstructs that decision
as-of its original date, every time.
When an agent's routing choice is governed by a policy that changes over time,
recording the choice is not enough. To reconstruct a past decision you need both
the full policy that was in effect on the decision date and the specific rule
that fired. A single-version policy store cannot answer that once the policy has
changed.
The versioned routing layer stores every policy version in a bitemporal store.
Publishing a new version is an append, never a mutation, so reading "the policy
as of date X" returns exactly the rule set that was active then.
## How it works
1. **Define a `PolicyVersion`** — an ordered list of `PolicyRule`s plus a
`default_choice`.
2. **Publish it** to a `PolicyRegistry` with a `valid_from` date; publishing is
an append, so older versions are never overwritten.
3. **Route** a context through an `AgentRouter` — it selects the first matching
rule and records `matched_rule_id`, `policy_version`, and `rationale`.
4. **Reconstruct as-of** a past date by passing `as_of_transaction_time` — the
registry returns the rule set that was active then.
## Install
```bash
pip install briefcase-ai[routing]
```
```python
from briefcase.routing import (
    AgentRouter,
    PolicyRegistry,
    PolicyRule,
    PolicyVersion,
)
```
## Route a Ticket
This example routes a support ticket to a queue based on the ticket
context, publishes a second policy version, and reconstructs the earlier
decision as-of a past date.
```python
from datetime import datetime, timezone

from briefcase.routing import (
    AgentRouter,
    PolicyRegistry,
    PolicyRule,
    PolicyVersion,
)

# 1. Define a policy: route a support ticket to a queue by context.
policy_v1 = PolicyVersion(
    policy_id="ticket-routing",
    version="1.0.0",
    description="Route support tickets to a queue.",
    rules=[
        PolicyRule(
            rule_id="enterprise-priority",
            condition={"plan": "enterprise", "priority": "high"},
            choice="senior-queue",
            rationale="High-priority enterprise tickets go to the senior queue.",
        ),
        PolicyRule(
            rule_id="routine-lookup",
            condition={"category": {"in": ["faq", "status-check"]}},
            choice="self-serve",
            rationale="Routine lookups are deflected to self-serve.",
        ),
    ],
    default_choice="standard-queue",
)

# 2. Publish it to a versioned registry (bitemporal-backed by default).
#    valid_from is when the policy takes effect; transaction_time is when
#    the registry learned of it (defaults to now if omitted).
registry = PolicyRegistry()
registry.publish(
    policy_v1,
    valid_from=datetime(2026, 1, 1, tzinfo=timezone.utc),
    transaction_time=datetime(2026, 1, 1, tzinfo=timezone.utc),
)

# 3. Route a request through the registry.
router = AgentRouter(
    registry,
    use_case="ticket-routing",
    policy_id="ticket-routing",
)
decision = router.route(
    {"plan": "enterprise", "priority": "high", "category": "billing"},
    evidence_refs=["tkt-9001"],
)
print(decision.selected)         # senior-queue
print(decision.policy_version)   # 1.0.0
print(decision.matched_rule_id)  # enterprise-priority
print(decision.rationale)        # High-priority enterprise tickets ...
print(decision.evidence_refs)    # ['tkt-9001']

# 4. Publish a newer version. Older decisions still reconstruct correctly.
policy_v2 = PolicyVersion(
    policy_id="ticket-routing",
    version="2.0.0",
    description="De-escalate enterprise high-priority to the standard queue.",
    rules=[
        PolicyRule(
            rule_id="enterprise-priority",
            condition={"plan": "enterprise", "priority": "high"},
            choice="standard-queue",
            rationale="Updated: enterprise high-priority now goes to the standard queue.",
        ),
    ],
    default_choice="standard-queue",
)
registry.publish(
    policy_v2,
    valid_from=datetime(2026, 4, 1, tzinfo=timezone.utc),
    transaction_time=datetime(2026, 4, 1, tzinfo=timezone.utc),
)

# Current routing uses v2.
current = router.route({"plan": "enterprise", "priority": "high"})
print(current.selected, current.policy_version)  # standard-queue 2.0.0

# Reconstruct the decision as it would have been made before v2 was published.
as_of = datetime(2026, 2, 1, tzinfo=timezone.utc)
historical = router.route(
    {"plan": "enterprise", "priority": "high"},
    as_of_transaction_time=as_of,
)
print(historical.selected, historical.policy_version)  # senior-queue 1.0.0

# Inspect the full version history.
print([v.version for v in registry.history("ticket-routing")])  # ['1.0.0', '2.0.0']
```
```mermaid
flowchart LR
A["Context"] --> B["AgentRouter.route"]
B --> C["PolicyRegistry.get(as_of)"]
C --> D["PolicyVersion.select"]
D --> E{"rule matches?"}
E -- yes --> F["matched rule choice"]
E -- no --> G["default_choice"]
F & G --> H["AgentRoutingDecision"]
```
> Diagram description
A context enters `AgentRouter.route`, which fetches the policy from `PolicyRegistry.get(as_of)` and runs `PolicyVersion.select`. If a rule matches, the matched rule's choice is used; if no rule matches, the policy's `default_choice` is used. Both branches produce an `AgentRoutingDecision`.
## PolicyRule
A `PolicyRule` is a single "if the context matches this condition, select this
choice" rule. The `condition` is a small dict predicate evaluated against the
routing context.
| Field | Type | Description |
|-------------|------------------|------------------------------------------------------|
| `rule_id` | `str` | Stable identifier, recorded as `matched_rule_id`. |
| `condition` | `dict` | Predicate evaluated against the context. |
| `choice` | `str` | The choice selected when the condition matches. |
| `rationale` | `str` (optional) | Explanation copied into the decision record. |
`condition` supports three forms:
| Syntax | Meaning |
|---------------------------------|-------------|
| `{"field": value}` | equality |
| `{"field": {"in": [a, b]}}` | membership |
| `{"field": {"ne": value}}` | inequality |
All keys in a condition must match (logical AND).
> Misconfiguration fails loudly
Unknown condition operators raise `KeyError` rather than silently evaluating to
`False`, so a typo in a rule surfaces immediately instead of mis-routing.
Call `matches(context)` to test a rule directly:
```python
from briefcase.routing import PolicyRule, PolicyVersion

rule = PolicyRule(
    rule_id="routine-lookup",
    condition={"category": {"in": ["faq", "status-check"]}},
    choice="self-serve",
    rationale="Routine lookups are deflected to self-serve.",
)
print(rule.matches({"category": "faq"}))  # True
print(rule.matches({"category": "billing"}))  # False
```
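The matching rules above (equality, `in`, `ne`, logical AND, and a loud failure on unknown operators) can be sketched in plain Python. This is an illustration of the described semantics, not the library's code: `condition_matches` and `_OPERATORS` are hypothetical names, and reading a field that is absent from the context as `None` is an assumption.

```python
from typing import Any, Callable, Dict

# Hypothetical operator table for the two non-equality forms.
_OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "in": lambda actual, operand: actual in operand,
    "ne": lambda actual, operand: actual != operand,
}


def condition_matches(condition: Dict[str, Any], context: Dict[str, Any]) -> bool:
    """Return True only if every key in the condition matches (logical AND)."""
    # Validate operators first, so a typo raises even when an earlier
    # key would already have failed to match.
    for expected in condition.values():
        if isinstance(expected, dict):
            for op in expected:
                if op not in _OPERATORS:
                    raise KeyError(f"unknown condition operator: {op!r}")
    for field, expected in condition.items():
        actual = context.get(field)  # assumption: a missing field reads as None
        if isinstance(expected, dict):
            if not all(_OPERATORS[op](actual, operand) for op, operand in expected.items()):
                return False
        elif actual != expected:
            return False
    return True


ticket = {"plan": "enterprise", "priority": "high", "category": "billing"}
print(condition_matches({"plan": "enterprise", "priority": "high"}, ticket))    # True
print(condition_matches({"category": {"in": ["faq", "status-check"]}}, ticket))  # False
print(condition_matches({"plan": {"ne": "free"}}, ticket))                       # True
```

Validating operators before evaluating anything is a deliberate choice in this sketch: with lazy evaluation, a misspelled operator in the second key would go unnoticed whenever the first key failed to match.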
## PolicyVersion
A `PolicyVersion` is an ordered list of rules. `select(context)` evaluates the
rules in order and returns the first match. If no rule matches and
`default_choice` is set, the default is returned; otherwise `choice` is `None`
so the caller can fall back to human review.
```python
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1.0.0",
    rules=[rule],
    default_choice="standard-queue",
)
hit = policy.select({"category": "status-check"})
print(hit.choice, hit.matched_rule_id)  # self-serve routine-lookup
miss = policy.select({"category": "billing"})
print(miss.choice, miss.matched_rule_id)  # standard-queue None
```
`select` returns a `PolicyEvaluationResult` with `choice`, `matched_rule_id`,
`policy_id`, `policy_version`, and `rationale`.
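Because selection is first-match-wins, rule order is part of the policy. The standalone sketch below shows how reordering two overlapping rules changes the outcome; `SketchRule` and `select_first_match` are hypothetical, and the sketch supports equality conditions only to stay short.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass(frozen=True)
class SketchRule:
    """Hypothetical rule: equality-only condition, for illustration."""
    rule_id: str
    condition: Dict[str, Any]
    choice: str


def select_first_match(
    rules: List[SketchRule],
    context: Dict[str, Any],
    default_choice: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str]]:
    """Return (choice, matched_rule_id); the first matching rule wins."""
    for rule in rules:
        if all(context.get(key) == value for key, value in rule.condition.items()):
            return rule.choice, rule.rule_id
    # No rule matched: fall back to the default, with no rule attributed.
    return default_choice, None


specific = SketchRule("enterprise-priority", {"plan": "enterprise", "priority": "high"}, "senior-queue")
broad = SketchRule("enterprise-any", {"plan": "enterprise"}, "standard-queue")
ctx = {"plan": "enterprise", "priority": "high"}

print(select_first_match([specific, broad], ctx))  # ('senior-queue', 'enterprise-priority')
print(select_first_match([broad, specific], ctx))  # ('standard-queue', 'enterprise-any')
print(select_first_match([specific], {"plan": "free"}, "standard-queue"))  # ('standard-queue', None)
print(select_first_match([specific], {"plan": "free"}))  # (None, None)
```

Listing the most specific rules first keeps a broad rule from shadowing them, and a `None` rule id distinguishes a default from a deliberate match when reviewing a decision later.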
## PolicyRegistry
`PolicyRegistry(store=None)` is a versioned registry of policies. It defaults to
an in-memory bitemporal store; pass any `BitemporalStore` for durable storage.
| Method | Returns | Description |
|-----------------------------------------------------------------|--------------------------|-----------------------------------------------------------------------------|
| `publish(policy, *, valid_from, transaction_time=None, source=...)` | `BitemporalRecord` | Append a new version. `valid_from` is a `datetime`. |
| `get(policy_id, *, as_of_transaction_time=None, as_of_valid_time=None)` | `PolicyVersion` | The version visible at the as-of point; latest if no clamp is given. |
| `history(policy_id)` | `list[PolicyVersion]` | Every published version, oldest first. |
`valid_from` records when a policy takes effect in the real world;
`transaction_time` records when the registry learned of it (defaults to now).
An as-of read clamps both, which is how a past decision reconstructs the rule
set that was active on its decision date.
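The two time axes are easiest to see with a retroactive correction. The sketch below is a toy append-only store, not the library's `BitemporalStore`: `SketchRecord`, `AppendOnlyVersions`, and `utc` are hypothetical, and picking the visible record with the greatest `valid_from` is an assumption about what "latest" means.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class SketchRecord:
    version: str
    valid_from: datetime        # when the policy takes effect
    transaction_time: datetime  # when the store learned of it


class AppendOnlyVersions:
    """Hypothetical toy store: records are appended, never mutated."""

    def __init__(self) -> None:
        self._records: List[SketchRecord] = []

    def publish(self, record: SketchRecord) -> None:
        self._records.append(record)

    def get(
        self,
        *,
        as_of_transaction_time: Optional[datetime] = None,
        as_of_valid_time: Optional[datetime] = None,
    ) -> Optional[SketchRecord]:
        visible = [
            r for r in self._records
            if (as_of_transaction_time is None or r.transaction_time <= as_of_transaction_time)
            and (as_of_valid_time is None or r.valid_from <= as_of_valid_time)
        ]
        if not visible:
            return None
        # Assumption: "latest" is the greatest valid_from among visible records.
        return max(visible, key=lambda r: (r.valid_from, r.transaction_time))


def utc(year: int, month: int, day: int) -> datetime:
    return datetime(year, month, day, tzinfo=timezone.utc)


store = AppendOnlyVersions()
store.publish(SketchRecord("1.0.0", utc(2026, 1, 1), utc(2026, 1, 1)))
store.publish(SketchRecord("2.0.0", utc(2026, 4, 1), utc(2026, 4, 1)))
# Recorded in May, but declared effective from February (a retroactive fix).
store.publish(SketchRecord("1.1.0", utc(2026, 2, 1), utc(2026, 5, 1)))

march = utc(2026, 3, 1)
print(store.get(as_of_valid_time=march).version)  # 1.1.0
print(store.get(as_of_valid_time=march, as_of_transaction_time=march).version)  # 1.0.0
```

Clamping only the valid time answers "what should have applied in March, given what is known now"; clamping both answers "what the system actually believed in March", which is the question an audit of a past decision needs.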
## AgentRouter
`AgentRouter` joins a registry to a use case and a policy, and produces an
`AgentRoutingDecision` ready to attach to a Briefcase decision snapshot.
```python
router = AgentRouter(
    registry,
    use_case="ticket-routing",
    policy_id="ticket-routing",
    candidates_provider=None,  # optional: derive candidate choices from context
)
```
`route` is synchronous — policy evaluation is a pure, in-memory computation
against the bitemporal store. This differs from the I/O-bound, asynchronous
[`BaseRouter`](/advanced/routing/); the two are independent abstractions.
| `route` parameter | Description |
|---------------------------|-----------------------------------------------------------------------|
| `context` | The dict evaluated against the policy rules. |
| `evidence_refs` | Record IDs of the bitemporal rows that informed the decision. |
| `as_of_transaction_time` | Reconstruct the decision using the policy active on a past date. |
The returned `AgentRoutingDecision` carries the full attribution:
| Field | Description |
|-------------------|--------------------------------------------------------------|
| `selected` | The chosen option (may be `None` if no rule and no default). |
| `policy_version` | The version that produced the choice. |
| `matched_rule_id` | The rule that fired, or `None` for the default. |
| `rationale` | Human-readable explanation from the matched rule. |
| `evidence_refs` | The evidence record IDs passed to `route`. |
| `candidates` | The set of choices the policy could have selected from. |
## Where this fits
Versioned routing is the **Control** act made reproducible: it records which
rule fired so the decision can later be reconstructed and proven. It rests on
the append-only store from the **Store & Query** act and feeds the
tamper-evident bundles of the **Prove** act.
- Routing — Previous: the narrow, in-process auto-versus-human-review router. (/advanced/routing/)
- Bitemporal Storage — The append-only store that backs the registry and powers as-of reconstruction. (/advanced/bitemporal-storage/)
- Audit Bundles — Next: package a routing decision, its policy version, and evidence into a content-hashed, verifiable bundle. (/advanced/compliance-bundles/)
## Validation Engine
Source: https://briefcaseai.io/advanced/validation-engine/
> Block prompts whose references no longer resolve, before they reach a model.
The validation engine checks that the references in a prompt (document paths,
section numbers, identifiers) actually resolve against a versioned knowledge
base before the prompt reaches a model.
> When you'd reach for this
Your triage agent builds a prompt that cites `handbook/onboarding.md` and
`Section 4.2.3`. Last week someone moved the handbook and the section number
changed. Without a check, the model answers confidently from a reference that no
longer resolves. The validation engine catches that *before* the prompt runs and
hands back structured remediation instead of a silent, wrong answer.
It is a pure framework: you supply an *extractor* that finds references and a
*resolver* that checks them, and the engine orchestrates the layers and records
the commit it validated against.
## How it works
1. **Extract** — your extractor finds candidate references in the prompt. No
references means the prompt passes immediately.
2. **Resolve** — your resolver checks each reference against the versioned
knowledge base and returns a `ValidationError` for anything that fails.
3. **Semantic (optional)** — if there are no errors, an optional
`semantic_validator` can add warnings based on the prompt's meaning.
4. **Stamp** — the engine records the knowledge-base commit it validated
against, so the result is reproducible.
## Install
```bash
pip install briefcase-ai[validate]
```
```python
from briefcase.validation import PromptValidationEngine
from briefcase.validation import ValidationError, ValidationErrorCode
```
## Validate a Prompt
```python
import re

from briefcase.validation import PromptValidationEngine
from briefcase.validation import ValidationError, ValidationErrorCode


class HandbookExtractor:
    """Finds ``handbook/*.md`` paths and ``Section X.Y`` references in a prompt."""

    _REF = re.compile(r"handbook/[\w/]+\.md|Section\s+[\d.]+")

    def extract(self, prompt: str) -> list:
        return self._REF.findall(prompt)


class KnowledgeBaseResolver:
    """Resolves references against an allowlist of known documents."""

    def __init__(self, known_references: set):
        self._known = known_references

    def resolve_all(self, references: list) -> list:
        errors = []
        for ref in references:
            if ref not in self._known:
                errors.append(
                    ValidationError(
                        code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                        message=f"Reference not found in knowledge base: {ref}",
                        reference=ref,
                        severity="error",
                        layer="resolution",
                        remediation="Add the document to the knowledge base or fix the reference.",
                    )
                )
        return errors


class DemoKnowledgeBase:
    """Stand-in for VersionedClient.get_commit() so the example runs offline."""

    def get_commit(self, repository: str, branch: str) -> str:
        return "demo0000000000000000000000000000000000000"


engine = PromptValidationEngine(
    extractor=HandbookExtractor(),
    resolver=KnowledgeBaseResolver(
        known_references={"handbook/onboarding.md", "Section 4.2.3"},
    ),
    lakefs_client=DemoKnowledgeBase(),
    repository="knowledge-base",
    branch="main",
    mode="strict",  # fail on errors
)

prompt = """
Follow the onboarding policy in handbook/onboarding.md and reference
Section 4.2.3 for account-setup steps. Also see handbook/missing.md.
"""

report = engine.validate(prompt)
print(report.status)  # "failed": handbook/missing.md is unknown
print(report.references_checked)  # 3
print(report.lakefs_commit[:8])  # "demo0000"
if report.has_errors:
    for error in report.errors:
        print(error.reference, "->", error.message)
        print(" fix:", error.remediation)
```
In production, pass any versioned-data client (the bundled
[`briefcase.integrations.lakefs.VersionedClient`](/integrations/lakefs/), or your
own via the `vcs` protocol) as `lakefs_client`, to resolve references against a
live, version-controlled knowledge base. The engine calls
`lakefs_client.get_commit(repository, branch)` to stamp every report with the
commit it validated against.
## Pluggable Protocols
You provide objects that satisfy two protocols: an extractor that finds
references and a resolver that checks them. The engine ships no built-in
extractors or resolvers and installs no third-party dependencies for them.
```python
# briefcase.validation exports these as runtime-checkable protocols; any object
# with the right method signature satisfies them (no base class required).
from typing import Protocol


# Extractor: find references in a prompt.
class Extractor(Protocol):
    def extract(self, prompt: str) -> list: ...


# Resolver: check each reference, return a list of ValidationError.
class Resolver(Protocol):
    def resolve_all(self, references: list) -> list: ...
```
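As a small illustration of what "runtime-checkable" means in practice, the sketch below defines a local protocol with the standard library's `typing.runtime_checkable` and checks two plain classes against it. `ExtractorLike`, `UrlExtractor`, and `NotAnExtractor` are hypothetical names for this example only; the sketch does not import anything from `briefcase.validation`.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ExtractorLike(Protocol):
    """Hypothetical local copy of the extractor shape shown above."""

    def extract(self, prompt: str) -> list: ...


class UrlExtractor:
    """Plain class, no base class: it only needs an extract() method."""

    def extract(self, prompt: str) -> list:
        return [word for word in prompt.split() if word.startswith("https://")]


class NotAnExtractor:
    def find(self, prompt: str) -> list:
        return []


print(isinstance(UrlExtractor(), ExtractorLike))    # True
print(isinstance(NotAnExtractor(), ExtractorLike))  # False
print(UrlExtractor().extract("see https://example.com for details"))
```

An `isinstance` check against a runtime-checkable protocol only confirms that the named methods exist; it does not verify argument or return types, so a static type checker or a test is still the place to catch a wrong signature.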
A resolver returns `ValidationError` objects. Errors with `severity="error"`
become `report.errors`; any other severity becomes `report.warnings`.
An optional third layer runs only when there are no errors: pass a
`semantic_validator` with a `validate_semantic(prompt, references) -> list`
method to attach warnings based on the meaning of the prompt.
```python
class KeywordSemanticValidator:
    def validate_semantic(self, prompt: str, references: list) -> list:
        return []  # return ValidationError warnings based on the prompt's meaning


engine = PromptValidationEngine(
    extractor=HandbookExtractor(),
    resolver=KnowledgeBaseResolver(known_references=set()),
    lakefs_client=DemoKnowledgeBase(),
    repository="knowledge-base",
    semantic_validator=KeywordSemanticValidator(),  # optional third layer
)
```
## Validation Layers
```mermaid
flowchart LR
A["Prompt"] --> B["extractor.extract()"]
B -- "no refs" --> P["status: passed"]
B -- "refs found" --> C["resolver.resolve_all()"]
C --> D{"errors?"}
D -- "yes" --> F["status: failed"]
D -- "no" --> E["semantic_validator (optional)"]
E --> G["status: passed / warning"]
```
> Diagram description
A prompt runs through `extractor.extract()`. If no references are found, the status is `passed`. If references are found, `resolver.resolve_all()` runs and the engine checks for errors. Any error yields status `failed`. With no errors, the optional `semantic_validator` runs and the status becomes `passed` or `warning`.
## ValidationReport
`validate()` returns a `ValidationReport`.
| Field | Description |
|-------|-------------|
| `status` | `"passed"`, `"warning"`, or `"failed"` |
| `errors` | List of `ValidationError` with `severity="error"` |
| `warnings` | List of `ValidationError` with other severities |
| `references_checked` | Number of references the extractor found |
| `validation_time_ms` | Wall-clock validation time |
| `lakefs_commit` | Commit SHA the validation ran against |
| `has_errors` | `True` when `errors` is non-empty |
| `has_warnings` | `True` when `warnings` is non-empty |
## ValidationError
Each error carries structured remediation context.
```python
ValidationError(
    code=ValidationErrorCode.REFERENCE_NOT_FOUND,
    message="Reference not found in knowledge base: handbook/missing.md",
    reference="handbook/missing.md",
    severity="error",
    layer="resolution",
    remediation="Add the document to the knowledge base or fix the reference.",
)
```
`ValidationErrorCode` is an enum of the conditions the engine reports:
| Code | Meaning |
|------|---------|
| `INVALID_SYNTAX` | Reference is malformed |
| `REFERENCE_NOT_FOUND` | Reference does not exist in the knowledge base |
| `REFERENCE_AMBIGUOUS` | Reference matches more than one document |
| `REFERENCE_GONE` | Reference existed but was removed |
| `VERSION_MISMATCH` | Reference resolves to an unexpected version |
| `SCHEMA_INVALID` | Resolved document fails schema checks |
| `LAKEFS_UNAVAILABLE` | The versioned knowledge base could not be reached |
## Validation Modes
The `mode` argument controls how `status` is derived.
| Mode | Errors | Warnings only | Clean |
|------|--------|---------------|-------|
| `"strict"` | `failed` | `warning` | `passed` |
| `"tolerant"` | `failed` | `passed` | `passed` |
| `"warn_only"` | `passed` | `passed` | `passed` |
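The mode table can be read as a small decision function over the error and warning lists. The sketch below encodes exactly that table in plain Python, together with the severity split described earlier (`"error"` versus everything else). `partition`, `derive_status`, and the tuple-based `Finding` are hypothetical stand-ins, not the engine's code.

```python
from typing import List, Tuple

# Hypothetical stand-in for a finding: (severity, message).
Finding = Tuple[str, str]


def partition(findings: List[Finding]) -> Tuple[List[Finding], List[Finding]]:
    """Split findings into errors (severity == "error") and warnings (the rest)."""
    errors = [f for f in findings if f[0] == "error"]
    warnings = [f for f in findings if f[0] != "error"]
    return errors, warnings


def derive_status(mode: str, errors: List[Finding], warnings: List[Finding]) -> str:
    """Map (mode, errors, warnings) to a status, following the table above."""
    if mode not in ("strict", "tolerant", "warn_only"):
        raise ValueError(f"unknown mode: {mode!r}")
    if errors and mode != "warn_only":
        return "failed"
    if warnings and mode == "strict":
        return "warning"
    return "passed"


mixed = partition([("error", "handbook/missing.md not found"), ("warning", "section may be stale")])
only_warnings = partition([("warning", "section may be stale")])

for mode in ("strict", "tolerant", "warn_only"):
    print(mode, derive_status(mode, *mixed), derive_status(mode, *only_warnings))
```

The loop prints one row per mode, matching the "Errors" and "Warnings only" columns of the table: `strict failed warning`, `tolerant failed passed`, `warn_only passed passed`.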
## Where this fits
Validation is a **Control** act: it stops a prompt with stale references before
it runs, and stamps the commit it checked against so the result is reproducible.
It pairs naturally with versioned retrieval.
- Guardrails — Previous: enforce deny-by-default controls before an agent action runs. (/advanced/guardrails/)
- RAG Versioning — Pin retrieval to a versioned knowledge base so references resolve to a known commit. (/advanced/rag-versioning/)
- Reproducible RAG (guide) — End-to-end: validate references and pin retrieval so a run reproduces exactly. (/guides/reproducible-rag/)
========================================================================
# Store & Query
========================================================================
## Storage Adapters
Source: https://briefcaseai.io/features/storage-adapters/
> Choose where your decision records live — the backend that holds your audit trail.
A storage adapter is the backend that holds your audit trail — the durable home for every decision Briefcase captures, and the surface you query when someone asks why a decision was made.
> When you'd reach for this
Your support-triage agent has been recording `classify_ticket` decisions for a month, and a reviewer now wants to pull every support-queue decision from last week. Those records have to live somewhere durable and queryable. The backend you pick decides whether that review is a one-line query against a file on disk or a lost cause because the records were only ever in memory.
## How it works
1. **Init** — call `briefcase.init()` once and construct a backend.
2. **Create** — build a `DecisionSnapshot` for each `classify_ticket` call.
3. **Save** — `save_decision()` persists the record and returns its id.
4. **Query** — pull records back with a `SnapshotQuery` when you need to review them.
```mermaid
flowchart LR
A[classify_ticket] --> B[DecisionSnapshot]
B --> C[SqliteBackend]
C --> D[(Audit trail)]
E[Reviewer query] --> C
```
> Diagram description
A decision becomes a DecisionSnapshot, which is saved into the configured storage backend, building a durable audit trail. A reviewer's SnapshotQuery reads back from the same backend.
## Install
```bash
pip install briefcase-ai[storage]
```
The `storage` extra installs no third-party dependencies; the SQLite backend uses the runtime that ships with the package.
## Which backend should I use?
| Backend | Persistence | Reach for it when |
|---------|-------------|-------------------|
| `SqliteBackend.in_memory()` | No — gone on exit | Tests and local experiments, where no durable audit trail is needed |
| `SqliteBackend("path.db")` | Yes — a file on disk | A single node where you want a real, queryable audit trail |
| `BufferedBackend` | Yes — wraps another backend | High write volume, where batching `save_decision` calls matters |
`BufferedBackend` is not a separate store — it wraps a durable backend (such as `SqliteBackend`) and batches `save_decision` calls until the buffer fills, so you trade a small flush delay for less write pressure under load.
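The batching trade-off can be illustrated with a few lines of plain Python. This is a sketch of the general buffering pattern, not `BufferedBackend` itself: `BufferingWriter`, its `sink` callable, and the explicit `flush()` method are hypothetical, and whether the real backend exposes a manual flush is not stated here.

```python
from typing import Any, Callable, List


class BufferingWriter:
    """Hypothetical sketch: hold records in memory, write them in batches."""

    def __init__(self, sink: Callable[[List[Any]], None], buffer_size: int = 100):
        if buffer_size < 1:
            raise ValueError("buffer_size must be at least 1")
        self._sink = sink  # receives one list of records per flush
        self._buffer_size = buffer_size
        self._pending: List[Any] = []

    def save(self, record: Any) -> None:
        self._pending.append(record)
        if len(self._pending) >= self._buffer_size:
            self.flush()

    def flush(self) -> None:
        """Write whatever is pending, even if the buffer is not full."""
        if self._pending:
            batch, self._pending = self._pending, []
            self._sink(batch)


batches: List[List[int]] = []
writer = BufferingWriter(batches.append, buffer_size=3)
for record in range(7):
    writer.save(record)

print(batches)  # [[0, 1, 2], [3, 4, 5]]
writer.flush()
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

In this sketch the seventh record sits in memory until something flushes it, which is the "small flush delay" in concrete form: fewer writes under load, at the cost of a window in which the newest records are not yet durable.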
## Init -> create -> save -> query
1. **Init the runtime and backend**
```python
import briefcase
from briefcase.storage import SqliteBackend
briefcase.init() # start the native runtime once per process
backend = SqliteBackend("decisions.db")
```
2. **Create a decision** for each `classify_ticket` call.
```python
from briefcase import DecisionSnapshot, Input, Output
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "reset my password", "string"))
output = Output("category", "account_access", "string")
output.with_confidence(0.92)
decision.add_output(output)
decision.add_tag("queue", "support")
```
3. **Save** the record — `save_decision()` returns its id.
```python
decision_id = backend.save_decision(decision)
loaded = backend.load_decision(decision_id)
print(loaded.function_name) # classify_ticket
```
4. **Query the audit trail** with a `SnapshotQuery`.
```python
from briefcase import SnapshotQuery

results = backend.query(
    SnapshotQuery()
    .with_function_name("classify_ticket")
    .with_tag("queue", "support")
)
print(len(results))
```
Call `briefcase.init()` once per process, before using any backend, to start the native runtime.
## Backends in detail
### In-memory (for tests)
`SqliteBackend.in_memory()` keeps data in memory — fast and ephemeral, the right choice for tests where you do not need records to survive the process.
```python
import briefcase
from briefcase.storage import SqliteBackend
briefcase.init()
backend = SqliteBackend.in_memory()
```
### File on disk (for a real audit trail)
`SqliteBackend(path)` writes to a file — a durable, queryable audit trail in one place, the workhorse for single-node deployments.
```python
import briefcase
from briefcase.storage import SqliteBackend
briefcase.init()
backend = SqliteBackend("decisions.db")
print(backend.health_check()) # True
```
### Buffered (for high volume)
`BufferedBackend` wraps a durable backend and batches `save_decision` calls until the buffer fills.
```python
import briefcase
from briefcase.storage import SqliteBackend, BufferedBackend
from briefcase import DecisionSnapshot, Input
briefcase.init()
backend = BufferedBackend(SqliteBackend("decisions.db"), buffer_size=100)
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "update my address", "string"))
backend.save_decision(decision)
```
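The batching behaviour can be pictured with a toy wrapper. This is a stdlib-only sketch of the general write-buffering pattern, not `BufferedBackend`'s implementation: `ToyBufferedWriter`, its `flush` method, and the `sink` callable are all hypothetical names.

```python
from typing import Any, Callable, List


class ToyBufferedWriter:
    """Hypothetical stand-in: holds writes and hands them to `sink` in batches."""

    def __init__(self, sink: Callable[[List[Any]], None], buffer_size: int) -> None:
        if buffer_size < 1:
            raise ValueError("buffer_size must be at least 1")
        self._sink = sink
        self._buffer_size = buffer_size
        self._pending: List[Any] = []

    def save(self, item: Any) -> None:
        self._pending.append(item)
        # Flush as soon as the buffer is full.
        if len(self._pending) >= self._buffer_size:
            self.flush()

    def flush(self) -> None:
        # Hand over a copy so clearing the buffer does not touch the batch.
        if self._pending:
            self._sink(list(self._pending))
            self._pending.clear()


batches: List[List[Any]] = []
writer = ToyBufferedWriter(batches.append, buffer_size=3)
for n in range(7):
    writer.save(n)
print(batches)  # two full batches so far; item 6 is still pending
writer.flush()  # drain the remainder, for example at shutdown
print(batches)
```

The sketch also shows the usual trade-off of buffering: anything still pending is lost if the process exits before a flush. Whether `BufferedBackend` exposes an explicit flush is worth checking in the API reference before relying on it.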
## A governance query: load decisions for review
The point of a durable backend is the review it enables. When a reviewer asks for the support queue's triage decisions, you answer with a tagged `SnapshotQuery` against the same store that captured them.
```python
import briefcase
from briefcase.storage import SqliteBackend
from briefcase import SnapshotQuery
briefcase.init()
backend = SqliteBackend("decisions.db")
# Pull every support-queue triage decision for review
query = (
    SnapshotQuery()
    .with_function_name("classify_ticket")
    .with_tag("queue", "support")
    .with_limit(50)
    .with_offset(0)
)
for decision in backend.query(query):
    # hand each record to a reviewer, or replay it to verify
    ...
```
`SnapshotQuery` supports `with_function_name`, `with_module_name`, `with_tag`, `with_limit`, and `with_offset`. From here a reviewer can [audit a decision](/guides/audit-a-decision/) or [replay](/features/replay/) it to reproduce exactly what happened.
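`with_limit` and `with_offset` suggest page-by-page reads for large reviews. Below is a minimal sketch of that loop, written against a plain callable so it runs without a backend. `iter_pages` and `fetch_page` are hypothetical helpers, and the assumption that the last page comes back shorter than the limit is mine, not something stated in these docs.

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_pages(fetch_page: Callable[[int, int], List[T]], page_size: int) -> Iterator[T]:
    """Yield every row by repeatedly asking for (limit, offset) pages."""
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:  # short (or empty) page: nothing left
            return
        offset += page_size


# Fake data source standing in for a backend query.
rows = [f"decision-{i}" for i in range(7)]
fetched = list(iter_pages(lambda limit, offset: rows[offset:offset + limit], page_size=3))
print(fetched == rows)  # True
```

With Briefcase, `fetch_page` would presumably build a `SnapshotQuery` with `.with_limit(limit).with_offset(offset)` and pass it to `backend.query`.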
## Snapshots: grouping multiple decisions
A `Snapshot` groups several decisions. `save()` stores it and returns the snapshot id; `load()` takes that id and returns the stored `Snapshot`.
```python
import briefcase
from briefcase.storage import SqliteBackend
from briefcase import DecisionSnapshot, Input, Snapshot
briefcase.init()
backend = SqliteBackend.in_memory()
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "where is my order", "string"))
session = Snapshot("session")
session.add_decision(decision)
snapshot_id = backend.save(session)
restored = backend.load(snapshot_id)
print(len(restored.decisions)) # 1
```
## The persistence interface
`SqliteBackend` exposes the full interface (`BufferedBackend` only buffers `save_decision` calls before flushing them to the backend it wraps):
```python
backend.save(snapshot) # store a Snapshot, returns its id
backend.load(snapshot_id) # load a Snapshot
backend.save_decision(decision) # store a DecisionSnapshot, returns its id
backend.load_decision(decision_id)
backend.query(snapshot_query) # run a SnapshotQuery
backend.delete(snapshot_id)
backend.health_check()
```
## Available backends
| Backend | Class | Description |
|---------|-------|-------------|
| SQLite | `SqliteBackend` | Local SQLite database (file or in-memory) |
| Buffered | `BufferedBackend` | Wraps a backend and batches writes |
> Scaling beyond a single node
The open-source package ships these two backends, and `SqliteBackend` covers single-node deployments well. When you outgrow it — multiple writers, central retention, or shared query access — the path is a server-grade backend: S3, GCS, Azure Blob, and PostgreSQL backends are available in the enterprise build. Moving to one is a backend swap, not a change to your decision code.
## Where this fits
Storage is the **Store & Query** act: the durable home for everything Capture produced, and the surface the later acts read from.
- Decision Recording — Where decision snapshots come from — the Capture act that fills this backend. (/features/decision-recording/)
- Deterministic Replay — Next: load a record back out and reproduce exactly what happened. (/features/replay/)
## Bitemporal Storage
Source: https://briefcaseai.io/advanced/bitemporal-storage/
> Answer "what did we know at decision time?" even after a value is later corrected.
# Bitemporal Storage
Bitemporal storage tracks two independent time dimensions for every record: **valid time** (when a fact was true in the world) and **transaction time** (when the system learned about it) — so a later correction never erases what you actually knew at decision time.
> When you'd reach for this
Your support-triage agent escalates a ticket because the `max_upload_mb` config read `50` that morning. Two days later someone corrects that value — backdated, because it had been wrong all along. When the escalation is questioned, you need to show what the agent actually saw on the day, not the corrected value. Bitemporal storage keeps both beliefs so "what did we know at decision time?" has an exact answer.
## Valid time vs transaction time
Traditional storage overwrites: when a value changes, the old value is lost and the record of what you believed disappears with it. Bitemporal storage separates the two clocks so a backdated correction and the original belief can coexist. Writes are append-only — a correction is a new record, not an edit.
| Dimension | Answers | Why it matters |
|-----------|---------|----------------|
| **Valid time** | When was this fact true in the world? | Lets a correction apply to a past date without rewriting history. |
| **Transaction time** | When did the system learn it? | Distinguishes "what we knew on May 2" from "what we know now" — the key to a defensible audit. |
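To see why two clocks are enough, here is a toy model in plain Python. It is a sketch of the concept only, not the `briefcase.bitemporal` implementation; `Fact` and `latest_as_of` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, List, Optional


@dataclass(frozen=True)
class Fact:
    key: str
    value: Any
    valid_time: datetime        # when it was true in the world
    transaction_time: datetime  # when the system learned it


def latest_as_of(log: List[Fact], key: str, known_by: datetime) -> Optional[Fact]:
    """Newest belief about `key` among facts learned at or before `known_by`."""
    visible = [f for f in log if f.key == key and f.transaction_time <= known_by]
    return max(visible, key=lambda f: f.transaction_time, default=None)


utc = timezone.utc
valid = datetime(2026, 5, 1, tzinfo=utc)
log = [
    Fact("max_upload_mb", 50, valid, datetime(2026, 5, 1, 9, tzinfo=utc)),
    # Backdated correction: same valid_time, later transaction_time.
    Fact("max_upload_mb", 100, valid, datetime(2026, 5, 3, 8, tzinfo=utc)),
]

at_decision = latest_as_of(log, "max_upload_mb", datetime(2026, 5, 2, 12, tzinfo=utc))
today = latest_as_of(log, "max_upload_mb", datetime(2026, 5, 4, tzinfo=utc))
print(at_decision.value)  # 50
print(today.value)        # 100
```

Because the log is only ever appended to, the answer for any past `known_by` instant never changes, which is what makes the audit answer defensible.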
## How it works
1. **Record a fact** with a `valid_time` and a `transaction_time` via `BitemporalRecord.new`.
2. **Append a backdated correction** with `append_correction` — same valid time, a fresh transaction time, so both versions coexist.
3. **Reconstruct the past** by clamping reads through an `AsOfView` at the decision's transaction time.
```mermaid
flowchart LR
A["BitemporalRecord.new<br/>(valid_time, transaction_time)"] --> B["store.append"]
C["append_correction<br/>(new transaction_time)"] --> B
B --> D["BitemporalStore<br/>(append-only)"]
D --> E["store.latest / history"]
D --> F["AsOfView(transaction_time)<br/>clamped read"]
F --> G["reconstruct past belief"]
```
> Diagram description
Both `BitemporalRecord.new` and `append_correction` write into the append-only
`BitemporalStore`. Reads branch two ways: `store.latest` / `history` return
current truth, while an `AsOfView` clamped to a transaction time reconstructs
the past belief held at that instant.
## Install
```bash
pip install briefcase-ai[bitemporal]
```
```python
from briefcase.bitemporal import (
BitemporalRecord,
InMemoryBitemporalStore,
AsOfView,
append_correction,
)
```
## Record a Fact
```python
from datetime import datetime, timezone
from briefcase.bitemporal import (
BitemporalRecord,
InMemoryBitemporalStore,
)
store = InMemoryBitemporalStore()
# A feature-flag rollout percentage that was true in the real world at t0.
t0 = datetime(2026, 5, 1, tzinfo=timezone.utc)
learned_t0 = datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc)
record = BitemporalRecord.new(
key="flag:new_checkout",
valid_time=t0,
value={"rollout_percent": 25},
source="config_service",
transaction_time=learned_t0,
)
store.append(record)
latest = store.latest("flag:new_checkout")
print(latest.value) # {'rollout_percent': 25}
print(latest.content_hash()[:12]) # SHA-256 of the value payload
```
`valid_time` and `transaction_time` must be timezone-aware; `BitemporalRecord.new` raises `ValueError` otherwise. When `transaction_time` is omitted it defaults to now.
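For reference, the standard library treats a datetime as aware when `tzinfo` is set and `utcoffset()` returns a value. A small sketch of a pre-flight check follows; `is_aware` is a hypothetical helper, and whether Briefcase applies exactly this test internally is an assumption.

```python
from datetime import datetime, timezone


def is_aware(dt: datetime) -> bool:
    """True when the datetime carries a usable UTC offset."""
    return dt.tzinfo is not None and dt.utcoffset() is not None


naive = datetime(2026, 5, 1, 9, 0)
aware = datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc)

print(is_aware(naive))  # False
print(is_aware(aware))  # True

# Attaching UTC to a naive value is only correct if it was recorded in UTC.
print(is_aware(naive.replace(tzinfo=timezone.utc)))  # True
```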
## Correct a Value and Reconstruct the Past
A correction shares the original `valid_time` but gets a fresh `transaction_time` and a `parent_record_id` back to the original. The old belief stays in the store.
```python
from datetime import datetime, timezone
from briefcase.bitemporal import (
BitemporalRecord,
InMemoryBitemporalStore,
AsOfView,
append_correction,
)
store = InMemoryBitemporalStore()
t0 = datetime(2026, 5, 1, tzinfo=timezone.utc)
learned_t0 = datetime(2026, 5, 1, 9, 0, tzinfo=timezone.utc)
original = BitemporalRecord.new(
key="config:max_upload_mb",
valid_time=t0,
value=50,
source="config_service",
transaction_time=learned_t0,
)
store.append(original)
# A decision was made at this instant, reading what the system knew then.
decision_ts = datetime(2026, 5, 2, 12, 0, tzinfo=timezone.utc)
# Later, the config service corrects the same valid_time: it was 100, not 50.
learned_correction = datetime(2026, 5, 3, 8, 0, tzinfo=timezone.utc)
append_correction(
store,
original,
corrected_value=100,
source="config_service",
transaction_time=learned_correction,
)
# Current truth reflects the correction.
print(store.latest("config:max_upload_mb").value) # 100
# As-of the decision, the system had not yet learned the correction.
with AsOfView(store, transaction_time=decision_ts) as view:
    print(view.latest("config:max_upload_mb").value) # 50
print(len(store.history("config:max_upload_mb"))) # 2 — both beliefs kept
```
`append_correction` requires the correction's `transaction_time` to be strictly after the original's; it raises `ValueError` otherwise, so a correction can never silently fail to supersede.
## Clamp Reads with AsOfView
`AsOfView(store, transaction_time=...)` wraps any store and clamps every read to a historical instant. Application code keeps calling `latest(key)` / `as_of(key)` unchanged — no post-instant information leaks in. The view is read-only; `append` raises.
```python
with AsOfView(store, transaction_time=decision_ts) as view:
    record = view.latest("config:max_upload_mb")
    rows = view.history("config:max_upload_mb")
    keys = view.keys()
```
Pass `valid_time=` as well to restrict to facts that were true at a past real-world moment, distinct from what the system had learned by then.
## Batch vs. Stream Ingestion
Both write the same kind of bitemporal record; they differ only in how `transaction_time` is assigned. `batch_append` settles a whole batch at one shared instant; `stream_append` learns each record independently.
```python
from datetime import datetime, timezone
from briefcase.bitemporal import (
BitemporalRecord,
InMemoryBitemporalStore,
batch_append,
stream_append,
)
store = InMemoryBitemporalStore()
valid = datetime(2026, 5, 1, tzinfo=timezone.utc)
# batch_append: many records settle at one shared transaction_time.
settled_at = datetime(2026, 5, 1, 23, 0, tzinfo=timezone.utc)
batch = [
BitemporalRecord.new(key="rate:US", valid_time=valid, value=0.07, source="rates_feed"),
BitemporalRecord.new(key="rate:EU", valid_time=valid, value=0.19, source="rates_feed"),
]
batch_append(store, batch, transaction_time=settled_at)
# stream_append: each record is learned independently, at append time.
tick = BitemporalRecord.new(key="rate:US", valid_time=valid, value=0.075, source="rates_feed")
stream_append(store, tick)
print(store.latest("rate:US").value) # 0.075
print(sorted(store.keys())) # ['rate:EU', 'rate:US']
```
| Helper | When you'd reach for it |
|--------|-------------------------|
| `batch_append` | A whole batch becomes known at once — settle it at one shared `transaction_time`. |
| `stream_append` | Records arrive one at a time — each is learned independently at append time. |
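The difference between the two helpers comes down to how the transaction clock is stamped. The sketch below models only that stamping step in plain Python; `Row`, `stamp_batch`, and `stamp_stream` are hypothetical names, and the real helpers may do more than this.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import Callable, List


@dataclass(frozen=True)
class Row:  # hypothetical record, reduced to what matters here
    key: str
    value: float
    transaction_time: datetime


def stamp_batch(rows: List[Row], settled_at: datetime) -> List[Row]:
    """Every row in the batch becomes known at the same instant."""
    return [replace(r, transaction_time=settled_at) for r in rows]


def stamp_stream(rows: List[Row], clock: Callable[[], datetime]) -> List[Row]:
    """Each row is stamped when it is appended, so instants can differ."""
    return [replace(r, transaction_time=clock()) for r in rows]


placeholder = datetime(2026, 5, 1, tzinfo=timezone.utc)
rows = [Row("rate:US", 0.07, placeholder), Row("rate:EU", 0.19, placeholder)]

settled = stamp_batch(rows, datetime(2026, 5, 1, 23, 0, tzinfo=timezone.utc))
print(len({r.transaction_time for r in settled}))   # 1 shared instant

# A fake clock that advances one second per call keeps the demo deterministic.
ticks = iter(placeholder + timedelta(seconds=n) for n in range(10))
streamed = stamp_stream(rows, lambda: next(ticks))
print(len({r.transaction_time for r in streamed}))  # 2 distinct instants
```

The practical consequence is for as-of reads: a view clamped inside a stream can see some records and not others, while a batch is either entirely visible or entirely invisible.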
## Durable Backends
`InMemoryBitemporalStore` is the reference implementation. For persistence across process restarts, use the SQLite backend. Append-only is enforced at the database layer via triggers.
The SQLite backend ships in the base install and adds no third-party dependencies.
```python
from briefcase.bitemporal.backends import SqliteBitemporalBackend
backend = SqliteBitemporalBackend("evidence.db")
```
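Trigger-enforced append-only storage is a general SQLite technique. The sketch below shows the idea with the standard `sqlite3` module; the table, column, and trigger names are hypothetical and are not Briefcase's actual schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE facts (
        id INTEGER PRIMARY KEY,
        key TEXT NOT NULL,
        value TEXT NOT NULL,
        valid_time TEXT NOT NULL,
        transaction_time TEXT NOT NULL
    );
    -- Reject any attempt to rewrite or remove history.
    CREATE TRIGGER facts_no_update BEFORE UPDATE ON facts
    BEGIN
        SELECT RAISE(ABORT, 'facts is append-only');
    END;
    CREATE TRIGGER facts_no_delete BEFORE DELETE ON facts
    BEGIN
        SELECT RAISE(ABORT, 'facts is append-only');
    END;
    """
)

conn.execute(
    "INSERT INTO facts (key, value, valid_time, transaction_time) VALUES (?, ?, ?, ?)",
    ("config:max_upload_mb", "50", "2026-05-01T00:00:00+00:00", "2026-05-01T09:00:00+00:00"),
)

try:
    conn.execute("UPDATE facts SET value = '100' WHERE key = 'config:max_upload_mb'")
    blocked = False
except sqlite3.DatabaseError as exc:
    blocked = True
    print("blocked:", exc)

stored_value = conn.execute("SELECT value FROM facts").fetchone()[0]
print(stored_value)  # still the original value
```

Enforcing the rule in the database rather than in application code means a stray script or a second writer cannot bypass it.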
For multi-writer analytics, the Iceberg backend wraps `pyiceberg`:
```bash
pip install briefcase-ai[bitemporal-iceberg]
```
```python
from briefcase.bitemporal.backends import IcebergBitemporalBackend
```
All backends implement the same `BitemporalStore` protocol, so `AsOfView` and application code are backend-agnostic.
## Key Classes
| Symbol | Why it matters |
|--------|----------------|
| `BitemporalRecord` | Immutable record with `valid_time`, `transaction_time`, `value`, `source`; `new()` constructor, `content_hash()`. |
| `InMemoryBitemporalStore` | Reference store; `append` / `append_many` / `latest` / `history` / `as_of` / `keys`. |
| `AsOfView` | Read-only view clamped to `transaction_time` and/or `valid_time` — reconstructs a past belief. |
| `append_correction` | Appends a superseding record so the original belief is preserved, not overwritten. |
| `batch_append` / `stream_append` | Shared-instant vs. per-record ingestion. |
| `SqliteBitemporalBackend` / `IcebergBitemporalBackend` | Durable backends sharing the `BitemporalStore` protocol. |
## Where this fits
- Related: Versioned Routing Policy — The same backdated-correction model, applied to which rules were in force. (/advanced/versioned-routing-policy/)
- Next: Audit Bundles — Seal a decision together with the as-of-then evidence into a verifiable record. (/advanced/compliance-bundles/)
## External Data
Source: https://briefcaseai.io/advanced/external-data/
> Pin the exact upstream value a decision used, so it stays reproducible when the source changes.
# External Data Tracking
`ExternalDataTracker` records a hashed snapshot of every external fetch a decision relied on — an API response, a database query, a file — so the decision stays reproducible even after that source changes underneath you.
> When you'd reach for this
Your support-triage agent classified a ticket as low priority because the `pricing_service` reported a customer's plan at one tier. Two weeks later that upstream record is corrected to a higher tier, and the routing looks wrong in hindsight. Without a snapshot you can't prove the agent acted on the value that was live at the time. Capture the upstream value with the decision and the answer is exact, not reconstructed.
## How it works
1. **Snapshot the fetch.** `track_api_call` hashes the response and stores it when the policy allows.
2. **Detect drift.** `detect_drift` compares the live value against the latest snapshot to see whether the source moved.
3. **Append a correction.** When the upstream value is fixed, `correct_snapshot` records the fix without erasing the original.
```mermaid
flowchart LR
A["pricing_service fetch"] --> B["track_api_call()"]
B --> C["snapshot + SHA-256 hash"]
C --> D["classify_ticket decision"]
E["live value, two weeks later"] --> F{"detect_drift"}
C --> F
F -- "changed" --> G["correct_snapshot<br/>original kept"]
```
> Diagram description
The triage agent snapshots the pricing service response via track_api_call, which hashes it before the ticket is classified. Two weeks later detect_drift compares the live value against the stored snapshot and reports a change; correct_snapshot records the fix while leaving the original snapshot intact for audit.
## Install
```bash
pip install briefcase-ai[external]
```
```python
from briefcase.external import (
ExternalDataTracker,
SnapshotPolicy,
SnapshotFrequency,
)
```
## Track an External Call
`track_api_call()` hashes the response, stores a snapshot when the policy
allows, and reports whether the source drifted since the last snapshot.
```python
from briefcase.external import ExternalDataTracker
tracker = ExternalDataTracker()
response = {"items": [{"sku": "A-100", "price": 19.99}]}
result = tracker.track_api_call(
api_name="pricing_service",
endpoint="https://pricing.internal/v1/catalog",
method="GET",
response_data=response,
record_count=1,
)
print(result["data_hash"][:12]) # SHA-256 of the response
print(result["snapshot_stored"]) # True — first snapshot is always stored
print(result["drift_detected"]) # False — nothing to compare against yet
```
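A content hash is only useful for drift detection if equal payloads always hash equally. One common way to get that for JSON-like data is to serialize with sorted keys before hashing. The sketch below shows that general technique; how `ExternalDataTracker` canonicalizes payloads internally is not documented here, and `stable_hash` is a hypothetical helper.

```python
import hashlib
import json
from typing import Any


def stable_hash(payload: Any) -> str:
    """SHA-256 over a canonical JSON encoding (sorted keys, fixed separators)."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


a = {"items": [{"sku": "A-100", "price": 19.99}]}
b = {"items": [{"price": 19.99, "sku": "A-100"}]}  # same data, different key order
c = {"items": [{"sku": "A-100", "price": 24.99}]}  # price changed

print(stable_hash(a) == stable_hash(b))  # True
print(stable_hash(a) == stable_hash(c))  # False
```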
The same shape applies to database queries and file fetches:
```python
tracker.track_db_query(
db_system="postgres",
db_name="catalog",
query="SELECT sku, price FROM products",
result_data=[{"sku": "A-100", "price": 19.99}],
result_count=1,
store_snapshot=True,
)
tracker.track_file_fetch(
source_name="reference_rates",
file_data=b"sku,price\nA-100,19.99\n",
file_path="reference/rates.csv",
record_count=1,
)
```
| Method | Use it for |
|--------|------------|
| `track_api_call` | An HTTP/API response the decision read. |
| `track_db_query` | A database query result the decision read. |
| `track_file_fetch` | A reference file the decision read. |
## Detect Drift
Compare current data against the latest stored snapshot for a source.
`detect_drift()` returns `None` when there is no prior snapshot.
```python
tracker.track_api_call(
api_name="pricing_service",
endpoint="https://pricing.internal/v1/catalog",
method="GET",
response_data={"items": [{"sku": "A-100", "price": 19.99}]},
)
report = tracker.detect_drift(
"pricing_service",
current_data={"items": [{"sku": "A-100", "price": 24.99}]},
)
print(report.has_changed) # True
print(report.drift_score) # 1.0
print(report.size_delta) # byte difference vs the baseline snapshot
```
Compare two specific snapshots by id with `compare_snapshots()`:
```python
tracker = ExternalDataTracker()
first = tracker.track_api_call(
api_name="inventory_api", endpoint="https://inv.internal/v1/stock", method="GET",
response_data={"items": [{"sku": "A-100", "qty": 40}]},
)
second = tracker.track_api_call(
api_name="inventory_api", endpoint="https://inv.internal/v1/stock", method="GET",
response_data={"items": [{"sku": "A-100", "qty": 25}]},
)
report = tracker.compare_snapshots(first["snapshot_id"], second["snapshot_id"])
print(report.has_changed) # True
```
## Snapshot Policy
A `SnapshotPolicy` controls when snapshots are taken and how long they are
kept. Set a per-source policy, or pass `default_policy` to the tracker.
```python
from briefcase.external import (
ExternalDataTracker, SnapshotPolicy, SnapshotFrequency,
)
tracker = ExternalDataTracker(
default_policy=SnapshotPolicy(
frequency=SnapshotFrequency.ON_CHANGE,
retention_days=90,
max_snapshots=100,
)
)
tracker.set_policy(
"pricing_service",
SnapshotPolicy(frequency=SnapshotFrequency.EVERY_CALL),
)
```
| Field | Default | Description |
|-------|---------|-------------|
| `frequency` | `ON_CHANGE` | When to store a snapshot |
| `retention_days` | `90` | Days to retain snapshots (`0` = forever) |
| `change_threshold` | `0.0` | Minimum change to count as drift on `ON_CHANGE` |
| `max_snapshots` | `0` | Max snapshots per source (`0` = unlimited) |
| `compress` | `False` | Compress snapshot bodies before storage |
`SnapshotFrequency` values: `EVERY_CALL`, `ON_CHANGE`, `HOURLY`, `DAILY`,
`WEEKLY`.
## Append a Correction
When a source returned bad data, append a correction instead of overwriting the
snapshot. The correction keeps the parent's `valid_time` (when the data was
true in the real world) but gets a fresh transaction time, so historical
queries still see the original belief and later queries see the corrected value.
```python
original = tracker.track_api_call(
api_name="pricing_service",
endpoint="https://pricing.internal/v1/catalog",
method="GET",
response_data={"items": [{"sku": "A-100", "price": 1999.00}]}, # bad value
)
corrected = tracker.correct_snapshot(
original["snapshot_id"],
corrected_data={"items": [{"sku": "A-100", "price": 19.99}]},
source="manual_review",
)
print(corrected.parent_snapshot_id == original["snapshot_id"]) # True
```
The original snapshot is never mutated; the correction records its lineage
through `parent_snapshot_id`.
## Redact PII Before Storage
Pass a `sanitizer` (for example
[`briefcase.sanitize.Sanitizer`](/features/pii-sanitization/)) and snapshot bodies
are redacted before they are persisted to durable storage. The `data_hash` is
still computed over the original payload, so drift detection is unaffected.
```python
from briefcase.external import ExternalDataTracker
from briefcase.sanitize import Sanitizer
tracker = ExternalDataTracker(sanitizer=Sanitizer())
```
> Fails closed
If the sanitizer raises, the tracker persists metadata only — raw, potentially
PII-bearing data never reaches storage on the error path.
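The fail-closed behaviour described above is a small but easy-to-get-wrong pattern. Here is a stdlib-only sketch of it, under the assumption that a sanitizer is just a callable that may raise; `build_snapshot_row`, `mask_email`, and `broken` are hypothetical names, not part of `briefcase.external`.

```python
from typing import Any, Callable, Dict


def build_snapshot_row(
    payload: Dict[str, Any],
    sanitize: Callable[[Dict[str, Any]], Dict[str, Any]],
) -> Dict[str, Any]:
    """Return the row to persist; on sanitizer failure keep metadata only."""
    row: Dict[str, Any] = {"source": "pricing_service", "body": None, "sanitized": False}
    try:
        row["body"] = sanitize(payload)
        row["sanitized"] = True
    except Exception:
        # Fail closed: never fall back to storing the raw payload.
        row["body"] = None
    return row


def mask_email(payload: Dict[str, Any]) -> Dict[str, Any]:
    return {k: ("[REDACTED]" if k == "email" else v) for k, v in payload.items()}


def broken(payload: Dict[str, Any]) -> Dict[str, Any]:
    raise RuntimeError("sanitizer crashed")


payload = {"email": "a@example.com", "plan": "pro"}
stored = build_snapshot_row(payload, mask_email)
failed = build_snapshot_row(payload, broken)
print(stored["body"])  # redacted body
print(failed["body"])  # None: metadata only
```

The key design choice is that the error branch has no access path to the raw payload, so a sanitizer bug degrades the record instead of leaking data.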
## Where this fits
- Related: RAG Versioning — The same snapshot-and-detect model, applied to a retrieval index. (/advanced/rag-versioning/)
- Guide: Reproducible RAG — Put snapshots and versioning together end to end. (/guides/reproducible-rag/)
## RAG Versioning
Source: https://briefcaseai.io/advanced/rag-versioning/
> Tie each retrieval to the exact corpus state, and catch a stale index before it informs a decision.
# RAG Versioning
RAG versioning records exactly which documents were embedded, with which model, at which source commit — so a retrieval can be tied back to the exact corpus state, and a stale index is caught before it answers anything.
> When you'd reach for this
Your support-triage agent answers policy questions from a retrieval index built over a `support_kb`. The team revises a document and adds a new one, but the index isn't rebuilt — so the agent keeps citing the old version and routes a ticket on outdated guidance. A versioned manifest makes the drift detectable: `check_invalidation` tells you precisely what changed before you trust a retrieval.
## How it works
1. **Build a manifest.** `create_embedding_batch` then `create_manifest` fingerprint the indexed documents into an `EmbeddingManifest`.
2. **Check invalidation.** `check_invalidation` compares the current documents and model against the manifest to detect a stale index.
3. **Rebuild.** When sources changed, `rebuild_index` produces a fresh manifest so retrievals reflect the current corpus.
```mermaid
flowchart LR
A["Documents"] --> B["create_embedding_batch()"]
B --> C["create_manifest()"]
C --> D["EmbeddingManifest"]
E["Current documents + model"] --> F["check_invalidation()"]
D --> F
F --> G{"is_valid?"}
G -- "no" --> H["rebuild_index()"]
G -- "yes" --> I["Reuse index"]
```
> Diagram description
Documents flow through `create_embedding_batch()` and `create_manifest()` to
produce an `EmbeddingManifest`. `check_invalidation()` compares that manifest
against the current documents and model: if it is no longer valid the index is
rebuilt with `rebuild_index()`, otherwise the existing index is reused.
## Install
```bash
pip install briefcase-ai[rag]
```
The `rag` extra is pure Python and installs no third-party dependencies.
```python
from briefcase.rag import VersionedEmbeddingPipeline, Document
```
## Build an Index
A `Document` carries an id, content, and optional metadata. The pipeline's
`embedding_model` is any object with an `embed(texts) -> list[list[float]]`
method; optional `name` and `version` attributes are recorded on the manifest.
```python
from briefcase.rag import VersionedEmbeddingPipeline, Document
class HashingEmbedder:
    """Trivial deterministic embedder for the example."""
    name = "demo-embedder"
    version = "1.0"
    def embed(self, texts):
        return [[float(len(t) % 7), float(len(t) % 11)] for t in texts]
pipeline = VersionedEmbeddingPipeline(embedding_model=HashingEmbedder())
documents = [
Document(id="kb-1", content="How to reset your password.", path="handbook/auth.md"),
Document(id="kb-2", content="Refund policy for digital goods.", path="handbook/refunds.md"),
]
batch = pipeline.create_embedding_batch(documents)
manifest = pipeline.create_manifest("support_kb", [batch])
print(manifest.index_name) # "support_kb"
print(manifest.document_count) # 2
print(manifest.model) # "demo-embedder"
print(manifest.status) # "current"
```
`rebuild_index()` chains both steps when you just want a fresh manifest:
```python
manifest = pipeline.rebuild_index("support_kb", documents)
```
## Detect Staleness
`check_invalidation()` compares the latest manifest against the current document
set and model, and returns an `InvalidationReport` describing what changed.
```python
# A document changed and one was added since the manifest was built.
current = [
Document(id="kb-1", content="How to reset your password (updated)."),
Document(id="kb-2", content="Refund policy for digital goods."),
Document(id="kb-3", content="Two-factor authentication setup."),
]
report = pipeline.check_invalidation("support_kb", current)
print(report.is_valid) # False
print(report.status) # "stale_documents"
print(report.added_documents) # ["kb-3"]
print(report.changed_documents) # ["kb-1"]
print(report.removed_documents) # []
print(report.model_changed) # False
```
`status` is one of: `current`, `stale_documents`, `stale_model`, `stale_both`,
`rebuilding`. When the index is stale, rebuild it:
```python
if not report.is_valid:
    manifest = pipeline.rebuild_index("support_kb", current)
```
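Conceptually, a staleness check of this kind is a diff between two `doc_id -> content_hash` maps. The sketch below reproduces the added/changed/removed split from the example above with the standard library only; `content_hash` and `diff_corpus` are hypothetical helpers, and the real `check_invalidation` also compares the model, which this sketch leaves out.

```python
import hashlib
from typing import Dict, List, Tuple


def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def diff_corpus(
    recorded: Dict[str, str], current: Dict[str, str]
) -> Tuple[List[str], List[str], List[str]]:
    """Compare doc_id -> hash maps; return (added, changed, removed) ids, sorted."""
    added = sorted(current.keys() - recorded.keys())
    removed = sorted(recorded.keys() - current.keys())
    changed = sorted(
        k for k in current.keys() & recorded.keys() if current[k] != recorded[k]
    )
    return added, changed, removed


recorded = {
    "kb-1": content_hash("How to reset your password."),
    "kb-2": content_hash("Refund policy for digital goods."),
}
current_hashes = {
    "kb-1": content_hash("How to reset your password (updated)."),
    "kb-2": content_hash("Refund policy for digital goods."),
    "kb-3": content_hash("Two-factor authentication setup."),
}

added, changed, removed = diff_corpus(recorded, current_hashes)
print(added, changed, removed)  # ['kb-3'] ['kb-1'] []
```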
## EmbeddingManifest
The manifest is the versioning artifact. Persist it to compare future builds
against it.
| Field | Description | Why it matters |
|-------|-------------|----------------|
| `manifest_id` | Unique id for this build | Names the corpus version a retrieval ran against. |
| `index_name` | Name of the index | Groups builds of the same index over time. |
| `model` / `model_version` | Model that produced the embeddings | A model change invalidates the index too. |
| `source_commit` | Source commit the documents came from | Ties the corpus to a versioned source. |
| `document_count` | Number of documents embedded | Quick sanity check on coverage. |
| `document_hashes` | `doc_id -> content_hash` at embed time | What `check_invalidation` compares against. |
| `status` | Current `ManifestStatus` value | `current` vs `stale_*`. |
| `manifest_hash` | Deterministic SHA-256 over the manifest content | Tamper-evident fingerprint of the whole build. |
```python
print(manifest.manifest_hash) # integrity hash
serialized = manifest.to_json()
```
## Instrument Retrieval
`InstrumentedRetriever` captures version provenance on each retrieved document.
It is a reference implementation: the base `retrieve()` returns placeholder
results (and emits a `RuntimeWarning`) so the provenance shape is clear. Override
`retrieve()` with a real vector-store query — returning your own
`RetrievalResult` objects — to silence the warning and use it for real.
```python
from briefcase.rag import InstrumentedRetriever
class KbRetriever(InstrumentedRetriever):
    def retrieve(self, query, top_k=5, similarity_threshold=0.7):
        # Query your real vector store here, then wrap hits in RetrievalResult.
        return super().retrieve(query, top_k, similarity_threshold)
retriever = KbRetriever(
    vector_store=None, # your vector store client
    lakefs_client=None, # resolves document_version (commit SHA)
    repository="support_kb",
)
results = retriever.retrieve("how do I reset my password?", top_k=3)
for r in results:
    print(r.rank, r.document_id, r.score, r.document_version)
```
Each `RetrievalResult` carries `document_id`, `content`, `score`, `rank`,
`document_version` (the commit SHA the document was read at), and `metadata`.
## Where this fits
- Related: External Data Tracking — The same versioning idea for non-RAG upstream values. (/advanced/external-data/)
- Related: Validation Engine — Validate the content a versioned retrieval returns before a decision. (/advanced/validation-engine/)
- Guide: Reproducible RAG — Wire manifests and snapshots into a reproducible pipeline. (/guides/reproducible-rag/)
========================================================================
# Replay & Verify
========================================================================
## Deterministic Replay
Source: https://briefcaseai.io/features/replay/
> Re-run a saved decision to prove it still produces the same output — and catch the run that doesn't.
Re-execute a saved decision against a `ReplayEngine` and compare the new output to the one you recorded — so a model swap, a prompt edit, or a dependency bump can't silently change what your system decides.
> When you'd reach for this
You ship a new version of the `classify_ticket` model and want to know, before it reaches users, whether it still routes the same tickets the same way. Replay the decisions you already recorded against the new build: if `outputs_match` flips to `False` on tickets that used to be stable, you have caught a regression instead of a customer noticing it. It is also how you demonstrate, after the fact, that a recorded decision is reproducible.
## How it works
1. **Persist** a `DecisionSnapshot` to a storage backend (this is the original you'll compare against).
2. **Replay** it through `ReplayEngine`, which re-executes the decision and compares the new output to the recorded one.
3. **Interpret** the `ReplayResult` — `outputs_match`, `status`, and `policy_violations` tell you whether the decision held and what to do next.
```mermaid
flowchart LR
A[Recorded decision<br/>in storage] --> B[ReplayEngine.replay]
B --> C{outputs_match?}
C -->|True| D[Reproducible — ship]
C -->|False| E[Regression — investigate]
```
> Diagram description
A recorded decision in storage flows into `ReplayEngine.replay`. The engine checks whether the replayed output matches the original. If it matches, the decision is reproducible and safe to ship; if it does not match, you have a regression to investigate.
## Install
```bash
pip install briefcase-ai[replay]
```
```python
from briefcase.replay import ReplayEngine, ReplayPolicy, ReplayStats
```
## Persist, then replay
```python
import briefcase
from briefcase import DecisionSnapshot, Input, Output
from briefcase.storage import SqliteBackend
from briefcase.replay import ReplayEngine
briefcase.init()
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "Reset my password", "string"))
decision.add_output(Output("category", "account_access", "string"))
decision.with_execution_time(12.5)
backend = SqliteBackend.in_memory()
decision_id = backend.save_decision(decision)
engine = ReplayEngine(backend)
result = engine.replay(decision_id, "strict")
print(result.status)
print(result.outputs_match)
print(result.replay_output)
print(result.execution_time_ms)
print(result.policy_violations)
```
`ReplayEngine(backend)` takes a storage backend. `replay(decision_id, mode)` takes the mode explicitly.
## Strict vs. tolerant
The mode decides how exactly the replayed output must match the original. Pick it from how deterministic the decision is supposed to be.
| Mode | Matches when | Reach for it when |
|------|--------------|-------------------|
| `"strict"` | The replayed output is identical to the original | The decision is meant to be deterministic — a fixed classifier, `temperature=0`, a routing rule. Any difference is a regression. |
| `"tolerant"` (default) | Minor differences are allowed | The output is free-form or sampled (a generated reply, a summary) where wording can vary but meaning should not. |
`"tolerant"` is the engine default. For per-field control — exact match on `category`, a similarity threshold on a free-text `summary` — use a `ReplayPolicy` instead of choosing one mode for the whole decision.
## Interpreting a ReplayResult
`ReplayResult` is what you act on. The table maps each field to the decision it should drive.
| Field | What it tells you | What to do |
|-------|-------------------|------------|
| `status` | `"success"` or a failure status for the replay | A non-success status means the replay itself couldn't complete (decision missing, load error) — fix the harness before trusting the result. |
| `outputs_match` | `True` when the replayed output matches the original | `False` on a decision that used to be stable is a regression — investigate the change you just made. |
| `replay_output` | The output produced during this replay | Diff it against the original to see exactly what drifted. |
| `policy_violations` | List of policy rules the replay violated | Non-empty means a specific field broke its match rule — read it to know which field and why. |
| `execution_time_ms` | Replay execution time in milliseconds | A large swing can flag a performance regression even when the output still matches. |
| `original_snapshot` | The recorded decision being replayed | The baseline for the comparison and your audit reference. |
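The table reads naturally as a triage order: check that the replay ran, then look for rule violations, then compare outputs. A minimal sketch of that order follows, using a hypothetical `ResultView` stand-in that carries only the documented field names. The action labels and the ordering are my interpretation, not library behaviour.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ResultView:  # hypothetical stand-in with the documented field names
    status: str
    outputs_match: bool
    policy_violations: List[str] = field(default_factory=list)


def triage(result: ResultView) -> str:
    """Map a replay outcome to the next action, in the order the table implies."""
    if result.status != "success":
        return "fix-harness"             # the replay itself did not complete
    if result.policy_violations:
        return "inspect-violations"      # a specific field broke its rule
    if not result.outputs_match:
        return "investigate-regression"  # output drifted from the record
    return "ok"


print(triage(ResultView("success", True)))               # ok
print(triage(ResultView("success", False)))              # investigate-regression
print(triage(ResultView("success", False, ["category"])))  # inspect-violations
print(triage(ResultView("load_error", False)))           # fix-harness
```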
## Replay with a policy
A `ReplayPolicy` declares how each output field must match. Combine exact-match fields with similarity-threshold fields when one decision has both a structured label and free text.
```python
from briefcase.replay import ReplayPolicy
policy = ReplayPolicy("output-consistency")
policy.with_exact_match("category")
policy.with_similarity_threshold("summary", 0.95)
result = engine.replay_with_policy(decision_id, policy, "strict")
print(result.status)
print(result.policy_violations)
```
Here `category` must match exactly (a misroute is unacceptable) while `summary` only has to stay 95% similar (wording may vary).
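To make the two kinds of rule concrete, here is a stdlib-only sketch of per-field checking. It uses `difflib.SequenceMatcher` as the similarity measure, which is an assumption for illustration; the metric `ReplayPolicy` actually uses is not documented here, and `check_fields` is a hypothetical helper.

```python
from difflib import SequenceMatcher
from typing import Dict, List


def check_fields(
    original: Dict[str, str],
    replayed: Dict[str, str],
    exact: List[str],
    thresholds: Dict[str, float],
) -> List[str]:
    """Return one human-readable violation per field that breaks its rule."""
    violations = []
    for name in exact:
        if original.get(name) != replayed.get(name):
            violations.append(f"{name}: expected exact match")
    for name, minimum in thresholds.items():
        ratio = SequenceMatcher(
            None, original.get(name, ""), replayed.get(name, "")
        ).ratio()
        if ratio < minimum:
            violations.append(f"{name}: similarity {ratio:.2f} below {minimum}")
    return violations


original = {"category": "account_access", "summary": "User wants a password reset."}
same = dict(original)
misrouted = {"category": "billing", "summary": ""}

print(check_fields(original, same, ["category"], {"summary": 0.95}))       # []
print(len(check_fields(original, misrouted, ["category"], {"summary": 0.95})))  # 2
```

Character-level similarity is a crude proxy for meaning; for free text, an embedding-based comparison may be a better fit.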
## Replay in batches
Verify a whole regression set at once instead of one decision at a time.
```python
results = engine.replay_batch([decision_id], "strict", 4)
for result in results:
    print(result.status, result.outputs_match)
```
`replay_batch(decision_ids, mode, max_concurrent)` replays many decisions concurrently, bounded by `max_concurrent`.
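Bounded concurrency means no more than `max_concurrent` replays are in flight at once. The sketch below shows the same idea with a standard-library thread pool; it illustrates the pattern, not how the engine schedules work internally, and `run_bounded` and `replay_one` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

R = TypeVar("R")


def run_bounded(
    ids: Sequence[str], replay_one: Callable[[str], R], max_concurrent: int
) -> List[R]:
    """Run `replay_one` for every id with at most `max_concurrent` in flight."""
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        # map() preserves input order, so results line up with ids.
        return list(pool.map(replay_one, ids))


outcomes = run_bounded(["d1", "d2", "d3"], lambda i: f"{i}:replayed", max_concurrent=2)
print(outcomes)  # ['d1:replayed', 'd2:replayed', 'd3:replayed']
```

A lower bound protects a rate-limited model provider; a higher one shortens the wall-clock time of a large regression set.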
## Aggregate replay statistics
```python
from briefcase.replay import ReplayStats
stats = engine.get_replay_stats([decision_id])
print(stats.total_replays)
print(stats.successful_replays)
print(stats.exact_matches)
print(stats.success_rate)
print(stats.average_execution_time_ms)
```
`success_rate` across your regression set is the one number to watch release over release: a drop means more decisions changed than you expected.
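One way to act on that number is a simple release gate. The sketch below assumes you have already read `success_rate` for the previous and the current release; `gate_release` and its 0.02 tolerance are hypothetical, chosen only for illustration.

```python
def gate_release(previous_rate, current_rate, max_drop=0.02):
    """Return True when the success rate has not dropped by more than max_drop.

    Hypothetical helper; the default tolerance is an arbitrary example value.
    """
    return (previous_rate - current_rate) <= max_drop


# Example values, as might be read from stats.success_rate on two releases.
ok = gate_release(previous_rate=0.98, current_rate=0.97)
blocked = gate_release(previous_rate=0.98, current_rate=0.90)
print(ok, blocked)
```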
## Key classes
| Class | Why it matters |
|-------|----------------|
| `ReplayEngine` | Loads a persisted decision from a backend and re-executes it — the entry point for every replay. |
| `ReplayResult` | Outcome of a single replay; the fields you act on to catch a regression. |
| `ReplayPolicy` | Per-field match rules for `replay_with_policy` so structured and free-text fields can be judged differently. |
| `ReplayStats` | Aggregate counts and rates across many replays — your release-over-release health signal. |
## Where this fits
Replay is the start of the **Replay & Verify** act: re-run a stored decision, then measure how far it moved over time and prove the record is intact.
- Storage Adapters — Previous: persist the decisions that replay re-executes. (/features/storage-adapters/)
- Drift Detection — Next: measure how consistent outputs stay across many runs. (/features/drift-detection/)
- Audit Bundles — Then: bundle a decision into a tamper-evident, verifiable record. (/advanced/compliance-bundles/)
- Decision Recording — Build the DecisionSnapshot objects replay compares against. (/features/decision-recording/)
## Drift Detection
Source: https://briefcaseai.io/features/drift-detection/
> Catch a model whose answers are quietly getting less consistent over time — before it changes what your system decides.
Measure how consistent a model's outputs are across repeated runs, so you can tell the difference between normal variation and a model that is quietly drifting.
> When you'd reach for this
Two weeks ago your `classify_ticket` agent labeled the same five "reset my password" tickets `account_access` every time. Today, on the same tickets, it returns `account_access` three times and `billing` twice. Nothing in your code changed — but a provider-side model update, a prompt tweak, or a creeping context shift made the agent less consistent. Drift detection turns "it feels flakier lately" into a number you can alert on.
## How it works
A `DriftCalculator` takes a list of outputs sampled from the same prompt and returns `DriftMetrics`: a consistency score, an agreement rate, the consensus output, and the indices of any outliers. You feed it the outputs; it tells you how much they disagree.
```mermaid
flowchart LR
A[Same prompt, many runs] --> B[Collect outputs]
B --> C[DriftCalculator.calculate_drift]
C --> D{consistency_score below threshold?}
D -->|No| E[Stable — keep watching]
D -->|Yes| F[Drifting — emit an event]
```
> Diagram description
The same prompt is run many times and the outputs are collected into a list. `DriftCalculator.calculate_drift` measures them and produces a consistency score (1.0 = identical outputs, 0.0 = all different). If the score is at or above your threshold, the outputs are stable and you keep watching; if it falls below the threshold, the model is drifting and you emit an event so something downstream can react.
## Install
```bash
pip install briefcase-ai[drift]
```
```python
from briefcase.drift import DriftCalculator, DriftMetrics
```
## Calculate drift
```python
from briefcase.drift import DriftCalculator
calculator = DriftCalculator()
outputs = ["account_access", "account_access", "billing", "account_access", "account_access"]
metrics = calculator.calculate_drift(outputs)
print(metrics.consistency_score)
print(metrics.agreement_rate)
print(metrics.drift_score)
print(metrics.consensus_output)
print(metrics.outliers)
print(metrics.get_status(calculator))
```
`calculate_drift(outputs)` accepts a list of outputs sampled from the same prompt and returns `DriftMetrics`. `get_status(calculator)` classifies the result (for example `"stable"` or `"drifting"`) using the calculator's threshold.
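For intuition about what the consensus, agreement rate, and outlier indices mean, here is a stand-alone sketch under exact matching. The definitions are illustrative assumptions; `summarize_outputs` is hypothetical and Briefcase's own scoring may differ.

```python
from collections import Counter


def summarize_outputs(outputs):
    """Consensus, agreement rate, and outlier indices under exact matching.

    Illustrative definitions only; Briefcase's own scoring may differ.
    """
    consensus, count = Counter(outputs).most_common(1)[0]
    agreement_rate = count / len(outputs)
    outliers = [i for i, out in enumerate(outputs) if out != consensus]
    return consensus, agreement_rate, outliers


outputs = ["account_access", "account_access", "billing", "account_access", "account_access"]
consensus, agreement_rate, outliers = summarize_outputs(outputs)
print(consensus, agreement_rate, outliers)  # account_access 0.8 [2]
```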
## A monitoring workflow
In production you don't measure once — you measure repeatedly and react when consistency slips. The recorded decisions you already store are the source of the outputs.
1. **Record multiple runs** of the same decision through [Decision Recording](/features/decision-recording/) over a sampling window (a day, a week).
2. **Extract the outputs** for the prompt you're watching into a plain list.
3. **Measure** them with `calculate_drift` and read `consistency_score` / `get_status`.
4. **If it crosses your threshold, emit an event** so something downstream — an on-call alert, a routing change — can respond. See [Multi-Agent & Events](/features/multi-agent/).
```python
import asyncio
from briefcase.drift import DriftCalculator
from briefcase.events import emit_drift_detected
calculator = DriftCalculator()
async def monitor(decision, outputs):
    metrics = calculator.calculate_drift(outputs)
    status = metrics.get_status(calculator)
    if status != "stable":
        # emit_drift_detected is a coroutine; await it in an async context
        await emit_drift_detected(decision, {"drift_score": metrics.drift_score})
    return status
# `decision` is the recorded classify_ticket decision; `outputs` are this window's labels
outputs = ["account_access", "account_access", "billing", "account_access", "account_access"]
asyncio.run(monitor({"id": "dec-1"}, outputs))
```
## Interpreting DriftMetrics
These scores are only useful if you know what action each implies.
| Field | What it tells you | What to do |
|-------|-------------------|------------|
| `consistency_score` | Overall consistency of the sampled outputs | Trend it window over window. A steady decline is the early warning, even before any single window looks bad. |
| `agreement_rate` | Fraction of outputs that match the consensus | A falling agreement rate means more runs are disagreeing — tighten the sampling and look at the outliers. |
| `drift_score` | How far the outputs diverge from one another | The value to put in an alert threshold and pass to `emit_drift_detected`. |
| `consensus_output` | The most common output across samples | What the model "usually" decides — your baseline for what changed. |
| `outliers` | Indices of outputs that disagree with the consensus | Index back into your list to read the exact runs that broke ranks. |
Call `metrics.get_status(calculator)` to turn the scores into a status label like `"stable"` or `"drifting"` using the calculator's threshold.
## Tune the similarity threshold
A stricter threshold makes near-matches count as disagreement, so small wording differences register as drift.
```python
from briefcase.drift import DriftCalculator
calculator = DriftCalculator()
calculator.with_similarity_threshold(0.95)
metrics = calculator.calculate_drift(["approve", "approve", "aprove"])
print(metrics.agreement_rate)
print(metrics.get_status(calculator))
```
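To see why the threshold matters for a near-match like `"aprove"`, the sketch below scores the pair with `difflib.SequenceMatcher`. That measure is an assumption for illustration; Briefcase may compute similarity differently.

```python
from difflib import SequenceMatcher

# Assumed similarity measure; Briefcase may use a different one.
ratio = SequenceMatcher(None, "approve", "aprove").ratio()
print(round(ratio, 3))

# The same pair is a match under a looser threshold and a disagreement under a stricter one.
counts_as_match = {threshold: ratio >= threshold for threshold in (0.90, 0.95)}
print(counts_as_match)
```

Under this measure the typo lands between the two thresholds, so raising the threshold from 0.90 to 0.95 is exactly what turns it from agreement into drift.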
## Compare outputs over time
Run the same calculator across successive sampling windows to watch consistency change — this is the two-week-drift scenario made concrete.
```python
from briefcase.drift import DriftCalculator
calculator = DriftCalculator()
windows = [
    ("week 1", ["account_access", "account_access", "billing", "account_access", "account_access"]),
    ("week 2", ["account_access", "billing", "billing", "account_access", "billing"]),
    ("week 3", ["billing", "billing", "account_access", "billing", "billing"]),
]
for label, outputs in windows:
    metrics = calculator.calculate_drift(outputs)
    print(label, metrics.consistency_score, metrics.get_status(calculator))
```
A consistency score that falls across the windows is exactly the signal to alert on.
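It can also help to track the consensus itself, not only the score. The stand-alone sketch below uses a plain agreement rate (an assumption; `consistency_score` may be computed differently) on the same three windows. Under that definition week 3 is as internally consistent as week 1, yet the consensus label has changed, which a score-only alert would miss.

```python
from collections import Counter

windows = [
    ("week 1", ["account_access", "account_access", "billing", "account_access", "account_access"]),
    ("week 2", ["account_access", "billing", "billing", "account_access", "billing"]),
    ("week 3", ["billing", "billing", "account_access", "billing", "billing"]),
]

history = []
for label, outputs in windows:
    consensus, count = Counter(outputs).most_common(1)[0]
    history.append((label, consensus, count / len(outputs)))

# Flag any window whose consensus differs from the previous window's.
flips = [
    current[0]
    for previous, current in zip(history, history[1:])
    if previous[1] != current[1]
]
print(history)
print("consensus flipped in:", flips)
```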
## Key classes
| Class | Why it matters |
|-------|----------------|
| `DriftCalculator` | Computes drift over a list of outputs; `with_similarity_threshold(threshold)` tunes how strict matching is. |
| `DriftMetrics` | Consistency score, agreement rate, drift score, consensus output, and outlier indices — the numbers you alert and act on. |
## Where this fits
Drift detection sits in the **Replay & Verify** act: replay proves a single decision is reproducible, drift detection proves the model stays consistent across many — and when it doesn't, an event hands control back to your governance layer.
- Deterministic Replay — Previous: re-run one stored decision and prove it still matches. (/features/replay/)
- Decision Recording — Records the runs whose outputs you sample for drift. (/features/decision-recording/)
- Multi-Agent & Events — Emit a drift event so something downstream can react. (/features/multi-agent/)
- Guardrails — Turn a drift signal into a control that gates the next action. (/advanced/guardrails/)
## Audit Bundles
Source: https://briefcaseai.io/advanced/compliance-bundles/
> Seal a decision, its evidence, and the policy behind it into one portable, tamper-evident artifact.
An `ExaminerBundle` packages a routing decision, the bitemporal evidence that informed it, and the policy version that was in effect — sealed with a content hash so anyone can verify, independently, that nothing was altered.
> When you'd reach for this
Months after your support-triage agent escalated a ticket, the outcome is challenged and you need to prove what happened. Pointing at a live database isn't enough — it has changed since, and no one can confirm it wasn't edited. An audit bundle freezes the decision, the evidence behind it, and the policy that was active into one artifact whose contents are verified by hash, so whoever reviews it doesn't have to take your word for it.
## How it works
1. **Gather the material.** A routing `decision`, the bitemporal `evidence` store it cited, and the `PolicyRegistry` it routed against.
2. **Build the bundle.** `ExaminerBundle.build` seals the decision, the referenced evidence, and the policy as-of the decision into a content-hashed artifact.
3. **Verify integrity.** `verify()` recomputes the hash and raises `BundleIntegrityError` if a byte changed.
4. **Transport and re-verify.** `to_json` / `from_json` move it anywhere; the recipient re-runs `verify()` to trust it independently.
```mermaid
flowchart LR
A["AgentRouter.route
AgentRoutingDecision"] --> D["ExaminerBundle.build"]
B["InMemoryBitemporalStore
evidence_refs"] --> D
C["PolicyRegistry
policy as-of decision"] --> D
D --> E["content_hash
(SHA-256)"]
E --> F["to_json / from_json"]
F --> G["verify()
raises on tampering"]
```
> Diagram description
`ExaminerBundle.build` combines three inputs — an `AgentRouter` decision, the
referenced evidence from the bitemporal store, and the policy version as-of the
decision. It seals them with a SHA-256 `content_hash`. The bundle serializes via
`to_json` / `from_json`, and `verify()` recomputes the hash and raises on any
tampering.
## Install
```bash
pip install briefcase-ai[compliance]
```
```python
from briefcase.compliance import ExaminerBundle, BundleIntegrityError
```
## Build a Bundle End to End
This ties together an `AgentRouter` decision, an `InMemoryBitemporalStore` of evidence, and a `PolicyRegistry`, then seals and verifies the result.
```python
from datetime import datetime, timezone
from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore
from briefcase.routing.policy import (
    PolicyRegistry,
    PolicyVersion,
    PolicyRule,
    AgentRouter,
)
from briefcase.compliance import ExaminerBundle, BundleIntegrityError
# 1. Evidence: a bitemporal store of the facts that inform routing.
evidence = InMemoryBitemporalStore()
tier_record = BitemporalRecord.new(
    key="ticket:tier",
    valid_time=datetime(2026, 5, 1, tzinfo=timezone.utc),
    value="gold",
    source="crm",
)
evidence.append(tier_record)
# 2. Policy: a versioned routing policy in a bitemporal-backed registry.
registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="support_triage",
    version="2026.05.01",
    rules=[
        PolicyRule(
            rule_id="gold-to-specialist",
            condition={"tier": "gold"},
            choice="specialist_queue",
            rationale="gold-tier tickets route to specialist agents",
        ),
    ],
    default_choice="general_queue",
)
registry.publish(policy, valid_from=datetime(2026, 5, 1, tzinfo=timezone.utc))
# 3. Decision: route a request, citing the evidence that informed it.
router = AgentRouter(
    registry,
    use_case="support_triage",
    policy_id="support_triage",
)
decision = router.route(
    {"tier": "gold"},
    evidence_refs=[tier_record.record_id],
)
print(decision.selected) # specialist_queue
# 4. Bundle: seal decision + evidence + policy with a SHA-256 content hash.
bundle = ExaminerBundle.build(decision, evidence, registry)
bundle.verify() # passes — internally consistent
print(bundle.content_hash[:20]) # sha256:...
```
`build` looks up the policy as-of the decision (`decision.decided_at` by default, overridable with `as_of_transaction_time=`) and pulls exactly the evidence records named in `decision.evidence_refs`. If a referenced record is missing from the store, `build` raises `BundleIntegrityError`. Evidence is sorted deterministically so the hash is stable.
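The role of deterministic ordering can be shown with a stand-alone sketch. It hashes a canonical JSON form with SHA-256; the payload layout, the `content_hash` helper, and the plain-dict records are hypothetical and are not `ExaminerBundle`'s actual encoding.

```python
import hashlib
import json


def content_hash(decision, evidence, policy):
    """SHA-256 over a canonical JSON form (hypothetical; not ExaminerBundle's encoding)."""
    payload = {
        "decision": decision,
        # Sort evidence by record id so insertion order cannot change the hash.
        "evidence": sorted(evidence, key=lambda record: record["record_id"]),
        "policy": policy,
    }
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


decision = {"selected": "specialist_queue", "evidence_refs": ["rec-1", "rec-2"]}
policy = {"policy_id": "support_triage", "version": "2026.05.01"}
rec_1 = {"record_id": "rec-1", "key": "ticket:tier", "value": "gold"}
rec_2 = {"record_id": "rec-2", "key": "ticket:region", "value": "eu"}

hash_a = content_hash(decision, [rec_1, rec_2], policy)
hash_b = content_hash(decision, [rec_2, rec_1], policy)  # same records, different order
tampered = content_hash(decision, [dict(rec_1, value="platinum"), rec_2], policy)

print(hash_a == hash_b)    # True: ordering does not affect the hash
print(hash_a == tampered)  # False: a changed value does
```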
## Transport and Verify
Serialize to JSON, send it anywhere, re-import, and re-check the hash. This is what makes a bundle tamper-evident in transit: the recipient re-verifies on their own machine, and any byte changed since the build breaks the content hash, so silent edits cannot pass.
```python
payload = bundle.to_json(indent=2)
restored = ExaminerBundle.from_json(payload)
restored.verify() # ok — hash recomputed from contents matches
```
## Detect Tampering
The hash covers the decision, policy, evidence, and the as-of timestamp. Change any of them and `verify()` raises.
```python
restored.evidence[0]["value"] = "platinum"
try:
    restored.verify()
except BundleIntegrityError as exc:
    print("tamper detected:", exc)
```
> The guarantee is scoped
The hash proves the bundle is internally consistent. Proving it reflects what
production actually did requires storing it alongside an independently signed
commit or write-once record — that step is outside this module.
## Key Classes
| Symbol | Why it matters |
|--------|----------------|
| `ExaminerBundle.build(decision, evidence_store, policy_registry, *, as_of_transaction_time=None, metadata=None)` | Seals the decision, its evidence, and the as-of policy into one artifact. |
| `ExaminerBundle.verify()` | Recomputes the hash; raises `BundleIntegrityError` on any change. |
| `ExaminerBundle.to_json()` / `from_json()` | Move the bundle between systems without losing the integrity check. |
| `ExaminerBundle.content_hash` | SHA-256 digest — the tamper-evidence anchor. |
| `BundleIntegrityError` | Raised by `build` (missing evidence) and `verify` (hash mismatch). |
Bundles are built from `AgentRoutingDecision` records (see [Versioned Routing Policy](/advanced/versioned-routing-policy/)) and `BitemporalRecord` evidence (see [Bitemporal Storage](/advanced/bitemporal-storage/)).
## Where this fits
- Related: Bitemporal Storage — The append-only store the bundle reads its as-of-then evidence from. (/advanced/bitemporal-storage/)
- Related: Versioned Routing Policy — How the policy version sealed into the bundle is published and resolved. (/advanced/versioned-routing-policy/)
- Guide: Audit a Decision — Walk a single decision from record to verified bundle. (/guides/audit-a-decision/)
========================================================================
# Operate
========================================================================
## Cost Tracking
Source: https://briefcaseai.io/features/cost-tracking/
> Estimate token costs across platforms and tiers, bill prompt caching, compare models, project monthly spend, and watch budgets — from the data your decisions already carry.
Estimate what a decision costs, price it for the platform and tier you actually
run on, account for prompt caching, compare models, and check spend against a
budget — all from the token counts your decisions already carry. The cost types
ship in the **base package** (no extra).
> When you'd reach for this
Your support-triage agent runs thousands of `classify_ticket` calls a day. Before
you ship a model swap — or move the workload to a batch tier on another platform —
you want the real number, and an alert before the monthly bill blows past budget.
## Install
```bash
pip install briefcase-ai
```
`briefcase.cost` is in the base package — no extra to install.
## Estimate a cost
`CostCalculator.estimate_cost()` takes a model name and token counts and returns a
`CostEstimate` with separate input and output costs plus a total.
```python
from briefcase.cost import CostCalculator
calc = CostCalculator()
estimate = calc.estimate_cost("claude-haiku-4-5", input_tokens=1000, output_tokens=500)
print(estimate.total_cost) # 0.0035
print(estimate.input_cost) # 0.001
print(estimate.output_cost) # 0.0025
print(estimate.currency) # "USD"
```
## Price any platform: rate cards
By default, estimates use first-party standard list price. A **rate card** prices
the same call for the platform, tier, and modifiers you actually run on. It is a
forgiving `platform × tier × modifiers` string — pass it as the keyword-only
`rate_card`.
```python
calc = CostCalculator()
# Same workload, two ways to buy it
standard = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000)
batch = calc.estimate_cost("claude-opus-4-8", 500_000, 50_000, rate_card="bedrock:batch")
print(standard.total_cost) # 3.75
print(batch.total_cost) # 1.875 — batch tier on AWS Bedrock, ~0.5x
# List representative cards
print(calc.get_available_rate_cards())
# ['standard', 'batch', 'cached', 'priority', 'flex', 'first_party:fast',
# 'bedrock:standard', 'bedrock:batch', 'vertex:standard', 'azure:standard', ...]
```
| Part | Values | Effect |
| --- | --- | --- |
| **Platform** | `first_party` · `bedrock` · `vertex` · `azure` | Selects the provider's price sheet |
| **Tier** | `standard` · `batch` · `cached` · `priority` · `flex` | `batch` / `flex` ≈ 0.5×; `priority` is a premium |
| **Modifiers** | `regional` · `us` · `fast` | `regional` / `us` add ~10%; `fast` is a premium base rate |
Cards are order-independent and separator-tolerant, so `"bedrock:batch"`,
`"batch + bedrock"`, and `"vertex / standard, us"` all parse. Omitting `rate_card`
(or passing `"standard"`) keeps the previous first-party standard pricing.
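A tolerant parser of this kind can be sketched in a few lines. `parse_rate_card` below is hypothetical and only illustrates the order-independent, separator-tolerant idea; the part names come from the table above, while the defaults and error handling are assumptions rather than Briefcase's actual rules.

```python
import re

PLATFORMS = {"first_party", "bedrock", "vertex", "azure"}
TIERS = {"standard", "batch", "cached", "priority", "flex"}
MODIFIERS = {"regional", "us", "fast"}


def parse_rate_card(card):
    """Split a rate-card string into (platform, tier, modifiers).

    Hypothetical parser for illustration; Briefcase's own parsing may differ.
    """
    platform, tier, modifiers = "first_party", "standard", set()
    # Treat any run of characters other than letters and underscores as a separator.
    for token in filter(None, re.split(r"[^a-z_]+", card.lower())):
        if token in PLATFORMS:
            platform = token
        elif token in TIERS:
            tier = token
        elif token in MODIFIERS:
            modifiers.add(token)
        else:
            raise ValueError(f"unknown rate-card part: {token!r}")
    return platform, tier, frozenset(modifiers)


print(parse_rate_card("bedrock:batch"))
print(parse_rate_card("batch + bedrock"))
print(parse_rate_card("vertex / standard, us"))
```

Classifying each token by which set it belongs to is what makes the order irrelevant: the parser never relies on position.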
> Long-context pricing
Above ~200K input tokens, tiered long-context rates apply, so a large call's total
is not a flat multiple of a small one. Always estimate with representative token
counts.
## Prompt-cache billing
Prompt caching changes the math: cache reads are billed at a fraction of the input
rate. Pass cache-token counts (all keyword-only) and read the `cache_cost` on the
estimate.
```python
estimate = calc.estimate_cost(
    "claude-opus-4-8",
    input_tokens=0,
    output_tokens=1_000,
    cache_read_tokens=100_000,  # also: cache_write_5m_tokens, cache_write_1h_tokens
)
print(estimate.cache_cost) # 0.05 — 100K cache reads at 0.1x of the input rate
print(estimate.total_cost) # 0.075 — output + cache
```
**Why it matters:** a cache-heavy agent's bill is dominated by cache reads at 0.1×
input. Counting those tokens at full input price overstates the cost.
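The arithmetic behind those figures can be worked by hand. The per-token rates below are not taken from a price sheet; they are back-derived from the example's own numbers and assumed here for illustration only.

```python
# Rates implied by the example figures above, assumed for illustration only:
# $5 per million input tokens, $25 per million output tokens, cache reads at 0.1x input.
INPUT_PER_MTOK = 5.00
OUTPUT_PER_MTOK = 25.00
CACHE_READ_MULTIPLIER = 0.1

cache_read_tokens = 100_000
output_tokens = 1_000

cache_cost = cache_read_tokens / 1_000_000 * INPUT_PER_MTOK * CACHE_READ_MULTIPLIER
output_cost = output_tokens / 1_000_000 * OUTPUT_PER_MTOK
cache_aware_total = cache_cost + output_cost

# The same call if cache reads were wrongly billed as ordinary input tokens.
naive_total = cache_read_tokens / 1_000_000 * INPUT_PER_MTOK + output_cost

print(round(cache_cost, 4), round(cache_aware_total, 4), round(naive_total, 4))
```

With these assumed rates, billing the cache reads at full input price would report 0.525 instead of 0.075, a sevenfold overstatement for this call.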
## Compare models
`compare_models()` estimates the same workload across two models so you can see the
difference before switching.
```python
comparison = calc.compare_models(
    "claude-haiku-4-5", "gpt-5.4-mini", input_tokens=1000, output_tokens=500
)
print(comparison["cheaper_model"]) # "gpt-5.4-mini"
print(comparison["savings"]) # 0.0005 — absolute, in USD
print(comparison["percent_difference"]) # 14.29
```
`compare_models()` also accepts a `rate_card` so you can compare like-for-like
across tiers or platforms.
## Project monthly spend
`project_monthly_cost()` extrapolates a daily workload to a monthly estimate.
```python
monthly = calc.project_monthly_cost(
    "claude-haiku-4-5",
    daily_input_tokens=100_000,
    daily_output_tokens=50_000,
    days_per_month=30,
)
print(monthly) # 10.5 — a float, the projected monthly total in USD
```
## Check a budget
`check_budget()` compares current spend to a budget and returns a `BudgetStatus`
with an alert level you can act on.
```python
status = calc.check_budget(current_spend=85.0, budget_limit=100.0)
print(status.status) # "warning"
print(status.percent_used) # 85.0
print(status.remaining_budget) # 15.0
print(status.alert_message)
```
> Turn a budget breach into an action
Pair a `"critical"` or `"exceeded"` status with [`emit`](/features/multi-agent/) to
fire an event, or with a [guardrail](/advanced/guardrails/) that denies further
calls until spend resets.
## Supported models
The default pricing table covers the current frontier — Anthropic Claude 4.x,
OpenAI GPT-5.x, and Google Gemini 2.5–3.x — alongside every previously priced
model. See the [Changelog](/resources/changelog/) for the full list added in 3.2.1.
## How cost tracking fits
```mermaid
flowchart LR
A["Decision record"] --> B["token counts"]
B --> C["CostCalculator"]
R["rate_card
(platform × tier)"] --> C
C --> D["CostEstimate
(+ cache_cost)"]
C --> E["BudgetStatus"]
```
> Diagram description
A decision record provides token counts, which feed the CostCalculator together
with an optional rate card (platform and tier). The calculator produces a
CostEstimate — per-call input, output, and cache costs — and a BudgetStatus
comparing spend against a budget.
## Key classes
| Class / method | Returns | Purpose |
| --- | --- | --- |
| `CostCalculator.estimate_cost(model, in, out, *, rate_card=None, cache_read_tokens=None, …)` | `CostEstimate` | Per-call cost, optionally for a platform/tier and with cache tokens |
| `CostCalculator.estimate_cost_from_text(model, text, est_out, *, rate_card=None)` | `CostEstimate` | Estimate from text instead of token counts |
| `CostCalculator.compare_models(a, b, in, out, *, rate_card=None)` | `dict` | Cost delta between two models (`cheaper_model`, `savings`, `percent_difference`) |
| `CostCalculator.project_monthly_cost(model, daily_in, daily_out, days, *, rate_card=None)` | `float` | Projected monthly total from daily volume |
| `CostCalculator.check_budget(spend, limit)` | `BudgetStatus` | Spend vs. budget with alert level |
| `CostCalculator.get_available_rate_cards()` | `list[str]` | Representative rate-card identifiers |
| `CostEstimate` | — | `input_cost`, `output_cost`, `cache_cost`, `total_cost`, `currency` |
| `BudgetStatus` | — | `status`, `percent_used`, `remaining_budget`, `alert_message` |
The `rate_card` and cache-token parameters are **keyword-only**; existing
positional calls behave exactly as before.
## Where this fits
Cost Tracking is part of the **Operate** act: once decisions are flowing, watch
what they cost.
- Drift Detection — Catch when model behavior shifts across repeated runs. (/features/drift-detection/)
- Multi-Agent & Events — Correlate decisions across a workflow and emit events. (/features/multi-agent/)
## OpenTelemetry
Source: https://briefcaseai.io/features/opentelemetry/
> Put governed decisions on the same trace timeline as the rest of your stack.
Trace your decisions as OpenTelemetry spans so they sit on the same timeline as every other service in your stack.
> When you'd reach for this
Your support-triage agent is one hop in a larger request: an API gateway, a retrieval service, then `classify_ticket`. When a ticket gets mis-routed you want to follow the whole request through your existing tracing backend and see exactly where the decision sat in that timeline. A Briefcase tracer puts the decision span inline with every other span, so you debug latency and routing in one view.
## Spans and decision records are complementary
These two layers answer different questions, and you usually want both.
| Layer | Answers | Lives where |
|-------|---------|-------------|
| OTel span (timeline) | *When* did it run, how long, in what order, alongside which other services | Your tracing backend |
| Briefcase decision record (governance context) | *Why* this output — inputs, outputs, confidence, timing, full reproducible context | Your [exporter](/features/exporters/) or [store](/features/storage-adapters/) |
The span is a lightweight timeline marker; the decision record is the deep governance context. Both flow out through Briefcase, so a span you find in your tracing UI can be matched to a decision record you can [replay](/features/replay/) and verify.
```mermaid
flowchart LR
A[classify_ticket] --> B[Briefcase]
B --> C[OTel span: timeline]
B --> D[Decision record: governance context]
C --> E[Tracing backend]
D --> F[Exporter / store]
```
> Diagram description
A decision flows into Briefcase, which emits two complementary outputs: an OpenTelemetry span that goes to your tracing backend, and a decision record that goes to your exporter or store. The span gives you the timeline; the record gives you the governance context behind the decision.
## Install
```bash
pip install briefcase-ai[otel]
```
The `otel` extra installs `opentelemetry-api` and `opentelemetry-sdk`.
## Without OTel vs with OTel
_Without OTel_
Decisions are still captured and exported — you can inspect them through an exporter. But there is no span on your distributed trace, so the decision is invisible to your tracing UI and you cannot see where it sat relative to upstream and downstream services.
```python
from briefcase import capture
@capture(decision_type="ticket_triage")
def classify_ticket(text):
    # call your model here
    return "account_access"
classify_ticket("reset my password")
```
_With OTel_
Open a span around the decision and it becomes part of the trace, correlated with the rest of your services. Spans flow through whatever OTel `TracerProvider` your application has configured.
```python
from briefcase import capture
from briefcase.otel import get_tracer
tracer = get_tracer("briefcase")
@capture(decision_type="ticket_triage")
def classify_ticket(text):
    with tracer.start_as_current_span("classify_ticket") as span:
        span.set_attribute("briefcase.decision_type", "ticket_triage")
        category = "account_access"  # call your model here
        span.set_attribute("briefcase.outcome", category)
        return category
classify_ticket("reset my password")
```
## How it works
1. **Get a tracer** — `get_tracer("briefcase")` returns a standard OpenTelemetry tracer.
2. **Open a span** around the decision and attach attributes that describe it.
3. **Correlate** — propagate trace context to downstream services so spans join one trace.
4. **Inspect** in your tracing backend, then match the decision span to the full decision record an exporter shipped.
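Step 3 is the least visible, so here is a stand-alone sketch of what "propagating trace context" carries between services: a W3C Trace Context `traceparent` header. In a real service the OpenTelemetry propagators build and read this header for you; the helper names below are hypothetical and exist only to show the format.

```python
import re
import secrets

# Version 00 layout: 00-<32 hex trace id>-<16 hex parent span id>-<2 hex flags>
TRACEPARENT = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def make_traceparent(trace_id, span_id, sampled=True):
    """Format a W3C Trace Context traceparent header value (version 00)."""
    return f"00-{trace_id}-{span_id}-{'01' if sampled else '00'}"


def parse_traceparent(header):
    """Return (trace_id, parent_span_id, sampled) or raise ValueError."""
    match = TRACEPARENT.match(header)
    if match is None:
        raise ValueError(f"malformed traceparent: {header!r}")
    trace_id, parent_id, flags = match.groups()
    return trace_id, parent_id, bool(int(flags, 16) & 0x01)


# Upstream service: attach the header to the outgoing request.
trace_id = secrets.token_hex(16)     # 32 hex characters
gateway_span = secrets.token_hex(8)  # 16 hex characters
headers = {"traceparent": make_traceparent(trace_id, gateway_span)}

# Downstream service: read it back so its new span joins the same trace.
incoming_trace_id, parent_span, sampled = parse_traceparent(headers["traceparent"])
print(incoming_trace_id == trace_id, sampled)
```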
## Get a tracer
`get_tracer()` returns a standard OpenTelemetry tracer. Use it to open spans around the work you want to trace.
```python
from briefcase.otel import get_tracer
tracer = get_tracer("briefcase")
with tracer.start_as_current_span("classify_ticket") as span:
    span.set_attribute("briefcase.decision_type", "ticket_triage")
    category = "account_access"
    span.set_attribute("briefcase.outcome", category)
```
`get_tracer(name="briefcase")` is the only public symbol in `briefcase.otel`.
## Semantic conventions
Briefcase ships span-attribute conventions under `briefcase.semantic_conventions`. Each submodule defines the attribute keys for one subsystem, so the attributes you emit are consistent across services instead of ad-hoc strings.
Two submodules you will reach for most on the triage path:
```python
from briefcase.otel import get_tracer
from briefcase.semantic_conventions import workflow, rag
tracer = get_tracer("briefcase")
# Tag the retrieval span on the triage path with RAG attribute keys
with tracer.start_as_current_span("retrieve-ticket-history") as span:
    # use rag.* keys for the retrieval step, workflow.* keys for the workflow it runs in
    ...
```
The full set of submodules:
- `briefcase.semantic_conventions.lakefs`
- `briefcase.semantic_conventions.workflow`
- `briefcase.semantic_conventions.rag`
- `briefcase.semantic_conventions.external_data`
- `briefcase.semantic_conventions.cowork`
- `briefcase.semantic_conventions.agent_state`
- `briefcase.semantic_conventions.bitemporal`
- `briefcase.semantic_conventions.routing_policy`
- `briefcase.semantic_conventions.validation`
Import the module for the subsystem you are instrumenting and use its attribute keys when setting span attributes. The `workflow` keys line up with the [multi-agent correlation](/features/multi-agent/) surface.
## Ship decisions to an external observability sink
The span describes the work; the **decision record** carries the captured inputs, outputs, and timing. To forward those records into an external observability sink you already operate — a log aggregator, a message queue, an analytics pipeline — subclass `BaseExporter` and implement its three async methods.
> When you'd reach for this
Your team already routes operational events through a central collector. Rather than build a second pipeline for triage decisions, a small custom exporter posts each `classify_ticket` record to that same sink, right alongside the spans your tracer emits.
```python
from typing import Any
from briefcase import setup, capture
from briefcase.exporters import BaseExporter
class SinkExporter(BaseExporter):
    async def export(self, decision: Any) -> bool:
        # ship the decision record to your external observability sink here
        # e.g. post to a collector, enqueue, or forward to a log pipeline
        return True

    async def flush(self) -> None:
        pass

    async def close(self) -> None:
        pass

setup(exporter=SinkExporter())

@capture(decision_type="ticket_triage")
def classify_ticket(text):
    # call your model here; the span streams to your tracer, the record to the sink
    return "account_access"
classify_ticket("reset my password")
```
For the stock exporters (`ConsoleExporter`, `JSONLFileExporter`, `MemoryExporter`) and the one-line `briefcase.observe()` setup, see [Exporters](/features/exporters/).
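If your sink prefers batches over single records, the same three-method shape accommodates buffering. The sketch below is a stand-in that deliberately does not subclass `BaseExporter`, so it runs without Briefcase installed; `BufferedSinkExporter`, `send_batch`, and `batch_size` are hypothetical names, and JSON serialization of the record is an assumption.

```python
import asyncio
import json


class BufferedSinkExporter:
    """Stand-in with the same three async methods as a custom exporter.

    Not a BaseExporter subclass; send_batch is a hypothetical transport hook.
    """

    def __init__(self, send_batch, batch_size=3):
        self._send_batch = send_batch
        self._batch_size = batch_size
        self._buffer = []

    async def export(self, decision):
        self._buffer.append(json.dumps(decision, default=str))
        if len(self._buffer) >= self._batch_size:
            await self.flush()
        return True

    async def flush(self):
        if self._buffer:
            batch, self._buffer = self._buffer, []
            await self._send_batch(batch)

    async def close(self):
        await self.flush()  # do not drop records still in the buffer


sent_batches = []


async def fake_send(batch):
    sent_batches.append(batch)


async def main():
    exporter = BufferedSinkExporter(fake_send, batch_size=3)
    for i in range(4):
        await exporter.export({"decision_type": "ticket_triage", "n": i})
    await exporter.close()


asyncio.run(main())
print([len(batch) for batch in sent_batches])  # [3, 1]
```

Flushing in `close` is the design point: the fourth record would otherwise be lost when the process shuts down with a partially filled buffer.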
## Key symbols
- `briefcase.otel.get_tracer(name="briefcase")` — return an OpenTelemetry tracer.
- `briefcase.exporters.BaseExporter` — base class for custom exporters; implement `export`, `flush`, `close`.
- `briefcase.semantic_conventions.*` — attribute-key modules for each subsystem.
## Best practices
1. **Sample high-volume paths** — use sampling so a busy triage queue does not overwhelm your backend.
2. **Set resource attributes** — identify the service and environment so spans are easy to filter.
3. **Use the semantic-convention keys** — consistent attribute names make spans queryable across services.
4. **Pair spans with an exporter** — the span gives you the timeline, the exported record gives you the governance context.
## Where this fits
OpenTelemetry is part of **operating** a governed system in production: the timeline view that sits next to your cost and event signals.
- Cost Tracking — Roll up the model spend that runs alongside these traced decisions. (/features/cost-tracking/)
- Multi-Agent & Events — Next: correlate decisions across agents and react to events in real time. (/features/multi-agent/)
## Multi-Agent & Events
Source: https://briefcaseai.io/features/multi-agent/
> Tie every decision in a multi-step agent pipeline to one workflow and react to them as they happen.
Stitch the decisions from a multi-step agent pipeline together under one workflow, and emit events you can react to as those decisions happen.
> When you'd reach for this
Your support-triage pipeline is three agents: one retrieves prior tickets, one runs `classify_ticket`, one drafts a reply. When a customer escalates, you need to see all three decisions as one chain — not three unrelated records. Workflow correlation gives them a shared `workflow_id`, and the event surface lets an on-call alert fire the moment a classification comes back low-confidence.
## Why correlation matters
_Before correlation_
Each agent records its decision independently. Reviewing an escalation means hunting for three separate records and guessing which retrieve, classify, and draft belong to the same ticket. The chain of reasoning is real but invisible.
_After correlation_
All three decisions share one `workflow_id`. The steps trace as a single chain, so a reviewer can reconstruct exactly what context the classifier saw and what the drafter did with it — accountability across agent boundaries, not just within one agent.
## The two surfaces
| Surface | What it does | Reach for it when |
|---------|--------------|-------------------|
| Correlation (`briefcase.correlation`) | Groups decisions under a shared workflow and propagates context across boundaries | You want the pipeline to read as one accountable chain |
| Events (`briefcase.events`) | Emits typed signals (`BriefcaseEvent`) you can route as decisions happen | You want to react in real time — alert, page, or trigger follow-up |
## Install
```bash
pip install briefcase-ai[correlation]
pip install briefcase-ai[events]
```
Neither the `correlation` nor `events` extra installs third-party dependencies.
---
## Correlation
### How a workflow threads the pipeline
`briefcase_workflow(name, client)` is a context manager. Every agent registered inside it shares the same `workflow_id`, so the steps of a pipeline trace as one unit.
1. **Open a workflow** — `briefcase_workflow("support_pipeline", client)` gives every agent inside it one shared `workflow_id`.
2. **Retrieve** — the retrieval agent registers under that workflow.
3. **Classify** — `classify_ticket`'s agent registers under the same workflow.
4. **Decide** — the drafting agent registers under the same workflow.
5. **Review** — the three registered agents read back as one retrieve -> classify -> decide chain.
```python
from unittest.mock import Mock

from briefcase.correlation import briefcase_workflow


def retrieve(query):
    return ["doc-12", "doc-44"]


def classify_ticket(docs):
    # call your model here
    return "account_access"


def decide(category):
    return "route_to_support"


client = Mock()  # your Briefcase client; a Mock keeps this example self-contained

with briefcase_workflow("support_pipeline", client) as workflow:
    docs = retrieve("reset my password")
    workflow.register_agent("retriever-1", "retrieve")
    category = classify_ticket(docs)
    workflow.register_agent("classifier-1", "classify")
    action = decide(category)
    workflow.register_agent("decider-1", "decide")
    print(workflow.workflow_id)  # all three share this id
    print(action)  # route_to_support
```
### Agent registration
`workflow.register_agent(agent_id, agent_type)` records each agent in the workflow, so the chain knows who made which decision.
```python
workflow.register_agent("classifier-1", "classify")
```
### Reading the active workflow
`get_current_workflow()` returns the workflow bound to the current context, or `None` outside a workflow block — so you can read it anywhere inside the chain without threading it through call signatures.
```python
from unittest.mock import Mock

from briefcase.correlation import briefcase_workflow, get_current_workflow

client = Mock()

# Inside the block, the lookup returns the workflow that the block opened.
with briefcase_workflow("support_pipeline", client) as workflow:
    assert get_current_workflow() is workflow

# Outside the block, no workflow is bound.
assert get_current_workflow() is None
```
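For intuition on how a lookup like `get_current_workflow()` can work without threading the workflow through call signatures, here is a minimal sketch built on Python's standard `contextvars` module. It is not Briefcase's implementation: `Workflow`, `workflow_scope`, and `current_workflow` are hypothetical stand-ins, and the id format is an assumption.

```python
import contextvars
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field

# Holds the workflow bound to the current execution context (None outside a block).
_current = contextvars.ContextVar("current_workflow", default=None)


@dataclass
class Workflow:
    """Hypothetical stand-in for a workflow context."""
    name: str
    workflow_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    agents: list = field(default_factory=list)

    def register_agent(self, agent_id, agent_type):
        self.agents.append((agent_id, agent_type))


@contextmanager
def workflow_scope(name):
    workflow = Workflow(name)
    token = _current.set(workflow)  # bind for the duration of the block
    try:
        yield workflow
    finally:
        _current.reset(token)  # restore whatever was bound before


def current_workflow():
    return _current.get()


def classify_step():
    # No workflow argument: the step reads it from the context.
    current_workflow().register_agent("classifier-1", "classify")


with workflow_scope("support_pipeline") as wf:
    classify_step()
    inside = current_workflow() is wf

outside = current_workflow()
```

Because the binding is reset in `finally`, the lookup returns `None` again after the block exits, even if a step raises.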
### Propagating context across process boundaries
When an agent lives in another service, carry the trace context with the request so the downstream decision joins the same workflow trace. Inject into outbound headers on the producer side; extract from inbound headers on the consumer side.
```python
from briefcase.correlation import (
    TraceContextCarrier,
    inject_trace_context,
    extract_trace_context,
)
# Producer service: inject the active trace context into outbound headers.
headers = inject_trace_context({})
# ... send headers to the downstream agent ...
# Consumer service: restore the trace context from inbound headers.
context = extract_trace_context(headers)
# TraceContextCarrier offers the same inject/extract pair as a class.
carrier = TraceContextCarrier()
outbound = carrier.inject()
TraceContextCarrier.extract(outbound)
```
`inject_trace_context()` reads the active span context; run under an [OpenTelemetry](/features/opentelemetry/) tracer for the headers to carry `traceparent`.
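When debugging propagation, it can help to check that the outbound headers really carry a well-formed `traceparent`. The sketch below parses the version `00` layout from the W3C Trace Context specification (`version-traceid-parentid-flags`) using only the standard library. `parse_traceparent` is a hypothetical helper, not part of Briefcase, and it assumes the headers mapping uses a lowercase `traceparent` key.

```python
import re

# version (2 hex) - trace id (32 hex) - parent id (16 hex) - flags (2 hex)
_TRACEPARENT = re.compile(
    r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<parent_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)


def parse_traceparent(headers):
    """Return the parsed traceparent fields, or None if absent or malformed."""
    value = headers.get("traceparent")
    if value is None:
        return None
    match = _TRACEPARENT.match(value.strip())
    if match is None:
        return None
    fields = match.groupdict()
    # The spec treats version "ff" and all-zero ids as invalid.
    if fields["version"] == "ff":
        return None
    if set(fields["trace_id"]) == {"0"} or set(fields["parent_id"]) == {"0"}:
        return None
    # The lowest flag bit marks the trace as sampled.
    fields["sampled"] = bool(int(fields["flags"], 16) & 0x01)
    return fields


example = {"traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"}
parsed = parse_traceparent(example)
missing = parse_traceparent({})
```

A `None` result on the producer side is a hint that no tracer was active when the headers were injected.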
---
## Events
Events are typed signals you emit as decisions happen, so you can react in real time instead of polling the log. The emit functions are **coroutines** — `await` them inside an async context.
### Emitting an event
Construct a `BriefcaseEvent` and `await emit(...)`. The `idempotency_key` lets downstream consumers deduplicate retries.
```python
import asyncio

from briefcase.events import (
    BriefcaseEvent,
    emit,
    emit_low_confidence,
    emit_drift_detected,
)


class Decision:
    def __init__(self, decision_id: str):
        self.decision_id = decision_id


async def main() -> None:
    decision = Decision("dec-91f2")
    event = BriefcaseEvent(
        event_type="decision.recorded",
        decision_id=decision.decision_id,
        payload={"category": "billing", "confidence": 0.62},
        idempotency_key="dec-91f2:recorded",
    )
    await emit(event)
    # The classifier came back unsure; fires only when confidence is below threshold.
    await emit_low_confidence(decision, confidence=0.62, threshold=0.75)
    # A monitored decision drifted; fires when repeated runs disagree.
    await emit_drift_detected(decision, details={"agreement_rate": 0.4})


asyncio.run(main())
```
Set `webhook_url`, `webhook_secret`, `events`, or `event_bus` on `setup()` to route emitted events to a destination.
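On the receiving side, the `idempotency_key` is what lets a consumer ignore a redelivered event. The following is a minimal sketch of that idea, not a Briefcase API: `DedupingConsumer` is hypothetical, the event is assumed to arrive as a plain dictionary, and the in-memory set stands in for whatever shared store a real consumer would use.

```python
class DedupingConsumer:
    """Hypothetical consumer that handles each idempotency key at most once."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # a real deployment would use a shared store with expiry

    def receive(self, event):
        key = event["idempotency_key"]
        if key in self._seen:
            return False  # duplicate delivery: skip the side effect
        self._handler(event)
        self._seen.add(key)  # record the key only after the handler succeeds
        return True


handled = []
consumer = DedupingConsumer(handled.append)
event = {
    "event_type": "decision.recorded",
    "decision_id": "dec-91f2",
    "idempotency_key": "dec-91f2:recorded",
}
first = consumer.receive(event)
retry = consumer.receive(dict(event))  # same key, delivered again
```

Recording the key after the handler runs means a failed handler can be retried; the trade-off is that the handler itself must tolerate a crash between the side effect and the bookkeeping.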
### Event functions
| Function | Fires for |
|----------|-----------|
| `emit(event)` | Any `BriefcaseEvent` you construct |
| `emit_low_confidence(decision, confidence, threshold)` | A decision below a confidence threshold |
| `emit_drift_detected(decision, details=None)` | Disagreement across repeated runs |
`emit_low_confidence` pairs naturally with the confidence score on a `classify_ticket` decision; `emit_drift_detected` is the live counterpart to [drift detection](/features/drift-detection/).
## Key symbols
- `briefcase_workflow(name, client)` — context manager yielding the workflow context.
- `workflow.workflow_id` / `workflow.register_agent(agent_id, agent_type)` — the shared id and agent registration.
- `get_current_workflow()` — the active workflow, or `None` outside a block.
- `inject_trace_context` / `extract_trace_context` / `TraceContextCarrier` — carry context across boundaries.
- `BriefcaseEvent`, `emit`, `emit_low_confidence`, `emit_drift_detected` — the event surface (coroutines).
## Best practices
1. **Use descriptive workflow names** — `support_pipeline` beats `wf-7` when you review later.
2. **Register every agent** — unregistered agents leave gaps in the chain.
3. **Propagate context across boundaries** — otherwise a remote agent starts its own trace.
4. **Emit events at decision points** — low confidence and drift are the signals worth acting on first.
## Where this fits
Correlation and events are part of **operating** a governed system: they keep multi-agent runs accountable and let you react as decisions happen.
- OpenTelemetry — The trace timeline that workflow context propagates across. (/features/opentelemetry/)
- Drift Detection — The monitoring behind emit_drift_detected — verify decisions hold over time. (/features/drift-detection/)
========================================================================
# Artifact Graph & Evaluate
========================================================================
## Overview
Source: https://briefcaseai.io/evaluate/runs/
> How oci-bai tracks every model, fine-tune, and dataset you push — the commit model, provenance, search, and how to compare versions.
**oci-bai** is an OCI-compatible artifact graph CLI and dashboard. Every image pushed through
the gateway creates a **commit** in the graph — recording the manifest, files, derivation edges
to parent images, and the full audit trail. The graph is the source of truth; the backing
registry is just storage.
Full documentation: **[oci.briefcaseai.io](https://oci.briefcaseai.io)**
> Private beta
oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access.
## The commit model
A commit is defined by four things:
| # | Part | What it is |
|---|------|-----------|
| 1 | **Ref** | A tag or digest that names this version (e.g. `v1`, `cartpole`) |
| 2 | **Family** | The repository the image belongs to (e.g. `rl-gym-env`) |
| 3 | **Derivation** | The parent commit(s) inferred from shared layers, or declared in a cohort push |
| 4 | **Manifest** | The full OCI manifest, files, and SBOM recorded at push time |
## Lifecycle
1. ### Push through the gateway
Tag and push with any OCI-compatible tool. Repositories are created automatically.
```bash
docker tag my-image:latest localhost:8080/my-repo:v1
docker push localhost:8080/my-repo:v1
```
2. ### Confirm the commit
```bash
oci-bai --repo my-repo log v1
```
3. ### Inspect provenance
```bash
oci-bai --repo my-repo provenance v2
oci-bai --repo my-repo whodepends base-v1
```
4. ### Compare versions
```bash
oci-bai --repo my-repo diff base v1 --depth package
oci-bai --repo my-repo diff base v1 --depth bench
```
5. ### Search the catalog
```bash
oci-bai search "format==safetensors cuda>=12.4"
oci-bai search "gymnasium==1.1.0 arch==arm64"
```
## Run modes for evaluation
Three commands support active evaluation:
| Command | What it does |
|---------|-------------|
| `oci-bai attach-bench ` | Attach a benchmark verdict to a commit |
| `oci-bai --repo diff --depth bench` | Compare verdict deltas between two versions |
| `oci-bai hunt ` | Host a verdictml HuntEnv episode against the candidate |
## Where this fits
- Quick start — Push your first image and run your first search in under five minutes. (/evaluate/quickstart/)
- CLI reference — Every oci-bai command and its flags. (/evaluate/cli/)
- Full docs — The complete oci-bai documentation at oci.briefcaseai.io. (https://oci.briefcaseai.io)
## Quick Start
Source: https://briefcaseai.io/evaluate/quickstart/
> Push your first tracked image, explore the commit graph, and run your first search in under five minutes.
Push your first tracked image in under five minutes. All you need is Docker (or `crane`) and a
running oci-bai stack.
> Private beta
oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access before following these steps.
## Prerequisites
- Stack running — `make up` (gateway on `:8080`)
- Docker or `crane` for pushing images
- The `oci-bai` CLI on your `PATH`
Repositories are created automatically on first push.
## Steps
1. ### Push an image through the gateway
Tag any image for the gateway and push. Derivation from shared layers is inferred automatically.
```bash
docker tag my-image:latest localhost:8080/my-repo:v1
docker push localhost:8080/my-repo:v1
```
Using `crane`? `crane push my-image.tar localhost:8080/my-repo:v1 --insecure`
2. ### Confirm the commit was recorded
```bash
oci-bai --repo my-repo log v1
```
You should see the commit id, manifest digest, and who pushed it.
3. ### Push a derived image
Push a second image built from the first. oci-bai links them automatically.
```bash
docker push localhost:8080/my-repo:v2
oci-bai --repo my-repo provenance v2
```
4. ### Browse in the dashboard
Open the dashboard and select **my-repo**.
- **Versions** — the full commit graph
- **Provenance** — derivation tree for any version
- **Compare** — diff any two versions
The hosted dashboard is in early preview. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) for access.
5. ### Search
```bash
oci-bai search "cuda>=12.4"
oci-bai search "format==safetensors"
oci-bai search "numpy==1.26.4 arch==arm64"
```
## Next steps
- Search — Full query syntax including model format filters. (/evaluate/cli/#search)
- Provenance & lineage — Derivation trees and the weight-sharing metric. (https://oci.briefcaseai.io/provenance)
- Full CLI reference — Every oci-bai command and its flags. (/evaluate/cli/)
## CLI Reference
Source: https://briefcaseai.io/evaluate/cli/
> Every oci-bai command and its flags.
Every `oci-bai` command and its flags. Run `oci-bai --help` for the installed version.
## Installed help output
```
Execution-native, OCI-compatible artifact-graph CLI (§14)
Usage: oci-bai [OPTIONS]
Commands:
init Initialize an image repo
push Push, CDC, commit, tag
log Commit DAG for this family
op Operation-log commands
branch List image families with divergent heads
bookmark Bookmark commands
undo Invert the last op-log entry
fsck Graph + chunk-store integrity
gc Mark-and-sweep GC
diff Diff two refs at a depth
search Search the catalog
mount Lazy-snapshotter mount plan
show Manifest + SBOM + provenance + size by depth
explain Derivation + composition + provenance + drift
resolve Path provenance
provenance Full derivation tree
whodepends Reverse derivation walk
referrers List referrers of a subject manifest
lineage Lineage commands
compose Environment assembly on the graph plane
checkout Sparse checkout
cohort Transactional N-image cohort push
optimize Emit Rationale records
attach-bench Attach a benchmark verdict
hunt Host a verdictml HuntEnv episode
help Print this message or the help of the given subcommand(s)
Options:
--server oci-jj-server gRPC endpoint
--repo Repository (image family) the command targets
-h, --help Print help
-V, --version Print version
```
## Global flags
| Flag | Description |
|------|-------------|
| `--server URL` | Server endpoint (default `http://127.0.0.1:50051`) |
| `--repo NAME` | Repository to target |
| `--help / -h` | Show help |
| `--version / -V` | Print version |
Set defaults in your shell: `export OCI_BAI_SERVER=http://localhost:50051` and `export OCI_BAI_REPO=my-repo`
## Example output
These examples use a small demo family named `rl-gym-env`.
```bash
oci-bai --repo rl-gym-env log cartpole
```
```
commit ab7c49d0e6f3 cartpole
parent f84a2d7e91b0 cuda-base
manifest sha256:2ef7a63bbd4c70f58fc4e7b42d228a42f0ca9288e3fa4b6f9ab173079a8f284d
builder trainer@briefcase
message cartpole policy image with safetensors checkpoint
```
```bash
oci-bai search "format==safetensors cuda>=12.4"
```
```
rank repo ref format manifest
1 rl-gym-env acrobot safetensors sha256:dd21ce69ba84...
2 rl-gym-env cartpole safetensors sha256:2ef7a63bbd4c...
```
```bash
oci-bai --repo rl-gym-env provenance acrobot
```
```
acrobot
derives from cartpole
cartpole
derives from cuda-base
cuda-base
shared CUDA 12.4 runtime base
```
```bash
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth package
```
```
@@ packages
+ gymnasium 1.1.0
+ numpy 1.26.4
~ torch 2.3.1 -> 2.4.0
```
## log
```
oci-bai log [
```
Commit history for a ref.
## diff
```
oci-bai diff [--depth ]
```
Compare two versions.
| `--depth` value | What it shows |
|-----------------|---------------|
| `bytes` | Byte-level diff |
| `file` | File-level diff (default) |
| `package` | Package changes |
| `semantic` | Config changes (entrypoint, CUDA, OS) |
| `imports` | Runtime imports when telemetry is available |
| `loaded-libs` | Loaded shared libraries when telemetry is available |
| `syscalls` | Syscall changes when telemetry is available |
| `bench` | Benchmark verdict deltas when attached |
```
Usage: oci-bai diff [OPTIONS]
Options:
--depth [default: file]
[possible values: bytes, file, package, semantic, imports, loaded-libs, syscalls, bench]
--server oci-jj-server gRPC endpoint
--repo Repository (image family) the command targets
-h, --help Print help
```
## search
```
oci-bai search
```
Search the catalog. See the [Search guide at oci.briefcaseai.io](https://oci.briefcaseai.io/search) for the full syntax.
```
Usage: oci-bai search [OPTIONS]
Options:
--semantic
--server oci-jj-server gRPC endpoint
--repo Repository (image family) the command targets
-h, --help Print help
```
**Built-in facets:** `cuda`, `python`, `os`, `arch`, `format`, `model`. Any name not in this list is treated as a package name.
```bash
oci-bai search "cuda>=12.4 format==safetensors arch==arm64"
oci-bai search "gymnasium==1.1.0"
oci-bai search "model==true"
```
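The facet rule above can be illustrated with a small parser that splits a query into terms and labels each one as a built-in facet or a package name. This is a sketch of the rule as stated, not the actual oci-bai parser: `classify_terms` is hypothetical, and it recognizes only the `==` and `>=` operators that appear in the examples.

```python
import re

BUILT_IN_FACETS = {"cuda", "python", "os", "arch", "format", "model"}

# A term is name, operator, value with no spaces, e.g. "cuda>=12.4".
_TERM = re.compile(r"^(?P<name>[A-Za-z0-9_.\-]+)(?P<op>==|>=)(?P<value>.+)$")


def classify_terms(query):
    """Return (kind, name, op, value) for each whitespace-separated term."""
    terms = []
    for raw in query.split():
        match = _TERM.match(raw)
        if match is None:
            raise ValueError(f"unrecognized term: {raw!r}")
        # Names outside the built-in list are treated as package names.
        kind = "facet" if match["name"] in BUILT_IN_FACETS else "package"
        terms.append((kind, match["name"], match["op"], match["value"]))
    return terms


terms = classify_terms("cuda>=12.4 format==safetensors gymnasium==1.1.0")
```

Here `cuda` and `format` are labeled as facets, while `gymnasium` falls through to a package match.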
## provenance
```
oci-bai provenance ][
```
Full derivation tree for a version.
```
Usage: oci-bai provenance [OPTIONS]
Options:
--server oci-jj-server gRPC endpoint
--repo Repository (image family) the command targets
-h, --help Print help
```
## whodepends
```
oci-bai whodepends ][
```
Every version that descends from this one.
## checkout
```
oci-bai checkout ][ [--paths ] [--dest ]
```
Fetch specific files without pulling the full image.
| Flag | Description |
|------|-------------|
| `--paths a,b` | Comma-separated path prefixes to fetch |
| `--dest dir` | Materialize files here (requires server-side staging) |
## cohort push
```
oci-bai cohort push
```
Push multiple related images atomically. Shared content is uploaded once regardless of how many
members reference it. Reads `cohort.json` from the directory.
```
Usage: oci-bai cohort push [OPTIONS]
Options:
--server oci-jj-server gRPC endpoint
--repo Repository (image family) the command targets
-h, --help Print help
```
Push each member's image through the gateway before running `cohort push`.
## undo
```
oci-bai undo [--op ]
```
Revert the last push, or a specific operation. History is append-only — undo is non-destructive.
## referrers
```
oci-bai referrers [--mine]
```
List referrers of an image. `--mine` shows your own private referrers.
## fsck
```
oci-bai fsck ][
```
Verify that every file in a version can be fully reconstructed from stored data.
## How It Works
Source: https://briefcaseai.io/evaluate/architecture/
> The gateway, artifact graph, and how pushes become searchable, versioned, deduplicated commits.
oci-bai has two pieces: a **gateway** and an **artifact graph server**. Every OCI push goes
through the gateway; the server builds and serves the graph.
## The push pipeline
```
docker push localhost:8080/my-repo:v1
│
▼
OCI Gateway ← any docker push / crane push
│ records manifest, files, derivation edges
▼
Artifact Graph ← commit DAG, dedup store, SBOM index
│ indexes packages, weight format, CUDA version
▼
Search / CLI ← oci-bai log / diff / search / provenance
```
1. **Gateway** receives the push, computes content-addressed chunks, and calls the graph server.
2. **Graph server** writes a commit: manifest digest, derivation links (inferred from shared layers or declared via cohort push), and the file tree.
3. **Indexers** run asynchronously: package extraction, model-weight detection (safetensors, GGUF), telemetry (imports, syscalls) when available.
4. **CLI** reads from the graph server over gRPC — no separate data store.
## Deduplication
When you push fifty fine-tunes of the same base model, only the novel content-addressed chunks
are stored. The weight-sharing percentage the CLI and dashboard report is derived from how many
chunks the candidate shares with its parent commit.
| Shared % | Meaning |
|----------|---------|
| ≥ 90% | Same checkpoint — re-uploaded or tag change |
| 50–90% | Partial fine-tune — adapter or head-only update |
| 5–50% | Significant retraining |
| < 5% | Full fine-tune — all weights retrained |
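To make the weight-sharing figure concrete, the sketch below hashes two byte strings into chunk digests, computes the share of the candidate's chunks that also exist in the parent, and maps the result onto the bands in the table. Several things here are assumptions: fixed-size chunks stand in for whatever chunking the gateway actually uses, the percentage is taken relative to the candidate's chunk set, and a value that lands exactly on a band edge is assigned to the higher band.

```python
import hashlib


def chunk_digests(data, chunk_size=4):
    """Split bytes into fixed-size chunks and return the set of SHA-256 digests."""
    return {
        hashlib.sha256(data[i:i + chunk_size]).hexdigest()
        for i in range(0, len(data), chunk_size)
    }


def shared_percent(candidate, parent):
    """Percentage of the candidate's chunks that the parent already stores."""
    if not candidate:
        return 0.0
    return 100.0 * len(candidate & parent) / len(candidate)


def band(percent):
    """Map a shared percentage onto the bands from the table."""
    if percent >= 90:
        return "same checkpoint"
    if percent >= 50:
        return "partial fine-tune"
    if percent >= 5:
        return "significant retraining"
    return "full fine-tune"


parent = chunk_digests(b"AAAABBBBCCCCDDDD")
candidate = chunk_digests(b"AAAABBBBCCCCEEEE")  # only the last chunk differs
percent = shared_percent(candidate, parent)
label = band(percent)
```

Only the one novel chunk would need to be stored for the candidate; the other three are already present under their digests.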
## Engine seam
The CLI speaks to the graph server over gRPC with server reflection — no generated stubs
required. Set `OCI_BAI_SERVER` to point at a non-local instance.
## Delegation mapping
| CLI command | What runs |
|-------------|-----------|
| `oci-bai log ` | Graph server: commit DAG walk |
| `oci-bai diff --depth bench` | Graph server: benchmark verdict delta |
| `oci-bai provenance ` | Graph server: derivation tree |
| `oci-bai search ` | Graph server: catalog query |
| `oci-bai whodepends ` | Graph server: reverse derivation walk |
| `oci-bai attach-bench ` | Graph server: write benchmark verdict record |
| `oci-bai hunt ` | verdictml HuntEnv episode |
========================================================================
# Guides
========================================================================
## Audit a Decision End-to-End
Source: https://briefcaseai.io/guides/audit-a-decision/
> Follow one support-triage decision from the evidence it depended on, through the policy that governed it, to a sealed, tamper-evident record you can verify months later.
This guide threads the whole Briefcase lifecycle through a single decision: a triage agent routes a support ticket, and six months later someone asks you to **prove what happened**. By the end you will have a sealed artifact that reconstructs the decision, the evidence it used, and the exact policy version in effect — and verifies it was not altered.
> When you'd reach for this
A decision is challenged in an incident review. You need to show the routing choice, the data it relied on, and the rule that governed it *as of the day it ran* — not today's data or today's policy.
Install the extras used here:
```bash
pip install briefcase-ai[routing,bitemporal,compliance]
```
1. ### Record the evidence the decision depends on
Evidence lives in an append-only bitemporal store. Each record carries **valid time** (when the fact was true) and **transaction time** (when you learned it), so corrections never overwrite history.
```python
from datetime import datetime, timezone
from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore
store = InMemoryBitemporalStore()
now = datetime.now(timezone.utc)
evidence = BitemporalRecord.new(
    key="config:max_retries",
    valid_time=now,
    value=3,
    source="config-service",
)
store.append(evidence)
```
2. ### Define the policy that governs the decision
A `PolicyVersion` is a set of rules published to a registry. Publishing is an **append**, so "the policy as of date X" always returns exactly the rules that were active then.
```python
from briefcase.routing import PolicyRegistry, PolicyVersion, PolicyRule
registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1",
    rules=[
        PolicyRule(
            rule_id="high-priority",
            condition={"priority": "high"},
            choice="senior-queue",
            rationale="High-priority tickets go to the senior queue.",
        ),
    ],
    default_choice="standard-queue",
)
registry.publish(policy, valid_from=now)
```
3. ### Make the routing decision
The `AgentRouter` evaluates the context against the active policy and returns a decision that references the evidence it used.
```python
from briefcase.routing import AgentRouter
router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"priority": "high"}, evidence_refs=[evidence.record_id])
print(decision.selected) # "senior-queue"
print(decision.matched_rule_id) # "high-priority"
print(decision.policy_version) # "1"
```
4. ### Seal a tamper-evident bundle
An `ExaminerBundle` joins the decision, the bitemporal evidence, and the policy version in effect, then seals the whole thing with a SHA-256 content hash. `verify()` raises if a single byte changes.
```python
from briefcase.compliance import ExaminerBundle, BundleIntegrityError
bundle = ExaminerBundle.build(decision, store, registry)
print(bundle.content_hash) # "sha256:..."
bundle.verify() # raises BundleIntegrityError if tampered
```
5. ### Transport it and re-verify
The bundle serializes to JSON, so it can leave your system and be checked anywhere — the hash makes it self-validating.
```python
restored = ExaminerBundle.from_json(bundle.to_json(indent=2))
restored.verify()
```
6. ### Reconstruct what was known at the time
Months later, the underlying config may have changed. Because evidence is append-only, you can reconstruct the store **as of** the decision's moment and read exactly what it saw.
```python
from briefcase.bitemporal import AsOfView
view = AsOfView(store, transaction_time=now)
print(view.as_of("config:max_retries").value) # 3, as it was at decision time
```
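The as-of reconstruction in step 6 rests on one idea: corrections are appended with a later transaction time, so a query pinned to an earlier transaction time never sees them. A standard-library sketch of that idea follows. `Fact` and `AppendOnlyStore` are hypothetical and far simpler than `briefcase.bitemporal`; the dates are illustrative.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Fact:
    key: str
    value: object
    valid_time: datetime        # when the fact was true
    transaction_time: datetime  # when the store learned it


class AppendOnlyStore:
    def __init__(self):
        self._facts = []

    def append(self, fact):
        self._facts.append(fact)  # corrections are new rows, never overwrites

    def as_of(self, key, transaction_time):
        """Latest fact for key that the store knew at transaction_time."""
        known = [
            f for f in self._facts
            if f.key == key and f.transaction_time <= transaction_time
        ]
        return max(known, key=lambda f: f.transaction_time, default=None)


decision_time = datetime(2025, 1, 1, tzinfo=timezone.utc)
later = decision_time + timedelta(days=180)

facts = AppendOnlyStore()
facts.append(Fact("config:max_retries", 3, decision_time, decision_time))
facts.append(Fact("config:max_retries", 5, later, later))  # a later change

seen_then = facts.as_of("config:max_retries", decision_time).value
seen_now = facts.as_of("config:max_retries", later).value
```

The later change is visible to a query at `later` but not to one pinned at `decision_time`, which is what lets a review read the value the decision actually saw.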
> Integrity is enforced, not assumed
`verify()` is the whole point: a bundle that has been edited — even by accident in transit — raises `BundleIntegrityError`. Treat a bundle that fails to verify as untrusted.
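The sealing idea can be sketched with the standard library: hash a canonical serialization of the payload, store the hash next to it, and recompute on verification. This is an illustration of the technique only. The canonicalization (sorted keys, compact separators), the bundle layout, and the helper names `seal` and `verify` are assumptions, not the `ExaminerBundle` format.

```python
import hashlib
import json


def content_hash(payload):
    """SHA-256 over a canonical JSON form so equal payloads hash equally."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def seal(payload):
    return {"payload": payload, "content_hash": content_hash(payload)}


def verify(bundle):
    if content_hash(bundle["payload"]) != bundle["content_hash"]:
        raise ValueError("bundle content does not match its hash")


bundle = seal({
    "decision": "senior-queue",
    "policy_version": "1",
    "evidence": {"config:max_retries": 3},
})

restored = json.loads(json.dumps(bundle))  # round-trip, as if transported
verify(restored)  # passes: nothing changed

restored["payload"]["decision"] = "standard-queue"  # simulate tampering
try:
    verify(restored)
    tamper_detected = False
except ValueError:
    tamper_detected = True
```

A bare hash only helps if it is kept somewhere an editor of the payload cannot also rewrite it, so record or transmit the hash separately from the bundle when that matters.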
## Where this fits
This guide stitches together four building blocks. Go deeper on each:
- Bitemporal Storage — Two time axes and append-only corrections — the evidence layer. (/advanced/bitemporal-storage/)
- Versioned Routing Policy — Time-travelable policies and the rule that fired. (/advanced/versioned-routing-policy/)
- Audit Bundles — Sealing and verifying the full record. (/advanced/compliance-bundles/)
- Govern Agent Actions — Add controls that run before the action, not just after. (/guides/govern-agent-actions/)
## Govern Agent Actions
Source: https://briefcaseai.io/guides/govern-agent-actions/
> Put controls in front of an agent so an action is evaluated before it runs — deny-by-default guardrails, fail-closed pipelines, and a versioned policy that decides where the work goes.
Recording a decision tells you what *did* happen. Governing one means deciding what is *allowed to* happen — **before** the action runs. This guide adds two controls to the support-triage agent: a guardrail that authorizes the action, and a versioned policy that decides where it goes.
> When you'd reach for this
Your triage agent can invoke actions with real consequences — escalating, auto-resolving, assigning a queue. You want a control that can say "no" before any of that executes, and a routing decision you can later reconstruct.
```bash
pip install briefcase-ai[guardrails,routing]
```
1. ### Define a guardrail
A guardrail answers one question: may this agent perform this action on this resource? Subclass `BaseGuardrailEnv` and implement `evaluate`, returning an `EvalResult` with an `Effect`.
```python
from briefcase.guardrails import BaseGuardrailEnv, EvalRequest, EvalResult, Effect
class QueueGuardrail(BaseGuardrailEnv):
    @property
    def name(self) -> str:
        return "queue_access"

    @property
    def request_space(self):
        return {}

    def evaluate(self, request: EvalRequest) -> EvalResult:
        # Allow only high-priority requests; everything else is denied.
        allowed = request.context.get("priority") == "high"
        return EvalResult(
            effect=Effect.ALLOW if allowed else Effect.DENY,
            guardrail_name=self.name,
            reason="priority check",
        )
```
2. ### Evaluate before acting
Build the request that describes the action, evaluate it, and only proceed if it is allowed.
```python
guardrail = QueueGuardrail()
request = EvalRequest(
    agent="triage-bot",
    action="route",
    resource="queue:senior",
    context={"priority": "high"},
)
result = guardrail.evaluate(request)
if result.is_allowed:
    ...  # perform the action
```
3. ### Compose a pipeline and fail closed
Real systems chain several guardrails. A `GuardrailPipeline` evaluates them in order and denies on the first denial (its default mode). Wrap the call so that **any error becomes a denial** — controls must fail closed, never open.
```python
from briefcase.guardrails import GuardrailPipeline
pipeline = GuardrailPipeline(stages=[guardrail])
def is_allowed(request: EvalRequest) -> bool:
    try:
        return pipeline.evaluate(request).is_allowed
    except Exception:
        return False  # fail closed: an error never grants access
```
> Fail closed by default
A control that throws and is treated as "allow" is worse than no control. The pattern above denies on error; the Guardrails page also documents a built-in deny-by-default wrapper and other composable wrappers (caching, timeouts, auditing).
4. ### Route the allowed action through a versioned policy
Once an action is permitted, decide *where* it goes — using a policy you can reconstruct later. Publishing a policy version is append-only, so the rule that fired is always recoverable.
```python
from datetime import datetime, timezone
from briefcase.routing import PolicyRegistry, PolicyVersion, PolicyRule, AgentRouter
registry = PolicyRegistry()
registry.publish(
    PolicyVersion(
        policy_id="ticket-routing",
        version="1",
        rules=[PolicyRule(
            rule_id="high-priority",
            condition={"priority": "high"},
            choice="senior-queue",
            rationale="High-priority tickets go to the senior queue.",
        )],
        default_choice="standard-queue",
    ),
    valid_from=datetime.now(timezone.utc),
)
router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"priority": "high"})
print(decision.selected, decision.matched_rule_id, decision.policy_version)
```
> Controls before action — the whole point
The order matters: **evaluate, then act**. Guardrails decide permission; routing decides destination. Both run before the agent does anything irreversible, and both leave a record you can replay and seal.
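The evaluate-then-act ordering can be shown end to end in a few lines of plain Python. This sketch is not the Briefcase API: `govern` and `route` are hypothetical, guardrails are modeled as simple callables, and the matching rule (every key in a rule's condition must equal the context value, first match wins) is an assumption about how a rule-based router might behave.

```python
def route(context, rules, default_choice):
    """Return (choice, matched_rule_id) for the first rule whose condition matches."""
    for rule in rules:
        if all(context.get(k) == v for k, v in rule["condition"].items()):
            return rule["choice"], rule["rule_id"]
    return default_choice, None


def govern(request, guardrails, rules, default_choice):
    """Evaluate every guardrail first; route only if all of them allow."""
    for check in guardrails:
        try:
            allowed = check(request)
        except Exception:
            allowed = False  # fail closed: an error counts as a denial
        if not allowed:
            return None  # denied, so no routing decision is made
    return route(request["context"], rules, default_choice)


rules = [{
    "rule_id": "high-priority",
    "condition": {"priority": "high"},
    "choice": "senior-queue",
}]


def priority_is_set(request):
    return "priority" in request["context"]


def broken_check(request):
    raise RuntimeError("policy backend unavailable")


allowed_result = govern({"context": {"priority": "high"}}, [priority_is_set], rules, "standard-queue")
default_result = govern({"context": {"priority": "low"}}, [priority_is_set], rules, "standard-queue")
denied_result = govern({"context": {"priority": "high"}}, [priority_is_set, broken_check], rules, "standard-queue")
```

The third call shows the fail-closed path: the failing check produces a denial, and the router is never consulted.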
## Where this fits
- Guardrails — The full guardrail framework: pipelines, modes, and composable wrappers. (/advanced/guardrails/)
- Routing — The simple in-process router and when to graduate to versioned policy. (/advanced/routing/)
- Versioned Routing Policy — Time-travelable policies and reconstruction. (/advanced/versioned-routing-policy/)
- Audit a Decision End-to-End — Seal this decision into a verifiable record. (/guides/audit-a-decision/)
## Observe AI in Production
Source: https://briefcaseai.io/guides/observe-in-production/
> Wire up live observability for a running triage agent — emit every decision, watch token cost against a budget, measure output drift, trace with OpenTelemetry, and fire events when something looks off.
Once the triage agent is live, you need to *see* what it is doing — cheaply and continuously. This guide layers Briefcase's observability tools onto the same `classify_ticket` function: emit records, track spend, detect drift, trace, and alert.
> When you'd reach for this
The agent is in production. You want a low-overhead stream of every decision, a guardrail on cost, an early warning when outputs start shifting, and traces that line up with the rest of your telemetry.
```bash
pip install briefcase-ai[drift,otel,events]
```
1. ### Emit every decision in one line
`observe()` wires up an exporter so captured decisions actually go somewhere. Use `"console"` in development, a `.jsonl` path for log shipping, or `"memory"` in tests.
```python
import briefcase
briefcase.observe("decisions.jsonl") # append-only, thread-safe
@briefcase.capture(decision_type="ticket-classification")
def classify_ticket(text: str) -> str:
    # call your model here
    return "billing"
```
2. ### Watch cost against a budget
`CostCalculator` estimates per-call cost from token counts and checks spend against a limit. The cost types ship in the base package.
```python
from briefcase.cost import CostCalculator
calc = CostCalculator()
estimate = calc.estimate_cost("gpt-4o-mini", input_tokens=1000, output_tokens=200)
print(estimate.total_cost, estimate.currency)
budget = calc.check_budget(current_spend=85.0, budget_limit=100.0)
print(budget.status, budget.alert_message) # e.g. "warning", "..."
```
3. ### Measure drift across repeated runs
Sample the same decision over time and ask how consistent it stays. A falling `consistency_score` is your signal that behavior is shifting.
```python
from briefcase.drift import DriftCalculator
calc = DriftCalculator().with_similarity_threshold(0.9)
metrics = calc.calculate_drift(["billing", "billing", "account", "billing"])
print(metrics.consistency_score, metrics.agreement_rate)
print(metrics.consensus_output, metrics.outliers)
```
4. ### Trace alongside your existing telemetry
`get_tracer()` returns a standard OpenTelemetry tracer. Spans describe the *timeline* of work; decision records carry the *governance context* — they are complementary and both flow to your collectors.
```python
from briefcase.otel import get_tracer
tracer = get_tracer("briefcase")
with tracer.start_as_current_span("classify_ticket"):
    classify_ticket("My invoice is wrong")
```
5. ### Fire events when something looks off
Turn signals into action. The emit helpers are coroutines — `await` them inside an async context — and are ideal for low-confidence outputs or detected drift.
```python
import asyncio
from briefcase.events import emit_low_confidence, emit_drift_detected
async def main():
    await emit_low_confidence({"id": "dec-1"}, confidence=0.4, threshold=0.7)
    await emit_drift_detected({"id": "dec-1"}, {"drift_score": 0.3})


asyncio.run(main())
```
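The cost check in step 2 comes down to simple arithmetic: tokens multiplied by a per-token price, then spend compared with a limit. The sketch below spells that out with the standard library. The model name, the prices, and the 80% warning threshold are all hypothetical, and `estimate_cost` and `budget_status` are illustrative helpers, not `CostCalculator` methods.

```python
from decimal import Decimal

# Hypothetical prices in USD per one million tokens; substitute your provider's rates.
PRICES = {"example-model": {"input": Decimal("0.15"), "output": Decimal("0.60")}}
MILLION = Decimal(1_000_000)


def estimate_cost(model, input_tokens, output_tokens):
    """Cost of one call: tokens times price, scaled from per-million pricing."""
    price = PRICES[model]
    return (price["input"] * input_tokens + price["output"] * output_tokens) / MILLION


def budget_status(current_spend, budget_limit, warn_at=Decimal("0.8")):
    """Classify spend as ok, warning, or exceeded (the thresholds are assumptions)."""
    ratio = Decimal(str(current_spend)) / Decimal(str(budget_limit))
    if ratio >= 1:
        return "exceeded"
    if ratio >= warn_at:
        return "warning"
    return "ok"


cost = estimate_cost("example-model", input_tokens=1000, output_tokens=200)
status = budget_status(85.0, 100.0)
```

`Decimal` keeps the money arithmetic exact, which avoids the rounding noise that binary floats would add to very small per-call costs.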
> From signal to response
These tools compose: cost and drift produce signals, events broadcast them, and your alerting reacts. Pair drift thresholds with `emit_drift_detected` so a shift in behavior pages the right person automatically.
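As a concrete picture of pairing a drift threshold with an alert, the sketch below computes an exact-match agreement rate over repeated runs and decides whether to raise a signal. It uses only the standard library. `agreement` and `should_alert` are hypothetical helpers, and the exact-match definition is an assumption; the metrics that `DriftCalculator` reports may be computed differently.

```python
from collections import Counter


def agreement(outputs):
    """Return (consensus_output, agreement_rate, outliers) for repeated runs."""
    if not outputs:
        raise ValueError("need at least one output")
    consensus, count = Counter(outputs).most_common(1)[0]
    outliers = [o for o in outputs if o != consensus]
    return consensus, count / len(outputs), outliers


def should_alert(rate, threshold):
    """Alert when agreement falls below the chosen threshold."""
    return rate < threshold


consensus, rate, outliers = agreement(["billing", "billing", "account", "billing"])
alert = should_alert(rate, threshold=0.9)
```

With three of four runs agreeing, the rate is 0.75, which is below the 0.9 threshold, so this sample would trigger the alert path where you would call `emit_drift_detected`.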
## Where this fits
- Exporters — Stock and custom exporters, and the record shape they emit. (/features/exporters/)
- Cost Tracking — Estimates, model comparison, projections, and budgets. (/features/cost-tracking/)
- Drift Detection — What the drift metrics mean and how to tune them. (/features/drift-detection/)
- Multi-Agent & Events — Correlate decisions across a workflow and emit events. (/features/multi-agent/)
## Reproducible RAG
Source: https://briefcaseai.io/guides/reproducible-rag/
> Make a retrieval-augmented decision reproducible — version the embedding index, snapshot the external data it read, and validate that a prompt's references resolve before a model ever sees them.
When the triage agent answers from a knowledge base, the answer is only as reproducible as the **context behind it**: which documents, which embedding model, which version of an upstream source. This guide makes a retrieval-augmented decision reconstructable — so "why did it say that?" has an answer.
> When you'd reach for this
A RAG answer is questioned, but the knowledge base has since changed. You need to know which document versions and which embedding model produced it — and to catch stale indexes before they serve wrong answers.
```bash
pip install briefcase-ai[rag,external,validate]
```
1. ### Version the embedding index
A `VersionedEmbeddingPipeline` records which documents and model produced an index in an atomic manifest, so you can detect when it goes stale.
```python
from briefcase.rag import VersionedEmbeddingPipeline, Document
class EmbeddingModel:
    def embed(self, texts):
        return [[0.1, 0.2, 0.3] for _ in texts]

pipeline = VersionedEmbeddingPipeline(embedding_model=EmbeddingModel())
documents = [
    Document(id="kb-1", content="Reset your password from settings.", metadata={"topic": "account"}),
]
batch = pipeline.create_embedding_batch(documents)
manifest = pipeline.create_manifest("faq-index", [batch])
print(manifest.index_name)
```
2. ### Detect when the index is stale
When documents or the model change, `check_invalidation` reports it — your cue to rebuild before serving.
```python
report = pipeline.check_invalidation("faq-index", documents)
print(report.is_valid) # False once a document's content hash changes
```
3. ### Snapshot the external data a decision read
Agents also read sources you do not control. `ExternalDataTracker` hashes each fetch, detects drift against the last snapshot, and appends corrections without mutating history.
```python
from briefcase.external import ExternalDataTracker, SnapshotPolicy, SnapshotFrequency
tracker = ExternalDataTracker(
    default_policy=SnapshotPolicy(frequency=SnapshotFrequency.ON_CHANGE, retention_days=30),
)
result = tracker.track_api_call(
    api_name="product-catalog",
    endpoint="/products",
    method="GET",
    response_data={"items": [1, 2, 3]},
    record_count=3,
)
print(result["snapshot_id"], result["drift_detected"])
```
4. ### Validate references before the model runs
Before a prompt reaches a model, confirm its references actually resolve against a versioned knowledge base. You supply an extractor (finds references) and a resolver (checks them); the engine records the commit it validated against.
```python
import re
from briefcase.validation import PromptValidationEngine
from briefcase.validation.errors import ValidationError, ValidationErrorCode

class RegexExtractor:
    """Finds Markdown file references such as kb/faq.md in a prompt."""
    _REF = re.compile(r"[\w/]+\.md")

    def extract(self, prompt: str) -> list:
        return self._REF.findall(prompt)

class AllowlistResolver:
    """Reports an error for every reference outside a known set."""
    def __init__(self, known):
        self._known = known

    def resolve_all(self, references):
        return [
            ValidationError(
                code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                message=f"Reference not found: {ref}",
                reference=ref, severity="error", layer="resolution",
                remediation="Add the document to the knowledge base.",
            )
            for ref in references if ref not in self._known
        ]

class DemoLakeFS:
    """Stub client that returns a fixed placeholder commit ID."""
    def get_commit(self, repository: str, branch: str) -> str:
        return "demo0000000000000000000000000000000000000"

engine = PromptValidationEngine(
    extractor=RegexExtractor(),
    resolver=AllowlistResolver({"kb/faq.md"}),
    lakefs_client=DemoLakeFS(),
    repository="knowledge-base",
    branch="main",
    mode="strict",
)
report = engine.validate("See kb/faq.md and kb/missing.md")
print(report.status, report.references_checked, report.has_errors)
```
> Reproducibility is the sum of its sources
A RAG answer is reproducible only if *every* input is pinned: the document versions (manifest), the upstream data (snapshot), and the validated references (commit). Skip one and the trail has a gap.
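The staleness check in step 2 rests on comparing recorded content hashes with current ones. The sketch below shows that idea with the standard library only. It is an illustration under stated assumptions, not the `VersionedEmbeddingPipeline` implementation: `build_manifest`, `check_stale`, the manifest layout, and the model name `demo-embedder` are all hypothetical.

```python
import hashlib

def content_hash(text: str) -> str:
    """SHA-256 of the document text, used as its version fingerprint."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def build_manifest(index_name, model_name, documents):
    """Record which model and which document contents produced an index."""
    return {
        "index_name": index_name,
        "model": model_name,
        "doc_hashes": {doc_id: content_hash(body) for doc_id, body in documents.items()},
    }

def check_stale(manifest, model_name, documents):
    """Return the reasons the index no longer matches its inputs (empty means valid)."""
    reasons = []
    if manifest["model"] != model_name:
        reasons.append("model changed")
    current = {doc_id: content_hash(body) for doc_id, body in documents.items()}
    recorded = manifest["doc_hashes"]
    for doc_id in sorted(set(current) | set(recorded)):
        if current.get(doc_id) != recorded.get(doc_id):
            reasons.append(f"document changed: {doc_id}")
    return reasons

docs = {"kb-1": "Reset your password from settings."}
manifest = build_manifest("faq-index", "demo-embedder", docs)
fresh = check_stale(manifest, "demo-embedder", docs)

docs["kb-1"] = "Reset your password from the account page."
stale = check_stale(manifest, "demo-embedder", docs)
print(fresh, stale)
```

Comparing the union of document IDs catches added and removed documents as well as edited ones.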
## Where this fits
- RAG Versioning — Manifests, invalidation reports, and instrumented retrieval. (/advanced/rag-versioning/)
- External Data — Snapshot policies, drift detection, and append-only corrections. (/advanced/external-data/)
- Validation Engine — Extractors, resolvers, and the layered validation flow. (/advanced/validation-engine/)
- lakeFS — Capture commit SHAs for the data your agents read. (/integrations/lakefs/)
## Track & Compare Model Versions
Source: https://briefcaseai.io/guides/run-an-evaluation/
> Push a baseline and candidate through the gateway, compare them at every depth, and read the verdict scorecard.
This guide walks through a complete evaluation workflow using oci-bai: push a baseline and a
candidate, compare them at the package and benchmark depths, and read the verdict back.
> Private beta
oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access.
## What you need
- A running local stack (`make up`, `make seed`)
- The `oci-bai` CLI on your `PATH`
- Two image tags to compare (the guide uses the seeded `rl-gym-env` family)
## Workflow
1. ### Push the baseline
```bash
docker tag my-base:latest localhost:8080/rl-gym-env:cuda-base
docker push localhost:8080/rl-gym-env:cuda-base
oci-bai --repo rl-gym-env log cuda-base
```
2. ### Push the candidate
```bash
docker tag my-candidate:latest localhost:8080/rl-gym-env:cartpole
docker push localhost:8080/rl-gym-env:cartpole
oci-bai --repo rl-gym-env provenance cartpole
```
3. ### Compare at package depth
```bash
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth package
```
Output shows added, removed, and changed packages between the two versions.
4. ### Attach a benchmark verdict
```bash
oci-bai --repo rl-gym-env attach-bench cartpole
```
5. ### Compare at bench depth
```bash
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth bench
```
Output shows verdict deltas between the baseline and candidate.
6. ### Check impact
```bash
oci-bai --repo rl-gym-env whodepends cuda-base
```
Shows every version that descends from the baseline — useful before promoting a base update.
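The package-depth comparison in step 3 reports added, removed, and changed packages. A rough way to picture that result is a dictionary diff, sketched below. This is not oci-bai's code or output format: `diff_packages` is a hypothetical helper, and the package names and versions are made-up sample data.

```python
def diff_packages(base, candidate):
    """Classify packages as added, removed, or changed between two name->version maps."""
    added = {name: candidate[name] for name in candidate.keys() - base.keys()}
    removed = {name: base[name] for name in base.keys() - candidate.keys()}
    changed = {
        name: (base[name], candidate[name])
        for name in base.keys() & candidate.keys()
        if base[name] != candidate[name]
    }
    return {"added": added, "removed": removed, "changed": changed}

# Illustrative inventories only; real images will differ.
base = {"numpy": "1.26.4", "torch": "2.3.0", "pillow": "10.3.0"}
candidate = {"numpy": "1.26.4", "torch": "2.4.0", "gymnasium": "0.29.1"}

result = diff_packages(base, candidate)
print(result)
```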
## Key commands at a glance
| Step | Command |
|------|---------|
| Push | `docker push localhost:8080/<repo>:<tag>` |
| History | `oci-bai --repo <repo> log <tag>` |
| Provenance | `oci-bai --repo <repo> provenance <tag>` |
| Compare packages | `oci-bai --repo <repo> diff <base> <candidate> --depth package` |
| Compare verdicts | `oci-bai --repo <repo> diff <base> <candidate> --depth bench` |
| Find dependents | `oci-bai --repo <repo> whodepends <tag>` |
| Search | `oci-bai search "format==safetensors cuda>=12.4"` |
========================================================================
# Integrations
========================================================================
## MCP Server
Source: https://briefcaseai.io/integrations/mcp/
> Expose Briefcase's safe, read-only SDK operations to MCP-capable tools.
Run a Model Context Protocol (MCP) server that gives MCP-capable tools — Claude
Code, Cursor, Codex — direct access to Briefcase operations: sanitize PII,
estimate model cost, analyze output drift, and read the usage guide. The tools
are read-only and wrap `briefcase.sanitize`, `briefcase.cost`, and
`briefcase.drift`.
> When you'd reach for this
You're building a support-triage agent in an MCP-capable tool and want it to
redact PII before text leaves your environment, sanity-check the cost of a model
call, or compare outputs for drift — without wiring up the Python SDK directly.
The MCP server lets the runtime call these Briefcase capabilities as plain tools.
## Install
```bash
pip install briefcase-ai[mcp]
```
> Requires the mcp extra
The `[mcp]` extra installs `mcp>=1.2`. Import from `briefcase.mcp`.
## Run It
Start the server over stdio with the console script:
```bash
briefcase-mcp
```
Or run the module directly:
```bash
python -m briefcase.mcp
```
## Build the Server in Python
`build_server()` returns a configured `FastMCP` instance, and `main()` runs it
over stdio.
```python
from briefcase.mcp import build_server
server = build_server()
print(server.name) # -> "briefcase"
```
## Register in an MCP Client
Point an MCP-capable client at the `briefcase-mcp` command. The exact config file
differs per tool, but the shape is the same:
```json
{
  "mcpServers": {
    "briefcase": {
      "command": "briefcase-mcp"
    }
  }
}
```
## Tools
The server exposes four tools:
| Tool | What it does | Act |
| --- | --- | --- |
| `sanitize_text` | Redact PII from text before it leaves your environment | Capture |
| `estimate_cost` | Estimate the cost of a model call | Operate |
| `analyze_drift` | Check a set of outputs for consistency and drift | Replay & Verify |
| `how_to` | Retrieve Briefcase usage guidance | — |
### sanitize_text
Redact PII (emails, phones, SSNs, cards, API keys, IPs) from text. Wraps
`briefcase.sanitize.Sanitizer`.
| Input | Type |
|-------|------|
| `text` | `str` |
Returns `{ "sanitized": str, "redactions": list[str] }` — the redacted text and
the PII types found.
```text
sanitize_text("Contact me at jordan@example.com or 555-123-4567")
-> {"sanitized": "Contact me at [REDACTED_EMAIL] or [REDACTED_PHONE]",
"redactions": ["email", "phone"]}
```
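For intuition about the return shape, the following sketch reproduces the example above with two regular expressions. It is a simplified illustration, not the patterns or logic of `briefcase.sanitize.Sanitizer`: the `redact` function and both regexes are assumptions, and they cover only emails and one US-style phone format.

```python
import re

# Hypothetical patterns for illustration; real PII detection needs far more care.
PATTERNS = [
    ("email", re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")),
    ("phone", re.compile(r"\b\d{3}[-.\s]\d{3}[-.\s]\d{4}\b")),
]

def redact(text):
    """Replace each match with a typed placeholder and list the PII types found."""
    redactions = []
    for kind, pattern in PATTERNS:
        text, count = pattern.subn(f"[REDACTED_{kind.upper()}]", text)
        if count:
            redactions.append(kind)
    return {"sanitized": text, "redactions": redactions}

result = redact("Contact me at jordan@example.com or 555-123-4567")
print(result)
```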
### estimate_cost
Estimate the USD cost of an LLM call. Wraps `briefcase.cost.CostCalculator`.
| Input | Type | Notes |
|-------|------|-------|
| `model` | `str` | |
| `input_tokens` | `int` | |
| `output_tokens` | `int` | |
| `rate_card` | `str` (optional) | `platform × tier` pricing, e.g. `"bedrock:batch"` |
Returns `{ "model": str, "rate_card": str, "input_cost": float, "output_cost": float, "cache_cost": float, "total_cost": float }`.
```text
estimate_cost("claude-haiku-4-5", 1000, 500)
-> {"model": "claude-haiku-4-5", "rate_card": "standard", "input_cost": 0.001,
"output_cost": 0.0025, "cache_cost": 0.0, "total_cost": 0.0035}
```
> New in 3.2.1
`estimate_cost` accepts an optional `rate_card` and returns `cache_cost`. See
[Cost Tracking](/features/cost-tracking/) for rate cards and prompt-cache billing.
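The numbers in the example follow from per-token arithmetic. The sketch below back-solves the rates from that example output (0.001 USD for 1,000 input tokens and 0.0025 USD for 500 output tokens, which works out to 1 and 5 USD per million tokens). Those rates are inferred here for illustration, not taken from a published rate card, and `estimate` is a hypothetical helper that ignores cache billing.

```python
def estimate(input_tokens, output_tokens, input_per_mtok, output_per_mtok):
    """Cost in USD given token counts and rates per million tokens."""
    input_cost = input_tokens / 1_000_000 * input_per_mtok
    output_cost = output_tokens / 1_000_000 * output_per_mtok
    return {
        "input_cost": round(input_cost, 6),
        "output_cost": round(output_cost, 6),
        "total_cost": round(input_cost + output_cost, 6),
    }

# Rates implied by the example above (assumption, see lead-in).
result = estimate(1000, 500, input_per_mtok=1.0, output_per_mtok=5.0)
print(result)
```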
### analyze_drift
Analyze a list of model outputs for consistency and drift. Wraps
`briefcase.drift.DriftCalculator`.
| Input | Type |
|-------|------|
| `outputs` | `list[str]` |
Returns `{ "consistency_score": float, "agreement_rate": float, "consensus_output": str, "status": str }`.
```text
analyze_drift(["billing", "billing", "shipping"])
-> {"consistency_score": 0.67, "agreement_rate": 0.67,
"consensus_output": "billing", "status": "drifting"}
```
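The `agreement_rate` and `consensus_output` in the example can be read as "the most common output, and the share of samples that match it": two of three outputs are `"billing"`, so the rate is 0.67. The sketch below computes that with exact string matching. It is an approximation for intuition only; `summarize` is a hypothetical helper, and `DriftCalculator` may group near-identical outputs through its similarity threshold rather than requiring exact equality.

```python
from collections import Counter

def summarize(outputs):
    """Most common output, the share of samples matching it, and the rest."""
    counts = Counter(outputs)
    consensus, top = counts.most_common(1)[0]
    return {
        "agreement_rate": round(top / len(outputs), 2),
        "consensus_output": consensus,
        "outliers": [o for o in outputs if o != consensus],
    }

summary = summarize(["billing", "billing", "shipping"])
print(summary)
```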
### how_to
Return Briefcase usage guidance.
| Input | Type |
|-------|------|
| `topic` | `str` (optional) |
Pass a topic keyword (e.g. `"export"`, `"sanitize"`, `"cost"`, `"logging"`) to get
matching sections, or leave it empty for the full guide.
## Resource
The server also exposes a resource, `briefcase://llms-full.txt`, which serves the
bundled Briefcase usage guide for clients that read MCP resources.
> Docs as machine-readable text
This documentation site also publishes [`/llms.txt`](https://briefcaseai.io/llms.txt)
(a curated index) and [`/llms-full.txt`](https://briefcaseai.io/llms-full.txt) (the
core docs as plain text), so an assistant can ingest Briefcase even without the MCP
server. See [AI-Assisted Setup](/getting-started/ai-assisted-setup/).
## Where this fits
These tools surface Briefcase capabilities to any MCP client. To go deeper on what
each one does in the full SDK:
- PII Sanitization — How sanitize_text redacts sensitive data before it leaves your environment. (/features/pii-sanitization/)
- Drift Detection — How analyze_drift compares outputs over time. (/features/drift-detection/)
## Next steps
- Exporters — Send decisions from your own code to the same backends these tools read. (/features/exporters/)
- Cost Tracking — The library behind the estimate_cost tool, with budget checks. (/features/cost-tracking/)
## lakeFS
Source: https://briefcaseai.io/integrations/lakefs/
> Capture lakeFS commit SHAs for the versioned data your agents read.
Track exactly which version of a policy document, taxonomy, or reference file an
agent read, by capturing the lakeFS commit SHA on every object access.
lakeFS is one bundled versioned-data source — if your data lives elsewhere,
implement the same capture against any version-controlled store through the
generic `vcs` protocol (`pip install briefcase-ai[vcs]`).
> When you'd reach for this
Your support-triage agent classifies tickets using a knowledge base that lives in
a data lake, and that lake is updated daily. When you replay a decision weeks
later, you need the exact data the agent read — not today's version. Capturing the
lakeFS commit SHA at decision time lets a replay see the same data and reach the
same conclusion.
## Install
```bash
pip install briefcase-ai[lakefs]
```
The `[lakefs]` extra installs the `lakefs` package. Import from
`briefcase.integrations.lakefs`.
> Mock mode
When the `lakefs` package or a live endpoint is unavailable, the client runs in
mock mode so examples stay runnable.
## Track Reads with a Context Manager
Open a `versioned_context` and every read inside it is tagged with the resolved
commit SHA.
```python
from briefcase.integrations.lakefs import versioned_context

# Minimal stand-in for a configured Briefcase client (placeholder credentials).
class MockBriefcaseClient:
    def __init__(self):
        self.config = {
            "lakefs_endpoint": "https://example.lakefscloud.io/api/v1",
            "lakefs_access_key": "your_access_key",
            "lakefs_secret_key": "your_secret_key",
        }

client = MockBriefcaseClient()
# Every read inside the block is tagged with the same resolved commit.
with versioned_context(client, "knowledge-base", "main") as lakefs:
    refund_policy = lakefs.read_object("docs/refund_policy.pdf")
    taxonomy = lakefs.read_object("config/category_taxonomy.json")
    print(f"Read refund policy: {len(refund_policy)} bytes")
    print(f"Read taxonomy: {len(taxonomy)} bytes")
    print(f"Commit SHA: {lakefs.get_commit()}")
```
## Track Reads with a Decorator
`@versioned` injects a `VersionedClient` as the `versioned_client` keyword
argument. Pass your Briefcase client as `briefcase_client` when you call the
function.
```python
from briefcase.integrations.lakefs import versioned

# Minimal stand-in for a configured Briefcase client (placeholder credentials).
class MockBriefcaseClient:
    def __init__(self):
        self.config = {
            "lakefs_endpoint": "https://example.lakefscloud.io/api/v1",
            "lakefs_access_key": "your_access_key",
            "lakefs_secret_key": "your_secret_key",
        }

client = MockBriefcaseClient()

@versioned(repository="knowledge-base", branch="main")
def classify_ticket(ticket: dict, versioned_client=None) -> dict:
    # versioned_client is injected by the decorator at call time.
    policy = versioned_client.read_object("docs/refund_policy.pdf")
    taxonomy = versioned_client.read_object("config/category_taxonomy.json")
    return {
        "category": "billing",
        "commit_sha": versioned_client.get_commit(),
        "bytes_read": len(policy) + len(taxonomy),
    }

ticket = {"id": "TKT-4471", "subject": "Refund request"}
result = classify_ticket(ticket, briefcase_client=client)
print(f"Category: {result['category']}")
print(f"Commit SHA: {result['commit_sha']}")
```
## Use the Client Directly
Construct a `VersionedClient` when you need explicit control over reads,
existence checks, and listings.
```python
from briefcase.integrations.lakefs import VersionedClient

# Minimal stand-in for a configured Briefcase client (placeholder credentials).
class MockBriefcaseClient:
    def __init__(self):
        self.config = {
            "lakefs_endpoint": "https://example.lakefscloud.io/api/v1",
            "lakefs_access_key": "your_access_key",
            "lakefs_secret_key": "your_secret_key",
        }

client = MockBriefcaseClient()
versioned_client = VersionedClient(
    repository="knowledge-base",
    branch="main",
    briefcase_client=client,
)
# Read only the objects that exist at the resolved commit.
for path in ["docs/refund_policy.pdf", "docs/shipping_policy.pdf"]:
    if versioned_client.object_exists(path):
        content = versioned_client.read_object(path)
        print(f"Read {path}: {len(content)} bytes")
objects = versioned_client.list_objects(prefix="docs/")
print(f"Found {len(objects)} objects in docs/")
print(f"Commit SHA: {versioned_client.get_commit()}")
```
## VersionedClient Methods
| Method | Returns or effect |
|--------|---------|
| `read_object(path, return_metadata=False)` | Object bytes, optionally with metadata |
| `upload_object(path, data, content_type=...)` | Writes bytes to the branch |
| `list_objects(prefix="")` | Objects under a prefix |
| `object_exists(path)` | `True` if the object is present |
| `get_commit()` | The resolved commit SHA for this client |
`VersionedClient(repository, branch, commit="latest", briefcase_client=None, ...)`
resolves `commit="latest"` against the branch head; pin a SHA to read a fixed
version.
## Where this fits
Capturing a lakeFS commit SHA is part of the **Store & Query** act: pin exactly
what your agents read so replays are reproducible.
- External Data — Capture references to any external source — including data-lake commits — behind a decision. (/advanced/external-data/)
- Reproducible RAG — Pin retrieval sources so a replayed RAG decision reads the same context. (/guides/reproducible-rag/)
## Next steps
- Storage Adapters — Persist the decisions that captured these commit SHAs. (/features/storage-adapters/)
- RAG Versioning — Pin and track the document versions feeding your retrieval pipeline. (/advanced/rag-versioning/)
========================================================================
# Reference
========================================================================
## oci-bai Install & Compatibility
Source: https://briefcaseai.io/reference/install-compat/
> Install the oci-bai CLI, bring up the local stack, and check the compatibility matrix.
> Private beta
oci-bai is in private beta. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) to request access.
## Install the CLI
Download the pre-built binary for your platform from the oci-bai releases page, or build from source:
```bash
# From source (requires Rust toolchain)
cargo install oci-bai
```
Verify:
```bash
oci-bai --version
```
## Bring up the local stack
```bash
make up # gateway on :8080, graph server on :50051, Postgres+AGE, MinIO, registry
make seed # create demo refs: rl-gym-env family with cuda-base and cartpole
```
To tear down:
```bash
make down
```
## Shell defaults
```bash
export OCI_BAI_SERVER=http://localhost:50051
export OCI_BAI_REPO=my-repo
```
## Compatibility matrix
| Component | Version |
|-----------|---------|
| oci-bai CLI | 0.1.0 |
| oci-jj-server API (min) | v1 |
| oci-jj image tag | 0.1.0 |
| verdictml | v0.1.0 |
## Web dashboard
The hosted dashboard is in early preview. Contact [support@briefcaseai.org](mailto:support@briefcaseai.org) for access.
## Python SDK
Source: https://briefcaseai.io/sdk/python/
> Complete guide to the Briefcase AI Python package.
> This is the narrative guide
This page walks through the SDK by example. For exhaustive class, method, and parameter signatures, see the [Python API reference](/api/python/).
## Install
```bash
pip install briefcase-ai
```
## Import Paths
```python
from briefcase import (
    capture,
    observe,
    setup,
    init,
    init_with_config,
    is_initialized,
    enable_logging,
    set_log_level,
    disable_logging,
    get_logger,
    BriefcaseConfig,
    DecisionSnapshot,
    Snapshot,
    SnapshotQuery,
    Input,
    Output,
    ModelParameters,
    ExecutionContext,
    HardwareMetadata,
)
from briefcase.cost import CostCalculator, CostEstimate, BudgetStatus
from briefcase.drift import DriftCalculator, DriftMetrics
from briefcase.sanitize import Sanitizer
from briefcase.storage import SqliteBackend, BufferedBackend
from briefcase.replay import ReplayEngine, ReplayPolicy, ReplayResult, ReplayStats
from briefcase.validation import PromptValidationEngine, ValidationReport
from briefcase.external import ExternalDataTracker, SnapshotPolicy, SnapshotFrequency
from briefcase.routing import AgentRouter, PolicyRegistry, PolicyVersion, PolicyRule
from briefcase.events import BriefcaseEvent, emit
from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore, AsOfView
from briefcase.compliance import ExaminerBundle
from briefcase.exporters import BaseExporter
```
## The @capture Decorator
```python
from briefcase import capture
@capture(decision_type="ticket-classification")
def classify_ticket(text: str) -> str:
    return "billing"
```
`@capture` records a lightweight dict for every call — inputs, outputs, and timing — and hands it to an exporter. It does not persist a `DecisionSnapshot`; to store and replay structured decisions, build a `DecisionSnapshot` and use `SqliteBackend` (see [Core Concepts](/getting-started/core-concepts/)).
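To picture what "a lightweight dict for every call" can look like, here is a toy decorator written with the standard library. It is a sketch, not the SDK's `@capture`: the name `capture_sketch`, the `export` callback, and every record key other than `function_name` are assumptions made for illustration.

```python
import functools
import time

def capture_sketch(decision_type, export):
    """Wrap a function so each call hands a small record to `export`."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            started = time.perf_counter()
            result = fn(*args, **kwargs)
            export({
                "function_name": fn.__name__,
                "decision_type": decision_type,
                "inputs": {"args": args, "kwargs": kwargs},
                "output": result,
                "duration_ms": (time.perf_counter() - started) * 1000,
            })
            return result
        return wrapper
    return decorator

records = []  # a plain list stands in for an exporter

@capture_sketch("ticket-classification", records.append)
def classify_ticket(text: str) -> str:
    return "billing"

classify_ticket("My invoice is wrong")
print(records[0]["function_name"], records[0]["output"])
```

The wrapped function still returns its normal result, so callers do not need to change.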
## Configuration
`setup()` wires up exporters, routing, events, storage, and other components and returns a `BriefcaseConfig`. There is no `configure()` function.
```python
from briefcase import setup
config = setup(
exporter=None,
router=None,
webhook_url=None,
storage=None,
)
```
Start the native runtime once with `init()`, or use `init_with_config()` instead to set worker threads. `BriefcaseConfig.get()` returns the active configuration.
```python
from briefcase import init, is_initialized, BriefcaseConfig
init() # start the runtime (use init_with_config(worker_threads=4) instead to size the pool)
print(is_initialized())
config = BriefcaseConfig.get()
```
## Logging
> Silent by default
The top-level `briefcase` logger has only a `NullHandler` attached, so importing
the package emits nothing. Opt in when you want to see what the SDK is doing.
These functions are in the base package — no extra required.
```python
import briefcase
# Opt in to briefcase logs on stderr (default level "INFO").
briefcase.enable_logging("DEBUG")
# Change the level later without re-adding a handler.
briefcase.set_log_level("WARNING")
# Use the same logger tree in your own modules.
log = briefcase.get_logger(__name__)
log.warning("classification fell back to default category")
# Turn briefcase logging back off and restore silence.
briefcase.disable_logging()
```
`enable_logging(level="INFO", *, stream=None, fmt=None, datefmt=None)` returns
the `briefcase` logger and adds a single `StreamHandler`; repeated calls do not add another. Pass
`stream=`, `fmt=`, or `datefmt=` to control where records are written and how they are formatted.
`set_log_level(level)` adjusts the level in place. `disable_logging()` removes the
handler. `get_logger(__name__)` returns a child of the `briefcase` logger so your
own modules inherit the same configuration.
Set `BRIEFCASE_LOG_LEVEL` to enable logging automatically at import — useful for
turning on diagnostics without touching code:
```bash
BRIEFCASE_LOG_LEVEL=DEBUG python app.py
```
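The behavior described in this section (silent by default, one handler added on opt-in, child loggers inheriting the configuration) is the standard library-logging pattern. The sketch below rebuilds it for a hypothetical package named `demo_lib` with Python's `logging` module. It is not Briefcase's source; the `_demo_owned` marker attribute is an invented way to recognize the handler this code added.

```python
import io
import logging

LIB = "demo_lib"  # hypothetical library name
logging.getLogger(LIB).addHandler(logging.NullHandler())  # silent until opted in

def enable_logging(level="INFO", stream=None):
    """Attach one StreamHandler (only once) and set the level."""
    logger = logging.getLogger(LIB)
    if not any(getattr(h, "_demo_owned", False) for h in logger.handlers):
        handler = logging.StreamHandler(stream)  # defaults to sys.stderr
        handler._demo_owned = True
        logger.addHandler(handler)
    logger.setLevel(level)
    return logger

def disable_logging():
    """Remove the handler added by enable_logging, restoring silence."""
    logger = logging.getLogger(LIB)
    for handler in list(logger.handlers):
        if getattr(handler, "_demo_owned", False):
            logger.removeHandler(handler)

buffer = io.StringIO()
enable_logging("WARNING", stream=buffer)
enable_logging("WARNING", stream=buffer)  # second call adds nothing

child = logging.getLogger(f"{LIB}.app")   # child inherits level and handlers
child.warning("fell back to default category")
output = buffer.getvalue()

owned_count = sum(getattr(h, "_demo_owned", False) for h in logging.getLogger(LIB).handlers)
disable_logging()
remaining = sum(getattr(h, "_demo_owned", False) for h in logging.getLogger(LIB).handlers)
```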
## Extras
Install only what you need. See [Installation](/getting-started/installation/) for the full extras table.
## Lazy Imports
Optional submodules import only when their backing code is available. Pure-Python extras (`replay`, `validate`, `correlation`, `external`) report a missing extra:
```
ImportError: briefcase.replay requires the 'replay' extra.
Install it with: pip install briefcase-ai[replay]
```
Native-backed modules (`cost`, `drift`, `sanitize`, `storage`) instead ask you to reinstall or rebuild the native extension:
```
ImportError: briefcase.storage could not load the native extension. Reinstall the package (pip install --force-reinstall briefcase-ai) or rebuild from source with 'maturin develop'.
```
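One common way to produce a "missing extra" message like the first error above is to wrap the import and re-raise with install guidance. The sketch below shows that pattern with `importlib`. It is an assumption about the general technique, not a description of how the SDK implements lazy imports; `load_optional` and the module name `no_such_module_for_demo` are hypothetical.

```python
import importlib

def load_optional(module_name, extra, package="briefcase-ai"):
    """Import a module, or raise an ImportError that names the extra to install."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} requires the '{extra}' extra.\n"
            f"Install it with: pip install {package}[{extra}]"
        ) from exc

# A module that exists loads normally.
json_mod = load_optional("json", "json")

# A module that does not exist produces the guided error.
try:
    load_optional("no_such_module_for_demo", "replay")
except ImportError as exc:
    message = str(exc)

print(message)
```

Chaining with `from exc` keeps the original import failure visible in the traceback.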
## Next steps
- Python API Reference — Full signatures for every public symbol, grouped by module. (/api/python/)
- Exporters — Wire captured decisions to console, file, or a custom backend. (/features/exporters/)
## Python API
Source: https://briefcaseai.io/api/python/
> Full API reference for the Briefcase AI Python SDK, grouped by module.
Reference for the public symbols of `briefcase-ai` (v3.3.0). Signatures match the
shipped SDK. Each section lists an install command, the import path, and one
small runnable usage.
> Looking for the narrative?
This page is the exhaustive signature reference. For a worked walkthrough of the SDK, start with the [Python SDK guide](/sdk/python/).
Install the base package:
```bash
pip install briefcase-ai
```
Optional feature extras install only what they need.
> Most extras add no dependencies
Most extras (`replay`, `drift`, `sanitize`, `storage`, `validate`, `guardrails`,
`rag`, `correlation`, `external`, `events`, `routing`, `vcs`, `bitemporal`,
`compliance`) pull in no third-party dependencies and exist to document intent.
Only `otel`, `lakefs`, `bitemporal-iceberg`, and `mcp` install external packages.
## briefcase
```bash
pip install briefcase-ai
```
Top-level exports.
### `capture()`
```python
from briefcase import capture
@capture(decision_type="classification")
def classify_ticket(text: str) -> str:
    return "account_access"
classify_ticket("reset my password")
```
```python
capture(
fn=None,
*,
decision_type=None,
context_version=None,
max_input_chars=1000,
max_output_chars=1000,
exporter=None,
async_capture=True,
)
```
The `@capture` decorator records a lightweight dict for each call and forwards it
to an `exporter`. It does not itself persist a native `DecisionSnapshot`; for
storage and replay use the native runtime objects below.
### `setup()`
```python
from briefcase import setup
config = setup(
exporter=None,
storage=None,
guardrail_packs=None,
)
```
```python
setup(
exporter=None,
router=None,
webhook_url=None,
webhook_secret=None,
events=None,
event_bus=None,
storage=None,
guardrail_packs=None,
) -> BriefcaseConfig
```
### `init()`, `init_with_config()`, `is_initialized()`
```python
import briefcase
briefcase.init() # start the native runtime
print(briefcase.is_initialized())
```
`init()` must be called once before using the native storage and replay layer.
Use `init_with_config(worker_threads=2)` instead of `init()` to size the worker
pool. The runtime can only be initialized once per process.
### `observe()`
```python
import briefcase
mem = briefcase.observe("memory")
@briefcase.capture(async_capture=False)
def classify_ticket(text: str) -> str:
    return "account_access"
classify_ticket("reset my password")
print(mem.records[0]["function_name"]) # "classify_ticket"
```
```python
observe(exporter="console", *, level=None) -> BaseExporter
```
Wires up decision export in one call. Without it, `@capture` records decisions
but has nowhere to send them. `exporter` accepts a `BaseExporter` instance or a
shorthand string: `"console"` (default, `ConsoleExporter`), `"memory"`
(`MemoryExporter`), or a path ending in `.jsonl` (`JSONLFileExporter`). Returns
the configured exporter, so a `MemoryExporter` can be inspected via `.records`.
Pass `level=` to also enable logging at that level. `@capture` exports in a
background thread by default, so use `@capture(async_capture=False)` when you
want a record to appear synchronously (for example to read
`MemoryExporter.records` right after the call).
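The reason `async_capture=False` matters when you read records immediately can be shown with a small threading sketch. This is an illustration of background versus synchronous export in general, not the SDK's implementation: `MemorySink`, `export`, and the `gate` event that simulates a slow export are all hypothetical.

```python
import threading

class MemorySink:
    """Hypothetical in-memory sink; not the SDK's MemoryExporter."""
    def __init__(self):
        self.records = []

def export(sink, record, async_capture=True, gate=None):
    """Append a record now, or on a background thread when async_capture is True."""
    def work():
        if gate is not None:
            gate.wait()  # simulate an export that has not finished yet
        sink.records.append(record)

    if not async_capture:
        work()
        return None
    thread = threading.Thread(target=work)
    thread.start()
    return thread

sink = MemorySink()

export(sink, {"function_name": "classify_ticket"}, async_capture=False)
after_sync = len(sink.records)    # the record is visible immediately

gate = threading.Event()
thread = export(sink, {"function_name": "classify_ticket"}, gate=gate)
after_async = len(sink.records)   # unchanged: the worker is still waiting
gate.set()
thread.join()
after_join = len(sink.records)    # the background record has now landed

print(after_sync, after_async, after_join)
```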
### `enable_logging()`, `set_log_level()`, `disable_logging()`, `get_logger()`
```python
import briefcase
logger = briefcase.enable_logging("DEBUG") # opt-in; silent by default
briefcase.set_log_level("INFO")
module_logger = briefcase.get_logger("briefcase.app")
briefcase.disable_logging()
```
```python
enable_logging(level="INFO", *, stream=None, fmt=None, datefmt=None) -> logging.Logger
set_log_level(level) -> None
disable_logging() -> None
get_logger(name) -> logging.Logger
```
The library attaches only a `NullHandler` and emits nothing until you opt in.
`enable_logging` idempotently adds a single `StreamHandler` (default
`sys.stderr`) and returns the `briefcase` logger. Setting the environment
variable `BRIEFCASE_LOG_LEVEL=DEBUG` enables logging automatically at import.
### `BriefcaseConfig`
```python
from briefcase import BriefcaseConfig
config = BriefcaseConfig.get()
registry = config.guardrail_registry
config.reset()
```
### `DecisionSnapshot`
```python
from briefcase import DecisionSnapshot, Input, Output, ModelParameters
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "reset my password", "string"))
output = Output("category", "account_access", "string")
output.with_confidence(0.92)
decision.add_output(output)
decision.with_execution_time(12.0)
decision.with_module("triage_service")
decision.add_tag("environment", "production")
print(decision.function_name, decision.fingerprint()[:12])
```
```python
DecisionSnapshot(function_name)
.add_input(input)
.add_output(output)
.add_tag(key, value)
.with_model_parameters(params)
.with_execution_time(ms)
.with_module(module)
.with_agent(agent)
.with_hardware(hardware)
.with_error(error, error_type)
.with_scorecard(scorecard)
.fingerprint()
# attributes: function_name, module_name, inputs, outputs, tags, execution_time_ms
```
### `Snapshot`
```python
from briefcase import Snapshot
session = Snapshot("session")
session.add_decision(decision)
print(len(session.decisions))
```
### `SnapshotQuery`
```python
from briefcase import SnapshotQuery
query = SnapshotQuery()
query.with_function_name("classify_ticket")
query.with_tag("environment", "production")
query.with_limit(50)
query.with_offset(0)
```
### `Input`, `Output`
```python
from briefcase import Input, Output
text_input = Input("text", "reset my password", "string")
print(text_input.name, text_input.value, text_input.data_type)
result = Output("category", "account_access", "string")
result.with_confidence(0.92)
print(result.confidence)
```
### `ModelParameters`
```python
from briefcase import ModelParameters
params = ModelParameters("claude-3-haiku")
params.with_provider("anthropic")
params.with_parameter("temperature", 0.0)
params.with_parameter("max_tokens", 256)
```
### `ExecutionContext`
```python
from briefcase import ExecutionContext
context = ExecutionContext()
context.with_runtime_version("3.11")
context.with_dependency("transformers", "4.40.0")
context.with_env_var("REGION", "us-east-1")
context.with_random_seed(42)
```
### `HardwareMetadata`
```python
from briefcase import HardwareMetadata
hardware = HardwareMetadata("gpu", "A10G")
hardware.with_provider("aws")
hardware.with_vram(24.0)
```
## briefcase.storage
```bash
pip install briefcase-ai[storage]
```
Two backends ship in the open-source package: `SqliteBackend` and
`BufferedBackend`. The native runtime must be initialized first.
### `SqliteBackend`
```python
import briefcase
from briefcase import DecisionSnapshot, Input, Output, Snapshot, SnapshotQuery
from briefcase.storage import SqliteBackend
briefcase.init()
backend = SqliteBackend.in_memory() # or SqliteBackend("decisions.db")
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "reset my password", "string"))
decision.add_output(Output("category", "account_access", "string"))
decision_id = backend.save_decision(decision)
loaded = backend.load_decision(decision_id)
session = Snapshot("session")
session.add_decision(decision)
snapshot_id = backend.save(session)
backend.load(snapshot_id)
backend.query(SnapshotQuery().with_function_name("classify_ticket"))
backend.health_check()
```
```python
SqliteBackend(path)
SqliteBackend.in_memory()
.save(snapshot) -> snapshot_id
.load(snapshot_id)
.save_decision(decision) -> decision_id
.load_decision(decision_id)
.query(query)
.delete(id)
.health_check()
```
### `BufferedBackend`
```python
from briefcase.storage import BufferedBackend
buffered = BufferedBackend(backend, buffer_size=100)
buffered.save_decision(decision)
```
## briefcase.replay
```bash
pip install briefcase-ai[replay]
```
Re-executes stored decisions against a backend. Valid modes are `"strict"` and
`"tolerant"` (the default).
### `ReplayEngine`
```python
import briefcase
from briefcase import DecisionSnapshot, Input, Output
from briefcase.storage import SqliteBackend
from briefcase.replay import ReplayEngine
briefcase.init()
backend = SqliteBackend.in_memory()
decision = DecisionSnapshot("classify_ticket")
decision.add_input(Input("text", "reset my password", "string"))
decision.add_output(Output("category", "account_access", "string"))
decision_id = backend.save_decision(decision)
engine = ReplayEngine(backend)
result = engine.replay(decision_id, "strict")
print(result.status, result.outputs_match, result.execution_time_ms)
stats = engine.get_replay_stats([decision_id])
print(stats.total_replays, stats.success_rate)
```
```python
ReplayEngine(storage)
.replay(snapshot_id, mode)
.replay_batch(snapshot_ids, mode, max_concurrent)
.replay_with_policy(snapshot_id, policy, mode)
.validate(snapshot_id, policy)
.get_replay_stats(snapshot_ids)
.default_mode
```
### `ReplayPolicy`
```python
from briefcase.replay import ReplayPolicy
policy = ReplayPolicy("output_match")
policy.with_exact_match("category")
policy.with_similarity_threshold("summary", 0.9)
result = engine.replay_with_policy(decision_id, policy, "strict")
print(result.status, result.policy_violations)
```
### `ReplayResult`
Returned by `replay` / `replay_with_policy`. Attributes: `status`,
`outputs_match`, `replay_output`, `original_snapshot`, `execution_time_ms`,
`policy_violations`, plus `to_dict()`.
### `ReplayStats`
Returned by `get_replay_stats`. Attributes: `total_replays`,
`successful_replays`, `failed_replays`, `exact_matches`, `mismatches`,
`success_rate`, `average_execution_time_ms`, `total_execution_time_ms`, plus
`to_dict()`.
## briefcase.drift
```bash
pip install briefcase-ai[drift]
```
### `DriftCalculator`
```python
from briefcase.drift import DriftCalculator
calculator = DriftCalculator()
calculator.with_similarity_threshold(0.9)
metrics = calculator.calculate_drift(["billing", "billing", "account", "billing"])
print(metrics.consistency_score, metrics.agreement_rate, metrics.drift_score)
print(metrics.consensus_output, metrics.outliers)
print(metrics.get_status(calculator))
```
```python
DriftCalculator()
.calculate_drift(outputs) -> DriftMetrics
.with_similarity_threshold(threshold)
.similarity_threshold
```
### `DriftMetrics`
Returned by `calculate_drift`. Attributes: `consistency_score`,
`agreement_rate`, `drift_score`, `consensus_output`, `consensus_confidence`,
`outliers`, `total_samples`, plus `get_status(calculator)` and `to_dict()`.
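As a rough intuition for what these metrics describe, the sketch below computes a consensus output, an agreement rate, and the outliers for a list of string outputs using only the standard library. It is an illustration under assumptions, not Briefcase's implementation: the helper `simple_agreement` is hypothetical, it compares outputs by exact string equality instead of a similarity threshold, and the native `DriftCalculator` may define its scores differently.

```python
from collections import Counter


def simple_agreement(outputs: list[str]) -> dict:
    """Hypothetical helper: majority-vote agreement over exact string matches."""
    if not outputs:
        raise ValueError("outputs must not be empty")
    counts = Counter(outputs)
    # most_common(1) yields the most frequent value; ties go to the value seen first.
    consensus, votes = counts.most_common(1)[0]
    return {
        "consensus_output": consensus,
        "agreement_rate": votes / len(outputs),
        "outliers": [o for o in outputs if o != consensus],
        "total_samples": len(outputs),
    }


summary = simple_agreement(["billing", "billing", "account", "billing"])
print(summary)
```

Three of the four samples agree here, so the single `"account"` output is the only outlier.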
## briefcase.cost
```bash
pip install briefcase-ai
```
Cost types ship in the base package under `briefcase.cost` — there is no `cost`
extra.
### `CostCalculator`
```python
from briefcase.cost import CostCalculator
calculator = CostCalculator()
estimate = calculator.estimate_cost("claude-haiku-4-5", 1000, 500)
print(estimate.total_cost, estimate.input_cost, estimate.output_cost)
# rate_card (platform × tier) and cache tokens are keyword-only (3.2.1)
batch = calculator.estimate_cost("claude-opus-4-8", 500_000, 50_000, rate_card="bedrock:batch")
cached = calculator.estimate_cost("claude-opus-4-8", 0, 1000, cache_read_tokens=100_000)
print(batch.total_cost, cached.cache_cost)
print(calculator.get_available_rate_cards())
budget = calculator.check_budget(85.0, 100.0)
print(budget.status, budget.percent_used, budget.remaining_budget, budget.alert_message)
print(calculator.compare_models("claude-haiku-4-5", "gpt-5.4-mini", 1000, 500))
print(calculator.project_monthly_cost("claude-haiku-4-5", 5000, 2000, 30))
```
```python
CostCalculator()
.estimate_cost(model_name, input_tokens, output_tokens, *,
rate_card=None, cache_read_tokens=None,
cache_write_5m_tokens=None, cache_write_1h_tokens=None) -> CostEstimate
.estimate_cost_from_text(model_name, input_text, estimated_output_tokens, *, rate_card=None)
.estimate_tokens(text)
.check_budget(current_spend, budget_limit) -> BudgetStatus
.compare_models(model_a, model_b, input_tokens, output_tokens, *, rate_card=None)
.project_monthly_cost(model_name, daily_input_tokens, daily_output_tokens, days_per_month, *, rate_card=None)
.get_available_rate_cards() -> list[str]
.get_available_models()
.get_cheapest_model(min_context_window)
.get_models_by_provider(provider)
.get_models_under_cost(max_cost_per_1k)
```
A `rate_card` is a forgiving `platform × tier × modifiers` string (platforms
`first_party` / `bedrock` / `vertex` / `azure`; tiers `standard` / `batch` /
`cached` / `priority` / `flex`). Omit it for first-party standard pricing.
### `CostEstimate`
Attributes: `model_name`, `input_tokens`, `output_tokens`, `input_cost`,
`output_cost`, `cache_cost`, `total_cost`, `cost_per_token`, `currency`, plus
`to_dict()`.
### `BudgetStatus`
Attributes: `status`, `percent_used`, `remaining_budget`, `current_spend`,
`budget_limit`, `alert_message`, plus `to_dict()`.
## briefcase.sanitize
```bash
pip install briefcase-ai[sanitize]
```
### `Sanitizer`
```python
from briefcase.sanitize import Sanitizer
sanitizer = Sanitizer()
result = sanitizer.sanitize("Contact support@example.com or call 555-123-4567")
print(result.sanitized, result.redaction_count)
for redaction in result.redactions:
    print(redaction.pii_type, redaction.start_position, redaction.end_position)
print(sanitizer.contains_pii("support@example.com"))
print(sanitizer.analyze_pii("support@example.com"))
json_result = sanitizer.sanitize_json({"contact": "support@example.com"})
print(json_result.redaction_count)
sanitizer.add_pattern("ticket_id", r"\bTCK-\d{6}\b")
```
```python
Sanitizer()
.sanitize(text) -> SanitizationResult
.sanitize_json(data) -> SanitizationJsonResult
.contains_pii(text)
.analyze_pii(text)
.add_pattern(name, pattern)
.remove_pattern(pattern_name)
.set_enabled(enabled)
```
### `SanitizationResult`
Attributes: `sanitized`, `redactions`, `redaction_count`, `has_redactions`, plus
`to_dict()`.
### `Redaction`
Attributes: `pii_type`, `start_position`, `end_position`, `original_length`, plus
`to_dict()`.
## briefcase.validation
```bash
pip install briefcase-ai[validate]
```
The validation engine is pluggable: supply an extractor (finds references in a
prompt), a resolver (checks each reference), and a versioned client (records the
commit the validation ran against).
### `PromptValidationEngine`
```python
import re
from briefcase.validation import PromptValidationEngine
from briefcase.validation.errors import ValidationError, ValidationErrorCode
class RegexExtractor:
    _REF = re.compile(r"[\w/]+\.md")
    def extract(self, prompt: str) -> list:
        return self._REF.findall(prompt)
class AllowlistResolver:
    def __init__(self, known: set):
        self._known = known
    def resolve_all(self, references: list) -> list:
        errors = []
        for ref in references:
            if ref not in self._known:
                errors.append(
                    ValidationError(
                        code=ValidationErrorCode.REFERENCE_NOT_FOUND,
                        message=f"Reference not found: {ref}",
                        reference=ref,
                        severity="error",
                        layer="resolution",
                        remediation="Add the document to the knowledge base.",
                    )
                )
        return errors
class DemoLakeFS:
    def get_commit(self, repository: str, branch: str) -> str:
        return "demo0000000000000000000000000000000000000"
engine = PromptValidationEngine(
    extractor=RegexExtractor(),
    resolver=AllowlistResolver({"kb/faq.md"}),
    lakefs_client=DemoLakeFS(),
    repository="knowledge-base",
    branch="main",
    mode="strict",
)
report = engine.validate("See kb/faq.md and kb/missing.md")
print(report.status, report.references_checked, report.has_errors)
```
```python
PromptValidationEngine(
    extractor,
    resolver,
    lakefs_client,
    repository,
    branch="main",
    mode="strict",
    semantic_validator=None,
)
.validate(prompt) -> ValidationReport
```
### `ValidationReport`
Attributes: `status`, `errors`, `warnings`, `references_checked`,
`validation_time_ms`, `lakefs_commit`, `has_errors`, `has_warnings`, plus
`to_dict()`.
### `ValidationError`
```python
ValidationError(
    code,  # ValidationErrorCode
    message,
    reference,
    severity,
    layer,
    remediation=None,
    metadata=None,
)
```
### `ValidationErrorCode`
Enum: `INVALID_SYNTAX`, `REFERENCE_AMBIGUOUS`, `REFERENCE_NOT_FOUND`,
`REFERENCE_GONE`, `VERSION_MISMATCH`, `SCHEMA_INVALID`, `LAKEFS_UNAVAILABLE`.
### Pluggable protocols
`Extractor.extract(prompt) -> list`, `Resolver.resolve_all(references) -> list`,
and `SemanticValidatorProtocol.validate_semantic(prompt, references) -> list`.
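The sketch below shows how structural protocols of this shape can be declared and satisfied without inheritance, using `typing.Protocol` from the standard library. The names `ExtractorLike`, `ResolverLike`, `WordExtractor`, and `NullResolver` are hypothetical stand-ins written for this illustration; the protocol classes that `briefcase.validation` actually defines may differ in name and detail.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ExtractorLike(Protocol):
    """Hypothetical stand-in for an extractor protocol."""

    def extract(self, prompt: str) -> list: ...


@runtime_checkable
class ResolverLike(Protocol):
    """Hypothetical stand-in for a resolver protocol."""

    def resolve_all(self, references: list) -> list: ...


class WordExtractor:
    def extract(self, prompt: str) -> list:
        # Treat any whitespace-separated token ending in ".md" as a reference.
        return [word for word in prompt.split() if word.endswith(".md")]


class NullResolver:
    def resolve_all(self, references: list) -> list:
        return []  # no errors: every reference is accepted


extractor = WordExtractor()
resolver = NullResolver()
refs = extractor.extract("See kb/faq.md for details")
print(isinstance(extractor, ExtractorLike), isinstance(resolver, ResolverLike), refs)
```

Neither class subclasses a protocol; they qualify because they provide the required method. Note that `runtime_checkable` only checks that the method exists, not its signature.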
## briefcase.guardrails
```bash
pip install briefcase-ai[guardrails]
```
`GuardrailEnv` is a protocol. Subclass `BaseGuardrailEnv` and implement
`evaluate`.
### `BaseGuardrailEnv`, `EvalRequest`, `EvalResult`, `Effect`
```python
from briefcase.guardrails import BaseGuardrailEnv, EvalRequest, EvalResult, Effect
class QueueGuardrail(BaseGuardrailEnv):
    @property
    def name(self) -> str:
        return "queue_access"
    @property
    def request_space(self):
        return {}
    def evaluate(self, request: EvalRequest) -> EvalResult:
        effect = Effect.ALLOW if request.context.get("priority") == "high" else Effect.DENY
        return EvalResult(effect=effect, guardrail_name=self.name, reason="priority check")
guardrail = QueueGuardrail()
request = EvalRequest(
    agent="triage-bot",
    action="route",
    resource="queue:billing",
    context={"priority": "high"},
)
result = guardrail.evaluate(request)
print(result.effect, result.is_allowed)
```
```python
EvalRequest(agent, action, resource, context={}, request_id=None)
EvalResult(effect, guardrail_name, reason=None, policy_id=None,
lakefs_sha=None, eval_time_ms=0.0, metadata={})
.is_allowed
Effect.ALLOW / Effect.DENY
```
### `make()`
```python
from briefcase.guardrails import make
# env = make("registered-guardrail-id", **kwargs)
```
### `GuardrailPipeline`
```python
from briefcase.guardrails import GuardrailPipeline
pipeline = GuardrailPipeline(stages=[guardrail])
pipeline_result = pipeline.evaluate(request)
print(pipeline.name, pipeline.check_compatibility())
```
```python
GuardrailPipeline(stages, mode=PipelineMode.FIRST_DENY, name="pipeline")
.evaluate(request) -> PipelineResult
.check_compatibility()
.stages
```
## briefcase.rag
```bash
pip install briefcase-ai[rag]
```
Versions an embedding index so it can be invalidated and rebuilt when documents
or the embedding model change.
### `VersionedEmbeddingPipeline`, `Document`
```python
from briefcase.rag import VersionedEmbeddingPipeline, Document
class EmbeddingModel:
    def embed(self, texts):
        return [[0.1, 0.2, 0.3] for _ in texts]
pipeline = VersionedEmbeddingPipeline(embedding_model=EmbeddingModel())
documents = [
    Document(id="doc-1", content="Reset your password from settings.", metadata={"topic": "account"}),
]
print(documents[0].content_hash[:10])
batch = pipeline.create_embedding_batch(documents)
manifest = pipeline.create_manifest("faq-index", [batch])
report = pipeline.check_invalidation("faq-index", documents)
print(manifest.index_name, report.is_valid)
```
```python
VersionedEmbeddingPipeline(embedding_model=None, lakefs_client=None,
repository=None, branch="main")
.create_embedding_batch(documents, batch_id=None, source_commit=None)
.create_manifest(index_name, batches, metadata=None)
.check_invalidation(index_name, current_documents, ...)
.rebuild_index(index_name, documents, source_commit=None, batch_id=None)
.get_latest_manifest(index_name)
.get_manifests(index_name, limit=None)
Document(id, content, metadata={}, path="")
.content_hash
```
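To make the invalidation idea concrete, here is a minimal standard-library sketch: a manifest stores the embedding model name plus one content hash per document, and a document counts as stale when its hash changes, it disappears, or the model changes. The functions `build_manifest` and `stale_documents` and the manifest layout are assumptions made for this illustration; they are not the structures `VersionedEmbeddingPipeline` uses internally.

```python
import hashlib


def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def build_manifest(docs: dict[str, str], model_name: str) -> dict:
    """Hypothetical manifest: model name plus one hash per document id."""
    return {
        "model": model_name,
        "hashes": {doc_id: content_hash(body) for doc_id, body in docs.items()},
    }


def stale_documents(manifest: dict, docs: dict[str, str], model_name: str) -> set[str]:
    """Return the ids whose embeddings would need rebuilding."""
    if manifest["model"] != model_name:
        # A different embedding model invalidates every stored vector.
        return set(manifest["hashes"]) | set(docs)
    current = {doc_id: content_hash(body) for doc_id, body in docs.items()}
    changed = {d for d, h in current.items() if manifest["hashes"].get(d) != h}
    removed = set(manifest["hashes"]) - set(current)
    return changed | removed


docs = {"doc-1": "Reset your password from settings."}
manifest = build_manifest(docs, "embed-v1")
unchanged = stale_documents(manifest, docs, "embed-v1")

docs["doc-1"] = "Reset your password from the account page."
after_edit = stale_documents(manifest, docs, "embed-v1")
after_model_swap = stale_documents(manifest, docs, "embed-v2")
print(unchanged, after_edit, after_model_swap)
```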
## briefcase.correlation
```bash
pip install briefcase-ai[correlation]
```
Correlates multiple agents executed within one workflow context.
### `briefcase_workflow`, `get_current_workflow`
```python
from unittest.mock import Mock
from briefcase.correlation import briefcase_workflow, get_current_workflow
client = Mock()
with briefcase_workflow("ticket-triage", client) as workflow:
    print(workflow.workflow_id)
    workflow.register_agent("agent-1", "classifier")
    workflow.register_agent("agent-2", "responder")
    print(get_current_workflow() is workflow)
```
```python
briefcase_workflow(workflow_name, briefcase_client, workflow_id=None)
# yields BriefcaseWorkflowContext
# .workflow_id
# .register_agent(agent_id, agent_type)
get_current_workflow() -> Optional[BriefcaseWorkflowContext]
```
### Trace propagation
```python
from briefcase.correlation import (
    TraceContextCarrier,
    inject_trace_context,
    extract_trace_context,
)
headers = inject_trace_context({})
extract_trace_context(headers)
```
## briefcase.events
```bash
pip install briefcase-ai[events]
```
Emit functions are coroutines; `await` them inside an async context.
### `BriefcaseEvent`, `emit()`
```python
import asyncio
from briefcase.events import (
    BriefcaseEvent,
    emit,
    emit_low_confidence,
    emit_drift_detected,
)
async def main():
    event = BriefcaseEvent(
        event_type="low_confidence",
        decision_id="dec-1",
        payload={"confidence": 0.4},
    )
    await emit(event)
    await emit_low_confidence({"id": "dec-1"}, 0.4, 0.7)
    await emit_drift_detected({"id": "dec-1"}, {"drift_score": 0.3})
asyncio.run(main())
```
```python
BriefcaseEvent(event_type, decision_id, timestamp=..., payload={}, idempotency_key=...)
async emit(event)
async emit_low_confidence(decision, confidence, threshold)
async emit_drift_detected(decision, details=None)
```
## briefcase.external
```bash
pip install briefcase-ai[external]
```
Snapshots external data sources (API responses, database query results, file
fetches) and detects drift between them.
### `ExternalDataTracker`, `SnapshotPolicy`
```python
from briefcase.external import (
    ExternalDataTracker,
    SnapshotPolicy,
    SnapshotFrequency,
)
tracker = ExternalDataTracker(
    default_policy=SnapshotPolicy(
        frequency=SnapshotFrequency.ON_CHANGE,
        retention_days=30,
    ),
)
result = tracker.track_api_call(
    api_name="product-catalog",
    endpoint="/products",
    method="GET",
    response_data={"items": [1, 2, 3]},
    record_count=3,
)
print(result["snapshot_id"], result["drift_detected"])
snapshot = tracker.get_latest_snapshot("product-catalog")
print(snapshot.source_name)
```
```python
ExternalDataTracker(lakefs_client=None, repository=None, branch="main",
default_policy=None, sanitizer=None)
.track_api_call(api_name, endpoint, method, response_data, ...)
.track_db_query(db_system, db_name, query, result_data=None, ...)
.track_file_fetch(source_name, file_data, file_path=None, ...)
.detect_drift(source_name, current_data=None, ...)
.compare_snapshots(snapshot_a_id, snapshot_b_id)
.correct_snapshot(parent_snapshot_id, corrected_data, *, source=None, ...)
SnapshotPolicy(frequency=SnapshotFrequency.ON_CHANGE, retention_days=90,
change_threshold=0.0, max_snapshots=0, compress=False)
SnapshotFrequency.EVERY_CALL / ON_CHANGE / HOURLY / DAILY / WEEKLY
```
## briefcase.routing
```bash
pip install briefcase-ai[routing]
```
This module provides the legacy `BaseRouter` interface and a newer policy-versioned routing layer.
### Legacy `BaseRouter`
```python
from briefcase.routing import BaseRouter, RoutingDecision
class StaticRouter(BaseRouter):
    def route(self, decision_context) -> RoutingDecision:
        return RoutingDecision(
            action="senior-agent",
            source="static",
            eval_time_ms=0.1,
            reason="default route",
        )
router = StaticRouter()
decision = router.route({"priority": "high"})
print(decision.action, decision.source)
```
### Policy layer: `PolicyRegistry`, `PolicyVersion`, `PolicyRule`, `AgentRouter`
```python
from datetime import datetime, timezone
from briefcase.routing import (
    PolicyRegistry,
    PolicyVersion,
    PolicyRule,
    AgentRouter,
)
registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1",
    rules=[
        PolicyRule(
            rule_id="high-priority",
            condition={"priority": "high"},
            choice="senior-agent",
            rationale="High priority tickets go to senior agents.",
        ),
    ],
    default_choice="general-agent",
)
registry.publish(policy, valid_from=datetime.now(timezone.utc))
router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"priority": "high"})
print(decision.selected, decision.matched_rule_id, decision.policy_version)
```
```python
PolicyRegistry(store=None)
.publish(policy, *, valid_from, transaction_time=None, source="policy_registry")
.get(policy_id, *, as_of_transaction_time=None, as_of_valid_time=None)
.history(policy_id)
PolicyVersion(policy_id, version, rules, default_choice=None, description=None)
.select(context) -> PolicyEvaluationResult
PolicyRule(rule_id, condition, choice, rationale=None)
.matches(context) -> bool
AgentRouter(registry, *, use_case, policy_id, candidates_provider=None)
.route(context, *, evidence_refs=None, as_of_transaction_time=None) -> AgentRoutingDecision
```
`AgentRoutingDecision` attributes: `decision_id`, `use_case`, `context`,
`candidates`, `selected`, `policy_id`, `policy_version`, `matched_rule_id`,
`evidence_refs`, `rationale`, `decided_at`, plus `to_dict()`.
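The rule-selection step can be pictured with the standard-library sketch below. It assumes that a rule matches when every key in its condition equals the corresponding context value, that the first matching rule wins, and that the default choice applies otherwise. Those semantics, and the names `Rule` and `select`, are assumptions for illustration; `PolicyRule.matches` and `PolicyVersion.select` may support richer conditions.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class Rule:
    """Hypothetical rule: a condition dict mapped to a choice."""

    rule_id: str
    condition: dict[str, Any]
    choice: str


def select(
    rules: list[Rule],
    context: dict[str, Any],
    default_choice: Optional[str] = None,
) -> tuple[Optional[str], Optional[str]]:
    """Return (choice, matched_rule_id); the first matching rule wins."""
    for rule in rules:
        # Assumption: every condition key must be present and equal in the context.
        if all(k in context and context[k] == v for k, v in rule.condition.items()):
            return rule.choice, rule.rule_id
    return default_choice, None


rules = [Rule("high-priority", {"priority": "high"}, "senior-agent")]
matched = select(rules, {"priority": "high"}, "general-agent")
fallback = select(rules, {"priority": "low"}, "general-agent")
print(matched, fallback)
```

Returning the matched rule id alongside the choice is what lets a routing decision record why it went where it did.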
## briefcase.bitemporal
```bash
pip install briefcase-ai[bitemporal]
```
Append-only store that tracks both valid time (when a fact is true) and
transaction time (when it was recorded), so any past state can be reconstructed.
An Iceberg-backed store is available via `pip install briefcase-ai[bitemporal-iceberg]`.
### `BitemporalRecord`, `InMemoryBitemporalStore`
```python
from datetime import datetime, timezone
from briefcase.bitemporal import (
    BitemporalRecord,
    InMemoryBitemporalStore,
    AsOfView,
    append_correction,
)
store = InMemoryBitemporalStore()
now = datetime.now(timezone.utc)
record = BitemporalRecord.new(
    key="config:max_retries",
    valid_time=now,
    value=3,
    source="config-service",
)
store.append(record)
print(store.latest("config:max_retries").value, record.content_hash()[:12])
# Append-only correction (the original stays in history).
append_correction(store, record, 5, source="ops")
print(store.latest("config:max_retries").value)
print(len(store.history("config:max_retries")))
# Reconstruct the store as of a transaction time.
view = AsOfView(store, transaction_time=datetime.now(timezone.utc))
print(view.as_of("config:max_retries").value)
```
```python
BitemporalRecord.new(key, valid_time, value, source, *, transaction_time=None,
decision=None, source_trust_level=None,
parent_record_id=None, metadata=None, record_id=None)
.content_hash() -> str
.record_id
InMemoryBitemporalStore()
.append(record)
.append_many(records)
.latest(key)
.history(key)
.as_of(key, *, transaction_time=None, valid_time=None)
.keys()
AsOfView(store, *, transaction_time=None, valid_time=None)
append_correction(store, original, corrected_value, *, source=None, ...)
batch_append(store, records, *, transaction_time=None)
stream_append(store, record)
```
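The sketch below illustrates the as-of idea with plain Python: facts are only ever appended, and a query at a given transaction time sees just the facts recorded at or before that moment. `Fact` and `TinyBitemporalLog` are hypothetical names for this illustration and ignore valid-time filtering; they are not how `InMemoryBitemporalStore` is implemented.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class Fact:
    key: str
    value: Any
    valid_time: datetime
    transaction_time: datetime


class TinyBitemporalLog:
    """Hypothetical append-only log; facts are never updated or deleted."""

    def __init__(self) -> None:
        self._facts: list[Fact] = []

    def append(self, fact: Fact) -> None:
        self._facts.append(fact)

    def as_of(self, key: str, transaction_time: datetime) -> Optional[Fact]:
        # Only facts recorded at or before the cutoff are visible: no look-ahead.
        visible = [
            f for f in self._facts
            if f.key == key and f.transaction_time <= transaction_time
        ]
        return max(visible, key=lambda f: f.transaction_time, default=None)


t1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t2 = datetime(2024, 2, 1, tzinfo=timezone.utc)
log = TinyBitemporalLog()
log.append(Fact("config:max_retries", 3, valid_time=t1, transaction_time=t1))
# A correction is a new fact with a later transaction time, not an edit.
log.append(Fact("config:max_retries", 5, valid_time=t1, transaction_time=t2))

before = log.as_of("config:max_retries", t1)
after = log.as_of("config:max_retries", t2)
print(before.value, after.value)
```

Because the original fact stays in the log, a query at `t1` still reports what was known then, even after the correction.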
## briefcase.compliance
```bash
pip install briefcase-ai[compliance]
```
Builds a tamper-evident bundle that reproduces a routing decision together with
the policy version and evidence records in effect at the decision's transaction
time. Integrity is protected by a SHA-256 content hash; `verify()` raises if the
bundle was altered.
### `ExaminerBundle`
```python
from datetime import datetime, timezone
from briefcase.bitemporal import BitemporalRecord, InMemoryBitemporalStore
from briefcase.routing import PolicyRegistry, PolicyVersion, PolicyRule, AgentRouter
from briefcase.compliance import ExaminerBundle, BundleIntegrityError
store = InMemoryBitemporalStore()
now = datetime.now(timezone.utc)
evidence = BitemporalRecord.new(
    key="config:max_retries",
    valid_time=now,
    value=3,
    source="config-service",
)
store.append(evidence)
registry = PolicyRegistry()
policy = PolicyVersion(
    policy_id="ticket-routing",
    version="1",
    rules=[PolicyRule(rule_id="gold-tier", condition={"tier": "gold"}, choice="priority-queue")],
    default_choice="standard-queue",
)
registry.publish(policy, valid_from=now)
router = AgentRouter(registry, use_case="ticket-routing", policy_id="ticket-routing")
decision = router.route({"tier": "gold"}, evidence_refs=[evidence.record_id])
bundle = ExaminerBundle.build(decision, store, registry)
print(bundle.content_hash) # "sha256:..."
bundle.verify() # raises BundleIntegrityError if tampered
restored = ExaminerBundle.from_json(bundle.to_json(indent=2))
restored.verify()
```
```python
ExaminerBundle.build(decision, evidence_store, policy_registry, *,
as_of_transaction_time=None, metadata=None) -> ExaminerBundle
.verify() # raises BundleIntegrityError
.to_json(*, indent=None)
.from_json(s)
.to_dict() / .from_dict(d)
.content_hash # SHA-256
```
`evidence_refs` must contain the `record_id` of each evidence record in the
store.
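For intuition about how a content hash makes a bundle tamper-evident, the sketch below seals a payload by hashing its canonical JSON form with SHA-256 and re-derives the hash on verification. The functions `seal` and `verify`, the `IntegrityError` class, and the canonicalization choice (sorted keys, compact separators) are assumptions for illustration; the exact bytes `ExaminerBundle` hashes are not specified here.

```python
import hashlib
import json


class IntegrityError(Exception):
    """Hypothetical stand-in for a bundle integrity error."""


def seal(payload: dict) -> dict:
    # Canonical JSON (sorted keys, fixed separators) so equal payloads hash identically.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return {"payload": payload, "content_hash": f"sha256:{digest}"}


def verify(bundle: dict) -> None:
    expected = seal(bundle["payload"])["content_hash"]
    if expected != bundle["content_hash"]:
        raise IntegrityError("content hash mismatch")


bundle = seal({"decision": "priority-queue", "policy_version": "1"})
verify(bundle)  # an untouched bundle verifies silently

tampered = json.loads(json.dumps(bundle))  # deep copy via a JSON round trip
tampered["payload"]["decision"] = "standard-queue"
try:
    verify(tampered)
    tamper_detected = False
except IntegrityError:
    tamper_detected = True
print(tamper_detected)
```

A bare hash like this detects accidental or naive modification; it does not by itself prove who produced the bundle, since anyone who can rewrite the payload can also recompute the hash.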
## briefcase.otel
```bash
pip install briefcase-ai[otel]
```
### `get_tracer()`
```python
from briefcase.otel import get_tracer
tracer = get_tracer("briefcase")
```
```python
get_tracer(name="briefcase")
```
## briefcase.exporters
```bash
pip install briefcase-ai
```
Stock exporters ship in the base package. The fastest way to wire one up is
`briefcase.observe(...)`; construct them directly when you need full control.
### `ConsoleExporter`, `JSONLFileExporter`, `MemoryExporter`
```python
import briefcase
from briefcase.exporters import ConsoleExporter, JSONLFileExporter, MemoryExporter
console = ConsoleExporter() # JSON lines to stderr (default)
jsonl = JSONLFileExporter("runs.jsonl") # append-only, thread-safe
memory = MemoryExporter() # collect records in .records
briefcase.setup(exporter=memory) # or briefcase.observe(memory)
@briefcase.capture(async_capture=False)
def classify_ticket(text: str) -> str:
    return "account_access"
classify_ticket("reset my password")
print(memory.records[0]["function_name"]) # "classify_ticket"
memory.clear()
```
```python
ConsoleExporter(stream=None, *, pretty=False) # default stream: sys.stderr
JSONLFileExporter(path)
MemoryExporter()
.records # list of captured decision records
.clear()
```
### `BaseExporter`
```python
from briefcase.exporters import BaseExporter
class LoggingExporter(BaseExporter):
    async def export(self, decision) -> bool:
        print(decision)
        return True
    async def flush(self) -> None:
        pass
    async def close(self) -> None:
        pass
```
```python
BaseExporter()
async export(decision) -> bool
async flush()
async close()
```
## briefcase.mcp
```bash
pip install briefcase-ai[mcp]
```
Exposes safe SDK operations to MCP-capable clients (Cursor, Claude Code, Codex,
Replit). Run with the `briefcase-mcp` console script or `python -m briefcase.mcp`.
The `mcp` extra installs `mcp>=1.2`.
### `build_server()`, `main()`
```python
from briefcase.mcp import build_server, main
server = build_server() # returns a FastMCP server
# main() is the entry point used by the briefcase-mcp console script
```
```python
build_server() -> FastMCP
main() -> None
```
Tools exposed to MCP clients:
```python
sanitize_text(text) -> {"sanitized", "redactions"} # wraps briefcase.sanitize
estimate_cost(model, input_tokens, output_tokens) # wraps briefcase.cost
-> {"model", "input_cost", "output_cost", "total_cost"}
analyze_drift(outputs: list[str]) # wraps briefcase.drift
-> {"consistency_score", "agreement_rate", "consensus_output", "status"}
how_to(topic="") -> str # usage guidance
```
The server also exposes a `briefcase://llms-full.txt` resource with the full
usage guide.
## briefcase.integrations.lakefs
```bash
pip install briefcase-ai[lakefs]
```
Wraps a lakeFS repository so file reads are captured with the commit SHA they
were read at. Without the `lakefs` package installed, the client runs in mock
mode.
### `VersionedClient`
```python
from unittest.mock import Mock
from briefcase.integrations.lakefs import VersionedClient
client = Mock()
versioned_client = VersionedClient(
    repository="knowledge-base",
    branch="main",
    briefcase_client=client,
)
if versioned_client.object_exists("config/defaults.json"):
    data = versioned_client.read_object("config/defaults.json")
versioned_client.list_objects(prefix="config/")
print(versioned_client.get_commit())
```
```python
VersionedClient(repository, branch, commit="latest", briefcase_client=None, ...)
.read_object(path, return_metadata=False)
.upload_object(path, data, content_type="application/octet-stream")
.list_objects(prefix="")
.object_exists(path)
.get_commit()
```
### `versioned_context`, `versioned`
```python
from briefcase.integrations.lakefs import versioned_context, versioned
class BriefcaseClient:
    """Stand-in for a configured Briefcase client (mock mode without lakeFS)."""
    config = {
        "lakefs_endpoint": "https://lakefs.example.com/api/v1",
        "lakefs_access_key": "access-key",
        "lakefs_secret_key": "secret-key",
    }
client = BriefcaseClient()
# Context manager
with versioned_context(client, "knowledge-base", "main") as lakefs:
    config = lakefs.read_object("config/defaults.json")
    commit = lakefs.get_commit()
# Decorator: injects the client as `versioned_client`
@versioned(repository="knowledge-base", branch="main")
def load_config(versioned_client=None) -> dict:
    raw = versioned_client.read_object("config/defaults.json")
    return {"commit": versioned_client.get_commit()}
load_config(briefcase_client=client)
```
```python
versioned_context(briefcase_client, repository, branch="main", commit="latest", **kwargs)
versioned(repository, branch="main", commit="latest", client_param="versioned_client")
```
## Next steps
- Python SDK — Install, import paths, logging, and lazy-import behavior in prose. (/sdk/python/)
- Exporters — Where captured decisions go and how to write your own backend. (/features/exporters/)
## Glossary
Source: https://briefcaseai.io/reference/glossary/
> Definitions for the core vocabulary used across Briefcase — decisions, evidence, policies, bitemporal time, and verifiable bundles.
The terms below appear throughout the docs. They are grouped by the lifecycle act they belong to.
## Capture
**Decision** — A single choice an AI system makes that you want to govern: a classification, a route, an approval. In Briefcase it is recorded as a [`DecisionSnapshot`](/getting-started/core-concepts/).
**`DecisionSnapshot`** — The structured, persistent record of one decision: its inputs, outputs, model parameters, execution context, and timing. You build it, store it, reload it, and replay it.
**`@capture`** — A decorator that records a lightweight dict for every call to a function and hands it to an exporter. The quick path for live observability; it does not persist a `DecisionSnapshot` on its own.
**Input / Output** — Typed wrappers (`Input(name, value, data_type)`, `Output(...)`) around a single named value. An `Output` can carry a confidence score.
**`ModelParameters`** — The model configuration captured at call time: model name, provider, and per-parameter settings. Captured so you can tell when a model change caused output drift.
**`ExecutionContext`** — The runtime environment a decision ran in: runtime version, resolved dependencies, random seed, and environment variables. Captured so a [replay](/features/replay/) can run in a comparable environment.
**Exporter** — A sink that receives captured decision records — console, JSONL file, in-memory, or a custom backend. Wired up with [`observe()`](/features/exporters/).
**Fingerprint** — A stable hash of a decision's content (`DecisionSnapshot.fingerprint()`), used to compare and group decisions.
## Control
**Guardrail** — A control that decides whether an agent may perform an action on a resource, returning an `EvalResult`. Deny-by-default and side-effect-free. See [Guardrails](/advanced/guardrails/).
**`EvalRequest` / `EvalResult` / `Effect`** — A guardrail evaluates an `EvalRequest` (agent, action, resource, context) and returns an `EvalResult` whose `Effect` is `ALLOW` or `DENY`.
**Fail closed** — The principle that a control which errors must deny, never allow. An error should never grant access.
**Routing decision** — The choice of where a decision goes (e.g. which queue or handler), produced by a router. See [Routing](/advanced/routing/).
**Policy / `PolicyVersion` / `PolicyRule`** — A policy is a named set of rules. A `PolicyVersion` is an immutable, published snapshot of those rules; a `PolicyRule` maps a condition to a choice with a rationale. See [Versioned Routing Policy](/advanced/versioned-routing-policy/).
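The fail-closed principle defined in this section can be sketched as a small wrapper: any exception raised by a check, and any result that is not an explicit allow, is turned into a denial. The `Decision` enum and the `fail_closed` decorator are hypothetical names for this illustration and are not part of `briefcase.guardrails`.

```python
from enum import Enum
from typing import Callable


class Decision(Enum):
    ALLOW = "allow"
    DENY = "deny"


def fail_closed(check: Callable[[dict], Decision]) -> Callable[[dict], Decision]:
    """Wrap a check so that errors and unexpected results become DENY."""

    def guarded(context: dict) -> Decision:
        try:
            result = check(context)
        except Exception:
            # An error must never grant access.
            return Decision.DENY
        # Anything other than an explicit ALLOW is treated as a denial.
        return result if result is Decision.ALLOW else Decision.DENY

    return guarded


@fail_closed
def priority_check(context: dict) -> Decision:
    return Decision.ALLOW if context["priority"] == "high" else Decision.DENY


allowed = priority_check({"priority": "high"})
errored = priority_check({})  # KeyError inside the check becomes DENY
print(allowed, errored)
```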
## Store & Query
**Storage backend** — A durable store for decisions and snapshots (e.g. `SqliteBackend`). Queryable via `SnapshotQuery`. See [Storage Adapters](/features/storage-adapters/).
**Valid time** — When a fact was true in the real world.
**Transaction time** — When the system learned about that fact and recorded it.
**Bitemporal record** — An append-only record carrying both valid time and transaction time, so corrections are new records rather than edits and any past state can be reconstructed. See [Bitemporal Storage](/advanced/bitemporal-storage/).
**As-of view** — A reconstruction of a store as it stood at a chosen transaction (or valid) time — what was known then, without look-ahead.
**Snapshot policy** — Controls how often an [external source](/advanced/external-data/) is snapshotted (`EVERY_CALL`, `ON_CHANGE`, `HOURLY`, `DAILY`, `WEEKLY`).
**Manifest** — In [RAG versioning](/advanced/rag-versioning/), the atomic record of which documents and embedding model produced an index, used to detect staleness.
## Replay & Verify
**Replay** — Re-executing a stored decision and comparing the result to the original. Modes are `strict` (exact) and `tolerant` (allows minor differences). See [Deterministic Replay](/features/replay/).
**Drift** — How much a model's outputs vary across repeated runs of the same decision. Measured by a consistency score and related metrics. See [Drift Detection](/features/drift-detection/).
**Audit bundle / `ExaminerBundle`** — A self-contained artifact that joins a decision, its bitemporal evidence, and the policy version in effect, sealed with a SHA-256 content hash. See [Audit Bundles](/advanced/compliance-bundles/).
**Content hash** — The SHA-256 hash that seals a bundle. `verify()` recomputes it and raises if a single byte changed.
> Still missing a term?
If a term here is unfamiliar, start with [Core Concepts](/getting-started/core-concepts/) for the object model, or [Why Briefcase](/getting-started/why-briefcase/) for how the acts fit together.
========================================================================
# Resources
========================================================================
## Architecture
Source: https://briefcaseai.io/resources/architecture/
> System architecture of Briefcase AI.
## Overview
Briefcase AI is a layered system. The Python package you install (`briefcase-ai`) is the SDK surface: the `@capture` decorator, configuration, and pure-Python feature modules. Underneath, a PyO3 binding crate exposes a native extension module (`briefcase._native`) backed by a Rust core (the `briefcase-core` crate, ~11K lines) that provides high-performance decision tracking, replay, drift, cost, sanitization, and SQLite storage.
```mermaid
graph TD
A[Python package: briefcase-ai] --> B[PyO3 bindings: briefcase._native]
B --> C[Rust core: briefcase-core]
C --> D[SQLite storage backend]
```
> Diagram description
A top-down stack of four layers. The Python package `briefcase-ai` calls the
PyO3 bindings `briefcase._native`, which call the Rust core `briefcase-core`,
which in turn reads and writes the SQLite storage backend.
## Rust Core (`briefcase-core`)
The core crate lives in `crates/briefcase-core` and exposes feature-gated modules:
- `models` - `DecisionSnapshot`, `Input`, `Output`, `ModelParameters`, and related types
- `storage` - the SQLite storage backend (`SqliteBackend`)
- `replay` - the deterministic replay engine
- `drift` - drift and consistency calculation
- `cost` - token-cost estimation and budget checks
- `sanitization` - PII detection and redaction
Feature flags (`recording`, `async`, `storage`, `replay`, `drift`, `sanitize`, `otel`, `tokens`, and others) control which modules compile. The Python extension is built with the full feature set.
## PyO3 Bindings (`briefcase._native`)
The `bindings/python` crate (`briefcase-python`, library name `briefcase_native`) uses [PyO3](https://pyo3.rs/) and [maturin](https://github.com/PyO3/maturin) to compile the Rust core into the `briefcase._native` extension module. The native-backed Python modules (`briefcase.cost`, `briefcase.drift`, `briefcase.sanitize`, `briefcase.storage`, `briefcase.replay`) import their classes from this extension.
## Python Package (`briefcase-ai`)
The Python layer adds:
- the `@capture` decorator, which records a lightweight decision dict and ships it through an exporter
- configuration via `setup()`, `init()`, and `BriefcaseConfig`
- pure-Python feature modules: validation, guardrails, RAG versioning, correlation, events, external-data tracking, routing, bitemporal primitives, and audit bundles
- an optional lakeFS integration (`briefcase.integrations.lakefs`) — one bundled versioned-data source; others plug in through the generic VCS protocol
## Capture and Replay Flow
The `@capture` decorator and the native runtime layer are separate paths. `@capture` records a dict and hands it to an exporter; persistence and replay use the native `DecisionSnapshot` objects directly.
```mermaid
sequenceDiagram
participant App
participant Capture as capture decorator
participant Exporter
App->>Capture: call function
Capture->>App: function result
Capture->>Exporter: export decision dict
```
> Diagram description
A sequence across App, the capture decorator, and the Exporter. The App calls
the wrapped function through the capture decorator, which returns the result to
the App and then exports the recorded decision dict to the Exporter.
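The first flow can be sketched in plain Python. This is a minimal illustration under stated assumptions, not Briefcase's implementation: `ListExporter` and `capture_sketch` are hypothetical names, the fields of the recorded dict are invented for the example, and the sketch exports synchronously before returning, whereas the diagram shows the result going back to the App first.

```python
import functools
from typing import Any, Callable, Dict, List


class ListExporter:
    """Hypothetical exporter stand-in: collects decision dicts in memory."""

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []

    def export(self, decision: Dict[str, Any]) -> None:
        self.records.append(decision)


def capture_sketch(exporter: ListExporter, decision_type: str) -> Callable:
    """Illustrative decorator: run the function, then hand a dict to the exporter."""

    def decorator(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            result = fn(*args, **kwargs)  # App calls the wrapped function
            exporter.export(  # the decorator exports the decision dict
                {
                    "decision_type": decision_type,
                    "function": fn.__name__,
                    "args": list(args),
                    "kwargs": kwargs,
                    "result": result,
                }
            )
            return result  # the caller still receives the plain result

        return wrapper

    return decorator


exporter = ListExporter()


@capture_sketch(exporter, decision_type="summarize")
def summarize(text: str) -> str:
    return text[:10]


summary = summarize("Long document text...")
```

The wrapped function's return value is unchanged; the record is a side channel that only the exporter sees.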
```mermaid
sequenceDiagram
participant App
participant Storage as SqliteBackend
participant Replay as ReplayEngine
App->>Storage: save_decision(snapshot)
Storage-->>App: decision_id
App->>Replay: replay(decision_id, mode)
Replay->>Storage: load snapshot
Replay-->>App: ReplayResult
```
> Diagram description
A sequence across App, the SqliteBackend, and the ReplayEngine. The App saves a
snapshot to storage and receives a decision_id, then asks the ReplayEngine to
replay that id; the engine loads the snapshot from storage and returns a
ReplayResult to the App.
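The second flow can be approximated with the standard library's `sqlite3` module. The sketch below is an illustration rather than the real `SqliteBackend` or `ReplayEngine`: the table layout, the helper names (`open_store`, `save_decision_sketch`, `replay_sketch`), and the idea of replaying by re-running a Python callable on the stored inputs are all assumptions made for the example.

```python
import json
import sqlite3
import uuid
from typing import Any, Callable, Dict


def open_store() -> sqlite3.Connection:
    """Create an in-memory database with one table of JSON-encoded snapshots."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE decisions (id TEXT PRIMARY KEY, body TEXT NOT NULL)")
    return conn


def save_decision_sketch(conn: sqlite3.Connection, snapshot: Dict[str, Any]) -> str:
    """Persist a snapshot dict and return the generated decision id."""
    decision_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO decisions (id, body) VALUES (?, ?)",
        (decision_id, json.dumps(snapshot)),
    )
    conn.commit()
    return decision_id


def replay_sketch(
    conn: sqlite3.Connection, decision_id: str, fn: Callable[..., Any]
) -> Dict[str, Any]:
    """Load the stored snapshot, re-run fn on its inputs, and compare outputs."""
    row = conn.execute(
        "SELECT body FROM decisions WHERE id = ?", (decision_id,)
    ).fetchone()
    if row is None:
        raise KeyError(decision_id)
    snapshot = json.loads(row[0])
    fresh = fn(**snapshot["inputs"])
    return {"outputs_match": fresh == snapshot["outputs"], "replayed": fresh}


def classify(ticket_text: str) -> Dict[str, str]:
    # Hypothetical deterministic decision function used only for this example.
    return {"category": "billing" if "invoice" in ticket_text else "other"}


conn = open_store()
decision_id = save_decision_sketch(
    conn,
    {"inputs": {"ticket_text": "My invoice is wrong"}, "outputs": {"category": "billing"}},
)
result = replay_sketch(conn, decision_id, classify)
```

Exact equality between the replayed and stored outputs corresponds loosely to a strict comparison; a tolerant comparison would need its own matching rule.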
## Next steps
- Core Concepts — How snapshots, capture, and replay fit together in practice. (/getting-started/core-concepts/)
- Decision Recording — Record structured decisions that flow through these layers. (/features/decision-recording/)
## Examples
Source: https://briefcaseai.io/resources/examples/
> Example code and use cases for Briefcase AI.
## Capture a Decision
The `@capture` decorator records a lightweight decision dict and hands it to an exporter. It does not persist a native `DecisionSnapshot` itself.
```python
from briefcase import capture

@capture(decision_type="summarize")
def summarize(text: str) -> str:
    # Replace with your real model call, e.g. client.responses.create(...).
    return text[:280]

result = summarize("Long document text...")
```
## Configure an Exporter
`briefcase.observe()` wires up an exporter in one line and returns it, so
`@capture` decisions are actually emitted. Pass `"console"`, `"memory"`, a
`*.jsonl` path, or a `BaseExporter` instance.
```python
import briefcase

mem = briefcase.observe("memory")  # or "console", or "decisions.jsonl"

@briefcase.capture(decision_type="summarize", async_capture=False)
def summarize(text: str) -> str:
    return text[:280]

summarize("Long document text...")
print(mem.records[0])
```
The stock exporters live in `briefcase.exporters`: `ConsoleExporter` (JSON lines
to stderr), `JSONLFileExporter` (append to a file), and `MemoryExporter`
(collect in `.records`). For full control, subclass `BaseExporter` and register
it with `setup()` or pass it to `observe()`.
```python
from briefcase import setup, capture
from briefcase.exporters import BaseExporter

class PrintExporter(BaseExporter):
    async def export(self, decision) -> bool:
        print(decision)
        return True

    async def flush(self) -> None:
        ...

    async def close(self) -> None:
        ...

setup(exporter=PrintExporter())

@capture()
def classify(text: str) -> str:
    return "billing"
```
See [Exporters](/features/exporters/) for the full reference.
## Persist and Replay a Snapshot
The native runtime layer is separate from `@capture`. Call `init()` to start the native runtime, build a `DecisionSnapshot`, save it to a `SqliteBackend`, then replay it with the `ReplayEngine`.
```python
from briefcase import DecisionSnapshot, Input, Output, init
from briefcase.storage import SqliteBackend
from briefcase.replay import ReplayEngine
init() # start the native runtime before persisting
# Record a classify_ticket decision from the support-triage agent.
decision = (
    DecisionSnapshot("classify_ticket")
    .with_module("support_service")
)
decision.add_input(Input("ticket_text", "My invoice is wrong", "string"))
decision.add_output(Output("category", "billing", "string").with_confidence(0.93))
# Persist it. SqliteBackend.in_memory() is handy for examples and tests.
storage = SqliteBackend.in_memory()
decision_id = storage.save_decision(decision)
# Replay the recorded decision against the stored snapshot.
# Modes: "strict" (exact match) or "tolerant" (the default).
engine = ReplayEngine(storage)
result = engine.replay(decision_id, "strict")
print(result.status, result.outputs_match)
```
## Measure Drift Across Outputs
`DriftCalculator.calculate_drift()` scores the consistency of a set of outputs and reports the consensus value and any outliers.
```python
from briefcase.drift import DriftCalculator
calculator = DriftCalculator()
outputs = ["billing", "billing", "account", "billing", "billing"]
metrics = calculator.calculate_drift(outputs)
print(f"Consistency: {metrics.consistency_score:.3f}")
print(f"Agreement: {metrics.agreement_rate:.3f}")
print(f"Consensus: {metrics.consensus_output}")
print(f"Status: {metrics.get_status(calculator)}")
```
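To make the idea of a consensus value and outliers concrete, here is a small standard-library sketch. It is not the algorithm behind `DriftCalculator`; `agreement_sketch` is a hypothetical helper that assumes a simple majority vote, with the agreement rate defined as the share of outputs equal to the most common value.

```python
from collections import Counter
from typing import Any, Dict, List


def agreement_sketch(outputs: List[str]) -> Dict[str, Any]:
    """Majority-vote summary: consensus value, its share, and dissenting outputs."""
    if not outputs:
        raise ValueError("need at least one output")
    counts = Counter(outputs)
    consensus, votes = counts.most_common(1)[0]
    return {
        "consensus_output": consensus,
        "agreement_rate": votes / len(outputs),
        "outliers": [value for value in outputs if value != consensus],
    }


summary = agreement_sketch(["billing", "billing", "account", "billing", "billing"])
print(summary)
```

With four of five outputs agreeing, the sketch reports `billing` as the consensus and `account` as the only outlier.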
## Estimate Cost and Check a Budget
```python
from briefcase.cost import CostCalculator
calculator = CostCalculator()
estimate = calculator.estimate_cost("gpt-4", 1000, 500)
print(f"Total: ${estimate.total_cost:.4f}")
status = calculator.check_budget(85.0, 100.0)
print(f"{status.percent_used:.1f}% used - {status.status}")
```
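The arithmetic behind a token-cost estimate and a budget check can be sketched as follows. Everything here is an assumption for illustration: the model name, the per-1K-token prices, and the helper names (`estimate_cost_sketch`, `percent_used`) are hypothetical and are not taken from Briefcase's pricing table or API.

```python
from dataclasses import dataclass

# Hypothetical USD prices per 1,000 tokens: (input, output).
PRICES_PER_1K = {"example-model": (0.03, 0.06)}


@dataclass
class EstimateSketch:
    input_cost: float
    output_cost: float

    @property
    def total_cost(self) -> float:
        return self.input_cost + self.output_cost


def estimate_cost_sketch(model: str, input_tokens: int, output_tokens: int) -> EstimateSketch:
    """Price input and output tokens separately, then sum them."""
    input_price, output_price = PRICES_PER_1K[model]
    return EstimateSketch(
        input_cost=input_tokens / 1000 * input_price,
        output_cost=output_tokens / 1000 * output_price,
    )


def percent_used(spent: float, budget: float) -> float:
    """Share of the budget already spent, as a percentage."""
    if budget <= 0:
        raise ValueError("budget must be positive")
    return spent / budget * 100


estimate = estimate_cost_sketch("example-model", 1000, 500)
print(f"Total: ${estimate.total_cost:.4f}")
print(f"{percent_used(85.0, 100.0):.1f}% used")
```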
## Redact PII
```python
from briefcase.sanitize import Sanitizer
sanitizer = Sanitizer()
result = sanitizer.sanitize("Contact support at support@example.com")
print(result.sanitized)
print(f"{result.redaction_count} redaction(s)")
for redaction in result.redactions:
    print(redaction.pii_type, redaction.start_position, redaction.end_position)
```
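For intuition about what a redaction with start and end positions represents, here is a regex-based sketch using only the standard library. It is not how `Sanitizer` detects PII: the email pattern is deliberately simple, and `RedactionSketch`, `redact_emails`, and the `[REDACTED_EMAIL]` placeholder are hypothetical.

```python
import re
from dataclasses import dataclass
from typing import List, Tuple

# Simplified email pattern for illustration; production detectors are stricter.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


@dataclass
class RedactionSketch:
    pii_type: str
    start_position: int  # offset into the original text
    end_position: int    # exclusive end offset into the original text


def redact_emails(
    text: str, placeholder: str = "[REDACTED_EMAIL]"
) -> Tuple[str, List[RedactionSketch]]:
    """Replace each email match and report where it sat in the original text."""
    redactions = [
        RedactionSketch("email", match.start(), match.end())
        for match in EMAIL_RE.finditer(text)
    ]
    return EMAIL_RE.sub(placeholder, text), redactions


sanitized, redactions = redact_emails("Contact support at support@example.com")
print(sanitized)
```

Positions refer to the original string, so a caller can map each redaction back to the unsanitized input.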
## Correlate a Multi-Agent Workflow
`briefcase_workflow` is a context manager that links every agent that runs inside it under one workflow ID.
```python
from unittest.mock import Mock
from briefcase.correlation import briefcase_workflow
client = Mock() # replace with a real Briefcase client
with briefcase_workflow("content_pipeline", client) as workflow:
    print(f"Workflow: {workflow.workflow_id}")
    workflow.register_agent("retriever", "retrieval")
    workflow.register_agent("summarizer", "generation")
    workflow.register_agent("reviewer", "moderation")
```
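One way a context manager can link everything that runs inside it to a single workflow ID is a `contextvars` variable that is set on entry and reset on exit. The sketch below illustrates that pattern only; whether `briefcase_workflow` works this way is an assumption, and `WorkflowSketch`, `workflow_sketch`, and `current_workflow_id` are hypothetical names.

```python
import contextvars
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, Iterator, Optional


@dataclass
class WorkflowSketch:
    name: str
    workflow_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    agents: Dict[str, str] = field(default_factory=dict)

    def register_agent(self, agent_id: str, agent_type: str) -> None:
        self.agents[agent_id] = agent_type


# Holds the active workflow for the current execution context, if any.
_current_workflow = contextvars.ContextVar("current_workflow", default=None)


@contextmanager
def workflow_sketch(name: str) -> Iterator[WorkflowSketch]:
    """Set the active workflow on entry and restore the previous one on exit."""
    workflow = WorkflowSketch(name)
    token = _current_workflow.set(workflow)
    try:
        yield workflow
    finally:
        _current_workflow.reset(token)


def current_workflow_id() -> Optional[str]:
    """Any code running inside the block can look up the shared ID."""
    workflow = _current_workflow.get()
    return workflow.workflow_id if workflow is not None else None


with workflow_sketch("content_pipeline") as workflow:
    workflow.register_agent("retriever", "retrieval")
    workflow.register_agent("summarizer", "generation")
    inside_id = current_workflow_id()

outside_id = current_workflow_id()
```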
## Track and Compare Model Versions with oci-bai
Push images through the oci-bai gateway and use the CLI to inspect commits, compare versions,
and search the catalog:
```bash
docker tag my-base:latest localhost:8080/rl-gym-env:cuda-base
docker push localhost:8080/rl-gym-env:cuda-base
docker tag my-candidate:latest localhost:8080/rl-gym-env:cartpole
docker push localhost:8080/rl-gym-env:cartpole
# Inspect and compare
oci-bai --repo rl-gym-env log cartpole
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth package
oci-bai --repo rl-gym-env diff cuda-base cartpole --depth bench
# Search the catalog
oci-bai search "format==safetensors cuda>=12.4"
```
See the [Quick Start](/evaluate/quickstart/) for the end-to-end walkthrough and
[CLI Reference](/evaluate/cli/) for every command and flag. oci-bai is in private beta —
contact [support@briefcaseai.org](mailto:support@briefcaseai.org) for access.
## More Examples
See the [examples directory](https://github.com/briefcasebrain/briefcase-ai-sdk/tree/main/examples) for complete, runnable scripts covering basic usage, prompt validation, lakeFS versioning, and multi-agent correlation.
## Next steps
- Quickstart — Install Briefcase and run your first capture in a few minutes. (/getting-started/quickstart/)
- Exporters — Choose where the decisions in these examples are sent. (/features/exporters/)
## Changelog
Source: https://briefcaseai.io/resources/changelog/
> Release history for Briefcase AI.
The format is based on [Keep a Changelog](https://keepachangelog.com/), and this project follows [Semantic Versioning](https://semver.org/).
## [3.2.1] - 2026-05-30
### Added
- **Cost rate cards** (`briefcase.cost.CostCalculator.estimate_cost`): an optional keyword-only `rate_card` selects a `platform × tier × modifier` pricing scheme — platforms `first_party` / `bedrock` / `vertex` / `azure`, tiers `standard` / `batch` / `cached` / `priority` / `flex`, and modifiers for long-context tiered pricing, data residency (`us`, +10%), and fast-mode. Cards are forgiving strings such as `"batch"`, `"bedrock:batch"`, or `"first_party:fast"`; batch/flex are 0.5×, cache reads are 0.1× of input, and regional/residency add 10%. New keyword-only `cache_read_tokens` / `cache_write_5m_tokens` / `cache_write_1h_tokens` arguments bill prompt-cache usage, a `cache_cost` field is exposed on `CostEstimate`, and `get_available_rate_cards()` lists representative cards. Omitting `rate_card` (or passing `"standard"`) preserves the previous first-party standard pricing.
- **Latest model pricing**: added Anthropic Claude 4.x (`claude-opus-4-8` / `4-7` / `4-6` / `4-5` / `4-1`, `claude-sonnet-4-6` / `4-5`, `claude-haiku-4-5` / `3-5`), OpenAI GPT-5.x (`gpt-5.5`, `gpt-5.5-pro`, `gpt-5.4`, `gpt-5.4-mini`, `gpt-5.4-nano`, `gpt-5.4-pro`), and Google Gemini (`gemini-3.5-flash`, `gemini-3.1-pro`, `gemini-3.1-flash-lite`, `gemini-3-flash`, `gemini-2.5-pro` / `flash` / `flash-lite`) to the default pricing table. All previously available models are retained.
### Changed
- `CostCalculator.estimate_cost`, `estimate_cost_from_text`, and `project_monthly_cost` gained keyword-only `rate_card` (and, for `estimate_cost`, cache-token) parameters. The existing positional arguments and their `input_tokens` / `output_tokens` keyword names are unchanged, so existing calls behave identically.
- The MCP `estimate_cost` tool accepts an optional `rate_card` and returns a `cache_cost` field.
### Fixed
- A single stable-ABI wheel per platform now installs on Python 3.9–3.13 (previously the prebuilt wheel was effectively 3.11-only).
- The source distribution now bundles `LICENSE` and `NOTICE`.
## [3.2.0] - 2026-05-30
### Added
- Stock exporters in the base package (`briefcase.exporters`): `ConsoleExporter` (JSON lines to stderr), `JSONLFileExporter` (append-only, thread-safe), and `MemoryExporter` (collects records in `.records`).
- One-line observability setup: `briefcase.observe(exporter="console", *, level=None)` wires the global exporter so `@capture` records are emitted, and returns the configured exporter for inspection.
- Centralized logging in the base package: top-level `enable_logging`, `set_log_level`, `disable_logging`, and `get_logger`. The library is silent by default (`NullHandler`); set `BRIEFCASE_LOG_LEVEL=DEBUG` to enable logging automatically at import.
- MCP server (`pip install briefcase-ai[mcp]`): the `briefcase-mcp` console script (or `python -m briefcase.mcp`) exposes `sanitize_text`, `estimate_cost`, `analyze_drift`, and `how_to` tools plus a `briefcase://llms-full.txt` resource to MCP-capable clients. `briefcase.mcp` exports `build_server()` and `main()`.
- LLM-friendly `llms.txt`, `llms-full.txt`, and `AGENTS.md` so coding assistants can discover the API surface.
- Bitemporal evidence primitives (`briefcase.bitemporal`): `BitemporalRecord`, the `BitemporalStore` protocol with in-memory, SQLite, and Iceberg backends, `AsOfView`, append-only corrections, and batch/stream ingest.
- Versioned routing policy (`briefcase.routing`): `PolicyRegistry`, `PolicyVersion`, `PolicyRule`, `AgentRouter`, and `AgentRoutingDecision`.
- Audit bundles (`briefcase.compliance`): `ExaminerBundle` with SHA-256 content-hash integrity and tamper detection.
- Top-level `briefcase.capture`, `briefcase.setup`, and `briefcase.BriefcaseConfig` re-exports for discoverability.
- `ExternalDataTracker(sanitizer=...)` to redact PII from external-data snapshots before they are persisted to durable storage.
- `scripts/check_imports.py` import-smoke test for the built wheel.
### Fixed
- `briefcase.cost`, `briefcase.drift`, and `briefcase.sanitize` now import from a clean source build. The native bindings were missing `add_class` registrations for `CostEstimate`, `BudgetStatus`, `DriftMetrics`, `Redaction`, `SanitizationResult`, and `SanitizationJsonResult`; `briefcase.cost` also imported a non-existent `BudgetAlert` type.
- `briefcase.rag` no longer fails to import on a spurious `pyarrow` requirement.
- Misleading `ImportError` messages on native-backed modules now point to reinstall/rebuild rather than no-op pip extras.
- `scripts/version_sync.py` missing `Iterable` import; the manifest now also tracks `bindings/python/Cargo.toml`.
- The flagship `examples/python-basic` and validation examples now run end-to-end.
### Security
- External-data snapshots can be redacted before persistence; redaction fails closed if it errors.
- Expanded PII detection: corrected the email regex and added GitHub, GitLab, Stripe, and Hugging Face API-key prefixes.
- Robust telemetry opt-out: `BRIEFCASE_TELEMETRY` now accepts `0`, `false`, `no`, and `off`.
- `source_name` is sanitized before use in storage object keys (path-traversal hardening).
### Changed
- Deduplicated the optional OpenTelemetry import into `briefcase._otel`.
- Extracted guardrail core data types into `briefcase.guardrails._types`.
- CI builds and tests across Python 3.9-3.13, runs the native binding tests, and import-smoke-tests the built wheel before publish.
## [3.0.0] - 2026-03-22
### Added
- Initial open-source release: decision tracking, deterministic replay, drift and cost calculation, PII sanitization, and SQLite storage, backed by a Rust core.
## FAQ
Source: https://briefcaseai.io/resources/faq/
> Frequently asked questions about Briefcase AI.
## What is Briefcase AI?
Briefcase AI is an open-source Python SDK for recording, replaying, and auditing AI decisions. It captures every input, output, and parameter of AI calls as immutable snapshots.
## What languages are supported?
You install and use Briefcase in Python — `pip install briefcase-ai`, no Rust toolchain required. The performance-critical core is written in Rust and ships precompiled inside the wheel. See [Build the Rust Core](/sdk/rust/) only if you want to contribute to that core.
## Which AI frameworks does it support?
The open-source SDK provides extensible framework protocols (guardrails, exporters, routers, event emitters). Pre-built integrations for LangChain, CrewAI, LlamaIndex, AutoGen, AG2, and OpenAI Agents are available in [Briefcase AI Enterprise](https://github.com/briefcasebrain/briefcase-ai-sdk-enterprise).
## Where are persistent decisions stored?
A `DecisionSnapshot` you persist goes to a **storage backend**: by default an in-memory SQLite database (non-persistent), or a SQLite file / custom backend you configure for production. See [Storage Adapters](/features/storage-adapters/).
## How does `@capture` emit decisions?
`@capture` is a separate, lighter path — it sends to an **exporter**, not a storage backend. It records a lightweight dict per call and emits it once you call `briefcase.observe(...)`. Pass `"console"` to write JSON lines to stderr, a path ending in `.jsonl` to append to a file, or `"memory"` to collect records in `MemoryExporter.records`. `@capture` exports in a background thread by default; use `@capture(async_capture=False)` to make a record available synchronously.
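The difference between background and synchronous export can be illustrated with a queue and a worker thread. This is a sketch of the general pattern, not Briefcase's exporter machinery; `BackgroundExporterSketch` and its `background` flag are hypothetical.

```python
import queue
import threading
from typing import Any, Dict, List, Optional


class BackgroundExporterSketch:
    """Collects records either inline or through a background worker thread."""

    def __init__(self) -> None:
        self.records: List[Dict[str, Any]] = []
        self._queue = queue.Queue()  # holds decision dicts; None means "stop"
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def _drain(self) -> None:
        while True:
            item: Optional[Dict[str, Any]] = self._queue.get()
            try:
                if item is None:
                    return
                self.records.append(item)
            finally:
                self._queue.task_done()

    def export(self, decision: Dict[str, Any], background: bool = True) -> None:
        if background:
            self._queue.put(decision)  # returns immediately; worker writes later
        else:
            self.records.append(decision)  # visible as soon as export() returns

    def flush(self) -> None:
        self._queue.join()  # block until the worker has handled everything queued

    def close(self) -> None:
        self._queue.put(None)
        self._worker.join()


exporter = BackgroundExporterSketch()
exporter.export({"decision": "sync"}, background=False)
sync_count = len(exporter.records)  # the synchronous record is already there
exporter.export({"decision": "async"})
exporter.flush()  # wait for the worker before reading
async_count = len(exporter.records)
exporter.close()
```

In the background case a reader has to wait for the worker (here via `flush()`) before the record is guaranteed to be visible, which is why a synchronous mode is convenient in tests.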
## Is there an MCP server?
Yes. Install the `mcp` extra (`pip install briefcase-ai[mcp]`) and run `briefcase-mcp` (or `python -m briefcase.mcp`). It exposes `sanitize_text`, `estimate_cost`, `analyze_drift`, and `how_to` tools to MCP-capable clients. See the [Python API Reference](/api/python/#briefcasemcp).
## Does it add latency?
The Rust core adds microseconds of overhead per decision. Storage write latency depends on your backend choice.
## Is it production-ready?
Briefcase AI is under active development. Check the [Changelog](/resources/changelog/) for the latest release status.
## How do I contribute?
See [Development](/contributing/development/) and [Code Standards](/contributing/code-standards/).
========================================================================
# Contributing
========================================================================
## Development
Source: https://briefcaseai.io/contributing/development/
> Set up a development environment for Briefcase AI.
## Prerequisites
- Rust 1.70+ (a recent stable toolchain; CI builds on `stable`)
- Python 3.9+
- [maturin](https://github.com/PyO3/maturin)
## Clone the Repository
```bash
git clone https://github.com/briefcasebrain/briefcase-ai-sdk.git
cd briefcase-ai-sdk
```
The repository is a Cargo workspace: the Rust core (`crates/briefcase-core`), the PyO3 bindings (`bindings/python`), and the Python package (`briefcase/`).
## Install Development Dependencies
```bash
pip install briefcase-ai[dev]
pip install maturin
```
## Build the Native Extension
`maturin develop` compiles the Rust core through the bindings and installs `briefcase._native` into the active environment:
```bash
maturin develop
```
## Build the Rust Core Directly
```bash
cargo build -p briefcase-core --locked
```
## Run the Test Suites
```bash
cargo test -p briefcase-core --locked # Rust core
pytest tests/ # Python facade (mocks the extension)
pytest bindings/python/tests/ # native binding tests (real extension)
```
See [Testing](/contributing/testing/) for details on the facade vs. native split.
## Development Workflow
1. Create a branch from `main`.
2. Make changes and add tests.
3. Run the formatters and linters (see [Code Standards](/contributing/code-standards/)).
4. Run the test suites.
5. Submit a pull request.
## Build the Rust Core
Source: https://briefcaseai.io/sdk/rust/
> For contributors — build the Briefcase AI Rust core and Python bindings from source.
Most users never need this page — `pip install briefcase-ai` ships precompiled
wheels with no Rust toolchain required. This guide is for **contributors** who
want to build the Rust core and Python bindings from source.
## Prerequisites
- A recent stable Rust toolchain (Rust 1.70+; CI builds on `stable`)
- Python 3.9+
- [maturin](https://github.com/PyO3/maturin)
The crates use Rust edition 2021. The `Cargo.toml` does not pin a minimum Rust version, so any current stable toolchain works.
## Clone
```bash
git clone https://github.com/briefcasebrain/briefcase-ai-sdk.git
cd briefcase-ai-sdk
```
The repository is a Cargo workspace with the core library (`crates/briefcase-core`) and the PyO3 bindings (`bindings/python`).
## Build the Rust Core
```bash
cargo build -p briefcase-core --locked
```
## Run Tests
```bash
cargo test -p briefcase-core --locked
```
## Build the Python Bindings
```bash
pip install maturin
maturin develop
```
This compiles the Rust core through the bindings and installs the `briefcase._native` extension into the active environment. Add `--release` for an optimized build:
```bash
maturin develop --release
```
See [Development](/contributing/development/) for the full contributor workflow.
## Next steps
- Development — The full contributor workflow: tooling, tests, and conventions. (/contributing/development/)
## Testing
Source: https://briefcaseai.io/contributing/testing/
> Run and write tests for Briefcase AI.
## Install Test Dependencies
```bash
pip install briefcase-ai[dev]
```
The `dev` extra installs `pytest`, `pytest-mock`, `pytest-asyncio`, `pytest-subtests`, `black`, `flake8`, and `mypy`.
## Build the Native Extension First
The Python facade tests mock `briefcase._native`, while the native binding tests run against the real extension. Build it before running either suite:
```bash
maturin develop
```
## Run the Test Suites
```bash
pytest tests/ # Python facade (mocks briefcase._native)
pytest bindings/python/tests/ # native binding tests (real extension)
python scripts/check_imports.py # smoke-test that every submodule imports
```
The facade suite mocks `briefcase._native` while the native binding suite loads the real extension, so run them in separate processes (do not collect both in one `pytest` invocation).
## Run Rust Tests
```bash
cargo test -p briefcase-core --locked
```
## Run Specific Tests
```bash
cargo test -p briefcase-core test_snapshot_creation
pytest tests/ -k "capture"
```
## Continuous Integration
CI builds the native extension with `maturin` and runs the Python suites across Python 3.9, 3.10, 3.11, 3.12, and 3.13. A separate job builds, tests, and clippy-lints the Rust core.
## Writing Tests
### Rust
```rust
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_snapshot_creation() {
        let snapshot = DecisionSnapshot::new("ai_function")
            .add_input(Input::new("query", json!("hello"), "string"))
            .add_output(Output::new("response", json!("hi"), "string"));
        assert_eq!(snapshot.function_name, "ai_function");
    }
}
```
### Python
The facade test suite mocks `briefcase._native`, so import the public API and assert on behavior:
```python
from briefcase import capture
def test_capture_returns_result():
    @capture()
    def my_fn(x):
        return x * 2

    assert my_fn(5) == 10
```
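For readers unfamiliar with mocking a compiled extension, the general technique can be sketched with `unittest.mock` and `sys.modules`. This is not the project's actual test fixture: the package name `example_pkg`, the `estimate` function, and `facade_estimate` are hypothetical stand-ins chosen so the example runs without any native build.

```python
import importlib
import sys
import types
from unittest import mock

# Hypothetical stub modules standing in for a package and its compiled extension.
fake_pkg = types.ModuleType("example_pkg")
fake_native = types.ModuleType("example_pkg._native")
fake_native.estimate = mock.Mock(return_value=0.06)


def facade_estimate(tokens: int) -> float:
    """Hypothetical facade function: look up the native module and delegate to it."""
    native = importlib.import_module("example_pkg._native")
    return native.estimate(tokens)


def run_with_stub() -> float:
    # patch.dict restores sys.modules when the block exits, so the stub
    # does not leak into other tests in the same process.
    stubs = {"example_pkg": fake_pkg, "example_pkg._native": fake_native}
    with mock.patch.dict(sys.modules, stubs):
        return facade_estimate(1500)


value = run_with_stub()
```

Because the stub is installed process-wide while the patch is active, mixing this style with tests that import the real extension in the same process is fragile.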
## Code Standards
Source: https://briefcaseai.io/contributing/code-standards/
> Code style and conventions for Briefcase AI.
## Rust
- Format with `rustfmt` (configured in `.rustfmt.toml`, edition 2021)
- Run `cargo clippy` with warnings denied before committing
- All public APIs must have doc comments
```bash
cargo fmt --all -- --check
cargo clippy -p briefcase-core --locked -- -D warnings
```
## Python
- Follow PEP 8
- Use type annotations
- Format with `black`, lint with `flake8`, and type-check with `mypy`
Install the tooling with the `dev` extra:
```bash
pip install briefcase-ai[dev]
```
```bash
black briefcase/ tests/
flake8 briefcase/ tests/
mypy briefcase/
```
## Commit Messages
Use conventional commits:
```
feat: add drift detection threshold config
fix: handle empty snapshots in replay
docs: update storage adapter examples
```
## Pull Request Process
1. Branch from `main`
2. Write tests for new functionality
3. Ensure all tests pass
4. Request review from a maintainer
## Governance
Source: https://briefcaseai.io/contributing/governance/
> Project governance and decision-making.
## License
Briefcase AI is licensed under Apache-2.0. See [LICENSE](https://github.com/briefcasebrain/briefcase-ai-sdk/blob/main/LICENSE).
## Contributions
All contributions are welcome. Please read [Development](/contributing/development/) and [Code Standards](/contributing/code-standards/) before submitting a pull request.
## Maintainers
The project is maintained by the Briefcase Brain team. Maintainer decisions are made by consensus.
## Reporting Issues
File issues on [GitHub](https://github.com/briefcasebrain/briefcase-ai-sdk/issues).