/*
* Author: Vlad Seryakov vseryakov@gmail.com
* backendjs 2021
*/
const modules = require(__dirname + '/modules');
const logger = require(__dirname + '/logger');
const app = require(__dirname + '/app');
const lib = require(__dirname + '/lib');
const ipc = require(__dirname + '/ipc');
const queue = require(__dirname + '/queue');
const jobs = require(__dirname + '/jobs');
/**
* @module events
*/
// Module singleton: configuration descriptors in `args` plus the runtime state used by the worker methods.
// NOTE(review): `args` is presumably consumed by the backendjs config parser which assigns parsed values
// to the camel-cased property names on this object - confirm against the parser.
const events = {
    name: "events",
    args: [
        { name: "worker-queue-(.+)", obj: "worker-queue", type: "list", onupdate: function() { if (ipc.role=="worker"&&app.role=="worker") this.subscribeWorker()}, descr: "Queues to subscribe for workers, same queues can be used at the same time with different functions and channels and consumers, event queue format is `queue#channel@consumer`", example: "-events-worker-queue-ticket ticket.processEvents, -events-worker-queue-ticket#inbox@staff ticket.processInboxEvents, -events-worker-queue-ticket@staff ticket.processStaffEvents" },
        { name: "worker-options-(.+)", obj: "workerOptions", make: "$1", type: "map", descr: "Custom parameters by queue name, passed to `queue.listen` on worker start, useful with channels", example: "-events-worker-options-ticket count:3,raw:1" },
        { name: "worker-delay", type: "int", descr: "Delay in milliseconds for a worker before it will start accepting jobs, for cases when other dependencies may take some time to start" },
        { name: "max-runtime", type: "int", min: 0, multiplier: 1000, descr: "Max number of seconds an event processing can run before being killed" },
        { name: "routing", obj: "routing", type: "map", merge: 1, maptype: "regexp", descr: "Routing map by event subject or type", example: "-events-routing redis:local.+,nats:.+,sqs:billing.+" },
        { name: "routing-options-(.+)", obj: "routingOptions", make: "$1", type: "map", merge: 1, descr: "Routing options by queue name, used by `putEvent` to merge with passed queue options", example: "-events-routing-options-sqs groupKey:id" },
        { name: "shutdown-timeout", type: "int", min: 500, descr: "Max number of milliseconds to wait for the graceful shutdown sequence to finish, after this timeout the process just exits" },
    ],

    // Canonical names of the queues this worker is currently listening on
    subscribed: new Set(),
    // Records of the events being processed right now, each with a `time` start timestamp in ms
    running: new Set(),
    // Timestamp in ms of the last processing activity
    runTime: 0,
    // Max event processing time in MILLISECONDS: the `max-runtime` option is given in seconds but is stored
    // multiplied by 1000 and `checkTimes` compares it with millisecond intervals, so the default must be in ms too
    maxRuntime: 60 * 1000,
    // Timestamp in ms of the last "still running" warning, used to throttle it
    checkRuntime: 0,
    // How long in ms `shutdownWorker` waits before calling back, kept at the minimum the `shutdown-timeout` option accepts
    shutdownTimeout: 500,
    // Delay in ms before a worker subscribes to its queues
    workerDelay: 0,
    // Event processing function names by queue name
    workerQueue: {},
    // Extra `queue.listen` options by queue name
    workerOptions: {},
    // Regular expressions by queue name, matched against the event subject in `putEvent`
    routing: {},
    // Extra `queue.submit` options by queue name
    routingOptions: {},
};
/**
* Event queue processor
*
* This module implements simple event publishing and processing logic, useful for logging events for post-processing
* by backendjs workers or by other systems using shared queues.
*
* All events will have the same structure:
* ```js
* {
* subject: "string", // event subject
* data: { ... }, // event payload, must be an object
* id: "string", // auto-generated: lib.uuid()
* time: bigint, // auto-generated: lib.clock()
* origin: "string", // auto-generated: app.origin()
* }
* ```
*
* Features support:
* - publishing and processing: SQS, NATS
* - publishing only: EventBridge, SNS
*
* If any of `events-worker-queue-XXX` parameters are defined then workers subscribe to configured event queues and listen for events.
*
* Each event queue can run multiple functions independently but will ack/nack once for all functions, so to deal with
* replayed duplicates it is advised to split the functions between multiple consumers using the syntax: `queue#channel@consumer`
*
* Multiple event queues can be defined and processed at the same time.
*
* An event processing function takes 2 arguments: an event and a callback to call on finish.
*
* @example <caption>Configured below in bkjs.conf: NATS server for events, routing by prefix USER-EVENT: and event processor for user events</caption>
*
* queue-events = nats://localhost:4222
*
* events-routing-events#user = ^USER-EVENT:
*
* events-worker-queue-events#user = mymod.syncUserEvents
*
*
* @example <caption>
* The module below logs all user events in the queue and defines an event processor function to sync such events with external service:
* - /user/... endpoints for managing users
* - mymod.syncUserEvents is an event processor function which is run by a worker process, it can run on a different host
* </caption>
*
* const { app, api, lib, events } = require("backendjs");
*
* module.exports = {
* name: "mymod",
*
* configureWeb(options, callback)
* {
* api.app.post(/^\/user\/(login|update|view)/, this.handleUsers);
*
* callback();
* }
*
* handleUsers(req, res) {
*
* ... endpoint processing logic, assume req.user contains currently logged in user ...
*
* const event = {
* type: req.params[0],
* id: req.user.id,
* name: req.user.name,
* access_time: Date.now()
* }
* events.putEvent("USER-EVENT:" + req.params[0].toUpperCase(), event);
* }
*
* syncUserEvents(event, callback)
* {
* // .. syncing last access time with external database
*
* lib.fetch("https://some.api.com", { method: "POST", postdata: event }, callback)
* }
* }
*
* app.start({ server: true });
*
* @example <caption>Start the server</caption>
*
* node mymod.js -jobs-workers 1
*
*/
// Export the module singleton, all methods defined below are attached to this same object
module.exports = events;
// Worker configuration hook: start the event worker only if `app.isOk` accepts the "events" module
// for the given options, otherwise just report completion via the callback
events.configureWorker = function(options, callback)
{
    const enabled = app.isOk("events", options);
    if (enabled) {
        this.initWorker(options, callback);
        return;
    }
    return callback();
};
// Graceful worker shutdown, to be used for worker restarts: stops the run time checks, stops listening
// on every configured worker queue and calls the callback after `options.shutdownTimeout`
// (or the module's `shutdownTimeout`) milliseconds
events.shutdownWorker = function(options, callback)
{
    logger.log("shutdownWorker:", events.name, "queue:", this.subscribed, "max-runtime:", this.maxRuntime, "max-lifetime:", jobs.maxLifetime, options);

    clearInterval(events._checkTimer);

    // No more messages should be accepted from the queues
    for (const queueName in this.workerQueue) {
        queue.unlisten({ queueName });
    }

    const timeout = options?.shutdownTimeout || this.shutdownTimeout;
    setTimeout(callback, timeout);
};
/**
 * Check how long the running events have been processing and request a worker exit if all of them are stuck.
 *
 * Does nothing when no events are running or when `maxRuntime` is not a positive number.
 *
 * If every running event exceeded `maxRuntime` then `jobs.exitWorker` is called with the `shutdownReason`
 * property set to `maxRuntime` and the name-sake property containing the exceeded value, the result
 * of that call is returned.
 *
 * If only some events exceeded the limit the worker keeps running, a warning is logged no more often
 * than once per `maxRuntime` milliseconds.
 */
