Tenuo A2A Integration

Status: Implemented (MVP)

Overview

Tenuo A2A adds warrant-based authorization to agent-to-agent communication. When Agent A delegates a task to Agent B, the warrant specifies exactly what Agent B is allowed to do.

┌─────────────┐                     ┌─────────────┐
│   Agent A   │   Task + Warrant    │   Agent B   │
│ (Orchestrator)│──────────────────▶│  (Worker)   │
│             │                     │             │
│             │◀──────────────────  │             │
│             │      Result         │             │
└─────────────┘                     └─────────────┘

Warrant says: "Agent B can only search arxiv.org for this task"

Use cases:

  • Multi-agent systems where agents delegate tasks
  • Orchestrators that dispatch work to specialized workers
  • Agent networks with least-privilege access control

Not for: Single-agent tool enforcement (use tenuo.openai or tenuo.langchain instead)


Installation

uv pip install "tenuo[a2a]"

Quick Start (Minimal Example)

Server (Worker Agent):

from tenuo.a2a import A2AServerBuilder

# Build server with fluent API
server = (A2AServerBuilder()
    .name("Worker")
    .url("https://worker.example.com")
    .key(my_signing_key)                    # Your identity
    .accept_warrants_from(orchestrator_key) # Who can give you tasks
    .build())

@server.skill("echo")
async def echo(msg: str) -> str:
    return f"Echo: {msg}"

# uvicorn server:server.app --port 8000

Or use the direct constructor:

from tenuo.a2a import A2AServer

server = A2AServer(
    name="Worker",
    url="https://worker.example.com",
    public_key=my_public_key,
    trusted_issuers=[orchestrator_public_key],
)

Client (Orchestrator):

from tenuo.a2a import A2AClientBuilder
from tenuo import Warrant

# Create warrant for this task
task_warrant = (Warrant.mint_builder()
    .capability("echo")
    .holder(worker_public_key)
    .ttl(300)
    .mint(orchestrator_key))

# Build client with default warrant
client = (A2AClientBuilder()
    .url("http://localhost:8000")
    .warrant(task_warrant, orchestrator_key)  # Pre-configure for repeated use
    .build())

# Send task (warrant already configured)
result = await client.send_task(
    skill="echo",
    arguments={"msg": "hello"},
)
print(result.output)  # "Echo: hello"

Or use the direct constructor:

from tenuo.a2a import A2AClient

client = A2AClient("http://localhost:8000")
result = await client.send_task(
    skill="echo",
    arguments={"msg": "hello"},
    warrant=task_warrant,
    signing_key=orchestrator_key,
)

That’s it. The warrant proves the orchestrator authorized this specific task.


Full Example (With Constraints)

Server (Worker)

from tenuo.a2a import A2AServerBuilder
from tenuo.constraints import Subpath, UrlSafe

server = (A2AServerBuilder()
    .name("Research Agent")
    .url("https://research-agent.example.com")
    .key(my_signing_key)
    .accept_warrants_from(orchestrator_public_key)
    .build())

# Register skills with constraint bindings
@server.skill("search_papers", constraints={"sources": UrlSafe})
async def search_papers(query: str, sources: list[str]) -> list[dict]:
    return await do_search(query, sources)

@server.skill("read_file", constraints={"path": Subpath})
async def read_file(path: str) -> str:
    with open(path) as f:
        return f.read()

# uvicorn server:server.app --host 0.0.0.0 --port 8000

Client (Orchestrator)

from tenuo.a2a import A2AClient
from tenuo.constraints import UrlSafe

# Discover agent capabilities
client = A2AClient("https://research-agent.example.com")
card = await client.discover()

# Attenuate warrant for this delegation
task_warrant = (my_warrant
    .grant_builder()
    .capability("search_papers", sources=UrlSafe(allow_domains=["arxiv.org"]))
    .audience(card.public_key)
    .ttl(300)
    .build(my_signing_key))

# Send task with warrant
result = await client.send_task(
    message="Find papers on capability-based security",
    warrant=task_warrant,
    skill="search_papers",
    arguments={"query": "capability-based security", "sources": ["https://arxiv.org"]},
)

Streaming Tasks

For long-running tasks, use streaming to receive incremental updates:

# Stream results as they arrive
async for update in client.send_task_streaming(
    message="Analyze these papers",
    warrant=task_warrant,
    skill="analyze_papers",
    arguments={"paper_ids": ["arxiv:2401.12345"]},
):
    if update.type.value == "status":
        print(f"Status: {update.status}")
    elif update.type.value == "message":
        print(f"Chunk: {update.content}")
    elif update.type.value == "complete":
        print(f"Done: {update.output}")

