Tenuo A2A Integration
Status: Production Ready
Overview
Tenuo A2A adds warrant-based authorization to agent-to-agent communication. When Agent A delegates a task to Agent B, the warrant specifies exactly what Agent B is allowed to do.
┌─────────────┐ ┌─────────────┐
│ Agent A │ Task + Warrant │ Agent B │
│ (Orchestrator)│──────────────────▶│ (Worker) │
│ │ │ │
│ │◀────────────────── │ │
│ │ Result │ │
└─────────────┘ └─────────────┘
Warrant says: "Agent B can only search arxiv.org for this task"
Use cases:
- Multi-agent systems where agents delegate tasks
- Orchestrators that dispatch work to specialized workers
- Agent networks with least-privilege access control
Not for: Single-agent tool enforcement (use tenuo.openai or tenuo.langchain instead)
Installation
uv pip install "tenuo[a2a]"
Quick Start (Minimal Example)
Server (Worker Agent):
from tenuo.a2a import A2AServerBuilder
# Build server with fluent API
server = (A2AServerBuilder()
.name("Worker")
.url("https://worker.example.com")
.key(my_signing_key) # Your identity
.accept_warrants_from(orchestrator_key) # Who can give you tasks
.build())
@server.skill("echo")
async def echo(msg: str) -> str:
return f"Echo: {msg}"
# uvicorn server:server.app --port 8000
Or use the direct constructor:
from tenuo.a2a import A2AServer
server = A2AServer(
name="Worker",
url="https://worker.example.com",
public_key=my_public_key,
trusted_issuers=[orchestrator_public_key],
)
Client (Orchestrator):
from tenuo.a2a import A2AClientBuilder
from tenuo import Warrant
# Create warrant for this task
task_warrant = (Warrant.mint_builder()
.capability("echo")
.holder(worker_public_key)
.ttl(300)
.mint(orchestrator_key))
# Build client with default warrant
client = (A2AClientBuilder()
.url("http://localhost:8000")
.warrant(task_warrant, orchestrator_key) # Pre-configure for repeated use
.build())
# Send task (warrant already configured)
result = await client.send_task(
"hello",
skill="echo",
arguments={"msg": "hello"},
)
print(result.output) # "Echo: hello"
Or use the direct constructor:
from tenuo.a2a import A2AClient
client = A2AClient("http://localhost:8000")
result = await client.send_task(
"hello",
skill="echo",
arguments={"msg": "hello"},
warrant=task_warrant,
signing_key=orchestrator_key,
)
That’s it. The warrant proves the orchestrator authorized this specific task.
Full Example (With Constraints)
Server (Worker)
from tenuo.a2a import A2AServerBuilder
from tenuo.constraints import Subpath, UrlSafe
server = (A2AServerBuilder()
.name("Research Agent")
.url("https://research-agent.example.com")
.key(my_signing_key)
.accept_warrants_from(orchestrator_public_key)
.build())
# Register skills with constraint bindings
@server.skill("search_papers", constraints={"sources": UrlSafe()})
async def search_papers(query: str, sources: list[str]) -> list[dict]:
return await do_search(query, sources)
@server.skill("read_file", constraints={"path": Subpath("/data")})
async def read_file(path: str) -> str:
with open(path) as f:
return f.read()
# uvicorn server:server.app --host 0.0.0.0 --port 8000
Automated Registration (CSR Handshake)
A2A supports an automated handshake for agent registration, eliminating the need for out-of-band key sharing. This follows the Certificate Signing Request (CSR) pattern.
The connecting agent dynamically generates a self-signed challenge token to cryptographically prove key ownership. The server verifies this signature and uses a registered handler to decide what capabilities to grant, minting a fresh delegation warrant on the fly.
Server (Control Plane / Parent Agent):
from tenuo.a2a.types import VerifiedWarrantRequest
# The handler decides whether to grant the requested capabilities
async def registration_handler(req: VerifiedWarrantRequest, issue):
if req.verified_key_hex not in ALLOWLIST:
raise RegistrationDeniedError("Agent not approved")
# Issue a new warrant bound to the requested capabilities
await issue(capabilities=req.capabilities, ttl=86400) # 24 hrs
server = (A2AServerBuilder()
.name("Control Plane")
.url("https://control.example.com")
.key(server_signing_key) # MUST be a SigningKey to issue warrants
.trust(server_signing_key.public_key)
.registration_handler(registration_handler) # Enable handshake
.build())
Client (Child Agent):
from tenuo.a2a import A2AClient
from tenuo import SigningKey
client = A2AClient("https://control.example.com")
worker_key = SigningKey.generate()
# Request a warrant with specific capabilities
# The client automatically generates the self-signed challenge token
warrant = await client.request_warrant(
signing_key=worker_key,
capabilities={"search_papers": {}}
)
# You can now immediately use this warrant (and key) for tasks
result = await client.send_task(
"Search for AI Agents papers",
skill="search_papers",
arguments={"query": "AI Agents"},
warrant=warrant,
signing_key=worker_key,
)
Note: Extension data (like AWS Nitro Enclaves or SGX TEE quotes) can be attached to the request via the extensions parameter in request_warrant() and inspected in the server handler via req.extensions.
Client (Orchestrator)
from tenuo.a2a import A2AClient
from tenuo.constraints import UrlSafe
# Discover agent capabilities
client = A2AClient("https://research-agent.example.com")
card = await client.discover()
# Attenuate warrant for this delegation
task_warrant = (my_warrant
.grant_builder()
.capability("search_papers", sources=UrlSafe(allow_domains=["arxiv.org"]))
.holder(card.public_key)
.ttl(300)
.build(my_signing_key))
# Send task with warrant
result = await client.send_task(
message="Find papers on capability-based security",
warrant=task_warrant,
skill="search_papers",
arguments={"query": "capability-based security", "sources": ["https://arxiv.org"]},
)
Streaming Tasks
For long-running tasks, use streaming to receive incremental updates:
# Stream results as they arrive
async for update in client.send_task_streaming(
message="Analyze these papers",
warrant=task_warrant,
skill="analyze_papers",
arguments={"paper_ids": ["arxiv:2401.12345"]},
):
if update.type.value == "status":
print(f"Status: {update.data.get('status')}")
elif update.type.value == "message":
print(f"Chunk: {update.data.get('content')}")
elif update.type.value == "complete":
print(f"Done: {update.data.get('output')}")
The server emits SSE events for status updates, intermediate messages, and final completion.
Stream timeout (DoS protection):
# Default timeout is 300 seconds (5 minutes)
async for update in client.send_task_streaming(
...,
stream_timeout=600.0, # 10 minute timeout
):
...
If the stream exceeds stream_timeout, a TimeoutError is raised. This prevents slow-drip DoS attacks where a malicious server holds connections indefinitely.
Proof-of-Possession (PoP)
Proof-of-Possession adds an additional security layer by requiring the client to prove they control the private key associated with the warrant’s holder.
When to Use PoP
Require PoP when:
- Agents communicate over untrusted networks (Internet, shared infrastructure)
- Compliance requires cryptographic proof of authorization
- Protection against warrant theft is critical
- Multi-hop delegation across organizational boundaries
PoP is optional when:
- All agents run on trusted infrastructure (same data center, VPC)
- Network isolation provides security (private network, mTLS)
- Performance is critical and risk is low (microsecond latency matters)
Never skip PoP when:
- Agents are on the public Internet
- Warrants have long TTLs (hours/days)
- Untrusted intermediaries exist in the call chain
How PoP Works
PoP signatures prove that the caller possesses the private key corresponding to the warrant’s sub (holder) field:
┌──────────────────────────────────────────────────────┐
│ Warrant (JWT): │
│ sub: "z6Mk..." ← Orchestrator's public key │
│ grants: ["search"] │
│ exp: 1234567890 │
│ Signature: <signed by control plane> │
└──────────────────────────────────────────────────────┘
+
┌──────────────────────────────────────────────────────┐
│ PoP Signature: │
│ sign(orchestrator_private_key, "search", args, ts) │
│ → Proves orchestrator controls the private key │
└──────────────────────────────────────────────────────┘
=
Authorization Proof
What PoP Prevents:
- Warrant Theft: If an attacker intercepts a warrant, they can’t use it without the private key
- Replay Attacks: Each PoP signature includes a timestamp and is checked once
- Man-in-the-Middle: Modified arguments invalidate the PoP signature
Client Usage
Enable PoP by passing signing_key to send_task():
from tenuo.a2a import A2AClient
client = A2AClient("https://worker.example.com")
# Without PoP (only warrant validation)
result = await client.send_task(
"search for papers",
warrant=my_warrant,
skill="search",
arguments={"query": "papers"},
)
# With PoP (warrant + signature proof)
result = await client.send_task(
"search for papers",
warrant=my_warrant,
skill="search",
arguments={"query": "papers"},
signing_key=orchestrator_key, # ← Proves possession
)
Or configure PoP by default using the builder:
from tenuo.a2a import A2AClientBuilder
client = (A2AClientBuilder()
.url("https://worker.example.com")
.warrant(my_warrant, orchestrator_key) # ← Pre-configure PoP
.build())
# All requests automatically include PoP
result = await client.send_task(
"search for papers",
skill="search",
arguments={"query": "papers"},
)
Server Configuration
Control PoP requirements on the server:
server = A2AServer(
name="Worker",
url="https://worker.example.com",
public_key=worker_public_key,
trusted_issuers=[control_plane_key], # Required — warrants must be signed by these keys
# PoP configuration
require_pop=True, # Reject requests without PoP (default: True)
)
Important: Always configure
trusted_issuers. Without it, the builder raisesValueError. This ensures only warrants signed by your control plane are accepted — self-signed warrants from attackers are rejected.
Security Defaults:
trusted_issuersis required (fail-closed)require_pop=Trueby default (fail-safe)- Can be disabled via
TENUO_A2A_REQUIRE_POP=falseenvironment variable - If
require_pop=Truebut client doesn’t provide PoP →PopRequiredError
Performance Impact
PoP adds cryptographic overhead:
Without PoP:
- Warrant verification: ~0.5ms (Ed25519 signature check)
With PoP:
- Warrant verification: ~0.5ms
- PoP signature generation (client): ~0.3ms
- PoP signature verification (server): ~0.3ms
- Total overhead: ~1.1ms per request
Recommendation: Always use PoP in production unless you have network-level security (mTLS + VPC).
Error Handling
from tenuo.a2a import PopRequiredError, PopVerificationError
try:
result = await client.send_task("search", warrant=warrant, skill="search", arguments={})
except PopRequiredError:
print("Server requires PoP signature - add signing_key parameter")
except PopVerificationError as e:
print(f"PoP signature invalid: {e}")
# Possible causes:
# - Wrong signing key (not matching warrant.sub)
# - Arguments modified after signing
# - Clock skew between client/server
Debugging PoP Issues
Issue: PopVerificationError: Signature verification failed
Causes:
- Wrong signing key: Key doesn’t match warrant’s
subfield - Modified arguments: Arguments changed after PoP computation
- Clock skew: Client/server clocks differ significantly
Debug:
# Verify signing key matches warrant holder
assert warrant.sub == str(signing_key.public_key)
# Log PoP computation
import logging
logging.getLogger("tenuo.a2a.client").setLevel(logging.DEBUG)
# Shows: "Generated PoP signature for skill 'search'"
Server Configuration
server = A2AServer(
# Required
name="Agent Name", # Display name
url="https://agent.example.com", # Public URL (for audience validation)
public_key=my_public_key, # This agent's public key
trusted_issuers=[...], # List of trusted issuer public keys
# Optional (shown with defaults)
trust_delegated=True, # Accept warrants delegated from trusted issuers
require_warrant=True, # Reject tasks without warrants
require_audience=True, # Require warrant audience matches our URL
check_replay=True, # Enforce jti uniqueness
replay_window=3600, # Seconds to remember jti values
max_chain_depth=10, # Maximum delegation chain length
# Audit
audit_log=sys.stderr, # Destination (file, callable, or stderr)
audit_format="json", # "json" or "text"
)
Trust Model
The server trusts warrants based on trusted_issuers:
- Direct Trust: Warrant signed by a trusted issuer → accepted
- Delegated Trust (if
trust_delegated=True): Warrant with valid chain back to trusted issuer → accepted
┌─────────────────────┐
│ Trusted Root │ ← In trusted_issuers
│ (Control Plane) │
└──────────┬──────────┘
│ delegates
▼
┌─────────────────────┐
│ Orchestrator A │ ← Warrant signed by root
└──────────┬──────────┘
│ delegates
▼
┌─────────────────────┐
│ Worker B │ ← Warrant with chain [root → A → B]
└─────────────────────┘
Skill Constraints
Constraints bind warrant parameters to skill parameters:
@server.skill("read_file", constraints={"path": Subpath("/data")})
async def read_file(path: str) -> str:
# "path" constraint checked against warrant's path constraint
# Blocked if: warrant allows Subpath("/data") but arg is "/etc/passwd"
...
Constraint binding validation happens at startup:
# This raises ConstraintBindingError at startup:
@server.skill("read_file", constraints={"file_path": Subpath("/data")}) # "file_path" not a param
async def read_file(path: str) -> str: # param is "path"
...
Client Configuration
client = A2AClient(
url="https://agent.example.com",
# Optional
pin_key="z6Mk...", # Expected public key (raises KeyMismatchError if different)
timeout=30.0, # Request timeout in seconds
)
Key Pinning
Pin the expected public key to prevent TOFU (Trust On First Use) attacks:
# If agent returns different key, raises KeyMismatchError
client = A2AClient(
"https://research-agent.example.com",
pin_key="z6MkResearchAgentKey123" # From your config/secrets
)
card = await client.discover() # Fails if key doesn't match
Key Format Compatibility
A2A accepts public keys in multiple formats:
# All of these work:
server = (A2AServerBuilder()
.key(signing_key) # PublicKey object
.accept_warrants_from("a1b2c3...") # Hex (64 chars)
.accept_warrants_from("z6MkpT...") # Multibase (base58btc)
.accept_warrants_from("did:key:z6MkpT...") # W3C DID
.build())
All formats are automatically normalized for comparison. Multibase and DID support requires uv pip install base58.
Agent Card (Discovery)
Agents expose their capabilities via /.well-known/agent.json:
{
"name": "Research Agent",
"url": "https://research-agent.example.com",
"skills": [
{
"id": "search_papers",
"name": "Search Papers",
"x-tenuo-constraints": {
"sources": {"type": "UrlSafe", "required": true}
}
}
],
"x-tenuo": {
"version": "0.1.0",
"required": true,
"public_key": "z6Mk..."
}
}
Delegation Chains
When delegating through multiple agents, the full chain is transmitted as a single header.
The server validates:
- Root warrant is from a trusted issuer
- Each link: child issuer = parent holder
- Skills narrow monotonically (no privilege escalation)
- Chain depth ≤
max_chain_depth
WarrantStack Transport
The current implementation packs the entire delegation chain into a single X-Tenuo-Warrant header using WarrantStack encoding, rather than the legacy two-header approach (X-Tenuo-Warrant + X-Tenuo-Warrant-Chain).
The client encodes the chain with encode_warrant_stack and the server decodes it with decode_warrant_stack_base64:
from tenuo import encode_warrant_stack
# Client sends delegation chain as a single header
chain = [root_warrant, child_warrant]
stack_b64 = encode_warrant_stack(chain)
# stack_b64 goes in X-Tenuo-Warrant header
# Server automatically detects and unpacks WarrantStack
This simplifies proxy and load-balancer configurations (one header to forward instead of two) and avoids ordering ambiguities in multi-hop chains.
Error Handling
All A2A errors inherit from A2AError and map to JSON-RPC error codes with canonical wire codes:
from tenuo.a2a import (
A2AError,
MissingWarrantError, # -32001: Warrant required but not provided
InvalidSignatureError, # -32002: Signature verification failed
UntrustedIssuerError, # -32003: Issuer not in trusted_issuers
WarrantExpiredError, # -32004: Warrant has expired
AudienceMismatchError, # -32005: Audience doesn't match server URL
ReplayDetectedError, # -32006: jti already used
SkillNotGrantedError, # -32007: Skill not in warrant grants
ConstraintViolationError, # -32008: Argument violates constraint
ChainInvalidError, # -32010: Delegation chain validation failed
KeyMismatchError, # -32012: Public key doesn't match pinned key
)
try:
result = await client.send_task(...)
except SkillNotGrantedError as e:
print(f"Skill {e.data['skill']} not in granted: {e.data['granted_skills']}")
except A2AError as e:
print(f"A2A error {e.code}: {e.message}")
Wire Code Support
A2A error responses now include canonical Tenuo wire codes (1000-2199) for cross-protocol compatibility:
{
"jsonrpc": "2.0",
"error": {
"code": -32008,
"message": "Constraint violation",
"data": {
"tenuo_code": 1501,
"field": "amount",
"reason": "Value exceeds maximum"
}
},
"id": "task_123"
}
This enables:
- Cross-protocol debugging: Same wire codes used in HTTP, gRPC, and JSON-RPC
- Precise error mapping: JSON-RPC code -32008 maps to canonical code 1501
- Machine-readable errors: Clients can programmatically handle specific error types
| A2A JSON-RPC Code | Canonical Wire Code | Name |
|---|---|---|
| -32001 | 1202 | Missing warrant |
| -32002 | 1100 | Invalid signature |
| -32003 | 1406 | Untrusted issuer |
| -32004 | 1300 | Warrant expired |
| -32007 | 1500 | Tool not authorized |
| -32008 | 1501 | Constraint violation |
| -32010 | 1405 | Chain invalid |
See wire format specification for the complete list.
Accessing the Warrant
Inside a skill, access the current warrant via context:
from tenuo.a2a import current_task_warrant
@server.skill("my_skill")
async def my_skill(query: str) -> str:
warrant = current_task_warrant.get()
if warrant:
print(f"Warrant issuer: {warrant.iss}")
print(f"Warrant subject: {warrant.sub}")
return "done"
Audit Logging
Server emits structured audit events:
# JSON format (default)
{"timestamp": "...", "event": "warrant_validated", "skill": "search", "outcome": "allowed", ...}
# Text format
[WARRANT_VALIDATED] search: allowed
Custom audit handler:
async def my_audit_handler(event: AuditEvent):
await send_to_siem(event.to_dict())
server = A2AServer(..., audit_log=my_audit_handler)
Example: Full Multi-Agent System
# control_plane.py
from tenuo import SigningKey, Warrant
control_key = SigningKey.from_env("CONTROL_PLANE_KEY")
def issue_orchestrator_warrant(orchestrator_pubkey):
return (Warrant.mint_builder()
.capability("search_papers", {})
.capability("read_file", {"path": Subpath("/data")})
.holder(orchestrator_pubkey)
.ttl(86400) # 24 hours
.mint(control_key))
# orchestrator.py
from tenuo.a2a import A2AClient
async def delegate_research(topic: str, my_warrant, my_key, target_pubkey):
client = A2AClient("https://research-agent.example.com")
# Attenuate warrant for this specific task
task_warrant = (my_warrant
.grant_builder()
.capability("search_papers", sources=UrlSafe(allow_domains=["arxiv.org"]))
.holder(target_pubkey)
.ttl(300)
.build(my_key))
return await client.send_task(
message=f"Research: {topic}",
warrant=task_warrant,
skill="search_papers",
arguments={"query": topic, "sources": ["https://arxiv.org"]},
)
# research_agent.py
from tenuo.a2a import A2AServer
from tenuo.constraints import UrlSafe
server = A2AServer(
name="Research Agent",
url="https://research-agent.example.com",
public_key=my_public_key,
trusted_issuers=[control_plane_public_key],
)
@server.skill("search_papers", constraints={"sources": UrlSafe()})
async def search_papers(query: str, sources: list[str]) -> list[dict]:
# Only allowed URLs pass through
return await search_arxiv(query, sources)
if __name__ == "__main__":
import uvicorn
uvicorn.run(server.app, host="0.0.0.0", port=8000)
API Reference
See API Reference for complete type signatures.
Protocol Specification
For the wire format and protocol details, see the Protocol Spec and Wire Format.