Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions gateway-bridge/bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ for (const warning of gatewayAliasWarnings) {
const SOCKET_DIR = path.join(homedir(), ".pi", "session-control");
const AGENT_TIMEOUT_MS = 120_000;
const API_PORT = parseInt(process.env.BRIDGE_API_PORT || "7890", 10);
const REPLY_LEDGER_PATH = path.join(homedir(), ".pi", "agent", "slack-reply-log.jsonl");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded ledger file growth — no pruning or TTL

slack-reply-log.jsonl is append-only. Every outbound Slack reply adds a new line and nothing ever removes old entries. The heartbeat check in hasRepliedToThread reads the entire file on every invocation, so on an active bot over weeks or months this will cause steadily increasing I/O on each heartbeat cycle.

Consider either:

  • Periodically rewriting the file keeping only entries from the last N days (e.g. 7 days), or
  • Reading the file in reverse / using a line limit so only recent entries are scanned.

This same concern applies to broker-bridge.mjs (same REPLY_LEDGER_PATH, same append-only strategy).

Prompt To Fix With AI
This is a comment left during a code review.
Path: gateway-bridge/bridge.mjs
Line: 50

Comment:
**Unbounded ledger file growth — no pruning or TTL**

`slack-reply-log.jsonl` is append-only. Every outbound Slack reply adds a new line and nothing ever removes old entries. The heartbeat check in `hasRepliedToThread` reads the **entire file** on every invocation, so on an active bot over weeks or months this will cause steadily increasing I/O on each heartbeat cycle.

Consider either:
- Periodically rewriting the file keeping only entries from the last N days (e.g. 7 days), or
- Reading the file in reverse / using a line limit so only recent entries are scanned.

This same concern applies to `broker-bridge.mjs` (same `REPLY_LEDGER_PATH`, same append-only strategy).

How can I resolve this? If you propose a fix, please make it concise.


// Validate required env vars
for (const key of ["SLACK_BOT_TOKEN", "SLACK_APP_TOKEN"]) {
Expand Down Expand Up @@ -116,6 +117,31 @@ function resolveAckReaction(channel, threadTs) {
});
}

/**
* Append a durable outbound-reply proof entry used by heartbeat reply detection.
* This tracks replies from both /send (with thread_ts) and /reply endpoints.
*/
function appendReplyLedgerEntry({ channel, threadTs, route }) {
if (!threadTs) return;

const entry = {
channel,
thread_ts: threadTs,
route,
replied_at: new Date().toISOString(),
};

try {
fs.mkdirSync(path.dirname(REPLY_LEDGER_PATH), { recursive: true });
fs.appendFileSync(REPLY_LEDGER_PATH, `${JSON.stringify(entry)}\n`, {
encoding: "utf-8",
mode: 0o600,
});
} catch (err) {
console.warn(`⚠️ failed to append reply ledger entry: ${err.message}`);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent error serialisation vs broker-bridge.mjs

broker-bridge.mjs uses err instanceof Error ? err.message : String(err) to safely handle non-Error thrown values, but bridge.mjs accesses err.message directly. If a non-Error is ever thrown (e.g. a plain string), err.message will be undefined and the warning will be useless.

Suggested change
console.warn(`⚠️ failed to append reply ledger entry: ${err.message}`);
console.warn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`);
Prompt To Fix With AI
This is a comment left during a code review.
Path: gateway-bridge/bridge.mjs
Line: 141

Comment:
**Inconsistent error serialisation vs `broker-bridge.mjs`**

`broker-bridge.mjs` uses `err instanceof Error ? err.message : String(err)` to safely handle non-`Error` thrown values, but `bridge.mjs` accesses `err.message` directly. If a non-Error is ever thrown (e.g. a plain string), `err.message` will be `undefined` and the warning will be useless.

```suggestion
    console.warn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`);
```

How can I resolve this? If you propose a fix, please make it concise.

}
}

