Layer 5: Agent Harness Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Build the daemon + Claude Code skill that turns Claude Code into an IRC-native AI agent with supervisor oversight and webhook alerting.

Architecture: Each agent is an independent daemon process (Python asyncio) that maintains an IRC connection, manages a Claude Code subprocess, and runs a Sonnet 4.6 supervisor via Agent SDK. The Claude Code process gets IRC tools via a skill installed at ~/.claude/skills/irc/, communicating with the daemon over a Unix socket using JSON Lines.

Tech Stack: Python 3.12+ asyncio, Claude Agent SDK (anthropic), PyYAML, existing protocol/ and server/ layers.

Spec: docs/superpowers/specs/2026-03-21-layer5-agent-harness-design.md


File Structure

clients/
└── claude/
    ├── __init__.py              # Package marker
    ├── __main__.py              # CLI entry point (culture start/stop)
    ├── config.py                # YAML config loading: DaemonConfig, AgentConfig, etc.
    ├── ipc.py                   # IPC message types, JSON Lines encode/decode
    ├── irc_transport.py         # Async IRC client: connect, register, join, buffer
    ├── message_buffer.py        # Per-channel ring buffer with read-cursor tracking
    ├── socket_server.py         # Unix socket server for skill IPC
    ├── webhook.py               # HTTP POST + IRC #alerts dual delivery
    ├── agent_runner.py          # Claude Agent SDK session lifecycle (query, resume, prompt queue)
    ├── supervisor.py            # Sonnet 4.6 supervisor via Agent SDK
    ├── daemon.py                # Main orchestrator tying all components together
    └── skill/                   # Claude Code skill (installed to ~/.claude/skills/irc/)
        ├── SKILL.md             # Skill definition: tool descriptions, usage
        └── irc_client.py        # Standalone CLI: connects to daemon socket, runs tools

tests/
├── test_daemon_config.py        # Config loading tests
├── test_ipc.py                  # IPC protocol encode/decode tests
├── test_irc_transport.py        # IRC transport against real server
├── test_message_buffer.py       # Ring buffer + cursor tests
├── test_socket_server.py        # Unix socket server/client tests
├── test_webhook.py              # Webhook delivery tests
├── test_agent_runner.py         # Claude Code process management tests
├── test_supervisor.py           # Supervisor evaluation logic tests
├── test_daemon.py               # Daemon orchestrator integration tests
└── test_skill_client.py         # Skill IPC client tests

docs/
└── clients/
    └── claude/
        ├── overview.md
        ├── irc-tools.md
        ├── supervisor.md
        ├── context-management.md
        ├── webhooks.md
        └── configuration.md

Task 1: Dependencies and Config

Files:

  • Modify: pyproject.toml
  • Create: clients/__init__.py
  • Create: clients/claude/__init__.py
  • Create: clients/claude/config.py
  • Create: tests/test_daemon_config.py

This task adds required dependencies and implements YAML config loading. The config is the foundation — every other component reads from it.

  • Step 1: Write failing tests for config loading
# tests/test_daemon_config.py
import pytest
import tempfile
import os
from pathlib import Path


def test_load_config_from_yaml():
    """Load a complete agents.yaml and verify all fields parse."""
    from culture.clients.claude.config import load_config

    yaml_content = """\
server:
  host: 127.0.0.1
  port: 6667

supervisor:
  model: claude-sonnet-4-6
  thinking: medium
  window_size: 20
  eval_interval: 5
  escalation_threshold: 3

webhooks:
  url: "https://example.com/webhook"
  irc_channel: "#alerts"
  events:
    - agent_spiraling
    - agent_error

buffer_size: 300

agents:
  - nick: spark-culture
    directory: /tmp/test
    channels:
      - "#general"
      - "#dev"
    model: claude-opus-4-6
    thinking: medium
"""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(yaml_content)
        f.flush()
        try:
            config = load_config(f.name)
            assert config.server.host == "127.0.0.1"
            assert config.server.port == 6667
            assert config.supervisor.model == "claude-sonnet-4-6"
            assert config.supervisor.window_size == 20
            assert config.supervisor.eval_interval == 5
            assert config.supervisor.escalation_threshold == 3
            assert config.webhooks.url == "https://example.com/webhook"
            assert config.webhooks.irc_channel == "#alerts"
            assert len(config.webhooks.events) == 2
            assert config.buffer_size == 300
            assert len(config.agents) == 1
            agent = config.agents[0]
            assert agent.nick == "spark-culture"
            assert agent.directory == "/tmp/test"
            assert agent.channels == ["#general", "#dev"]
            assert agent.model == "claude-opus-4-6"
            assert agent.thinking == "medium"
        finally:
            os.unlink(f.name)


def test_load_config_defaults():
    """Missing optional fields get defaults."""
    from culture.clients.claude.config import load_config

    yaml_content = """\
agents:
  - nick: spark-culture
    directory: /tmp
    channels:
      - "#general"
"""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(yaml_content)
        f.flush()
        try:
            config = load_config(f.name)
            assert config.server.host == "0.0.0.0"
            assert config.server.port == 6667
            assert config.supervisor.model == "claude-sonnet-4-6"
            assert config.supervisor.thinking == "medium"
            assert config.supervisor.window_size == 20
            assert config.supervisor.eval_interval == 5
            assert config.supervisor.escalation_threshold == 3
            assert config.webhooks.url is None
            assert config.webhooks.irc_channel == "#alerts"
            assert config.buffer_size == 500
            agent = config.agents[0]
            assert agent.model == "claude-opus-4-6"
            assert agent.thinking == "medium"
        finally:
            os.unlink(f.name)


def test_get_agent_by_nick():
    """Look up an agent config by nick."""
    from culture.clients.claude.config import load_config

    yaml_content = """\
agents:
  - nick: spark-culture
    directory: /tmp/a
    channels: ["#general"]
  - nick: spark-assimilai
    directory: /tmp/b
    channels: ["#dev"]
"""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
        f.write(yaml_content)
        f.flush()
        try:
            config = load_config(f.name)
            agent = config.get_agent("spark-assimilai")
            assert agent is not None
            assert agent.directory == "/tmp/b"
            assert config.get_agent("nonexistent") is None
        finally:
            os.unlink(f.name)
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_daemon_config.py -v Expected: FAIL with ModuleNotFoundError: No module named 'clients'

  • Step 3: Add dependencies to pyproject.toml
# Add to [project] section:
dependencies = [
    "pyyaml>=6.0",
    "anthropic>=1.0",
]

# Update [tool.hatch.build.targets.wheel]:
packages = ["protocol", "server", "clients"]
  • Step 4: Create package init files
# clients/__init__.py
# (empty)

# clients/claude/__init__.py
# (empty)
  • Step 5: Implement config.py
# clients/claude/config.py
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path

import yaml


@dataclass
class ServerConnConfig:
    """IRC server connection settings."""
    host: str = "0.0.0.0"
    port: int = 6667


@dataclass
class SupervisorConfig:
    """Supervisor sub-agent settings."""
    model: str = "claude-sonnet-4-6"
    thinking: str = "medium"
    window_size: int = 20
    eval_interval: int = 5
    escalation_threshold: int = 3


@dataclass
class WebhookConfig:
    """Webhook alerting settings."""
    url: str | None = None
    irc_channel: str = "#alerts"
    events: list[str] = field(default_factory=lambda: [
        "agent_spiraling", "agent_error", "agent_question",
        "agent_timeout", "agent_complete",
    ])


@dataclass
class AgentConfig:
    """Per-agent settings."""
    nick: str = ""
    directory: str = "."
    channels: list[str] = field(default_factory=lambda: ["#general"])
    model: str = "claude-opus-4-6"
    thinking: str = "medium"


@dataclass
class DaemonConfig:
    """Top-level daemon configuration."""
    server: ServerConnConfig = field(default_factory=ServerConnConfig)
    supervisor: SupervisorConfig = field(default_factory=SupervisorConfig)
    webhooks: WebhookConfig = field(default_factory=WebhookConfig)
    buffer_size: int = 500
    agents: list[AgentConfig] = field(default_factory=list)

    def get_agent(self, nick: str) -> AgentConfig | None:
        for agent in self.agents:
            if agent.nick == nick:
                return agent
        return None


def load_config(path: str | Path) -> DaemonConfig:
    """Load daemon config from a YAML file."""
    with open(path) as f:
        raw = yaml.safe_load(f) or {}

    server = ServerConnConfig(**raw.get("server", {}))
    supervisor = SupervisorConfig(**raw.get("supervisor", {}))

    webhooks_raw = raw.get("webhooks", {})
    webhooks = WebhookConfig(**webhooks_raw) if webhooks_raw else WebhookConfig()

    agents = []
    for agent_raw in raw.get("agents", []):
        agents.append(AgentConfig(**agent_raw))

    return DaemonConfig(
        server=server,
        supervisor=supervisor,
        webhooks=webhooks,
        buffer_size=raw.get("buffer_size", 500),
        agents=agents,
    )
  • Step 6: Run tests to verify they pass

Run: uv run pytest tests/test_daemon_config.py -v Expected: All 3 tests PASS

  • Step 7: Commit
git add clients/ tests/test_daemon_config.py pyproject.toml
git commit -m "feat(layer5): add daemon config loading from YAML"

Task 2: IPC Protocol

Files:

  • Create: clients/claude/ipc.py
  • Create: tests/test_ipc.py

JSON Lines encode/decode for daemon ↔ skill communication. Foundation for socket server and skill client.

  • Step 1: Write failing tests for IPC encoding/decoding
# tests/test_ipc.py
import json
import uuid

from culture.clients.claude.ipc import (
    encode_message,
    decode_message,
    make_request,
    make_response,
    make_whisper,
    MSG_TYPE_RESPONSE,
    MSG_TYPE_WHISPER,
)


def test_encode_decode_roundtrip():
    """A message survives encode → decode."""
    msg = {"type": "irc_send", "id": "abc", "channel": "#general", "message": "hello"}
    line = encode_message(msg)
    assert line.endswith(b"\n")
    decoded = decode_message(line)
    assert decoded == msg


def test_make_request_has_uuid():
    """make_request generates a unique ID."""
    req = make_request("irc_send", channel="#general", message="hi")
    assert req["type"] == "irc_send"
    assert "id" in req
    # Verify it's a valid UUID
    uuid.UUID(req["id"])
    assert req["channel"] == "#general"
    assert req["message"] == "hi"


def test_make_response():
    """make_response creates a response tied to a request ID."""
    resp = make_response("abc123", ok=True, data={"messages": []})
    assert resp["type"] == MSG_TYPE_RESPONSE
    assert resp["id"] == "abc123"
    assert resp["ok"] is True
    assert resp["data"] == {"messages": []}