events.checkTimes = function()
{
    const maxRuntime = events.maxRuntime;

    // Parenthesized on purpose: `!x > 0` negates first and let negative or invalid limits through
    if (!this.running.size || !(maxRuntime > 0)) return;

    const now = Date.now();
    const bad = [];
    for (const e of this.running) {
        if (now - e.time > maxRuntime) bad.push(e);
    }
    if (!bad.length) return;

    if (this.running.size == bad.length) {
        logger.warn('checkLifetime:', 'events: exceeded max run time', maxRuntime, bad);
        return jobs.exitWorker({ shutdownReason: "maxRuntime", maxRuntime });
    }

    // Stuck jobs can run much longer if we are still processing other small jobs
    if (now - this.checkRuntime > maxRuntime) {
        logger.warn('checkLifetime:', 'events: exceeded max run time but other jobs still running', maxRuntime, bad);
        // Remember the time of the warning only, refreshing it on every check would suppress
        // all following warnings whenever the limit is longer than the check period
        this.checkRuntime = now;
    }
};
// Start the event worker: initialize IPC for the worker, schedule the periodic run time check every 30 seconds
// and subscribe to the configured queues after `workerDelay` milliseconds plus a small random jitter.
// The callback, if given, is called right away without waiting for the subscription.
events.initWorker = function(options, callback)
{
    ipc.initWorker(options);

    // Never leave a previous check timer running if the worker is initialized more than once
    clearInterval(events._checkTimer);
    events._checkTimer = setInterval(this.checkTimes.bind(this), 30000);

    // Randomize subscription when multiple workers start at the same time, some queue drivers use polling
    const delay = (Number(events.workerDelay) || 0) + Math.floor(Math.random() * 100);
    setTimeout(() => {
        events.subscribeWorker();
        logger.log("initWorker:", events.name, "started", "queue:", events.subscribed, "maxRuntime:", events.maxRuntime, "maxLifetime:", jobs.maxLifetime);
    }, delay);

    if (typeof callback == "function") callback();
};
// Subscribe this worker to every queue listed in `workerQueue`.
// A queue name starting with `!` or `-` is unsubscribed instead. Every processing function is given as
// `module.path.method` and is looked up in the loaded modules, invalid ones are logged and skipped,
// a queue without any valid function is not listened on.
events.subscribeWorker = function()
{
    for (const queueName in this.workerQueue) {
        if (/^[!-]/.test(queueName)) {
            this.unsubscribeQueue(queueName.slice(1));
            continue;
        }

        // Prevent subscription more than once to the same queue in case of invalid or nonexistent queues
        const client = queue.getQueue(queueName);
        if (this.subscribed.has(client.canonical(queueName))) continue;

        const handlers = [];
        for (const spec of this.workerQueue[queueName]) {
            // Everything before the last dot is the module path, the rest is the method name
            const segments = spec.split('.');
            const method = segments.pop();
            const owner = modules[segments.join(".")];
            if (owner && typeof owner[method] == "function") {
                handlers.push(owner[method].bind(owner));
            } else {
                logger.error("subscribeWorker:", this.name, client.name, queueName, "invalid event proc:", spec);
            }
        }
        if (handlers.length === 0) continue;

        const listenOptions = lib.objExtend({ queueName: queueName }, this.workerOptions[queueName]);
        queue.listen(listenOptions, this.processEvent.bind(this, queueName, handlers));
        this.subscribed.add(client.canonical(queueName));
        logger.info("subscribeWorker:", this.name, client.name, queueName, this.workerQueue[queueName]);
    }
};
// Stop listening on a queue, does nothing if the queue's canonical name is not in the subscribed set
events.unsubscribeQueue = function(name)
{
    const client = queue.getClient(name);
    const wasSubscribed = this.subscribed.delete(client.canonical(name));
    if (wasSubscribed) {
        queue.unlisten({ queueName: name });
        logger.info("unsubscribeQueue:", this.name, client.name, name);
    }
};
/**
 * Run all processing functions of a queue for a single event, used as the `queue.listen` handler.
 *
 * The event is tracked in the `running` set for the duration of the processing so `checkTimes` can detect
 * stuck events. Errors returned or thrown by a processing function are logged and do not stop the others,
 * the callback never receives an error.
 *
 * @param {string} name - queue name the event came from
 * @param {function[]} procs - processing functions, each is called as `proc(event, callback)`
 * @param {object} event - the event to process
 * @param {function} [callback] - called once after all functions have finished
 */
