queue/db.js

//  Author: Vlad Seryakov vseryakov@gmail.com
//  Sep 2013
//
 
const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const db = require(__dirname + '/../db');
const QueueClient = require(__dirname + "/client");

/**
 * Queue client using a database for persistence, this driver uses naive content
 * resolution method by SELECT first and then UPDATE received record with new visibilityTimeout, this relies on
 * the database to atomically perform conditional UPDATE, if no record updated it is ignored and performs SELECT again.
 *
 * This is not supposed to be used in production but only for development without external tools like AWS, Redis.
 *
 * It supports the same behaviour as Redis/SQS clients regarding visibilityTimeout.
 *
 * ### To create bk_queue table run as:
 * ```
 * bksh -db-create-tables -queue-default db://
 * ```
 *
 * @example
 *
 * queue-messages = db://?bk-pool=dynamodb
 * queue-store = db://?bk-pool=pg&bk-visibilityTimeout=300&bk-queueCount=2
 *
 * @memberOf module:queue
 */

class DBQueueClient extends QueueClient {

    tables = {
        bk_queue: {
            id: { type: "uuid", primary: 1 },
            name: { index: 1, },                               // queue name
            data: { type: "obj" },                             // job definition object
            ctime: { type: "now", readonly: 1 },               // create time
            mtime: { type: "now" },                            // last update
            vtime: { type: "timeout" },                        // absolute visible time in the future
        },
    }

    constructor(options) {
        super(options);
        this.name = "db";
        this.applyOptions();
        this.emit("ready");
        db.describeTables(this.tables);
    }

    close() {
        super.close();
    }

    submit(job, options, callback) {
        logger.dev("submit:", this.url, job, options);
        const name = this.subject(options);
        const vtime = Date.now() + lib.validPositive(options.delay, this.options.delay);
        db.add("bk_queue", { name, data: job, vtime }, { pool: this.options.pool }, callback);
    }

    poll(options) {
        this._poll_run(options);
    }

    purge(options, callback) {
        const name = this.subject(options);
        db.delAll("bk_queue", { name }, { pool: this.options.pool }, callback);
    }

    _poll_get(options, callback) {
        const opts = {
            pool: this.options.pool,
            count: this.options.queueCount,
        };
        const q = {
            name: this.subject(options),
            vtime_$lt: Date.now(),
            data_$not_null: "",
        }

        var rc = [];
        db.select("bk_queue", q, opts, (err, rows) => {
            if (err) return callback(err);

            lib.forEvery(rows, (row, next) => {
                const vopts = {
                    query: {
                        vtime: row.vtime
                    },
                    pool: this.options.pool,
                    logger_error: "debug"
                };
                const vtime = Date.now() + lib.validPositive(row.data?.visibilityTimeout, this.options.visibilityTimeout, 1000);
                // If failed to update it means some other worker just did it before us so we just ignore this message
                db.update("bk_queue", { id: row.id, vtime }, vopts, (err, _, info) => {
                    if (!err && info?.affected_rows) rc.push(row);
                    next();
                });
            }, (err) => {
                callback(err, rc);
            });
        });
    }

    _poll_update(options, item, visibilityTimeout, callback) {
        db.update("bk_queue", { id: item.id, vtime: Date.now() + visibilityTimeout }, { pool: this.options.pool }, callback);
    }

    _poll_del(options, item, callback) {
        db.del("bk_queue", { id: item.id }, { pool: this.options.pool }, callback);
    }

}

module.exports = DBQueueClient;