The server emits SSE events for status updates, intermediate messages, and final completion.

Stream timeout (DoS protection):

# Default timeout is 300 seconds (5 minutes)
async for update in client.send_task_streaming(
    ...,
    stream_timeout=600.0,  # 10 minute timeout
):
    ...

If the stream exceeds stream_timeout, a TimeoutError is raised. This prevents slow-drip DoS attacks where a malicious server holds connections indefinitely.


Proof-of-Possession (PoP)

Proof-of-Possession adds an additional security layer by requiring the client to prove they control the private key associated with the warrant’s holder.

When to Use PoP

Require PoP when:

  • ✅ Agents communicate over untrusted networks (Internet, shared infrastructure)
  • ✅ Compliance requires cryptographic proof of authorization
  • ✅ Protection against warrant theft is critical
  • ✅ Multi-hop delegation across organizational boundaries

PoP is optional when:

  • ⚠️ All agents run on trusted infrastructure (same data center, VPC)
  • ⚠️ Network isolation provides security (private network, mTLS)
  • ⚠️ Performance is critical and risk is low (microsecond latency matters)

Never skip PoP when:

  • ❌ Agents are on the public Internet
  • ❌ Warrants have long TTLs (hours/days)
  • ❌ Untrusted intermediaries exist in the call chain

How PoP Works

PoP signatures prove that the caller possesses the private key corresponding to the warrant’s sub (holder) field:

┌──────────────────────────────────────────────────────┐
│ Warrant (JWT):                                       │
│   sub: "z6Mk..."  ← Orchestrator's public key       │
│   grants: ["search"]                                 │
│   exp: 1234567890                                    │
│   Signature: <signed by control plane>              │
└──────────────────────────────────────────────────────┘
                       +
┌──────────────────────────────────────────────────────┐
│ PoP Signature:                                       │
│   sign(orchestrator_private_key, "search", args, ts) │
│   → Proves orchestrator controls the private key     │
└──────────────────────────────────────────────────────┘
                       =
               Authorization Proof

What PoP Prevents:

  • Warrant Theft: If an attacker intercepts a warrant, they can’t use it without the private key
  • Replay Attacks: Each PoP signature includes a timestamp and is checked once
  • Man-in-the-Middle: Modified arguments invalidate the PoP signature

Client Usage

Enable PoP by passing signing_key to send_task():

from tenuo.a2a import A2AClient

client = A2AClient("https://worker.example.com")

# Without PoP (only warrant validation)
result = await client.send_task(
    warrant=my_warrant,
    skill="search",
    arguments={"query": "papers"},
)

# With PoP (warrant + signature proof)
result = await client.send_task(
    warrant=my_warrant,
    skill="search",
    arguments={"query": "papers"},
    signing_key=orchestrator_key,  # ← Proves possession
)

Or configure PoP by default using the builder:

from tenuo.a2a import A2AClientBuilder

client = (A2AClientBuilder()
    .url("https://worker.example.com")
    .warrant(my_warrant, orchestrator_key)  # ← Pre-configure PoP
    .build())

# All requests automatically include PoP
result = await client.send_task(
    skill="search",
    arguments={"query": "papers"},
)

Server Configuration

Control PoP requirements on the server:

server = A2AServer(
    name="Worker",
    url="https://worker.example.com",
    public_key=worker_public_key,
    trusted_issuers=[control_plane_key],

    # PoP configuration
    require_pop=True,          # Reject requests without PoP (default: True)
    trusted_roots=[...],       # Required for PoP verification
)

Security Defaults:

  • require_pop=True by default (fail-safe)
  • Can be disabled via TENUO_A2A_REQUIRE_POP=false environment variable
  • If require_pop=True but client doesn’t provide PoP → PopRequiredError

Performance Impact

PoP adds cryptographic overhead:

Without PoP:
  - Warrant verification: ~0.5ms (Ed25519 signature check)

With PoP:
  - Warrant verification: ~0.5ms
  - PoP signature generation (client): ~0.3ms
  - PoP signature verification (server): ~0.3ms
  - Total overhead: ~1.1ms per request

Recommendation: Always use PoP in production unless you have network-level security (mTLS + VPC).

Error Handling

from tenuo.a2a import PopRequiredError, PopVerificationError

try:
    result = await client.send_task(warrant=warrant, skill="search", arguments={})
except PopRequiredError:
    print("Server requires PoP signature - add signing_key parameter")
except PopVerificationError as e:
    print(f"PoP signature invalid: {e}")
    # Possible causes:
    #   - Wrong signing key (not matching warrant.sub)
    #   - Arguments modified after signing
    #   - Clock skew between client/server

Debugging PoP Issues

Issue: PopVerificationError: Signature verification failed

