-
Notifications
You must be signed in to change notification settings - Fork 12
Heartbeat: Record outbound reply proofs #201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -47,6 +47,7 @@ for (const warning of gatewayAliasWarnings) { | |||||
| const SOCKET_DIR = path.join(homedir(), ".pi", "session-control"); | ||||||
| const AGENT_TIMEOUT_MS = 120_000; | ||||||
| const API_PORT = parseInt(process.env.BRIDGE_API_PORT || "7890", 10); | ||||||
| const REPLY_LEDGER_PATH = path.join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); | ||||||
|
|
||||||
| // Validate required env vars | ||||||
| for (const key of ["SLACK_BOT_TOKEN", "SLACK_APP_TOKEN"]) { | ||||||
|
|
@@ -116,6 +117,31 @@ function resolveAckReaction(channel, threadTs) { | |||||
| }); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Append a durable outbound-reply proof entry used by heartbeat reply detection. | ||||||
| * This tracks replies from both /send (with thread_ts) and /reply endpoints. | ||||||
| */ | ||||||
| function appendReplyLedgerEntry({ channel, threadTs, route }) { | ||||||
| if (!threadTs) return; | ||||||
|
|
||||||
| const entry = { | ||||||
| channel, | ||||||
| thread_ts: threadTs, | ||||||
| route, | ||||||
| replied_at: new Date().toISOString(), | ||||||
| }; | ||||||
|
|
||||||
| try { | ||||||
| fs.mkdirSync(path.dirname(REPLY_LEDGER_PATH), { recursive: true }); | ||||||
| fs.appendFileSync(REPLY_LEDGER_PATH, `${JSON.stringify(entry)}\n`, { | ||||||
| encoding: "utf-8", | ||||||
| mode: 0o600, | ||||||
| }); | ||||||
| } catch (err) { | ||||||
| console.warn(`⚠️ failed to append reply ledger entry: ${err.message}`); | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Inconsistent error serialisation vs
Suggested change
Prompt To Fix With AIThis is a comment left during a code review.
Path: gateway-bridge/bridge.mjs
Line: 141
Comment:
**Inconsistent error serialisation vs `broker-bridge.mjs`**
`broker-bridge.mjs` uses `err instanceof Error ? err.message : String(err)` to safely handle non-`Error` thrown values, but `bridge.mjs` accesses `err.message` directly. If a non-Error is ever thrown (e.g. a plain string), `err.message` will be `undefined` and the warning will be useless.
```suggestion
console.warn(`⚠️ failed to append reply ledger entry: ${err instanceof Error ? err.message : String(err)}`);
```
How can I resolve this? If you propose a fix, please make it concise. |
||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Evict the oldest entries when the registry exceeds MAX_THREADS. | ||||||
| * Maps iterate in insertion order, so the first entries are the oldest. | ||||||
|
|
@@ -490,9 +516,11 @@ function startApiServer() { | |||||
|
|
||||||
| console.log(`📤 Sent to ${channel}: ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`); | ||||||
|
|
||||||
| // If this is a threaded reply, check for a pending ✅ ack reaction. | ||||||
| // If this is a threaded reply, check for a pending ✅ ack reaction | ||||||
| // and append a durable reply ledger entry for heartbeat detection. | ||||||
| if (thread_ts) { | ||||||
| resolveAckReaction(channel, thread_ts); | ||||||
| appendReplyLedgerEntry({ channel, threadTs: thread_ts, route: "/send" }); | ||||||
| } | ||||||
|
|
||||||
| res.writeHead(200, { "Content-Type": "application/json" }); | ||||||
|
|
@@ -534,8 +562,14 @@ function startApiServer() { | |||||
|
|
||||||
| console.log(`📤 Reply to ${thread_id} (${thread.channel}): ${text.slice(0, 80)}${text.length > 80 ? "..." : ""}`); | ||||||
|
|
||||||
| // Check for a pending ✅ ack reaction on the /reply path too. | ||||||
| // Check for a pending ✅ ack reaction on the /reply path too, | ||||||
| // and append a durable reply ledger entry for heartbeat detection. | ||||||
| resolveAckReaction(thread.channel, thread.thread_ts); | ||||||
| appendReplyLedgerEntry({ | ||||||
| channel: thread.channel, | ||||||
| threadTs: thread.thread_ts, | ||||||
| route: "/reply", | ||||||
| }); | ||||||
|
|
||||||
| res.writeHead(200, { "Content-Type": "application/json" }); | ||||||
| res.end(JSON.stringify({ ok: true, ts: result.ts, channel: result.channel })); | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -415,9 +415,10 @@ function slackTsToMs(ts: string): number | null { | |
| function hasRepliedToThread(threadTs: string): boolean { | ||
| // Check multiple sources for evidence of a reply to this thread_ts. | ||
|
|
||
| // 1. Check the reply tracking log (most reliable — written by the agent). | ||
| // 1. Check the reply tracking log (most reliable — written by the bridge | ||
| // for both /send and /reply outbound paths). | ||
| // File: ~/.pi/agent/slack-reply-log.jsonl | ||
| // Each line: {"thread_ts":"...","replied_at":"..."} | ||
| // Each line: {"thread_ts":"...","replied_at":"...", ...} | ||
| const replyLogPath = join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); | ||
| if (existsSync(replyLogPath)) { | ||
| try { | ||
|
|
@@ -440,57 +441,69 @@ function hasRepliedToThread(threadTs: string): boolean { | |
| } | ||
| } | ||
|
|
||
| // 2. Check recent control-agent session logs for explicit outbound /send calls. | ||
| // Session files are in ~/.pi/agent/sessions/--home-baudbot_agent--/ | ||
| // and named <timestamp>_<uuid>.jsonl. | ||
| const controlAgentSessionDir = join(SESSION_DIR, "--home-baudbot_agent--"); | ||
| if (existsSync(controlAgentSessionDir)) { | ||
| // 2. Fallback for older runs: scan recent assistant bash tool calls for | ||
| // explicit outbound /send calls carrying this exact thread_ts. | ||
| // | ||
| // We scan multiple session directories (not just control-agent) because | ||
| // replies may come from delegated sessions depending on runtime wiring. | ||
| if (existsSync(SESSION_DIR)) { | ||
| const escapeRegExp = (value: string): string => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); | ||
| const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapeRegExp(threadTs)}["']`); | ||
|
|
||
| try { | ||
| const sessionFiles = readdirSync(controlAgentSessionDir) | ||
| .filter((f) => f.endsWith(".jsonl")) | ||
| .sort() | ||
| .reverse() | ||
| .slice(0, 3); // Check last 3 sessions | ||
| const sessionDirs = readdirSync(SESSION_DIR, { withFileTypes: true }) | ||
| .filter((entry) => entry.isDirectory()) | ||
| .map((entry) => join(SESSION_DIR, entry.name)); | ||
|
|
||
| for (const file of sessionFiles) { | ||
| for (const sessionDir of sessionDirs) { | ||
| let sessionFiles: string[] = []; | ||
| try { | ||
| const content = readFileSync(join(controlAgentSessionDir, file), "utf-8"); | ||
| const lines = content.split("\n"); | ||
|
|
||
| for (const line of lines) { | ||
| const trimmed = line.trim(); | ||
| if (!trimmed) continue; | ||
|
|
||
| let parsed: any; | ||
| try { | ||
| parsed = JSON.parse(trimmed); | ||
| } catch { | ||
| continue; | ||
| } | ||
| sessionFiles = readdirSync(sessionDir) | ||
| .filter((f) => f.endsWith(".jsonl")) | ||
| .sort() | ||
| .reverse() | ||
| .slice(0, 10); | ||
| } catch { | ||
| continue; | ||
| } | ||
|
|
||
| for (const file of sessionFiles) { | ||
| try { | ||
| const content = readFileSync(join(sessionDir, file), "utf-8"); | ||
| const lines = content.split("\n"); | ||
|
|
||
| if (parsed?.type !== "message") continue; | ||
| if (parsed?.message?.role !== "assistant") continue; | ||
| for (const line of lines) { | ||
| const trimmed = line.trim(); | ||
| if (!trimmed) continue; | ||
|
|
||
| const items = parsed?.message?.content; | ||
| if (!Array.isArray(items)) continue; | ||
| let parsed: any; | ||
| try { | ||
| parsed = JSON.parse(trimmed); | ||
| } catch { | ||
| continue; | ||
| } | ||
|
|
||
| for (const item of items) { | ||
| if (item?.type !== "toolCall") continue; | ||
| if (item?.name !== "bash") continue; | ||
| if (parsed?.type !== "message") continue; | ||
| if (parsed?.message?.role !== "assistant") continue; | ||
|
|
||
| const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; | ||
| if (!command.includes("curl")) continue; | ||
| if (!command.includes("/send")) continue; | ||
| if (!threadTsPattern.test(command)) continue; | ||
| const items = parsed?.message?.content; | ||
| if (!Array.isArray(items)) continue; | ||
|
|
||
| return true; | ||
| for (const item of items) { | ||
| if (item?.type !== "toolCall") continue; | ||
| if (item?.name !== "bash") continue; | ||
|
|
||
| const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; | ||
| if (!command.includes("curl")) continue; | ||
| if (!command.includes("/send")) continue; | ||
| if (!threadTsPattern.test(command)) continue; | ||
|
|
||
| return true; | ||
| } | ||
| } | ||
| } catch { | ||
| // File read error - skip | ||
| } | ||
| } catch { | ||
| // File read error - skip | ||
| } | ||
| } | ||
|
Comment on lines
+458
to
508
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fallback session scan now covers all directories but still only matches The PR expands the fallback (Path 2) from scanning a single Since the comment explicitly calls this a "fallback for older runs," this is a minor gap, but it's worth noting so future contributors understand the limitation: Path 2 cannot retroactively detect pre-ledger The expanded scan also has a performance implication on the hot path: if Path 1 misses (e.g. ledger doesn't exist or was rotated), Path 2 now reads up to Prompt To Fix With AIThis is a comment left during a code review.
Path: pi/extensions/heartbeat.ts
Line: 458-508
Comment:
**Fallback session scan now covers all directories but still only matches `/send`**
The PR expands the fallback (Path 2) from scanning a single `--home-baudbot_agent--` directory to scanning **every** subdirectory of `SESSION_DIR`, and increases the per-directory file limit from 3 to 10. However, the pattern match still only looks for curl commands that include `"/send"` (line 498). Replies issued via `/reply` in older sessions that predate the ledger will remain undetected through this path.
Since the comment explicitly calls this a "fallback for older runs," this is a minor gap, but it's worth noting so future contributors understand the limitation: Path 2 cannot retroactively detect pre-ledger `/reply` calls.
The expanded scan also has a performance implication on the hot path: if Path 1 misses (e.g. ledger doesn't exist or was rotated), Path 2 now reads up to `N_dirs × 10` JSONL session files in full on every heartbeat invocation.
How can I resolve this? If you propose a fix, please make it concise. |
||
| } catch { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unbounded ledger file growth — no pruning or TTL
slack-reply-log.jsonlis append-only. Every outbound Slack reply adds a new line and nothing ever removes old entries. The heartbeat check inhasRepliedToThreadreads the entire file on every invocation, so on an active bot over weeks or months this will cause steadily increasing I/O on each heartbeat cycle.Consider either:
This same concern applies to
broker-bridge.mjs(sameREPLY_LEDGER_PATH, same append-only strategy).Prompt To Fix With AI