events.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2021
 */

const modules = require(__dirname + '/modules');
const logger = require(__dirname + '/logger');
const app = require(__dirname + '/app');
const lib = require(__dirname + '/lib');
const ipc = require(__dirname + '/ipc');
const queue = require(__dirname + '/queue');
const jobs = require(__dirname + '/jobs');

/**
 * @module events
 */

const events = {
    name: "events",
    args: [
        { name: "cap-(.+)", type: "int", strip: "cap-", descr: "Capability parameters" },
        { name: "worker-queue", obj: "worker-queue", type: "map", merge: 1, maptype: "list", onupdate: function() { if (ipc.role=="worker"&&app.role=="worker") this.subscribeWorker()}, descr: "Queues to subscribe for workers, same queues can be used at the same time with different functions and channels and consumers, event queue format is `queue.subject#group`", example: "events-worker-queue = ticket:ticket.processEvents, ticket.inbox#staff: ticket.processInboxEvents, ticket#staff: ticket.processStaffEvents" },
        { name: "worker-options-(.+)", obj: "workerOptions", make: "$1", type: "map", descr: "Custom parameters by queue name, passed to `queue.listen` on worker start, useful with channels", example: "-events-worker-options-ticket count:3,raw:1" },
        { name: "worker-delay", type: "int", descr: "Delay in milliseconds for a worker before it will start accepting jobs, for cases when other dependencies may take some time to start" },
        { name: "max-runtime", type: "int", min: 0, multiplier: 1000, descr: "Max number of seconds an event processing can run before being killed" },
        { name: "routing", obj: "routing", type: "map", merge: 1, maptype: "regexp", descr: "Routing map by event subject or type", example: "-events-routing redis:local.+, nats:.+, sqs:billing.+" },
        { name: "routing-options-(.+)", obj: "routingOptions", make: "$1", type: "map", merge: 1, descr: "Routing options by queue name, used by `putEvent` to merge with passed queue options", example: "-events-routing-options-nats groupName:group" },
        { name: "shutdown-timeout", type: "int", min: 500, descr: "Max number of milliseconds to wait for the graceful shutdown sequence to finish, after this timeout the process just exits" },
    ],
    subscribed: new Set(),
    running: new Set(),
    runTime: 0,
    maxRuntime: 60000,
    checkRuntime: 0,
    shutdownTimeout: 50,
    workerDelay: 50,
    workerQueue: {},
    workerOptions: {},
    routing: {},
    routingOptions: {},
};

/**
 * Event queue processor
 *
 * This module implement simple event publishing and processing logic, useful for logging events for post-processing
 * by backendjs workers or by other systems using shared queues.
 *
 * All events will have the same structure:
 * ```js
 * {
 *    subject: "string",        // event subject
 *    data: { ... },            // event payload, must be an object
 *    id: "string",             // unique event id, auto-generated: lib.uuid()
 *    time: bigint,             // auto-generated: lib.clock()
 *    origin: "string",         // auto-generated: app.origin()
 *    sent: "string",           // sent queue name as queueName[@subject][#groupName]
 *    received: "string",       // revceived queue name as queueName[@subject][#groupName]
 *    seq: "int",               // sequence per queue: 1...N
 * }
 * ```
 *
 * Features support:
 * - publishing and processing: SQS, NATS
 * - publishing only: EventBridge, SNS
 *
 * If any of `events-worker-queue-XXX` parameters are defined then workers subscribe to configured event queues and listen for events.
 *
 *  Drivers like NATS support multiple consumers in the same queue using subject/group syntax:
 *
 * - `queueName.subject`
 * - `queueName#groupName`
 * - `queueName.subject#groupName`
 *
 * The `options.groupName` property can be used as a group as well when passsed to listen.
 *
 * Multiple event queues can be defined and processed at the same time.
 *
 * An event processing function takes 2 arguments, an event and callback to call on finish
 *
 * @example <caption>Create a stream</caption>
 *
 * nats stream add --subjects 'events,events@*' --defaults events
 *
 * @example <caption>Configured below in bkjs.conf: NATS server for events, routing by prefixes COMPANY-EVENT:
 * or USER-EVENT: and event processor for corresponding events</caption>
 *
 * queue-events = nats://
 *
 * events-routing = events@user:^EVENT.USER
 * events-worker-queue = events@user: mymod.syncUserEvents, events@user#log: mymod.logUserEvents
 *
 * events-routing = events@company:^EVENT.COMPANY
 * events-worker-queue = events@company: mymod.syncCompanyEvents
 *
 * @example <caption>
 * The module below logs all user events in the queue and defines an event processor function to sync such events with external service:
 *  - /user/... endpoints for managing users
 *  - mymod.syncUserEvents is an event processor function which is run by a worker process, it can run on a different host
 * </caption>
 *
 * const { app, api, lib, events } = require("backendjs");
 *
 * module.exports = {
 *     name: "mymod",
 *
 *     configureWeb(options, callback)
 *     {
 *         api.app.post(/^\/user\/(login|update|view)/, this.handleUsers);
 *
 *         callback();
 *     }
 *
 *     handleUsers(req, res) {
 *
 *        ... endpoint processing logic, assume req.user contains currently logged in user ...
 *
 *         const event = {
 *             type: req.params[0],
 *             id: req.user.id,
 *             name: req.user.name,
 *             access_time: Date.now()
 *         }
 *         events.putEvent("EVENT.USER." + req.params[0].toUpperCase(), event);
 *     }
 *
 *     syncUserEvents(event, callback)
 *     {
 *        ...
 *     }
 *
 *     logUserEvents(event, callback)
 *     {
 *         ...
 *     }
 * }
 *
 * app.start({ server: true });
 *
 * @example <caption>Start the server</caption>
 *
 * node mymod.js -jobs-workers 1
 *
 */