def test_make_response_error():
    """make_response with error."""
    resp = make_response("abc123", ok=False, error="channel not found")
    assert resp["ok"] is False
    assert resp["error"] == "channel not found"


def test_make_whisper():
    """make_whisper creates a supervisor whisper."""
    w = make_whisper("You're spiraling", "CORRECTION")
    assert w["type"] == MSG_TYPE_WHISPER
    assert w["message"] == "You're spiraling"
    assert w["whisper_type"] == "CORRECTION"


def test_decode_ignores_blank_lines():
    """Blank or whitespace-only lines return None."""
    assert decode_message(b"\n") is None
    assert decode_message(b"  \n") is None
    assert decode_message(b"") is None
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_ipc.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement ipc.py
# clients/claude/ipc.py
from __future__ import annotations

import json
import uuid
from typing import Any

MSG_TYPE_RESPONSE = "response"
MSG_TYPE_WHISPER = "whisper"


def encode_message(msg: dict[str, Any]) -> bytes:
    """Encode a message as a JSON line (newline-terminated bytes)."""
    return json.dumps(msg, separators=(",", ":")).encode() + b"\n"


def decode_message(line: bytes) -> dict[str, Any] | None:
    """Decode a JSON line into a message dict. Returns None for blank lines."""
    stripped = line.strip()
    if not stripped:
        return None
    return json.loads(stripped)


def make_request(msg_type: str, **kwargs: Any) -> dict[str, Any]:
    """Create a request message with a unique ID."""
    return {"type": msg_type, "id": str(uuid.uuid4()), **kwargs}


def make_response(
    request_id: str,
    ok: bool = True,
    data: Any = None,
    error: str | None = None,
) -> dict[str, Any]:
    """Create a response message tied to a request ID."""
    msg: dict[str, Any] = {"type": MSG_TYPE_RESPONSE, "id": request_id, "ok": ok}
    if data is not None:
        msg["data"] = data
    if error is not None:
        msg["error"] = error
    return msg


def make_whisper(message: str, whisper_type: str) -> dict[str, Any]:
    """Create a supervisor whisper message."""
    return {"type": MSG_TYPE_WHISPER, "message": message, "whisper_type": whisper_type}
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_ipc.py -v Expected: All 6 tests PASS

  • Step 5: Commit
git add clients/claude/ipc.py tests/test_ipc.py
git commit -m "feat(layer5): add IPC protocol for daemon-skill communication"

Task 3: Message Buffer

Files:

  • Create: clients/claude/message_buffer.py
  • Create: tests/test_message_buffer.py

Per-channel ring buffer with read-cursor tracking. Separated from IRC transport for testability.

  • Step 1: Write failing tests
# tests/test_message_buffer.py
import time
from culture.clients.claude.message_buffer import MessageBuffer, BufferedMessage


def test_add_and_read():
    """Add messages, read them back."""
    buf = MessageBuffer(max_per_channel=100)
    buf.add("#general", "spark-ori", "hello")
    buf.add("#general", "spark-culture", "hi there")

    msgs = buf.read("#general", limit=50)
    assert len(msgs) == 2
    assert msgs[0].nick == "spark-ori"
    assert msgs[0].text == "hello"
    assert msgs[1].nick == "spark-culture"


def test_read_returns_since_last_read():
    """Second read only returns new messages."""
    buf = MessageBuffer(max_per_channel=100)
    buf.add("#general", "a", "msg1")
    buf.add("#general", "b", "msg2")

    msgs1 = buf.read("#general", limit=50)
    assert len(msgs1) == 2

    buf.add("#general", "c", "msg3")
    msgs2 = buf.read("#general", limit=50)
    assert len(msgs2) == 1
    assert msgs2[0].text == "msg3"


def test_read_empty_channel():
    """Reading a channel with no messages returns empty list."""
    buf = MessageBuffer(max_per_channel=100)
    assert buf.read("#empty", limit=50) == []


def test_ring_buffer_eviction():
    """Oldest messages are evicted when buffer is full."""
    buf = MessageBuffer(max_per_channel=5)
    for i in range(10):
        buf.add("#general", "bot", f"msg{i}")

    # Buffer holds last 5
    msgs = buf.read("#general", limit=100)
    assert len(msgs) == 5
    assert msgs[0].text == "msg5"
    assert msgs[-1].text == "msg9"


def test_limit_caps_results():
    """Limit parameter caps the number of returned messages."""
    buf = MessageBuffer(max_per_channel=100)
    for i in range(20):
        buf.add("#general", "bot", f"msg{i}")

    msgs = buf.read("#general", limit=5)
    # Returns the 5 most recent
    assert len(msgs) == 5
    assert msgs[0].text == "msg15"


def test_multiple_channels_independent():
    """Channels have independent buffers and cursors."""
    buf = MessageBuffer(max_per_channel=100)
    buf.add("#general", "a", "gen1")
    buf.add("#dev", "b", "dev1")

    gen_msgs = buf.read("#general", limit=50)
    assert len(gen_msgs) == 1
    assert gen_msgs[0].text == "gen1"

    dev_msgs = buf.read("#dev", limit=50)
    assert len(dev_msgs) == 1
    assert dev_msgs[0].text == "dev1"


def test_messages_have_timestamps():
    """Each buffered message has a timestamp."""
    buf = MessageBuffer(max_per_channel=100)
    before = time.time()
    buf.add("#general", "ori", "test")
    after = time.time()

    msgs = buf.read("#general", limit=1)
    assert before <= msgs[0].timestamp <= after
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_message_buffer.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement message_buffer.py
# clients/claude/message_buffer.py
from __future__ import annotations

import time
from collections import deque
from dataclasses import dataclass, field


@dataclass
class BufferedMessage:
    nick: str
    text: str
    timestamp: float


class MessageBuffer:
    """Per-channel ring buffer with read-cursor tracking."""

    def __init__(self, max_per_channel: int = 500):
        self.max_per_channel = max_per_channel
        self._buffers: dict[str, deque[BufferedMessage]] = {}
        self._cursors: dict[str, int] = {}  # channel -> index of next unread
        self._totals: dict[str, int] = {}   # channel -> total messages ever added

    def add(self, channel: str, nick: str, text: str) -> None:
        """Add a message to a channel's buffer."""
        if channel not in self._buffers:
            self._buffers[channel] = deque(maxlen=self.max_per_channel)
            self._totals[channel] = 0
            self._cursors[channel] = 0

        self._buffers[channel].append(
            BufferedMessage(nick=nick, text=text, timestamp=time.time())
        )
        self._totals[channel] += 1

    def read(self, channel: str, limit: int = 50) -> list[BufferedMessage]:
        """Read messages since last read, up to limit (most recent)."""
        buf = self._buffers.get(channel)
        if not buf:
            return []

        total = self._totals[channel]
        cursor = self._cursors.get(channel, 0)

        # How many new messages since last read
        new_count = total - cursor
        if new_count <= 0:
            return []

        # The buffer is a deque — new messages are at the end
        # We want the last new_count entries, capped by limit
        available = list(buf)
        new_messages = available[-new_count:] if new_count <= len(available) else available

        # Apply limit (return most recent)
        if len(new_messages) > limit:
            new_messages = new_messages[-limit:]

        # Advance cursor
        self._cursors[channel] = total

        return new_messages
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_message_buffer.py -v Expected: All 7 tests PASS

  • Step 5: Commit
git add clients/claude/message_buffer.py tests/test_message_buffer.py
git commit -m "feat(layer5): add per-channel message buffer with read cursors"

Task 4: IRC Transport

Files:

  • Create: clients/claude/irc_transport.py
  • Create: tests/test_irc_transport.py

Async IRC client that connects to the server, registers a nick, joins channels, and buffers incoming messages. Uses the existing protocol/message.py for parsing. Tests against real server instances using the existing conftest.py fixtures.

  • Step 1: Write failing tests
# tests/test_irc_transport.py
import asyncio
import pytest
import pytest_asyncio
from culture.clients.claude.irc_transport import IRCTransport
from culture.clients.claude.message_buffer import MessageBuffer


@pytest.mark.asyncio
async def test_connect_and_register(server):
    """Transport connects and registers nick."""
    buf = MessageBuffer()
    transport = IRCTransport(
        host="127.0.0.1",
        port=server.config.port,
        nick="testserv-bot",
        user="bot",
        channels=["#general"],
        buffer=buf,
    )
    await transport.connect()
    try:
        # Wait for registration to complete
        await asyncio.sleep(0.3)
        assert transport.connected
        assert "testserv-bot" in server.clients
    finally:
        await transport.disconnect()


@pytest.mark.asyncio
async def test_joins_channels(server):
    """Transport auto-joins configured channels after registration."""
    buf = MessageBuffer()
    transport = IRCTransport(
        host="127.0.0.1",
        port=server.config.port,
        nick="testserv-bot",
        user="bot",
        channels=["#general", "#dev"],
        buffer=buf,
    )
    await transport.connect()
    try:
        await asyncio.sleep(0.3)
        assert "#general" in server.channels
        assert "#dev" in server.channels
    finally:
        await transport.disconnect()


@pytest.mark.asyncio
async def test_buffers_incoming_messages(server, make_client):
    """Messages from other clients are buffered."""
    buf = MessageBuffer()
    transport = IRCTransport(
        host="127.0.0.1",
        port=server.config.port,
        nick="testserv-bot",
        user="bot",
        channels=["#general"],
        buffer=buf,
    )
    await transport.connect()
    await asyncio.sleep(0.3)

    # Another client joins and sends a message
    human = await make_client(nick="testserv-ori", user="ori")
    await human.send("JOIN #general")
    await human.recv_all(timeout=0.3)
    await human.send("PRIVMSG #general :hello bot")
    await asyncio.sleep(0.3)

    msgs = buf.read("#general", limit=50)
    assert any(m.text == "hello bot" and m.nick == "testserv-ori" for m in msgs)

    await transport.disconnect()


@pytest.mark.asyncio
async def test_send_privmsg(server, make_client):
    """Transport can send PRIVMSG to a channel."""
    buf = MessageBuffer()
    transport = IRCTransport(
        host="127.0.0.1",
        port=server.config.port,
        nick="testserv-bot",
        user="bot",
        channels=["#general"],
        buffer=buf,
    )
    await transport.connect()
    await asyncio.sleep(0.3)

    human = await make_client(nick="testserv-ori", user="ori")
    await human.send("JOIN #general")
    await human.recv_all(timeout=0.3)

    await transport.send_privmsg("#general", "hello human")
    response = await human.recv(timeout=2.0)
    assert "hello human" in response

    await transport.disconnect()


