# Patch summary (text ahead of the first `diff --git` header is skipped by patch tools).
#
# schedule-execution.ts
#   Wraps the recovery work in executeScheduleJob's catch block (infra retry / claim
#   release) in an inner try/catch. A failure during recovery is logged and recorded on
#   the active OpenTelemetry span instead of propagating out of the job.
#
# webhook-execution.ts
#   Awaits loggingSession.waitForPostExecution() before consulting
#   wasExecutionFinalizedByCore(). When the failure was finalized by core, the exception
#   is recorded on the active span and a `{ success: false, ... }` result is returned
#   instead of re-throwing; failures that were not finalized fall through to the
#   existing handling that follows.
#
# webhook-execution.test.ts
#   Adds the module mocks needed to drive executeWebhookJob and two tests covering the
#   finalized-by-core (resolves) and not-finalized (rejects) paths.
#
# NOTE(review): leading indentation inside the hunks was reconstructed from brace
# nesting (2 spaces per level); confirm against the target files before applying, or
# apply with whitespace-insensitive matching.
# NOTE(review): `toError` and `hasExecutionResult` are used by added lines but are not
# imported by this patch — presumably already imported in the target files; verify.
diff --git a/apps/sim/background/schedule-execution.ts b/apps/sim/background/schedule-execution.ts
index 4c240e5f1e..0ee2b07f2f 100644
--- a/apps/sim/background/schedule-execution.ts
+++ b/apps/sim/background/schedule-execution.ts
@@ -1,3 +1,4 @@
+import { trace } from '@opentelemetry/api'
 import {
   db,
   jobExecutionLogs,
@@ -947,16 +948,28 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
         )
       }
     } catch (error: unknown) {
-      if (isRetryableInfrastructureError(error)) {
-        await retryScheduleAfterInfraFailure({ payload, requestId, claimedAt, error })
-        return
-      }
+      try {
+        if (isRetryableInfrastructureError(error)) {
+          await retryScheduleAfterInfraFailure({ payload, requestId, claimedAt, error })
+          return
+        }
 
-      logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
-      await releaseClaim(
-        now,
-        `Failed to release schedule ${payload.scheduleId} after unhandled error`
-      )
+        logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
+        await releaseClaim(
+          now,
+          `Failed to release schedule ${payload.scheduleId} after unhandled error`
+        )
+      } catch (recoveryError: unknown) {
+        // A secondary failure during error recovery (e.g. a transient DB blip while
+        // releasing the claim or scheduling an infra retry) must not fault the run. The
+        // claim expires on its TTL and the next tick re-claims the schedule. Record the
+        // exception on the span so it stays visible in traces without faulting the run.
+        logger.error(
+          `[${requestId}] Failed to recover schedule ${payload.scheduleId} after error`,
+          recoveryError
+        )
+        trace.getActiveSpan()?.recordException(toError(recoveryError))
+      }
     }
   })
 }
diff --git a/apps/sim/background/webhook-execution.test.ts b/apps/sim/background/webhook-execution.test.ts
index 620c073ac0..7a75dc3e4f 100644
--- a/apps/sim/background/webhook-execution.test.ts
+++ b/apps/sim/background/webhook-execution.test.ts
@@ -2,17 +2,110 @@
  * @vitest-environment node
  */
 
+import {
+  dbChainMock,
+  dbChainMockFns,
+  executionPreprocessingMock,
+  executionPreprocessingMockFns,
+  loggingSessionMock,
+  loggingSessionMockFns,
+} from '@sim/testing'
 import { beforeEach, describe, expect, it, vi } from 'vitest'
 