module.exports = events;

events.configureWorker = function(options, callback)
{
    if (!app.isOk("events", options)) return callback();
    this.initWorker(options, callback);
}

jobs.shutdown = function(options, callback)
{
    clearInterval(events._checkTimer);
    lib.tryCall(callback);
}

// Perform graceful worker shutdown, to be used for workers restart
events.shutdownWorker = function(options, callback)
{
    logger.log("shutdownWorker:", events.name, "queue:", this.subscribed, "max-runtime:", this.maxRuntime, "max-lifetime:", jobs.maxLifetime, options);

    // Stop accepting messages from the queues
    for (const q in this.workerQueue) queue.unlisten({ queueName: q });

    setTimeout(callback, options?.shutdownTimeout || this.shutdownTimeout);
}

/**
 * Check how long we run a job and force kill if exceeded, check if total life time is exceeded.
 *
 * If exit is required the `shundownWorker` methods will receive options with `shutdownReason` property
 * set and the name-sake property will contained the value exceeded.
 */
events.checkTimes = function()
{
    if (!this.running.size || !events.maxRuntime > 0) return;

    const now = Date.now(), bad = [];
    for (const e of this.running) if (now - e.time > events.maxRuntime) bad.push(e);

    // Stuck jobs can run much longer if we are still processing other small jobs
    if (bad.length) {
        if (this.running.size == bad.length) {
            logger.warn('checkLifetime:', 'events: exceeded max run time', events.maxRuntime, bad);
            return jobs.exitWorker({ shutdownReason: "maxRuntime", maxRuntime: events.maxRuntime });
        } else
        if (now - this.checkRuntime > events.maxRuntime) {
            logger.warn('checkLifetime:', 'events: exceeded max run time but other jobs still running', events.maxRuntime, bad);
        }
        this.checkRuntime = Date.now();
    }
}

events.initWorker = function(options, callback)
{
    ipc.initWorker(options);

    events._checkTimer = setInterval(this.checkTimes.bind(this), 30000);

    // Randomize subscription when multiple workers start at the same time, some queue drivers use polling
    setTimeout(() => {
        events.subscribeWorker();
        logger.log("initWorker:", events.name, "started", "queue:", events.subscribed, "maxRuntime:", events.maxRuntime, "maxLifetime:", jobs.maxLifetime);
    }, events.workerDelay);

    if (typeof callback == "function") callback();
}

events.subscribeWorker = function()
{
    for (const name in this.workerQueue) {
        if (/^[!-]/.test(name)) {
            this.unsubscribeQueue(name.substr(1));
            continue;
        }

        // Prevent subscription more than once to the same queue in case of invalid or nonexistent queues
        const q = queue.getQueue(name);
        const sub = q.subscription(name);
        if (this.subscribed.has(sub)) continue;

        const procs = [];
        for (const proc of this.workerQueue[name]) {
            const parts = proc.split('.');
            const path = parts.slice(0, -1).join(".");
            const method = parts.at(-1);
            const context = modules[path];

            if (!context || typeof context[method] != "function") {
                logger.error("subscribeWorker:", this.name, q.name, name, "invalid event proc:", proc);
                continue;
            }
            procs.push(context[method].bind(context));
        }
        if (!procs.length) continue;

        const qopts = Object.assign({ queueName: name }, this.workerOptions[name]);
        queue.listen(qopts, this.processEvent.bind(this, sub, procs));
        this.subscribed.add(sub);
        logger.info("subscribeWorker:", this.name, q.name, sub, this.workerQueue[name]);
    }
}

