Skip to content
2 changes: 2 additions & 0 deletions apps/sim/app/api/function/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,8 @@ export async function POST(req: NextRequest) {
contextVariables,
timeoutMs: timeout,
requestId,
ownerKey: `user:${auth.userId}`,
ownerWeight: 1,
})

const executionTime = Date.now() - startTime
Expand Down
6 changes: 6 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,11 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
requestId
)

// Client-side sessions and personal API keys bill/permission-check the
// authenticated user, not the workspace billed account.
const useAuthenticatedUserAsActor =
isClientSession || (auth.authType === 'api_key' && auth.apiKeyType === 'personal')

const preprocessResult = await preprocessExecution({
workflowId,
userId,
Expand All @@ -334,6 +339,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
checkDeployment: !shouldUseDraftState,
loggingSession,
useDraftState: shouldUseDraftState,
useAuthenticatedUserAsActor,
})

if (!preprocessResult.success) {
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/agent/agent-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ export class AgentBlockHandler implements BlockHandler {
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/api/api-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class ApiBlockHandler implements BlockHandler {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
executionId: ctx.executionId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/condition/condition-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export async function evaluateConditionExpression(
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/function/function-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export class FunctionBlockHandler implements BlockHandler {
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},
Expand Down
1 change: 1 addition & 0 deletions apps/sim/executor/handlers/generic/generic-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export class GenericBlockHandler implements BlockHandler {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
executionId: ctx.executionId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler {
_context: {
workflowId: ctx.workflowId,
workspaceId: ctx.workspaceId,
userId: ctx.userId,
isDeployedContext: ctx.isDeployedContext,
},
blockData: blockDataWithPause,
Expand Down
2 changes: 2 additions & 0 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,8 @@ export class LoopOrchestrator {
contextVariables: {},
timeoutMs: LOOP_CONDITION_TIMEOUT_MS,
requestId,
ownerKey: `user:${ctx.userId}`,
ownerWeight: 1,
})

if (vmResult.error) {
Expand Down
32 changes: 7 additions & 25 deletions apps/sim/lib/auth/hybrid.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service'
import { getSession } from '@/lib/auth'
Expand All @@ -13,35 +10,33 @@ export interface AuthResult {
success: boolean
userId?: string
authType?: 'session' | 'api_key' | 'internal_jwt'
apiKeyType?: 'personal' | 'workspace'
error?: string
}

/**
* Resolves userId from a verified internal JWT token.
* Extracts workflowId/userId from URL params or POST body, then looks up userId if needed.
* Extracts userId from the JWT payload, URL search params, or POST body.
*/
async function resolveUserFromJwt(
request: NextRequest,
verificationUserId: string | null,
options: { requireWorkflowId?: boolean }
): Promise<AuthResult> {
let workflowId: string | null = null
let userId: string | null = verificationUserId

const { searchParams } = new URL(request.url)
workflowId = searchParams.get('workflowId')
if (!userId) {
const { searchParams } = new URL(request.url)
userId = searchParams.get('userId')
}

if (!workflowId && !userId && request.method === 'POST') {
if (!userId && request.method === 'POST') {
try {
const clonedRequest = request.clone()
const bodyText = await clonedRequest.text()
if (bodyText) {
const body = JSON.parse(bodyText)
workflowId = body.workflowId || body._context?.workflowId
userId = userId || body.userId || body._context?.userId
userId = body.userId || body._context?.userId || null
}
} catch {
// Ignore JSON parse errors
Expand All @@ -52,22 +47,8 @@ async function resolveUserFromJwt(
return { success: true, userId, authType: 'internal_jwt' }
}

if (workflowId) {
const [workflowData] = await db
.select({ userId: workflow.userId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)

if (!workflowData) {
return { success: false, error: 'Workflow not found' }
}

return { success: true, userId: workflowData.userId, authType: 'internal_jwt' }
}

if (options.requireWorkflowId !== false) {
return { success: false, error: 'workflowId or userId required for internal JWT calls' }
return { success: false, error: 'userId required for internal JWT calls' }
}

return { success: true, authType: 'internal_jwt' }
Expand Down Expand Up @@ -222,6 +203,7 @@ export async function checkHybridAuth(
success: true,
userId: result.userId!,
authType: 'api_key',
apiKeyType: result.keyType,
}
}

Expand Down
18 changes: 18 additions & 0 deletions apps/sim/lib/core/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,24 @@ export const env = createEnv({
EXECUTION_TIMEOUT_ASYNC_TEAM: z.string().optional().default('5400'), // 90 minutes
EXECUTION_TIMEOUT_ASYNC_ENTERPRISE: z.string().optional().default('5400'), // 90 minutes

// Isolated-VM Worker Pool Configuration
IVM_POOL_SIZE: z.string().optional().default('4'), // Max worker processes in pool
IVM_MAX_CONCURRENT: z.string().optional().default('10000'), // Max concurrent executions globally
IVM_MAX_PER_WORKER: z.string().optional().default('2500'), // Max concurrent executions per worker
IVM_WORKER_IDLE_TIMEOUT_MS: z.string().optional().default('60000'), // Worker idle cleanup timeout (ms)
IVM_MAX_QUEUE_SIZE: z.string().optional().default('10000'), // Max pending queued executions in memory
IVM_MAX_FETCH_RESPONSE_BYTES: z.string().optional().default('8388608'),// Max bytes read from sandbox fetch responses
IVM_MAX_FETCH_RESPONSE_CHARS: z.string().optional().default('4000000'),// Max chars returned to sandbox from fetch body
IVM_MAX_FETCH_OPTIONS_JSON_CHARS: z.string().optional().default('262144'), // Max JSON payload size for sandbox fetch options
IVM_MAX_FETCH_URL_LENGTH: z.string().optional().default('8192'), // Max URL length accepted by sandbox fetch
IVM_MAX_STDOUT_CHARS: z.string().optional().default('200000'), // Max captured stdout characters per execution
IVM_MAX_ACTIVE_PER_OWNER: z.string().optional().default('200'), // Max active executions per owner (per process)
IVM_MAX_QUEUED_PER_OWNER: z.string().optional().default('2000'), // Max queued executions per owner (per process)
IVM_MAX_OWNER_WEIGHT: z.string().optional().default('5'), // Max accepted weight for weighted owner scheduling
IVM_DISTRIBUTED_MAX_INFLIGHT_PER_OWNER:z.string().optional().default('2200'), // Max owner in-flight leases across replicas
IVM_DISTRIBUTED_LEASE_MIN_TTL_MS: z.string().optional().default('120000'), // Min TTL for distributed in-flight leases (ms)
IVM_QUEUE_TIMEOUT_MS: z.string().optional().default('300000'), // Max queue wait before rejection (ms)

// Knowledge Base Processing Configuration - Shared across all processing methods
KB_CONFIG_MAX_DURATION: z.number().optional().default(600), // Max processing duration in seconds (10 minutes)
KB_CONFIG_MAX_ATTEMPTS: z.number().optional().default(3), // Max retry attempts
Expand Down
22 changes: 21 additions & 1 deletion apps/sim/lib/core/security/input-validation.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export interface SecureFetchOptions {
body?: string | Buffer | Uint8Array
timeout?: number
maxRedirects?: number
maxResponseBytes?: number
}

export class SecureFetchHeaders {
Expand Down Expand Up @@ -165,6 +166,7 @@ export async function secureFetchWithPinnedIP(
redirectCount = 0
): Promise<SecureFetchResponse> {
const maxRedirects = options.maxRedirects ?? DEFAULT_MAX_REDIRECTS
const maxResponseBytes = options.maxResponseBytes

return new Promise((resolve, reject) => {
const parsed = new URL(url)
Expand Down Expand Up @@ -237,14 +239,32 @@ export async function secureFetchWithPinnedIP(
}

const chunks: Buffer[] = []
let totalBytes = 0
let responseTerminated = false

res.on('data', (chunk: Buffer) => {
if (responseTerminated) return

totalBytes += chunk.length
if (
typeof maxResponseBytes === 'number' &&
maxResponseBytes > 0 &&
totalBytes > maxResponseBytes
) {
responseTerminated = true
res.destroy(new Error(`Response exceeded maximum size of ${maxResponseBytes} bytes`))
return
}

res.on('data', (chunk: Buffer) => chunks.push(chunk))
chunks.push(chunk)
})

res.on('error', (error) => {
reject(error)
})

res.on('end', () => {
if (responseTerminated) return
const bodyBuffer = Buffer.concat(chunks)
const body = bodyBuffer.toString('utf-8')
const headersRecord: Record<string, string> = {}
Expand Down
54 changes: 46 additions & 8 deletions apps/sim/lib/execution/isolated-vm-worker.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ const USER_CODE_START_LINE = 4
const pendingFetches = new Map()
let fetchIdCounter = 0
const FETCH_TIMEOUT_MS = 300000 // 5 minutes
const MAX_STDOUT_CHARS = Number.parseInt(process.env.IVM_MAX_STDOUT_CHARS || '', 10) || 200000
const MAX_FETCH_OPTIONS_JSON_CHARS =
Number.parseInt(process.env.IVM_MAX_FETCH_OPTIONS_JSON_CHARS || '', 10) || 256 * 1024

function stringifyLogValue(value) {
if (typeof value !== 'object' || value === null) {
return String(value)
}

try {
return JSON.stringify(value)
} catch {
return '[unserializable]'
}
}

/**
* Extract line and column from error stack or message
Expand Down Expand Up @@ -101,8 +116,32 @@ function convertToCompatibleError(errorInfo, userCode) {
async function executeCode(request) {
const { code, params, envVars, contextVariables, timeoutMs, requestId } = request
const stdoutChunks = []
let stdoutLength = 0
let stdoutTruncated = false
let isolate = null

const appendStdout = (line) => {
if (stdoutTruncated || !line) return

const remaining = MAX_STDOUT_CHARS - stdoutLength
if (remaining <= 0) {
stdoutTruncated = true
stdoutChunks.push('[stdout truncated]\n')
return
}

if (line.length <= remaining) {
stdoutChunks.push(line)
stdoutLength += line.length
return
}

stdoutChunks.push(line.slice(0, remaining))
stdoutChunks.push('\n[stdout truncated]\n')
stdoutLength = MAX_STDOUT_CHARS
stdoutTruncated = true
}

try {
isolate = new ivm.Isolate({ memoryLimit: 128 })
const context = await isolate.createContext()
Expand All @@ -111,18 +150,14 @@ async function executeCode(request) {
await jail.set('global', jail.derefInto())

const logCallback = new ivm.Callback((...args) => {
const message = args
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg)))
.join(' ')
stdoutChunks.push(`${message}\n`)
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
appendStdout(`${message}\n`)
})
await jail.set('__log', logCallback)

const errorCallback = new ivm.Callback((...args) => {
const message = args
.map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg)))
.join(' ')
stdoutChunks.push(`ERROR: ${message}\n`)
const message = args.map((arg) => stringifyLogValue(arg)).join(' ')
appendStdout(`ERROR: ${message}\n`)
})
await jail.set('__error', errorCallback)

Expand Down Expand Up @@ -178,6 +213,9 @@ async function executeCode(request) {
} catch {
throw new Error('fetch options must be JSON-serializable');
}
if (optionsJson.length > ${MAX_FETCH_OPTIONS_JSON_CHARS}) {
throw new Error('fetch options exceed maximum payload size');
}
}
const resultJson = await __fetchRef.apply(undefined, [url, optionsJson], { result: { promise: true } });
let result;
Expand Down
Loading