Python SDK

The official Python SDK for CntrlNode. Requires Python 3.9+ and supports both sync and async usage.

Install

pip install cntrlnode-sdk
# or
uv add cntrlnode-sdk
# or
poetry add cntrlnode-sdk

Connect

from cntrlnode import CntrlNodeClient

client = CntrlNodeClient(
    base_url="http://localhost:7474",
    api_key="your-api-key",  # optional — omit if auth is disabled
)

All methods are async. Use asyncio.run() or your framework's event loop.


State Store

import asyncio
from cntrlnode import CntrlNodeClient

client = CntrlNodeClient(base_url="http://localhost:7474")

async def main():
    # Set a value (ttl=0 means no expiry)
    await client.state.set("workflow-1", "results", {"summary": "Q4 up 12%"}, ttl=3600)

    # Get a value
    entry = await client.state.get("workflow-1", "results")
    print(entry["value"])   # {"summary": "Q4 up 12%"}
    print(entry["version"]) # 1

    # Delete
    await client.state.delete("workflow-1", "results")

    # List all keys
    result = await client.state.list("workflow-1")
    print(result["keys"])

    # Optimistic write
    entry = await client.state.get("workflow-1", "counter")
    await client.state.set_with_version(
        "workflow-1", "counter",
        entry["value"] + 1,
        entry["version"]
    )

asyncio.run(main())

Task Bus

async def tasks_example():
    # Submit a task
    task = await client.tasks.submit(
        workflow_id="workflow-1",
        agent_id="researcher-1",
        payload={"query": "Summarise Q4 report"},
    )
    print(task["id"], task["status"])  # "2Kx..." "SUBMITTED"

    # Submit a child task
    child = await client.tasks.submit(
        workflow_id="workflow-1",
        agent_id="writer-1",
        parent_id=task["id"],
        payload={"draft": True},
    )

    # Idempotent submission
    task = await client.tasks.submit(
        workflow_id="workflow-1",
        agent_id="researcher-1",
        idempotency_key="research-q4-2024",
        payload={},
    )

    # Get status
    t = await client.tasks.get(task["id"])
    print(t["status"])

    # List workflow tasks
    result = await client.tasks.list("workflow-1")
    for t in result["tasks"]:
        print(t["id"], t["status"])

    # Cancel (recursively)
    await client.tasks.cancel(task["id"])

    # Lifecycle (called by the agent)
    await client.tasks.accept(task["id"])
    await client.tasks.complete(task["id"], result={"answer": "done"})
    await client.tasks.fail(task["id"], "Connection timed out")

Agent Registry

async def registry_example():
    # Register
    await client.registry.register(
        agent_id="researcher-1",
        tags=["research", "web-search", "pdf"],
        model="claude-sonnet-4-5",
        endpoint="http://my-agent:8080",
        max_concurrency=3,
    )

    # Heartbeat
    await client.registry.heartbeat("researcher-1")

    # Discover by tags
    result = await client.registry.discover(tags=["research", "pdf"])
    for agent in result["agents"]:
        print(agent["id"], agent["healthy"])

    # Semantic discovery (requires Ollama)
    result = await client.registry.discover(
        query="agent that reads PDFs and extracts tables",
        top_k=3,
    )

    # List all
    result = await client.registry.list()

    # Get one
    agent = await client.registry.get("researcher-1")

    # Deregister
    await client.registry.deregister("researcher-1")

Complete agent example

import asyncio
from cntrlnode import CntrlNodeClient

client = CntrlNodeClient(base_url="http://localhost:7474")

async def run_agent():
    await client.registry.register(
        agent_id="my-agent",
        tags=["summarize"],
    )

    while True:
        result = await client.tasks.list("workflow-1")
        mine = [
            t for t in result["tasks"]
            if t["agent_id"] == "my-agent" and t["status"] == "SUBMITTED"
        ]

        for task in mine:
            await client.tasks.accept(task["id"])
            try:
                result = await do_work(task["payload"])
                await client.tasks.complete(task["id"], result)
            except Exception as e:
                await client.tasks.fail(task["id"], str(e))

        await asyncio.sleep(2)

asyncio.run(run_agent())

Using with LangChain / LangGraph

from langchain_anthropic import ChatAnthropic
from langgraph.graph import StateGraph
from cntrlnode import CntrlNodeClient

cn = CntrlNodeClient(base_url="http://localhost:7474")
llm = ChatAnthropic(model="claude-sonnet-4-5")

async def researcher_node(state):
    # Share findings with other agents via CntrlNode state
    result = await llm.ainvoke(state["messages"])
    await cn.state.set(state["workflow_id"], "research-output", result.content)
    return {"messages": [result]}

Using with CrewAI

from crewai import Agent, Task, Crew
from cntrlnode import CntrlNodeClient

cn = CntrlNodeClient(base_url="http://localhost:7474")

researcher = Agent(
    role="Researcher",
    goal="Research the topic",
    backstory="You are a research expert",
    llm="anthropic/claude-sonnet-4-5",
)

async def after_task(task_output):
    await cn.state.set("crew-wf-1", "research-done", task_output.raw)