@pytest.mark.asyncio
async def test_send_join_part(server):
    """Transport can join and part channels dynamically."""
    buf = MessageBuffer()
    transport = IRCTransport(
        host="127.0.0.1",
        port=server.config.port,
        nick="testserv-bot",
        user="bot",
        channels=["#general"],
        buffer=buf,
    )
    await transport.connect()
    await asyncio.sleep(0.3)

    await transport.join_channel("#new")
    await asyncio.sleep(0.2)
    assert "#new" in server.channels

    await transport.part_channel("#new")
    await asyncio.sleep(0.2)
    # Channel removed when last member leaves
    assert "#new" not in server.channels

    await transport.disconnect()
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_irc_transport.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement irc_transport.py
# clients/claude/irc_transport.py
from __future__ import annotations

import asyncio
import logging
from typing import Callable

from culture.protocol.message import Message
from culture.clients.claude.message_buffer import MessageBuffer

logger = logging.getLogger(__name__)


class IRCTransport:
    """Async IRC client for the daemon. Connects, registers, joins, buffers."""

    def __init__(
        self,
        host: str,
        port: int,
        nick: str,
        user: str,
        channels: list[str],
        buffer: MessageBuffer,
        on_mention: Callable[[str, str, str], None] | None = None,
    ):
        self.host = host
        self.port = port
        self.nick = nick
        self.user = user
        self.channels = list(channels)
        self.buffer = buffer
        self.on_mention = on_mention
        self.connected = False
        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._read_task: asyncio.Task | None = None
        self._reconnecting = False
        self._should_run = False

    async def connect(self) -> None:
        """Connect to IRC server, register, and join channels."""
        self._should_run = True
        await self._do_connect()

    async def _do_connect(self) -> None:
        """Internal connect with actual TCP setup."""
        self._reader, self._writer = await asyncio.open_connection(
            self.host, self.port
        )
        await self._send_raw(f"NICK {self.nick}")
        await self._send_raw(f"USER {self.user} 0 * :{self.user}")
        self._read_task = asyncio.create_task(self._read_loop())

    async def disconnect(self) -> None:
        """Disconnect from IRC server."""
        self._should_run = False
        if self._read_task:
            self._read_task.cancel()
            try:
                await self._read_task
            except asyncio.CancelledError:
                pass
        if self._writer:
            try:
                await self._send_raw("QUIT :daemon shutdown")
            except (ConnectionError, OSError):
                pass
            self._writer.close()
            try:
                await self._writer.wait_closed()
            except (ConnectionError, BrokenPipeError):
                pass
        self.connected = False

    async def send_privmsg(self, target: str, text: str) -> None:
        """Send a PRIVMSG to a channel or nick."""
        await self._send_raw(f"PRIVMSG {target} :{text}")

    async def join_channel(self, channel: str) -> None:
        """Join a channel."""
        await self._send_raw(f"JOIN {channel}")
        if channel not in self.channels:
            self.channels.append(channel)

    async def part_channel(self, channel: str) -> None:
        """Leave a channel."""
        await self._send_raw(f"PART {channel}")
        if channel in self.channels:
            self.channels.remove(channel)

    async def send_who(self, target: str) -> list[str]:
        """Send WHO and collect responses. Returns raw response lines."""
        # WHO responses handled in _read_loop, collected via future
        # For simplicity, this is fire-and-forget; the caller reads from buffer
        await self._send_raw(f"WHO {target}")

    async def _send_raw(self, line: str) -> None:
        """Send a raw IRC line."""
        if self._writer:
            self._writer.write(f"{line}\r\n".encode())
            await self._writer.drain()

    async def _read_loop(self) -> None:
        """Read IRC messages and dispatch."""
        buf = ""
        try:
            while True:
                data = await self._reader.read(4096)
                if not data:
                    break
                buf += data.decode("utf-8", errors="replace")
                buf = buf.replace("\r\n", "\n").replace("\r", "\n")
                while "\n" in buf:
                    line, buf = buf.split("\n", 1)
                    if line.strip():
                        msg = Message.parse(line)
                        await self._handle(msg)
        except asyncio.CancelledError:
            return
        except (ConnectionError, OSError):
            logger.warning("IRC connection lost")
        finally:
            self.connected = False
            if self._should_run and not self._reconnecting:
                asyncio.create_task(self._reconnect())

    async def _reconnect(self) -> None:
        """Reconnect with exponential backoff (1s, 2s, 4s, ..., max 60s)."""
        self._reconnecting = True
        delay = 1
        while self._should_run:
            logger.info("Reconnecting to IRC in %ds...", delay)
            await asyncio.sleep(delay)
            try:
                await self._do_connect()
                logger.info("Reconnected to IRC")
                self._reconnecting = False
                return
            except (ConnectionError, OSError):
                delay = min(delay * 2, 60)

    async def _handle(self, msg: Message) -> None:
        """Handle an incoming IRC message."""
        if msg.command == "PING":
            token = msg.params[0] if msg.params else ""
            await self._send_raw(f"PONG :{token}")

        elif msg.command == "001":
            # Welcome — registration complete, join channels
            self.connected = True
            for channel in self.channels:
                await self._send_raw(f"JOIN {channel}")

        elif msg.command == "PRIVMSG" and len(msg.params) >= 2:
            target = msg.params[0]
            text = msg.params[1]
            sender = msg.prefix.split("!")[0] if msg.prefix else "unknown"

            if sender == self.nick:
                return  # Ignore own messages

            if target.startswith("#"):
                # Channel message — buffer it
                self.buffer.add(target, sender, text)
            else:
                # DM — buffer under sender's nick as channel key
                self.buffer.add(f"DM:{sender}", sender, text)

            # Check for @mention
            if self.on_mention and f"@{self.nick}" in text:
                self.on_mention(target, sender, text)

        elif msg.command == "NOTICE" and len(msg.params) >= 2:
            # Buffer notices too (mention notifications, etc.)
            target = msg.params[0]
            text = msg.params[1]
            sender = msg.prefix.split("!")[0] if msg.prefix else "server"
            if target.startswith("#"):
                self.buffer.add(target, sender, text)
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_irc_transport.py -v Expected: All 5 tests PASS

  • Step 5: Commit
git add clients/claude/irc_transport.py tests/test_irc_transport.py
git commit -m "feat(layer5): add async IRC transport with message buffering"

Task 5: Socket Server

Files:

  • Create: clients/claude/socket_server.py
  • Create: tests/test_socket_server.py

Unix socket server that handles JSON Lines IPC between the daemon and the Claude Code skill. Routes incoming requests, delivers responses and whispers.

  • Step 1: Write failing tests
# tests/test_socket_server.py
import asyncio
import json
import os
import tempfile
import pytest

from culture.clients.claude.socket_server import SocketServer
from culture.clients.claude.ipc import encode_message, decode_message, make_request


@pytest.mark.asyncio
async def test_socket_server_accepts_connection():
    """Server starts and accepts a client connection."""
    sock_path = os.path.join(tempfile.mkdtemp(), "test.sock")
    handler_called = asyncio.Event()
    received_msgs = []

    async def handler(msg):
        received_msgs.append(msg)
        handler_called.set()
        return {"type": "response", "id": msg["id"], "ok": True}

    srv = SocketServer(sock_path, handler)
    await srv.start()
    try:
        reader, writer = await asyncio.open_unix_connection(sock_path)
        req = make_request("irc_channels")
        writer.write(encode_message(req))
        await writer.drain()

        # Read response
        data = await asyncio.wait_for(reader.readline(), timeout=2.0)
        resp = decode_message(data)
        assert resp["ok"] is True
        assert resp["id"] == req["id"]

        writer.close()
        await writer.wait_closed()
    finally:
        await srv.stop()
        os.unlink(sock_path)


@pytest.mark.asyncio
async def test_socket_server_sends_whisper():
    """Server can push an unsolicited whisper to connected clients."""
    sock_path = os.path.join(tempfile.mkdtemp(), "test.sock")

    async def handler(msg):
        return {"type": "response", "id": msg["id"], "ok": True}

    srv = SocketServer(sock_path, handler)
    await srv.start()
    try:
        reader, writer = await asyncio.open_unix_connection(sock_path)

        # Push a whisper
        await srv.send_whisper("You're spiraling", "CORRECTION")

        data = await asyncio.wait_for(reader.readline(), timeout=2.0)
        whisper = decode_message(data)
        assert whisper["type"] == "whisper"
        assert whisper["whisper_type"] == "CORRECTION"
        assert "spiraling" in whisper["message"]

        writer.close()
        await writer.wait_closed()
    finally:
        await srv.stop()
        os.unlink(sock_path)


@pytest.mark.asyncio
async def test_socket_permissions():
    """Socket file is created with 0600 permissions."""
    sock_path = os.path.join(tempfile.mkdtemp(), "test.sock")

    async def handler(msg):
        return {"type": "response", "id": msg["id"], "ok": True}

    srv = SocketServer(sock_path, handler)
    await srv.start()
    try:
        mode = os.stat(sock_path).st_mode & 0o777
        assert mode == 0o600
    finally:
        await srv.stop()
        os.unlink(sock_path)
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_socket_server.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement socket_server.py
# clients/claude/socket_server.py
from __future__ import annotations

import asyncio
import logging
import os
from typing import Any, Callable, Awaitable

from culture.clients.claude.ipc import encode_message, decode_message, make_whisper

logger = logging.getLogger(__name__)

RequestHandler = Callable[[dict[str, Any]], Awaitable[dict[str, Any]]]


