Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions apps/sim/background/schedule-execution.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { trace } from '@opentelemetry/api'
import {
db,
jobExecutionLogs,
Expand Down Expand Up @@ -947,16 +948,28 @@ export async function executeScheduleJob(payload: ScheduleExecutionPayload) {
)
}
} catch (error: unknown) {
if (isRetryableInfrastructureError(error)) {
await retryScheduleAfterInfraFailure({ payload, requestId, claimedAt, error })
return
}
try {
if (isRetryableInfrastructureError(error)) {
await retryScheduleAfterInfraFailure({ payload, requestId, claimedAt, error })
return
}

logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
await releaseClaim(
now,
`Failed to release schedule ${payload.scheduleId} after unhandled error`
)
logger.error(`[${requestId}] Error processing schedule ${payload.scheduleId}`, error)
await releaseClaim(
now,
`Failed to release schedule ${payload.scheduleId} after unhandled error`
)
} catch (recoveryError: unknown) {
// A secondary failure during error recovery (e.g. a transient DB blip while
// releasing the claim or scheduling an infra retry) must not fault the run. The
// claim expires on its TTL and the next tick re-claims the schedule. Record the
// exception on the span so it stays visible in traces without faulting the run.
logger.error(
`[${requestId}] Failed to recover schedule ${payload.scheduleId} after error`,
recoveryError
)
trace.getActiveSpan()?.recordException(toError(recoveryError))
}
}
})
}
Expand Down
158 changes: 156 additions & 2 deletions apps/sim/background/webhook-execution.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,110 @@
* @vitest-environment node
*/

import {
dbChainMock,
dbChainMockFns,
executionPreprocessingMock,
executionPreprocessingMockFns,
loggingSessionMock,
loggingSessionMockFns,
} from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

