From 812af117dc3c00ea7afaf0a0cac07bf0479a3725 Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Fri, 15 May 2026 10:41:30 +0200 Subject: [PATCH 1/3] perf(redis): use sorted set index for O(log N) schedule claiming --- src/drivers/redis_adapter.ts | 214 ++++++++++++++++++++++++----------- 1 file changed, 148 insertions(+), 66 deletions(-) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 27fe11d..12e6ba8 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -16,6 +16,7 @@ import { resolveRetention } from '../utils.js' const redisKey = 'jobs' const schedulesKey = 'schedules' const schedulesIndexKey = 'schedules::index' +const schedulesDueKey = 'schedules::due' type RedisConfig = Redis | RedisOptions /** @@ -352,76 +353,98 @@ const GET_JOB_SCRIPT = ` ` /** - * Lua script for atomically claiming a due schedule. - * Iterates the schedule index server-side and claims the first due schedule. - * Returns the schedule data if claimed, nil otherwise. + * Lua script for atomically claiming a due schedule using a sorted set index. + * + * Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N) + * lookup instead of scanning all schedule hashes via SMEMBERS. + * + * Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on + * sight so subsequent calls skip them. + * + * KEYS[1] = schedules::due (the ZSET) + * KEYS[2] = schedule key prefix (e.g. "schedules::") + * ARGV[1] = now (epoch milliseconds) */ const CLAIM_SCHEDULE_SCRIPT = ` - local schedules_index_key = KEYS[1] - local schedule_key_prefix = KEYS[2] + local due_key = KEYS[1] + local prefix = KEYS[2] local now = tonumber(ARGV[1]) - local ids = redis.call('SMEMBERS', schedules_index_key) + while true do + local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1) - for i = 1, #ids do - local schedule_key = schedule_key_prefix .. ids[i] + if #candidates == 0 then + return nil + end + + local id = candidates[1] + local schedule_key = prefix .. id -- Get schedule data local data = redis.call('HGETALL', schedule_key) - if #data > 0 then + + -- Deleted schedule still in ZSET + if #data == 0 then + redis.call('ZREM', due_key, id) + else -- Convert HGETALL result to table local schedule = {} for j = 1, #data, 2 do schedule[data[j]] = data[j + 1] end - -- Check if schedule is due - if schedule.status == 'active' then - local next_run_at = tonumber(schedule.next_run_at) - - if next_run_at and next_run_at <= now then - local run_count = tonumber(schedule.run_count or '0') - local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil - local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - - -- Check limits - if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then - -- This schedule is claimable - atomically update it - local new_run_count = run_count + 1 - - -- Calculate new next_run_at (simple interval-based for now) - -- Complex cron calculation happens in the caller - local new_next_run_at = '' - local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil - if every_ms then - new_next_run_at = tostring(now + every_ms) - end - - -- Check if we've hit the limit after this run - if run_limit and new_run_count >= run_limit then - new_next_run_at = '' - end - - -- Check if past end date - if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then - new_next_run_at = '' - end - - -- Update the schedule atomically - redis.call('HSET', schedule_key, - 'next_run_at', new_next_run_at, - 'last_run_at', tostring(now), - 'run_count', tostring(new_run_count)) - - -- Return the schedule data (before update) as JSON - return cjson.encode(schedule) + -- Check if schedule is active + if schedule.status ~= 'active' then + redis.call('ZREM', due_key, id) + else + local run_count = tonumber(schedule.run_count or '0') + local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil + local to_date = schedule.to_date and tonumber(schedule.to_date) or nil + + -- Check limits + if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then + redis.call('ZREM', due_key, id) + else + -- This schedule is claimable - atomically update it + local new_run_count = run_count + 1 + + -- Calculate new next_run_at (simple interval-based for now) + -- Complex cron calculation happens in the caller + local new_next_run_at = '' + local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil + if every_ms then + new_next_run_at = tostring(now + every_ms) + end + + -- Check if we've hit the limit after this run + if run_limit and new_run_count >= run_limit then + new_next_run_at = '' + end + + -- Check if past end date + if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then + new_next_run_at = '' end + + -- Update the schedule atomically + redis.call('HSET', schedule_key, + 'next_run_at', new_next_run_at, + 'last_run_at', tostring(now), + 'run_count', tostring(new_run_count)) + + -- Update or remove from ZSET + if new_next_run_at ~= '' then + redis.call('ZADD', due_key, tonumber(new_next_run_at), id) + else + redis.call('ZREM', due_key, id) + end + + -- Return the schedule data (before update) as JSON + return cjson.encode(schedule) end end end end - - return nil ` /** @@ -700,10 +723,11 @@ export class RedisAdapter implements Adapter { const id = config.id ?? randomUUID() const now = Date.now() const scheduleKey = `${schedulesKey}::${id}` - const [existingRunCount, existingCreatedAt] = await this.#connection.hmget( + const [existingRunCount, existingCreatedAt, existingNextRunAt] = await this.#connection.hmget( scheduleKey, 'run_count', - 'created_at' + 'created_at', + 'next_run_at' ) const scheduleData: Record = { @@ -722,13 +746,17 @@ export class RedisAdapter implements Adapter { if (config.to !== undefined) scheduleData.to_date = config.to.getTime().toString() if (config.limit !== undefined) scheduleData.run_limit = config.limit.toString() - // Upsert schedule and clear stale optional fields from previous config. - await this.#connection + const multi = this.#connection .multi() .hdel(scheduleKey, 'cron_expression', 'every_ms', 'from_date', 'to_date', 'run_limit') .hset(scheduleKey, scheduleData) .sadd(schedulesIndexKey, id) - .exec() + + if (existingNextRunAt) { + multi.zadd(schedulesDueKey, Number.parseInt(existingNextRunAt, 10), id) + } + + await multi.exec() return id } @@ -804,14 +832,34 @@ export class RedisAdapter implements Adapter { } if (updates.runCount !== undefined) data.run_count = updates.runCount.toString() - if (Object.keys(data).length > 0) { - await this.#connection.hset(scheduleKey, data) + if (Object.keys(data).length === 0) return + + const multi = this.#connection.multi().hset(scheduleKey, data) + + if (updates.nextRunAt) { + multi.zadd(schedulesDueKey, updates.nextRunAt.getTime(), id) + } else if (updates.nextRunAt === null || updates.status === 'paused') { + multi.zrem(schedulesDueKey, id) + } + + if (updates.status === 'active' && updates.nextRunAt === undefined) { + const existing = await this.#connection.hget(scheduleKey, 'next_run_at') + if (existing) { + multi.zadd(schedulesDueKey, Number.parseInt(existing, 10), id) + } } + + await multi.exec() } async deleteSchedule(id: string): Promise { const scheduleKey = `${schedulesKey}::${id}` - await this.#connection.multi().del(scheduleKey).srem(schedulesIndexKey, id).exec() + await this.#connection + .multi() + .del(scheduleKey) + .srem(schedulesIndexKey, id) + .zrem(schedulesDueKey, id) + .exec() } async claimDueSchedule(): Promise { @@ -819,7 +867,7 @@ export class RedisAdapter implements Adapter { const result = await this.#connection.eval( CLAIM_SCHEDULE_SCRIPT, 2, - schedulesIndexKey, + schedulesDueKey, `${schedulesKey}::`, now.toString() ) @@ -841,7 +889,6 @@ export class RedisAdapter implements Adapter { }) const nextRun = cron.next().toDate().getTime() - // Check limits before updating const runCount = Number.parseInt(data.run_count || '0', 10) + 1 const runLimit = data.run_limit ? Number.parseInt(data.run_limit, 10) : null const toDate = data.to_date ? Number.parseInt(data.to_date, 10) : null @@ -854,16 +901,51 @@ export class RedisAdapter implements Adapter { newNextRunAt = '' } - await this.#connection.hset( - `${schedulesKey}::${data.id}`, - 'next_run_at', - newNextRunAt.toString() - ) + const scheduleKey = `${schedulesKey}::${data.id}` + const multi = this.#connection + .multi() + .hset(scheduleKey, 'next_run_at', newNextRunAt.toString()) + + if (typeof newNextRunAt === 'number') { + multi.zadd(schedulesDueKey, newNextRunAt, data.id) + } else { + multi.zrem(schedulesDueKey, data.id) + } + + await multi.exec() } return this.#hashToScheduleData(data) } + async backfillDueIndex(): Promise { + const ids = await this.#connection.smembers(schedulesIndexKey) + if (ids.length === 0) return 0 + + const pipeline = this.#connection.pipeline() + for (const id of ids) { + pipeline.hmget(`${schedulesKey}::${id}`, 'next_run_at', 'status') + } + const results = await pipeline.exec() + if (!results) return 0 + + const addPipeline = this.#connection.pipeline() + let count = 0 + + for (let i = 0; i < ids.length; i++) { + const [err, values] = results[i] + if (err || !values) continue + const [nextRunAt, status] = values as [string | null, string | null] + if (nextRunAt && status === 'active') { + addPipeline.zadd(schedulesDueKey, Number.parseInt(nextRunAt, 10), ids[i]) + count++ + } + } + + if (count > 0) await addPipeline.exec() + return count + } + #hashToScheduleData(data: Record): ScheduleData { return { id: data.id, From dd967dba82cf88a689295a6e1567da958e62fd16 Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Tue, 2 Jun 2026 13:53:43 +0200 Subject: [PATCH 2/3] fix(redis): auto-backfill due index on first schedule claim Existing users upgrading will have schedules in the legacy format (hashes + SET) but not in the new ZSET. Run backfillDueIndex() once per worker process on the first claimDueSchedule() call so schedules keep firing without manual intervention. --- src/drivers/redis_adapter.ts | 9 +++++++++ tests/adapter.spec.ts | 3 +++ 2 files changed, 12 insertions(+) diff --git a/src/drivers/redis_adapter.ts b/src/drivers/redis_adapter.ts index 12e6ba8..509ce5c 100644 --- a/src/drivers/redis_adapter.ts +++ b/src/drivers/redis_adapter.ts @@ -480,6 +480,7 @@ export class RedisAdapter implements Adapter { readonly #connection: Redis readonly #ownsConnection: boolean #workerId: string = '' + #dueIndexReady = false constructor(connection: Redis, ownsConnection: boolean = false) { this.#connection = connection @@ -862,7 +863,15 @@ export class RedisAdapter implements Adapter { .exec() } + async #ensureDueIndex(): Promise { + if (this.#dueIndexReady) return + await this.backfillDueIndex() + this.#dueIndexReady = true + } + async claimDueSchedule(): Promise { + await this.#ensureDueIndex() + const now = Date.now() const result = await this.#connection.eval( CLAIM_SCHEDULE_SCRIPT, diff --git a/tests/adapter.spec.ts b/tests/adapter.spec.ts index a276625..869dc75 100644 --- a/tests/adapter.spec.ts +++ b/tests/adapter.spec.ts @@ -99,6 +99,9 @@ test.group('Adapter | Redis', (group) => { await adapter.updateSchedule(id, { nextRunAt: futureRunAt }) } + // Warm the due-index backfill so it doesn't count against the spy + await adapter.claimDueSchedule() + const { result: claimed, writes } = await withRedisWriteSpy({ connection, run: () => adapter.claimDueSchedule(), From f3c85cd18b8e37575da976cba3d6d2f484454cdf Mon Sep 17 00:00:00 2001 From: Isaac <91521821+isimisi@users.noreply.github.com> Date: Wed, 3 Jun 2026 12:29:12 +0200 Subject: [PATCH 3/3] perf(redis): update claim schedule script to use sorted set index --- src/drivers/redis_scripts.ts | 140 ++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 59 deletions(-) diff --git a/src/drivers/redis_scripts.ts b/src/drivers/redis_scripts.ts index e0ff6dd..2f8cedf 100644 --- a/src/drivers/redis_scripts.ts +++ b/src/drivers/redis_scripts.ts @@ -1,4 +1,4 @@ -import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js' +import { REDIS_DEDUP_LUA, REDIS_JOB_STORAGE_LUA } from './redis_job_storage.js'; /** * Lua script for pushing a job to the queue. @@ -18,7 +18,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', pending_key, score, job_id) return 1 -` +`; /** * Lua script for pushing a dedup job. @@ -80,7 +80,7 @@ ${REDIS_DEDUP_LUA} redis.call('PEXPIRE', dedup_key, ttl) end return {'added', job_id} -` +`; /** * Lua script for pushing a delayed job. @@ -100,7 +100,7 @@ ${REDIS_JOB_STORAGE_LUA} redis.call('ZADD', delayed_key, execute_at, job_id) return 1 -` +`; /** * Lua script for atomic job acquisition. @@ -158,7 +158,7 @@ ${REDIS_JOB_STORAGE_LUA} return encode_job_result(job_data, overlay_key, job_id, { acquiredAt = now }) -` +`; /** * Lua script for removing a job completely (no history). @@ -193,7 +193,7 @@ ${REDIS_JOB_STORAGE_LUA} delete_job_data(data_key, overlay_key, job_id) return 1 -` +`; /** * Lua script for finalizing a job in history. @@ -277,7 +277,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -` +`; /** * Lua script for retrying a job. @@ -330,7 +330,7 @@ ${REDIS_JOB_STORAGE_LUA} end return 1 -` +`; /** * Lua script for recovering stalled jobs. @@ -399,7 +399,7 @@ ${REDIS_JOB_STORAGE_LUA} end return recovered -` +`; /** * Lua script for getting a job record with its status. @@ -458,77 +458,99 @@ ${REDIS_JOB_STORAGE_LUA} finishedAt = finished_at, error = error_msg }) -` +`; /** - * Lua script for atomically claiming a due schedule. - * Iterates the schedule index server-side and claims the first due schedule. - * Returns the schedule data if claimed, nil otherwise. + * Lua script for atomically claiming a due schedule using a sorted set index. + * + * Uses ZRANGEBYSCORE on schedules::due (scored by next_run_at) for O(log N) + * lookup instead of scanning all schedule hashes via SMEMBERS. + * + * Stale entries (paused, exhausted, deleted) are cleaned from the ZSET on + * sight so subsequent calls skip them. + * + * KEYS[1] = schedules::due (the ZSET) + * KEYS[2] = schedule key prefix (e.g. "schedules::") + * ARGV[1] = now (epoch milliseconds) */ export const CLAIM_SCHEDULE_SCRIPT = ` - local schedules_index_key = KEYS[1] - local schedule_key_prefix = KEYS[2] + local due_key = KEYS[1] + local prefix = KEYS[2] local now = tonumber(ARGV[1]) - local ids = redis.call('SMEMBERS', schedules_index_key) + while true do + local candidates = redis.call('ZRANGEBYSCORE', due_key, '-inf', tostring(now), 'LIMIT', 0, 1) + + if #candidates == 0 then + return nil + end - for i = 1, #ids do - local schedule_key = schedule_key_prefix .. ids[i] + local id = candidates[1] + local schedule_key = prefix .. id -- Get schedule data local data = redis.call('HGETALL', schedule_key) - if #data > 0 then + + -- Deleted schedule still in ZSET + if #data == 0 then + redis.call('ZREM', due_key, id) + else -- Convert HGETALL result to table local schedule = {} for j = 1, #data, 2 do schedule[data[j]] = data[j + 1] end - -- Check if schedule is due - if schedule.status == 'active' then - local next_run_at = tonumber(schedule.next_run_at) - - if next_run_at and next_run_at <= now then - local run_count = tonumber(schedule.run_count or '0') - local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil - local to_date = schedule.to_date and tonumber(schedule.to_date) or nil - - -- Check limits - if not (run_limit and run_count >= run_limit) and not (to_date and now > to_date) then - -- This schedule is claimable - atomically update it - local new_run_count = run_count + 1 - - -- Calculate new next_run_at (simple interval-based for now) - -- Complex cron calculation happens in the caller - local new_next_run_at = '' - local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil - if every_ms then - new_next_run_at = tostring(now + every_ms) - end - - -- Check if we've hit the limit after this run - if run_limit and new_run_count >= run_limit then - new_next_run_at = '' - end + -- Check if schedule is active + if schedule.status ~= 'active' then + redis.call('ZREM', due_key, id) + else + local run_count = tonumber(schedule.run_count or '0') + local run_limit = schedule.run_limit and tonumber(schedule.run_limit) or nil + local to_date = schedule.to_date and tonumber(schedule.to_date) or nil + + -- Check limits + if (run_limit and run_count >= run_limit) or (to_date and now > to_date) then + redis.call('ZREM', due_key, id) + else + -- This schedule is claimable - atomically update it + local new_run_count = run_count + 1 + + -- Calculate new next_run_at (simple interval-based for now) + -- Complex cron calculation happens in the caller + local new_next_run_at = '' + local every_ms = schedule.every_ms and tonumber(schedule.every_ms) or nil + if every_ms then + new_next_run_at = tostring(now + every_ms) + end - -- Check if past end date - if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then - new_next_run_at = '' - end + -- Check if we've hit the limit after this run + if run_limit and new_run_count >= run_limit then + new_next_run_at = '' + end - -- Update the schedule atomically - redis.call('HSET', schedule_key, - 'next_run_at', new_next_run_at, - 'last_run_at', tostring(now), - 'run_count', tostring(new_run_count)) + -- Check if past end date + if to_date and new_next_run_at ~= '' and tonumber(new_next_run_at) > to_date then + new_next_run_at = '' + end - -- Return the schedule data (before update) as JSON - return cjson.encode(schedule) + -- Update the schedule atomically + redis.call('HSET', schedule_key, + 'next_run_at', new_next_run_at, + 'last_run_at', tostring(now), + 'run_count', tostring(new_run_count)) + + -- Update or remove from ZSET + if new_next_run_at ~= '' then + redis.call('ZADD', due_key, tonumber(new_next_run_at), id) + else + redis.call('ZREM', due_key, id) end + + -- Return the schedule data (before update) as JSON + return cjson.encode(schedule) end end end end - - return nil -` +`;