Causes:

  1. Wrong signing key: Key doesn’t match warrant’s sub field
  2. Modified arguments: Arguments changed after PoP computation
  3. Clock skew: Client/server clocks differ significantly

Debug:

# Verify signing key matches warrant holder
assert warrant.sub == signing_key.public_key.to_did()

# Log PoP computation
import logging
logging.getLogger("tenuo.a2a.client").setLevel(logging.DEBUG)
# Shows: "Generated PoP signature for skill 'search'"

Server Configuration

server = A2AServer(
    # Required
    name="Agent Name",                    # Display name
    url="https://agent.example.com",      # Public URL (for audience validation)
    public_key=my_public_key,             # This agent's public key
    trusted_issuers=[...],                # List of trusted issuer public keys
    
    # Optional (shown with defaults)
    trust_delegated=True,                 # Accept warrants delegated from trusted issuers
    require_warrant=True,                 # Reject tasks without warrants
    require_audience=True,                # Require warrant audience matches our URL
    check_replay=True,                    # Enforce jti uniqueness
    replay_window=3600,                   # Seconds to remember jti values
    max_chain_depth=10,                   # Maximum delegation chain length
    
    # Audit
    audit_log=sys.stderr,                 # Destination (file, callable, or stderr)
    audit_format="json",                  # "json" or "text"
)

Trust Model

The server trusts warrants based on trusted_issuers:

  1. Direct Trust: Warrant signed by a trusted issuer → accepted
  2. Delegated Trust (if trust_delegated=True): Warrant with valid chain back to trusted issuer → accepted
┌─────────────────────┐
│    Trusted Root     │  ← In trusted_issuers
│   (Control Plane)   │
└──────────┬──────────┘
           │ delegates
           ▼
┌─────────────────────┐
│   Orchestrator A    │  ← Warrant signed by root
└──────────┬──────────┘
           │ delegates
           ▼
┌─────────────────────┐
│     Worker B        │  ← Warrant with chain [root → A → B]
└─────────────────────┘

Skill Constraints

Constraints bind warrant parameters to skill parameters:

@server.skill("read_file", constraints={"path": Subpath})
async def read_file(path: str) -> str:
    # "path" constraint checked against warrant's path constraint
    # Blocked if: warrant allows Subpath("/data") but arg is "/etc/passwd"
    ...

Constraint binding validation happens at startup:

# This raises ConstraintBindingError at startup:
@server.skill("read_file", constraints={"file_path": Subpath})  # ❌ "file_path" not a param
async def read_file(path: str) -> str:  # param is "path"
    ...

Client Configuration

client = A2AClient(
    url="https://agent.example.com",
    
    # Optional
    pin_key="z6Mk...",    # Expected public key (raises KeyMismatchError if different)
    timeout=30.0,          # Request timeout in seconds
)

Key Pinning

Pin the expected public key to prevent TOFU (Trust On First Use) attacks:

# If agent returns different key, raises KeyMismatchError
client = A2AClient(
    "https://research-agent.example.com",
    pin_key="z6MkResearchAgentKey123"  # From your config/secrets
)

card = await client.discover()  # Fails if key doesn't match

Key Format Compatibility

A2A accepts public keys in multiple formats:

# All of these work:
server = A2AServerBuilder()
    .key(signing_key)  # PublicKey object
    .accept_warrants_from("a1b2c3...")  # Hex (64 chars)
    .accept_warrants_from("z6MkpT...")  # Multibase (base58btc)
    .accept_warrants_from("did:key:z6MkpT...")  # W3C DID
    .build()

All formats are automatically normalized for comparison. Multibase and DID support requires uv pip install base58.


Agent Card (Discovery)

Agents expose their capabilities via /.well-known/agent.json:

{
  "name": "Research Agent",
  "url": "https://research-agent.example.com",
  "skills": [
    {
      "id": "search_papers",
      "name": "Search Papers",
      "x-tenuo-constraints": {
        "sources": {"type": "UrlSafe", "required": true}
      }
    }
  ],
  "x-tenuo": {
    "version": "0.1.0",
    "required": true,
    "public_key": "z6Mk..."
  }
}

Delegation Chains

When delegating through multiple agents, include the warrant chain:

# Orchestrator delegates to Worker A, who delegates to Worker B
# Worker B receives:
#   X-Tenuo-Warrant: <worker_b_warrant>
#   X-Tenuo-Warrant-Chain: <root>;...;<worker_a_warrant>

The server validates:

  1. Root warrant is from a trusted issuer
  2. Each link: child issuer = parent holder
  3. Skills narrow monotonically (no privilege escalation)
  4. Chain depth ≤ max_chain_depth

Error Handling

