diff --git a/gateway-bridge/bridge.mjs b/gateway-bridge/bridge.mjs index 9f119fd..8b5ad6a 100644 --- a/gateway-bridge/bridge.mjs +++ b/gateway-bridge/bridge.mjs @@ -47,6 +47,9 @@ for (const warning of gatewayAliasWarnings) { const SOCKET_DIR = path.join(homedir(), ".pi", "session-control"); const AGENT_TIMEOUT_MS = 120_000; const API_PORT = parseInt(process.env.BRIDGE_API_PORT || "7890", 10); +const REPLY_LEDGER_PATH = path.join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); +const REPLY_LEDGER_ROTATED_PATH = `${REPLY_LEDGER_PATH}.1`; +const REPLY_LEDGER_MAX_BYTES = 10 * 1024 * 1024; // Validate required env vars for (const key of ["SLACK_BOT_TOKEN", "SLACK_APP_TOKEN"]) { @@ -116,6 +119,43 @@ function resolveAckReaction(channel, threadTs) { }); } +/** + * Append a durable outbound-reply proof entry used by heartbeat reply detection. + * This tracks replies from both /send (with thread_ts) and /reply endpoints. + */ +function rotateReplyLedgerIfNeeded() { + try { + const stats = fs.statSync(REPLY_LEDGER_PATH); + if (stats.size < REPLY_LEDGER_MAX_BYTES) return; + fs.renameSync(REPLY_LEDGER_PATH, REPLY_LEDGER_ROTATED_PATH); + } catch (err) { + if (err && typeof err === "object" && "code" in err && err.code === "ENOENT") return; + console.warn(`⚠️ failed to rotate reply ledger: ${err instanceof Error ? err.message : String(err)}`); + } +} + +function appendReplyLedgerEntry({ channel, threadTs, route }) { + if (!threadTs) return; + + const entry = { + channel, + thread_ts: threadTs, + route, + replied_at: new Date().toISOString(), + }; + + try { + fs.mkdirSync(path.dirname(REPLY_LEDGER_PATH), { recursive: true }); + rotateReplyLedgerIfNeeded(); + fs.appendFileSync(REPLY_LEDGER_PATH, `${JSON.stringify(entry)}\n`, { + encoding: "utf-8", + mode: 0o600, + }); + } catch (err) { + console.warn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`); + } +} + /** * Evict the oldest entries when the registry exceeds MAX_THREADS. * Maps iterate in insertion order, so the first entries are the oldest. @@ -490,9 +530,11 @@ function startApiServer() { console.log(`📤 Sent to ${channel}: ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`); - // If this is a threaded reply, check for a pending ✅ ack reaction. + // If this is a threaded reply, check for a pending ✅ ack reaction + // and append a durable reply ledger entry for heartbeat detection. if (thread_ts) { resolveAckReaction(channel, thread_ts); + appendReplyLedgerEntry({ channel, threadTs: thread_ts, route: "/send" }); } res.writeHead(200, { "Content-Type": "application/json" }); @@ -534,8 +576,14 @@ function startApiServer() { console.log(`📤 Reply to ${thread_id} (${thread.channel}): ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`); - // Check for a pending ✅ ack reaction on the /reply path too. + // Check for a pending ✅ ack reaction on the /reply path too, + // and append a durable reply ledger entry for heartbeat detection. resolveAckReaction(thread.channel, thread.thread_ts); + appendReplyLedgerEntry({ + channel: thread.channel, + threadTs: thread.thread_ts, + route: "/reply", + }); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel })); diff --git a/gateway-bridge/broker-bridge.mjs b/gateway-bridge/broker-bridge.mjs index ab3c6c5..fdb8675 100755 --- a/gateway-bridge/broker-bridge.mjs +++ b/gateway-bridge/broker-bridge.mjs @@ -48,6 +48,9 @@ function clampInt(value, min, max, fallback) { } const API_PORT = clampInt(process.env.BRIDGE_API_PORT || "7890", 0, 65535, 7890); +const REPLY_LEDGER_PATH = path.join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); +const REPLY_LEDGER_ROTATED_PATH = `${REPLY_LEDGER_PATH}.1`; +const REPLY_LEDGER_MAX_BYTES = 10 * 1024 * 1024; const POLL_INTERVAL_MS = clampInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 0, 60_000, 3000); const MAX_MESSAGES = clampInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 1, 100, 10); const MAX_WAIT_SECONDS = 25; @@ -196,6 +199,43 @@ function resolveAckReaction(channel, threadTs) { }); } +/** + * Append a durable outbound-reply proof entry used by heartbeat reply detection. + * This tracks replies from both /send (with thread_ts) and /reply endpoints. + */ +function rotateReplyLedgerIfNeeded() { + try { + const stats = fs.statSync(REPLY_LEDGER_PATH); + if (stats.size < REPLY_LEDGER_MAX_BYTES) return; + fs.renameSync(REPLY_LEDGER_PATH, REPLY_LEDGER_ROTATED_PATH); + } catch (err) { + if (err && typeof err === "object" && "code" in err && err.code === "ENOENT") return; + logWarn(`⚠️ failed to rotate reply ledger: ${err instanceof Error ? err.message : String(err)}`); + } +} + +function appendReplyLedgerEntry({ channel, threadTs, route }) { + if (!threadTs) return; + + const entry = { + channel, + thread_ts: threadTs, + route, + replied_at: new Date().toISOString(), + }; + + try { + fs.mkdirSync(path.dirname(REPLY_LEDGER_PATH), { recursive: true }); + rotateReplyLedgerIfNeeded(); + fs.appendFileSync(REPLY_LEDGER_PATH, `${JSON.stringify(entry)}\n`, { + encoding: "utf-8", + mode: 0o600, + }); + } catch (err) { + logWarn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`); + } +} + let socketPath = null; let cryptoState = null; @@ -1115,9 +1155,11 @@ function startApiServer() { actionRequestBody: { text: safeText }, }); - // If this is a threaded reply, check for a pending ✅ ack reaction. + // If this is a threaded reply, check for a pending ✅ ack reaction + // and append a durable reply ledger entry for heartbeat detection. if (thread_ts) { resolveAckReaction(channel, thread_ts); + appendReplyLedgerEntry({ channel, threadTs: thread_ts, route: "/send" }); } res.writeHead(200, { "Content-Type": "application/json" }); @@ -1152,8 +1194,14 @@ function startApiServer() { actionRequestBody: { text: safeText }, }); - // Check for a pending ✅ ack reaction on the /reply path too. + // Check for a pending ✅ ack reaction on the /reply path too, + // and append a durable reply ledger entry for heartbeat detection. resolveAckReaction(thread.channel, thread.thread_ts); + appendReplyLedgerEntry({ + channel: thread.channel, + threadTs: thread.thread_ts, + route: "/reply", + }); res.writeHead(200, { "Content-Type": "application/json" }); res.end(JSON.stringify({ ok: true, ts: result.ts })); diff --git a/pi/extensions/heartbeat.test.mjs b/pi/extensions/heartbeat.test.mjs index 91bab1e..42f4855 100644 --- a/pi/extensions/heartbeat.test.mjs +++ b/pi/extensions/heartbeat.test.mjs @@ -83,7 +83,7 @@ function hasReplyLogEntry(replyLogContent, threadTs) { return false; } -function hasOutboundSendCommand(sessionJsonlContent, threadTs) { +function hasOutboundBridgeReplyCommand(sessionJsonlContent, threadTs) { const escapedThreadTs = threadTs.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapedThreadTs}["']`); @@ -108,7 +108,7 @@ function hasOutboundSendCommand(sessionJsonlContent, threadTs) { if (item?.name !== "bash") continue; const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; if (!command.includes("curl")) continue; - if (!command.includes("/send")) continue; + if (!command.includes("/send") && !command.includes("/reply")) continue; if (!threadTsPattern.test(command)) continue; return true; } @@ -470,6 +470,16 @@ describe("heartbeat v2: unanswered mention reply detection", () => { assert.equal(hasReplyLogEntry(log, "9999.0000"), false); }); + it("matches reply-log entries emitted from both /send and /reply routes", () => { + const log = [ + '{"channel":"C111","thread_ts":"3456.7890","route":"/send","replied_at":"2026-02-27T00:10:00Z"}', + '{"channel":"C111","thread_ts":"4567.8901","route":"/reply","replied_at":"2026-02-27T00:11:00Z"}', + ].join("\n"); + + assert.equal(hasReplyLogEntry(log, "3456.7890"), true); + assert.equal(hasReplyLogEntry(log, "4567.8901"), true); + }); + it("ignores malformed reply-log lines", () => { const log = ['{"thread_ts":"1234.5678"}', 'not-json', '{"thread_ts":"2345.6789"}'].join("\n"); assert.equal(hasReplyLogEntry(log, "2345.6789"), true); @@ -493,7 +503,28 @@ describe("heartbeat v2: unanswered mention reply detection", () => { }, }); - assert.equal(hasOutboundSendCommand(session, "1234.5678"), true); + assert.equal(hasOutboundBridgeReplyCommand(session, "1234.5678"), true); + }); + + it("detects outbound curl /reply with matching thread_ts", () => { + const session = JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: [ + { + type: "toolCall", + name: "bash", + arguments: { + command: + "curl -s -X POST http://127.0.0.1:7890/reply -H 'Content-Type: application/json' -d '{\"thread_id\":\"thread-1\",\"text\":\"hi\",\"thread_ts\":\"4567.8901\"}'", + }, + }, + ], + }, + }); + + assert.equal(hasOutboundBridgeReplyCommand(session, "4567.8901"), true); }); it("does not treat inbound text containing thread_ts as a reply", () => { @@ -505,7 +536,7 @@ describe("heartbeat v2: unanswered mention reply detection", () => { }, }); - assert.equal(hasOutboundSendCommand(inboundOnly, "1234.5678"), false); + assert.equal(hasOutboundBridgeReplyCommand(inboundOnly, "1234.5678"), false); }); }); diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index 92ab9c9..ab0fb9e 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -25,7 +25,7 @@ import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "@sinclair/typebox"; import { StringEnum } from "@mariozechner/pi-ai"; -import { existsSync, readdirSync, readFileSync, statSync } from "node:fs"; +import { closeSync, existsSync, openSync, readdirSync, readFileSync, readSync, statSync } from "node:fs"; import { homedir } from "node:os"; import { join } from "node:path"; import { discoverSubagentPackages, readSubagentState, resolveEffectiveState } from "./subagent-registry.ts"; @@ -42,7 +42,12 @@ const TODOS_DIR = join(homedir(), ".pi", "todos"); const BRIDGE_URL = "http://127.0.0.1:7890/send"; const BRIDGE_LOG_PRIMARY = join(homedir(), ".pi", "agent", "logs", "gateway-bridge.log"); const BRIDGE_LOG_LEGACY = join(homedir(), ".pi", "agent", "logs", "slack-bridge.log"); +const REPLY_LOG_PRIMARY = join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); +const REPLY_LOG_ROTATED = `${REPLY_LOG_PRIMARY}.1`; +const REPLY_LOG_TAIL_MAX_BYTES = 8 * 1024 * 1024; // Scan last 8 MiB to bound heartbeat I/O. const SESSION_DIR = join(homedir(), ".pi", "agent", "sessions"); +const SESSION_SCAN_MAX_FILES_PER_DIR = 10; +const SESSION_SCAN_MAX_FILES_TOTAL = 30; const UNANSWERED_MENTION_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes type HeartbeatState = { @@ -412,18 +417,43 @@ function slackTsToMs(ts: string): number | null { return Math.floor(parsed * 1000); } +function readRecentFileTail(filePath: string, maxBytes: number): string { + const stats = statSync(filePath); + if (stats.size <= 0) return ""; + if (stats.size <= maxBytes) { + return readFileSync(filePath, "utf-8"); + } + + const start = Math.max(0, stats.size - maxBytes); + const bytesToRead = stats.size - start; + const fd = openSync(filePath, "r"); + try { + const chunk = Buffer.alloc(bytesToRead); + readSync(fd, chunk, 0, bytesToRead, start); + const decoded = chunk.toString("utf-8"); + + // We started mid-file; drop the first partial line before parsing JSONL. + const firstNewline = decoded.indexOf("\n"); + return firstNewline === -1 ? "" : decoded.slice(firstNewline + 1); + } finally { + closeSync(fd); + } +} + function hasRepliedToThread(threadTs: string): boolean { // Check multiple sources for evidence of a reply to this thread_ts. - // 1. Check the reply tracking log (most reliable — written by the agent). + // 1. Check the reply tracking logs (most reliable — written by the bridge + // for both /send and /reply outbound paths). // File: ~/.pi/agent/slack-reply-log.jsonl - // Each line: {"thread_ts":"...","replied_at":"..."} - const replyLogPath = join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); - if (existsSync(replyLogPath)) { + // Rotated fallback: ~/.pi/agent/slack-reply-log.jsonl.1 + // Each line: {"thread_ts":"...","replied_at":"...", ...} + for (const replyLogPath of [REPLY_LOG_PRIMARY, REPLY_LOG_ROTATED]) { + if (!existsSync(replyLogPath)) continue; + try { - const content = readFileSync(replyLogPath, "utf-8"); - const lines = content.split("\n"); - for (const line of lines) { + const content = readRecentFileTail(replyLogPath, REPLY_LOG_TAIL_MAX_BYTES); + for (const line of content.split("\n")) { const trimmed = line.trim(); if (!trimmed) continue; try { @@ -436,61 +466,77 @@ function hasRepliedToThread(threadTs: string): boolean { } } } catch { - // File read error — fall through to other checks + // File read error — skip this path and fall through to other checks } } - // 2. Check recent control-agent session logs for explicit outbound /send calls. - // Session files are in ~/.pi/agent/sessions/--home-baudbot_agent--/ - // and named _.jsonl. - const controlAgentSessionDir = join(SESSION_DIR, "--home-baudbot_agent--"); - if (existsSync(controlAgentSessionDir)) { + // 2. Fallback for older runs: scan recent assistant bash tool calls for + // explicit outbound /send or /reply calls carrying this exact thread_ts. + // + // We scan multiple session directories (not just control-agent) because + // replies may come from delegated sessions depending on runtime wiring. + if (existsSync(SESSION_DIR)) { const escapeRegExp = (value: string): string => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapeRegExp(threadTs)}["']`); + let scannedFiles = 0; try { - const sessionFiles = readdirSync(controlAgentSessionDir) - .filter((f) => f.endsWith(".jsonl")) - .sort() - .reverse() - .slice(0, 3); // Check last 3 sessions + const sessionDirs = readdirSync(SESSION_DIR, { withFileTypes: true }) + .filter((entry) => entry.isDirectory()) + .map((entry) => join(SESSION_DIR, entry.name)); - for (const file of sessionFiles) { + for (const sessionDir of sessionDirs) { + if (scannedFiles >= SESSION_SCAN_MAX_FILES_TOTAL) break; + let sessionFiles: string[] = []; try { - const content = readFileSync(join(controlAgentSessionDir, file), "utf-8"); - const lines = content.split("\n"); - - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed) continue; - - let parsed: any; - try { - parsed = JSON.parse(trimmed); - } catch { - continue; - } - - if (parsed?.type !== "message") continue; - if (parsed?.message?.role !== "assistant") continue; - - const items = parsed?.message?.content; - if (!Array.isArray(items)) continue; - - for (const item of items) { - if (item?.type !== "toolCall") continue; - if (item?.name !== "bash") continue; - - const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; - if (!command.includes("curl")) continue; - if (!command.includes("/send")) continue; - if (!threadTsPattern.test(command)) continue; + sessionFiles = readdirSync(sessionDir) + .filter((f) => f.endsWith(".jsonl")) + .sort() + .reverse() + .slice(0, SESSION_SCAN_MAX_FILES_PER_DIR); + } catch { + continue; + } - return true; + for (const file of sessionFiles) { + if (scannedFiles >= SESSION_SCAN_MAX_FILES_TOTAL) break; + scannedFiles += 1; + try { + const content = readFileSync(join(sessionDir, file), "utf-8"); + const lines = content.split("\n"); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let parsed: any; + try { + parsed = JSON.parse(trimmed); + } catch { + continue; + } + + if (parsed?.type !== "message") continue; + if (parsed?.message?.role !== "assistant") continue; + + const items = parsed?.message?.content; + if (!Array.isArray(items)) continue; + + for (const item of items) { + if (item?.type !== "toolCall") continue; + if (item?.name !== "bash") continue; + + const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; + if (!command.includes("curl")) continue; + if (!command.includes("/send") && !command.includes("/reply")) continue; + if (!threadTsPattern.test(command)) continue; + + return true; + } } + } catch { + // File read error - skip } - } catch { - // File read error - skip } } } catch {