// queue.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const logger = require(__dirname + '/logger');
const lib = require(__dirname + '/lib');
const Client = require(__dirname + "/queue/client");

/**
 * @module queue
 */

const queue =

/**
 * Queue module for jobs, events and subscriptions.
 *
 * All methods use **options.queueName** for non-default queue.
 *
 * If it is an array then a client will be picked sequentially by maintaining internal sequence number.
 *
 * To specify a channel within a queue use the format **queueName#channelName**,
 * for drivers that support multiple channels like NATS/Redis the channel will be used for another subscription within the same connection.
 *
 * For drivers (NATS) that support multiple consumers the full queue syntax is queueName#channelName@groupName or queueName@groupName,
 * as well as the groupName property in the subscribe options.
 *
 * Empty default client always exists, it can be overridden to make default some other driver
 *
 * To enable stats collection for a queue it must be enabled with config: **queue-redis-options-metrics=1**
 *
 * The class {@link module:queue.QueueClient} defines the methods that a driver may or may not implement.
 *
 * The url query parameters that start with **bk-** will be extracted from the url and placed in the class **options* object,
 * this is a way to pass special properties without using **cache-options**, the rest of the url parameters will be passed to the driver.
 *
 * @example
 * queue-default=redis://
 * queue-redis=redis://?bk-threshold=5&bk-visibilityTimeout=30000&bk-count=2
 * queue-account=sqs://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2
 * queue-messages=sqs://messages?bk-interval=60000
 * queue-config={ "account": "https://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2" }
 *
 */
module.exports = {
    name: "queue",

    args: [
        { name: "config", obj: "_config", type: "json", merge: 1, onupdate: "checkConfig", descr: 'An object with driver configs, an object with at least url or an url string', example: '-queue-config {"redis":{"url":redis://localhost","count":1},"nats":"nats://localhost:4222"}' },
        { name: "([a-z0-9_]+)-options$", obj: "_config.$1", type: "map", merge: 1, onupdate: "applyOptions", descr: "Additional parameters for drivers, specific to each implementation", example: "-queue-redis-options count:10,interval:100" },
        { name: "([a-z0-9_]+)-options-(.+)", obj: "_config.$1", make: "$2", camel: '-', autotype: 1, onupdate: "applyOptions", descr: "Additional parameters for drivers, specific to each implementation", example: "-queue-default-options-count 10" },
        { name: "([a-z0-9_]+)", obj: "_config.$1", make: "url", nocamel: 1, onupdate: "applyOptions", descr: "An URL that points to a server in the format `PROTO://HOST[:PORT]?PARAMS`, multiple clients can be defined with unique names, all params starting with `bk-` will be copied into the options without the prefix and removed from the url, the rest of params will be left in the url", example: "-queue-redis redis://localhost?bk-count=3&bk-ttl=3000" },
    ],

    _queue: [],
    _nameIndex: 0,

    /** @var {object} - queue modules by type */
    modules: {},

    /** @var {object} - queue live clients by name */
    clients: { default: new Client() },

    // Config params
    _config: {
        local: "local://",
        worker: "worker://",
    },
};

/**
 * Config update hook: pushes a changed parameter into the live client it belongs to.
 *
 * Nothing happens unless `options.obj` has the form `_config.NAME` and the client resolved for NAME
 * reports the same queueName ("default" when NAME is missing), so an update for a queue
 * that has no client of its own never touches the fallback client.
 * @param {any} val - new value, it becomes the client url when `options.name` is "url" and the value is a string
 * @param {object} options - update descriptor, the `obj`, `name` and `context` properties are used
 * @memberof module:queue
 * @method applyOptions
 */
queue.applyOptions = function(val, options)
{
    if (!options.obj) return;
    logger.debug("applyOptions:", options.obj, options.name, "NEW:", options.context);

    const path = lib.split(options.obj, ".");
    if (path[0] != "_config") return;

    const target = this.getClient(path[1]);
    const expected = path[1] || "default";
    if (target?.queueName != expected) return;

    logger.debug("applyOptions:", target.queueName, options.obj, options.name, "OLD:", target.options);
    if (typeof val == "string" && options.name == "url") {
        target.url = val;
    }
    target.applyOptions(options.context);
}