events.processEvent = function(name, procs, event, callback)
{
    const evt = { name, procs, event, time: Date.now() };
    this.running.add(evt);
    this.runTime = Date.now();
    logger.debug("processEvent:", events.name, evt);

    lib.forEvery(procs, (proc, next) => {
        // Each function must advance the iteration exactly once, a function which throws after
        // calling back or calls back twice would otherwise finish the whole event too early
        let finished = false;
        const settle = () => {
            if (finished) return;
            finished = true;
            this.runTime = Date.now();
            next();
        };

        try {
            proc(event, (err) => {
                if (err) logger.error("processEvent:", events.name, err, evt);
                settle();
            });
        } catch (err) {
            logger.error("processEvent:", events.name, err, evt);
            settle();
        }
    }, () => {
        this.running.delete(evt);
        this.runTime = Date.now();
        if (typeof callback == "function") callback();
    });
};
/**
 * Place an event into every queue whose routing expression matches the subject.
 *
 * The same event object, with auto-generated `id`, `time` and `origin`, is submitted to all matching queues.
 * Submit options are built for each queue separately: the routing options of that queue, then the passed
 * `options`, then the queue name. Submit results are only logged.
 *
 * @param {string} subject - event subject, topic, ID, ...
 * @param {object} data - an object to be placed as the `data` property
 * @param {object} [options] - queue specific properties
 * @returns {int} - number of queues the event was sent to, -1 on error
 * @memberof module:events
 * @method putEvent
 * @example
 * events.putEvent("USER-LOGIN", { id: ..., name: ... })
 * events.putEvent("ORDER-SHIPPED", { id: ... })
 * events.putEvent("social.post.like", { id: ..., liked: ... })
 */
events.putEvent = function(subject, data, options)
{
    if (typeof subject != "string" || typeof data != "object") {
        logger.error("putEvent:", "invalid", subject, data);
        return -1;
    }

    let msg, n = 0;
    for (const q in this.routing) {
        if (!this.routing[q].test(subject)) continue;

        // Build the event lazily, only if at least one route matches
        if (!msg) {
            msg = {
                subject,
                data,
                id: lib.uuid(),
                time: lib.clock(),
                origin: app.origin(),
            };
        }

        // `q` is the queue name itself. The options are kept in a local so routing options
        // of one queue never leak into the next matching queue
        const qopts = Object.assign({}, this.routingOptions[q], options, { queueName: q });
        queue.submit(msg, qopts, (err) => {
            logger.logger(err ? "error" : "debug", "putEvent:", q, err, msg);
        });
        n++;
    }
    return n;
};