All A2A errors inherit from A2AError and map to JSON-RPC error codes with canonical wire codes:

from tenuo.a2a import (
    A2AError,
    MissingWarrantError,      # -32001: Warrant required but not provided
    InvalidSignatureError,     # -32002: Signature verification failed
    UntrustedIssuerError,      # -32003: Issuer not in trusted_issuers
    WarrantExpiredError,       # -32004: Warrant has expired
    AudienceMismatchError,     # -32005: Audience doesn't match server URL
    ReplayDetectedError,       # -32006: jti already used
    SkillNotGrantedError,      # -32007: Skill not in warrant grants
    ConstraintViolationError,  # -32008: Argument violates constraint
    ChainInvalidError,         # -32010: Delegation chain validation failed
    KeyMismatchError,          # -32012: Public key doesn't match pinned key
)

try:
    result = await client.send_task(...)
except SkillNotGrantedError as e:
    print(f"Skill {e.data['skill']} not in granted: {e.data['granted_skills']}")
except A2AError as e:
    print(f"A2A error {e.code}: {e.message}")

Wire Code Support

A2A error responses now include canonical Tenuo wire codes (1000-2199) for cross-protocol compatibility:

{
  "jsonrpc": "2.0",
  "error": {
    "code": -32008,
    "message": "Constraint violation",
    "data": {
      "tenuo_code": 1501,
      "field": "amount",
      "reason": "Value exceeds maximum"
    }
  },
  "id": "task_123"
}

This enables:

  • Cross-protocol debugging: Same wire codes used in HTTP, gRPC, and JSON-RPC
  • Precise error mapping: JSON-RPC code -32008 maps to canonical code 1501
  • Machine-readable errors: Clients can programmatically handle specific error types
A2A JSON-RPC Code Canonical Wire Code Name
-32001 1202 Missing warrant
-32002 1100 Invalid signature
-32003 1406 Untrusted issuer
-32004 1300 Warrant expired
-32007 1500 Tool not authorized
-32008 1501 Constraint violation
-32010 1405 Chain invalid

See wire format specification for the complete list.


Accessing the Warrant

Inside a skill, access the current warrant via context:

from tenuo.a2a import current_task_warrant

@server.skill("my_skill")
async def my_skill(query: str) -> str:
    warrant = current_task_warrant.get()
    if warrant:
        print(f"Warrant issuer: {warrant.iss}")
        print(f"Warrant subject: {warrant.sub}")
    return "done"

Audit Logging

Server emits structured audit events:

# JSON format (default)
{"timestamp": "...", "event": "warrant_validated", "skill": "search", "outcome": "allowed", ...}

# Text format
[WARRANT_VALIDATED] search: allowed

Custom audit handler:

async def my_audit_handler(event: AuditEvent):
    await send_to_siem(event.to_dict())

server = A2AServer(..., audit_log=my_audit_handler)

Example: Full Multi-Agent System

# control_plane.py
from tenuo import SigningKey, Warrant

control_key = SigningKey.from_env("CONTROL_PLANE_KEY")

def issue_orchestrator_warrant(orchestrator_pubkey):
    return (Warrant.mint_builder()
        .capability("search_papers", {})
        .capability("read_file", {"path": Subpath("/data")})
        .holder(orchestrator_pubkey)
        .ttl(86400)  # 24 hours
        .mint(control_key))
# orchestrator.py
from tenuo.a2a import A2AClient

async def delegate_research(topic: str, my_warrant, my_key, target_pubkey):
    client = A2AClient("https://research-agent.example.com")
    
    # Attenuate warrant for this specific task
    task_warrant = (my_warrant
        .grant_builder()
        .capability("search_papers", sources=UrlSafe(allow_domains=["arxiv.org"]))
        .audience(target_pubkey)
        .ttl(300)
        .build(my_key))
    
    return await client.send_task(
        message=f"Research: {topic}",
        warrant=task_warrant,
        skill="search_papers",
        arguments={"query": topic, "sources": ["https://arxiv.org"]},
    )
# research_agent.py
from tenuo.a2a import A2AServer
from tenuo.constraints import UrlSafe

server = A2AServer(
    name="Research Agent",
    url="https://research-agent.example.com",
    public_key=my_public_key,
    trusted_issuers=[control_plane_public_key],
)

@server.skill("search_papers", constraints={"sources": UrlSafe})
async def search_papers(query: str, sources: list[str]) -> list[dict]:
    # Only allowed URLs pass through
    return await search_arxiv(query, sources)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(server.app, host="0.0.0.0", port=8000)

API Reference

See API Reference for complete type signatures.

Protocol Specification

For the wire format and protocol details, see the Protocol Spec and Wire Format.