/*
* Author: Vlad Seryakov vseryakov@gmail.com
* backendjs 2018
*/
const logger = require(__dirname + '/logger');
const lib = require(__dirname + '/lib');
const Client = require(__dirname + "/queue/client");
/**
* @module queue
*/
const queue =
/**
* Queue module for jobs, events and subscriptions.
*
* All methods use **options.queueName** for non-default queue.
*
* If it is an array then a client will be picked sequentially by maintaining internal sequence number.
*
* To specify a channel within a queue use the format **queueName#channelName**,
* for drivers that support multiple channels like NATS/Redis the channel will be used for another subscription within the same connection.
*
* For drivers (NATS) that support multiple consumers the full queue syntax is queueName#channelName@groupName or queueName@groupName,
* as well as the groupName property in the subscribe options.
*
* Empty default client always exists, it can be overridden to make default some other driver
*
* To enable stats collection for a queue it must be enabled with config: **queue-redis-options-metrics=1**
*
* The class {@link module:queue.QueueClient} defines the methods that a driver may or may not implement.
*
* The url query parameters that start with **bk-** will be extracted from the url and placed in the class **options* object,
* this is a way to pass special properties without using **cache-options**, the rest of the url parameters will be passed to the driver.
*
* @example
* queue-default=redis://
* queue-redis=redis://?bk-threshold=5&bk-visibilityTimeout=30000&bk-count=2
* queue-account=sqs://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2
* queue-messages=sqs://messages?bk-interval=60000
* queue-config={ "account": "https://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2" }
*
*/
module.exports = {
name: "queue",
args: [
{ name: "config", obj: "_config", type: "json", merge: 1, onupdate: "checkConfig", descr: 'An object with driver configs, an object with at least url or an url string', example: '-queue-config {"redis":{"url":redis://localhost","count":1},"nats":"nats://localhost:4222"}' },
{ name: "([a-z0-9_]+)-options$", obj: "_config.$1", type: "map", merge: 1, onupdate: "applyOptions", descr: "Additional parameters for drivers, specific to each implementation", example: "-queue-redis-options count:10,interval:100" },
{ name: "([a-z0-9_]+)-options-(.+)", obj: "_config.$1", make: "$2", camel: '-', autotype: 1, onupdate: "applyOptions", descr: "Additional parameters for drivers, specific to each implementation", example: "-queue-default-options-count 10" },
{ name: "([a-z0-9_]+)", obj: "_config.$1", make: "url", nocamel: 1, onupdate: "applyOptions", descr: "An URL that points to a server in the format `PROTO://HOST[:PORT]?PARAMS`, multiple clients can be defined with unique names, all params starting with `bk-` will be copied into the options without the prefix and removed from the url, the rest of params will be left in the url", example: "-queue-redis redis://localhost?bk-count=3&bk-ttl=3000" },
],
_queue: [],
_nameIndex: 0,
/** @var {object} - queue modules by type */
modules: {},
/** @var {object} - queue live clients by name */
clients: { default: new Client() },
// Config params
_config: {
local: "local://",
worker: "worker://",
},
};
queue.applyOptions = function(val, options)
{
if (!options.obj) return;
logger.debug("applyOptions:", options.obj, options.name, "NEW:", options.context);
var d = lib.split(options.obj, ".");
var client = d[0] == "_config" && this.getClient(d[1]);
if (client?.queueName != (d[1] || "default")) return;
logger.debug("applyOptions:", client.queueName, options.obj, options.name, "OLD:", client.options);
if (options.name == "url" && typeof val == "string") client.url = val;
client.applyOptions(options.context);
}
/**
* Reinitialize all clients for queue purposes, previous clients will be closed.
* @memberof module:queue
* @method initClients
*/
queue.initClients = function()
{
for (const name in this._config) {
if (!name) continue;
var opts = this._config[name];
if (typeof opts == "string") opts = { url: opts };
var client = this.createClient(opts);
if (!client) continue;
try {
if (this.clients[name]) {
this.clients[name].close();
}
} catch (e) {
logger.error("initClient:", queue.name, name, e.stack);
}
client.queueName = name;
this.clients[name] = client;
}
}
/**
* Initialize missing or new clients, existing clients stay the same
* @memberof module:queue
* @method checkConfig
*/
queue.checkConfig = function()
{
for (const name in this._config) {
if (!name) continue;
if (this.clients[name]) continue;
var opts = this._config[name];
if (typeof opts == "string") opts = { url: opts };
var client = this.createClient(opts);
if (!client) continue;
client.queueName = name;
this.clients[name] = client;
logger.debug("checkConfig:", queue.name, name, client.name, "added");
}
}
/**
* Close all existing clients except empty local client
* @memberof module:queue
* @method shutdown
*/
queue.shutdown = function(options, callback)
{
for (const name in this.clients) {
this.clients[name].close();
delete this.clients[name];
}
this.clients.default = new Client();
lib.tryCall(callback);
}
/**
* Return a new client for the given host or null if not supported
* @param {object} options
* @memberof module:queue
* @method createClient
*/
queue.createClient = function(options)
{
var client = null;
try {
var type = lib.split(options?.url, ":")[0];
if (!type) return;
var Mod = this.modules[type];
if (!Mod) {
Mod = this.modules[type] = require(__dirname + "/queue/" + type);
}
if (!Mod) return;
client = new Mod(options);
client.applyReservedOptions(options);
} catch (e) {
logger.error("createClient:", queue.name, options, e.stack);
}
return client;
}
/**
* Return a queue client by name if specified in the options or use default client which always exists,
* use queueName to specify a specific driver.
* If it is an array it will rotate items sequentially.
* @param {object|string|string[]} options
* @param {string} [options.queueName] - queue to use
* @returns {QueueClient}
* @memberof module:queue
* @method getClient
*/
queue.getClient = queue.getQueue = function(options)
{
var client, name = Array.isArray(options) || typeof options == "string" ? options : options?.queueName;
if (name) {
if (Array.isArray(name)) {
if (name.length > 1) {
name = name[this._nameIndex++ % name.length];
if (this._nameIndex >= Number.MAX_SAFE_INTEGER) this._nameIndex = 0;
} else {
name = name[0];
}
}
if (typeof name == "string") {
var h = name.indexOf("#");
if (h > -1) name = name.substr(0, h);
}
client = this.clients[name];
}
return client || this.clients.default;
}
/**
* Returns the queue statistics, the format depends on the queue type used
* @param {object} [options]
* @param {function} callback
* @memberof module:queue
* @method stats
*/
queue.stats = function(options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.stats:", options);
try {
this.getClient(options).stats(options || {}, typeof callback == "function" ? callback : undefined);
} catch (e) {
logger.error('queue.stats:', e.stack);
if (typeof callback == "function") callback(e);
}
}
/**
* Subscribe to receive messages from the given channel, the callback will be called only on new message received.
* @param {string} channel
* @param {object} options
* @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
* @param {function} [callback]
* @example
* queue.subscribe("alerts", (msg) => {
* req.res.json(data);
* }, req);
* @memberof module:queue
* @method subscribe
*/
queue.subscribe = function(channel, options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.subscribe:", channel, options);
try {
this.getClient(options).subscribe(channel, options || {}, typeof callback == "function" ? callback : undefined);
} catch (e) {
logger.error('queue.subscribe:', channel, options, e.stack);
}
}
/**
* Close a subscription for the given channel, no more messages will be delivered.
* @param {string} channel
* @param {object} [options]
* @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
* @param {function} [callback]
* @memberof module:queue
* @method unsubscribe
*/
queue.unsubscribe = function(channel, options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.unsubscribe:", channel, options);
try {
this.getClient(options).unsubscribe(channel, options || {}, typeof callback == "function" ? callback : undefined);
} catch (e) {
logger.error('queue.unsubscribe:', channel, e.stack);
}
}
/**
* Publish an event to the channel to be delivered to all subscribers. If the msg is not a string it will be stringified.
* The {@link module:events} uses this.
* @param {string} channel
* @param {string|object} msg
* @param {object} [options]
* @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
* @param {function} [callback]
* @memberof module:queue
* @method publish
* @example
* queue.publish("events", { id, name, data }, { queueName: "logs" })
*/
queue.publish = function(channel, msg, options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.publish:", channel, options);
try {
if (typeof msg != "string") msg = lib.stringify(msg);
const client = this.getClient(options);
const _timer = client.metrics.start();
client.publish(channel, msg, options || {}, (err, val) => {
_timer.end();
if (typeof callback == "function") callback(err, val);
});
} catch (e) {
logger.error('queue.publish:', channel, e.stack);
if (typeof callback == "function") callback(e);
}
}
/**
* Async version of {@link module:queue.publish}
* @param {string} channel
* @param {string|object} msg
* @param {object} [options]
* @example
* const { err, data } = await queue.apublish("events", { alert, detail });
* @memberOf module:queue
* @method apublish
* @async
*/
queue.apublish = function(channel, msg, options)
{
return new Promise((resolve, reject) => {
queue.publish(channel, msg, options, (err, data) => {
resolve({ err, data });
});
});
}
/**
* Listen for messages from the given queue, the callback will be called only on new message received.
* The callback accepts 2 arguments, a message and optional next callback, if it is provided it must be called at the end to confirm or reject the message processing.
* Only errors with code>=500 will result in rejection, not all drivers support the next callback if the underlying queue does not support message acknowledgement.
*
* Depending on the implementation, this can work as fan-out, delivering messages to all subscribed to the same channel or
* can implement job queue model where only one subscriber receives a message.
* For some cases like Redis this is the same as subscribe.
*
* For cases when the next callback is provided this means the queue implementation requires an acknowledgement of successful processing,
* returning an error with err.status >= 500 will keep the message in the queue to be processed later. Special code 600 means to keep the job
* in the queue and report as warning in the log.
* @param {object} [options]
* @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
* @param {function} callback
* @example
* queue.listen({ queueName: "jobs" }, (msg, next) => {
* req.res.json(data);
* if (next) next();
* }, req);
* @memberof module:queue
* @method listen
*/
queue.listen = function(options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.listen:", options);
try {
this.getClient(options).listen(options || {}, typeof callback == "function" ? callback : undefined);
} catch (e) {
logger.error('queue.listen:', options, e.stack);
}
}
/**
* Stop listening for message, if no callback is provided all listeners for the key will be unsubscribed, otherwise only the specified listener.
*
* The callback will not be called.
*
* It keeps a count how many subscribe/unsubscribe calls been made and stops any internal listeners once nobody is
* subscribed. This is specific to a queue which relies on polling.
* @param {object} [options]
* @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
* @param {function} [callback]
* @memberof module:queue
* @method unlisten
*/
queue.unlisten = function(options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.unlisten:", options);
try {
this.getClient(options).unlisten(options || {}, typeof callback == "function" ? callback : undefined);
} catch (e) {
logger.error('queue.unlisten:', options, e.stack);
}
}
/**
* Submit a message to the queue, msg can be a string or an object or an array of objects if the queue supports it.
* This is for jobs processing, at least one delivery is expected, the {@link module:jobs} uses this.
* @param {object|string} msg
* @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
* @param {int} [options.stime] - defines when the message should be processed, it will be held in the queue until the time comes
* @param {int} [options.etime] defines when the message expires, i.e. will be dropped if not executed before this time.
* @param {function} [callback]
* @memberof module:queue
* @method submit
* @example
* queue.submit({ alert, detail }, { queueName: "sqs" })
*/
queue.submit = function(msg, options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.submit:", options);
try {
const client = this.getClient(options);
const _timer = client.metrics.start();
client.submit(msg, options || {}, (err, val) => {
_timer.end();
if (typeof callback == "function") callback(err, val);
});
} catch (e) {
logger.error('queue.submit:', e.stack);
if (typeof callback == "function") callback(e);
}
}
/**
* Async version of {@link module:queue.submit}
* @param {string} msg
* @param {object} [options]
* @example
* const { err, data } = await queue.asubmit({ alert, detail });
* @memberOf module:queue
* @method asubmit
* @async
*/
queue.asubmit = function(msg, options)
{
return new Promise((resolve, reject) => {
queue.submit(msg, options, (err, data) => {
resolve({ err, data });
});
});
}
/**
* Queue specific monitor services that must be run in the master process, this is intended to perform
* queue cleanup or dealing with stuck messages (Redis)
* @param {object} [options]
* @memberof module:queue
* @method monitor
*/
queue.monitor = function(options)
{
logger.dev("queue.monitor:", options);
try {
this.getClient(options).monitor(options || {});
} catch (e) {
logger.error('queue.monitor:', e.stack);
}
}
/**
* Queue specific message deletion from the queue in case of abnormal shutdown or job running too long in order not to re-run it after the restart, this
* is for queues which require manual message deletion ofter execution(SQS).
* Each queue client must maintain the mapping or other means to identify messages,
* the options is the message passed to the listener
* @param {string|object} msg
* @param {object} [options]
* @param {string} [options.queueName] - defines the queue, if not specified then it is sent to the default queue
* @param {function} [callback]
* @memberof module:queue
* @method drop
*/
queue.drop = function(msg, options, callback)
{
if (typeof options == "function") callback = options, options = null;
logger.dev("queue.drop:", msg, options);
try {
this.getClient(options).drop(msg, options || {}, callback);
} catch (e) {
logger.error('queue.drop:', e.stack);
if (typeof callback == "function") callback(e);
}
}
queue.configureCollectStats = function(options)
{
for (let q in this.clients) {
const cl = this.clients[q];
if (!cl.options?.metrics) continue;
const m = cl.metrics.toJSON({ reset: 1 });
q = cl.queueName;
if (!m.meter?.count) continue;
options.stats["queue_" + q + "_req_count"] = m.meter.count;
options.stats["queue_" + q + "_req_rate"] = m.meter.rate;
options.stats["queue_" + q + "_res_time"] = m.histogram.med;
}
}