-- Absurd installs a Postgres-native durable workflow system that can be dropped -- into an existing database. -- -- It bootstraps the `absurd` schema and required extensions so that jobs, runs, -- checkpoints, and workflow events all live alongside application data without -- external services. -- -- Each queue is materialized as its own set of tables that share a prefix: -- * `t_` for tasks (what is to be run) -- * `r_` for runs (attempts to run a task) -- * `c_` for checkpoints (saved states) -- * `e_` for emitted events -- * `w_` for wait registrations -- * `i_` for idempotency keys (partitioned queues only) -- -- `create_queue`, `drop_queue`, and `list_queues` provide the management -- surface for provisioning queues safely. -- -- Task execution flows through `spawn_task`, which records the logical task and -- its first run, and `claim_task`, which hands work to workers with leasing -- semantics, state transitions, and cancellation checks. Runtime routines -- such as `complete_run`, `schedule_run`, and `fail_run` advance or retry work, -- enforce attempt accounting, and keep the task and run tables synchronized. -- -- Long-running or event-driven workflows rely on lightweight persistence -- primitives. Checkpoint helpers (`set_task_checkpoint_state`, -- `get_task_checkpoint_state`, `get_task_checkpoint_states`) write arbitrary -- JSON payloads keyed by task and step, while `await_event` and `emit_event` -- coordinate sleepers and external signals so that tasks can suspend and resume -- without losing context. Events are uniquely indexed and use first-write-wins -- semantics: the first emission per name is cached, later emits are ignored. create extension if not exists "uuid-ossp"; create schema if not exists absurd; -- Returns either the actual current timestamp or a fake one if -- the session sets `absurd.fake_now`. This lets tests control time. create function absurd.current_time () returns timestamptz language plpgsql volatile as $$ declare v_fake text; begin v_fake := current_setting('absurd.fake_now', true); if v_fake is not null and length(trim(v_fake)) > 0 then return v_fake::timestamptz; end if; return clock_timestamp(); end; $$; create table if not exists absurd.queues ( queue_name text primary key, created_at timestamptz not null default absurd.current_time(), storage_mode text not null default 'unpartitioned' check (storage_mode in ('unpartitioned', 'partitioned')), default_partition text not null default 'enabled' check (default_partition in ('enabled', 'disabled')), partition_lookahead interval not null default interval '28 days' check (partition_lookahead >= interval '0 seconds'), partition_lookback interval not null default interval '1 day' check (partition_lookback >= interval '0 seconds'), cleanup_ttl interval not null default interval '30 days' check (cleanup_ttl >= interval '0 seconds'), cleanup_limit integer not null default 1000 check (cleanup_limit >= 1), detach_mode text not null default 'none' check (detach_mode in ('none', 'empty')), detach_min_age interval not null default interval '30 days' check (detach_min_age >= interval '0 seconds') ); -- Returns the Absurd schema release version baked into this SQL file. -- During development this is usually "main" and release automation replaces -- it with the actual tag version. create or replace function absurd.get_schema_version () returns text language sql as $$ select 'main'::text; $$; -- Queue names are used in generated table/index identifiers. -- We intentionally cap UTF-8 byte length so generated explicit index names -- (for instance r__sai) stay within PostgreSQL's 63-byte identifier -- limit. Character set is otherwise delegated to PostgreSQL quoted-ident rules. create function absurd.validate_queue_name (p_queue_name text) returns text language plpgsql as $$ begin if p_queue_name is null or p_queue_name = '' then raise exception 'Queue name must be provided'; end if; if octet_length(p_queue_name) > 57 then raise exception 'Queue name "%" is too long (max 57 bytes).', p_queue_name; end if; return p_queue_name; end; $$; create function absurd.ensure_queue_tables (p_queue_name text) returns void language plpgsql as $$ declare v_storage_mode text := 'unpartitioned'; v_t_suffix text; v_r_suffix text; v_c_suffix text; v_w_suffix text; v_t_idempotency_def text; begin perform absurd.validate_queue_name(p_queue_name); select storage_mode into v_storage_mode from absurd.queues where queue_name = p_queue_name; v_storage_mode := coalesce(v_storage_mode, 'unpartitioned'); if v_storage_mode not in ('unpartitioned', 'partitioned') then raise exception 'Unsupported queue storage mode "%"', v_storage_mode; end if; if v_storage_mode = 'partitioned' then v_t_suffix := 'partition by range (task_id)'; v_r_suffix := 'partition by range (run_id)'; v_c_suffix := 'partition by range (task_id)'; v_w_suffix := 'partition by range (run_id)'; v_t_idempotency_def := 'idempotency_key text'; else v_t_suffix := 'with (fillfactor=70)'; v_r_suffix := 'with (fillfactor=70)'; v_c_suffix := 'with (fillfactor=70)'; v_w_suffix := ''; v_t_idempotency_def := 'idempotency_key text unique'; end if; execute format( 'create table if not exists absurd.%I ( task_id uuid primary key, task_name text not null, params jsonb not null, headers jsonb, retry_strategy jsonb, max_attempts integer, cancellation jsonb, enqueue_at timestamptz not null default absurd.current_time(), first_started_at timestamptz, state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')), attempts integer not null default 0, last_attempt_run uuid, completed_payload jsonb, cancelled_at timestamptz, %s ) %s', 't_' || p_queue_name, v_t_idempotency_def, v_t_suffix ); execute format( 'create table if not exists absurd.%I ( run_id uuid primary key, task_id uuid not null, attempt integer not null, state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')), claimed_by text, claim_expires_at timestamptz, available_at timestamptz not null, wake_event text, event_payload jsonb, started_at timestamptz, completed_at timestamptz, failed_at timestamptz, result jsonb, failure_reason jsonb, created_at timestamptz not null default absurd.current_time() ) %s', 'r_' || p_queue_name, v_r_suffix ); execute format( 'create table if not exists absurd.%I ( task_id uuid not null, checkpoint_name text not null, state jsonb, status text not null default ''committed'', owner_run_id uuid, updated_at timestamptz not null default absurd.current_time(), primary key (task_id, checkpoint_name) ) %s', 'c_' || p_queue_name, v_c_suffix ); execute format( 'create table if not exists absurd.%I ( event_name text primary key, payload jsonb, emitted_at timestamptz not null default absurd.current_time() )', 'e_' || p_queue_name ); execute format( 'create table if not exists absurd.%I ( task_id uuid not null, run_id uuid not null, step_name text not null, event_name text not null, timeout_at timestamptz, created_at timestamptz not null default absurd.current_time(), primary key (run_id, step_name) ) %s', 'w_' || p_queue_name, v_w_suffix ); if v_storage_mode = 'partitioned' then execute format( 'create table if not exists absurd.%I ( idempotency_key text primary key, task_id uuid not null )', 'i_' || p_queue_name ); end if; execute format( 'create index if not exists %I on absurd.%I (state, available_at)', ('r_' || p_queue_name) || '_sai', 'r_' || p_queue_name ); execute format( 'create index if not exists %I on absurd.%I (task_id)', ('r_' || p_queue_name) || '_ti', 'r_' || p_queue_name ); execute format( 'create index if not exists %I on absurd.%I (claim_expires_at) where state = ''running'' and claim_expires_at is not null', ('r_' || p_queue_name) || '_cei', 'r_' || p_queue_name ); execute format( 'create index if not exists %I on absurd.%I (event_name)', ('w_' || p_queue_name) || '_eni', 'w_' || p_queue_name ); execute format( 'create index if not exists %I on absurd.%I (task_id)', ('w_' || p_queue_name) || '_ti', 'w_' || p_queue_name ); execute format( 'create index if not exists %I on absurd.%I (emitted_at)', ('e_' || p_queue_name) || '_eai', 'e_' || p_queue_name ); if v_storage_mode = 'partitioned' then execute format( 'create index if not exists %I on absurd.%I (task_id)', ('i_' || p_queue_name) || '_ti', 'i_' || p_queue_name ); perform absurd.ensure_partitions(p_queue_name); end if; end; $$; -- Creates the queue with the given name and storage mode. -- -- Existing queues are idempotent as long as the requested mode matches. create function absurd.create_queue ( p_queue_name text, p_storage_mode text ) returns void language plpgsql as $$ declare v_storage_mode text; v_existing_mode text; begin p_queue_name := absurd.validate_queue_name(p_queue_name); v_storage_mode := lower(trim(coalesce(p_storage_mode, ''))); if v_storage_mode not in ('unpartitioned', 'partitioned') then raise exception 'Unsupported queue storage mode "%"', p_storage_mode; end if; insert into absurd.queues (queue_name, storage_mode) values (p_queue_name, v_storage_mode) on conflict (queue_name) do nothing; select storage_mode into v_existing_mode from absurd.queues where queue_name = p_queue_name; if v_existing_mode is null then raise exception 'Queue "%" was not found after create attempt', p_queue_name; end if; if v_existing_mode <> v_storage_mode then raise exception 'Queue "%" already exists with storage mode "%"', p_queue_name, v_existing_mode; end if; perform absurd.ensure_queue_tables(p_queue_name); end; $$; -- Creates an unpartitioned queue (backward-compatible API). create or replace function absurd.create_queue (p_queue_name text) returns void language plpgsql as $$ begin perform absurd.create_queue(p_queue_name, 'unpartitioned'); end; $$; -- Drop a queue if it exists. -- We intentionally don't validate the provided name here so legacy queues -- created under older naming rules can still be removed. create function absurd.drop_queue (p_queue_name text) returns void language plpgsql as $$ declare v_existing_queue text; begin select queue_name into v_existing_queue from absurd.queues where queue_name = p_queue_name; if v_existing_queue is null then return; end if; -- Remove queue-scoped maintenance jobs only when pg_cron is available. if to_regclass('cron.job') is not null and exists ( select 1 from pg_proc p join pg_namespace n on n.oid = p.pronamespace where n.nspname = 'cron' and p.proname = 'unschedule' ) then perform absurd.disable_cron(p_queue_name); end if; execute format('drop table if exists absurd.%I cascade', 'i_' || p_queue_name); execute format('drop table if exists absurd.%I cascade', 'w_' || p_queue_name); execute format('drop table if exists absurd.%I cascade', 'e_' || p_queue_name); execute format('drop table if exists absurd.%I cascade', 'c_' || p_queue_name); execute format('drop table if exists absurd.%I cascade', 'r_' || p_queue_name); execute format('drop table if exists absurd.%I cascade', 't_' || p_queue_name); delete from absurd.queues where queue_name = p_queue_name; end; $$; -- Lists all queues that currently exist. create function absurd.list_queues () returns table (queue_name text) language sql as $$ select queue_name from absurd.queues order by queue_name; $$; -- Returns queue maintenance policy metadata. create function absurd.get_queue_policy ( p_queue_name text ) returns table ( queue_name text, storage_mode text, default_partition text, partition_lookahead interval, partition_lookback interval, cleanup_ttl interval, cleanup_limit integer, detach_mode text, detach_min_age interval ) language sql as $$ select q.queue_name, q.storage_mode, q.default_partition, q.partition_lookahead, q.partition_lookback, q.cleanup_ttl, q.cleanup_limit, q.detach_mode, q.detach_min_age from absurd.queues q where q.queue_name = p_queue_name; $$; -- Updates queue maintenance policy metadata. -- -- p_policy accepts optional keys: -- * partition_lookahead (interval text) -- * partition_lookback (interval text) -- * cleanup_ttl (interval text, >= 0) -- * cleanup_limit (integer >= 1) -- * detach_mode ('none' | 'empty') -- * detach_min_age (interval text) -- * default_partition ('enabled' | 'disabled') create function absurd.set_queue_policy ( p_queue_name text, p_policy jsonb ) returns void language plpgsql as $$ declare v_policy jsonb := coalesce(p_policy, '{}'::jsonb); v_unknown_key text; v_exists boolean := false; v_storage_mode text; v_default_partition text; v_previous_default_partition text; v_parent_prefix text; v_parent_table text; v_default_table text; v_default_attached boolean; v_default_has_rows boolean; v_partition_lookahead interval; v_partition_lookback interval; v_cleanup_ttl interval; v_cleanup_limit integer; v_detach_mode text; v_detach_min_age interval; begin p_queue_name := absurd.validate_queue_name(p_queue_name); if jsonb_typeof(v_policy) <> 'object' then raise exception 'Queue policy must be a JSON object'; end if; select k.key into v_unknown_key from jsonb_object_keys(v_policy) as k(key) where k.key not in ( 'partition_lookahead', 'partition_lookback', 'cleanup_ttl', 'cleanup_limit', 'detach_mode', 'detach_min_age', 'default_partition' ) limit 1; if v_unknown_key is not null then raise exception 'Unsupported queue policy key "%"', v_unknown_key; end if; select exists ( select 1 from absurd.queues where queue_name = p_queue_name ) into v_exists; if not v_exists then raise exception 'Queue "%" does not exist', p_queue_name; end if; select storage_mode, default_partition, partition_lookahead, partition_lookback, cleanup_ttl, cleanup_limit, detach_mode, detach_min_age into v_storage_mode, v_default_partition, v_partition_lookahead, v_partition_lookback, v_cleanup_ttl, v_cleanup_limit, v_detach_mode, v_detach_min_age from absurd.queues where queue_name = p_queue_name for update; if v_policy ? 'partition_lookahead' then v_partition_lookahead := (v_policy->>'partition_lookahead')::interval; end if; if v_policy ? 'partition_lookback' then v_partition_lookback := (v_policy->>'partition_lookback')::interval; end if; if v_policy ? 'cleanup_ttl' then v_cleanup_ttl := (v_policy->>'cleanup_ttl')::interval; end if; if v_policy ? 'cleanup_limit' then v_cleanup_limit := (v_policy->>'cleanup_limit')::integer; end if; if v_policy ? 'detach_mode' then v_detach_mode := lower(trim(coalesce(v_policy->>'detach_mode', ''))); end if; if v_policy ? 'detach_min_age' then v_detach_min_age := (v_policy->>'detach_min_age')::interval; end if; v_previous_default_partition := v_default_partition; if v_policy ? 'default_partition' then v_default_partition := lower(trim(coalesce(v_policy->>'default_partition', ''))); end if; if v_partition_lookahead < interval '0 seconds' then raise exception 'partition_lookahead must be non-negative'; end if; if v_partition_lookback < interval '0 seconds' then raise exception 'partition_lookback must be non-negative'; end if; if v_cleanup_ttl < interval '0 seconds' then raise exception 'cleanup_ttl must be non-negative'; end if; if v_cleanup_limit < 1 then raise exception 'cleanup_limit must be at least 1'; end if; if v_detach_mode not in ('none', 'empty') then raise exception 'Unsupported detach mode "%"', v_detach_mode; end if; if v_detach_min_age < interval '0 seconds' then raise exception 'detach_min_age must be non-negative'; end if; if v_default_partition not in ('enabled', 'disabled') then raise exception 'Unsupported default_partition mode "%"', v_default_partition; end if; if v_storage_mode <> 'partitioned' and v_policy ? 'default_partition' then raise exception 'default_partition policy is only supported for partitioned queues'; end if; update absurd.queues set default_partition = v_default_partition, partition_lookahead = v_partition_lookahead, partition_lookback = v_partition_lookback, cleanup_ttl = v_cleanup_ttl, cleanup_limit = v_cleanup_limit, detach_mode = v_detach_mode, detach_min_age = v_detach_min_age where queue_name = p_queue_name; if v_storage_mode = 'partitioned' and v_previous_default_partition <> v_default_partition then if v_default_partition = 'enabled' then perform absurd.ensure_partitions(p_queue_name); else foreach v_parent_prefix in array array['t', 'r', 'c', 'w'] loop v_parent_table := v_parent_prefix || '_' || p_queue_name; v_default_table := v_parent_table || '_d'; select exists ( select 1 from pg_inherits inh join pg_class parent on parent.oid = inh.inhparent join pg_class child on child.oid = inh.inhrelid join pg_namespace n on n.oid = parent.relnamespace where n.nspname = 'absurd' and parent.relname = v_parent_table and child.relname = v_default_table ) into v_default_attached; if not coalesce(v_default_attached, false) then continue; end if; -- Block out-of-window writes into the default partition while we -- validate emptiness and detach/drop it. execute format( 'lock table absurd.%I in access exclusive mode', v_default_table ); execute format( 'select exists (select 1 from absurd.%I limit 1)', v_default_table ) into v_default_has_rows; if coalesce(v_default_has_rows, false) then raise exception 'Cannot disable default_partition for queue "%": default partition "%" is not empty', p_queue_name, v_default_table; end if; execute format( 'alter table absurd.%I detach partition absurd.%I', v_parent_table, v_default_table ); execute format('drop table if exists absurd.%I', v_default_table); end loop; end if; end if; end; $$; -- Returns the current state and terminal payload (if any) for a task. -- -- Non-terminal states (pending/running/sleeping) return result/failure_reason -- as NULL. Completed tasks expose completed_payload as result. Failed tasks -- expose the last run failure_reason. create function absurd.get_task_result ( p_queue_name text, p_task_id uuid ) returns table ( task_id uuid, state text, result jsonb, failure_reason jsonb ) language plpgsql as $$ begin p_queue_name := absurd.validate_queue_name(p_queue_name); return query execute format( 'select t.task_id, t.state, case when t.state = ''completed'' then t.completed_payload else null end as result, case when t.state = ''failed'' then r.failure_reason else null end as failure_reason from absurd.%I t left join absurd.%I r on r.run_id = t.last_attempt_run where t.task_id = $1', 't_' || p_queue_name, 'r_' || p_queue_name ) using p_task_id; end; $$; -- Spawns a given task in a queue. -- -- If an idempotency_key is provided in p_options, the function will check if a task -- with that key already exists. If so, it returns the existing task_id with run_id -- and attempt set to NULL to signal "already exists". This is race-safe via -- INSERT ... ON CONFLICT DO NOTHING. create function absurd.spawn_task ( p_queue_name text, p_task_name text, p_params jsonb, p_options jsonb default '{}'::jsonb ) returns table ( task_id uuid, run_id uuid, attempt integer, created boolean ) language plpgsql as $$ declare v_task_id uuid := absurd.portable_uuidv7(); v_run_id uuid := absurd.portable_uuidv7(); v_attempt integer := 1; v_headers jsonb; v_retry_strategy jsonb; v_max_attempts integer; v_cancellation jsonb; v_idempotency_key text; v_existing_task_id uuid; v_existing_run_id uuid; v_existing_attempt integer; v_row_count integer; v_storage_mode text := 'unpartitioned'; v_task_inserted boolean := false; v_now timestamptz := absurd.current_time(); v_params jsonb := coalesce(p_params, 'null'::jsonb); begin if p_task_name is null or length(trim(p_task_name)) = 0 then raise exception 'task_name must be provided'; end if; if p_options is not null then v_headers := p_options->'headers'; v_retry_strategy := p_options->'retry_strategy'; if p_options ? 'max_attempts' then v_max_attempts := (p_options->>'max_attempts')::int; if v_max_attempts is not null and v_max_attempts < 1 then raise exception 'max_attempts must be >= 1'; end if; end if; v_cancellation := p_options->'cancellation'; v_idempotency_key := p_options->>'idempotency_key'; end if; if v_idempotency_key is not null then select storage_mode into v_storage_mode from absurd.queues where queue_name = p_queue_name; v_storage_mode := coalesce(v_storage_mode, 'unpartitioned'); if v_storage_mode not in ('unpartitioned', 'partitioned') then raise exception 'Unsupported queue storage mode "%"', v_storage_mode; end if; if v_storage_mode = 'partitioned' then -- Reserve idempotency key via dedicated side table. execute format( 'insert into absurd.%I (idempotency_key, task_id) values ($1, $2) on conflict (idempotency_key) do nothing', 'i_' || p_queue_name ) using v_idempotency_key, v_task_id; get diagnostics v_row_count = row_count; if v_row_count = 0 then execute format( 'select i.task_id, t.last_attempt_run, t.attempts from absurd.%I i join absurd.%I t on t.task_id = i.task_id where i.idempotency_key = $1 for key share of i', 'i_' || p_queue_name, 't_' || p_queue_name ) into v_existing_task_id, v_existing_run_id, v_existing_attempt using v_idempotency_key; if v_existing_task_id is null then raise exception 'Idempotency key "%" in queue "%" was concurrently cleaned up', v_idempotency_key, p_queue_name using errcode = '40001', hint = 'Retry spawn_task with the same idempotency key.'; end if; if v_existing_run_id is null then raise exception 'Idempotency key "%" in queue "%" resolved to task "%" without a run', v_idempotency_key, p_queue_name, v_existing_task_id; end if; return query select v_existing_task_id, v_existing_run_id, v_existing_attempt, false; return; end if; else -- Unpartitioned queues keep the original unique(idempotency_key) -- behavior directly on t_. execute format( 'insert into absurd.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at, idempotency_key) values ($1, $2, $3, $4, $5, $6, $7, $8, null, ''pending'', $9, $10, null, null, $11) on conflict (idempotency_key) do nothing', 't_' || p_queue_name ) using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_now, v_attempt, v_run_id, v_idempotency_key; get diagnostics v_row_count = row_count; if v_row_count = 0 then execute format( 'select task_id, last_attempt_run, attempts from absurd.%I where idempotency_key = $1', 't_' || p_queue_name ) into v_existing_task_id, v_existing_run_id, v_existing_attempt using v_idempotency_key; return query select v_existing_task_id, v_existing_run_id, v_existing_attempt, false; return; end if; v_task_inserted := true; end if; end if; if not v_task_inserted then execute format( 'insert into absurd.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at, idempotency_key) values ($1, $2, $3, $4, $5, $6, $7, $8, null, ''pending'', $9, $10, null, null, $11)', 't_' || p_queue_name ) using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_now, v_attempt, v_run_id, v_idempotency_key; end if; execute format( 'insert into absurd.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason) values ($1, $2, $3, ''pending'', $4, null, null, null, null)', 'r_' || p_queue_name ) using v_run_id, v_task_id, v_attempt, v_now; return query select v_task_id, v_run_id, v_attempt, true; end; $$; -- Workers call this to reserve a task from a given queue -- for a given reservation period in seconds. create function absurd.claim_task ( p_queue_name text, p_worker_id text, p_claim_timeout integer default 30, p_qty integer default 1 ) returns table ( run_id uuid, task_id uuid, attempt integer, task_name text, params jsonb, retry_strategy jsonb, max_attempts integer, headers jsonb, wake_event text, event_payload jsonb ) language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_claim_timeout integer := greatest(coalesce(p_claim_timeout, 30), 0); v_worker_id text := coalesce(nullif(p_worker_id, ''), 'worker'); v_qty integer := greatest(coalesce(p_qty, 1), 1); v_claim_until timestamptz := null; v_sql text; v_expired_run record; v_cancel_candidate record; v_expired_sweep_limit integer; begin if v_claim_timeout > 0 then v_claim_until := v_now + make_interval(secs => v_claim_timeout); end if; -- Keep claim polling work bounded: process at most v_qty expired leases -- per claim call. v_expired_sweep_limit := greatest(v_qty, 1); -- Apply cancellation rules before claiming. -- -- Use cancel_task() so lock order stays consistent (runs first, task second) -- with complete_run()/fail_run(). for v_cancel_candidate in execute format( 'select task_id from absurd.%I where state in (''pending'', ''sleeping'', ''running'') and ( ( (cancellation->>''max_delay'')::bigint is not null and first_started_at is null and extract(epoch from ($1 - enqueue_at)) >= (cancellation->>''max_delay'')::bigint ) or ( (cancellation->>''max_duration'')::bigint is not null and first_started_at is not null and extract(epoch from ($1 - first_started_at)) >= (cancellation->>''max_duration'')::bigint ) ) order by task_id', 't_' || p_queue_name ) using v_now loop perform absurd.cancel_task(p_queue_name, v_cancel_candidate.task_id); end loop; for v_expired_run in execute format( 'select run_id, claimed_by, claim_expires_at, attempt from absurd.%I where state = ''running'' and claim_expires_at is not null and claim_expires_at <= $1 order by claim_expires_at, run_id limit $2 for update skip locked', 'r_' || p_queue_name ) using v_now, v_expired_sweep_limit loop perform absurd.fail_run( p_queue_name, v_expired_run.run_id, jsonb_strip_nulls(jsonb_build_object( 'name', '$ClaimTimeout', 'message', 'worker did not finish task within claim interval', 'workerId', v_expired_run.claimed_by, 'claimExpiredAt', v_expired_run.claim_expires_at, 'attempt', v_expired_run.attempt )), null ); end loop; v_sql := format( 'with candidate as ( select r.run_id from absurd.%1$I r join absurd.%2$I t on t.task_id = r.task_id where r.state in (''pending'', ''sleeping'') and t.state in (''pending'', ''sleeping'', ''running'') and r.available_at <= $1 order by r.available_at, r.run_id limit $2 for update skip locked ), updated as ( update absurd.%1$I r set state = ''running'', claimed_by = $3, claim_expires_at = $4, started_at = $1, available_at = $1 where run_id in (select run_id from candidate) returning r.run_id, r.task_id, r.attempt ), task_upd as ( update absurd.%2$I t set state = ''running'', attempts = greatest(t.attempts, u.attempt), first_started_at = coalesce(t.first_started_at, $1), last_attempt_run = u.run_id from updated u where t.task_id = u.task_id returning t.task_id ), wait_cleanup as ( delete from absurd.%3$I w using updated u where w.run_id = u.run_id and w.timeout_at is not null and w.timeout_at <= $1 returning w.run_id ) select u.run_id, u.task_id, u.attempt, t.task_name, t.params, t.retry_strategy, t.max_attempts, t.headers, r.wake_event, r.event_payload from updated u join absurd.%1$I r on r.run_id = u.run_id join absurd.%2$I t on t.task_id = u.task_id order by r.available_at, u.run_id', 'r_' || p_queue_name, 't_' || p_queue_name, 'w_' || p_queue_name ); return query execute v_sql using v_now, v_qty, v_worker_id, v_claim_until; end; $$; -- Markes a run as completed create function absurd.complete_run ( p_queue_name text, p_run_id uuid, p_state jsonb default null ) returns void language plpgsql as $$ declare v_task_id uuid; v_state text; v_now timestamptz := absurd.current_time(); begin execute format( 'select task_id, state from absurd.%I where run_id = $1 for update', 'r_' || p_queue_name ) into v_task_id, v_state using p_run_id; if v_task_id is null then raise exception 'Run "%" not found in queue "%"', p_run_id, p_queue_name; end if; if v_state <> 'running' then if v_state = 'cancelled' then raise exception sqlstate 'AB001' using message = 'Task has been cancelled'; end if; if v_state = 'failed' then raise exception sqlstate 'AB002' using message = format('Run "%s" has already failed in queue "%s"', p_run_id, p_queue_name); end if; raise exception 'Run "%" is not currently running in queue "%"', p_run_id, p_queue_name; end if; execute format( 'update absurd.%I set state = ''completed'', completed_at = $2, result = $3 where run_id = $1', 'r_' || p_queue_name ) using p_run_id, v_now, p_state; execute format( 'update absurd.%I set state = ''completed'', completed_payload = $2, last_attempt_run = $3 where task_id = $1', 't_' || p_queue_name ) using v_task_id, p_state, p_run_id; execute format( 'delete from absurd.%I where run_id = $1', 'w_' || p_queue_name ) using p_run_id; end; $$; create function absurd.schedule_run ( p_queue_name text, p_run_id uuid, p_wake_at timestamptz ) returns void language plpgsql as $$ declare v_task_id uuid; begin execute format( 'select task_id from absurd.%I where run_id = $1 and state = ''running'' for update', 'r_' || p_queue_name ) into v_task_id using p_run_id; if v_task_id is null then raise exception 'Run "%" is not currently running in queue "%"', p_run_id, p_queue_name; end if; execute format( 'update absurd.%I set state = ''sleeping'', claimed_by = null, claim_expires_at = null, available_at = $2, wake_event = null where run_id = $1', 'r_' || p_queue_name ) using p_run_id, p_wake_at; execute format( 'update absurd.%I set state = ''sleeping'' where task_id = $1', 't_' || p_queue_name ) using v_task_id; end; $$; create function absurd.fail_run ( p_queue_name text, p_run_id uuid, p_reason jsonb, p_retry_at timestamptz default null ) returns void language plpgsql as $$ declare v_task_id uuid; v_attempt integer; v_run_state text; v_retry_strategy jsonb; v_max_attempts integer; v_now timestamptz := absurd.current_time(); v_next_attempt integer; v_delay_seconds double precision := 0; v_next_available timestamptz; v_retry_kind text; v_base double precision; v_factor double precision; v_max_seconds double precision; v_first_started timestamptz; v_cancellation jsonb; v_max_duration bigint; v_task_cancel boolean := false; v_new_run_id uuid; v_task_state_after text; v_recorded_attempt integer; v_last_attempt_run uuid := p_run_id; v_cancelled_at timestamptz := null; begin execute format( 'select r.task_id, r.attempt, r.state from absurd.%I r where r.run_id = $1 for update', 'r_' || p_queue_name ) into v_task_id, v_attempt, v_run_state using p_run_id; if v_task_id is null then raise exception 'Run "%" cannot be failed in queue "%"', p_run_id, p_queue_name; end if; if v_run_state = 'cancelled' then raise exception sqlstate 'AB001' using message = 'Task has been cancelled'; end if; if v_run_state = 'failed' then raise exception sqlstate 'AB002' using message = format('Run "%s" has already failed in queue "%s"', p_run_id, p_queue_name); end if; if v_run_state not in ('running', 'sleeping') then raise exception 'Run "%" cannot be failed in queue "%"', p_run_id, p_queue_name; end if; execute format( 'select retry_strategy, max_attempts, first_started_at, cancellation from absurd.%I where task_id = $1 for update', 't_' || p_queue_name ) into v_retry_strategy, v_max_attempts, v_first_started, v_cancellation using v_task_id; execute format( 'update absurd.%I set state = ''failed'', wake_event = null, failed_at = $2, failure_reason = $3 where run_id = $1', 'r_' || p_queue_name ) using p_run_id, v_now, p_reason; v_next_attempt := v_attempt + 1; v_task_state_after := 'failed'; v_recorded_attempt := v_attempt; if v_max_attempts is null or v_next_attempt <= v_max_attempts then if p_retry_at is not null then v_next_available := p_retry_at; else v_retry_kind := coalesce(v_retry_strategy->>'kind', 'none'); if v_retry_kind = 'fixed' then v_base := coalesce((v_retry_strategy->>'base_seconds')::double precision, 60); v_delay_seconds := v_base; elsif v_retry_kind = 'exponential' then v_base := coalesce((v_retry_strategy->>'base_seconds')::double precision, 30); v_factor := coalesce((v_retry_strategy->>'factor')::double precision, 2); v_delay_seconds := v_base * power(v_factor, greatest(v_attempt - 1, 0)); v_max_seconds := (v_retry_strategy->>'max_seconds')::double precision; if v_max_seconds is not null then v_delay_seconds := least(v_delay_seconds, v_max_seconds); end if; else v_delay_seconds := 0; end if; v_next_available := v_now + (v_delay_seconds * interval '1 second'); end if; if v_next_available < v_now then v_next_available := v_now; end if; if v_cancellation is not null then v_max_duration := (v_cancellation->>'max_duration')::bigint; if v_max_duration is not null and v_first_started is not null then if extract(epoch from (v_next_available - v_first_started)) >= v_max_duration then v_task_cancel := true; end if; end if; end if; if not v_task_cancel then v_task_state_after := case when v_next_available > v_now then 'sleeping' else 'pending' end; v_new_run_id := absurd.portable_uuidv7(); v_recorded_attempt := v_next_attempt; v_last_attempt_run := v_new_run_id; execute format( 'insert into absurd.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason) values ($1, $2, $3, $4, $5, null, null, null, null)', 'r_' || p_queue_name ) using v_new_run_id, v_task_id, v_next_attempt, v_task_state_after, v_next_available; end if; end if; if v_task_cancel then v_task_state_after := 'cancelled'; v_cancelled_at := v_now; v_recorded_attempt := greatest(v_recorded_attempt, v_attempt); v_last_attempt_run := p_run_id; end if; execute format( 'update absurd.%I set state = $2, attempts = greatest(attempts, $3), last_attempt_run = $4, cancelled_at = coalesce(cancelled_at, $5) where task_id = $1', 't_' || p_queue_name ) using v_task_id, v_task_state_after, v_recorded_attempt, v_last_attempt_run, v_cancelled_at; execute format( 'delete from absurd.%I where run_id = $1', 'w_' || p_queue_name ) using p_run_id; end; $$; -- Retries a failed task either by extending attempts on the same task or by -- spawning a brand new task from the original inputs. -- -- Options: -- - spawn_new (boolean, default false): create a new task instead of retrying in-place. -- - max_attempts (integer, optional): for in-place retry, defaults to -- coalesce(current max_attempts, current attempts) + 1 and must be greater -- than current attempts; for spawn_new it overrides copied max_attempts on -- the new task. create function absurd.retry_task ( p_queue_name text, p_task_id uuid, p_options jsonb default '{}'::jsonb ) returns table ( task_id uuid, run_id uuid, attempt integer, created boolean ) language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_spawn_new boolean := false; v_requested_max_attempts integer; v_task_name text; v_params jsonb; v_headers jsonb; v_retry_strategy jsonb; v_task_max_attempts integer; v_cancellation jsonb; v_task_attempts integer; v_task_state text; v_new_run_id uuid; v_new_attempt integer; v_spawn_options jsonb; begin if p_options is not null then if p_options ? 'spawn_new' then v_spawn_new := coalesce((p_options->>'spawn_new')::boolean, false); end if; if p_options ? 'max_attempts' then v_requested_max_attempts := (p_options->>'max_attempts')::int; if v_requested_max_attempts is not null and v_requested_max_attempts < 1 then raise exception 'max_attempts must be >= 1'; end if; end if; end if; execute format( 'select task_name, params, headers, retry_strategy, max_attempts, cancellation, attempts, state from absurd.%I where task_id = $1 for update', 't_' || p_queue_name ) into v_task_name, v_params, v_headers, v_retry_strategy, v_task_max_attempts, v_cancellation, v_task_attempts, v_task_state using p_task_id; if v_task_state is null then raise exception 'Task "%" not found in queue "%"', p_task_id, p_queue_name; end if; if v_task_state <> 'failed' then raise exception 'Task "%" is not currently failed in queue "%"', p_task_id, p_queue_name; end if; if v_spawn_new then v_spawn_options := jsonb_strip_nulls(jsonb_build_object( 'headers', v_headers, 'retry_strategy', v_retry_strategy, 'max_attempts', coalesce(v_requested_max_attempts, v_task_max_attempts), 'cancellation', v_cancellation )); return query select s.task_id, s.run_id, s.attempt, s.created from absurd.spawn_task(p_queue_name, v_task_name, v_params, v_spawn_options) s; return; end if; if v_requested_max_attempts is null then v_requested_max_attempts := coalesce(v_task_max_attempts, v_task_attempts) + 1; end if; if v_requested_max_attempts <= v_task_attempts then raise exception 'max_attempts (%) must be greater than current attempts (%)', v_requested_max_attempts, v_task_attempts; end if; v_new_run_id := absurd.portable_uuidv7(); v_new_attempt := v_task_attempts + 1; execute format( 'insert into absurd.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason) values ($1, $2, $3, ''pending'', $4, null, null, null, null)', 'r_' || p_queue_name ) using v_new_run_id, p_task_id, v_new_attempt, v_now; execute format( 'update absurd.%I set state = ''pending'', attempts = greatest(attempts, $2), max_attempts = $3, last_attempt_run = $4, cancelled_at = null where task_id = $1', 't_' || p_queue_name ) using p_task_id, v_new_attempt, v_requested_max_attempts, v_new_run_id; return query select p_task_id, v_new_run_id, v_new_attempt, false; end; $$; create function absurd.set_task_checkpoint_state ( p_queue_name text, p_task_id uuid, p_step_name text, p_state jsonb, p_owner_run uuid, p_extend_claim_by integer default null ) returns void language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_new_attempt integer; v_existing_attempt integer; v_existing_owner uuid; v_task_state text; v_run_state text; begin if p_step_name is null or length(trim(p_step_name)) = 0 then raise exception 'step_name must be provided'; end if; execute format( 'select r.attempt, r.state, t.state from absurd.%I r join absurd.%I t on t.task_id = r.task_id where r.run_id = $1', 'r_' || p_queue_name, 't_' || p_queue_name ) into v_new_attempt, v_run_state, v_task_state using p_owner_run; if v_new_attempt is null then raise exception 'Run "%" not found for checkpoint', p_owner_run; end if; if v_task_state = 'cancelled' then raise exception sqlstate 'AB001' using message = 'Task has been cancelled'; end if; if v_run_state = 'failed' then raise exception sqlstate 'AB002' using message = format('Run "%s" has already failed in queue "%s"', p_owner_run, p_queue_name); end if; -- Extend the claim if requested if p_extend_claim_by is not null and p_extend_claim_by > 0 then execute format( 'update absurd.%I set claim_expires_at = $2 + make_interval(secs => $3) where run_id = $1 and state = ''running'' and claim_expires_at is not null', 'r_' || p_queue_name ) using p_owner_run, v_now, p_extend_claim_by; end if; execute format( 'select c.owner_run_id, r.attempt from absurd.%I c left join absurd.%I r on r.run_id = c.owner_run_id where c.task_id = $1 and c.checkpoint_name = $2', 'c_' || p_queue_name, 'r_' || p_queue_name ) into v_existing_owner, v_existing_attempt using p_task_id, p_step_name; if v_existing_owner is null or v_existing_attempt is null or v_new_attempt >= v_existing_attempt then execute format( 'insert into absurd.%I (task_id, checkpoint_name, state, status, owner_run_id, updated_at) values ($1, $2, $3, ''committed'', $4, $5) on conflict (task_id, checkpoint_name) do update set state = excluded.state, status = excluded.status, owner_run_id = excluded.owner_run_id, updated_at = excluded.updated_at', 'c_' || p_queue_name ) using p_task_id, p_step_name, p_state, p_owner_run, v_now; end if; end; $$; create function absurd.extend_claim ( p_queue_name text, p_run_id uuid, p_extend_by integer ) returns void language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_task_state text; v_run_state text; v_claim_expires_at timestamptz; begin if p_extend_by is null or p_extend_by <= 0 then raise exception 'extend_by must be > 0'; end if; execute format( 'select r.state, r.claim_expires_at, t.state from absurd.%I r join absurd.%I t on t.task_id = r.task_id where r.run_id = $1 for update', 'r_' || p_queue_name, 't_' || p_queue_name ) into v_run_state, v_claim_expires_at, v_task_state using p_run_id; if v_run_state is null then raise exception 'Run "%" not found in queue "%"', p_run_id, p_queue_name; end if; if v_task_state = 'cancelled' then raise exception sqlstate 'AB001' using message = 'Task has been cancelled'; end if; if v_run_state <> 'running' then if v_run_state = 'failed' then raise exception sqlstate 'AB002' using message = format('Run "%s" has already failed in queue "%s"', p_run_id, p_queue_name); end if; raise exception 'Run "%" is not currently running in queue "%"', p_run_id, p_queue_name; end if; if v_claim_expires_at is null then raise exception 'Run "%" does not have an active claim in queue "%"', p_run_id, p_queue_name; end if; execute format( 'update absurd.%I set claim_expires_at = $2 + make_interval(secs => $3) where run_id = $1', 'r_' || p_queue_name ) using p_run_id, v_now, p_extend_by; end; $$; -- Returns one checkpoint by name. By default only committed checkpoint rows -- are visible; pass p_include_pending = true to include pending rows. create function absurd.get_task_checkpoint_state ( p_queue_name text, p_task_id uuid, p_step_name text, p_include_pending boolean default false ) returns table ( checkpoint_name text, state jsonb, status text, owner_run_id uuid, updated_at timestamptz ) language plpgsql as $$ begin return query execute format( 'select checkpoint_name, state, status, owner_run_id, updated_at from absurd.%I where task_id = $1 and checkpoint_name = $2 and ($3 or status = ''committed'')', 'c_' || p_queue_name ) using p_task_id, p_step_name, coalesce(p_include_pending, false); end; $$; -- Returns committed checkpoints visible to the given run. The run must belong -- to the provided task, and checkpoints from later attempts are hidden. create function absurd.get_task_checkpoint_states ( p_queue_name text, p_task_id uuid, p_run_id uuid ) returns table ( checkpoint_name text, state jsonb, status text, owner_run_id uuid, updated_at timestamptz ) language plpgsql as $$ declare v_run_task_id uuid; v_run_attempt integer; begin execute format( 'select task_id, attempt from absurd.%I where run_id = $1', 'r_' || p_queue_name ) into v_run_task_id, v_run_attempt using p_run_id; if v_run_task_id is null then raise exception 'Run "%" not found in queue "%"', p_run_id, p_queue_name; end if; if v_run_task_id <> p_task_id then raise exception 'Run "%" does not belong to task "%" in queue "%"', p_run_id, p_task_id, p_queue_name; end if; return query execute format( 'select c.checkpoint_name, c.state, c.status, c.owner_run_id, c.updated_at from absurd.%1$I c left join absurd.%2$I owner_run on owner_run.run_id = c.owner_run_id where c.task_id = $1 and c.status = ''committed'' and (owner_run.attempt is null or owner_run.attempt <= $2) order by c.updated_at asc', 'c_' || p_queue_name, 'r_' || p_queue_name ) using p_task_id, v_run_attempt; end; $$; create function absurd.await_event ( p_queue_name text, p_task_id uuid, p_run_id uuid, p_step_name text, p_event_name text, p_timeout integer default null ) returns table ( should_suspend boolean, payload jsonb ) language plpgsql as $$ declare v_run_state text; v_existing_payload jsonb; v_event_payload jsonb; v_checkpoint_payload jsonb; v_resolved_payload jsonb; v_timeout_at timestamptz; v_available_at timestamptz; v_now timestamptz := absurd.current_time(); v_task_state text; v_wake_event text; begin if p_event_name is null or length(trim(p_event_name)) = 0 then raise exception 'event_name must be provided'; end if; if p_timeout is not null then if p_timeout < 0 then raise exception 'timeout must be non-negative'; end if; v_timeout_at := v_now + (p_timeout::double precision * interval '1 second'); end if; v_available_at := coalesce(v_timeout_at, 'infinity'::timestamptz); execute format( 'select state from absurd.%I where task_id = $1 and checkpoint_name = $2', 'c_' || p_queue_name ) into v_checkpoint_payload using p_task_id, p_step_name; if v_checkpoint_payload is not null then return query select false, v_checkpoint_payload; return; end if; -- Ensure a row exists for this event so we can take a row-level lock. -- -- We use payload IS NULL as the sentinel for "not emitted yet". emit_event -- always writes a non-NULL payload (at minimum JSON null). -- -- Lock ordering is important to avoid deadlocks: await_event locks the event -- row first (FOR SHARE) and then the run row (FOR UPDATE). emit_event -- naturally locks the event row via its UPSERT before touching waits/runs. execute format( 'insert into absurd.%I (event_name, payload, emitted_at) values ($1, null, ''epoch''::timestamptz) on conflict (event_name) do nothing', 'e_' || p_queue_name ) using p_event_name; execute format( 'select 1 from absurd.%I where event_name = $1 for share', 'e_' || p_queue_name ) using p_event_name; execute format( 'select r.state, r.event_payload, r.wake_event, t.state from absurd.%I r join absurd.%I t on t.task_id = r.task_id where r.run_id = $1 for update', 'r_' || p_queue_name, 't_' || p_queue_name ) into v_run_state, v_existing_payload, v_wake_event, v_task_state using p_run_id; if v_run_state is null then raise exception 'Run "%" not found while awaiting event', p_run_id; end if; if v_task_state = 'cancelled' then raise exception sqlstate 'AB001' using message = 'Task has been cancelled'; end if; execute format( 'select payload from absurd.%I where event_name = $1', 'e_' || p_queue_name ) into v_event_payload using p_event_name; if v_existing_payload is not null then execute format( 'update absurd.%I set event_payload = null where run_id = $1', 'r_' || p_queue_name ) using p_run_id; if v_event_payload is not null and v_event_payload = v_existing_payload then v_resolved_payload := v_existing_payload; end if; end if; if v_run_state <> 'running' then raise exception 'Run "%" must be running to await events', p_run_id; end if; if v_resolved_payload is null and v_event_payload is not null then v_resolved_payload := v_event_payload; end if; if v_resolved_payload is not null then execute format( 'insert into absurd.%I (task_id, checkpoint_name, state, status, owner_run_id, updated_at) values ($1, $2, $3, ''committed'', $4, $5) on conflict (task_id, checkpoint_name) do update set state = excluded.state, status = excluded.status, owner_run_id = excluded.owner_run_id, updated_at = excluded.updated_at', 'c_' || p_queue_name ) using p_task_id, p_step_name, v_resolved_payload, p_run_id, v_now; return query select false, v_resolved_payload; return; end if; -- Detect if we resumed due to timeout: wake_event matches and payload is null if v_resolved_payload is null and v_wake_event = p_event_name and v_existing_payload is null then -- Resumed due to timeout; don't re-sleep and don't create a new wait execute format( 'update absurd.%I set wake_event = null where run_id = $1', 'r_' || p_queue_name ) using p_run_id; return query select false, null::jsonb; return; end if; execute format( 'insert into absurd.%I (task_id, run_id, step_name, event_name, timeout_at, created_at) values ($1, $2, $3, $4, $5, $6) on conflict (run_id, step_name) do update set event_name = excluded.event_name, timeout_at = excluded.timeout_at, created_at = excluded.created_at', 'w_' || p_queue_name ) using p_task_id, p_run_id, p_step_name, p_event_name, v_timeout_at, v_now; execute format( 'update absurd.%I set state = ''sleeping'', claimed_by = null, claim_expires_at = null, available_at = $3, wake_event = $2, event_payload = null where run_id = $1', 'r_' || p_queue_name ) using p_run_id, p_event_name, v_available_at; execute format( 'update absurd.%I set state = ''sleeping'' where task_id = $1', 't_' || p_queue_name ) using p_task_id; return query select true, null::jsonb; return; end; $$; create function absurd.emit_event ( p_queue_name text, p_event_name text, p_payload jsonb default null ) returns void language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_payload jsonb := coalesce(p_payload, 'null'::jsonb); v_emit_applied integer; begin if p_event_name is null or length(trim(p_event_name)) = 0 then raise exception 'event_name must be provided'; end if; -- Events are immutable once emitted: first write wins. -- -- await_event() may pre-create a row with payload=NULL as a "not emitted" -- sentinel. We allow exactly one transition NULL -> JSON payload. execute format( 'insert into absurd.%1$I as e (event_name, payload, emitted_at) values ($1, $2, $3) on conflict (event_name) do update set payload = excluded.payload, emitted_at = excluded.emitted_at where e.payload is null', 'e_' || p_queue_name ) using p_event_name, v_payload, v_now; get diagnostics v_emit_applied = row_count; -- Event was already emitted earlier; do not overwrite cached payload or -- re-run wakeup side effects. if v_emit_applied = 0 then return; end if; execute format( 'with expired_waits as ( delete from absurd.%1$I w where w.event_name = $1 and w.timeout_at is not null and w.timeout_at <= $2 returning w.run_id ), affected as ( select run_id, task_id, step_name from absurd.%1$I where event_name = $1 and (timeout_at is null or timeout_at > $2) ), updated_runs as ( update absurd.%2$I r set state = ''pending'', available_at = $2, wake_event = null, event_payload = $3, claimed_by = null, claim_expires_at = null where r.run_id in (select run_id from affected) and r.state = ''sleeping'' returning r.run_id, r.task_id ), checkpoint_upd as ( insert into absurd.%3$I (task_id, checkpoint_name, state, status, owner_run_id, updated_at) select a.task_id, a.step_name, $3, ''committed'', a.run_id, $2 from affected a join updated_runs ur on ur.run_id = a.run_id on conflict (task_id, checkpoint_name) do update set state = excluded.state, status = excluded.status, owner_run_id = excluded.owner_run_id, updated_at = excluded.updated_at ), updated_tasks as ( update absurd.%4$I t set state = ''pending'' where t.task_id in (select task_id from updated_runs) returning task_id ) delete from absurd.%5$I w where w.event_name = $1 and w.run_id in (select run_id from updated_runs)', 'w_' || p_queue_name, 'r_' || p_queue_name, 'c_' || p_queue_name, 't_' || p_queue_name, 'w_' || p_queue_name ) using p_event_name, v_now, v_payload; end; $$; -- Manually cancels a task by its task_id. -- Sets the task state to 'cancelled' and prevents any future runs. -- Currently running code will detect cancellation at the next checkpoint or heartbeat. create function absurd.cancel_task ( p_queue_name text, p_task_id uuid ) returns void language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_task_state text; begin -- Lock active runs before the task row so cancel_task() uses the same -- lock acquisition order as complete_run()/fail_run(). execute format( 'select run_id from absurd.%I where task_id = $1 and state not in (''completed'', ''failed'', ''cancelled'') order by run_id for update', 'r_' || p_queue_name ) using p_task_id; execute format( 'select state from absurd.%I where task_id = $1 for update', 't_' || p_queue_name ) into v_task_state using p_task_id; if v_task_state is null then raise exception 'Task "%" not found in queue "%"', p_task_id, p_queue_name; end if; if v_task_state in ('completed', 'failed', 'cancelled') then return; end if; execute format( 'update absurd.%I set state = ''cancelled'', cancelled_at = coalesce(cancelled_at, $2) where task_id = $1', 't_' || p_queue_name ) using p_task_id, v_now; execute format( 'update absurd.%I set state = ''cancelled'', claimed_by = null, claim_expires_at = null where task_id = $1 and state not in (''completed'', ''failed'', ''cancelled'')', 'r_' || p_queue_name ) using p_task_id; execute format( 'delete from absurd.%I where task_id = $1', 'w_' || p_queue_name ) using p_task_id; end; $$; -- Runs one cleanup batch for all queues (or one specific queue), using -- per-queue policy stored in absurd.queues. create function absurd.cleanup_all_queues ( p_queue_name text default null ) returns table ( queue_name text, tasks_deleted integer, events_deleted integer ) language plpgsql as $$ declare v_queue record; v_cleanup_ttl_seconds integer; begin if p_queue_name is not null then p_queue_name := absurd.validate_queue_name(p_queue_name); if not exists ( select 1 from absurd.queues q where q.queue_name = p_queue_name ) then raise exception 'Queue "%" does not exist', p_queue_name; end if; end if; for v_queue in select q.queue_name, q.cleanup_ttl, q.cleanup_limit from absurd.queues q where p_queue_name is null or q.queue_name = p_queue_name order by q.queue_name loop v_cleanup_ttl_seconds := greatest( floor(extract(epoch from v_queue.cleanup_ttl))::integer, 0 ); queue_name := v_queue.queue_name; tasks_deleted := absurd.cleanup_tasks( v_queue.queue_name, v_cleanup_ttl_seconds, v_queue.cleanup_limit ); events_deleted := absurd.cleanup_events( v_queue.queue_name, v_cleanup_ttl_seconds, v_queue.cleanup_limit ); return next; end loop; end; $$; -- Cleans up old completed, failed, or cancelled tasks and their related data. -- Deletes tasks whose terminal timestamp (completed_at, failed_at, or cancelled_at) -- is older than the specified TTL in seconds. -- -- Returns the number of tasks deleted. create function absurd.cleanup_tasks ( p_queue_name text, p_ttl_seconds integer, p_limit integer default 1000 ) returns integer language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_cutoff timestamptz; v_deleted_count integer; v_storage_mode text := 'unpartitioned'; begin if p_ttl_seconds is null or p_ttl_seconds < 0 then raise exception 'TTL must be a non-negative number of seconds'; end if; v_cutoff := v_now - (p_ttl_seconds * interval '1 second'); select storage_mode into v_storage_mode from absurd.queues where queue_name = p_queue_name; v_storage_mode := coalesce(v_storage_mode, 'unpartitioned'); if v_storage_mode = 'partitioned' then -- Delete in order: wait registrations, checkpoints, runs, idempotency keys, -- then tasks. execute format( 'with eligible_tasks as ( select t.task_id, case when t.state = ''completed'' then r.completed_at when t.state = ''failed'' then r.failed_at when t.state = ''cancelled'' then t.cancelled_at else null end as terminal_at from absurd.%1$I t left join absurd.%2$I r on r.run_id = t.last_attempt_run where t.state in (''completed'', ''failed'', ''cancelled'') ), to_delete as ( select task_id from eligible_tasks where terminal_at is not null and terminal_at < $1 order by terminal_at limit $2 ), del_waits as ( delete from absurd.%3$I w where w.task_id in (select task_id from to_delete) ), del_checkpoints as ( delete from absurd.%4$I c where c.task_id in (select task_id from to_delete) ), del_runs as ( delete from absurd.%2$I r where r.task_id in (select task_id from to_delete) ), del_idempotency as ( delete from absurd.%5$I i where i.task_id in (select task_id from to_delete) ), del_tasks as ( delete from absurd.%1$I t where t.task_id in (select task_id from to_delete) returning 1 ) select count(*) from del_tasks', 't_' || p_queue_name, 'r_' || p_queue_name, 'w_' || p_queue_name, 'c_' || p_queue_name, 'i_' || p_queue_name ) into v_deleted_count using v_cutoff, p_limit; else -- Unpartitioned queues keep idempotency key ownership on the task row, -- so no side-table cleanup is needed. execute format( 'with eligible_tasks as ( select t.task_id, case when t.state = ''completed'' then r.completed_at when t.state = ''failed'' then r.failed_at when t.state = ''cancelled'' then t.cancelled_at else null end as terminal_at from absurd.%1$I t left join absurd.%2$I r on r.run_id = t.last_attempt_run where t.state in (''completed'', ''failed'', ''cancelled'') ), to_delete as ( select task_id from eligible_tasks where terminal_at is not null and terminal_at < $1 order by terminal_at limit $2 ), del_waits as ( delete from absurd.%3$I w where w.task_id in (select task_id from to_delete) ), del_checkpoints as ( delete from absurd.%4$I c where c.task_id in (select task_id from to_delete) ), del_runs as ( delete from absurd.%2$I r where r.task_id in (select task_id from to_delete) ), del_tasks as ( delete from absurd.%1$I t where t.task_id in (select task_id from to_delete) returning 1 ) select count(*) from del_tasks', 't_' || p_queue_name, 'r_' || p_queue_name, 'w_' || p_queue_name, 'c_' || p_queue_name ) into v_deleted_count using v_cutoff, p_limit; end if; return v_deleted_count; end; $$; -- Cleans up old emitted events. -- Deletes events whose emitted_at timestamp is older than the specified TTL in seconds. -- -- Returns the number of events deleted. create function absurd.cleanup_events ( p_queue_name text, p_ttl_seconds integer, p_limit integer default 1000 ) returns integer language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_cutoff timestamptz; v_deleted_count integer; begin if p_ttl_seconds is null or p_ttl_seconds < 0 then raise exception 'TTL must be a non-negative number of seconds'; end if; v_cutoff := v_now - (p_ttl_seconds * interval '1 second'); execute format( 'with to_delete as ( select event_name from absurd.%I where emitted_at < $1 order by emitted_at limit $2 ), del_events as ( delete from absurd.%I e where e.event_name in (select event_name from to_delete) returning 1 ) select count(*) from del_events', 'e_' || p_queue_name, 'e_' || p_queue_name ) into v_deleted_count using v_cutoff, p_limit; return v_deleted_count; end; $$; -- utility function to generate a uuidv7 even for older postgres versions. create function absurd.portable_uuidv7 () returns uuid language plpgsql volatile as $$ declare v_server_num integer := current_setting('server_version_num')::int; ts_ms bigint; b bytea; rnd bytea; i int; begin if v_server_num >= 180000 then return uuidv7 (); end if; ts_ms := floor(extract(epoch from absurd.current_time()) * 1000)::bigint; rnd := uuid_send(uuid_generate_v4 ()); b := repeat(E'\\000', 16)::bytea; for i in 0..5 loop b := set_byte(b, i, ((ts_ms >> ((5 - i) * 8)) & 255)::int); end loop; for i in 6..15 loop b := set_byte(b, i, get_byte(rnd, i)); end loop; b := set_byte(b, 6, ((get_byte(b, 6) & 15) | (7 << 4))); b := set_byte(b, 8, ((get_byte(b, 8) & 63) | 128)); return encode(b, 'hex')::uuid; end; $$; -- Extracts the embedded timestamp from a UUIDv7 value. -- Returns NULL for non-v7 UUIDs. create function absurd.uuidv7_timestamp (p_id uuid) returns timestamptz language sql immutable strict as $$ with bytes as ( select uuid_send(p_id) as b ), decoded as ( select (get_byte(b, 6) >> 4) as version, ((get_byte(b, 0)::bigint << 40) | (get_byte(b, 1)::bigint << 32) | (get_byte(b, 2)::bigint << 24) | (get_byte(b, 3)::bigint << 16) | (get_byte(b, 4)::bigint << 8) | get_byte(b, 5)::bigint) as ts_ms from bytes ) select case when version = 7 then 'epoch'::timestamptz + (ts_ms * interval '1 millisecond') else null end from decoded; $$; -- Returns the lowest UUIDv7 value representable for the given timestamp. -- This is useful for time-window partition bounds over UUIDv7 keys. create function absurd.uuidv7_floor (p_ts timestamptz) returns uuid language plpgsql immutable strict as $$ declare ts_ms bigint := floor(extract(epoch from p_ts) * 1000)::bigint; b bytea; i int; begin if ts_ms < 0 or ts_ms > 281474976710655 then raise exception 'Timestamp "%" is outside UUIDv7 supported range', p_ts; end if; b := repeat(E'\\000', 16)::bytea; for i in 0..5 loop b := set_byte(b, i, ((ts_ms >> ((5 - i) * 8)) & 255)::int); end loop; -- Set UUIDv7 version and RFC4122 variant; keep all randomness bits at 0. b := set_byte(b, 6, (7 << 4)); b := set_byte(b, 8, 128); return encode(b, 'hex')::uuid; end; $$; -- Buckets a timestamp to ISO week start (Monday 00:00) in UTC. create function absurd.week_bucket_utc (p_ts timestamptz) returns timestamptz language sql immutable strict as $$ select date_trunc('week', p_ts at time zone 'UTC') at time zone 'UTC'; $$; -- Returns a compact weekly partition tag in YWW format, where: -- * Y = last digit of the ISO year in UTC -- * WW = zero-padded ISO week number in UTC (01..53) -- -- ISO weeks do not have week 0; days at year boundaries can belong -- to week 52/53 of the previous ISO year. -- -- Examples: -- * 2024-01-01 UTC -> 401 -- * 2021-01-01 UTC -> 053 (ISO week 53 of ISO year 2020) create function absurd.partition_week_tag (p_ts timestamptz) returns text language sql immutable strict as $$ with bucket as ( select absurd.week_bucket_utc(p_ts) at time zone 'UTC' as ts ) select ((extract(isoyear from ts)::int % 10)::text) || lpad((extract(week from ts)::int)::text, 2, '0') from bucket; $$; -- Ensures weekly UUIDv7 partitions exist for partitioned queues. -- -- Window selection is queue-policy driven: -- * start = week_bucket_utc(now() - partition_lookback) -- * end = week_bucket_utc(now() + partition_lookahead) create function absurd.ensure_partitions ( p_queue_name text default null ) returns void language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_window_start timestamptz; v_window_end timestamptz; v_week_start timestamptz; v_week_end timestamptz; v_partition_tag text; v_uuid_from uuid; v_uuid_to uuid; v_queue record; begin if p_queue_name is not null then p_queue_name := absurd.validate_queue_name(p_queue_name); if not exists ( select 1 from absurd.queues q where q.queue_name = p_queue_name ) then raise exception 'Queue "%" does not exist', p_queue_name; end if; end if; for v_queue in select queue_name, default_partition, partition_lookahead, partition_lookback from absurd.queues where storage_mode = 'partitioned' and (p_queue_name is null or queue_name = p_queue_name) order by queue_name loop v_window_start := absurd.week_bucket_utc(v_now - v_queue.partition_lookback); v_window_end := absurd.week_bucket_utc(v_now + v_queue.partition_lookahead); if v_queue.default_partition = 'enabled' then execute format( 'create table if not exists absurd.%I partition of absurd.%I default', 't_' || v_queue.queue_name || '_d', 't_' || v_queue.queue_name ); execute format( 'create table if not exists absurd.%I partition of absurd.%I default', 'r_' || v_queue.queue_name || '_d', 'r_' || v_queue.queue_name ); execute format( 'create table if not exists absurd.%I partition of absurd.%I default', 'c_' || v_queue.queue_name || '_d', 'c_' || v_queue.queue_name ); execute format( 'create table if not exists absurd.%I partition of absurd.%I default', 'w_' || v_queue.queue_name || '_d', 'w_' || v_queue.queue_name ); end if; v_week_start := v_window_start; while v_week_start <= v_window_end loop v_week_end := v_week_start + interval '7 days'; v_partition_tag := absurd.partition_week_tag(v_week_start); v_uuid_from := absurd.uuidv7_floor(v_week_start); v_uuid_to := absurd.uuidv7_floor(v_week_end); execute format( 'create table if not exists absurd.%I partition of absurd.%I for values from (%L::uuid) to (%L::uuid)', 't_' || v_queue.queue_name || '_' || v_partition_tag, 't_' || v_queue.queue_name, v_uuid_from, v_uuid_to ); execute format( 'create table if not exists absurd.%I partition of absurd.%I for values from (%L::uuid) to (%L::uuid)', 'r_' || v_queue.queue_name || '_' || v_partition_tag, 'r_' || v_queue.queue_name, v_uuid_from, v_uuid_to ); execute format( 'create table if not exists absurd.%I partition of absurd.%I for values from (%L::uuid) to (%L::uuid)', 'c_' || v_queue.queue_name || '_' || v_partition_tag, 'c_' || v_queue.queue_name, v_uuid_from, v_uuid_to ); execute format( 'create table if not exists absurd.%I partition of absurd.%I for values from (%L::uuid) to (%L::uuid)', 'w_' || v_queue.queue_name || '_' || v_partition_tag, 'w_' || v_queue.queue_name, v_uuid_from, v_uuid_to ); v_week_start := v_week_end; end loop; end loop; end; $$; -- Lists eligible partition tables for detach/drop planning. -- -- This does not execute detach directly. -- Callers should construct SQL locally from parent/partition names. create function absurd.list_detach_candidates ( p_queue_name text default null ) returns table ( queue_name text, parent_table text, partition_table text ) language plpgsql as $$ declare v_now timestamptz := absurd.current_time(); v_queue record; v_parent_prefix text; v_parent_table text; v_parent_oid oid; v_part record; v_upper_uuid uuid; v_upper_ts timestamptz; v_has_rows boolean; begin if p_queue_name is not null then p_queue_name := absurd.validate_queue_name(p_queue_name); if not exists ( select 1 from absurd.queues q where q.queue_name = p_queue_name ) then raise exception 'Queue "%" does not exist', p_queue_name; end if; end if; for v_queue in select q.queue_name, q.detach_mode, q.detach_min_age from absurd.queues q where q.storage_mode = 'partitioned' and q.detach_mode = 'empty' and (p_queue_name is null or q.queue_name = p_queue_name) order by q.queue_name loop foreach v_parent_prefix in array array['t', 'r', 'c', 'w'] loop v_parent_table := v_parent_prefix || '_' || v_queue.queue_name; select c.oid into v_parent_oid from pg_class c join pg_namespace n on n.oid = c.relnamespace where n.nspname = 'absurd' and c.relname = v_parent_table; if v_parent_oid is null then continue; end if; for v_part in select child.relname as partition_name, pg_get_expr(child.relpartbound, child.oid) as part_bound from pg_inherits inh join pg_class child on child.oid = inh.inhrelid where inh.inhparent = v_parent_oid loop if v_part.part_bound = 'DEFAULT' then continue; end if; select (regexp_match(v_part.part_bound, 'TO \(''([^'']+)''(::uuid)?\)'))[1]::uuid into v_upper_uuid; if v_upper_uuid is null then continue; end if; v_upper_ts := absurd.uuidv7_timestamp(v_upper_uuid); if v_upper_ts is null then continue; end if; if v_upper_ts >= (v_now - v_queue.detach_min_age) then continue; end if; execute format( 'select exists (select 1 from absurd.%I limit 1)', v_part.partition_name ) into v_has_rows; if coalesce(v_has_rows, false) then continue; end if; queue_name := v_queue.queue_name; parent_table := v_parent_table; partition_table := v_part.partition_name; return next; end loop; end loop; end loop; end; $$; -- Drops a detached partition table if it is no longer attached. -- -- Returns true when the table was dropped. If p_unschedule_job_name is -- provided and pg_cron is available, the matching cron job is unscheduled -- once the partition is gone. The paired detach job (if derivable from -- p_unschedule_job_name) is unscheduled as soon as the partition is observed -- detached so DETACH does not keep retrying. create function absurd.drop_detached_partition ( p_partition_table text, p_unschedule_job_name text default null ) returns boolean language plpgsql as $$ declare v_partition_table text := nullif(trim(coalesce(p_partition_table, '')), ''); v_partition_oid oid; v_is_attached boolean := false; v_detach_job_name text; begin if p_unschedule_job_name like 'absurd_drop_run_%' then v_detach_job_name := 'absurd_detach_run_' || substr(p_unschedule_job_name, length('absurd_drop_run_') + 1); end if; if v_partition_table is null then raise exception 'partition table must be provided'; end if; select c.oid into v_partition_oid from pg_class c join pg_namespace n on n.oid = c.relnamespace where n.nspname = 'absurd' and c.relname = v_partition_table; if v_partition_oid is null then if p_unschedule_job_name is not null and to_regclass('cron.job') is not null then perform cron.unschedule(jobid) from cron.job where jobname in (p_unschedule_job_name, coalesce(v_detach_job_name, '')); end if; return false; end if; select exists ( select 1 from pg_inherits where inhrelid = v_partition_oid ) into v_is_attached; if v_is_attached then return false; end if; -- Once detached, stop retrying detach runs immediately. Keep drop -- scheduled until the table is actually dropped. if v_detach_job_name is not null and to_regclass('cron.job') is not null then perform cron.unschedule(jobid) from cron.job where jobname = v_detach_job_name; end if; execute format('drop table if exists absurd.%I', v_partition_table); if p_unschedule_job_name is not null and to_regclass('cron.job') is not null then perform cron.unschedule(jobid) from cron.job where jobname = p_unschedule_job_name; end if; return true; end; $$; -- Schedules per-parent one-at-a-time cron jobs for detach/drop. -- -- For each parent table, only the oldest eligible partition is scheduled and -- only when there is no active detach/drop job for that parent. -- -- Detach jobs run the raw DETACH statement. They use CONCURRENTLY when -- possible; if a parent still has an attached DEFAULT partition, they fall -- back to non-concurrent DETACH (Postgres limitation). -- -- Drop jobs poll via absurd.drop_detached_partition(); once a partition is -- detached, that function unschedules the paired detach job immediately and -- keeps retrying drop until the table is gone. create function absurd.schedule_detach_jobs ( p_queue_name text default null ) returns table ( job_name text, job_id bigint, queue_name text, partition_table text, job_kind text ) language plpgsql as $$ declare v_scope text; v_candidate record; v_parent_key text; v_candidate_key text; v_detach_job_name text; v_drop_job_name text; v_detach_command text; v_drop_command text; v_parent_has_default_partition boolean; v_job_id bigint; begin if p_queue_name is not null then p_queue_name := absurd.validate_queue_name(p_queue_name); end if; if to_regclass('cron.job') is null then raise exception 'pg_cron is not available (missing cron.job)'; end if; if not exists ( select 1 from pg_proc p join pg_namespace n on n.oid = p.pronamespace where n.nspname = 'cron' and p.proname = 'schedule' ) then raise exception 'pg_cron is not available (missing cron.schedule)'; end if; if not exists ( select 1 from pg_proc p join pg_namespace n on n.oid = p.pronamespace where n.nspname = 'cron' and p.proname = 'unschedule' ) then raise exception 'pg_cron is not available (missing cron.unschedule)'; end if; v_scope := case when p_queue_name is null then 'all' else substr(md5(p_queue_name), 1, 12) end; for v_candidate in with candidates as ( select c.*, absurd.uuidv7_timestamp( (regexp_match( pg_get_expr(child.relpartbound, child.oid), 'TO \(''([^'']+)''(::uuid)?\)' ))[1]::uuid ) as upper_ts from absurd.list_detach_candidates(p_queue_name) c join pg_class child on child.relname = c.partition_table join pg_namespace n on n.oid = child.relnamespace where n.nspname = 'absurd' ), ranked as ( select candidates.*, row_number() over ( partition by candidates.parent_table order by candidates.upper_ts asc nulls last, candidates.partition_table asc ) as rn from candidates ) select ranked.queue_name, ranked.parent_table, ranked.partition_table from ranked where ranked.rn = 1 order by ranked.queue_name, ranked.parent_table, ranked.partition_table loop v_parent_key := substr(md5(v_candidate.parent_table), 1, 8); -- Only one active detach pipeline per parent table. if exists ( select 1 from cron.job where jobname like ('absurd_detach_run_%_' || v_parent_key || '_%') or jobname like ('absurd_drop_run_%_' || v_parent_key || '_%') ) then continue; end if; v_candidate_key := substr( md5(v_candidate.parent_table || ':' || v_candidate.partition_table), 1, 12 ); v_detach_job_name := format( 'absurd_detach_run_%s_%s_%s', v_scope, v_parent_key, v_candidate_key ); v_drop_job_name := format( 'absurd_drop_run_%s_%s_%s', v_scope, v_parent_key, v_candidate_key ); if not exists ( select 1 from cron.job where jobname = v_detach_job_name or jobname like ('absurd_detach_run_%_' || v_candidate_key) ) then select exists ( select 1 from pg_class parent join pg_namespace pn on pn.oid = parent.relnamespace join pg_inherits inh on inh.inhparent = parent.oid join pg_class child on child.oid = inh.inhrelid where pn.nspname = 'absurd' and parent.relname = v_candidate.parent_table and pg_get_expr(child.relpartbound, child.oid) = 'DEFAULT' ) into v_parent_has_default_partition; v_detach_command := format( 'alter table absurd.%I detach partition absurd.%I', v_candidate.parent_table, v_candidate.partition_table ); if not coalesce(v_parent_has_default_partition, false) then v_detach_command := v_detach_command || ' concurrently'; end if; execute 'select cron.schedule($1, $2, $3)' into v_job_id using v_detach_job_name, '* * * * *', v_detach_command; job_name := v_detach_job_name; job_id := v_job_id; queue_name := v_candidate.queue_name; partition_table := v_candidate.partition_table; job_kind := 'detach'; return next; end if; if not exists ( select 1 from cron.job where jobname = v_drop_job_name or jobname like ('absurd_drop_run_%_' || v_candidate_key) ) then v_drop_command := format( 'select absurd.drop_detached_partition(%L, %L);', v_candidate.partition_table, v_drop_job_name ); execute 'select cron.schedule($1, $2, $3)' into v_job_id using v_drop_job_name, '* * * * *', v_drop_command; job_name := v_drop_job_name; job_id := v_job_id; queue_name := v_candidate.queue_name; partition_table := v_candidate.partition_table; job_kind := 'drop'; return next; end if; end loop; end; $$; -- Configures pg_cron jobs for partition provisioning, cleanup, and detach planning. -- -- Detach planning schedules per-partition jobs (via absurd.schedule_detach_jobs) -- that run raw DETACH statements and follow-up drop checks. -- -- Requires pg_cron to be installed (or compatible cron schema/functions). create function absurd.enable_cron ( p_queue_name text default null, p_partition_schedule text default '5 * * * *', p_cleanup_schedule text default '17 * * * *', p_detach_schedule text default '29 * * * *' ) returns table ( job_name text, job_id bigint ) language plpgsql as $$ declare v_queue_exists boolean := false; v_queue_literal text; v_partition_job_name text; v_cleanup_job_name text; v_detach_plan_job_name text; v_partition_command text; v_cleanup_command text; v_detach_plan_command text; v_partitions_job_id bigint; v_cleanup_job_id bigint; v_detach_plan_job_id bigint; v_existing_job_id bigint; v_job_suffix text; begin if p_queue_name is not null then p_queue_name := absurd.validate_queue_name(p_queue_name); select exists ( select 1 from absurd.queues where queue_name = p_queue_name ) into v_queue_exists; if not v_queue_exists then raise exception 'Queue "%" does not exist', p_queue_name; end if; end if; if p_partition_schedule is null or length(trim(p_partition_schedule)) = 0 then raise exception 'Partition schedule must be provided'; end if; if p_cleanup_schedule is null or length(trim(p_cleanup_schedule)) = 0 then raise exception 'Cleanup schedule must be provided'; end if; if p_detach_schedule is null or length(trim(p_detach_schedule)) = 0 then raise exception 'Detach schedule must be provided'; end if; if to_regclass('cron.job') is null then raise exception 'pg_cron is not available (missing cron.job)'; end if; if not exists ( select 1 from pg_proc p join pg_namespace n on n.oid = p.pronamespace where n.nspname = 'cron' and p.proname = 'schedule' ) then raise exception 'pg_cron is not available (missing cron.schedule)'; end if; if not exists ( select 1 from pg_proc p join pg_namespace n on n.oid = p.pronamespace where n.nspname = 'cron' and p.proname = 'unschedule' ) then raise exception 'pg_cron is not available (missing cron.unschedule)'; end if; v_queue_literal := case when p_queue_name is null then 'null::text' else quote_literal(p_queue_name) end; v_partition_command := format( 'select absurd.ensure_partitions(%s);', v_queue_literal ); v_cleanup_command := format( 'select * from absurd.cleanup_all_queues(%s);', v_queue_literal ); v_job_suffix := case when p_queue_name is null then 'all' else substr(md5(p_queue_name), 1, 12) end; v_partition_job_name := 'absurd_partitions_' || v_job_suffix; v_cleanup_job_name := 'absurd_cleanup_' || v_job_suffix; v_detach_plan_job_name := 'absurd_detach_plan_' || v_job_suffix; v_detach_plan_command := format( 'select * from absurd.schedule_detach_jobs(%s);', v_queue_literal ); for v_existing_job_id in execute 'select jobid from cron.job where jobname = $1' using v_partition_job_name loop execute 'select cron.unschedule($1)' using v_existing_job_id; end loop; for v_existing_job_id in execute 'select jobid from cron.job where jobname = $1' using v_cleanup_job_name loop execute 'select cron.unschedule($1)' using v_existing_job_id; end loop; for v_existing_job_id in execute 'select jobid from cron.job where jobname = $1' using v_detach_plan_job_name loop execute 'select cron.unschedule($1)' using v_existing_job_id; end loop; execute 'select cron.schedule($1, $2, $3)' into v_partitions_job_id using v_partition_job_name, p_partition_schedule, v_partition_command; execute 'select cron.schedule($1, $2, $3)' into v_cleanup_job_id using v_cleanup_job_name, p_cleanup_schedule, v_cleanup_command; execute 'select cron.schedule($1, $2, $3)' into v_detach_plan_job_id using v_detach_plan_job_name, p_detach_schedule, v_detach_plan_command; job_name := v_partition_job_name; job_id := v_partitions_job_id; return next; job_name := v_cleanup_job_name; job_id := v_cleanup_job_id; return next; job_name := v_detach_plan_job_name; job_id := v_detach_plan_job_id; return next; end; $$; -- Removes pg_cron jobs previously installed by absurd.enable_cron. -- -- If p_queue_name is null, this removes the global ('all') maintenance jobs -- and global-scope detach/drop run jobs. -- If p_queue_name is provided, this removes jobs for that specific queue scope, -- including detach/drop run jobs. create function absurd.disable_cron ( p_queue_name text default null ) returns table ( job_name text, job_id bigint ) language plpgsql as $$ declare v_job_suffix text; v_partition_job_name text; v_cleanup_job_name text; v_detach_plan_job_name text; v_detach_run_pattern text; v_drop_run_pattern text; v_existing_job record; begin if p_queue_name is not null then p_queue_name := absurd.validate_queue_name(p_queue_name); end if; if to_regclass('cron.job') is null then raise exception 'pg_cron is not available (missing cron.job)'; end if; if not exists ( select 1 from pg_proc p join pg_namespace n on n.oid = p.pronamespace where n.nspname = 'cron' and p.proname = 'unschedule' ) then raise exception 'pg_cron is not available (missing cron.unschedule)'; end if; v_job_suffix := case when p_queue_name is null then 'all' else substr(md5(p_queue_name), 1, 12) end; v_partition_job_name := 'absurd_partitions_' || v_job_suffix; v_cleanup_job_name := 'absurd_cleanup_' || v_job_suffix; v_detach_plan_job_name := 'absurd_detach_plan_' || v_job_suffix; v_detach_run_pattern := 'absurd_detach_run_' || v_job_suffix || '_%'; v_drop_run_pattern := 'absurd_drop_run_' || v_job_suffix || '_%'; for v_existing_job in execute 'select jobid, jobname from cron.job where jobname = $1 or jobname = $2 or jobname = $3 or jobname like $4 or jobname like $5 order by jobname, jobid' using v_partition_job_name, v_cleanup_job_name, v_detach_plan_job_name, v_detach_run_pattern, v_drop_run_pattern loop execute 'select cron.unschedule($1)' using v_existing_job.jobid; job_name := v_existing_job.jobname; job_id := v_existing_job.jobid; return next; end loop; end; $$;