events.unsubscribeQueue = function(name)
{
    const q = queue.getClient(name);
    const sub = q.subscription(name);
    if (!this.subscribed.delete(sub)) return;
    queue.unlisten({ queueName: name });
    logger.info("unsubscribeQueue:", this.name, q.name, sub);
}

events.processEvent = function(subscription, procs, event, callback)
{
    const task = { time: Date.now(), subscription, event, procs };
    logger.debug("processEvent:", events.name, task);

    this.running.add(task);
    this.runTime = Date.now();

    event.received = subscription;

    lib.forEvery(procs, (proc, next) => {
        try {
            proc(event, (err) => {
                if (err) logger.error("processEvent:", events.name, err, task);
                this.runTime = Date.now();
                next();
            });
        } catch (err) {
            logger.error("processEvent:", events.name, err, task);
            this.runTime = Date.now();
            next();
        }
    }, () => {
        this.running.delete(task);
        this.runTime = Date.now();
        if (typeof callback == "function") callback();
    });
}

/**
 * Place an event into a queue by subject and type
 * @param {string} subject - event subject, topic, ID, ...
 * @param {object|Array} data - an object to be placed as the `data` property
 * @param {object} [options] - queue specific properties
 * @param {function} [callback] - (err, data) - where data is a list of objects with event, error status and
 * options sent to each queue: { err, event, options }, it is empty if nothing was sent.
 * @memberof module:events
 * @method putEvent
 * @example
 * events.putEvent("USER-LOGIN", { id: ..., name: ... })
 *
 * events.putEvent("ORDER-SHIPPED", { id: ... })
 *
 * events.putEvent("social.post.like", { id: ..., liked: ... }, (err, data) => {
 *     if (!err) {
 *         console.log("Sent:", data?.filter(x => !x.err))
 *         console.log("Errors:", data?.filter(x => x.err))
 *     }
 * })
 */
events.putEvent = function(subject, data, options, callback)
{
    if (typeof options == "function") callback = options, options = null;

    if (!lib.isString(subject)) {
        return lib.tryCall(callback, { status: 400, message: "missing subject" });
    }

    data = lib.isArray(data) || lib.isObject(data);
    if (!data) {
        return lib.tryCall(callback, { status: 400, message: "missing data" });
    }

    var msg, queues = [], result = [], seq = 1;

    for (const queueName in this.routing) {
        if (!this.routing[queueName].test(subject)) continue;
        if (!msg) {
            msg = Object.assign({}, {
                subject,
                data,
                id: lib.uuid(),
                time: lib.clock(),
                origin: app.origin(),
            });
        }
        queues.push(Object.assign({}, this.routingOptions[queueName], options, { queueName }));
    }

    lib.forEvery(queues, (opts, next) => {
        const event = Object.assign({}, msg, { sent: opts.queueName, seq: seq++ });
        queue.submit(event, opts, (err) => {
            logger.logger(err ? "error" : "debug", "putEvent:", err, "MSG:", msg, "OPTS:", opts);
            result.push({ event, options: opts, err });
            next();
        });
    }, (err) => {
        lib.tryCall(callback, err, result);
    }, true);
}

/**
 * Async version of {@link module:events.putEvent}
 * @param {string} subject - event subject, topic, ID, ...
 * @param {object} data - an object to be placed as the `data` property
 * @param {object} [options] - queue specific properties
 * @Returns {object} - { err, data }
 * @memberof module:events
 * @method aputEvent
 * @example
 * const { err, data } = await events.aputEvent("USER-LOGIN", { id: ..., name: ... })
 * console.log("Sent to:", data.map(x => x.event))
 * @async
 */
events.aputEvent = async function(subject, data, options)
{
    return new Promise((resolve, reject) => {
        events.putEvent(subject, data, options, (err, data) => {
            resolve({ err, data });
        });
    })
}