/**
* Evict the oldest entries when the registry exceeds MAX_THREADS.
* Maps iterate in insertion order, so the first entries are the oldest.
Expand Down Expand Up @@ -490,9 +516,11 @@ function startApiServer() {

console.log(`📤 Sent to ${channel}: ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`);

// If this is a threaded reply, check for a pending ✅ ack reaction.
// If this is a threaded reply, check for a pending ✅ ack reaction
// and append a durable reply ledger entry for heartbeat detection.
if (thread_ts) {
resolveAckReaction(channel, thread_ts);
appendReplyLedgerEntry({ channel, threadTs: thread_ts, route: "/send" });
}

res.writeHead(200, { "Content-Type": "application/json" });
Expand Down Expand Up @@ -534,8 +562,14 @@ function startApiServer() {

console.log(`📤 Reply to ${thread_id} (${thread.channel}): ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`);

// Check for a pending ✅ ack reaction on the /reply path too.
// Check for a pending ✅ ack reaction on the /reply path too,
// and append a durable reply ledger entry for heartbeat detection.
resolveAckReaction(thread.channel, thread.thread_ts);
appendReplyLedgerEntry({
channel: thread.channel,
threadTs: thread.thread_ts,
route: "/reply",
});

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel }));
Expand Down
38 changes: 36 additions & 2 deletions gateway-bridge/broker-bridge.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ function clampInt(value, min, max, fallback) {
}

const API_PORT = clampInt(process.env.BRIDGE_API_PORT || "7890", 0, 65535, 7890);
const REPLY_LEDGER_PATH = path.join(homedir(), ".pi", "agent", "slack-reply-log.jsonl");
const POLL_INTERVAL_MS = clampInt(process.env.SLACK_BROKER_POLL_INTERVAL_MS || "3000", 0, 60_000, 3000);
const MAX_MESSAGES = clampInt(process.env.SLACK_BROKER_MAX_MESSAGES || "10", 1, 100, 10);
const MAX_WAIT_SECONDS = 25;
Expand Down Expand Up @@ -196,6 +197,31 @@ function resolveAckReaction(channel, threadTs) {
});
}

/**
* Append a durable outbound-reply proof entry used by heartbeat reply detection.
* This tracks replies from both /send (with thread_ts) and /reply endpoints.
*/
function appendReplyLedgerEntry({ channel, threadTs, route }) {
if (!threadTs) return;

const entry = {
channel,
thread_ts: threadTs,
route,
replied_at: new Date().toISOString(),
};

try {
fs.mkdirSync(path.dirname(REPLY_LEDGER_PATH), { recursive: true });
fs.appendFileSync(REPLY_LEDGER_PATH, `${JSON.stringify(entry)}\n`, {
encoding: "utf-8",
mode: 0o600,
});
} catch (err) {
logWarn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`);
}
}

let socketPath = null;

let cryptoState = null;
Expand Down Expand Up @@ -1115,9 +1141,11 @@ function startApiServer() {
actionRequestBody: { text: safeText },
});

// If this is a threaded reply, check for a pending ✅ ack reaction.
// If this is a threaded reply, check for a pending ✅ ack reaction
// and append a durable reply ledger entry for heartbeat detection.
if (thread_ts) {
resolveAckReaction(channel, thread_ts);
appendReplyLedgerEntry({ channel, threadTs: thread_ts, route: "/send" });
}

res.writeHead(200, { "Content-Type": "application/json" });
Expand Down Expand Up @@ -1152,8 +1180,14 @@ function startApiServer() {
actionRequestBody: { text: safeText },
});

// Check for a pending ✅ ack reaction on the /reply path too.
// Check for a pending ✅ ack reaction on the /reply path too,
// and append a durable reply ledger entry for heartbeat detection.
resolveAckReaction(thread.channel, thread.thread_ts);
appendReplyLedgerEntry({
channel: thread.channel,
threadTs: thread.thread_ts,
route: "/reply",
});

res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ ok: true, ts: result.ts }));
Expand Down
10 changes: 10 additions & 0 deletions pi/extensions/heartbeat.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,16 @@ describe("heartbeat v2: unanswered mention reply detection", () => {
assert.equal(hasReplyLogEntry(log, "9999.0000"), false);
});

it("matches reply-log entries emitted from both /send and /reply routes", () => {
const log = [
'{"channel":"C111","thread_ts":"3456.7890","route":"/send","replied_at":"2026-02-27T00:10:00Z"}',
'{"channel":"C111","thread_ts":"4567.8901","route":"/reply","replied_at":"2026-02-27T00:11:00Z"}',
].join("\n");

assert.equal(hasReplyLogEntry(log, "3456.7890"), true);
assert.equal(hasReplyLogEntry(log, "4567.8901"), true);
});

it("ignores malformed reply-log lines", () => {
const log = ['{"thread_ts":"1234.5678"}', 'not-json', '{"thread_ts":"2345.6789"}'].join("\n");
assert.equal(hasReplyLogEntry(log, "2345.6789"), true);
Expand Down
93 changes: 53 additions & 40 deletions pi/extensions/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,10 @@ function slackTsToMs(ts: string): number | null {
function hasRepliedToThread(threadTs: string): boolean {
// Check multiple sources for evidence of a reply to this thread_ts.

// 1. Check the reply tracking log (most reliable — written by the agent).
// 1. Check the reply tracking log (most reliable — written by the bridge
// for both /send and /reply outbound paths).
// File: ~/.pi/agent/slack-reply-log.jsonl
// Each line: {"thread_ts":"...","replied_at":"..."}
// Each line: {"thread_ts":"...","replied_at":"...", ...}
const replyLogPath = join(homedir(), ".pi", "agent", "slack-reply-log.jsonl");
if (existsSync(replyLogPath)) {
try {
Expand All @@ -440,57 +441,69 @@ function hasRepliedToThread(threadTs: string): boolean {
}
}

// 2. Check recent control-agent session logs for explicit outbound /send calls.
// Session files are in ~/.pi/agent/sessions/--home-baudbot_agent--/
// and named <timestamp>_<uuid>.jsonl.
const controlAgentSessionDir = join(SESSION_DIR, "--home-baudbot_agent--");
if (existsSync(controlAgentSessionDir)) {
// 2. Fallback for older runs: scan recent assistant bash tool calls for
// explicit outbound /send calls carrying this exact thread_ts.
//
// We scan multiple session directories (not just control-agent) because
// replies may come from delegated sessions depending on runtime wiring.
if (existsSync(SESSION_DIR)) {
const escapeRegExp = (value: string): string => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapeRegExp(threadTs)}["']`);

try {
const sessionFiles = readdirSync(controlAgentSessionDir)
.filter((f) => f.endsWith(".jsonl"))
.sort()
.reverse()
.slice(0, 3); // Check last 3 sessions
const sessionDirs = readdirSync(SESSION_DIR, { withFileTypes: true })
.filter((entry) => entry.isDirectory())
.map((entry) => join(SESSION_DIR, entry.name));

for (const file of sessionFiles) {
for (const sessionDir of sessionDirs) {
let sessionFiles: string[] = [];
try {
const content = readFileSync(join(controlAgentSessionDir, file), "utf-8");
const lines = content.split("\n");

for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;

let parsed: any;
try {
parsed = JSON.parse(trimmed);
} catch {
continue;
}
sessionFiles = readdirSync(sessionDir)
.filter((f) => f.endsWith(".jsonl"))
.sort()
.reverse()
.slice(0, 10);
} catch {
continue;
}

for (const file of sessionFiles) {
try {
const content = readFileSync(join(sessionDir, file), "utf-8");
const lines = content.split("\n");

if (parsed?.type !== "message") continue;
if (parsed?.message?.role !== "assistant") continue;
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;

const items = parsed?.message?.content;
if (!Array.isArray(items)) continue;
let parsed: any;
try {
parsed = JSON.parse(trimmed);
} catch {
continue;
}

for (const item of items) {
if (item?.type !== "toolCall") continue;
if (item?.name !== "bash") continue;
if (parsed?.type !== "message") continue;
if (parsed?.message?.role !== "assistant") continue;

const command = typeof item?.arguments?.command === "string" ? item.arguments.command : "";
if (!command.includes("curl")) continue;
if (!command.includes("/send")) continue;
if (!threadTsPattern.test(command)) continue;
const items = parsed?.message?.content;
if (!Array.isArray(items)) continue;

return true;
for (const item of items) {
if (item?.type !== "toolCall") continue;
if (item?.name !== "bash") continue;

const command = typeof item?.arguments?.command === "string" ? item.arguments.command : "";
if (!command.includes("curl")) continue;
if (!command.includes("/send")) continue;
if (!threadTsPattern.test(command)) continue;

return true;
}
}
} catch {
// File read error - skip
}
} catch {
// File read error - skip
}
}
Comment on lines +458 to 508
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fallback session scan now covers all directories but still only matches /send

The PR expands the fallback (Path 2) from scanning a single --home-baudbot_agent-- directory to scanning every subdirectory of SESSION_DIR, and increases the per-directory file limit from 3 to 10. However, the pattern match still only looks for curl commands that include "/send" (line 498). Replies issued via /reply in older sessions that predate the ledger will remain undetected through this path.

Since the comment explicitly calls this a "fallback for older runs," this is a minor gap, but it's worth noting so future contributors understand the limitation: Path 2 cannot retroactively detect pre-ledger /reply calls.

The expanded scan also has a performance implication on the hot path: if Path 1 misses (e.g. ledger doesn't exist or was rotated), Path 2 now reads up to N_dirs × 10 JSONL session files in full on every heartbeat invocation.

Prompt To Fix With AI
This is a comment left during a code review.
Path: pi/extensions/heartbeat.ts
Line: 458-508

Comment:
**Fallback session scan now covers all directories but still only matches `/send`**

The PR expands the fallback (Path 2) from scanning a single `--home-baudbot_agent--` directory to scanning **every** subdirectory of `SESSION_DIR`, and increases the per-directory file limit from 3 to 10. However, the pattern match still only looks for curl commands that include `"/send"` (line 498). Replies issued via `/reply` in older sessions that predate the ledger will remain undetected through this path.

Since the comment explicitly calls this a "fallback for older runs," this is a minor gap, but it's worth noting so future contributors understand the limitation: Path 2 cannot retroactively detect pre-ledger `/reply` calls.

The expanded scan also has a performance implication on the hot path: if Path 1 misses (e.g. ledger doesn't exist or was rotated), Path 2 now reads up to `N_dirs × 10` JSONL session files in full on every heartbeat invocation.

How can I resolve this? If you propose a fix, please make it concise.

} catch {
Expand Down