cubepi.tracing
tracing_context
function
tracing_context(*, tags: list[str] | tuple[str, ...] | None = None, metadata: dict[str, Any] | None = None) -> Iterator[None]
Scope tags / metadata onto agent runs started inside this block.
The recorder reads these contextvars on AgentStartEvent and
stamps them on the invoke_agent span as:
- cubepi.tags: a tuple of strings (an OTel attribute type).
- one attribute per metadata key, namespaced under cubepi.metadata.* (e.g. metadata={"user_id": "u-42"} → cubepi.metadata.user_id = "u-42"). The dedicated sub-namespace keeps recorder-owned schema keys (cubepi.run_id, cubepi.turn.index, …) safe from collisions with caller-supplied keys.
Because the values live in contextvars, this works for concurrent agents: each asyncio task tree gets its own value. Nested blocks merge additively (tags concatenate; metadata is a union with inner keys winning).
Args
tags: Tags to apply to runs started in this scope. Stored as a tuple on the span so it round-trips through OTel's attribute serializer.
metadata: Per-run key/value pairs. Values must be types that OTel attributes accept (str, bool, int, float, or a tuple/list of those); other shapes will be silently dropped by the recorder.
JsonlSpanExporter
class
JsonlSpanExporter(self, directory: str | os.PathLike[str] = './cubepi-traces')
Write each ReadableSpan as one JSON line.
Files: <directory>/<YYYY-MM-DD>/<run_id>.jsonl. The date used for
the subdirectory is the span's start time (UTC). The run_id comes
from the cubepi.run_id attribute set by the cubepi Recorder;
spans without that attribute fall back to "unknown-run".
Permissions: files are created with mode 0o600 (read/write for the owning user only).
Methods
export
export(spans: Sequence[ReadableSpan]) -> SpanExportResult
shutdown
shutdown() -> None
force_flush
force_flush(timeout_millis: int = 30000) -> bool
Meter
class
Meter(self, *, resource: Resource | None = None, exporters: list[MetricExporter] | None = None, export_interval_millis: int = 60000)
Emit OTel GenAI histograms alongside the cubepi Tracer.
Construct with a list of MetricExporter instances (e.g. the
OTLP metric exporter); call attach to start emitting.
Example:
from cubepi.tracing import Tracer, Meter
from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
    OTLPMetricExporter,
)
tracer = Tracer(service_name="my-bot", exporters=[...])
meter = Meter(
    resource=tracer.resource,
    exporters=[OTLPMetricExporter(endpoint="http://collector:4318/v1/metrics")],
)
tracer.attach(agent)
meter.attach(agent)
Attributes
otel_meter (Any): The underlying opentelemetry.metrics.Meter, exposed so callers can register their own instruments.
Methods
attach
attach(agent: 'Agent') -> Callable[[], None]
Subscribe to agent and start emitting metrics.
Each attach() call gets its own _MeterState, so multiple
concurrent agents attached to the same Meter don't clobber
each other's open timestamps (in ns) and attribute dicts.
Previously all state lived on the Meter instance, so a second
agent's provider_request overwrote the first agent's
_chat_open_ns / _chat_attrs before the first agent's response
landed.
force_flush
force_flush(timeout_seconds: float = 30.0) -> bool
shutdown
shutdown(timeout_seconds: float = 30.0) -> None
attached
attached(agent: 'Agent') -> AsyncIterator['Meter']
RAII wrapper around attach.
async with enters by attaching the per-attach state, exits
by calling the detach callable returned from attach.
Mirrors Tracer.attached — use both for the cleanest
end-to-end shutdown:
async with (
    Tracer(...) as tracer,
    Meter(resource=tracer.resource, ...) as meter,
    tracer.attached(agent),
    meter.attached(agent),
):
    await agent.prompt("...")
# auto: detach both + shutdown both
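The enter/exit behavior of attached can be sketched with contextlib.asynccontextmanager. This assumes only what is documented above, that attach returns a zero-argument detach callable; the Attachable protocol and the attached helper here are hypothetical stand-ins, not cubepi code.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable, Protocol


class Attachable(Protocol):
    def attach(self, agent: Any) -> Callable[[], None]: ...


@asynccontextmanager
async def attached(meter: Attachable, agent: Any) -> AsyncIterator[Attachable]:
    detach = meter.attach(agent)  # enter: create the per-attach state
    try:
        yield meter
    finally:
        detach()  # exit: always detach, even if the body raised
```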
SCHEMA_URL
attribute
SCHEMA_URL = 'https://opentelemetry.io/schemas/1.41.0'
Tracer
class
Tracer(self, *, service_name: str | None = None, service_version: str | None = None, service_namespace: str | None = None, service_instance_id: str | None = None, deployment_environment: str | None = None, agent_name: str | None = None, agent_id: str | None = None, agent_description: str | None = None, agent_version: str | None = None, exporters: list[SpanExporter] | None = None, record_content: bool = False, redact: 'Callable[[str, Any], Any] | None' = None, resource: Resource | None = None, atexit_flush: bool = True, atexit_flush_timeout_seconds: float = 5.0)
Attaches OTel-compatible tracing to a cubepi Agent.
Construct once per process (or per service). Each attach(agent)
call wires the cubepi recorder to the agent's event stream and
provider listener registry; the returned callable detaches.
Example
from cubepi.tracing import Tracer
from cubepi.tracing.exporters import JsonlSpanExporter
tracer = Tracer(
    service_name="my-bot",
    agent_name="coding-agent",
    exporters=[JsonlSpanExporter(directory="./cubepi-traces")],
)
detach = tracer.attach(agent)
try:
    ...
finally:
    detach()
    await tracer.shutdown()
Attributes
resource (Resource)
otel_tracer (Any): The underlying opentelemetry.trace.Tracer instance.
redact ('Callable[[str, Any], Any] | None'): Optional (key, value) -> value filter applied at every set_attribute site for content attributes.
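A redact callable receives each content attribute's key and value and returns the value to record. The sketch below is a hypothetical policy that masks email addresses in string values (including strings inside lists and tuples) and passes everything else through; the key argument is available for key-specific rules but unused here. Since redact applies to content attributes, it presumably only has an effect when record_content=True.

```python
import re
from typing import Any

# Hypothetical, deliberately simple email pattern; tune for real data.
EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")


def redact(key: str, value: Any) -> Any:
    if isinstance(value, str):
        return EMAIL.sub("[email]", value)
    if isinstance(value, (list, tuple)):
        # Preserve the container type so tuples stay tuples.
        return type(value)(redact(key, v) for v in value)
    return value  # numbers, bools, etc. pass through unchanged
```

It would then be passed as Tracer(..., record_content=True, redact=redact).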
Methods
attach
attach(agent: 'Agent') -> Callable[[], Any]
Wire the cubepi recorder to agent.
Returns a detach() callable. When invoked:
- Synchronously: unsubscribes every hook, closes any spans still open from a cancelled run, and sweeps MCP tool-span registrations; these effects are visible as soon as detach() returns.
- If an event loop is running: schedules a flush on it and returns the resulting asyncio.Task, so callers can await detach() to block until buffered spans have been exported. Outside an async context it returns None; call await tracer.shutdown() separately to flush.
Either await detach() or await tracer.shutdown() (or
both) must be used in the caller's finally block; the
synchronous detach() alone does not guarantee that ended
spans have left the BatchSpanProcessor.
Also registers this Tracer's private TracerProvider with
cubepi.mcp._tracing so MCP CLIENT spans flow through
the same exporters. Without this step, the MCP module falls
back to the OTel global default (a no-op provider when the
caller didn't separately call trace.set_tracer_provider).
The detach callable unregisters the provider.
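The return-type split of detach() (an asyncio.Task inside a running loop, None otherwise) can be sketched as follows. make_detach, unsubscribe, and flush are hypothetical names; the point is that the synchronous cleanup always runs first, and the flush coroutine is only created when there is a loop to run it.

```python
import asyncio
from typing import Any, Callable, Coroutine


def make_detach(
    unsubscribe: Callable[[], None],
    flush: Callable[[], Coroutine[Any, Any, None]],
) -> Callable[[], "asyncio.Task[None] | None"]:
    def detach() -> "asyncio.Task[None] | None":
        unsubscribe()  # synchronous cleanup: done before detach() returns
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return None  # no running loop: caller must flush some other way
        return loop.create_task(flush())  # caller may await this Task

    return detach
```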
force_flush
force_flush(timeout_seconds: float = 30.0) -> bool
Block until all currently buffered spans are exported.
Returns False on timeout.
shutdown
shutdown(timeout_seconds: float = 30.0) -> None
Flush and close all exporters. Tracer is unusable after this.
attached
attached(agent: 'Agent') -> AsyncIterator['Tracer']
RAII wrapper around attach.
async with enters by attaching the recorder, exits by
running detach and (in an async context) awaiting its returned
flush Task. Equivalent to:
detach = tracer.attach(agent)
try:
    ...
finally:
    result = detach()
    if result is not None:  # async context: it's a Task
        await result
Use this instead of the bare attach / finally: detach()
pattern when you want the cleanup tied to a single async with block. Combines with Tracer's own context
manager:
async with Tracer(...) as tracer, tracer.attached(agent):
    await agent.prompt("...")
# auto: detach (closes cancelled spans) + shutdown (flush + close)
trace
function
trace(tracer: 'Tracer | None', agent: 'Agent') -> AsyncIterator[None]
Best-effort tracing scope for one agent run.
Attaches tracer to agent on enter and detaches + flushes its
buffered spans on exit. Every tracing fault — a failed attach, detach, or
flush — is logged and swallowed, so tracing can never break or fail the
work inside the async with block. Passing tracer=None makes the
block a no-op, which lets callers gate tracing on config without branching
at the call site.
This does not shut the tracer down: the tracer is reusable across runs,
so build it once (e.g. per process) and call await tracer.shutdown()
when the owning process stops.
Unlike Tracer.attached, which surfaces flush failures to the caller,
this helper swallows them — use it when tracing is auxiliary to the work and
must never affect its outcome.
Example
async with trace(tracer, agent):
    await agent.prompt("...")
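The best-effort contract can be sketched as an async context manager that logs and swallows tracing faults while still letting exceptions from the body propagate. best_effort_trace is a hypothetical name, not cubepi's trace; it relies only on the documented attach/detach behavior (detach() may return an awaitable flush task or None).

```python
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

log = logging.getLogger(__name__)


@asynccontextmanager
async def best_effort_trace(tracer: Any, agent: Any) -> AsyncIterator[None]:
    if tracer is None:
        yield  # tracing disabled: plain no-op scope
        return
    detach = None
    try:
        detach = tracer.attach(agent)
    except Exception:
        log.exception("tracing: attach failed; running untraced")
    try:
        yield  # exceptions from the body propagate unchanged
    finally:
        if detach is not None:
            try:
                result = detach()
                if result is not None:
                    await result  # flush task returned by detach()
            except Exception:
                log.exception("tracing: detach/flush failed")
```

Catching Exception rather than BaseException keeps asyncio.CancelledError propagating, so cancelling the surrounding task still works.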