/**
 * Rebuild the clients for every entry in the config. When a new client could be created
 * the previous client registered under the same name is closed and replaced,
 * entries for which no client could be created keep their current client.
 * @memberof module:queue
 * @method initClients
 */
queue.initClients = function()
{
    for (const name in this._config) {
        if (!name) continue;

        // A plain string in the config is a shortcut for { url }
        const conf = this._config[name];
        const created = this.createClient(typeof conf == "string" ? { url: conf } : conf);
        if (!created) continue;

        // A failure to close the old client must not prevent the replacement
        try {
            const previous = this.clients[name];
            if (previous) previous.close();
        } catch (e) {
            logger.error("initClient:", queue.name, name, e.stack);
        }

        created.queueName = name;
        this.clients[name] = created;
    }
}

/**
 * Create clients for config entries that do not have a live client yet,
 * clients that already exist are left untouched.
 * @memberof module:queue
 * @method checkConfig
 */
queue.checkConfig = function()
{
    for (const name in this._config) {
        if (!name || this.clients[name]) continue;

        // A plain string in the config is a shortcut for { url }
        const conf = this._config[name];
        const created = this.createClient(typeof conf == "string" ? { url: conf } : conf);
        if (!created) continue;

        created.queueName = name;
        this.clients[name] = created;
        logger.debug("checkConfig:", queue.name, name, created.name, "added");
    }
}

/**
 * Close and unregister every client, then install a fresh empty default client
 * so the module stays usable after the shutdown.
 *
 * A client that throws on close is logged and skipped, the remaining clients are still closed
 * and the callback is always called.
 * @param {object} [options] - not used
 * @param {function} [callback] - called via lib.tryCall once all clients have been processed
 * @memberof module:queue
 * @method shutdown
 */
queue.shutdown = function(options, callback)
{
    for (const name in this.clients) {
        // One failing driver must not leave the other connections open
        try {
            this.clients[name].close();
        } catch (e) {
            logger.error("shutdown:", queue.name, name, e.stack);
        }
        delete this.clients[name];
    }
    this.clients.default = new Client();
    lib.tryCall(callback);
}

/**
 * Create a new client for the driver named by the url scheme, i.e. the part of `options.url` before the first `:`.
 *
 * Driver classes are cached in `modules` by scheme, an unknown scheme is loaded from the `queue/` directory next to this file.
 * Errors are logged, never thrown.
 * @param {object} options - client config, the `url` property selects the driver
 * @returns {QueueClient|null|undefined} a client, undefined if the url has no scheme,
 *   null if the driver could not be loaded or constructed
 * @memberof module:queue
 * @method createClient
 */
queue.createClient = function(options)
{
    let created = null;
    try {
        const scheme = lib.split(options?.url, ":")[0];
        if (!scheme) return;

        const Driver = this.modules[scheme] || (this.modules[scheme] = require(__dirname + "/queue/" + scheme));
        if (!Driver) return;

        created = new Driver(options);
        created.applyReservedOptions(options);
    } catch (e) {
        logger.error("createClient:", queue.name, options, e.stack);
    }
    return created;
}

/**
 * Return a queue client by name if specified in the options or use default client which always exists,
 * use queueName to specify a specific driver.
 *
 * - A list with more than one name is rotated sequentially using the internal `_nameIndex` counter.
 * - The channel suffix is ignored for the lookup: `queueName#channelName` resolves the client `queueName`.
 * - If no client matches, the group suffix is dropped too, so `queueName@groupName` and
 *   `queueName#channelName@groupName` resolve the client `queueName`. A client registered under
 *   the exact name still wins.
 * - An unknown or empty name resolves to the default client.
 * @param {object|string|string[]} [options] - a queue name, a list of names or an object with queueName
 * @param {string|string[]} [options.queueName] - queue to use
 * @returns {QueueClient}
 * @memberof module:queue
 * @method getClient
 */
