Tenuo A2A Integration
Status: Implemented (MVP)
Overview
Tenuo A2A adds warrant-based authorization to agent-to-agent communication. When Agent A delegates a task to Agent B, the warrant specifies exactly what Agent B is allowed to do.
┌─────────────┐ ┌─────────────┐
│ Agent A │ Task + Warrant │ Agent B │
│ (Orchestrator)│──────────────────▶│ (Worker) │
│ │ │ │
│ │◀────────────────── │ │
│ │ Result │ │
└─────────────┘ └─────────────┘
Warrant says: "Agent B can only search arxiv.org for this task"
Use cases:
- Multi-agent systems where agents delegate tasks
- Orchestrators that dispatch work to specialized workers
- Agent networks with least-privilege access control
Not for: Single-agent tool enforcement (use tenuo.openai or tenuo.langchain instead)
Installation
uv pip install "tenuo[a2a]"
Quick Start (Minimal Example)
Server (Worker Agent):
from tenuo.a2a import A2AServerBuilder
# Build server with fluent API
server = (A2AServerBuilder()
.name("Worker")
.url("https://worker.example.com")
.key(my_signing_key) # Your identity
.accept_warrants_from(orchestrator_key) # Who can give you tasks
.build())
@server.skill("echo")
async def echo(msg: str) -> str:
return f"Echo: {msg}"
# uvicorn server:server.app --port 8000
Or use the direct constructor:
from tenuo.a2a import A2AServer
server = A2AServer(
name="Worker",
url="https://worker.example.com",
public_key=my_public_key,
trusted_issuers=[orchestrator_public_key],
)
Client (Orchestrator):
from tenuo.a2a import A2AClientBuilder
from tenuo import Warrant
# Create warrant for this task
task_warrant = (Warrant.mint_builder()
.capability("echo")
.holder(worker_public_key)
.ttl(300)
.mint(orchestrator_key))
# Build client with default warrant
client = (A2AClientBuilder()
.url("http://localhost:8000")
.warrant(task_warrant, orchestrator_key) # Pre-configure for repeated use
.build())
# Send task (warrant already configured)
result = await client.send_task(
skill="echo",
arguments={"msg": "hello"},
)
print(result.output) # "Echo: hello"
Or use the direct constructor:
from tenuo.a2a import A2AClient
client = A2AClient("http://localhost:8000")
result = await client.send_task(
skill="echo",
arguments={"msg": "hello"},
warrant=task_warrant,
signing_key=orchestrator_key,
)
That’s it. The warrant proves the orchestrator authorized this specific task.
Full Example (With Constraints)
Server (Worker)
from tenuo.a2a import A2AServerBuilder
from tenuo.constraints import Subpath, UrlSafe
server = (A2AServerBuilder()
.name("Research Agent")
.url("https://research-agent.example.com")
.key(my_signing_key)
.accept_warrants_from(orchestrator_public_key)
.build())
# Register skills with constraint bindings
@server.skill("search_papers", constraints={"sources": UrlSafe})
async def search_papers(query: str, sources: list[str]) -> list[dict]:
return await do_search(query, sources)
@server.skill("read_file", constraints={"path": Subpath})
async def read_file(path: str) -> str:
with open(path) as f:
return f.read()
# uvicorn server:server.app --host 0.0.0.0 --port 8000
Client (Orchestrator)
from tenuo.a2a import A2AClient
from tenuo.constraints import UrlSafe
# Discover agent capabilities
client = A2AClient("https://research-agent.example.com")
card = await client.discover()
# Attenuate warrant for this delegation
task_warrant = (my_warrant
.grant_builder()
.capability("search_papers", sources=UrlSafe(allow_domains=["arxiv.org"]))
.audience(card.public_key)
.ttl(300)
.build(my_signing_key))
# Send task with warrant
result = await client.send_task(
message="Find papers on capability-based security",
warrant=task_warrant,
skill="search_papers",
arguments={"query": "capability-based security", "sources": ["https://arxiv.org"]},
)
Streaming Tasks
For long-running tasks, use streaming to receive incremental updates:
# Stream results as they arrive
async for update in client.send_task_streaming(
message="Analyze these papers",
warrant=task_warrant,
skill="analyze_papers",
arguments={"paper_ids": ["arxiv:2401.12345"]},
):
if update.type.value == "status":
print(f"Status: {update.status}")
elif update.type.value == "message":
print(f"Chunk: {update.content}")
elif update.type.value == "complete":
print(f"Done: {update.output}")
The server emits SSE events for status updates, intermediate messages, and final completion.
Stream timeout (DoS protection):
# Default timeout is 300 seconds (5 minutes)
async for update in client.send_task_streaming(
...,
stream_timeout=600.0, # 10 minute timeout
):
...
If the stream exceeds stream_timeout, a TimeoutError is raised. This prevents slow-drip DoS attacks where a malicious server holds connections indefinitely.
Proof-of-Possession (PoP)
Proof-of-Possession adds an additional security layer by requiring the client to prove they control the private key associated with the warrant’s holder.
When to Use PoP
Require PoP when:
- ✅ Agents communicate over untrusted networks (Internet, shared infrastructure)
- ✅ Compliance requires cryptographic proof of authorization
- ✅ Protection against warrant theft is critical
- ✅ Multi-hop delegation across organizational boundaries
PoP is optional when:
- ⚠️ All agents run on trusted infrastructure (same data center, VPC)
- ⚠️ Network isolation provides security (private network, mTLS)
- ⚠️ Performance is critical and risk is low (microsecond latency matters)
Never skip PoP when:
- ❌ Agents are on the public Internet
- ❌ Warrants have long TTLs (hours/days)
- ❌ Untrusted intermediaries exist in the call chain
How PoP Works
PoP signatures prove that the caller possesses the private key corresponding to the warrant’s sub (holder) field:
┌──────────────────────────────────────────────────────┐
│ Warrant (JWT): │
│ sub: "z6Mk..." ← Orchestrator's public key │
│ grants: ["search"] │
│ exp: 1234567890 │
│ Signature: <signed by control plane> │
└──────────────────────────────────────────────────────┘
+
┌──────────────────────────────────────────────────────┐
│ PoP Signature: │
│ sign(orchestrator_private_key, "search", args, ts) │
│ → Proves orchestrator controls the private key │
└──────────────────────────────────────────────────────┘
=
Authorization Proof
What PoP Prevents:
- Warrant Theft: If an attacker intercepts a warrant, they can’t use it without the private key
- Replay Attacks: Each PoP signature includes a timestamp and is checked once
- Man-in-the-Middle: Modified arguments invalidate the PoP signature
Client Usage
Enable PoP by passing signing_key to send_task():
from tenuo.a2a import A2AClient
client = A2AClient("https://worker.example.com")
# Without PoP (only warrant validation)
result = await client.send_task(
warrant=my_warrant,
skill="search",
arguments={"query": "papers"},
)
# With PoP (warrant + signature proof)
result = await client.send_task(
warrant=my_warrant,
skill="search",
arguments={"query": "papers"},
signing_key=orchestrator_key, # ← Proves possession
)
Or configure PoP by default using the builder:
from tenuo.a2a import A2AClientBuilder
client = (A2AClientBuilder()
.url("https://worker.example.com")
.warrant(my_warrant, orchestrator_key) # ← Pre-configure PoP
.build())
# All requests automatically include PoP
result = await client.send_task(
skill="search",
arguments={"query": "papers"},
)
Server Configuration
Control PoP requirements on the server:
server = A2AServer(
name="Worker",
url="https://worker.example.com",
public_key=worker_public_key,
trusted_issuers=[control_plane_key],
# PoP configuration
require_pop=True, # Reject requests without PoP (default: True)
trusted_roots=[...], # Required for PoP verification
)
Security Defaults:
require_pop=Trueby default (fail-safe)- Can be disabled via
TENUO_A2A_REQUIRE_POP=falseenvironment variable - If
require_pop=Truebut client doesn’t provide PoP →PopRequiredError
Performance Impact
PoP adds cryptographic overhead:
Without PoP:
- Warrant verification: ~0.5ms (Ed25519 signature check)
With PoP:
- Warrant verification: ~0.5ms
- PoP signature generation (client): ~0.3ms
- PoP signature verification (server): ~0.3ms
- Total overhead: ~1.1ms per request
Recommendation: Always use PoP in production unless you have network-level security (mTLS + VPC).
Error Handling
from tenuo.a2a import PopRequiredError, PopVerificationError
try:
result = await client.send_task(warrant=warrant, skill="search", arguments={})
except PopRequiredError:
print("Server requires PoP signature - add signing_key parameter")
except PopVerificationError as e:
print(f"PoP signature invalid: {e}")
# Possible causes:
# - Wrong signing key (not matching warrant.sub)
# - Arguments modified after signing
# - Clock skew between client/server
Debugging PoP Issues
Issue: PopVerificationError: Signature verification failed
Causes:
- Wrong signing key: Key doesn’t match warrant’s
subfield - Modified arguments: Arguments changed after PoP computation
- Clock skew: Client/server clocks differ significantly
Debug:
# Verify signing key matches warrant holder
assert warrant.sub == signing_key.public_key.to_did()
# Log PoP computation
import logging
logging.getLogger("tenuo.a2a.client").setLevel(logging.DEBUG)
# Shows: "Generated PoP signature for skill 'search'"
Server Configuration
server = A2AServer(
# Required
name="Agent Name", # Display name
url="https://agent.example.com", # Public URL (for audience validation)
public_key=my_public_key, # This agent's public key
trusted_issuers=[...], # List of trusted issuer public keys
# Optional (shown with defaults)
trust_delegated=True, # Accept warrants delegated from trusted issuers
require_warrant=True, # Reject tasks without warrants
require_audience=True, # Require warrant audience matches our URL
check_replay=True, # Enforce jti uniqueness
replay_window=3600, # Seconds to remember jti values
max_chain_depth=10, # Maximum delegation chain length
# Audit
audit_log=sys.stderr, # Destination (file, callable, or stderr)
audit_format="json", # "json" or "text"
)
Trust Model
The server trusts warrants based on trusted_issuers:
- Direct Trust: Warrant signed by a trusted issuer → accepted
- Delegated Trust (if
trust_delegated=True): Warrant with valid chain back to trusted issuer → accepted
┌─────────────────────┐
│ Trusted Root │ ← In trusted_issuers
│ (Control Plane) │
└──────────┬──────────┘
│ delegates
▼
┌─────────────────────┐
│ Orchestrator A │ ← Warrant signed by root
└──────────┬──────────┘
│ delegates
▼
┌─────────────────────┐
│ Worker B │ ← Warrant with chain [root → A → B]
└─────────────────────┘
Skill Constraints
Constraints bind warrant parameters to skill parameters:
@server.skill("read_file", constraints={"path": Subpath})
async def read_file(path: str) -> str:
# "path" constraint checked against warrant's path constraint
# Blocked if: warrant allows Subpath("/data") but arg is "/etc/passwd"
...
Constraint binding validation happens at startup:
# This raises ConstraintBindingError at startup:
@server.skill("read_file", constraints={"file_path": Subpath}) # ❌ "file_path" not a param
async def read_file(path: str) -> str: # param is "path"
...
Client Configuration
client = A2AClient(
url="https://agent.example.com",
# Optional
pin_key="z6Mk...", # Expected public key (raises KeyMismatchError if different)
timeout=30.0, # Request timeout in seconds
)
Key Pinning
Pin the expected public key to prevent TOFU (Trust On First Use) attacks:
# If agent returns different key, raises KeyMismatchError
client = A2AClient(
"https://research-agent.example.com",
pin_key="z6MkResearchAgentKey123" # From your config/secrets
)
card = await client.discover() # Fails if key doesn't match
Key Format Compatibility
A2A accepts public keys in multiple formats:
# All of these work:
server = A2AServerBuilder()
.key(signing_key) # PublicKey object
.accept_warrants_from("a1b2c3...") # Hex (64 chars)
.accept_warrants_from("z6MkpT...") # Multibase (base58btc)
.accept_warrants_from("did:key:z6MkpT...") # W3C DID
.build()
All formats are automatically normalized for comparison. Multibase and DID support requires uv pip install base58.
Agent Card (Discovery)
Agents expose their capabilities via /.well-known/agent.json:
{
"name": "Research Agent",
"url": "https://research-agent.example.com",
"skills": [
{
"id": "search_papers",
"name": "Search Papers",
"x-tenuo-constraints": {
"sources": {"type": "UrlSafe", "required": true}
}
}
],
"x-tenuo": {
"version": "0.1.0",
"required": true,
"public_key": "z6Mk..."
}
}
Delegation Chains
When delegating through multiple agents, include the warrant chain:
# Orchestrator delegates to Worker A, who delegates to Worker B
# Worker B receives:
# X-Tenuo-Warrant: <worker_b_warrant>
# X-Tenuo-Warrant-Chain: <root>;...;<worker_a_warrant>
The server validates:
- Root warrant is from a trusted issuer
- Each link: child issuer = parent holder
- Skills narrow monotonically (no privilege escalation)
- Chain depth ≤
max_chain_depth
Error Handling
All A2A errors inherit from A2AError and map to JSON-RPC error codes with canonical wire codes:
from tenuo.a2a import (
A2AError,
MissingWarrantError, # -32001: Warrant required but not provided
InvalidSignatureError, # -32002: Signature verification failed
UntrustedIssuerError, # -32003: Issuer not in trusted_issuers
WarrantExpiredError, # -32004: Warrant has expired
AudienceMismatchError, # -32005: Audience doesn't match server URL
ReplayDetectedError, # -32006: jti already used
SkillNotGrantedError, # -32007: Skill not in warrant grants
ConstraintViolationError, # -32008: Argument violates constraint
ChainInvalidError, # -32010: Delegation chain validation failed
KeyMismatchError, # -32012: Public key doesn't match pinned key
)
try:
result = await client.send_task(...)
except SkillNotGrantedError as e:
print(f"Skill {e.data['skill']} not in granted: {e.data['granted_skills']}")
except A2AError as e:
print(f"A2A error {e.code}: {e.message}")
Wire Code Support
A2A error responses now include canonical Tenuo wire codes (1000-2199) for cross-protocol compatibility:
{
"jsonrpc": "2.0",
"error": {
"code": -32008,
"message": "Constraint violation",
"data": {
"tenuo_code": 1501,
"field": "amount",
"reason": "Value exceeds maximum"
}
},
"id": "task_123"
}
This enables:
- Cross-protocol debugging: Same wire codes used in HTTP, gRPC, and JSON-RPC
- Precise error mapping: JSON-RPC code -32008 maps to canonical code 1501
- Machine-readable errors: Clients can programmatically handle specific error types
| A2A JSON-RPC Code | Canonical Wire Code | Name |
|---|---|---|
| -32001 | 1202 | Missing warrant |
| -32002 | 1100 | Invalid signature |
| -32003 | 1406 | Untrusted issuer |
| -32004 | 1300 | Warrant expired |
| -32007 | 1500 | Tool not authorized |
| -32008 | 1501 | Constraint violation |
| -32010 | 1405 | Chain invalid |
See wire format specification for the complete list.
Accessing the Warrant
Inside a skill, access the current warrant via context:
from tenuo.a2a import current_task_warrant
@server.skill("my_skill")
async def my_skill(query: str) -> str:
warrant = current_task_warrant.get()
if warrant:
print(f"Warrant issuer: {warrant.iss}")
print(f"Warrant subject: {warrant.sub}")
return "done"
Audit Logging
Server emits structured audit events:
# JSON format (default)
{"timestamp": "...", "event": "warrant_validated", "skill": "search", "outcome": "allowed", ...}
# Text format
[WARRANT_VALIDATED] search: allowed
Custom audit handler:
async def my_audit_handler(event: AuditEvent):
await send_to_siem(event.to_dict())
server = A2AServer(..., audit_log=my_audit_handler)
Example: Full Multi-Agent System
# control_plane.py
from tenuo import SigningKey, Warrant
control_key = SigningKey.from_env("CONTROL_PLANE_KEY")
def issue_orchestrator_warrant(orchestrator_pubkey):
return (Warrant.mint_builder()
.capability("search_papers", {})
.capability("read_file", {"path": Subpath("/data")})
.holder(orchestrator_pubkey)
.ttl(86400) # 24 hours
.mint(control_key))
# orchestrator.py
from tenuo.a2a import A2AClient
async def delegate_research(topic: str, my_warrant, my_key, target_pubkey):
client = A2AClient("https://research-agent.example.com")
# Attenuate warrant for this specific task
task_warrant = (my_warrant
.grant_builder()
.capability("search_papers", sources=UrlSafe(allow_domains=["arxiv.org"]))
.audience(target_pubkey)
.ttl(300)
.build(my_key))
return await client.send_task(
message=f"Research: {topic}",
warrant=task_warrant,
skill="search_papers",
arguments={"query": topic, "sources": ["https://arxiv.org"]},
)
# research_agent.py
from tenuo.a2a import A2AServer
from tenuo.constraints import UrlSafe
server = A2AServer(
name="Research Agent",
url="https://research-agent.example.com",
public_key=my_public_key,
trusted_issuers=[control_plane_public_key],
)
@server.skill("search_papers", constraints={"sources": UrlSafe})
async def search_papers(query: str, sources: list[str]) -> list[dict]:
# Only allowed URLs pass through
return await search_arxiv(query, sources)
if __name__ == "__main__":
import uvicorn
uvicorn.run(server.app, host="0.0.0.0", port=8000)
API Reference
See API Reference for complete type signatures.
Protocol Specification
For the wire format and protocol details, see the Protocol Spec and Wire Format.