const { mockResolveWebhookRecordProviderConfig } = vi.hoisted(() => ({
const {
mockResolveWebhookRecordProviderConfig,
mockExecuteWorkflowCore,
mockWasExecutionFinalizedByCore,
mockRecordException,
mockGetActiveSpan,
} = vi.hoisted(() => ({
mockResolveWebhookRecordProviderConfig: vi.fn(),
mockExecuteWorkflowCore: vi.fn(),
mockWasExecutionFinalizedByCore: vi.fn(),
mockRecordException: vi.fn(),
mockGetActiveSpan: vi.fn(),
}))

vi.mock('@opentelemetry/api', () => ({
trace: { getActiveSpan: mockGetActiveSpan },
}))

vi.mock('@sim/db', () => dbChainMock)
vi.mock('@/lib/execution/preprocessing', () => executionPreprocessingMock)
vi.mock('@/lib/logs/execution/logging-session', () => loggingSessionMock)

vi.mock('@/lib/webhooks/env-resolver', () => ({
resolveWebhookRecordProviderConfig: mockResolveWebhookRecordProviderConfig,
}))

import { resolveWebhookExecutionProviderConfig } from './webhook-execution'
vi.mock('@/lib/workflows/executor/execution-core', () => ({
executeWorkflowCore: mockExecuteWorkflowCore,
wasExecutionFinalizedByCore: mockWasExecutionFinalizedByCore,
}))

vi.mock('@/lib/core/idempotency', () => ({
IdempotencyService: { createWebhookIdempotencyKey: vi.fn(() => 'idempotency-key') },
webhookIdempotency: {
executeWithIdempotency: vi.fn(
(_provider: string, _key: string, operation: () => Promise<unknown>) => operation()
),
},
}))

vi.mock('@/lib/workflows/persistence/utils', () => ({
loadDeployedWorkflowState: vi.fn(async () => ({
blocks: {},
edges: [],
loops: {},
parallels: {},
deploymentVersionId: 'deployment-1',
})),
}))

vi.mock('@/lib/webhooks/providers', () => ({
getProviderHandler: vi.fn(() => ({})),
}))

vi.mock('@/lib/logs/execution/trace-spans/trace-spans', () => ({
buildTraceSpans: vi.fn(() => ({ traceSpans: [] })),
}))

vi.mock('@/lib/core/execution-limits', () => ({
createTimeoutAbortController: vi.fn(() => ({
signal: new AbortController().signal,
cleanup: vi.fn(),
isTimedOut: () => false,
timeoutMs: 120_000,
})),
getTimeoutErrorMessage: vi.fn(() => 'timed out'),
}))

vi.mock('@/lib/workflows/executor/pause-persistence', () => ({
handlePostExecutionPauseState: vi.fn(),
}))

vi.mock('@/lib/webhooks/attachment-processor', () => ({
WebhookAttachmentProcessor: class {},
}))

vi.mock('@/app/api/auth/oauth/utils', () => ({
resolveOAuthAccountId: vi.fn(),
}))

vi.mock('@/executor/execution/snapshot', () => ({
ExecutionSnapshot: class {},
}))

vi.mock('@/tools/safe-assign', () => ({ safeAssign: vi.fn() }))

vi.mock('@/blocks', () => ({ getBlock: vi.fn(() => null) }))

vi.mock('@/triggers', () => ({
getTrigger: vi.fn(),
isTriggerValid: vi.fn(() => false),
}))

import { executeWebhookJob, resolveWebhookExecutionProviderConfig } from './webhook-execution'

describe('resolveWebhookExecutionProviderConfig', () => {
beforeEach(() => {
Expand Down Expand Up @@ -66,3 +159,64 @@ describe('resolveWebhookExecutionProviderConfig', () => {
)
})
})

describe('executeWebhookJob fault vs error handling', () => {
const payload = {
webhookId: 'webhook-1',
workflowId: 'workflow-1',
userId: 'user-1',
executionId: 'execution-1',
requestId: 'request-1',
provider: 'gmail',
body: { message: 'hello' },
headers: {},
path: '/webhook',
workspaceId: 'workspace-1',
}

beforeEach(() => {
vi.clearAllMocks()
executionPreprocessingMockFns.mockPreprocessExecution.mockResolvedValue({
success: true,
workflowRecord: { workspaceId: 'workspace-1', userId: 'user-1', variables: {} },
executionTimeout: { async: 120_000 },
})
mockResolveWebhookRecordProviderConfig.mockImplementation(async (record) => record)
dbChainMockFns.limit.mockResolvedValue([{ id: 'webhook-1' }])
mockGetActiveSpan.mockReturnValue({ recordException: mockRecordException })
})

it('completes the run (does not throw) when the failure was finalized by core', async () => {
mockExecuteWorkflowCore.mockRejectedValue(
new Error('Gmail 2 is missing required fields: Label')
)
mockWasExecutionFinalizedByCore.mockReturnValue(true)

const result = await executeWebhookJob(payload)

expect(result).toMatchObject({
success: false,
workflowId: 'workflow-1',
executionId: 'execution-1',
provider: 'gmail',
})
expect(loggingSessionMockFns.mockWaitForPostExecution).toHaveBeenCalled()
// User/workflow errors are already recorded by core — the catch must not re-log them.
expect(loggingSessionMockFns.mockSafeCompleteWithError).not.toHaveBeenCalled()
// The error is still recorded on the run span so it stays visible in traces.
expect(mockRecordException).toHaveBeenCalledWith(
expect.objectContaining({ message: 'Gmail 2 is missing required fields: Label' })
)
})

it('faults the run (re-throws) when the failure was not finalized by core', async () => {
mockExecuteWorkflowCore.mockRejectedValue(new Error('Workflow state not found'))
mockWasExecutionFinalizedByCore.mockReturnValue(false)

await expect(executeWebhookJob(payload)).rejects.toThrow('Workflow state not found')
// waitForPostExecution must run on every path so the finalized-by-core signal is always reliable.
expect(loggingSessionMockFns.mockWaitForPostExecution).toHaveBeenCalled()
// Pipeline/infra errors are recorded here before re-throwing to fault the trigger.dev run.
expect(loggingSessionMockFns.mockSafeCompleteWithError).toHaveBeenCalled()
})
Comment thread
greptile-apps[bot] marked this conversation as resolved.
})
22 changes: 21 additions & 1 deletion apps/sim/background/webhook-execution.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { trace } from '@opentelemetry/api'
import { db } from '@sim/db'
import { account, webhook } from '@sim/db/schema'
import { createLogger, runWithRequestContext } from '@sim/logger'
Expand Down Expand Up @@ -616,8 +617,27 @@ async function executeWebhookJobInternal(
provider: payload.provider,
})

// The finalized flag is set inside a fire-and-forget post-execution promise; await it so the
// signal is reliable and the failure is fully persisted before we decide fault vs error.
await loggingSession.waitForPostExecution()

// A failure inside workflow execution (block error, provider 4xx, missing required field, etc.)
// is finalized by core and already recorded in the execution logs. That is a user/workflow error,
// not a trigger.dev job fault — complete the run normally so we don't fire a false alert. Errors
// that were not finalized came from the webhook pipeline itself, so we re-throw to fault below.
if (wasExecutionFinalizedByCore(error, executionId)) {
throw error
// Record the exception on the run span so it stays visible in traces without
// marking the span as ERROR — that status is what faults the trigger.dev run.
trace.getActiveSpan()?.recordException(toError(error))

return {
success: false,
workflowId: payload.workflowId,
executionId,
output: hasExecutionResult(error) ? error.executionResult.output : {},
executedAt: new Date().toISOString(),
provider: payload.provider,
}
}

try {
Expand Down
Loading