Tenuo Temporal Integration
Status: Implemented (Activity authorization + PoP + Delegation)
Overview
Tenuo integrates with Temporal to bring warrant-based authorization to durable workflows. Activities are transparently authorized against the workflow’s warrant without code changes to your activity definitions.
Key Features:
- Activity-level authorization: Each activity execution is authorized against warrant constraints
- Proof-of-Possession (PoP): Ed25519 signature verification when trusted_roots is configured
- Warrant propagation: Warrants flow through workflow headers automatically
- Child workflow delegation: Attenuate warrants when spawning child workflows via tenuo_execute_child_workflow()
- Delegation chain verification: Full chain-of-trust validation back to trusted roots
- Signal & update guards: Restrict which signals and updates a workflow accepts
- Nexus header propagation: Warrant context flows through Nexus operations
- PoP replay protection: In-memory dedup cache prevents signature replay attacks
- Continue-as-new support: Warrant headers survive workflow continuation
- Fail-closed: Missing or invalid warrants block execution by default
- Enterprise key management: VaultKeyResolver, KMSKeyResolver, CompositeKeyResolver
Installation
uv pip install "tenuo[temporal]"
Requires a running Temporal server, either local or in production.
Quick Start
Basic Workflow Protection
from datetime import timedelta
from pathlib import Path

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.worker import Worker

from tenuo import SigningKey, Warrant
from tenuo_core import Subpath
from tenuo.temporal import (
    AuthorizedWorkflow,
    TenuoInterceptor,
    TenuoInterceptorConfig,
    TenuoClientInterceptor,
    EnvKeyResolver,
    tenuo_headers,
    tenuo_execute_activity,
)

# Define protected activities (no Tenuo-specific code needed)
@activity.defn
async def read_file(path: str) -> str:
    return Path(path).read_text()

@activity.defn
async def write_file(path: str, content: str) -> str:
    Path(path).write_text(content)
    return f"Wrote {len(content)} bytes"

# Define workflow: use AuthorizedWorkflow for automatic PoP calculation
@workflow.defn
class DataProcessingWorkflow(AuthorizedWorkflow):
    @workflow.run
    async def run(self, input_path: str, output_path: str) -> str:
        # Automatic PoP signature generation via self.execute_authorized_activity
        data = await self.execute_authorized_activity(
            read_file,
            args=[input_path],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=1),
        )
        processed = data.upper()
        await self.execute_authorized_activity(
            write_file,
            args=[output_path, processed],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=1),
        )
        return f"Processed {len(data)} bytes"

# Setup
async def main():
    # Client interceptor injects warrant headers into workflow start
    client_interceptor = TenuoClientInterceptor()
    client = await Client.connect("localhost:7233",
                                  interceptors=[client_interceptor])

    # Generate keys
    control_key = SigningKey.generate()
    agent_key = SigningKey.generate()

    # Issue warrant using the builder API
    warrant = (
        Warrant.mint_builder()
        .holder(agent_key.public_key)
        .capability("read_file", path=Subpath("/data/input"))
        .capability("write_file", path=Subpath("/data/output"))
        .ttl(3600)
        .mint(control_key)
    )

    # Configure worker interceptor with full PoP verification
    interceptor = TenuoInterceptor(
        TenuoInterceptorConfig(
            key_resolver=EnvKeyResolver(),
            on_denial="raise",
            trusted_roots=[control_key.public_key],  # enables Authorizer + PoP
        )
    )

    # Start worker with interceptor and sandbox passthrough
    from temporalio.worker.workflow_sandbox import (
        SandboxedWorkflowRunner, SandboxRestrictions,
    )

    async with Worker(
        client,
        task_queue="data-processing",
        workflows=[DataProcessingWorkflow],
        activities=[read_file, write_file],
        interceptors=[interceptor],
        workflow_runner=SandboxedWorkflowRunner(
            restrictions=SandboxRestrictions.default.with_passthrough_modules(
                "tenuo", "tenuo_core",  # Required for PoP signing
            )
        ),
    ):
        # Set warrant headers, then execute workflow
        client_interceptor.set_headers(
            tenuo_headers(warrant, "agent-key-1", agent_key)
        )
        result = await client.execute_workflow(
            DataProcessingWorkflow.run,
            args=["/data/input/report.txt", "/data/output/report.txt"],
            id="process-001",
            task_queue="data-processing",
        )
Important: tenuo and tenuo_core must be configured as passthrough modules in Temporal’s workflow sandbox. Without this, PoP verification will fail with ImportError: PyO3 modules compiled for CPython 3.8 or older may only be initialized once per interpreter process.
What happens:
- TenuoClientInterceptor injects warrant + signing key into workflow headers
- Workflow inbound interceptor extracts Tenuo headers and propagates them to activities via Temporal’s header mechanism
- Each self.execute_authorized_activity() call computes a PoP signature via warrant.sign()
- Activity inbound interceptor reads the warrant, PoP, and signing key from activity headers
- Authorizer.authorize() verifies chain, expiry, capabilities, constraints, and PoP
- Activity executes only if all checks pass
This works in both single-process demos and distributed deployments where client and worker run in separate processes.
Configuration
Interceptor Config
from tenuo.temporal import EnvKeyResolver, TenuoInterceptorConfig, TenuoMetrics

# control_key and on_audit are defined elsewhere (see Quick Start and Audit Events)
config = TenuoInterceptorConfig(
    key_resolver=EnvKeyResolver(),            # Required: key resolution strategy
    on_denial="raise",                        # "raise" | "log" | "skip"
    trusted_roots=[control_key.public_key],   # Enables Authorizer + PoP verification
    require_warrant=True,                     # Fail-closed: deny if no warrant
    block_local_activities=True,              # Prevent local activity bypass
    redact_args_in_logs=True,                 # Prevent secret leaks in logs
    max_chain_depth=10,                       # Max delegation depth
    audit_callback=on_audit,                  # Optional audit event handler
    metrics=TenuoMetrics(),                   # Optional Prometheus metrics
)
Key Resolvers
Development: EnvKeyResolver
import base64
import os

from tenuo.temporal import EnvKeyResolver

# Set environment variable
os.environ["TENUO_KEY_agent1"] = base64.b64encode(
    agent_key.secret_key_bytes()
).decode()

resolver = EnvKeyResolver()  # Reads from TENUO_KEY_{key_id}
Production: VaultKeyResolver
from tenuo.temporal import VaultKeyResolver

resolver = VaultKeyResolver(
    url="https://vault.company.com:8200",
    mount="secret",
    path_template="tenuo/keys/{key_id}",
    cache_ttl=300,  # 5 minute cache
)
Fallback Chain: CompositeKeyResolver
from tenuo.temporal import CompositeKeyResolver, EnvKeyResolver, VaultKeyResolver

resolver = CompositeKeyResolver([
    VaultKeyResolver(url="https://vault.prod"),  # Try Vault first
    EnvKeyResolver(),                            # Fallback to env vars
])
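The resolver pattern can be sketched with the standard library alone. This is a minimal illustration, not Tenuo's implementation: `SketchEnvResolver`, `SketchCompositeResolver`, `AlwaysMissingResolver`, and the `resolve()` method name are hypothetical. Only the `TENUO_KEY_{key_id}` variable naming and the base64 encoding come from the examples above.

```python
import base64
import os
from typing import Protocol, Sequence


class KeyResolver(Protocol):
    """Hypothetical interface: map a key ID to raw secret-key bytes."""

    def resolve(self, key_id: str) -> bytes: ...


class SketchEnvResolver:
    """Reads base64-encoded key material from TENUO_KEY_{key_id}."""

    def resolve(self, key_id: str) -> bytes:
        value = os.environ.get(f"TENUO_KEY_{key_id}")
        if value is None:
            raise KeyError(f"no key found for {key_id!r}")
        return base64.b64decode(value)


class AlwaysMissingResolver:
    """Stand-in for a remote store (such as Vault) that lacks the key."""

    def resolve(self, key_id: str) -> bytes:
        raise KeyError(key_id)


class SketchCompositeResolver:
    """Tries each resolver in order and returns the first hit."""

    def __init__(self, resolvers: Sequence[KeyResolver]) -> None:
        self._resolvers = list(resolvers)

    def resolve(self, key_id: str) -> bytes:
        for resolver in self._resolvers:
            try:
                return resolver.resolve(key_id)
            except KeyError:
                continue  # fall through to the next resolver
        raise KeyError(f"no resolver could find {key_id!r}")
```

Ordering matters in a chain like this: the first resolver that returns a key wins, so the most authoritative store should come first.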
Proof-of-Possession
When trusted_roots is configured, Tenuo enforces PoP verification for all activity executions. The challenge is a CBOR-serialized tuple of (warrant_id, tool, sorted_args, window_ts) signed with the holder’s Ed25519 key.
Two patterns for PoP
AuthorizedWorkflow (recommended) validates headers at workflow start and provides self.execute_authorized_activity():
@workflow.defn
class MyWorkflow(AuthorizedWorkflow):
    @workflow.run
    async def run(self, path: str) -> str:
        return await self.execute_authorized_activity(
            read_file,
            args=[path],
            start_to_close_timeout=timedelta(seconds=30),
        )
tenuo_execute_activity() is a free function for advanced use cases (multi-warrant workflows, per-stage delegation) where you need explicit control:
from tenuo.temporal import tenuo_execute_activity

@workflow.defn
class PipelineWorkflow:
    @workflow.run
    async def run(self, path: str) -> str:
        return await tenuo_execute_activity(
            read_file,
            args=[path],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )
Both automatically sign PoP challenges — you never need to call warrant.sign() directly in Temporal workflows.
PoP Challenge Format
The PoP signature is computed deterministically by the Rust core:
domain_context = b"tenuo-pop-v1"
window_ts = (unix_now // 30) * 30 # 30-second bucket
challenge_data = CBOR( (warrant_id, tool, sorted_args, window_ts) )
preimage = domain_context || challenge_data
signature = Ed25519.sign(signing_key, preimage) # 64 bytes
In Python, this is a single call:
pop_signature = warrant.sign(signing_key, "read_file", {"path": "/data/file.txt"})
# Returns 64 raw bytes; valid for 4 windows (2 minutes)
tenuo_execute_activity() handles this automatically. You only need warrant.sign() directly if building custom tooling outside of Temporal.
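The time-window arithmetic can be illustrated in isolation. The sketch below covers only the bucketing and freshness check, not CBOR encoding or Ed25519 signing. The function names are hypothetical, and it assumes the four valid windows are the current bucket plus the three before it; the Rust core may place the windows differently.

```python
from typing import List

WINDOW_SECONDS = 30  # bucket size, from the challenge format above
VALID_WINDOWS = 4    # signatures stay valid for 4 windows (2 minutes)


def window_ts(unix_now: int) -> int:
    """Floor a Unix timestamp to the start of its 30-second bucket."""
    return (unix_now // WINDOW_SECONDS) * WINDOW_SECONDS


def accepted_windows(unix_now: int) -> List[int]:
    """Buckets a verifier would accept at unix_now.

    Assumption: the current bucket plus the three preceding ones.
    """
    current = window_ts(unix_now)
    return [current - i * WINDOW_SECONDS for i in range(VALID_WINDOWS)]


def is_fresh(signed_at: int, verified_at: int) -> bool:
    """True if a signature made at signed_at falls in an accepted bucket."""
    return window_ts(signed_at) in accepted_windows(verified_at)
```

Because the signer and verifier both floor to the same bucket, they agree on `window_ts` without exchanging a nonce, as long as their clocks differ by less than the validity period.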
Child Workflow Delegation
Attenuate warrants when spawning child workflows with tenuo_execute_child_workflow():
from tenuo.temporal import tenuo_execute_child_workflow

@workflow.defn
class ParentWorkflow:
    @workflow.run
    async def run(self) -> str:
        # Parent has: read_file + write_file
        # Child gets only read_file with reduced TTL
        result = await tenuo_execute_child_workflow(
            ChildWorkflow.run,
            tools=["read_file"],   # Subset of parent tools
            ttl_seconds=60,        # Shorter than parent
            args=["/data/input"],
            id=f"child-{workflow.info().workflow_id}",
            task_queue=workflow.info().task_queue,
        )
        return result
The wrapper calls attenuated_headers() internally and injects the attenuated warrant via the outbound workflow interceptor — Temporal’s execute_child_workflow() does not accept a headers kwarg directly.
Delegation Chain Verification
When warrants are attenuated, the full delegation chain is propagated via the x-tenuo-warrant-chain header. The activity interceptor calls Authorizer.check_chain() to verify every link in the chain back to a trusted root, ensuring no intermediate warrant was forged or widened.
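To make "forged or widened" concrete, here is a signature-free sketch of the structural checks a chain verifier might apply. It is not `Authorizer.check_chain()`: the `Link` type, the `chain_is_valid` function, and the specific rules (issuer continuity, tool subset, non-increasing expiry) are assumptions for illustration, and a real verifier would also check each link's signature.

```python
from dataclasses import dataclass
from typing import FrozenSet, Sequence


@dataclass(frozen=True)
class Link:
    """Hypothetical stand-in for one warrant in a delegation chain."""

    issuer: str            # key that minted this warrant
    holder: str            # key the warrant was issued to
    tools: FrozenSet[str]  # tools the warrant allows
    expires_at: int        # Unix expiry time


def chain_is_valid(
    chain: Sequence[Link],
    trusted_roots: FrozenSet[str],
    max_chain_depth: int = 10,
) -> bool:
    """Return True only if the chain starts at a trusted root and narrows."""
    if not chain or len(chain) > max_chain_depth:
        return False  # fail closed on empty or over-long chains
    if chain[0].issuer not in trusted_roots:
        return False  # chain does not start at a trusted root
    for parent, child in zip(chain, chain[1:]):
        if child.issuer != parent.holder:
            return False  # only the parent's holder may delegate
        if not child.tools <= parent.tools:
            return False  # tools were widened
        if child.expires_at > parent.expires_at:
            return False  # lifetime was extended
    return True
```

Each link is compared only with its immediate parent; since every step can only narrow, the final warrant is guaranteed to be no broader than the root.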
Signal & Update Authorization
Control which signals and workflow updates are allowed:
config = TenuoInterceptorConfig(
    key_resolver=EnvKeyResolver(),
    on_denial="raise",
    trusted_roots=[control_key.public_key],
    authorized_signals=["approve", "reject"],  # Only these signals allowed
    authorized_updates=["update_config"],      # Only these updates allowed
)
Unrecognized signals raise ConstraintViolation. Unrecognized updates are rejected at the validator stage before the handler runs. When authorized_signals or authorized_updates is left as None (the default), all signals or updates, respectively, pass through for backward compatibility.
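The allowlist semantics can be sketched as a small guard function. The `check_allowed` helper and the local `ConstraintViolation` class are stand-ins written for this example; the real exception lives in `tenuo.temporal`.

```python
from typing import Optional, Sequence


class ConstraintViolation(Exception):
    """Local stand-in for tenuo.temporal.ConstraintViolation."""

    error_code = "CONSTRAINT_VIOLATED"


def check_allowed(name: str, allowlist: Optional[Sequence[str]]) -> None:
    """Raise unless name is permitted; None means no restriction."""
    if allowlist is None:
        return  # guard not configured: everything passes through
    if name not in allowlist:
        raise ConstraintViolation(f"{name!r} is not authorized")
```

Note the difference between `None` and an empty list in this sketch: `None` disables the guard, while `[]` rejects every name.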
Nexus Operation Headers
When starting Nexus operations from a Tenuo-protected workflow, the outbound interceptor automatically propagates warrant headers to the Nexus service. Headers are base64-encoded into Nexus’s string-based header format.
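A round trip between binary header values and a string-only header map might look like the following. The helper names are hypothetical, and the use of standard (not URL-safe) base64 is an assumption; `x-tenuo-warrant-chain` is the only header name taken from this document.

```python
import base64
from typing import Dict


def encode_headers(headers: Dict[str, bytes]) -> Dict[str, str]:
    """Convert binary header values to base64 strings."""
    return {k: base64.b64encode(v).decode("ascii") for k, v in headers.items()}


def decode_headers(headers: Dict[str, str]) -> Dict[str, bytes]:
    """Reverse encode_headers on the receiving side."""
    return {k: base64.b64decode(v) for k, v in headers.items()}
```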
PoP Replay Protection
The activity interceptor maintains an in-memory dedup cache to detect replayed PoP signatures within the same time window. Each unique (warrant, tool, args, workflow_id, activity_id) combination is tracked. Retries (attempt > 1) bypass dedup since Temporal legitimately re-delivers the same activity. The cache is periodically evicted (every 60 seconds) to prevent unbounded memory growth.
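A dedup cache with these properties can be sketched in a few lines. The class below is illustrative only: `PopDedupCache`, `admit()`, and the 120-second entry lifetime (chosen to match the 2-minute PoP validity) are assumptions, while the key fields, the retry bypass, and the 60-second eviction interval follow the description above.

```python
import time
from typing import Callable, Dict, Tuple

# (warrant_id, tool, args_digest, workflow_id, activity_id)
DedupKey = Tuple[str, str, str, str, str]


class PopDedupCache:
    """In-memory replay detector with periodic eviction."""

    def __init__(
        self,
        ttl_seconds: float = 120.0,
        evict_interval: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._seen: Dict[DedupKey, float] = {}
        self._ttl = ttl_seconds
        self._evict_interval = evict_interval
        self._clock = clock
        self._last_evict = clock()

    def _evict_if_due(self, now: float) -> None:
        """Drop expired entries at most once per eviction interval."""
        if now - self._last_evict < self._evict_interval:
            return
        self._seen = {k: t for k, t in self._seen.items() if now - t < self._ttl}
        self._last_evict = now

    def admit(self, key: DedupKey, attempt: int = 1) -> bool:
        """Return False if key is a replay, True otherwise."""
        now = self._clock()
        self._evict_if_due(now)
        if attempt > 1:
            return True  # Temporal retry: legitimate re-delivery
        first_seen = self._seen.get(key)
        if first_seen is not None and now - first_seen < self._ttl:
            return False  # same PoP seen recently: treat as replay
        self._seen[key] = now
        return True
```

Being in-memory, a cache like this only detects replays seen by the same worker process; it is not shared across workers.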
Decorators
@tool() - Activity-to-Tool Mapping
Map activity names to different tool names in warrants:
from tenuo.temporal import tool

@activity.defn
@tool("read_file")
async def fetch_document(doc_id: str) -> str:
    """Activity name is 'fetch_document', warrant checks 'read_file'."""
    return await storage.get(doc_id)
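One way such a mapping could work is for the decorator to attach the tool name as metadata that an interceptor reads later. The sketch below is not Tenuo's `@tool`: `sketch_tool`, `tool_name_for`, and the attribute name are hypothetical.

```python
from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

_TOOL_ATTR = "__sketch_tool_name__"  # hypothetical attribute name


def sketch_tool(name: str) -> Callable[[F], F]:
    """Attach a warrant tool name to a function without wrapping it."""

    def decorator(fn: F) -> F:
        setattr(fn, _TOOL_ATTR, name)
        return fn

    return decorator


def tool_name_for(fn: Callable[..., Any]) -> str:
    """Tool name to authorize against; defaults to the function name."""
    return getattr(fn, _TOOL_ATTR, fn.__name__)


@sketch_tool("read_file")
async def fetch_document(doc_id: str) -> str:
    return f"contents of {doc_id}"


async def list_directory(path: str) -> list:
    return []
```

Returning the original function unchanged keeps its name and signature intact, so a decorator applied above it still registers `fetch_document` under its own name.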
@unprotected - Local Activities
Mark activities as safe for local execution (bypass interceptor):
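from datetime import timedelta

from tenuo.temporal import unprotected

@activity.defn
@unprotected
async def get_config_value(key: str) -> str:
    """Internal config lookup - no warrant needed."""
    return config[key]

# Can be used as a local activity (call this from inside a workflow method)
await workflow.execute_local_activity(
    get_config_value,
    args=["database_url"],
    start_to_close_timeout=timedelta(seconds=5),  # Temporal requires a timeout for local activities
)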
Audit Events
Subscribe to authorization decisions:
from tenuo.temporal import TemporalAuditEvent

def on_audit(event: TemporalAuditEvent):
    if event.decision == "ALLOW":
        logger.info(
            f"Allowed: {event.tool} in {event.workflow_type} "
            f"(warrant: {event.warrant_id})"
        )
    else:
        logger.warning(
            f"Denied: {event.tool} in {event.workflow_type} - "
            f"{event.denial_reason}"
        )

config = TenuoInterceptorConfig(
    key_resolver=resolver,
    audit_callback=on_audit,
    audit_allow=True,  # Log allowed actions
    audit_deny=True,   # Log denied actions
)
Observability
Prometheus Metrics
from tenuo.temporal import TenuoMetrics

metrics = TenuoMetrics(prefix="tenuo_temporal")

config = TenuoInterceptorConfig(
    key_resolver=resolver,
    metrics=metrics,
)

# Exposes metrics at /metrics:
# - tenuo_temporal_activities_authorized_total{tool, workflow_type}
# - tenuo_temporal_activities_denied_total{tool, reason, workflow_type}
# - tenuo_temporal_authorization_latency_seconds_bucket{tool}
Security Model
Fail-Closed by Default
All security checks default to deny:
| Check | Failure condition | Behavior |
|---|---|---|
| Warrant header | Missing | Denied (require_warrant=True) |
| Warrant expired | Expired | Raises WarrantExpired |
| Tool not in warrant | Not allowed | Raises ConstraintViolation |
| Constraint violated | Args don’t match | Raises ConstraintViolation |
| PoP signature | Missing/invalid | Raises PopVerificationError |
| Local activity | Protected function | Raises LocalActivityError |
Replay Safety
PoP challenges use 30-second time-window bucketing (floor(unix_now / 30) * 30) for replay tolerance. Signatures remain valid for 4 windows (2 minutes). The tenuo_execute_activity() helper handles PoP signing inside the workflow sandbox, and the workflow outbound interceptor injects signed headers into Temporal’s native activity header propagation. This means authorization works correctly even in distributed deployments where the client and worker run in separate processes.
Exceptions
All exceptions include error_code for wire format compatibility:
from tenuo.temporal import (
    ConstraintViolation,    # error_code: "CONSTRAINT_VIOLATED"
    WarrantExpired,         # error_code: "WARRANT_EXPIRED"
    ChainValidationError,   # error_code: "CHAIN_INVALID"
    PopVerificationError,   # error_code: "POP_VERIFICATION_FAILED"
    LocalActivityError,     # error_code: "LOCAL_ACTIVITY_BLOCKED"
    KeyResolutionError,     # error_code: "KEY_NOT_FOUND"
)
Best Practices
- Use AuthorizedWorkflow as your base class for fail-fast validation and automatic PoP
- Use tenuo_execute_activity() for advanced multi-warrant or delegation patterns
- Always configure passthrough modules (tenuo, tenuo_core) in the workflow sandbox
- Set up VaultKeyResolver for production key management
- Enable audit logging to track authorization decisions
- Use @unprotected sparingly - only for truly internal operations
- Attenuate warrants for child workflows to enforce least privilege
- Keep TTLs short for sensitive operations (minutes, not hours)
Examples
| Example | Description |
|---|---|
| authorized_workflow_demo.py | Recommended starting point. AuthorizedWorkflow base class with parallel reads and fail-fast validation |
| demo.py | Lower-level tenuo_execute_activity() API with sequential + parallel reads |
| multi_warrant.py | Multi-tenant isolation: separate warrants per workflow |
| delegation.py | Per-stage pipeline authorization with least-privilege warrants |
Per-Stage Pipeline (from delegation.py)
Each pipeline stage gets its own tightly-scoped warrant:
# Ingest warrant: read-only
ingest_warrant = (
    Warrant.mint_builder()
    .holder(ingest_key.public_key)
    .capability("read_file", path=Subpath("/data/source"))
    .capability("list_directory", path=Subpath("/data/source"))
    .ttl(600)
    .mint(control_key)
)

# Transform warrant: write-only
transform_warrant = (
    Warrant.mint_builder()
    .holder(transform_key.public_key)
    .capability("write_file", path=Subpath("/data/output"), content=Pattern("*"))
    .ttl(600)
    .mint(control_key)
)

# Switch warrant between stages
client_interceptor.set_headers(tenuo_headers(ingest_warrant, "ingest", ingest_key))
data = await client.execute_workflow(IngestWorkflow.run, ...)

client_interceptor.set_headers(tenuo_headers(transform_warrant, "transform", transform_key))
await client.execute_workflow(TransformWorkflow.run, ...)
Comparison with Other Integrations
| Integration | Use Case | Durable | PoP | Delegation |
|---|---|---|---|---|
| OpenAI | Streaming agents | No | Optional | No |
| LangChain | Tool chains | No | Optional | Via context |
| CrewAI | Multi-agent crews | No | Yes (Tier 2) | Yes |
| Temporal | Long-running workflows | Yes | Mandatory | Yes |
Temporal integration is designed for workflows that may run for hours or days, with full replay support and durable state.