class SocketServer:
    """Unix socket server for daemon ↔ skill IPC."""

    def __init__(self, path: str, handler: RequestHandler):
        self.path = path
        self.handler = handler
        self._server: asyncio.Server | None = None
        self._clients: list[asyncio.StreamWriter] = []

    async def start(self) -> None:
        """Start listening on the Unix socket."""
        # Remove stale socket
        if os.path.exists(self.path):
            os.unlink(self.path)

        self._server = await asyncio.start_unix_server(
            self._handle_client, path=self.path
        )
        # Set restrictive permissions
        os.chmod(self.path, 0o600)

    async def stop(self) -> None:
        """Stop the socket server and close all client connections."""
        for writer in self._clients:
            try:
                writer.close()
                await writer.wait_closed()
            except (ConnectionError, BrokenPipeError, OSError):
                pass
        self._clients.clear()
        if self._server:
            self._server.close()
            await self._server.wait_closed()

    async def send_whisper(self, message: str, whisper_type: str) -> None:
        """Push a whisper to all connected clients."""
        whisper = make_whisper(message, whisper_type)
        data = encode_message(whisper)
        for writer in list(self._clients):
            try:
                writer.write(data)
                await writer.drain()
            except (ConnectionError, BrokenPipeError, OSError):
                self._clients.remove(writer)

    async def _handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Handle a connected skill client."""
        self._clients.append(writer)
        try:
            while True:
                line = await reader.readline()
                if not line:
                    break
                msg = decode_message(line)
                if msg is None:
                    continue
                try:
                    response = await self.handler(msg)
                    writer.write(encode_message(response))
                    await writer.drain()
                except Exception:
                    logger.exception("Handler error for message: %s", msg)
        except (ConnectionError, asyncio.IncompleteReadError):
            pass
        finally:
            if writer in self._clients:
                self._clients.remove(writer)
            writer.close()
            try:
                await writer.wait_closed()
            except (ConnectionError, BrokenPipeError, OSError):
                pass
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_socket_server.py -v Expected: All 3 tests PASS

  • Step 5: Commit
git add clients/claude/socket_server.py tests/test_socket_server.py
git commit -m "feat(layer5): add Unix socket server for skill IPC"

Task 6: Webhook Client

Files:

  • Create: clients/claude/webhook.py
  • Create: tests/test_webhook.py

Dual-delivery alerting: HTTP POST to configured URL + IRC PRIVMSG to #alerts.

  • Step 1: Write failing tests
# tests/test_webhook.py
import asyncio
import json
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import pytest

from culture.clients.claude.webhook import WebhookClient, AlertEvent
from culture.clients.claude.config import WebhookConfig


class WebhookCapture(BaseHTTPRequestHandler):
    """Captures POST requests for testing."""
    received = []

    def do_POST(self):
        length = int(self.headers["Content-Length"])
        body = json.loads(self.rfile.read(length))
        WebhookCapture.received.append(body)
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):
        pass  # Suppress output


@pytest.mark.asyncio
async def test_webhook_http_post():
    """Fires HTTP POST with correct payload."""
    WebhookCapture.received.clear()
    http = HTTPServer(("127.0.0.1", 0), WebhookCapture)
    port = http.server_address[1]
    thread = threading.Thread(target=http.handle_request, daemon=True)
    thread.start()

    config = WebhookConfig(
        url=f"http://127.0.0.1:{port}/webhook",
        irc_channel="#alerts",
        events=["agent_error"],
    )
    client = WebhookClient(config, irc_send=None)
    event = AlertEvent(
        event_type="agent_error",
        nick="spark-culture",
        message='[ERROR] spark-culture crashed: exit code 1',
    )
    await client.fire(event)
    thread.join(timeout=2.0)

    assert len(WebhookCapture.received) == 1
    assert "spark-culture" in WebhookCapture.received[0]["content"]
    http.server_close()


@pytest.mark.asyncio
async def test_webhook_irc_fallback():
    """Fires IRC PRIVMSG to #alerts channel."""
    sent_messages = []

    async def mock_irc_send(channel, text):
        sent_messages.append((channel, text))

    config = WebhookConfig(url=None, irc_channel="#alerts", events=["agent_error"])
    client = WebhookClient(config, irc_send=mock_irc_send)
    event = AlertEvent(
        event_type="agent_error",
        nick="spark-culture",
        message="[ERROR] spark-culture crashed",
    )
    await client.fire(event)

    assert len(sent_messages) == 1
    assert sent_messages[0][0] == "#alerts"
    assert "spark-culture" in sent_messages[0][1]


@pytest.mark.asyncio
async def test_webhook_skips_unconfigured_events():
    """Events not in the config's event list are silently skipped."""
    sent_messages = []

    async def mock_irc_send(channel, text):
        sent_messages.append((channel, text))

    config = WebhookConfig(url=None, irc_channel="#alerts", events=["agent_error"])
    client = WebhookClient(config, irc_send=mock_irc_send)
    event = AlertEvent(
        event_type="agent_complete",  # Not in events list
        nick="spark-culture",
        message="[COMPLETE] done",
    )
    await client.fire(event)

    assert len(sent_messages) == 0
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_webhook.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement webhook.py
# clients/claude/webhook.py
from __future__ import annotations

import asyncio
import json
import logging
import urllib.request
from dataclasses import dataclass
from typing import Any, Callable, Awaitable

from culture.clients.claude.config import WebhookConfig

logger = logging.getLogger(__name__)


@dataclass
class AlertEvent:
    event_type: str
    nick: str
    message: str


