Python SDK
The official Python SDK for CntrlNode. Requires Python 3.9+ and supports both sync and async usage.
Install
pip install cntrlnode-sdk
# or
uv add cntrlnode-sdk
# or
poetry add cntrlnode-sdk
Connect
from cntrlnode import CntrlNodeClient
client = CntrlNodeClient(
base_url="http://localhost:7474",
api_key="your-api-key", # optional — omit if auth is disabled
)
The client methods shown on this page are coroutines and must be awaited. Run them with asyncio.run() or inside your framework's event loop.
State Store
import asyncio
from cntrlnode import CntrlNodeClient
client = CntrlNodeClient(base_url="http://localhost:7474")
async def main():
# Set a value with ttl=3600 (pass ttl=0 for no expiry)
await client.state.set("workflow-1", "results", {"summary": "Q4 up 12%"}, ttl=3600)
# Get a value
entry = await client.state.get("workflow-1", "results")
print(entry["value"]) # {"summary": "Q4 up 12%"}
print(entry["version"]) # 1
# Delete
await client.state.delete("workflow-1", "results")
# List all keys
result = await client.state.list("workflow-1")
print(result["keys"])
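# Optimistic write: read the entry, then write back with the version that was read (assumes "counter" already holds a number)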
entry = await client.state.get("workflow-1", "counter")
await client.state.set_with_version(
"workflow-1", "counter",
entry["value"] + 1,
entry["version"]
)
asyncio.run(main())
Task Bus
async def tasks_example():
# Submit a task
task = await client.tasks.submit(
workflow_id="workflow-1",
agent_id="researcher-1",
payload={"query": "Summarise Q4 report"},
)
print(task["id"], task["status"]) # "2Kx..." "SUBMITTED"
# Submit a child task
child = await client.tasks.submit(
workflow_id="workflow-1",
agent_id="writer-1",
parent_id=task["id"],
payload={"draft": True},
)
# Idempotent submission
task = await client.tasks.submit(
workflow_id="workflow-1",
agent_id="researcher-1",
idempotency_key="research-q4-2024",
payload={},
)
# Get status
t = await client.tasks.get(task["id"])
print(t["status"])
# List workflow tasks
result = await client.tasks.list("workflow-1")
for t in result["tasks"]:
print(t["id"], t["status"])
# Cancel (recursively)
await client.tasks.cancel(task["id"])
# Lifecycle (called by the agent): accept the task, then complete it on success or fail it on error
await client.tasks.accept(task["id"])
await client.tasks.complete(task["id"], result={"answer": "done"})
await client.tasks.fail(task["id"], "Connection timed out")
Agent Registry
async def registry_example():
# Register
await client.registry.register(
agent_id="researcher-1",
tags=["research", "web-search", "pdf"],
model="claude-sonnet-4-5",
endpoint="http://my-agent:8080",
max_concurrency=3,
)
# Heartbeat
await client.registry.heartbeat("researcher-1")
# Discover by tags
result = await client.registry.discover(tags=["research", "pdf"])
for agent in result["agents"]:
print(agent["id"], agent["healthy"])
# Semantic discovery (requires Ollama)
result = await client.registry.discover(
query="agent that reads PDFs and extracts tables",
top_k=3,
)
# List all
result = await client.registry.list()
# Get one
agent = await client.registry.get("researcher-1")
# Deregister
await client.registry.deregister("researcher-1")
Complete agent example
import asyncio
from cntrlnode import CntrlNodeClient
client = CntrlNodeClient(base_url="http://localhost:7474")
async def run_agent():
await client.registry.register(
agent_id="my-agent",
tags=["summarize"],
)
while True:
result = await client.tasks.list("workflow-1")
mine = [
t for t in result["tasks"]
if t["agent_id"] == "my-agent" and t["status"] == "SUBMITTED"
]
for task in mine:
await client.tasks.accept(task["id"])
try:
result = await do_work(task["payload"])
await client.tasks.complete(task["id"], result)
except Exception as e:
await client.tasks.fail(task["id"], str(e))
await asyncio.sleep(2)
asyncio.run(run_agent())
Using with LangChain / LangGraph
from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph
from cntrlnode import CntrlNodeClient
cn = CntrlNodeClient(base_url="http://localhost:7474")
llm = ChatAnthropic(model="claude-sonnet-4-5")
async def researcher_node(state):
# Run the LLM, then share its findings with other agents via CntrlNode state
result = await llm.ainvoke(state["messages"])
await cn.state.set(state["workflow_id"], "research-output", result.content)
return {"messages": [result]}
Using with CrewAI
from crewai import Agent, Task, Crew
from cntrlnode import CntrlNodeClient
cn = CntrlNodeClient(base_url="http://localhost:7474")
researcher = Agent(
role="Researcher",
goal="Research the topic",
backstory="You are a research expert",
llm="anthropic/claude-sonnet-4-5",
)
async def after_task(task_output):
await cn.state.set("crew-wf-1", "research-done", task_output.raw)