diff --git a/apps/sim/app/api/function/execute/route.ts b/apps/sim/app/api/function/execute/route.ts index 4ccbd8d7c0..441bf788d9 100644 --- a/apps/sim/app/api/function/execute/route.ts +++ b/apps/sim/app/api/function/execute/route.ts @@ -845,6 +845,8 @@ export async function POST(req: NextRequest) { contextVariables, timeoutMs: timeout, requestId, + ownerKey: `user:${auth.userId}`, + ownerWeight: 1, }) const executionTime = Date.now() - startTime diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 7c4cdc9dbc..06984a3e22 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -325,6 +325,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: requestId ) + // Client-side sessions and personal API keys bill/permission-check the + // authenticated user, not the workspace billed account. + const useAuthenticatedUserAsActor = + isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal') + const preprocessResult = await preprocessExecution({ workflowId, userId, @@ -334,6 +339,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: checkDeployment: !shouldUseDraftState, loggingSession, useDraftState: shouldUseDraftState, + useAuthenticatedUserAsActor, }) if (!preprocessResult.success) { diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index 40c7b9ba8b..1a883d0af4 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -307,6 +307,7 @@ export class AgentBlockHandler implements BlockHandler { _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/api/api-handler.ts b/apps/sim/executor/handlers/api/api-handler.ts index 562067cdfe..83c710bef0 100644 --- a/apps/sim/executor/handlers/api/api-handler.ts +++ b/apps/sim/executor/handlers/api/api-handler.ts @@ -72,6 +72,7 @@ export class ApiBlockHandler implements BlockHandler { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, executionId: ctx.executionId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/condition/condition-handler.ts b/apps/sim/executor/handlers/condition/condition-handler.ts index 96fe0db4b4..0c88e0e784 100644 --- a/apps/sim/executor/handlers/condition/condition-handler.ts +++ b/apps/sim/executor/handlers/condition/condition-handler.ts @@ -48,6 +48,7 @@ export async function evaluateConditionExpression( _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/function/function-handler.ts b/apps/sim/executor/handlers/function/function-handler.ts index 624a262d3a..d8e1209e54 100644 --- a/apps/sim/executor/handlers/function/function-handler.ts +++ b/apps/sim/executor/handlers/function/function-handler.ts @@ -39,6 +39,7 @@ export class FunctionBlockHandler implements BlockHandler { _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/generic/generic-handler.ts b/apps/sim/executor/handlers/generic/generic-handler.ts index 558a37dee5..c6a6b7e9f3 100644 --- a/apps/sim/executor/handlers/generic/generic-handler.ts +++ b/apps/sim/executor/handlers/generic/generic-handler.ts @@ -66,6 +66,7 @@ export class GenericBlockHandler implements BlockHandler { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, executionId: ctx.executionId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, }, diff --git a/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts b/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts index dd53a0a0e1..2a23c622c2 100644 --- a/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts +++ b/apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts @@ -605,6 +605,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler { _context: { workflowId: ctx.workflowId, workspaceId: ctx.workspaceId, + userId: ctx.userId, isDeployedContext: ctx.isDeployedContext, }, blockData: blockDataWithPause, diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index 8bdf8edd2c..456838d1ee 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -511,6 +511,8 @@ export class LoopOrchestrator { contextVariables: {}, timeoutMs: LOOP_CONDITION_TIMEOUT_MS, requestId, + ownerKey: `user:${ctx.userId}`, + ownerWeight: 1, }) if (vmResult.error) { diff --git a/apps/sim/lib/auth/hybrid.ts b/apps/sim/lib/auth/hybrid.ts index 2b49d7158a..1c34286f62 100644 --- a/apps/sim/lib/auth/hybrid.ts +++ b/apps/sim/lib/auth/hybrid.ts @@ -1,7 +1,4 @@ -import { db } from '@sim/db' -import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { eq } from 'drizzle-orm' import type { NextRequest } from 'next/server' import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service' import { getSession } from '@/lib/auth' @@ -13,35 +10,33 @@ export interface AuthResult { success: boolean userId?: string authType?: 'session' | 'api_key' | 'internal_jwt' + apiKeyType?: 'personal' | 'workspace' error?: string } /** * Resolves userId from a verified internal JWT token. - * Extracts workflowId/userId from URL params or POST body, then looks up userId if needed. + * Extracts userId from the JWT payload, URL search params, or POST body. */ async function resolveUserFromJwt( request: NextRequest, verificationUserId: string | null, options: { requireWorkflowId?: boolean } ): Promise { - let workflowId: string | null = null let userId: string | null = verificationUserId - const { searchParams } = new URL(request.url) - workflowId = searchParams.get('workflowId') if (!userId) { + const { searchParams } = new URL(request.url) userId = searchParams.get('userId') } - if (!workflowId && !userId && request.method === 'POST') { + if (!userId && request.method === 'POST') { try { const clonedRequest = request.clone() const bodyText = await clonedRequest.text() if (bodyText) { const body = JSON.parse(bodyText) - workflowId = body.workflowId || body._context?.workflowId - userId = userId || body.userId || body._context?.userId + userId = body.userId || body._context?.userId || null } } catch { // Ignore JSON parse errors @@ -52,22 +47,8 @@ async function resolveUserFromJwt( return { success: true, userId, authType: 'internal_jwt' } } - if (workflowId) { - const [workflowData] = await db - .select({ userId: workflow.userId }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - - if (!workflowData) { - return { success: false, error: 'Workflow not found' } - } - - return { success: true, userId: workflowData.userId, authType: 'internal_jwt' } - } - if (options.requireWorkflowId !== false) { - return { success: false, error: 'workflowId or userId required for internal JWT calls' } + return { success: false, error: 'userId required for internal JWT calls' } } return { success: true, authType: 'internal_jwt' } @@ -222,6 +203,7 @@ export async function checkHybridAuth( success: true, userId: result.userId!, authType: 'api_key', + apiKeyType: result.keyType, } } diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 8440de3bc4..1921738ede 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -180,6 +180,24 @@ export const env = createEnv({ EXECUTION_TIMEOUT_ASYNC_TEAM: z.string().optional().default('5400'), // 90 minutes EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: z.string().optional().default('5400'), // 90 minutes + // Isolated-VM Worker Pool Configuration + IVM_POOL_SIZE: z.string().optional().default('4'), // Max worker processes in pool + IVM_MAX_CONCURRENT: z.string().optional().default('10000'), // Max concurrent executions globally + IVM_MAX_PER_WORKER: z.string().optional().default('2500'), // Max concurrent executions per worker + IVM_WORKER_IDLE_TIMEOUT_MS: z.string().optional().default('60000'), // Worker idle cleanup timeout (ms) + IVM_MAX_QUEUE_SIZE: z.string().optional().default('10000'), // Max pending queued executions in memory + IVM_MAX_FETCH_RESPONSE_BYTES: z.string().optional().default('8388608'),// Max bytes read from sandbox fetch responses + IVM_MAX_FETCH_RESPONSE_CHARS: z.string().optional().default('4000000'),// Max chars returned to sandbox from fetch body + IVM_MAX_FETCH_OPTIONS_JSON_CHARS: z.string().optional().default('262144'), // Max JSON payload size for sandbox fetch options + IVM_MAX_FETCH_URL_LENGTH: z.string().optional().default('8192'), // Max URL length accepted by sandbox fetch + IVM_MAX_STDOUT_CHARS: z.string().optional().default('200000'), // Max captured stdout characters per execution + IVM_MAX_ACTIVE_PER_OWNER: z.string().optional().default('200'), // Max active executions per owner (per process) + IVM_MAX_QUEUED_PER_OWNER: z.string().optional().default('2000'), // Max queued executions per owner (per process) + IVM_MAX_OWNER_WEIGHT: z.string().optional().default('5'), // Max accepted weight for weighted owner scheduling + IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER:z.string().optional().default('2200'), // Max owner in-flight leases across replicas + IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: z.string().optional().default('120000'), // Min TTL for distributed in-flight leases (ms) + IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms) + // Knowledge Base Processing Configuration - Shared across all processing methods KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes) KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts diff --git a/apps/sim/lib/core/security/input-validation.server.ts b/apps/sim/lib/core/security/input-validation.server.ts index e8c0ec8614..2a912240cb 100644 --- a/apps/sim/lib/core/security/input-validation.server.ts +++ b/apps/sim/lib/core/security/input-validation.server.ts @@ -103,6 +103,7 @@ export interface SecureFetchOptions { body?: string | Buffer | Uint8Array timeout?: number maxRedirects?: number + maxResponseBytes?: number } export class SecureFetchHeaders { @@ -165,6 +166,7 @@ export async function secureFetchWithPinnedIP( redirectCount = 0 ): Promise { const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS + const maxResponseBytes = options.maxResponseBytes return new Promise((resolve, reject) => { const parsed = new URL(url) @@ -237,14 +239,32 @@ export async function secureFetchWithPinnedIP( } const chunks: Buffer[] = [] + let totalBytes = 0 + let responseTerminated = false + + res.on('data', (chunk: Buffer) => { + if (responseTerminated) return + + totalBytes += chunk.length + if ( + typeof maxResponseBytes === 'number' && + maxResponseBytes > 0 && + totalBytes > maxResponseBytes + ) { + responseTerminated = true + res.destroy(new Error(`Response exceeded maximum size of ${maxResponseBytes} bytes`)) + return + } - res.on('data', (chunk: Buffer) => chunks.push(chunk)) + chunks.push(chunk) + }) res.on('error', (error) => { reject(error) }) res.on('end', () => { + if (responseTerminated) return const bodyBuffer = Buffer.concat(chunks) const body = bodyBuffer.toString('utf-8') const headersRecord: Record = {} diff --git a/apps/sim/lib/execution/isolated-vm-worker.cjs b/apps/sim/lib/execution/isolated-vm-worker.cjs index 3deb761663..2641b80e11 100644 --- a/apps/sim/lib/execution/isolated-vm-worker.cjs +++ b/apps/sim/lib/execution/isolated-vm-worker.cjs @@ -9,6 +9,21 @@ const USER_CODE_START_LINE = 4 const pendingFetches = new Map() let fetchIdCounter = 0 const FETCH_TIMEOUT_MS = 300000 // 5 minutes +const MAX_STDOUT_CHARS = Number.parseInt(process.env.IVM_MAX_STDOUT_CHARS || '', 10) || 200000 +const MAX_FETCH_OPTIONS_JSON_CHARS = + Number.parseInt(process.env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS || '', 10) || 256 * 1024 + +function stringifyLogValue(value) { + if (typeof value !== 'object' || value === null) { + return String(value) + } + + try { + return JSON.stringify(value) + } catch { + return '[unserializable]' + } +} /** * Extract line and column from error stack or message @@ -101,8 +116,32 @@ function convertToCompatibleError(errorInfo, userCode) { async function executeCode(request) { const { code, params, envVars, contextVariables, timeoutMs, requestId } = request const stdoutChunks = [] + let stdoutLength = 0 + let stdoutTruncated = false let isolate = null + const appendStdout = (line) => { + if (stdoutTruncated || !line) return + + const remaining = MAX_STDOUT_CHARS - stdoutLength + if (remaining <= 0) { + stdoutTruncated = true + stdoutChunks.push('[stdout truncated]\n') + return + } + + if (line.length <= remaining) { + stdoutChunks.push(line) + stdoutLength += line.length + return + } + + stdoutChunks.push(line.slice(0, remaining)) + stdoutChunks.push('\n[stdout truncated]\n') + stdoutLength = MAX_STDOUT_CHARS + stdoutTruncated = true + } + try { isolate = new ivm.Isolate({ memoryLimit: 128 }) const context = await isolate.createContext() @@ -111,18 +150,14 @@ async function executeCode(request) { await jail.set('global', jail.derefInto()) const logCallback = new ivm.Callback((...args) => { - const message = args - .map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg))) - .join(' ') - stdoutChunks.push(`${message}\n`) + const message = args.map((arg) => stringifyLogValue(arg)).join(' ') + appendStdout(`${message}\n`) }) await jail.set('__log', logCallback) const errorCallback = new ivm.Callback((...args) => { - const message = args - .map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg))) - .join(' ') - stdoutChunks.push(`ERROR: ${message}\n`) + const message = args.map((arg) => stringifyLogValue(arg)).join(' ') + appendStdout(`ERROR: ${message}\n`) }) await jail.set('__error', errorCallback) @@ -178,6 +213,9 @@ async function executeCode(request) { } catch { throw new Error('fetch options must be JSON-serializable'); } + if (optionsJson.length > ${MAX_FETCH_OPTIONS_JSON_CHARS}) { + throw new Error('fetch options exceed maximum payload size'); + } } const resultJson = await __fetchRef.apply(undefined, [url, optionsJson], { result: { promise: true } }); let result; diff --git a/apps/sim/lib/execution/isolated-vm.test.ts b/apps/sim/lib/execution/isolated-vm.test.ts new file mode 100644 index 0000000000..17fb20c0d7 --- /dev/null +++ b/apps/sim/lib/execution/isolated-vm.test.ts @@ -0,0 +1,500 @@ +import { EventEmitter } from 'node:events' +import { afterEach, describe, expect, it, vi } from 'vitest' + +type MockProc = EventEmitter & { + connected: boolean + stderr: EventEmitter + send: (message: unknown) => boolean + kill: () => boolean +} + +type SpawnFactory = () => MockProc +type RedisEval = (...args: any[]) => unknown | Promise +type SecureFetchImpl = (...args: any[]) => unknown | Promise + +function createBaseProc(): MockProc { + const proc = new EventEmitter() as MockProc + proc.connected = true + proc.stderr = new EventEmitter() + proc.send = () => true + proc.kill = () => { + if (!proc.connected) return true + proc.connected = false + setImmediate(() => proc.emit('exit', 0)) + return true + } + return proc +} + +function createStartupFailureProc(): MockProc { + const proc = createBaseProc() + setImmediate(() => { + proc.connected = false + proc.emit('exit', 1) + }) + return proc +} + +function createReadyProc(result: unknown): MockProc { + const proc = createBaseProc() + proc.send = (message: unknown) => { + const msg = message as { type?: string; executionId?: number } + if (msg.type === 'execute') { + setImmediate(() => { + proc.emit('message', { + type: 'result', + executionId: msg.executionId, + result: { result, stdout: '' }, + }) + }) + } + return true + } + setImmediate(() => proc.emit('message', { type: 'ready' })) + return proc +} + +function createReadyProcWithDelay(delayMs: number): MockProc { + const proc = createBaseProc() + proc.send = (message: unknown) => { + const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } } + if (msg.type === 'execute') { + setTimeout(() => { + proc.emit('message', { + type: 'result', + executionId: msg.executionId, + result: { result: msg.request?.requestId ?? 'unknown', stdout: '' }, + }) + }, delayMs) + } + return true + } + setImmediate(() => proc.emit('message', { type: 'ready' })) + return proc +} + +function createReadyFetchProxyProc(fetchMessage: { url: string; optionsJson?: string }): MockProc { + const proc = createBaseProc() + let currentExecutionId = 0 + + proc.send = (message: unknown) => { + const msg = message as { type?: string; executionId?: number; request?: { requestId?: string } } + + if (msg.type === 'execute') { + currentExecutionId = msg.executionId ?? 0 + setImmediate(() => { + proc.emit('message', { + type: 'fetch', + fetchId: 1, + requestId: msg.request?.requestId ?? 'fetch-test', + url: fetchMessage.url, + optionsJson: fetchMessage.optionsJson, + }) + }) + return true + } + + if (msg.type === 'fetchResponse') { + const fetchResponse = message as { response?: string } + setImmediate(() => { + proc.emit('message', { + type: 'result', + executionId: currentExecutionId, + result: { result: fetchResponse.response ?? '', stdout: '' }, + }) + }) + return true + } + + return true + } + + setImmediate(() => proc.emit('message', { type: 'ready' })) + return proc +} + +async function loadExecutionModule(options: { + envOverrides?: Record + spawns: SpawnFactory[] + redisEvalImpl?: RedisEval + secureFetchImpl?: SecureFetchImpl +}) { + vi.resetModules() + + const spawnQueue = [...options.spawns] + const spawnMock = vi.fn(() => { + const next = spawnQueue.shift() + if (!next) { + throw new Error('No mock spawn factory configured') + } + return next() as any + }) + + vi.doMock('@sim/logger', () => ({ + createLogger: () => ({ + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }), + })) + + const secureFetchMock = vi.fn( + options.secureFetchImpl ?? + (async () => ({ + ok: true, + status: 200, + statusText: 'OK', + headers: new Map(), + text: async () => '', + json: async () => ({}), + arrayBuffer: async () => new ArrayBuffer(0), + })) + ) + vi.doMock('@/lib/core/security/input-validation.server', () => ({ + secureFetchWithValidation: secureFetchMock, + })) + + vi.doMock('@/lib/core/config/env', () => ({ + env: { + IVM_POOL_SIZE: '1', + IVM_MAX_CONCURRENT: '100', + IVM_MAX_PER_WORKER: '100', + IVM_WORKER_IDLE_TIMEOUT_MS: '60000', + IVM_MAX_QUEUE_SIZE: '10', + IVM_MAX_ACTIVE_PER_OWNER: '100', + IVM_MAX_QUEUED_PER_OWNER: '10', + IVM_MAX_OWNER_WEIGHT: '5', + IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '100', + IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: '1000', + IVM_QUEUE_TIMEOUT_MS: '1000', + ...(options.envOverrides ?? {}), + }, + })) + + const redisEval = options.redisEvalImpl ? vi.fn(options.redisEvalImpl) : undefined + vi.doMock('@/lib/core/config/redis', () => ({ + getRedisClient: vi.fn(() => + redisEval + ? ({ + eval: redisEval, + } as any) + : null + ), + })) + + vi.doMock('node:child_process', () => ({ + execSync: vi.fn(() => Buffer.from('v23.11.0')), + spawn: spawnMock, + })) + + const mod = await import('./isolated-vm') + return { ...mod, spawnMock, secureFetchMock } +} + +describe('isolated-vm scheduler', () => { + afterEach(() => { + vi.restoreAllMocks() + vi.resetModules() + }) + + it('recovers from an initial spawn failure and drains queued work', async () => { + const { executeInIsolatedVM, spawnMock } = await loadExecutionModule({ + spawns: [createStartupFailureProc, () => createReadyProc('ok')], + }) + + const result = await executeInIsolatedVM({ + code: 'return "ok"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-1', + }) + + expect(result.error).toBeUndefined() + expect(result.result).toBe('ok') + expect(spawnMock).toHaveBeenCalledTimes(2) + }) + + it('rejects new requests when the queue is full', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_QUEUE_SIZE: '1', + IVM_QUEUE_TIMEOUT_MS: '200', + }, + spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc], + }) + + const firstPromise = executeInIsolatedVM({ + code: 'return 1', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-2', + ownerKey: 'user:a', + }) + + await new Promise((resolve) => setTimeout(resolve, 25)) + + const second = await executeInIsolatedVM({ + code: 'return 2', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-3', + ownerKey: 'user:b', + }) + + expect(second.error?.message).toContain('at capacity') + + const first = await firstPromise + expect(first.error?.message).toContain('timed out waiting') + }) + + it('enforces per-owner queued limit', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_QUEUED_PER_OWNER: '1', + IVM_QUEUE_TIMEOUT_MS: '200', + }, + spawns: [createStartupFailureProc, createStartupFailureProc, createStartupFailureProc], + }) + + const firstPromise = executeInIsolatedVM({ + code: 'return 1', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-4', + ownerKey: 'user:hog', + }) + + await new Promise((resolve) => setTimeout(resolve, 25)) + + const second = await executeInIsolatedVM({ + code: 'return 2', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-5', + ownerKey: 'user:hog', + }) + + expect(second.error?.message).toContain('Too many concurrent') + + const first = await firstPromise + expect(first.error?.message).toContain('timed out waiting') + }) + + it('enforces distributed owner in-flight lease limit when Redis is configured', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: '1', + REDIS_URL: 'redis://localhost:6379', + }, + spawns: [() => createReadyProc('ok')], + redisEvalImpl: (...args: any[]) => { + const script = String(args[0] ?? '') + if (script.includes('ZREMRANGEBYSCORE')) { + return 0 + } + return 1 + }, + }) + + const result = await executeInIsolatedVM({ + code: 'return "blocked"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-6', + ownerKey: 'user:distributed', + }) + + expect(result.error?.message).toContain('Too many concurrent') + }) + + it('fails closed when Redis is configured but unavailable', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + REDIS_URL: 'redis://localhost:6379', + }, + spawns: [() => createReadyProc('ok')], + }) + + const result = await executeInIsolatedVM({ + code: 'return "blocked"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-7', + ownerKey: 'user:redis-down', + }) + + expect(result.error?.message).toContain('temporarily unavailable') + }) + + it('fails closed when Redis lease evaluation errors', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + REDIS_URL: 'redis://localhost:6379', + }, + spawns: [() => createReadyProc('ok')], + redisEvalImpl: (...args: any[]) => { + const script = String(args[0] ?? '') + if (script.includes('ZREMRANGEBYSCORE')) { + throw new Error('redis timeout') + } + return 1 + }, + }) + + const result = await executeInIsolatedVM({ + code: 'return "blocked"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-8', + ownerKey: 'user:redis-error', + }) + + expect(result.error?.message).toContain('temporarily unavailable') + }) + + it('applies weighted owner scheduling when draining queued executions', async () => { + const { executeInIsolatedVM } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_PER_WORKER: '1', + }, + spawns: [() => createReadyProcWithDelay(10)], + }) + + const completionOrder: string[] = [] + const pushCompletion = (label: string) => (res: { result: unknown }) => { + completionOrder.push(String(res.result ?? label)) + return res + } + + const p1 = executeInIsolatedVM({ + code: 'return 1', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'a-1', + ownerKey: 'user:a', + ownerWeight: 2, + }).then(pushCompletion('a-1')) + + const p2 = executeInIsolatedVM({ + code: 'return 2', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'a-2', + ownerKey: 'user:a', + ownerWeight: 2, + }).then(pushCompletion('a-2')) + + const p3 = executeInIsolatedVM({ + code: 'return 3', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'b-1', + ownerKey: 'user:b', + ownerWeight: 1, + }).then(pushCompletion('b-1')) + + const p4 = executeInIsolatedVM({ + code: 'return 4', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'b-2', + ownerKey: 'user:b', + ownerWeight: 1, + }).then(pushCompletion('b-2')) + + const p5 = executeInIsolatedVM({ + code: 'return 5', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 500, + requestId: 'a-3', + ownerKey: 'user:a', + ownerWeight: 2, + }).then(pushCompletion('a-3')) + + await Promise.all([p1, p2, p3, p4, p5]) + + expect(completionOrder.slice(0, 3)).toEqual(['a-1', 'a-2', 'a-3']) + expect(completionOrder).toEqual(['a-1', 'a-2', 'a-3', 'b-1', 'b-2']) + }) + + it('rejects oversized fetch options payloads before outbound call', async () => { + const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_FETCH_OPTIONS_JSON_CHARS: '50', + }, + spawns: [ + () => + createReadyFetchProxyProc({ + url: 'https://example.com', + optionsJson: 'x'.repeat(100), + }), + ], + }) + + const result = await executeInIsolatedVM({ + code: 'return "fetch-options"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-fetch-options', + }) + + const payload = JSON.parse(String(result.result)) + expect(payload.error).toContain('Fetch options exceed maximum payload size') + expect(secureFetchMock).not.toHaveBeenCalled() + }) + + it('rejects overly long fetch URLs before outbound call', async () => { + const { executeInIsolatedVM, secureFetchMock } = await loadExecutionModule({ + envOverrides: { + IVM_MAX_FETCH_URL_LENGTH: '30', + }, + spawns: [ + () => + createReadyFetchProxyProc({ + url: 'https://example.com/path/to/a/very/long/resource', + }), + ], + }) + + const result = await executeInIsolatedVM({ + code: 'return "fetch-url"', + params: {}, + envVars: {}, + contextVariables: {}, + timeoutMs: 100, + requestId: 'req-fetch-url', + }) + + const payload = JSON.parse(String(result.result)) + expect(payload.error).toContain('fetch URL exceeds maximum length') + expect(secureFetchMock).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/execution/isolated-vm.ts b/apps/sim/lib/execution/isolated-vm.ts index 472fc12b25..75567aed50 100644 --- a/apps/sim/lib/execution/isolated-vm.ts +++ b/apps/sim/lib/execution/isolated-vm.ts @@ -3,7 +3,13 @@ import fs from 'node:fs' import path from 'node:path' import { fileURLToPath } from 'node:url' import { createLogger } from '@sim/logger' -import { validateProxyUrl } from '@/lib/core/security/input-validation' +import { env } from '@/lib/core/config/env' +import { getRedisClient } from '@/lib/core/config/redis' +import { + type SecureFetchOptions, + secureFetchWithValidation, +} from '@/lib/core/security/input-validation.server' +import { sanitizeUrlForLog } from '@/lib/core/utils/logging' const logger = createLogger('IsolatedVMExecution') @@ -27,6 +33,8 @@ export interface IsolatedVMExecutionRequest { contextVariables: Record timeoutMs: number requestId: string + ownerKey?: string + ownerWeight?: number } export interface IsolatedVMExecutionResult { @@ -44,90 +52,478 @@ export interface IsolatedVMError { lineContent?: string } +const POOL_SIZE = Number.parseInt(env.IVM_POOL_SIZE) || 4 +const MAX_CONCURRENT = Number.parseInt(env.IVM_MAX_CONCURRENT) || 10000 +const MAX_PER_WORKER = Number.parseInt(env.IVM_MAX_PER_WORKER) || 2500 +const WORKER_IDLE_TIMEOUT_MS = Number.parseInt(env.IVM_WORKER_IDLE_TIMEOUT_MS) || 60000 +const QUEUE_TIMEOUT_MS = Number.parseInt(env.IVM_QUEUE_TIMEOUT_MS) || 300000 +const MAX_QUEUE_SIZE = Number.parseInt(env.IVM_MAX_QUEUE_SIZE) || 10000 +const MAX_FETCH_RESPONSE_BYTES = Number.parseInt(env.IVM_MAX_FETCH_RESPONSE_BYTES) || 8_388_608 +const MAX_FETCH_RESPONSE_CHARS = Number.parseInt(env.IVM_MAX_FETCH_RESPONSE_CHARS) || 4_000_000 +const MAX_FETCH_URL_LENGTH = Number.parseInt(env.IVM_MAX_FETCH_URL_LENGTH) || 8192 +const MAX_FETCH_OPTIONS_JSON_CHARS = + Number.parseInt(env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS) || 262_144 +const MAX_ACTIVE_PER_OWNER = Number.parseInt(env.IVM_MAX_ACTIVE_PER_OWNER) || 200 +const MAX_QUEUED_PER_OWNER = Number.parseInt(env.IVM_MAX_QUEUED_PER_OWNER) || 2000 +const MAX_OWNER_WEIGHT = Number.parseInt(env.IVM_MAX_OWNER_WEIGHT) || 5 +const DISTRIBUTED_MAX_INFLIGHT_PER_OWNER = + Number.parseInt(env.IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER) || + MAX_ACTIVE_PER_OWNER + MAX_QUEUED_PER_OWNER +const DISTRIBUTED_LEASE_MIN_TTL_MS = Number.parseInt(env.IVM_DISTRIBUTED_LEASE_MIN_TTL_MS) || 120000 +const DISTRIBUTED_KEY_PREFIX = 'ivm:fair:v1:owner' +const QUEUE_RETRY_DELAY_MS = 1000 +const DISTRIBUTED_LEASE_GRACE_MS = 30000 + interface PendingExecution { resolve: (result: IsolatedVMExecutionResult) => void timeout: ReturnType + ownerKey: string +} + +interface WorkerInfo { + process: ChildProcess + ready: boolean + readyPromise: Promise | null + activeExecutions: number + pendingExecutions: Map + idleTimeout: ReturnType | null + id: number +} + +interface QueuedExecution { + id: number + ownerKey: string + req: IsolatedVMExecutionRequest + resolve: (result: IsolatedVMExecutionResult) => void + queueTimeout: ReturnType +} + +interface QueueNode { + ownerKey: string + value: QueuedExecution + prev: QueueNode | null + next: QueueNode | null +} + +interface OwnerState { + ownerKey: string + weight: number + activeExecutions: number + queueHead: QueueNode | null + queueTail: QueueNode | null + queueLength: number + burstRemaining: number } -let worker: ChildProcess | null = null -let workerReady = false -let workerReadyPromise: Promise | null = null -let workerIdleTimeout: ReturnType | null = null -const pendingExecutions = new Map() +const workers: Map = new Map() +const ownerStates: Map = new Map() +const queuedOwnerRing: string[] = [] +let queuedOwnerCursor = 0 +let queueSize = 0 +const queueNodes: Map = new Map() +let totalActiveExecutions = 0 let executionIdCounter = 0 +let queueIdCounter = 0 +let nextWorkerId = 0 +let spawnInProgress = 0 +let queueDrainRetryTimeout: ReturnType | null = null -const WORKER_IDLE_TIMEOUT_MS = 60000 +type IsolatedFetchOptions = RequestInit & { + timeout?: number + maxRedirects?: number +} -function cleanupWorker() { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) - workerIdleTimeout = null +function truncateString(value: string, maxChars: number): { value: string; truncated: boolean } { + if (value.length <= maxChars) { + return { value, truncated: false } } - if (worker) { - worker.kill() - worker = null + return { + value: `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`, + truncated: true, } - workerReady = false - workerReadyPromise = null } -function resetIdleTimeout() { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) +function normalizeFetchOptions(options?: IsolatedFetchOptions): SecureFetchOptions { + if (!options) return { maxResponseBytes: MAX_FETCH_RESPONSE_BYTES } + + const normalized: SecureFetchOptions = { + maxResponseBytes: MAX_FETCH_RESPONSE_BYTES, + } + + if (typeof options.method === 'string' && options.method.length > 0) { + normalized.method = options.method } - workerIdleTimeout = setTimeout(() => { - if (pendingExecutions.size === 0) { - logger.info('Cleaning up idle isolated-vm worker') - cleanupWorker() + + if ( + typeof options.timeout === 'number' && + Number.isFinite(options.timeout) && + options.timeout > 0 + ) { + normalized.timeout = Math.floor(options.timeout) + } + + if ( + typeof options.maxRedirects === 'number' && + Number.isFinite(options.maxRedirects) && + options.maxRedirects >= 0 + ) { + normalized.maxRedirects = Math.floor(options.maxRedirects) + } + + if (options.headers) { + const headers: Record = {} + if (options.headers instanceof Headers) { + options.headers.forEach((value, key) => { + headers[key] = value + }) + } else if (Array.isArray(options.headers)) { + for (const [key, value] of options.headers) { + headers[String(key)] = String(value) + } + } else { + for (const [key, value] of Object.entries(options.headers)) { + headers[key] = String(value) + } } - }, WORKER_IDLE_TIMEOUT_MS) + normalized.headers = headers + } + + if ( + typeof options.body === 'string' || + options.body instanceof Buffer || + options.body instanceof Uint8Array + ) { + normalized.body = options.body + } else if (options.body !== undefined && options.body !== null) { + normalized.body = String(options.body) + } + + return normalized } -/** - * Secure fetch wrapper that validates URLs to prevent SSRF attacks - */ -async function secureFetch(requestId: string, url: string, options?: RequestInit): Promise { - const validation = validateProxyUrl(url) - if (!validation.isValid) { - logger.warn(`[${requestId}] Blocked fetch request due to SSRF validation`, { - url: url.substring(0, 100), - error: validation.error, +async function secureFetch( + requestId: string, + url: string, + options?: IsolatedFetchOptions +): Promise { + if (url.length > MAX_FETCH_URL_LENGTH) { + return JSON.stringify({ + error: `Security Error: fetch URL exceeds maximum length (${MAX_FETCH_URL_LENGTH})`, }) - return JSON.stringify({ error: `Security Error: ${validation.error}` }) } try { - const response = await fetch(url, options) - const body = await response.text() + const response = await secureFetchWithValidation( + url, + normalizeFetchOptions(options), + 'fetchUrl' + ) + const bodyResult = truncateString(await response.text(), MAX_FETCH_RESPONSE_CHARS) const headers: Record = {} - response.headers.forEach((value, key) => { + for (const [key, value] of response.headers) { headers[key] = value - }) + } return JSON.stringify({ ok: response.ok, status: response.status, statusText: response.statusText, - body, + body: bodyResult.value, + bodyTruncated: bodyResult.truncated, headers, }) } catch (error: unknown) { + logger.warn(`[${requestId}] Isolated fetch failed`, { + url: sanitizeUrlForLog(url), + error: error instanceof Error ? error.message : String(error), + }) return JSON.stringify({ error: error instanceof Error ? error.message : 'Unknown fetch error' }) } } -/** - * Handle IPC messages from the Node.js worker - */ -function handleWorkerMessage(message: unknown) { +function normalizeOwnerKey(ownerKey?: string): string { + if (!ownerKey) return 'anonymous' + const normalized = ownerKey.trim() + return normalized || 'anonymous' +} + +function normalizeOwnerWeight(ownerWeight?: number): number { + if (!Number.isFinite(ownerWeight) || ownerWeight === undefined) return 1 + return Math.max(1, Math.min(MAX_OWNER_WEIGHT, Math.floor(ownerWeight))) +} + +function ownerRedisKey(ownerKey: string): string { + return `${DISTRIBUTED_KEY_PREFIX}:${ownerKey}` +} + +type LeaseAcquireResult = 'acquired' | 'limit_exceeded' | 'unavailable' + +async function tryAcquireDistributedLease( + ownerKey: string, + leaseId: string, + timeoutMs: number +): Promise { + // Redis not configured: explicit local-mode fallback is allowed. + if (!env.REDIS_URL) return 'acquired' + + const redis = getRedisClient() + if (!redis) { + logger.error('Redis is configured but unavailable for distributed lease acquisition', { + ownerKey, + }) + return 'unavailable' + } + + const now = Date.now() + const leaseTtlMs = Math.max( + timeoutMs + QUEUE_TIMEOUT_MS + DISTRIBUTED_LEASE_GRACE_MS, + DISTRIBUTED_LEASE_MIN_TTL_MS + ) + const expiresAt = now + leaseTtlMs + const key = ownerRedisKey(ownerKey) + + const script = ` + redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', ARGV[1]) + local current = redis.call('ZCARD', KEYS[1]) + if current >= tonumber(ARGV[2]) then + return 0 + end + redis.call('ZADD', KEYS[1], ARGV[3], ARGV[4]) + redis.call('PEXPIRE', KEYS[1], ARGV[5]) + return 1 + ` + + try { + const result = await redis.eval( + script, + 1, + key, + now.toString(), + DISTRIBUTED_MAX_INFLIGHT_PER_OWNER.toString(), + expiresAt.toString(), + leaseId, + leaseTtlMs.toString() + ) + return Number(result) === 1 ? 'acquired' : 'limit_exceeded' + } catch (error) { + logger.error('Failed to acquire distributed owner lease', { ownerKey, error }) + return 'unavailable' + } +} + +async function releaseDistributedLease(ownerKey: string, leaseId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const key = ownerRedisKey(ownerKey) + const script = ` + redis.call('ZREM', KEYS[1], ARGV[1]) + if redis.call('ZCARD', KEYS[1]) == 0 then + redis.call('DEL', KEYS[1]) + end + return 1 + ` + + try { + await redis.eval(script, 1, key, leaseId) + } catch (error) { + logger.error('Failed to release distributed owner lease', { ownerKey, error }) + } +} + +function queueLength(): number { + return queueSize +} + +function maybeClearDrainRetry() { + if (queueSize === 0 && queueDrainRetryTimeout) { + clearTimeout(queueDrainRetryTimeout) + queueDrainRetryTimeout = null + } +} + +function getOrCreateOwnerState(ownerKey: string, ownerWeight: number): OwnerState { + const existing = ownerStates.get(ownerKey) + if (existing) { + existing.weight = Math.max(existing.weight, ownerWeight) + return existing + } + + const ownerState: OwnerState = { + ownerKey, + weight: ownerWeight, + activeExecutions: 0, + queueHead: null, + queueTail: null, + queueLength: 0, + burstRemaining: 0, + } + ownerStates.set(ownerKey, ownerState) + return ownerState +} + +function addOwnerToRing(ownerKey: string) { + if (queuedOwnerRing.includes(ownerKey)) return + queuedOwnerRing.push(ownerKey) +} + +function removeOwnerFromRing(ownerKey: string) { + const idx = queuedOwnerRing.indexOf(ownerKey) + if (idx === -1) return + queuedOwnerRing.splice(idx, 1) + if (queuedOwnerRing.length === 0) { + queuedOwnerCursor = 0 + return + } + if (idx < queuedOwnerCursor) { + queuedOwnerCursor-- + } else if (queuedOwnerCursor >= queuedOwnerRing.length) { + queuedOwnerCursor = 0 + } +} + +function maybeCleanupOwner(ownerKey: string) { + const owner = ownerStates.get(ownerKey) + if (!owner) return + if (owner.queueLength === 0) { + removeOwnerFromRing(ownerKey) + } + if (owner.queueLength === 0 && owner.activeExecutions === 0) { + ownerStates.delete(ownerKey) + } +} + +function removeQueueNode(node: QueueNode): QueuedExecution { + const owner = ownerStates.get(node.ownerKey) + if (!owner) { + queueNodes.delete(node.value.id) + queueSize = Math.max(0, queueSize - 1) + maybeClearDrainRetry() + return node.value + } + + const { prev, next, value } = node + if (prev) prev.next = next + else owner.queueHead = next + if (next) next.prev = prev + else owner.queueTail = prev + + node.prev = null + node.next = null + + queueNodes.delete(value.id) + owner.queueLength-- + queueSize-- + maybeCleanupOwner(owner.ownerKey) + maybeClearDrainRetry() + return value +} + +function shiftQueuedExecutionForOwner(owner: OwnerState): QueuedExecution | null { + if (!owner.queueHead) return null + return removeQueueNode(owner.queueHead) +} + +function removeQueuedExecutionById(queueId: number): QueuedExecution | null { + const node = queueNodes.get(queueId) + if (!node) return null + return removeQueueNode(node) +} + +function pushQueuedExecution(owner: OwnerState, queued: QueuedExecution) { + const node: QueueNode = { + ownerKey: owner.ownerKey, + value: queued, + prev: owner.queueTail, + next: null, + } + if (owner.queueTail) { + owner.queueTail.next = node + } else { + owner.queueHead = node + } + owner.queueTail = node + owner.queueLength++ + owner.burstRemaining = 0 + addOwnerToRing(owner.ownerKey) + queueNodes.set(queued.id, node) + queueSize++ +} + +function selectOwnerForDispatch(): OwnerState | null { + if (queuedOwnerRing.length === 0) return null + + let visited = 0 + while (queuedOwnerRing.length > 0 && visited < queuedOwnerRing.length) { + if (queuedOwnerCursor >= queuedOwnerRing.length) { + queuedOwnerCursor = 0 + } + const ownerKey = queuedOwnerRing[queuedOwnerCursor] + if (!ownerKey) return null + + const owner = ownerStates.get(ownerKey) + if (!owner) { + removeOwnerFromRing(ownerKey) + continue + } + + if (owner.queueLength === 0) { + owner.burstRemaining = 0 + removeOwnerFromRing(ownerKey) + continue + } + + if (owner.activeExecutions >= MAX_ACTIVE_PER_OWNER) { + owner.burstRemaining = 0 + queuedOwnerCursor = (queuedOwnerCursor + 1) % queuedOwnerRing.length + visited++ + continue + } + + if (owner.burstRemaining <= 0) { + owner.burstRemaining = owner.weight + } + + owner.burstRemaining-- + if (owner.burstRemaining <= 0) { + queuedOwnerCursor = (queuedOwnerCursor + 1) % queuedOwnerRing.length + } + + return owner + } + + return null +} + +function scheduleDrainRetry() { + if (queueDrainRetryTimeout || queueSize === 0) return + queueDrainRetryTimeout = setTimeout(() => { + queueDrainRetryTimeout = null + if (queueSize === 0) return + drainQueue() + }, QUEUE_RETRY_DELAY_MS) +} + +function handleWorkerMessage(workerId: number, message: unknown) { if (typeof message !== 'object' || message === null) return const msg = message as Record + const workerInfo = workers.get(workerId) if (msg.type === 'result') { - const pending = pendingExecutions.get(msg.executionId as number) + const execId = msg.executionId as number + const pending = workerInfo?.pendingExecutions.get(execId) if (pending) { clearTimeout(pending.timeout) - pendingExecutions.delete(msg.executionId as number) + workerInfo!.pendingExecutions.delete(execId) + workerInfo!.activeExecutions-- + totalActiveExecutions-- + const owner = ownerStates.get(pending.ownerKey) + if (owner) { + owner.activeExecutions = Math.max(0, owner.activeExecutions - 1) + maybeCleanupOwner(owner.ownerKey) + } pending.resolve(msg.result as IsolatedVMExecutionResult) + resetWorkerIdleTimeout(workerId) + drainQueue() } return } @@ -139,12 +535,31 @@ function handleWorkerMessage(message: unknown) { url: string optionsJson?: string } - let options: RequestInit | undefined + if (typeof url !== 'string' || url.length === 0) { + workerInfo?.process.send({ + type: 'fetchResponse', + fetchId, + response: JSON.stringify({ error: 'Invalid fetch URL' }), + }) + return + } + if (optionsJson && optionsJson.length > MAX_FETCH_OPTIONS_JSON_CHARS) { + workerInfo?.process.send({ + type: 'fetchResponse', + fetchId, + response: JSON.stringify({ + error: `Fetch options exceed maximum payload size (${MAX_FETCH_OPTIONS_JSON_CHARS} chars)`, + }), + }) + return + } + + let options: IsolatedFetchOptions | undefined if (optionsJson) { try { options = JSON.parse(optionsJson) } catch { - worker?.send({ + workerInfo?.process.send({ type: 'fetchResponse', fetchId, response: JSON.stringify({ error: 'Invalid fetch options JSON' }), @@ -155,14 +570,14 @@ function handleWorkerMessage(message: unknown) { secureFetch(requestId, url, options) .then((response) => { try { - worker?.send({ type: 'fetchResponse', fetchId, response }) + workerInfo?.process.send({ type: 'fetchResponse', fetchId, response }) } catch (err) { - logger.error('Failed to send fetch response to worker', { err, fetchId }) + logger.error('Failed to send fetch response to worker', { err, fetchId, workerId }) } }) .catch((err) => { try { - worker?.send({ + workerInfo?.process.send({ type: 'fetchResponse', fetchId, response: JSON.stringify({ @@ -170,21 +585,90 @@ function handleWorkerMessage(message: unknown) { }), }) } catch (sendErr) { - logger.error('Failed to send fetch error to worker', { sendErr, fetchId }) + logger.error('Failed to send fetch error to worker', { sendErr, fetchId, workerId }) } }) } } -/** - * Start the Node.js worker process - */ -async function ensureWorker(): Promise { - if (workerReady && worker) return - if (workerReadyPromise) return workerReadyPromise +function cleanupWorker(workerId: number) { + const workerInfo = workers.get(workerId) + if (!workerInfo) return - workerReadyPromise = new Promise((resolve, reject) => { + if (workerInfo.idleTimeout) { + clearTimeout(workerInfo.idleTimeout) + } + + workerInfo.process.kill() + + for (const [id, pending] of workerInfo.pendingExecutions) { + clearTimeout(pending.timeout) + totalActiveExecutions-- + const owner = ownerStates.get(pending.ownerKey) + if (owner) { + owner.activeExecutions = Math.max(0, owner.activeExecutions - 1) + maybeCleanupOwner(owner.ownerKey) + } + pending.resolve({ + result: null, + stdout: '', + error: { message: 'Code execution failed unexpectedly. Please try again.', name: 'Error' }, + }) + workerInfo.pendingExecutions.delete(id) + } + workerInfo.activeExecutions = 0 + + workers.delete(workerId) + logger.info('Worker removed from pool', { workerId, poolSize: workers.size }) +} + +function resetWorkerIdleTimeout(workerId: number) { + const workerInfo = workers.get(workerId) + if (!workerInfo) return + + if (workerInfo.idleTimeout) { + clearTimeout(workerInfo.idleTimeout) + workerInfo.idleTimeout = null + } + + if (workerInfo.activeExecutions === 0) { + workerInfo.idleTimeout = setTimeout(() => { + const w = workers.get(workerId) + if (w && w.activeExecutions === 0) { + logger.info('Cleaning up idle worker', { workerId }) + cleanupWorker(workerId) + } + }, WORKER_IDLE_TIMEOUT_MS) + } +} + +function spawnWorker(): Promise { + const workerId = nextWorkerId++ + spawnInProgress++ + let spawnSettled = false + + const settleSpawnInProgress = () => { + if (spawnSettled) { + return false + } + spawnSettled = true + spawnInProgress-- + return true + } + + const workerInfo: WorkerInfo = { + process: null as unknown as ChildProcess, + ready: false, + readyPromise: null, + activeExecutions: 0, + pendingExecutions: new Map(), + idleTimeout: null, + id: workerId, + } + + workerInfo.readyPromise = new Promise((resolve, reject) => { if (!checkNodeAvailable()) { + settleSpawnInProgress() reject( new Error( 'Node.js is required for code execution but was not found. ' + @@ -198,141 +682,361 @@ async function ensureWorker(): Promise { const workerPath = path.join(currentDir, 'isolated-vm-worker.cjs') if (!fs.existsSync(workerPath)) { + settleSpawnInProgress() reject(new Error(`Worker file not found at ${workerPath}`)) return } - import('node:child_process').then(({ spawn }) => { - worker = spawn('node', [workerPath], { - stdio: ['ignore', 'pipe', 'pipe', 'ipc'], - serialization: 'json', - }) + import('node:child_process') + .then(({ spawn }) => { + const proc = spawn('node', [workerPath], { + stdio: ['ignore', 'pipe', 'pipe', 'ipc'], + serialization: 'json', + }) + workerInfo.process = proc - worker.on('message', handleWorkerMessage) + proc.on('message', (message: unknown) => handleWorkerMessage(workerId, message)) - let stderrData = '' - worker.stderr?.on('data', (data: Buffer) => { - stderrData += data.toString() - }) + let stderrData = '' + proc.stderr?.on('data', (data: Buffer) => { + stderrData += data.toString() + }) - const startTimeout = setTimeout(() => { - worker?.kill() - worker = null - workerReady = false - workerReadyPromise = null - reject(new Error('Worker failed to start within timeout')) - }, 10000) - - const readyHandler = (message: unknown) => { - if ( - typeof message === 'object' && - message !== null && - (message as { type?: string }).type === 'ready' - ) { - workerReady = true - clearTimeout(startTimeout) - worker?.off('message', readyHandler) - resolve() - } - } - worker.on('message', readyHandler) + const startTimeout = setTimeout(() => { + proc.kill() + workers.delete(workerId) + if (!settleSpawnInProgress()) return + reject(new Error('Worker failed to start within timeout')) + }, 10000) - worker.on('exit', (code) => { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) - workerIdleTimeout = null + const readyHandler = (message: unknown) => { + if ( + typeof message === 'object' && + message !== null && + (message as { type?: string }).type === 'ready' + ) { + if (!settleSpawnInProgress()) { + proc.off('message', readyHandler) + return + } + workerInfo.ready = true + clearTimeout(startTimeout) + proc.off('message', readyHandler) + workers.set(workerId, workerInfo) + resetWorkerIdleTimeout(workerId) + logger.info('Worker spawned and ready', { workerId, poolSize: workers.size }) + resolve() + } } + proc.on('message', readyHandler) - const wasStartupFailure = !workerReady && workerReadyPromise - - worker = null - workerReady = false - workerReadyPromise = null - - let errorMessage = 'Worker process exited unexpectedly' - if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) { - errorMessage = - 'Code execution requires the isolated-vm native module which failed to load. ' + - 'This usually means the module needs to be rebuilt for your Node.js version. ' + - 'Please run: cd node_modules/isolated-vm && npm rebuild' - logger.error('isolated-vm module failed to load', { stderr: stderrData }) - } else if (stderrData) { - errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}` - logger.error('Worker process failed', { stderr: stderrData }) - } + proc.on('exit', () => { + const wasStartupFailure = !workerInfo.ready - if (wasStartupFailure) { - clearTimeout(startTimeout) - reject(new Error(errorMessage)) - return - } + if (wasStartupFailure) { + clearTimeout(startTimeout) + if (!settleSpawnInProgress()) return - for (const [id, pending] of pendingExecutions) { - clearTimeout(pending.timeout) - pending.resolve({ - result: null, - stdout: '', - error: { message: errorMessage, name: 'WorkerError' }, - }) - pendingExecutions.delete(id) - } + let errorMessage = 'Worker process exited unexpectedly' + if (stderrData.includes('isolated_vm') || stderrData.includes('MODULE_NOT_FOUND')) { + errorMessage = + 'Code execution requires the isolated-vm native module which failed to load. ' + + 'This usually means the module needs to be rebuilt for your Node.js version. ' + + 'Please run: cd node_modules/isolated-vm && npm rebuild' + logger.error('isolated-vm module failed to load', { stderr: stderrData, workerId }) + } else if (stderrData) { + errorMessage = `Worker process failed: ${stderrData.slice(0, 500)}` + logger.error('Worker process failed', { stderr: stderrData, workerId }) + } + + reject(new Error(errorMessage)) + return + } + + cleanupWorker(workerId) + drainQueue() + }) + }) + .catch((error) => { + if (!settleSpawnInProgress()) return + reject(error instanceof Error ? error : new Error('Failed to load child_process module')) }) + }) + + return workerInfo.readyPromise.then(() => workerInfo) +} + +/** + * Returns the ready worker with the fewest active executions that still + * has capacity, or null if none available. + */ +function selectWorker(): WorkerInfo | null { + let best: WorkerInfo | null = null + for (const w of workers.values()) { + if (!w.ready) continue + if (w.activeExecutions >= MAX_PER_WORKER) continue + if (!best || w.activeExecutions < best.activeExecutions) { + best = w + } + } + return best +} + +/** + * Tries to get an existing worker with capacity, or spawns a new one if the + * pool is not full. Returns null when the pool is at capacity and all workers + * are saturated (caller should enqueue). + */ +async function acquireWorker(): Promise { + const existing = selectWorker() + if (existing) return existing + + const currentPoolSize = workers.size + spawnInProgress + if (currentPoolSize < POOL_SIZE) { + try { + return await spawnWorker() + } catch (error) { + logger.error('Failed to spawn worker', { error }) + return null + } + } + + return null +} + +function dispatchToWorker( + workerInfo: WorkerInfo, + ownerState: OwnerState, + req: IsolatedVMExecutionRequest, + resolve: (result: IsolatedVMExecutionResult) => void +) { + const execId = ++executionIdCounter + + if (workerInfo.idleTimeout) { + clearTimeout(workerInfo.idleTimeout) + workerInfo.idleTimeout = null + } + + const timeout = setTimeout(() => { + workerInfo.pendingExecutions.delete(execId) + workerInfo.activeExecutions-- + totalActiveExecutions-- + ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1) + maybeCleanupOwner(ownerState.ownerKey) + resolve({ + result: null, + stdout: '', + error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' }, + }) + resetWorkerIdleTimeout(workerInfo.id) + drainQueue() + }, req.timeoutMs + 1000) + + workerInfo.pendingExecutions.set(execId, { resolve, timeout, ownerKey: ownerState.ownerKey }) + workerInfo.activeExecutions++ + totalActiveExecutions++ + ownerState.activeExecutions++ + + try { + workerInfo.process.send({ type: 'execute', executionId: execId, request: req }) + } catch { + clearTimeout(timeout) + workerInfo.pendingExecutions.delete(execId) + workerInfo.activeExecutions-- + totalActiveExecutions-- + ownerState.activeExecutions = Math.max(0, ownerState.activeExecutions - 1) + maybeCleanupOwner(ownerState.ownerKey) + resolve({ + result: null, + stdout: '', + error: { message: 'Code execution failed to start. Please try again.', name: 'Error' }, + }) + resetWorkerIdleTimeout(workerInfo.id) + // Defer to break synchronous recursion: drainQueue → dispatchToWorker → catch → drainQueue + queueMicrotask(() => drainQueue()) + } +} + +function enqueueExecution( + ownerState: OwnerState, + req: IsolatedVMExecutionRequest, + resolve: (result: IsolatedVMExecutionResult) => void +) { + if (queueLength() >= MAX_QUEUE_SIZE) { + resolve({ + result: null, + stdout: '', + error: { + message: 'Code execution is at capacity. Please try again in a moment.', + name: 'Error', + }, }) + return + } + if (ownerState.queueLength >= MAX_QUEUED_PER_OWNER) { + resolve({ + result: null, + stdout: '', + error: { + message: + 'Too many concurrent code executions. Please wait for some to complete before running more.', + name: 'Error', + }, + }) + return + } + + const queueId = ++queueIdCounter + const queueTimeout = setTimeout(() => { + const queued = removeQueuedExecutionById(queueId) + if (!queued) return + resolve({ + result: null, + stdout: '', + error: { + message: 'Code execution timed out waiting for an available worker. Please try again.', + name: 'Error', + }, + }) + }, QUEUE_TIMEOUT_MS) + + pushQueuedExecution(ownerState, { + id: queueId, + ownerKey: ownerState.ownerKey, + req, + resolve, + queueTimeout, }) + logger.info('Execution queued', { + queueLength: queueLength(), + ownerKey: ownerState.ownerKey, + ownerQueueLength: ownerState.queueLength, + totalActive: totalActiveExecutions, + poolSize: workers.size, + }) + drainQueue() +} - return workerReadyPromise +/** + * Called after every completion or worker spawn — dispatches queued + * executions to available workers. + */ +function drainQueue() { + while (queueLength() > 0 && totalActiveExecutions < MAX_CONCURRENT) { + const worker = selectWorker() + if (!worker) { + const currentPoolSize = workers.size + spawnInProgress + if (currentPoolSize < POOL_SIZE) { + spawnWorker() + .then(() => drainQueue()) + .catch((err) => { + logger.error('Failed to spawn worker during drain', { err }) + scheduleDrainRetry() + }) + } + break + } + + const owner = selectOwnerForDispatch() + if (!owner) { + scheduleDrainRetry() + break + } + + const queued = shiftQueuedExecutionForOwner(owner) + if (!queued) { + owner.burstRemaining = 0 + maybeCleanupOwner(owner.ownerKey) + continue + } + clearTimeout(queued.queueTimeout) + dispatchToWorker(worker, owner, queued.req, queued.resolve) + } } /** * Execute JavaScript code in an isolated V8 isolate via Node.js subprocess. - * The worker's V8 isolate enforces timeoutMs internally. The parent timeout - * (timeoutMs + 1000) is a safety buffer for IPC communication. */ export async function executeInIsolatedVM( req: IsolatedVMExecutionRequest ): Promise { - if (workerIdleTimeout) { - clearTimeout(workerIdleTimeout) - workerIdleTimeout = null - } + const ownerKey = normalizeOwnerKey(req.ownerKey) + const ownerWeight = normalizeOwnerWeight(req.ownerWeight) + const ownerState = getOrCreateOwnerState(ownerKey, ownerWeight) - await ensureWorker() - - if (!worker) { + const distributedLeaseId = `${req.requestId}:${Date.now()}:${Math.random().toString(36).slice(2, 10)}` + const leaseAcquireResult = await tryAcquireDistributedLease( + ownerKey, + distributedLeaseId, + req.timeoutMs + ) + if (leaseAcquireResult === 'limit_exceeded') { + maybeCleanupOwner(ownerKey) return { result: null, stdout: '', - error: { message: 'Failed to start isolated-vm worker', name: 'WorkerError' }, + error: { + message: + 'Too many concurrent code executions. Please wait for some to complete before running more.', + name: 'Error', + }, + } + } + if (leaseAcquireResult === 'unavailable') { + maybeCleanupOwner(ownerKey) + return { + result: null, + stdout: '', + error: { + message: 'Code execution is temporarily unavailable. Please try again in a moment.', + name: 'Error', + }, } } - const executionId = ++executionIdCounter - - return new Promise((resolve) => { - const timeout = setTimeout(() => { - pendingExecutions.delete(executionId) - resolve({ - result: null, - stdout: '', - error: { message: `Execution timed out after ${req.timeoutMs}ms`, name: 'TimeoutError' }, - }) - }, req.timeoutMs + 1000) + let settled = false + const releaseLease = () => { + if (settled) return + settled = true + releaseDistributedLease(ownerKey, distributedLeaseId).catch((error) => { + logger.error('Failed to release distributed lease', { ownerKey, error }) + }) + } - pendingExecutions.set(executionId, { resolve, timeout }) + return new Promise((resolve) => { + const resolveWithRelease = (result: IsolatedVMExecutionResult) => { + releaseLease() + resolve(result) + } - try { - worker!.send({ type: 'execute', executionId, request: req }) - } catch { - clearTimeout(timeout) - pendingExecutions.delete(executionId) - resolve({ - result: null, - stdout: '', - error: { message: 'Failed to send execution request to worker', name: 'WorkerError' }, - }) + if ( + totalActiveExecutions >= MAX_CONCURRENT || + ownerState.activeExecutions >= MAX_ACTIVE_PER_OWNER + ) { + enqueueExecution(ownerState, req, resolveWithRelease) return } - resetIdleTimeout() + acquireWorker() + .then((workerInfo) => { + if (!workerInfo) { + enqueueExecution(ownerState, req, resolveWithRelease) + return + } + + dispatchToWorker(workerInfo, ownerState, req, resolveWithRelease) + if (queueLength() > 0) { + drainQueue() + } + }) + .catch((error) => { + logger.error('Failed to acquire worker for execution', { error, ownerKey }) + enqueueExecution(ownerState, req, resolveWithRelease) + }) + }).finally(() => { + releaseLease() + if (ownerState.queueLength === 0 && ownerState.activeExecutions === 0) { + maybeCleanupOwner(ownerState.ownerKey) + } }) } diff --git a/apps/sim/lib/execution/preprocessing.ts b/apps/sim/lib/execution/preprocessing.ts index 9a0236fd16..3eb14813e3 100644 --- a/apps/sim/lib/execution/preprocessing.ts +++ b/apps/sim/lib/execution/preprocessing.ts @@ -124,6 +124,7 @@ export interface PreprocessExecutionOptions { workspaceId?: string // If known, used for billing resolution loggingSession?: LoggingSession // If provided, will be used for error logging isResumeContext?: boolean // If true, allows fallback billing on resolution failure (for paused workflow resumes) + useAuthenticatedUserAsActor?: boolean // If true, use the authenticated userId as actorUserId (for client-side executions and personal API keys) /** @deprecated No longer used - background/async executions always use deployed state */ useDraftState?: boolean } @@ -170,6 +171,7 @@ export async function preprocessExecution( workspaceId: providedWorkspaceId, loggingSession: providedLoggingSession, isResumeContext = false, + useAuthenticatedUserAsActor = false, } = options logger.info(`[${requestId}] Starting execution preprocessing`, { @@ -257,7 +259,14 @@ export async function preprocessExecution( let actorUserId: string | null = null try { - if (workspaceId) { + // For client-side executions and personal API keys, the authenticated + // user is the billing and permission actor — not the workspace owner. + if (useAuthenticatedUserAsActor && userId) { + actorUserId = userId + logger.info(`[${requestId}] Using authenticated user as actor: ${actorUserId}`) + } + + if (!actorUserId && workspaceId) { actorUserId = await getWorkspaceBilledAccountUserId(workspaceId) if (actorUserId) { logger.info(`[${requestId}] Using workspace billed account: ${actorUserId}`) diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index 77e9b4d765..1dfe1faf24 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -221,7 +221,8 @@ export async function executeTool( // If it's a custom tool, use the async version with workflowId if (isCustomTool(normalizedToolId)) { const workflowId = params._context?.workflowId - tool = await getToolAsync(normalizedToolId, workflowId) + const userId = params._context?.userId + tool = await getToolAsync(normalizedToolId, workflowId, userId) if (!tool) { logger.error(`[${requestId}] Custom tool not found: ${normalizedToolId}`) } @@ -260,26 +261,25 @@ export async function executeTool( try { const baseUrl = getBaseUrl() + const workflowId = contextParams._context?.workflowId + const userId = contextParams._context?.userId + const tokenPayload: OAuthTokenPayload = { credentialId: contextParams.credential as string, } - - // Add workflowId if it exists in params, context, or executionContext - const workflowId = - contextParams.workflowId || - contextParams._context?.workflowId || - executionContext?.workflowId if (workflowId) { tokenPayload.workflowId = workflowId } logger.info(`[${requestId}] Fetching access token from ${baseUrl}/api/auth/oauth/token`) - // Build token URL and also include workflowId in query so server auth can read it const tokenUrlObj = new URL('/api/auth/oauth/token', baseUrl) if (workflowId) { tokenUrlObj.searchParams.set('workflowId', workflowId) } + if (userId) { + tokenUrlObj.searchParams.set('userId', userId) + } // Always send Content-Type; add internal auth on server-side runs const tokenHeaders: Record = { 'Content-Type': 'application/json' } @@ -583,6 +583,10 @@ async function executeToolRequest( if (workflowId) { fullUrlObj.searchParams.set('workflowId', workflowId) } + const userId = params._context?.userId + if (userId) { + fullUrlObj.searchParams.set('userId', userId) + } } const fullUrl = fullUrlObj.toString() diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index e5364e415b..0a7b635faa 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -311,7 +311,8 @@ export function getTool(toolId: string): ToolConfig | undefined { // Get a tool by its ID asynchronously (supports server-side) export async function getToolAsync( toolId: string, - workflowId?: string + workflowId?: string, + userId?: string ): Promise { // Check for built-in tools const builtInTool = tools[toolId] @@ -319,7 +320,7 @@ export async function getToolAsync( // Check if it's a custom tool if (isCustomTool(toolId)) { - return fetchCustomToolFromAPI(toolId, workflowId) + return fetchCustomToolFromAPI(toolId, workflowId, userId) } return undefined @@ -366,7 +367,8 @@ function createToolConfig(customTool: any, customToolId: string): ToolConfig { // Create a tool config from a custom tool definition by fetching from API async function fetchCustomToolFromAPI( customToolId: string, - workflowId?: string + workflowId?: string, + userId?: string ): Promise { const identifier = customToolId.replace('custom_', '') @@ -374,10 +376,12 @@ async function fetchCustomToolFromAPI( const baseUrl = getBaseUrl() const url = new URL('/api/tools/custom', baseUrl) - // Add workflowId as a query parameter if available if (workflowId) { url.searchParams.append('workflowId', workflowId) } + if (userId) { + url.searchParams.append('userId', userId) + } // For server-side calls (during workflow execution), use internal JWT token const headers: Record = {} diff --git a/helm/sim/values.yaml b/helm/sim/values.yaml index 86e6c90794..d5eecb51ec 100644 --- a/helm/sim/values.yaml +++ b/helm/sim/values.yaml @@ -139,7 +139,25 @@ app: EXECUTION_TIMEOUT_ASYNC_PRO: "5400" # Pro tier async timeout (90 minutes) EXECUTION_TIMEOUT_ASYNC_TEAM: "5400" # Team tier async timeout (90 minutes) EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: "5400" # Enterprise tier async timeout (90 minutes) - + + # Isolated-VM Worker Pool Configuration + IVM_POOL_SIZE: "4" # Max worker processes in pool + IVM_MAX_CONCURRENT: "10000" # Max concurrent executions globally + IVM_MAX_PER_WORKER: "2500" # Max concurrent executions per worker + IVM_WORKER_IDLE_TIMEOUT_MS: "60000" # Worker idle cleanup timeout (ms) + IVM_QUEUE_TIMEOUT_MS: "300000" # Max queue wait before rejection (ms) + IVM_MAX_QUEUE_SIZE: "10000" # Max queued executions globally + IVM_MAX_ACTIVE_PER_OWNER: "200" # Max concurrent executions per user + IVM_MAX_QUEUED_PER_OWNER: "2000" # Max queued executions per user + IVM_MAX_OWNER_WEIGHT: "5" # Max scheduling weight per user + IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER: "2200" # Max in-flight per user across instances (Redis) + IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: "120000" # Min distributed lease TTL (ms) + IVM_MAX_FETCH_RESPONSE_BYTES: "8388608" # Max fetch response size (8MB) + IVM_MAX_FETCH_RESPONSE_CHARS: "4000000" # Max fetch response chars + IVM_MAX_FETCH_URL_LENGTH: "8192" # Max fetch URL length + IVM_MAX_FETCH_OPTIONS_JSON_CHARS: "262144" # Max fetch options payload (256KB) + IVM_MAX_STDOUT_CHARS: "200000" # Max stdout capture per execution + # UI Branding & Whitelabeling Configuration NEXT_PUBLIC_BRAND_NAME: "Sim" # Custom brand name NEXT_PUBLIC_BRAND_LOGO_URL: "" # Custom logo URL (leave empty for default)