queue.getClient = queue.getQueue = function(options)
{
    let name = Array.isArray(options) || typeof options == "string" ? options : options?.queueName;
    if (!name) return this.clients.default;

    if (Array.isArray(name)) {
        if (name.length > 1) {
            name = name[this._nameIndex++ % name.length];
            // Keep the counter within the exact integer range
            if (this._nameIndex >= Number.MAX_SAFE_INTEGER) this._nameIndex = 0;
        } else {
            name = name[0];
        }
    }

    if (typeof name == "string") {
        const hash = name.indexOf("#");
        if (hash > -1) name = name.slice(0, hash);
    }

    let client = this.clients[name];
    if (!client && typeof name == "string") {
        // queueName@groupName: the group is a consumer property, not a part of the client name
        const at = name.indexOf("@");
        if (at > -1) client = this.clients[name.slice(0, at)];
    }
    return client || this.clients.default;
}

/**
 * Ask the selected client for the queue statistics, the result format is up to the driver.
 * An exception raised while dispatching is logged and passed to the callback.
 * @param {object} [options]
 * @param {string} [options.queueName] - queue to use, the default queue otherwise
 * @param {function} [callback]
 * @memberof module:queue
 * @method stats
 */
queue.stats = function(options, callback)
{
    if (typeof options == "function") {
        callback = options;
        options = null;
    }
    logger.dev("queue.stats:", options);

    const done = typeof callback == "function" ? callback : undefined;
    try {
        const client = this.getClient(options);
        client.stats(options || {}, done);
    } catch (e) {
        logger.error('queue.stats:', e.stack);
        if (done) done(e);
    }
}

/**
 * Subscribe to receive messages from the given channel, the callback will be called only on new message received.
 * An exception raised while dispatching is logged, the callback is not called in this case.
 * @param {string} channel
 * @param {object} [options]
 * @param {string} [options.queueName] - defines the queue, if not specified then the default queue is used
 * @param {function} [callback] - message handler
 * @example
 * queue.subscribe("alerts", { queueName: "redis" }, (msg) => {
 *    console.log(msg);
 * });
 * @memberof module:queue
 * @method subscribe
 */
queue.subscribe = function(channel, options, callback)
{
    if (typeof options == "function") {
        callback = options;
        options = null;
    }
    logger.dev("queue.subscribe:", channel, options);

    const handler = typeof callback == "function" ? callback : undefined;
    try {
        const client = this.getClient(options);
        client.subscribe(channel, options || {}, handler);
    } catch (e) {
        logger.error('queue.subscribe:', channel, options, e.stack);
    }
}

/**
 * Close a subscription for the given channel, no more messages will be delivered.
 * An exception raised while dispatching is logged, the callback is not called in this case.
 * @param {string} channel
 * @param {object} [options]
 * @param {string} [options.queueName] - defines the queue, if not specified then the default queue is used
 * @param {function} [callback]
 * @memberof module:queue
 * @method unsubscribe
 */
queue.unsubscribe = function(channel, options, callback)
{
    if (typeof options == "function") {
        callback = options;
        options = null;
    }
    logger.dev("queue.unsubscribe:", channel, options);

    const handler = typeof callback == "function" ? callback : undefined;
    try {
        const client = this.getClient(options);
        client.unsubscribe(channel, options || {}, handler);
    } catch (e) {
        logger.error('queue.unsubscribe:', channel, e.stack);
    }
}

/**
 * Publish an event to the channel to be delivered to all subscribers. If the msg is not a string it will be stringified.
 * The call is timed with the client metrics, the timer is stopped when the driver reports back.
 * An exception raised while dispatching is logged and passed to the callback.
 * The {@link module:events} uses this.
 * @param {string} channel
 * @param {string|object} msg
 * @param {object} [options]
 * @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
 * @param {function} [callback]
 * @memberof module:queue
 * @method publish
 * @example
 * queue.publish("events", { id, name, data }, { queueName: "logs" })
 */