class WebhookClient:
    """Dual-delivery alerting: HTTP POST + IRC channel."""

    def __init__(
        self,
        config: WebhookConfig,
        irc_send: Callable[[str, str], Awaitable[None]] | None = None,
    ):
        self.config = config
        self.irc_send = irc_send

    async def fire(self, event: AlertEvent) -> None:
        """Fire an alert event to all configured channels."""
        if event.event_type not in self.config.events:
            return

        # IRC fallback (always, if irc_send is available)
        if self.irc_send:
            try:
                await self.irc_send(self.config.irc_channel, event.message)
            except Exception:
                logger.exception("Failed to send IRC alert")

        # HTTP webhook
        if self.config.url:
            try:
                await self._http_post(event)
            except Exception:
                logger.exception("Webhook POST failed to %s", self.config.url)

    async def _http_post(self, event: AlertEvent) -> None:
        """POST to webhook URL. Runs in thread to avoid blocking event loop."""
        payload = json.dumps({"content": event.message}).encode()

        def _post():
            req = urllib.request.Request(
                self.config.url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            urllib.request.urlopen(req, timeout=10)

        await asyncio.to_thread(_post)
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_webhook.py -v Expected: All 3 tests PASS

  • Step 5: Commit
git add clients/claude/webhook.py tests/test_webhook.py
git commit -m "feat(layer5): add webhook client with HTTP + IRC dual delivery"

Task 7: Agent Runner

Files:

  • Create: clients/claude/agent_runner.py
  • Create: tests/test_agent_runner.py

Manages the Claude Agent SDK session lifecycle. Uses query() with permission_mode="bypassPermissions", queues prompts for commands (compact/clear), handles crash recovery.

  • Step 1: Write failing tests
# tests/test_agent_runner.py
import asyncio
import pytest

from culture.clients.claude.agent_runner import AgentRunner


@pytest.mark.asyncio
async def test_spawn_process():
    """AgentRunner spawns a subprocess and reports it running."""
    # Use a simple long-running command instead of claude for testing
    runner = AgentRunner(
        command=["python3", "-u", "-c", "import time; time.sleep(60)"],
        directory="/tmp",
    )
    await runner.start()
    try:
        assert runner.is_running()
    finally:
        await runner.stop()
    assert not runner.is_running()


@pytest.mark.asyncio
async def test_stdin_pipe():
    """Can write to the subprocess stdin."""
    # Echo back what we send via stdin
    runner = AgentRunner(
        command=["python3", "-u", "-c",
                 "import sys\nfor line in sys.stdin:\n    print('GOT:' + line.strip(), flush=True)"],
        directory="/tmp",
    )
    await runner.start()
    try:
        await runner.write_stdin("hello\n")
        # Read from stdout
        line = await asyncio.wait_for(runner.read_stdout_line(), timeout=2.0)
        assert "GOT:hello" in line
    finally:
        await runner.stop()


@pytest.mark.asyncio
async def test_on_exit_callback():
    """on_exit callback fires when process exits."""
    exit_codes = []

    async def on_exit(code):
        exit_codes.append(code)

    runner = AgentRunner(
        command=["python3", "-c", "pass"],  # Exits immediately
        directory="/tmp",
        on_exit=on_exit,
    )
    await runner.start()
    # Wait for process to exit
    await asyncio.sleep(0.5)
    assert len(exit_codes) == 1
    assert exit_codes[0] == 0


@pytest.mark.asyncio
async def test_crash_detection():
    """Detects non-zero exit code."""
    exit_codes = []

    async def on_exit(code):
        exit_codes.append(code)

    runner = AgentRunner(
        command=["python3", "-c", "raise SystemExit(1)"],
        directory="/tmp",
        on_exit=on_exit,
    )
    await runner.start()
    await asyncio.sleep(0.5)
    assert len(exit_codes) == 1
    assert exit_codes[0] == 1
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_agent_runner.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement agent_runner.py
# clients/claude/agent_runner.py
from __future__ import annotations

import asyncio
import logging
from typing import Callable, Awaitable

logger = logging.getLogger(__name__)


class AgentRunner:
    """Manages a Claude Code subprocess."""

    def __init__(
        self,
        command: list[str],
        directory: str,
        on_exit: Callable[[int], Awaitable[None]] | None = None,
        on_stdout: Callable[[str], Awaitable[None]] | None = None,
    ):
        self.command = command
        self.directory = directory
        self.on_exit = on_exit
        self.on_stdout = on_stdout
        self._process: asyncio.subprocess.Process | None = None
        self._monitor_task: asyncio.Task | None = None
        self._stdout_task: asyncio.Task | None = None

    async def start(self) -> None:
        """Spawn the subprocess."""
        self._process = await asyncio.create_subprocess_exec(
            *self.command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=self.directory,
        )
        self._monitor_task = asyncio.create_task(self._monitor())
        if self.on_stdout:
            self._stdout_task = asyncio.create_task(self._read_stdout())

    async def stop(self) -> None:
        """Terminate the subprocess."""
        if self._process and self._process.returncode is None:
            self._process.terminate()
            try:
                await asyncio.wait_for(self._process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                self._process.kill()
                await self._process.wait()
        if self._monitor_task:
            self._monitor_task.cancel()
            try:
                await self._monitor_task
            except asyncio.CancelledError:
                pass
        if self._stdout_task:
            self._stdout_task.cancel()
            try:
                await self._stdout_task
            except asyncio.CancelledError:
                pass

    def is_running(self) -> bool:
        """Check if subprocess is alive."""
        return self._process is not None and self._process.returncode is None

    async def write_stdin(self, text: str) -> None:
        """Write text to the subprocess stdin."""
        if self._process and self._process.stdin:
            self._process.stdin.write(text.encode())
            await self._process.stdin.drain()

    async def read_stdout_line(self) -> str:
        """Read a single line from stdout."""
        if self._process and self._process.stdout:
            line = await self._process.stdout.readline()
            return line.decode().rstrip("\n")
        return ""

    async def _monitor(self) -> None:
        """Wait for process exit and fire callback."""
        if not self._process:
            return
        code = await self._process.wait()
        if self.on_exit:
            await self.on_exit(code)

    async def _read_stdout(self) -> None:
        """Continuously read stdout and fire callback."""
        try:
            while self._process and self._process.stdout:
                line = await self._process.stdout.readline()
                if not line:
                    break
                if self.on_stdout:
                    await self.on_stdout(line.decode().rstrip("\n"))
        except asyncio.CancelledError:
            return
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_agent_runner.py -v Expected: All 4 tests PASS

  • Step 5: Commit
git add clients/claude/agent_runner.py tests/test_agent_runner.py
git commit -m "feat(layer5): add agent runner for Claude Code subprocess management"

Task 8: Supervisor

Files:

  • Create: clients/claude/supervisor.py
  • Create: tests/test_supervisor.py

The supervisor evaluates agent activity and generates whispers. Tests use a mock evaluation function to avoid real API calls.

  • Step 1: Write failing tests
# tests/test_supervisor.py
import asyncio
import pytest

from culture.clients.claude.supervisor import Supervisor, SupervisorVerdict


def test_verdict_parsing():
    """Parse supervisor model output into structured verdict."""
    assert SupervisorVerdict.parse("OK") == SupervisorVerdict(action="OK", message="")
    assert SupervisorVerdict.parse("CORRECTION You're spiraling") == SupervisorVerdict(
        action="CORRECTION", message="You're spiraling"
    )
    assert SupervisorVerdict.parse("THINK_DEEPER This needs more thought") == SupervisorVerdict(
        action="THINK_DEEPER", message="This needs more thought"
    )
    assert SupervisorVerdict.parse("ESCALATION Still stuck") == SupervisorVerdict(
        action="ESCALATION", message="Still stuck"
    )


@pytest.mark.asyncio
async def test_rolling_window():
    """Supervisor maintains a rolling window of activity."""
    whispers = []

    async def on_whisper(msg, wtype):
        whispers.append((msg, wtype))

    # Mock evaluator that always returns OK
    async def mock_eval(window, task):
        return SupervisorVerdict(action="OK", message="")

    sup = Supervisor(
        window_size=5,
        eval_interval=3,
        escalation_threshold=3,
        evaluate_fn=mock_eval,
        on_whisper=on_whisper,
        on_escalation=None,
        task_description="test task",
    )

    # Feed 6 turns — triggers eval after every 3
    for i in range(6):
        await sup.observe({"turn": i, "type": "response", "content": f"turn {i}"})

    assert len(sup._window) == 5  # Window capped
    assert len(whispers) == 0  # All OK, no whispers


@pytest.mark.asyncio
async def test_whisper_on_correction():
    """Supervisor whispers when evaluator returns CORRECTION."""
    whispers = []

    async def on_whisper(msg, wtype):
        whispers.append((msg, wtype))

    async def mock_eval(window, task):
        return SupervisorVerdict(action="CORRECTION", message="Stop retrying")

    sup = Supervisor(
        window_size=20,
        eval_interval=2,
        escalation_threshold=3,
        evaluate_fn=mock_eval,
        on_whisper=on_whisper,
        on_escalation=None,
        task_description="test task",
    )

    for i in range(2):
        await sup.observe({"turn": i})

    assert len(whispers) == 1
    assert whispers[0] == ("Stop retrying", "CORRECTION")


@pytest.mark.asyncio
async def test_escalation_after_threshold():
    """Supervisor escalates after escalation_threshold consecutive non-OK verdicts."""
    whispers = []
    escalated = []

    async def on_whisper(msg, wtype):
        whispers.append((msg, wtype))

    async def on_escalation(msg):
        escalated.append(msg)

    call_count = 0

    async def mock_eval(window, task):
        nonlocal call_count
        call_count += 1
        return SupervisorVerdict(action="CORRECTION", message=f"Attempt {call_count}")

    sup = Supervisor(
        window_size=20,
        eval_interval=1,  # Eval every turn
        escalation_threshold=3,
        evaluate_fn=mock_eval,
        on_whisper=on_whisper,
        on_escalation=on_escalation,
        task_description="test task",
    )

    for i in range(3):
        await sup.observe({"turn": i})

    # First 2 are whispers, 3rd triggers escalation
    assert len(whispers) == 2
    assert len(escalated) == 1


@pytest.mark.asyncio
async def test_ok_resets_escalation_counter():
    """An OK verdict resets the consecutive failure counter."""
    whispers = []
    escalated = []

    async def on_whisper(msg, wtype):
        whispers.append((msg, wtype))

    async def on_escalation(msg):
        escalated.append(msg)

    verdicts = iter([
        SupervisorVerdict(action="CORRECTION", message="warn1"),
        SupervisorVerdict(action="CORRECTION", message="warn2"),
        SupervisorVerdict(action="OK", message=""),  # Resets counter
        SupervisorVerdict(action="CORRECTION", message="warn3"),
        SupervisorVerdict(action="CORRECTION", message="warn4"),
    ])

    async def mock_eval(window, task):
        return next(verdicts)

    sup = Supervisor(
        window_size=20,
        eval_interval=1,
        escalation_threshold=3,
        evaluate_fn=mock_eval,
        on_whisper=on_whisper,
        on_escalation=on_escalation,
        task_description="test task",
    )

    for i in range(5):
        await sup.observe({"turn": i})

    # No escalation — OK reset the counter before reaching threshold
    assert len(escalated) == 0
    assert len(whispers) == 4  # All 4 corrections delivered as whispers
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_supervisor.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement supervisor.py
# clients/claude/supervisor.py
from __future__ import annotations

import logging
from collections import deque
from dataclasses import dataclass
from typing import Any, Callable, Awaitable

logger = logging.getLogger(__name__)


@dataclass
class SupervisorVerdict:
    action: str  # OK, CORRECTION, THINK_DEEPER, ESCALATION
    message: str

    @classmethod
    def parse(cls, text: str) -> SupervisorVerdict:
        """Parse raw model output into a verdict."""
        text = text.strip()
        if text == "OK":
            return cls(action="OK", message="")
        parts = text.split(" ", 1)
        action = parts[0]
        message = parts[1] if len(parts) > 1 else ""
        return cls(action=action, message=message)


# Type alias for the evaluation function
EvaluateFn = Callable[[list[dict[str, Any]], str], Awaitable[SupervisorVerdict]]


class Supervisor:
    """Watches agent activity and intervenes when unproductive."""

    def __init__(
        self,
        window_size: int,
        eval_interval: int,
        escalation_threshold: int,
        evaluate_fn: EvaluateFn,
        on_whisper: Callable[[str, str], Awaitable[None]] | None,
        on_escalation: Callable[[str], Awaitable[None]] | None,
        task_description: str = "",
    ):
        self.window_size = window_size
        self.eval_interval = eval_interval
        self.escalation_threshold = escalation_threshold
        self.evaluate_fn = evaluate_fn
        self.on_whisper = on_whisper
        self.on_escalation = on_escalation
        self.task_description = task_description

        self._window: deque[dict[str, Any]] = deque(maxlen=window_size)
        self._turn_count: int = 0
        self._consecutive_failures: int = 0
        self.paused: bool = False

    async def observe(self, turn: dict[str, Any]) -> None:
        """Record an agent turn and evaluate if interval reached."""
        self._window.append(turn)
        self._turn_count += 1

        if self._turn_count % self.eval_interval == 0:
            await self._evaluate()

    async def _evaluate(self) -> None:
        """Run the evaluation function on the rolling window."""
        if self.paused:
            return

        try:
            verdict = await self.evaluate_fn(list(self._window), self.task_description)
        except Exception:
            logger.exception("Supervisor evaluation failed")
            return

        if verdict.action == "OK":
            self._consecutive_failures = 0
            return

        self._consecutive_failures += 1

        if self._consecutive_failures >= self.escalation_threshold:
            # Escalate
            self.paused = True
            if self.on_escalation:
                await self.on_escalation(verdict.message)
        else:
            # Whisper
            if self.on_whisper:
                await self.on_whisper(verdict.message, verdict.action)
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_supervisor.py -v Expected: All 5 tests PASS

  • Step 5: Commit
git add clients/claude/supervisor.py tests/test_supervisor.py
git commit -m "feat(layer5): add supervisor with rolling window and escalation ladder"

Task 9: Daemon Orchestrator

Files:

  • Create: clients/claude/daemon.py
  • Create: tests/test_daemon.py

Ties all components together. Starts IRC transport, socket server, agent runner, supervisor, and webhook client. Routes messages between them.

  • Step 1: Write failing tests
# tests/test_daemon.py
import asyncio
import os
import tempfile
import pytest

from culture.clients.claude.daemon import AgentDaemon
from culture.clients.claude.config import (
    DaemonConfig, ServerConnConfig, AgentConfig,
    SupervisorConfig, WebhookConfig,
)


@pytest.mark.asyncio
async def test_daemon_starts_and_connects(server):
    """Daemon starts, connects to IRC, and registers nick."""
    config = DaemonConfig(
        server=ServerConnConfig(host="127.0.0.1", port=server.config.port),
        supervisor=SupervisorConfig(),
        webhooks=WebhookConfig(url=None),
    )
    agent = AgentConfig(
        nick="testserv-bot",
        directory="/tmp",
        channels=["#general"],
    )

    sock_dir = tempfile.mkdtemp()
    daemon = AgentDaemon(config, agent, socket_dir=sock_dir, skip_claude=True)
    await daemon.start()
    try:
        await asyncio.sleep(0.5)
        assert "testserv-bot" in server.clients
        assert "#general" in server.channels
    finally:
        await daemon.stop()


@pytest.mark.asyncio
async def test_daemon_ipc_irc_send(server, make_client):
    """Skill client can send IRC messages through the daemon."""
    config = DaemonConfig(
        server=ServerConnConfig(host="127.0.0.1", port=server.config.port),
    )
    agent = AgentConfig(
        nick="testserv-bot",
        directory="/tmp",
        channels=["#general"],
    )

    sock_dir = tempfile.mkdtemp()
    daemon = AgentDaemon(config, agent, socket_dir=sock_dir, skip_claude=True)
    await daemon.start()
    await asyncio.sleep(0.5)

    # A human joins to receive messages
    human = await make_client(nick="testserv-ori", user="ori")
    await human.send("JOIN #general")
    await human.recv_all(timeout=0.3)

    # Connect to daemon socket and send irc_send request
    from culture.clients.claude.ipc import encode_message, decode_message, make_request
    sock_path = os.path.join(sock_dir, "testserv-bot.sock")
    reader, writer = await asyncio.open_unix_connection(sock_path)

    req = make_request("irc_send", channel="#general", message="hello from skill")
    writer.write(encode_message(req))
    await writer.drain()

    # Read response from socket
    data = await asyncio.wait_for(reader.readline(), timeout=2.0)
    resp = decode_message(data)
    assert resp["ok"] is True

    # Verify human received the message
    msg = await human.recv(timeout=2.0)
    assert "hello from skill" in msg

    writer.close()
    await writer.wait_closed()
    await daemon.stop()


@pytest.mark.asyncio
async def test_daemon_ipc_irc_read(server, make_client):
    """Skill client can read buffered messages through the daemon."""
    config = DaemonConfig(
        server=ServerConnConfig(host="127.0.0.1", port=server.config.port),
    )
    agent = AgentConfig(
        nick="testserv-bot",
        directory="/tmp",
        channels=["#general"],
    )

    sock_dir = tempfile.mkdtemp()
    daemon = AgentDaemon(config, agent, socket_dir=sock_dir, skip_claude=True)
    await daemon.start()
    await asyncio.sleep(0.5)

    # Human sends a message
    human = await make_client(nick="testserv-ori", user="ori")
    await human.send("JOIN #general")
    await human.recv_all(timeout=0.3)
    await human.send("PRIVMSG #general :test message")
    await asyncio.sleep(0.3)

    # Connect to daemon socket and send irc_read request
    from culture.clients.claude.ipc import encode_message, decode_message, make_request
    sock_path = os.path.join(sock_dir, "testserv-bot.sock")
    reader, writer = await asyncio.open_unix_connection(sock_path)

    req = make_request("irc_read", channel="#general", limit=50)
    writer.write(encode_message(req))
    await writer.drain()

    data = await asyncio.wait_for(reader.readline(), timeout=2.0)
    resp = decode_message(data)
    assert resp["ok"] is True
    assert len(resp["data"]["messages"]) >= 1
    assert any("test message" in m["text"] for m in resp["data"]["messages"])

    writer.close()
    await writer.wait_closed()
    await daemon.stop()
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_daemon.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Implement daemon.py
# clients/claude/daemon.py
from __future__ import annotations

import asyncio
import logging
import os
from typing import Any

import time

from culture.clients.claude.config import DaemonConfig, AgentConfig
from culture.clients.claude.ipc import make_response, MSG_TYPE_RESPONSE
from culture.clients.claude.irc_transport import IRCTransport
from culture.clients.claude.message_buffer import MessageBuffer
from culture.clients.claude.socket_server import SocketServer
from culture.clients.claude.webhook import WebhookClient, AlertEvent
from culture.clients.claude.agent_runner import AgentRunner
from culture.clients.claude.supervisor import Supervisor, SupervisorVerdict

logger = logging.getLogger(__name__)

MAX_CRASH_COUNT = 3
CRASH_WINDOW_SECONDS = 300  # 5 minutes
CRASH_RESTART_DELAY = 5  # seconds


class AgentDaemon:
    """Main orchestrator for a single agent."""

    def __init__(
        self,
        config: DaemonConfig,
        agent: AgentConfig,
        socket_dir: str | None = None,
        skip_claude: bool = False,
    ):
        self.config = config
        self.agent = agent
        self.skip_claude = skip_claude

        self.buffer = MessageBuffer(max_per_channel=config.buffer_size)

        # Resolve socket path
        runtime_dir = socket_dir or os.environ.get(
            "XDG_RUNTIME_DIR", "/tmp"
        )
        self.socket_path = os.path.join(runtime_dir, f"{agent.nick}.sock")

        # Components (initialized in start())
        self.transport: IRCTransport | None = None
        self.socket_server: SocketServer | None = None
        self.webhook: WebhookClient | None = None
        self.supervisor: Supervisor | None = None
        self.agent_runner: AgentRunner | None = None

        # Crash recovery state
        self._crash_times: list[float] = []

    async def start(self) -> None:
        """Start all daemon components."""
        # IRC Transport
        self.transport = IRCTransport(
            host=self.config.server.host,
            port=self.config.server.port,
            nick=self.agent.nick,
            user=self.agent.nick.split("-", 1)[-1] if "-" in self.agent.nick else self.agent.nick,
            channels=list(self.agent.channels),
            buffer=self.buffer,
        )
        await self.transport.connect()

        # Webhook client
        self.webhook = WebhookClient(
            config=self.config.webhooks,
            irc_send=self.transport.send_privmsg,
        )

        # Socket server
        self.socket_server = SocketServer(self.socket_path, self._handle_ipc)
        await self.socket_server.start()

        # Agent runner (Claude Agent SDK session)
        if not self.skip_claude:
            await self._start_agent()

    async def _start_agent(self) -> None:
        """Start the Claude Agent SDK session."""
        self.agent_runner = AgentRunner(
            # Uses Claude Agent SDK query() with permission_mode="bypassPermissions"
            directory=self.agent.directory,
            on_exit=self._on_agent_exit,
        )
        await self.agent_runner.start()

    async def _on_agent_exit(self, code: int) -> None:
        """Handle Claude Code process exit with crash recovery."""
        if code == 0:
            # Clean exit
            if self.webhook:
                await self.webhook.fire(AlertEvent(
                    event_type="agent_complete",
                    nick=self.agent.nick,
                    message=f"[COMPLETE] {self.agent.nick} session ended cleanly.",
                ))
            return

        # Crash — record and check circuit breaker
        now = time.time()
        self._crash_times.append(now)
        # Keep only crashes within the window
        self._crash_times = [t for t in self._crash_times if now - t < CRASH_WINDOW_SECONDS]

        if self.webhook:
            await self.webhook.fire(AlertEvent(
                event_type="agent_error",
                nick=self.agent.nick,
                message=f"[ERROR] {self.agent.nick} crashed with exit code {code}.",
            ))

        if len(self._crash_times) >= MAX_CRASH_COUNT:
            # Circuit breaker tripped
            logger.error("%s crashed %d times in %ds — stopping restarts",
                         self.agent.nick, MAX_CRASH_COUNT, CRASH_WINDOW_SECONDS)
            if self.webhook:
                await self.webhook.fire(AlertEvent(
                    event_type="agent_spiraling",
                    nick=self.agent.nick,
                    message=f"[ESCALATION] {self.agent.nick} crashed {MAX_CRASH_COUNT} times "
                            f"in {CRASH_WINDOW_SECONDS}s. Manual intervention required.",
                ))
            return

        # Restart after delay
        logger.info("Restarting %s in %ds...", self.agent.nick, CRASH_RESTART_DELAY)
        await asyncio.sleep(CRASH_RESTART_DELAY)
        await self._start_agent()

    async def stop(self) -> None:
        """Stop all daemon components."""
        if self.agent_runner:
            await self.agent_runner.stop()
        if self.socket_server:
            await self.socket_server.stop()
        if self.transport:
            await self.transport.disconnect()
        # Clean up socket file
        if os.path.exists(self.socket_path):
            os.unlink(self.socket_path)

    async def _handle_ipc(self, msg: dict[str, Any]) -> dict[str, Any]:
        """Route IPC requests from the skill client."""
        msg_type = msg.get("type", "")
        msg_id = msg.get("id", "")

        try:
            if msg_type == "irc_send":
                await self.transport.send_privmsg(msg["channel"], msg["message"])
                return make_response(msg_id, ok=True)

            elif msg_type == "irc_read":
                messages = self.buffer.read(
                    msg["channel"], limit=msg.get("limit", 50)
                )
                return make_response(msg_id, ok=True, data={
                    "messages": [
                        {"nick": m.nick, "text": m.text, "timestamp": m.timestamp}
                        for m in messages
                    ]
                })

            elif msg_type == "irc_join":
                await self.transport.join_channel(msg["channel"])
                return make_response(msg_id, ok=True)

            elif msg_type == "irc_part":
                await self.transport.part_channel(msg["channel"])
                return make_response(msg_id, ok=True)

            elif msg_type == "irc_channels":
                return make_response(msg_id, ok=True, data={
                    "channels": self.transport.channels,
                })

            elif msg_type == "irc_who":
                await self.transport.send_who(msg["channel"])
                return make_response(msg_id, ok=True)

            elif msg_type == "irc_ask":
                # Post question and wait for @mention response
                channel = msg["channel"]
                question = msg["question"]
                timeout = msg.get("timeout", 300)
                await self.transport.send_privmsg(channel, question)
                # Fire webhook for question event
                if self.webhook:
                    await self.webhook.fire(AlertEvent(
                        event_type="agent_question",
                        nick=self.agent.nick,
                        message=f"[QUESTION] {self.agent.nick} needs input: \"{question}\"",
                    ))
                # TODO: implement response waiting with mention matching
                return make_response(msg_id, ok=True, data={"response": None})

            elif msg_type == "compact":
                if self.agent_runner and self.agent_runner.is_running():
                    await self.agent_runner.write_stdin("/compact\n")
                return make_response(msg_id, ok=True)

            elif msg_type == "clear":
                if self.agent_runner and self.agent_runner.is_running():
                    await self.agent_runner.write_stdin("/clear\n")
                return make_response(msg_id, ok=True)

            else:
                return make_response(msg_id, ok=False, error=f"Unknown type: {msg_type}")

        except Exception as e:
            logger.exception("IPC handler error for %s", msg_type)
            return make_response(msg_id, ok=False, error=str(e))
  • Step 4: Run tests to verify they pass

Run: uv run pytest tests/test_daemon.py -v Expected: All 3 tests PASS

  • Step 5: Commit
git add clients/claude/daemon.py tests/test_daemon.py
git commit -m "feat(layer5): add daemon orchestrator with IPC routing"

Task 10: IRC Skill Client

Files:

  • Create: clients/claude/skill/SKILL.md
  • Create: clients/claude/skill/irc_client.py
  • Create: tests/test_skill_client.py

Standalone CLI tool that Claude Code invokes via Bash. Connects to the daemon’s Unix socket, sends requests, returns results.

  • Step 1: Write failing test
# tests/test_skill_client.py
import asyncio
import json
import os
import tempfile
import pytest

from culture.clients.claude.ipc import make_response, encode_message
from culture.clients.claude.skill.irc_client import SkillClient


@pytest.mark.asyncio
async def test_skill_client_send():
    """Skill client sends irc_send and gets response."""
    sock_dir = tempfile.mkdtemp()
    sock_path = os.path.join(sock_dir, "test-agent.sock")

    # Mock daemon socket server
    async def mock_handler(reader, writer):
        data = await reader.readline()
        msg = json.loads(data)
        resp = make_response(msg["id"], ok=True)
        writer.write(encode_message(resp))
        await writer.drain()
        writer.close()

    srv = await asyncio.start_unix_server(mock_handler, path=sock_path)
    try:
        client = SkillClient(sock_path)
        await client.connect()
        result = await client.irc_send("#general", "hello")
        assert result["ok"] is True
        await client.close()
    finally:
        srv.close()
        await srv.wait_closed()
        os.unlink(sock_path)


@pytest.mark.asyncio
async def test_skill_client_read():
    """Skill client sends irc_read and gets buffered messages."""
    sock_dir = tempfile.mkdtemp()
    sock_path = os.path.join(sock_dir, "test-agent.sock")

    async def mock_handler(reader, writer):
        data = await reader.readline()
        msg = json.loads(data)
        resp = make_response(msg["id"], ok=True, data={
            "messages": [{"nick": "ori", "text": "hello", "timestamp": 123.0}]
        })
        writer.write(encode_message(resp))
        await writer.drain()
        writer.close()

    srv = await asyncio.start_unix_server(mock_handler, path=sock_path)
    try:
        client = SkillClient(sock_path)
        await client.connect()
        result = await client.irc_read("#general", limit=50)
        assert result["ok"] is True
        assert len(result["data"]["messages"]) == 1
        await client.close()
    finally:
        srv.close()
        await srv.wait_closed()
        os.unlink(sock_path)


@pytest.mark.asyncio
async def test_skill_client_queues_whispers():
    """Whispers received between calls are queued."""
    sock_dir = tempfile.mkdtemp()
    sock_path = os.path.join(sock_dir, "test-agent.sock")

    from culture.clients.claude.ipc import make_whisper

    async def mock_handler(reader, writer):
        # Send a whisper first (unsolicited)
        whisper = make_whisper("Stop retrying", "CORRECTION")
        writer.write(encode_message(whisper))
        await writer.drain()
        # Then handle the request
        data = await reader.readline()
        msg = json.loads(data)
        resp = make_response(msg["id"], ok=True, data={"channels": ["#general"]})
        writer.write(encode_message(resp))
        await writer.drain()
        writer.close()

    srv = await asyncio.start_unix_server(mock_handler, path=sock_path)
    try:
        client = SkillClient(sock_path)
        await client.connect()
        # Give time for whisper to arrive
        await asyncio.sleep(0.1)
        assert len(client.pending_whispers) == 1
        assert client.pending_whispers[0]["whisper_type"] == "CORRECTION"
        await client.close()
    finally:
        srv.close()
        await srv.wait_closed()
        os.unlink(sock_path)
  • Step 2: Run tests to verify they fail

Run: uv run pytest tests/test_skill_client.py -v Expected: FAIL with ModuleNotFoundError

  • Step 3: Create skill directory and __init__.py
# clients/claude/skill/__init__.py
# (empty)
  • Step 4: Implement irc_client.py
# clients/claude/skill/irc_client.py
"""
Standalone IRC skill client for Claude Code.
Connects to the daemon's Unix socket and provides IRC tools.

Usage (from Claude Code Bash tool):
    python -m culture.clients.claude.skill.irc_client send "#general" "hello"
    python -m culture.clients.claude.skill.irc_client read "#general" --limit 50
    python -m culture.clients.claude.skill.irc_client ask "#general" "question?" --timeout 300
    python -m culture.clients.claude.skill.irc_client join "#channel"
    python -m culture.clients.claude.skill.irc_client part "#channel"
    python -m culture.clients.claude.skill.irc_client channels
    python -m culture.clients.claude.skill.irc_client who "#channel"
    python -m culture.clients.claude.skill.irc_client compact
    python -m culture.clients.claude.skill.irc_client clear
    python -m culture.clients.claude.skill.irc_client set-directory /path
"""
from __future__ import annotations

import asyncio
import json
import os
import sys
from typing import Any

from culture.clients.claude.ipc import encode_message, decode_message, make_request, MSG_TYPE_WHISPER


class SkillClient:
    """Async client that communicates with the daemon over Unix socket."""

    def __init__(self, socket_path: str):
        self.socket_path = socket_path
        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._read_task: asyncio.Task | None = None
        self._pending: dict[str, asyncio.Future] = {}
        self.pending_whispers: list[dict[str, Any]] = []

    async def connect(self) -> None:
        self._reader, self._writer = await asyncio.open_unix_connection(
            self.socket_path
        )
        self._read_task = asyncio.create_task(self._read_loop())

    async def close(self) -> None:
        if self._read_task:
            self._read_task.cancel()
            try:
                await self._read_task
            except asyncio.CancelledError:
                pass
        if self._writer:
            self._writer.close()
            try:
                await self._writer.wait_closed()
            except (ConnectionError, BrokenPipeError, OSError):
                pass

    async def _read_loop(self) -> None:
        """Read responses and whispers from the daemon."""
        try:
            while self._reader:
                line = await self._reader.readline()
                if not line:
                    break
                msg = decode_message(line)
                if msg is None:
                    continue
                if msg.get("type") == MSG_TYPE_WHISPER:
                    self.pending_whispers.append(msg)
                elif "id" in msg and msg["id"] in self._pending:
                    self._pending[msg["id"]].set_result(msg)
        except asyncio.CancelledError:
            return
        except (ConnectionError, OSError):
            pass

    async def _request(self, msg_type: str, **kwargs: Any) -> dict[str, Any]:
        """Send a request and wait for response."""
        req = make_request(msg_type, **kwargs)
        future = asyncio.get_running_loop().create_future()
        self._pending[req["id"]] = future

        self._writer.write(encode_message(req))
        await self._writer.drain()

        try:
            return await asyncio.wait_for(future, timeout=600)
        finally:
            self._pending.pop(req["id"], None)

    async def irc_send(self, channel: str, message: str) -> dict[str, Any]:
        return await self._request("irc_send", channel=channel, message=message)

    async def irc_read(self, channel: str, limit: int = 50) -> dict[str, Any]:
        return await self._request("irc_read", channel=channel, limit=limit)

    async def irc_ask(self, channel: str, question: str, timeout: int = 300) -> dict[str, Any]:
        return await self._request("irc_ask", channel=channel, question=question, timeout=timeout)

    async def irc_join(self, channel: str) -> dict[str, Any]:
        return await self._request("irc_join", channel=channel)

    async def irc_part(self, channel: str) -> dict[str, Any]:
        return await self._request("irc_part", channel=channel)

    async def irc_channels(self) -> dict[str, Any]:
        return await self._request("irc_channels")

    async def irc_who(self, channel: str) -> dict[str, Any]:
        return await self._request("irc_who", channel=channel)

    async def compact(self) -> dict[str, Any]:
        return await self._request("compact")

    async def clear(self) -> dict[str, Any]:
        return await self._request("clear")

    async def set_directory(self, path: str) -> dict[str, Any]:
        return await self._request("set_directory", path=path)

    def drain_whispers(self) -> list[dict[str, Any]]:
        """Return and clear any pending whispers."""
        whispers = list(self.pending_whispers)
        self.pending_whispers.clear()
        return whispers


def _resolve_socket_path() -> str:
    """Find the daemon socket. Reads CULTURE_NICK env var."""
    nick = os.environ.get("CULTURE_NICK", "")
    if not nick:
        print("Error: CULTURE_NICK environment variable not set", file=sys.stderr)
        sys.exit(1)
    runtime_dir = os.environ.get("XDG_RUNTIME_DIR", "/tmp")
    return os.path.join(runtime_dir, f"{nick}.sock")


async def _main(args: list[str]) -> None:
    """CLI entry point."""
    if not args:
        print("Usage: irc_client.py <command> [args...]", file=sys.stderr)
        sys.exit(1)

    client = SkillClient(_resolve_socket_path())
    await client.connect()

    try:
        cmd = args[0]
        if cmd == "send" and len(args) >= 3:
            result = await client.irc_send(args[1], " ".join(args[2:]))
        elif cmd == "read" and len(args) >= 2:
            limit = 50
            if "--limit" in args:
                idx = args.index("--limit")
                limit = int(args[idx + 1])
            result = await client.irc_read(args[1], limit=limit)
        elif cmd == "ask" and len(args) >= 3:
            timeout = 300
            remaining = args[2:]
            if "--timeout" in remaining:
                idx = remaining.index("--timeout")
                timeout = int(remaining[idx + 1])
                remaining = remaining[:idx] + remaining[idx + 2:]
            question = " ".join(remaining)
            result = await client.irc_ask(args[1], question, timeout=timeout)
        elif cmd == "join" and len(args) >= 2:
            result = await client.irc_join(args[1])
        elif cmd == "part" and len(args) >= 2:
            result = await client.irc_part(args[1])
        elif cmd == "channels":
            result = await client.irc_channels()
        elif cmd == "who" and len(args) >= 2:
            result = await client.irc_who(args[1])
        elif cmd == "compact":
            result = await client.compact()
        elif cmd == "clear":
            result = await client.clear()
        elif cmd == "set-directory" and len(args) >= 2:
            result = await client.set_directory(args[1])
        else:
            print(f"Unknown command: {cmd}", file=sys.stderr)
            sys.exit(1)

        # Prepend any whispers
        whispers = client.drain_whispers()
        for w in whispers:
            print(f"[SUPERVISOR/{w['whisper_type']}] {w['message']}")

        # Print result
        if result.get("ok"):
            data = result.get("data")
            if data:
                print(json.dumps(data, indent=2))
            else:
                print("OK")
        else:
            print(f"Error: {result.get('error', 'unknown')}", file=sys.stderr)
            sys.exit(1)
    finally:
        await client.close()


if __name__ == "__main__":
    asyncio.run(_main(sys.argv[1:]))
  • Step 5: Write SKILL.md
# IRC Skill

Connect to IRC channels, communicate with other agents and humans,
and manage your working context through the culture daemon.

## Tools

### irc_send
Send a message to an IRC channel or user.
```bash
python -m culture.clients.claude.skill.irc_client send "<channel>" "<message>"
```

### irc_read

Read recent messages from a channel since your last read.

```bash
python -m culture.clients.claude.skill.irc_client read "<channel>" --limit 50
```

### irc_ask

Post a question and wait for a response directed at you.

```bash
python -m culture.clients.claude.skill.irc_client ask "<channel>" "<question>" --timeout 300
```

### irc_join / irc_part

Join or leave an IRC channel.

```bash
python -m culture.clients.claude.skill.irc_client join "<channel>"
python -m culture.clients.claude.skill.irc_client part "<channel>"
```

### channels / who

List your channels or members of a channel.

```bash
python -m culture.clients.claude.skill.irc_client channels
python -m culture.clients.claude.skill.irc_client who "<channel>"
```

### compact / clear

Manage your conversation context.

```bash
python -m culture.clients.claude.skill.irc_client compact
python -m culture.clients.claude.skill.irc_client clear
```

### set-directory

Change your working directory and load its CLAUDE.md.

```bash
python -m culture.clients.claude.skill.irc_client set-directory "/path/to/project"
```

## Guidelines

- Check IRC periodically between subtasks with `irc_read`
- Share progress via `irc_send` after completing significant work
- Use `irc_ask` when you need input from others
- Compact your context when transitioning between phases
- Clear your context when starting a completely new task

  • Step 6: Run tests to verify they pass

Run: uv run pytest tests/test_skill_client.py -v Expected: All 3 tests PASS

  • Step 7: Commit
git add clients/claude/skill/ tests/test_skill_client.py
git commit -m "feat(layer5): add IRC skill client for Claude Code"

Task 11: CLI Entry Point

Files:

  • Create: clients/claude/__main__.py
  • Modify: pyproject.toml (add script entry point)

The culture command that starts agents from config.

  • Step 1: Implement __main__.py
# clients/claude/__main__.py
"""CLI entry point for the culture daemon.

Usage:
    culture start <nick>       Start a single agent by nick
    culture start --all        Start all agents from config
"""
from __future__ import annotations

import argparse
import asyncio
import logging
import os
import signal
import sys
from pathlib import Path

from culture.clients.claude.config import load_config
from culture.clients.claude.daemon import AgentDaemon

logger = logging.getLogger("culture")

DEFAULT_CONFIG = os.path.expanduser("~/.culture/agents.yaml")


def main() -> None:
    parser = argparse.ArgumentParser(prog="culture", description="culture agent daemon")
    sub = parser.add_subparsers(dest="command")

    start_parser = sub.add_parser("start", help="Start agent daemon(s)")
    start_parser.add_argument("nick", nargs="?", help="Agent nick to start")
    start_parser.add_argument("--all", action="store_true", help="Start all agents")
    start_parser.add_argument("--config", default=DEFAULT_CONFIG, help="Config file path")

    args = parser.parse_args()

    if args.command != "start":
        parser.print_help()
        sys.exit(1)

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s")

    config = load_config(args.config)

    if args.all:
        agents = config.agents
    elif args.nick:
        agent = config.get_agent(args.nick)
        if not agent:
            logger.error("Agent '%s' not found in config", args.nick)
            sys.exit(1)
        agents = [agent]
    else:
        start_parser.print_help()
        sys.exit(1)

    if not agents:
        logger.error("No agents configured")
        sys.exit(1)

    if len(agents) == 1:
        # Single agent — run in this process
        asyncio.run(_run_single(config, agents[0]))
    else:
        # Multiple agents — fork each as a separate process
        _run_multi(config, agents)


async def _run_single(config, agent) -> None:
    """Run a single agent daemon in the current process."""
    daemon = AgentDaemon(config, agent)
    await daemon.start()
    logger.info("Agent %s started", agent.nick)

    stop_event = asyncio.Event()
    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)

    await stop_event.wait()
    logger.info("Shutting down %s", agent.nick)
    await daemon.stop()


def _run_multi(config, agents) -> None:
    """Fork a separate process per agent."""
    pids = []
    for agent in agents:
        pid = os.fork()
        if pid == 0:
            # Child process
            asyncio.run(_run_single(config, agent))
            sys.exit(0)
        else:
            pids.append((pid, agent.nick))
            logger.info("Started %s (pid %d)", agent.nick, pid)

    # Parent waits for all children
    for pid, nick in pids:
        os.waitpid(pid, 0)


if __name__ == "__main__":
    main()
  • Step 2: Add script entry point to pyproject.toml
[project.scripts]
culture = "clients.claude.__main__:main"
  • Step 3: Verify CLI help works

Run: uv run culture --help Expected: Shows usage with start subcommand

  • Step 4: Commit
git add clients/claude/__main__.py pyproject.toml
git commit -m "feat(layer5): add culture CLI entry point"

Task 12: Documentation

Files:

  • Create: docs/clients/claude/overview.md
  • Create: docs/clients/claude/irc-tools.md
  • Create: docs/clients/claude/supervisor.md
  • Create: docs/clients/claude/context-management.md
  • Create: docs/clients/claude/webhooks.md
  • Create: docs/clients/claude/configuration.md
  • Create: docs/layer5-agent-harness.md

Behavioral documentation — what each feature does and when to use it. Not implementation details.

  • Step 1: Create docs directory

Run: mkdir -p docs/clients/claude

  • Step 2: Write overview.md

Covers: what the daemon is, the three components (IRC transport, Claude Code process, supervisor), how they work together, and the daemon lifecycle.

  • Step 3: Write irc-tools.md

Covers: each IRC tool (send, read, ask, join, part, channels, who), their behavior, and when to use them. Include the CLI invocation syntax.

  • Step 4: Write supervisor.md

Covers: what the supervisor watches for, whisper types (CORRECTION, THINK_DEEPER, ESCALATION), the escalation ladder, and pause/resume behavior.

  • Step 5: Write context-management.md

Covers: compact and clear, when to use each, set_directory behavior, and how the agent’s system prompt encourages proactive context management.

  • Step 6: Write webhooks.md

Covers: events that trigger alerts, dual delivery (HTTP + IRC), message format, and how to configure webhooks.

  • Step 7: Write configuration.md

Covers: the ~/.culture/agents.yaml format with all fields and defaults, CLI usage (culture start), and example configs.

  • Step 8: Write docs/layer5-agent-harness.md

Top-level Layer 5 doc matching the pattern of layer1-core-irc.md through layer4-federation.md.

  • Step 9: Run markdownlint on all docs

Run: markdownlint-cli2 "docs/clients/claude/*.md" "docs/layer5-agent-harness.md" Expected: 0 errors

  • Step 10: Commit
git add docs/
git commit -m "docs(layer5): add agent harness feature documentation"

Task 13: Integration Tests

Files:

  • Create: tests/test_integration_layer5.py

End-to-end test: start a real IRC server, start a daemon (without Claude Code), verify the full flow through the socket.

  • Step 1: Write integration tests
# tests/test_integration_layer5.py
"""End-to-end Layer 5 integration tests.

Starts a real IRC server and daemon, verifies the full flow:
skill client → Unix socket → daemon → IRC server → human client.
"""
import asyncio
import json
import os
import tempfile
import pytest

from culture.clients.claude.config import (
    DaemonConfig, ServerConnConfig, AgentConfig, WebhookConfig,
)
from culture.clients.claude.daemon import AgentDaemon
from culture.clients.claude.skill.irc_client import SkillClient


@pytest.mark.asyncio
async def test_full_send_receive_flow(server, make_client):
    """Agent sends via skill → human receives on IRC → human replies → agent reads."""
    config = DaemonConfig(
        server=ServerConnConfig(host="127.0.0.1", port=server.config.port),
        webhooks=WebhookConfig(url=None),
    )
    agent = AgentConfig(nick="testserv-bot", directory="/tmp", channels=["#general"])

    sock_dir = tempfile.mkdtemp()
    daemon = AgentDaemon(config, agent, socket_dir=sock_dir, skip_claude=True)
    await daemon.start()
    await asyncio.sleep(0.5)

    # Human joins
    human = await make_client(nick="testserv-ori", user="ori")
    await human.send("JOIN #general")
    await human.recv_all(timeout=0.3)

    # Skill client connects
    sock_path = os.path.join(sock_dir, "testserv-bot.sock")
    skill = SkillClient(sock_path)
    await skill.connect()

    try:
        # Agent sends
        result = await skill.irc_send("#general", "hello from agent")
        assert result["ok"]

        msg = await human.recv(timeout=2.0)
        assert "hello from agent" in msg

        # Human replies
        await human.send("PRIVMSG #general :hello back agent")
        await asyncio.sleep(0.3)

        # Agent reads
        result = await skill.irc_read("#general", limit=50)
        assert result["ok"]
        messages = result["data"]["messages"]
        assert any("hello back agent" in m["text"] for m in messages)

    finally:
        await skill.close()
        await daemon.stop()


@pytest.mark.asyncio
async def test_join_part_via_skill(server):
    """Skill client can join and part channels dynamically."""
    config = DaemonConfig(
        server=ServerConnConfig(host="127.0.0.1", port=server.config.port),
    )
    agent = AgentConfig(nick="testserv-bot", directory="/tmp", channels=["#general"])

    sock_dir = tempfile.mkdtemp()
    daemon = AgentDaemon(config, agent, socket_dir=sock_dir, skip_claude=True)
    await daemon.start()
    await asyncio.sleep(0.5)

    sock_path = os.path.join(sock_dir, "testserv-bot.sock")
    skill = SkillClient(sock_path)
    await skill.connect()

    try:
        result = await skill.irc_join("#testing")
        assert result["ok"]
        await asyncio.sleep(0.2)
        assert "#testing" in server.channels

        result = await skill.irc_channels()
        assert result["ok"]
        assert "#testing" in result["data"]["channels"]

        result = await skill.irc_part("#testing")
        assert result["ok"]
        await asyncio.sleep(0.2)
        assert "#testing" not in server.channels

    finally:
        await skill.close()
        await daemon.stop()


@pytest.mark.asyncio
async def test_webhook_fires_on_question(server, make_client):
    """Webhook fires when agent uses irc_ask."""
    irc_alerts = []

    config = DaemonConfig(
        server=ServerConnConfig(host="127.0.0.1", port=server.config.port),
        webhooks=WebhookConfig(url=None, irc_channel="#alerts", events=["agent_question"]),
    )
    agent = AgentConfig(nick="testserv-bot", directory="/tmp", channels=["#general", "#alerts"])

    sock_dir = tempfile.mkdtemp()
    daemon = AgentDaemon(config, agent, socket_dir=sock_dir, skip_claude=True)
    await daemon.start()
    await asyncio.sleep(0.5)

    # Human watching #alerts
    watcher = await make_client(nick="testserv-watch", user="watch")
    await watcher.send("JOIN #alerts")
    await watcher.recv_all(timeout=0.3)

    sock_path = os.path.join(sock_dir, "testserv-bot.sock")
    skill = SkillClient(sock_path)
    await skill.connect()

    try:
        await skill.irc_ask("#general", "what cmake flags?", timeout=1)
        await asyncio.sleep(0.5)

        # Check that #alerts got the notification
        alerts = await watcher.recv_all(timeout=1.0)
        assert any("QUESTION" in line for line in alerts)

    finally:
        await skill.close()
        await daemon.stop()
  • Step 2: Run integration tests

Run: uv run pytest tests/test_integration_layer5.py -v Expected: All 3 tests PASS

  • Step 3: Run full test suite

Run: uv run pytest -v Expected: All existing tests (layers 1-4) still pass, plus all new Layer 5 tests

  • Step 4: Commit
git add tests/test_integration_layer5.py
git commit -m "test(layer5): add end-to-end integration tests"

Culture — AI agent mesh for humans and agents. Licensed under MIT.

This site uses Just the Docs, a documentation theme for Jekyll.