postgres-job-queue

PostgreSQL-based job queue with priority scheduling, batch claiming, and progress tracking. Use when building job queues without external dependencies. Triggers on: PostgreSQL job queue, background jobs, task queue, priority queue, SKIP LOCKED.
Install via ClawdBot CLI:
clawdbot install wpank/postgres-job-queue

Production-ready job queue using PostgreSQL with priority scheduling, batch claiming, and progress tracking.
```sql
CREATE TABLE jobs (
    id            UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_type      VARCHAR(50) NOT NULL,
    priority      INT NOT NULL DEFAULT 100,
    status        VARCHAR(20) NOT NULL DEFAULT 'pending',
    data          JSONB NOT NULL DEFAULT '{}',
    -- Progress tracking
    progress      INT DEFAULT 0,
    current_stage VARCHAR(100),
    events_count  INT DEFAULT 0,
    -- Worker tracking
    worker_id     VARCHAR(100),
    claimed_at    TIMESTAMPTZ,
    -- Timing
    created_at    TIMESTAMPTZ DEFAULT NOW(),
    started_at    TIMESTAMPTZ,
    completed_at  TIMESTAMPTZ,
    -- Retry handling
    attempts      INT DEFAULT 0,
    max_attempts  INT DEFAULT 3,
    last_error    TEXT,
    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'claimed', 'running', 'completed', 'failed', 'cancelled')
    )
);

-- Critical: partial index for fast claiming
CREATE INDEX idx_jobs_claimable ON jobs (priority DESC, created_at ASC)
    WHERE status = 'pending';

CREATE INDEX idx_jobs_worker ON jobs (worker_id)
    WHERE status IN ('claimed', 'running');
```
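Enqueuing a job is a plain `INSERT`; the column defaults fill in status, priority, and timestamps. A minimal sketch against the schema above (the job type and payload are illustrative):

```sql
-- Enqueue a user-requested job at explicit priority (150).
-- Status defaults to 'pending', so it is immediately claimable.
INSERT INTO jobs (job_type, priority, data)
VALUES ('encode_video', 150, '{"source": "uploads/intro.mp4"}');
```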
```sql
CREATE OR REPLACE FUNCTION claim_job_batch(
    p_worker_id VARCHAR(100),
    p_job_types VARCHAR(50)[],
    p_batch_size INT DEFAULT 10
) RETURNS SETOF jobs AS $$
BEGIN
    RETURN QUERY
    WITH claimable AS (
        SELECT id
        FROM jobs
        WHERE status = 'pending'
          AND job_type = ANY(p_job_types)
          AND attempts < max_attempts
        ORDER BY priority DESC, created_at ASC
        LIMIT p_batch_size
        FOR UPDATE SKIP LOCKED  -- Critical: skip rows locked by other workers
    ),
    claimed AS (
        UPDATE jobs
        SET status = 'claimed',
            worker_id = p_worker_id,
            claimed_at = NOW(),
            attempts = attempts + 1
        WHERE id IN (SELECT id FROM claimable)
        RETURNING *
    )
    SELECT * FROM claimed;
END;
$$ LANGUAGE plpgsql;
```
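Workers invoke this like any set-returning function. A sketch of claiming a batch directly in SQL (the worker ID and job type are placeholders):

```sql
-- Atomically claim up to 5 pending encode_video jobs for worker-1.
-- Concurrent workers calling this never receive the same rows.
SELECT id, job_type, priority
FROM claim_job_batch('worker-1', ARRAY['encode_video'], 5);
```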
```go
import (
	"context"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

const (
	PriorityExplicit   = 150 // User-requested
	PriorityDiscovered = 100 // System-discovered
	PriorityBackfill   = 30  // Background backfills
)

type JobQueue struct {
	db       *pgxpool.Pool
	workerID string
}

func (q *JobQueue) Claim(ctx context.Context, types []string, batchSize int) ([]Job, error) {
	rows, err := q.db.Query(ctx,
		"SELECT * FROM claim_job_batch($1, $2, $3)",
		q.workerID, types, batchSize,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	// Map each returned row onto the Job struct by column name (pgx v5).
	return pgx.CollectRows(rows, pgx.RowToStructByName[Job])
}

func (q *JobQueue) Complete(ctx context.Context, jobID uuid.UUID) error {
	_, err := q.db.Exec(ctx, `
		UPDATE jobs
		SET status = 'completed',
		    progress = 100,
		    completed_at = NOW()
		WHERE id = $1`,
		jobID,
	)
	return err
}

func (q *JobQueue) Fail(ctx context.Context, jobID uuid.UUID, errMsg string) error {
	// Requeue for retry unless the job has exhausted its attempts.
	_, err := q.db.Exec(ctx, `
		UPDATE jobs
		SET status = CASE
		        WHEN attempts >= max_attempts THEN 'failed'
		        ELSE 'pending'
		    END,
		    last_error = $2,
		    worker_id = NULL,
		    claimed_at = NULL
		WHERE id = $1`,
		jobID, errMsg,
	)
	return err
}

func (q *JobQueue) RecoverStaleJobs(ctx context.Context, timeout time.Duration) (int, error) {
	// Pass the timeout as whole seconds: "300 seconds" is an unambiguous
	// Postgres interval, unlike Go's Duration.String() output (e.g. "5m0s").
	result, err := q.db.Exec(ctx, `
		UPDATE jobs
		SET status = 'pending',
		    worker_id = NULL,
		    claimed_at = NULL
		WHERE status IN ('claimed', 'running')
		  AND claimed_at < NOW() - $1::interval
		  AND attempts < max_attempts`,
		fmt.Sprintf("%d seconds", int64(timeout.Seconds())),
	)
	if err != nil {
		return 0, err
	}
	return int(result.RowsAffected()), nil
}
```
| Scenario | Approach |
|----------|----------|
| Need guaranteed delivery | PostgreSQL queue |
| Need sub-ms latency | Use Redis instead |
| < 1000 jobs/sec | PostgreSQL is fine |
| > 10000 jobs/sec | Add Redis layer |
| Need strict ordering | Single worker per type |
Generated Mar 1, 2026
Handles order fulfillment tasks like inventory updates, payment processing, and shipping notifications with priority scheduling for rush orders. Uses PostgreSQL for reliability, ensuring jobs persist through service restarts during high-traffic events like Black Friday.
Manages video and audio encoding jobs with progress tracking for long-running tasks. Prioritizes user-uploaded content over backfill operations, using batch claiming to efficiently process multiple files concurrently without external dependencies.
Processes batch jobs for generating monthly reports, tax documents, and audit trails with guaranteed delivery. Implements retry logic for failed jobs and stale job recovery to handle system outages, ensuring compliance and data integrity.
Coordinates synchronization of patient records and lab results across distributed systems with priority-based scheduling for urgent updates. Uses PostgreSQL to avoid third-party queue dependencies, maintaining HIPAA compliance and data persistence.
Processes sensor data streams from IoT devices, handling tasks like anomaly detection and alert generation. Employs batch claiming to manage high volumes of incoming jobs efficiently, with progress tracking for long-running analytics.
Offers the job queue as a managed service with tiered pricing based on throughput (e.g., jobs per second) and features like priority scheduling. Generates recurring revenue from businesses needing reliable, dependency-free job processing without infrastructure overhead.
Sells perpetual licenses or annual contracts to large organizations for on-premises deployment, with support and customization services. Targets industries like finance and healthcare where data control and compliance are critical, driving high-value deals.
Provides professional services to integrate the job queue into existing systems, offering training, optimization, and custom development. Revenue comes from project-based fees, appealing to companies lacking in-house PostgreSQL expertise.
💬 Integration Tip
Ensure your PostgreSQL instance has adequate connection pooling and monitor the partial index performance for efficient job claiming under load.
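For basic monitoring, a simple aggregate over the jobs table shows queue depth per type and status, assuming the schema above:

```sql
-- Queue depth by job type and status; a growing 'pending' count
-- means workers are falling behind.
SELECT job_type, status, COUNT(*) AS jobs
FROM jobs
GROUP BY job_type, status
ORDER BY job_type, status;
```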