-const { mockResolveWebhookRecordProviderConfig } = vi.hoisted(() => ({
+const {
+  mockResolveWebhookRecordProviderConfig,
+  mockExecuteWorkflowCore,
+  mockWasExecutionFinalizedByCore,
+  mockRecordException,
+  mockGetActiveSpan,
+} = vi.hoisted(() => ({
   mockResolveWebhookRecordProviderConfig: vi.fn(),
+  mockExecuteWorkflowCore: vi.fn(),
+  mockWasExecutionFinalizedByCore: vi.fn(),
+  mockRecordException: vi.fn(),
+  mockGetActiveSpan: vi.fn(),
 }))
 
+vi.mock('@opentelemetry/api', () => ({
+  trace: { getActiveSpan: mockGetActiveSpan },
+}))
+
+vi.mock('@sim/db', () => dbChainMock)
+vi.mock('@/lib/execution/preprocessing', () => executionPreprocessingMock)
+vi.mock('@/lib/logs/execution/logging-session', () => loggingSessionMock)
+
 vi.mock('@/lib/webhooks/env-resolver', () => ({
   resolveWebhookRecordProviderConfig: mockResolveWebhookRecordProviderConfig,
 }))
 
-import { resolveWebhookExecutionProviderConfig } from './webhook-execution'
+vi.mock('@/lib/workflows/executor/execution-core', () => ({
+  executeWorkflowCore: mockExecuteWorkflowCore,
+  wasExecutionFinalizedByCore: mockWasExecutionFinalizedByCore,
+}))
+
+vi.mock('@/lib/core/idempotency', () => ({
+  IdempotencyService: { createWebhookIdempotencyKey: vi.fn(() => 'idempotency-key') },
+  webhookIdempotency: {
+    executeWithIdempotency: vi.fn(
+      (_provider: string, _key: string, operation: () => Promise<unknown>) => operation()
+    ),
+  },
+}))
+
+vi.mock('@/lib/workflows/persistence/utils', () => ({
+  loadDeployedWorkflowState: vi.fn(async () => ({
+    blocks: {},
+    edges: [],
+    loops: {},
+    parallels: {},
+    deploymentVersionId: 'deployment-1',
+  })),
+}))
+
+vi.mock('@/lib/webhooks/providers', () => ({
+  getProviderHandler: vi.fn(() => ({})),
+}))
+
+vi.mock('@/lib/logs/execution/trace-spans/trace-spans', () => ({
+  buildTraceSpans: vi.fn(() => ({ traceSpans: [] })),
+}))
+
+vi.mock('@/lib/core/execution-limits', () => ({
+  createTimeoutAbortController: vi.fn(() => ({
+    signal: new AbortController().signal,
+    cleanup: vi.fn(),
+    isTimedOut: () => false,
+    timeoutMs: 120_000,
+  })),
+  getTimeoutErrorMessage: vi.fn(() => 'timed out'),
+}))
+
+vi.mock('@/lib/workflows/executor/pause-persistence', () => ({
+  handlePostExecutionPauseState: vi.fn(),
+}))
+
+vi.mock('@/lib/webhooks/attachment-processor', () => ({
+  WebhookAttachmentProcessor: class {},
+}))
+
+vi.mock('@/app/api/auth/oauth/utils', () => ({
+  resolveOAuthAccountId: vi.fn(),
+}))
+
+vi.mock('@/executor/execution/snapshot', () => ({
+  ExecutionSnapshot: class {},
+}))
+
+vi.mock('@/tools/safe-assign', () => ({ safeAssign: vi.fn() }))
+
+vi.mock('@/blocks', () => ({ getBlock: vi.fn(() => null) }))
+
+vi.mock('@/triggers', () => ({
+  getTrigger: vi.fn(),
+  isTriggerValid: vi.fn(() => false),
+}))
+
+import { executeWebhookJob, resolveWebhookExecutionProviderConfig } from './webhook-execution'
 
 describe('resolveWebhookExecutionProviderConfig', () => {
   beforeEach(() => {
@@ -66,3 +159,64 @@ describe('resolveWebhookExecutionProviderConfig', () => {
     )
   })
 })
+
+describe('executeWebhookJob fault vs error handling', () => {
+  const payload = {
+    webhookId: 'webhook-1',
+    workflowId: 'workflow-1',
+    userId: 'user-1',
+    executionId: 'execution-1',
+    requestId: 'request-1',
+    provider: 'gmail',
+    body: { message: 'hello' },
+    headers: {},
+    path: '/webhook',
+    workspaceId: 'workspace-1',
+  }
+
+  beforeEach(() => {
+    vi.clearAllMocks()
+    executionPreprocessingMockFns.mockPreprocessExecution.mockResolvedValue({
+      success: true,
+      workflowRecord: { workspaceId: 'workspace-1', userId: 'user-1', variables: {} },
+      executionTimeout: { async: 120_000 },
+    })
+    mockResolveWebhookRecordProviderConfig.mockImplementation(async (record) => record)
+    dbChainMockFns.limit.mockResolvedValue([{ id: 'webhook-1' }])
+    mockGetActiveSpan.mockReturnValue({ recordException: mockRecordException })
+  })
+
+  it('completes the run (does not throw) when the failure was finalized by core', async () => {
+    mockExecuteWorkflowCore.mockRejectedValue(
+      new Error('Gmail 2 is missing required fields: Label')
+    )
+    mockWasExecutionFinalizedByCore.mockReturnValue(true)
+
+    const result = await executeWebhookJob(payload)
+
+    expect(result).toMatchObject({
+      success: false,
+      workflowId: 'workflow-1',
+      executionId: 'execution-1',
+      provider: 'gmail',
+    })
+    expect(loggingSessionMockFns.mockWaitForPostExecution).toHaveBeenCalled()
+    // User/workflow errors are already recorded by core — the catch must not re-log them.
+    expect(loggingSessionMockFns.mockSafeCompleteWithError).not.toHaveBeenCalled()
+    // The error is still recorded on the run span so it stays visible in traces.
+    expect(mockRecordException).toHaveBeenCalledWith(
+      expect.objectContaining({ message: 'Gmail 2 is missing required fields: Label' })
+    )
+  })
+
+  it('faults the run (re-throws) when the failure was not finalized by core', async () => {
+    mockExecuteWorkflowCore.mockRejectedValue(new Error('Workflow state not found'))
+    mockWasExecutionFinalizedByCore.mockReturnValue(false)
+
+    await expect(executeWebhookJob(payload)).rejects.toThrow('Workflow state not found')
+    // waitForPostExecution must run on every path so the finalized-by-core signal is always reliable.
+    expect(loggingSessionMockFns.mockWaitForPostExecution).toHaveBeenCalled()
+    // Pipeline/infra errors are recorded here before re-throwing to fault the trigger.dev run.
+    expect(loggingSessionMockFns.mockSafeCompleteWithError).toHaveBeenCalled()
+  })
+})
diff --git a/apps/sim/background/webhook-execution.ts b/apps/sim/background/webhook-execution.ts
index 1753813d84..41048f9208 100644
--- a/apps/sim/background/webhook-execution.ts
+++ b/apps/sim/background/webhook-execution.ts
@@ -1,3 +1,4 @@
+import { trace } from '@opentelemetry/api'
 import { db } from '@sim/db'
 import { account, webhook } from '@sim/db/schema'
 import { createLogger, runWithRequestContext } from '@sim/logger'
@@ -616,8 +617,27 @@ async function executeWebhookJobInternal(
       provider: payload.provider,
     })
 
+    // The finalized flag is set inside a fire-and-forget post-execution promise; await it so the
+    // signal is reliable and the failure is fully persisted before we decide fault vs error.
+    await loggingSession.waitForPostExecution()
+
+    // A failure inside workflow execution (block error, provider 4xx, missing required field, etc.)
+    // is finalized by core and already recorded in the execution logs. That is a user/workflow error,
+    // not a trigger.dev job fault — complete the run normally so we don't fire a false alert. Errors
+    // that were not finalized came from the webhook pipeline itself, so we re-throw to fault below.
     if (wasExecutionFinalizedByCore(error, executionId)) {
-      throw error
+      // Record the exception on the run span so it stays visible in traces without
+      // marking the span as ERROR — that status is what faults the trigger.dev run.
+      trace.getActiveSpan()?.recordException(toError(error))
+
+      return {
+        success: false,
+        workflowId: payload.workflowId,
+        executionId,
+        output: hasExecutionResult(error) ? error.executionResult.output : {},
+        executedAt: new Date().toISOString(),
+        provider: payload.provider,
+      }
     }
 
     try {