queue.publish = function(channel, msg, options, callback)
{
    if (typeof options == "function") {
        callback = options;
        options = null;
    }
    logger.dev("queue.publish:", channel, options);

    try {
        const payload = typeof msg == "string" ? msg : lib.stringify(msg);
        const client = this.getClient(options);
        const timer = client.metrics.start();

        const onDone = (err, val) => {
            timer.end();
            if (typeof callback == "function") callback(err, val);
        };
        client.publish(channel, payload, options || {}, onDone);
    } catch (e) {
        logger.error('queue.publish:', channel, e.stack);
        if (typeof callback == "function") callback(e);
    }
}

/**
 * Async version of {@link module:queue.publish}, the promise never rejects,
 * an error is delivered in the `err` property of the result.
 * @param {string} channel
 * @param {string|object} msg
 * @param {object} [options]
 * @returns {Promise<{err: any, data: any}>}
 * @example
 * const { err, data } = await queue.apublish("events", { alert, detail });
 * @memberOf module:queue
 * @method apublish
 * @async
 */
queue.apublish = function(channel, msg, options)
{
    return new Promise((resolve) => {
        queue.publish(channel, msg, options, (err, data) => resolve({ err, data }));
    });
}

/**
 * Listen for messages from the given queue, the callback will be called only on new message received.
 *
 * The callback accepts 2 arguments, a message and an optional next callback. If next is provided it must be called
 * at the end to confirm or reject the message processing, this means the queue implementation requires
 * an acknowledgement of successful processing. Not all drivers support the next callback, only those where
 * the underlying queue supports message acknowledgement.
 *
 * Returning an error with err.status >= 500 will keep the message in the queue to be processed later, other errors
 * do not result in rejection. Special code 600 means to keep the job in the queue and report as warning in the log.
 *
 * Depending on the implementation, this can work as fan-out, delivering messages to all subscribed to the same channel or
 * can implement job queue model where only one subscriber receives a message.
 * For some cases like Redis this is the same as subscribe.
 *
 * An exception raised while dispatching is logged, the callback is not called in this case.
 * @param {object} [options]
 * @param {string} [options.queueName] - defines the queue, if not specified then the default queue is used
 * @param {function} callback
 * @example
 * queue.listen({ queueName: "jobs" }, (msg, next) => {
 *   console.log(msg);
 *   if (next) next();
 * });
 * @memberof module:queue
 * @method listen
 */
queue.listen = function(options, callback)
{
    if (typeof options == "function") {
        callback = options;
        options = null;
    }
    logger.dev("queue.listen:", options);

    const handler = typeof callback == "function" ? callback : undefined;
    try {
        const client = this.getClient(options);
        client.listen(options || {}, handler);
    } catch (e) {
        logger.error('queue.listen:', options, e.stack);
    }
}

/**
 * Stop listening for message, if no callback is provided all listeners for the key will be unsubscribed, otherwise only the specified listener.
 *
 * The callback will not be called.
 *
 * It keeps a count how many subscribe/unsubscribe calls been made and stops any internal listeners once nobody is
 * subscribed. This is specific to a queue which relies on polling.
 *
 * An exception raised while dispatching is logged.
 * @param {object} [options]
 * @param {string} [options.queueName] - defines the queue, if not specified then the default queue is used
 * @param {function} [callback] - the listener to remove
 * @memberof module:queue
 * @method unlisten
 */
queue.unlisten = function(options, callback)
{
    if (typeof options == "function") {
        callback = options;
        options = null;
    }
    logger.dev("queue.unlisten:", options);

    const handler = typeof callback == "function" ? callback : undefined;
    try {
        const client = this.getClient(options);
        client.unlisten(options || {}, handler);
    } catch (e) {
        logger.error('queue.unlisten:', options, e.stack);
    }
}

/**
 * Submit a message to the queue, msg can be a string or an object or an array of objects if the queue supports it.
 * This is for jobs processing, at least one delivery is expected, the {@link module:jobs} uses this.
 *
 * The message is passed to the driver as is. The call is timed with the client metrics,
 * the timer is stopped when the driver reports back.
 * An exception raised while dispatching is logged and passed to the callback.
 * @param {object|string} msg
 * @param {object} [options]
 * @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
 * @param {int} [options.stime] - defines when the message should be processed, it will be held in the queue until the time comes
 * @param {int} [options.etime] - defines when the message expires, i.e. will be dropped if not executed before this time.
 * @param {function} [callback]
 * @memberof module:queue
 * @method submit
 * @example
 * queue.submit({ alert, detail }, { queueName: "sqs" })
 */
