Policy Engine
Composable policies evaluated at dispatch time. The PolicyEnforcer sits between the scheduler's grant decision and task execution, checking each policy in order and short-circuiting on the first rejection.
Quick Setup
from loco import (
AsyncLOCOScheduler, Agent, SharedResource,
PolicyEnforcer, BudgetPolicy, AccessPolicy, RatePolicy,
)
enforcer = PolicyEnforcer([
BudgetPolicy(default_limit=50.0),
AccessPolicy(rules={"analyst": {"labels": ["public", "internal"]}}),
RatePolicy(limits={"batch": 10.0}, period=60.0),
])
scheduler = AsyncLOCOScheduler(
agents=[Agent(agent_id="analyst"), Agent(agent_id="batch")],
resource=SharedResource("llm_api", capacity=3),
enforcer=enforcer,
)
How It Works
Policies are checked after the resource is acquired but before work executes -- the same point where budget checks ran in v0.2. The enforcer evaluates policies in order and short-circuits on the first rejection:
- Agent wins a resource slot via L(i) scoring
- PolicyEnforcer runs each policy's
check()in order - First rejection: resource is released, next waiter gets the slot,
PolicyViolationErrorraised - All pass: work proceeds,
record()called on each policy after release
Built-in Policies
BudgetPolicy
Per-agent spend limits. Same behavior as v0.2's BudgetManager (which is now an alias).
from loco import BudgetPolicy
budget = BudgetPolicy(default_limit=100.0, on_exceeded="reject")
budget.set_limit("expensive-agent", max_cost=50.0)
Three enforcement modes: "reject" (raise), "alert" (allow + log), "downgrade" (allow + flag).
See Budget Management for full details.
AccessPolicy
Controls which agents can process tasks with specific security labels. Open by default -- agents not listed in rules are allowed.
from loco import AccessPolicy, SecurityLabel, Task
policy = AccessPolicy(rules={
"analyst": {"labels": ["public", "internal"]},
"auditor": {"labels": ["public", "internal", "confidential"]},
})
# Analyst can process public/internal tasks but not confidential
# Auditor can process anything
# Agents not in rules are allowed (open by default)
Requires tasks to have labels set:
task = Task(
weight=2.0,
labels={"input": SecurityLabel.CONFIDENTIAL, "output": SecurityLabel.INTERNAL},
)
RatePolicy
Per-agent request rate limits using a token bucket algorithm.
from loco import RatePolicy
policy = RatePolicy(
limits={"batch": 10.0, "realtime": 100.0},
period=60.0, # 10 requests per minute for batch, 100 for realtime
)
# Check remaining tokens
policy.remaining("batch") # 10.0 initially, decreases with each check
The bucket refills automatically over time. Burst capacity equals the limit.
Composing Policies
The PolicyEnforcer evaluates policies in order. Put fast/cheap checks first:
enforcer = PolicyEnforcer([
RatePolicy(limits={"batch": 10.0}), # fast: token bucket check
AccessPolicy(rules={...}), # fast: label lookup
BudgetPolicy(default_limit=100.0), # fast: spend comparison
])
Adding and Removing at Runtime
enforcer.add_policy(RatePolicy(limits={"new-agent": 5.0}))
enforcer.remove_policy("rate") # remove by policy name
enforcer.get_policy("budget") # look up by name
Summary
enforcer.summary()
# {"budget": {"analyst": {"limit": 50.0, "spent": 12.0, "remaining": 38.0}},
# "access": {"type": "AccessPolicy"},
# "rate": {"type": "RatePolicy"}}
Writing Custom Policies
Extend the Policy base class:
from loco.policy import Policy, PolicyViolationError
from loco.task import Task
class RegionPolicy(Policy):
"""Only allow agents in specific regions."""
name = "region"
def __init__(self, allowed_regions: dict[str, list[str]]):
self._regions = allowed_regions
def check(self, agent_id: str, task: Task) -> bool:
if agent_id in self._regions:
if task.task_type not in self._regions[agent_id]:
raise PolicyViolationError(
self.name, agent_id,
f"Task type {task.task_type!r} not allowed in region"
)
return True
def record(self, agent_id: str, task: Task) -> None:
# Optional: track per-region usage
pass
Backward Compatibility
The budget= parameter still works unchanged:
# v0.2 style -- still works
scheduler = AsyncLOCOScheduler(agents, resource, budget=budget)
# v0.3 style -- policy composition
scheduler = AsyncLOCOScheduler(agents, resource, enforcer=enforcer)
# Both together -- budget is added to enforcer automatically
scheduler = AsyncLOCOScheduler(agents, resource, budget=budget, enforcer=enforcer)
BudgetManager is an alias for BudgetPolicy. Existing code works without changes.
Exceptions
| Exception | When |
|---|---|
PolicyViolationError |
Any policy rejects a task |
BudgetExceededError |
Budget policy rejects (subclass of PolicyViolationError) |