diff --git a/src/app/slack/webhook/route.ts b/src/app/slack/webhook/route.ts index 80550d5e7..a629f2e6b 100644 --- a/src/app/slack/webhook/route.ts +++ b/src/app/slack/webhook/route.ts @@ -23,7 +23,7 @@ import { getDevUserSuffix } from '@/lib/slack-bot/dev-user-info'; import { APP_URL } from '@/lib/constants'; import { verifySlackRequest } from '@/lib/slack/verify-request'; import { db } from '@/lib/drizzle'; -import { cli_sessions_v2 } from '@/db/schema'; +import { cliSessions } from '@/db/schema'; import { eq } from 'drizzle-orm'; import type { Owner } from '@/lib/integrations/core/types'; @@ -52,9 +52,9 @@ function buildSessionUrl(dbSessionId: string, owner: Owner): string { */ async function getDbSessionIdFromCloudAgentId(cloudAgentSessionId: string): Promise { const [session] = await db - .select({ session_id: cli_sessions_v2.session_id }) - .from(cli_sessions_v2) - .where(eq(cli_sessions_v2.cloud_agent_session_id, cloudAgentSessionId)) + .select({ session_id: cliSessions.session_id }) + .from(cliSessions) + .where(eq(cliSessions.cloud_agent_session_id, cloudAgentSessionId)) .limit(1); return session?.session_id ?? null; diff --git a/src/lib/cloud-agent-next/run-session.ts b/src/lib/cloud-agent-next/run-session.ts deleted file mode 100644 index 3fd979720..000000000 --- a/src/lib/cloud-agent-next/run-session.ts +++ /dev/null @@ -1,377 +0,0 @@ -import 'server-only'; -import { getEnvVariable } from '@/lib/dotenvx'; -import { signStreamTicket, type StreamTicketPayload } from '@/lib/cloud-agent/stream-ticket'; -import { createWebSocketManager } from './websocket-manager'; -import { createEventProcessor, type ProcessedMessage } from './processor'; -import type { CloudAgentEvent, StreamError } from './event-types'; -import type { CloudAgentNextClient } from './cloud-agent-client'; -import type { PrepareSessionInput, InitiateFromPreparedSessionInput } from './cloud-agent-client'; - -/** - * Server-side helper for running a cloud-agent-next session to completion. - * - * Encapsulates the full lifecycle: - * prepare → initiate → sign ticket → connect WebSocket → stream events → return result - * - * This is the server-side equivalent of the frontend useCloudAgentStream hook, - * designed for headless consumers like the Slack bot and security agent. - */ - -// --------------------------------------------------------------------------- -// Constants -// --------------------------------------------------------------------------- - -const CLOUD_AGENT_NEXT_WS_URL = getEnvVariable('NEXT_PUBLIC_CLOUD_AGENT_NEXT_WS_URL'); -const CLOUD_AGENT_NEXT_API_URL = getEnvVariable('CLOUD_AGENT_NEXT_API_URL'); - -const DEFAULT_STREAM_TIMEOUT_MS = 15 * 60 * 1000; // 15 minutes -const COMPLETE_GRACE_MS = 1000; // Wait 1s after 'complete' for final events - -// --------------------------------------------------------------------------- -// URL resolution -// --------------------------------------------------------------------------- - -/** - * Resolve a (possibly relative) stream URL returned by initiateFromPreparedSession - * into an absolute WebSocket URL. - * - * Resolution order for the base URL: - * 1. NEXT_PUBLIC_CLOUD_AGENT_NEXT_WS_URL (preferred, purpose-built for WS) - * 2. CLOUD_AGENT_NEXT_API_URL (fallback, the tRPC API base) - */ -export function resolveStreamUrl(streamUrl: string): string { - if (!streamUrl) { - throw new Error('Cloud Agent stream URL is missing'); - } - - let url: URL; - if (/^(wss?|https?):\/\//i.test(streamUrl)) { - url = new URL(streamUrl); - } else { - const baseUrl = CLOUD_AGENT_NEXT_WS_URL || CLOUD_AGENT_NEXT_API_URL; - if (!baseUrl) { - throw new Error( - 'Neither NEXT_PUBLIC_CLOUD_AGENT_NEXT_WS_URL nor CLOUD_AGENT_NEXT_API_URL is configured' - ); - } - url = new URL(streamUrl, baseUrl); - } - - // Upgrade HTTP(S) to WS(S) - if (url.protocol === 'http:') url.protocol = 'ws:'; - else if (url.protocol === 'https:') url.protocol = 'wss:'; - - return url.toString(); -} - -// --------------------------------------------------------------------------- -// Text extraction -// --------------------------------------------------------------------------- - -type MessagePart = ProcessedMessage['parts'][number]; -type TextMessagePart = Extract; - -function isTextPart(part: MessagePart): part is TextMessagePart { - return part.type === 'text'; -} - -/** - * Extract concatenated text content from a completed message's parts. - */ -export function extractTextFromMessage(message: ProcessedMessage): string { - return message.parts - .filter(isTextPart) - .map(part => part.text ?? '') - .join('') - .trim(); -} - -// --------------------------------------------------------------------------- -// Public types -// --------------------------------------------------------------------------- - -/** Input for runSessionToCompletion */ -export type RunSessionInput = { - /** An already-constructed CloudAgentNextClient (caller owns auth / balance-check config). */ - client: CloudAgentNextClient; - /** Fields forwarded to prepareSession. */ - prepareInput: PrepareSessionInput; - /** Partial override for initiateFromPreparedSession (e.g. githubToken). */ - initiateInput?: Omit; - /** Payload fields for signing the WebSocket stream ticket. */ - ticketPayload: Pick; - /** Stream timeout in ms (default: 15 minutes). */ - streamTimeoutMs?: number; - /** Optional log prefix for console messages (e.g. '[SlackBot]'). */ - logPrefix?: string; -}; - -/** Result from runSessionToCompletion */ -export type RunSessionResult = { - /** The final text response extracted from the assistant's completed message(s). */ - response: string; - /** The cloud-agent session ID (available even on failure). */ - sessionId?: string; - /** Whether the session encountered an error. */ - hasError: boolean; - /** Collected status/error messages for diagnostics. */ - statusMessages: string[]; -}; - -// --------------------------------------------------------------------------- -// Main entry point -// --------------------------------------------------------------------------- - -/** - * Run a cloud-agent-next session to completion, returning the final text result. - * - * Steps: - * 1. prepareSession → cloudAgentSessionId + kiloSessionId - * 2. initiateFromPrepared → streamUrl - * 3. resolveStreamUrl → absolute wss:// URL - * 4. signStreamTicket → short-lived JWT for WebSocket auth - * 5. EventProcessor + WebSocketManager → stream events until idle/complete/error - * 6. Return aggregated text result - */ -export async function runSessionToCompletion(input: RunSessionInput): Promise { - const { - client, - prepareInput, - initiateInput, - ticketPayload, - logPrefix = '[CloudAgentNext]', - } = input; - const streamTimeoutMs = input.streamTimeoutMs ?? DEFAULT_STREAM_TIMEOUT_MS; - - const statusMessages: string[] = []; - let completionResult: string | undefined; - let sessionId: string | undefined; - let kiloSessionId: string | undefined; - let hasError = false; - let errorMessage: string | undefined; - - // 1. Prepare - try { - const prepared = await client.prepareSession(prepareInput); - sessionId = prepared.cloudAgentSessionId; - kiloSessionId = prepared.kiloSessionId; - } catch (error) { - const msg = error instanceof Error ? error.message : String(error); - console.error(`${logPrefix} Error preparing session:`, msg, error); - return { - response: `Error preparing Cloud Agent: ${msg}`, - sessionId, - hasError: true, - statusMessages, - }; - } - - if (!sessionId || !kiloSessionId) { - const msg = 'Session preparation did not return session IDs.'; - console.error(`${logPrefix} ${msg}`); - return { response: msg, sessionId, hasError: true, statusMessages }; - } - - // 2. Initiate - let streamUrl: string; - try { - const initiated = await client.initiateFromPreparedSession({ - cloudAgentSessionId: sessionId, - ...initiateInput, - }); - streamUrl = initiated.streamUrl; - } catch (error) { - const msg = error instanceof Error ? error.message : String(error); - console.error(`${logPrefix} Error initiating session:`, msg, error); - return { - response: `Error initiating Cloud Agent: ${msg}`, - sessionId, - hasError: true, - statusMessages, - }; - } - - // 3. Resolve URL - let wsUrl: string; - try { - wsUrl = resolveStreamUrl(streamUrl); - } catch (error) { - const msg = error instanceof Error ? error.message : String(error); - console.error(`${logPrefix} Error resolving stream URL:`, msg, error); - return { - response: `Error resolving stream URL: ${msg}`, - sessionId, - hasError: true, - statusMessages, - }; - } - - // 4. Sign ticket - const ticketFields: StreamTicketPayload = { - userId: ticketPayload.userId, - kiloSessionId, - cloudAgentSessionId: sessionId, - organizationId: ticketPayload.organizationId, - }; - const { ticket } = signStreamTicket(ticketFields); - - // 5. Wire up EventProcessor + WebSocketManager - let resolveStream: (() => void) | undefined; - let completeGraceTimeoutId: ReturnType | undefined; - let streamCompleted = false; - const streamTimeoutRef: { id?: ReturnType } = {}; - - const resolveOnce = () => { - if (streamCompleted) return; - streamCompleted = true; - if (streamTimeoutRef.id) clearTimeout(streamTimeoutRef.id); - if (completeGraceTimeoutId) clearTimeout(completeGraceTimeoutId); - resolveStream?.(); - }; - - const processor = createEventProcessor({ - callbacks: { - onMessageCompleted: (_sid, _mid, message) => { - if (message.info.role !== 'assistant') return; - const text = extractTextFromMessage(message); - if (text) completionResult = text; - - if (message.info.error) { - const errData = message.info.error as { data?: { message?: string } }; - hasError = true; - errorMessage = errData?.data?.message ?? 'Assistant message failed.'; - } - }, - onSessionStatusChanged: status => { - if (status.type === 'idle') resolveOnce(); - }, - onError: error => { - hasError = true; - errorMessage = error; - resolveOnce(); - }, - }, - }); - - const streamDone = new Promise(resolve => { - resolveStream = resolve; - }); - - const scheduleCompleteGrace = () => { - if (completeGraceTimeoutId) return; - completeGraceTimeoutId = setTimeout(resolveOnce, COMPLETE_GRACE_MS); - }; - - const wsManager = createWebSocketManager({ - url: wsUrl, - ticket, - onEvent: (event: CloudAgentEvent) => { - processor.processEvent(event); - - switch (event.streamEventType) { - case 'complete': { - const data = event.data as { exitCode?: number; metadata?: { executionTimeMs?: number } }; - statusMessages.push( - `Session completed${data?.metadata?.executionTimeMs !== undefined ? ` in ${data.metadata.executionTimeMs}ms` : ''} with exit code ${data?.exitCode ?? 'unknown'}` - ); - scheduleCompleteGrace(); - break; - } - case 'error': { - const data = event.data as { error?: string }; - const text = data?.error ?? 'Cloud Agent error'; - statusMessages.push(`Error: ${text}`); - hasError = true; - errorMessage = text; - resolveOnce(); - break; - } - case 'interrupted': { - const data = event.data as { reason?: string }; - const reason = data?.reason ?? 'Session interrupted'; - statusMessages.push(`Session interrupted: ${reason}`); - hasError = true; - errorMessage = reason; - resolveOnce(); - break; - } - case 'output': { - const data = event.data as { source?: string; content?: string }; - if (data?.source === 'stderr') { - statusMessages.push(`[stderr] ${data.content ?? ''}`.trim()); - hasError = true; - } - break; - } - case 'status': { - const data = event.data as { message?: string }; - if (data?.message) statusMessages.push(data.message); - break; - } - } - }, - onStateChange: state => { - if (state.status === 'error') { - hasError = true; - errorMessage = state.error; - resolveOnce(); - } - if (state.status === 'disconnected') { - resolveOnce(); - } - }, - onError: (error: StreamError) => { - hasError = true; - errorMessage = `${error.code}: ${error.message}`; - resolveOnce(); - }, - onRefreshTicket: async () => { - const refreshed = signStreamTicket(ticketFields); - return refreshed.ticket; - }, - }); - - // 6. Stream - console.log(`${logPrefix} Connecting to stream for session ${sessionId}...`); - wsManager.connect(); - - streamTimeoutRef.id = setTimeout(() => { - hasError = true; - errorMessage = `Stream timed out after ${streamTimeoutMs}ms`; - resolveOnce(); - }, streamTimeoutMs); - - await streamDone; - wsManager.disconnect(); - - console.log( - `${logPrefix} Stream completed. statusMessages=${statusMessages.length}, hasResult=${!!completionResult}` - ); - - // 7. Build result - if (hasError) { - const details = [errorMessage, ...statusMessages].filter(Boolean).join('\n'); - return { - response: `Cloud Agent session ${sessionId} encountered errors:\n${details}`, - sessionId, - hasError: true, - statusMessages, - }; - } - - if (completionResult) { - return { - response: `Cloud Agent session ${sessionId} completed:\n\n${completionResult}`, - sessionId, - hasError: false, - statusMessages, - }; - } - - return { - response: `Cloud Agent session ${sessionId} completed successfully.\n\nStatus:\n${statusMessages.slice(-5).join('\n')}`, - sessionId, - hasError: false, - statusMessages, - }; -} diff --git a/src/lib/slack-bot.ts b/src/lib/slack-bot.ts index db9cf99ed..2074a5a87 100644 --- a/src/lib/slack-bot.ts +++ b/src/lib/slack-bot.ts @@ -1,8 +1,7 @@ import { - createCloudAgentNextClient, - type PrepareSessionInput, -} from '@/lib/cloud-agent-next/cloud-agent-client'; -import { runSessionToCompletion } from '@/lib/cloud-agent-next/run-session'; + createCloudAgentClient, + type InitiateSessionInput, +} from '@/lib/cloud-agent/cloud-agent-client'; import { getGitHubTokenForUser, getGitHubTokenForOrganization, @@ -67,7 +66,7 @@ Additional context may be appended to this prompt: - Slack conversation context (recent messages, thread context) - Available GitHub repositories for this Slack integration -Treat this context as authoritative. Prefer selecting a repo from the provided repository list. If the user requests work on a repo that isn't in the list, ask them to confirm the exact owner/repo and ensure it's accessible to the integration. Never invent repository names. +Treat this context as authoritative. Prefer selecting a repo from the provided repository list. If the user requests work on a repo that isn’t in the list, ask them to confirm the exact owner/repo and ensure it’s accessible to the integration. Never invent repository names. ## Tool: spawn_cloud_agent You can call the tool "spawn_cloud_agent" to run a Cloud Agent session for coding work on a GitHub repository. @@ -92,14 +91,14 @@ Provide: - prompt: a clear, specific task with constraints and success criteria Your prompt to the agent should usually include: -- the desired outcome (what "done" looks like) +- the desired outcome (what “done” looks like) - any constraints (keep changes minimal, follow existing patterns, etc.) - a request to open a PR and return the PR URL ## Accuracy & safety -- Don't claim you ran tools, changed code, or created a PR unless the tool results confirm it. -- Don't fabricate links (including PR URLs). -- If you can't proceed (missing repo, missing details, permissions), say what's missing and what you need next.`; +- Don’t claim you ran tools, changed code, or created a PR unless the tool results confirm it. +- Don’t fabricate links (including PR URLs). +- If you can’t proceed (missing repo, missing details, permissions), say what’s missing and what you need next.`; /** * Tool definition for spawning Cloud Agent sessions @@ -213,8 +212,7 @@ async function getSlackRequesterInfo( } /** - * Spawn a Cloud Agent session and collect the results. - * Delegates to the shared runSessionToCompletion helper. + * Spawn a Cloud Agent session and collect the results */ async function spawnCloudAgentSession( args: { @@ -225,7 +223,6 @@ async function spawnCloudAgentSession( owner: Owner, model: string, authToken: string, - ticketUserId: string, requesterInfo?: SlackRequesterInfo ): Promise { console.log('[SlackBot] spawnCloudAgentSession called with args:', JSON.stringify(args, null, 2)); @@ -236,40 +233,100 @@ async function spawnCloudAgentSession( // Handle organization-owned integrations if (owner.type === 'org') { + // Get GitHub token for the organization githubToken = await getGitHubTokenForOrganization(owner.id); + + // Set the organization ID for cloud agent usage attribution kilocodeOrganizationId = owner.id; } else { + // Get GitHub token for the user githubToken = await getGitHubTokenForUser(owner.id); } + // Skip balance check for Slackbot users - Slack integration has its own billing model + const cloudAgentClient = createCloudAgentClient(authToken, { skipBalanceCheck: true }); + // Append PR signature to the prompt if we have requester info const promptWithSignature = requesterInfo ? args.prompt + buildPrSignature(requesterInfo) : args.prompt; - const result = await runSessionToCompletion({ - client: createCloudAgentNextClient(authToken, { skipBalanceCheck: true }), - prepareInput: { - githubRepo: args.githubRepo, - prompt: promptWithSignature, - mode: (args.mode as PrepareSessionInput['mode']) || 'code', - model, - githubToken, - kilocodeOrganizationId, - createdOnPlatform: 'slack', - }, - initiateInput: { - githubToken, - kilocodeOrganizationId, - }, - ticketPayload: { - userId: ticketUserId, - organizationId: owner.type === 'org' ? owner.id : undefined, - }, - logPrefix: '[SlackBot]', - }); + const input: InitiateSessionInput = { + githubRepo: args.githubRepo, + prompt: promptWithSignature, + mode: (args.mode as InitiateSessionInput['mode']) || 'code', + model: model, + githubToken, + kilocodeOrganizationId, + createdOnPlatform: 'slack', + }; + + const statusMessages: string[] = []; + let completionResult: string | undefined; + let sessionId: string | undefined; + let hasError = false; + + try { + console.log('[SlackBot] Starting to stream events from Cloud Agent...'); + for await (const event of cloudAgentClient.initiateSessionStream(input)) { + if (event.sessionId) sessionId = event.sessionId; + + switch (event.streamEventType) { + case 'complete': + statusMessages.push( + `Session completed in ${event.metadata.executionTimeMs}ms with exit code ${event.exitCode}` + ); + break; + case 'error': + statusMessages.push(`Error: ${event.error}`); + hasError = true; + break; + case 'kilocode': { + const payload = event.payload; + if (payload.say === 'completion_result' && typeof payload.content === 'string') { + completionResult = payload.content; + } + break; + } + case 'output': + if (event.source === 'stderr') { + statusMessages.push(`[stderr] ${event.content}`); + hasError = true; + console.log('[SlackBot] Error flag set to true'); + } + break; + case 'interrupted': + statusMessages.push(`Session interrupted: ${event.reason}`); + hasError = true; + console.log('[SlackBot] Error flag set to true'); + break; + } + } + console.log( + `[SlackBot] Stream completed. Total status messages: ${statusMessages.length}, Has completion result: ${!!completionResult}` + ); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + console.error('[SlackBot] Error during stream:', errorMessage, error); + return { response: `Error spawning Cloud Agent: ${errorMessage}`, sessionId }; + } + + if (hasError) { + const errorResult = `Cloud Agent session ${sessionId || 'unknown'} encountered errors:\n${statusMessages.join('\n')}`; + console.log('[SlackBot] Returning error result:', errorResult); + return { response: errorResult, sessionId }; + } + + // Return the completion result if available, otherwise show status messages + if (completionResult) { + const successResult = `Cloud Agent session ${sessionId || 'unknown'} completed:\n\n${completionResult}`; + console.log('[SlackBot] Returning success result'); + return { response: successResult, sessionId }; + } - return { response: result.response, sessionId: result.sessionId }; + const fallbackResult = `Cloud Agent session ${sessionId || 'unknown'} completed successfully.\n\nStatus:\n${statusMessages.slice(-5).join('\n')}`; + console.log('[SlackBot] Returning fallback result:', fallbackResult); + return { response: fallbackResult, sessionId }; } /** @@ -344,7 +401,7 @@ export async function processKiloBotMessage( // For organization-owned integrations, use bot user for auth token // This ensures usage is tracked at the organization level, not individual users const authResult = await getSlackbotAuthTokenForOwner(owner, slackUserEmail); - if ('error' in authResult) { + if (!authResult.authToken) { return { response: `Error: ${authResult.error}`, modelUsed: '', @@ -354,7 +411,6 @@ export async function processKiloBotMessage( }; } const authToken = authResult.authToken; - const authUserId = authResult.userId; let slackContextForPrompt = ''; if (slackEventContext) { @@ -417,7 +473,6 @@ export async function processKiloBotMessage( owner, selectedModel, authToken, - authUserId, slackRequesterInfo ); console.log('[SlackBot] Tool result received, length:', toolResult.response.length); diff --git a/src/lib/slack/auth.ts b/src/lib/slack/auth.ts index 380aca5b7..9d6a27b91 100644 --- a/src/lib/slack/auth.ts +++ b/src/lib/slack/auth.ts @@ -10,9 +10,8 @@ import { getOrganizationMemberByEmail } from '@/lib/organizations/organizations' export async function getSlackbotAuthTokenForOwner( owner: Owner, slackUserEmail?: string -): Promise<{ authToken: string; userId: string } | { error: string }> { +): Promise<{ authToken: string; error?: undefined } | { authToken?: undefined; error: string }> { let authToken: string | undefined; - let userId: string | undefined; if (owner.type === 'org') { const memberInfo = slackUserEmail && (await getOrganizationMemberByEmail(owner.id, slackUserEmail)); @@ -21,24 +20,21 @@ export async function getSlackbotAuthTokenForOwner( if (memberInfo) { authToken = generateApiToken(memberInfo.kilocode_users, { internalApiUse: true }); - userId = memberInfo.kilocode_users.id; } else { const user = await ensureBotUserForOrg(owner.id, 'slack-bot'); authToken = generateApiToken(user, { botId: 'slack-bot', internalApiUse: true }); - userId = user.id; } } else { const user = await findUserById(owner.id); if (user) { authToken = generateApiToken(user, { internalApiUse: true }); - userId = user.id; } } - if (!authToken || !userId) { - return { error: `Slackbot User not found for ID: ${owner.id} and type: ${owner.type}` }; + if (!authToken) { + throw new Error(`Slackbot User not found for ID: ${owner.id} and type: ${owner.type}`); } - return { authToken, userId }; + return { authToken }; }