queue.submit = function(msg, options, callback)
{
    if (typeof options == "function") {
        callback = options;
        options = null;
    }
    logger.dev("queue.submit:", options);

    try {
        const client = this.getClient(options);
        const timer = client.metrics.start();

        const onDone = (err, val) => {
            timer.end();
            if (typeof callback == "function") callback(err, val);
        };
        client.submit(msg, options || {}, onDone);
    } catch (e) {
        logger.error('queue.submit:', e.stack);
        if (typeof callback == "function") callback(e);
    }
}

/**
 * Async version of {@link module:queue.submit}, the promise never rejects,
 * an error is delivered in the `err` property of the result.
 * @param {object|string} msg
 * @param {object} [options]
 * @returns {Promise<{err: any, data: any}>}
 * @example
 * const { err, data } = await queue.asubmit({ alert, detail });
 * @memberOf module:queue
 * @method asubmit
 * @async
 */
queue.asubmit = function(msg, options)
{
    return new Promise((resolve) => {
        queue.submit(msg, options, (err, data) => resolve({ err, data }));
    });
}

/**
 * Queue specific monitor services that must be run in the master process, this is intended to perform
 * queue cleanup or dealing with stuck messages (Redis).
 * An exception raised while dispatching is logged.
 * @param {object} [options]
 * @param {string} [options.queueName] - queue to use, the default queue otherwise
 * @memberof module:queue
 * @method monitor
 */
queue.monitor = function(options)
{
    logger.dev("queue.monitor:", options);
    try {
        const client = this.getClient(options);
        client.monitor(options || {});
    } catch (e) {
        logger.error('queue.monitor:', e.stack);
    }
}

/**
 * Queue specific message deletion from the queue in case of abnormal shutdown or job running too long in order not to re-run it after the restart, this
 * is for queues which require manual message deletion after execution (SQS).
 * Each queue client must maintain the mapping or other means to identify messages,
 * the msg is the message passed to the listener.
 *
 * The callback is handed to the driver only if it is a function, the same as in the other methods of this module.
 * An exception raised while dispatching is logged and passed to the callback.
 * @param {string|object} msg
 * @param {object} [options]
 * @param {string} [options.queueName] - defines the queue, if not specified then the default queue is used
 * @param {function} [callback]
 * @memberof module:queue
 * @method drop
 */
queue.drop = function(msg, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    logger.dev("queue.drop:", msg, options);

    // Never let a driver try to invoke something that is not callable
    const done = typeof callback == "function" ? callback : undefined;
    try {
        this.getClient(options).drop(msg, options || {}, done);
    } catch (e) {
        logger.error('queue.drop:', e.stack);
        if (done) done(e);
    }
}

/**
 * Export request metrics of every client that has `options.metrics` enabled into `options.stats`.
 *
 * For each such client the metrics are read with the reset flag, clients with no requests counted are skipped.
 * The stats are stored as `queue_NAME_req_count`, `queue_NAME_req_rate` and `queue_NAME_res_time`
 * where NAME is the client queueName or, if the client has none, the name it is registered under.
 * @param {object} options
 * @param {object} options.stats - the object to store the metrics in
 * @memberof module:queue
 * @method configureCollectStats
 */
queue.configureCollectStats = function(options)
{
    for (const key in this.clients) {
        const client = this.clients[key];
        if (!client.options?.metrics) continue;

        const m = client.metrics.toJSON({ reset: 1 });
        if (!m.meter?.count) continue;

        // Clients registered without a queueName are reported under their registry name
        const prefix = "queue_" + (client.queueName || key) + "_";
        options.stats[prefix + "req_count"] = m.meter.count;
        options.stats[prefix + "req_rate"] = m.meter.rate;
        options.stats[prefix + "res_time"] = m.histogram.med;
    }
}