ipc.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const cluster = require('cluster');
const { EventEmitter } = require("events");
const logger = require(__dirname + '/logger');
const app = require(__dirname + '/app');
const lib = require(__dirname + '/lib');
const cache = require(__dirname + '/cache');
const queue = require(__dirname + '/queue');

/**
 * @module ipc
 */

class Ipc extends EventEmitter {

    constructor() {
        super();
        this.name = "ipc";
        // "server" or "worker" once one of the init methods has run
        this.role = "";
        // Web workers still waiting for their turn during a graceful api:restart
        this.restarting = [];
        // Map of port -> pid of the worker that owns it, 0 means the port has been released
        this.ports = {};
        // Keep-alive settings, see the `ping` config parameter below
        this.ping = {};
        this.setMaxListeners(25);

        this.args = [
            { name: "ping", obj: "ping", type: "map", merge: 1, descr: "Keep alive pings for workers: interval:ms how oftern do pings, kill:ms kill worker after this period" },
            { name: "system-queue", descr: "System queue name to send broadcast control messages, this is a PUB/SUB queue to process system messages like restart, re-init config,..." },
        ];
    }

    /**
     * This function is called by a server process to setup IPC channels and support for cache and messaging.
     * It is a no-op if the module has already been initialized in this process.
     * @param {object} options
     * @memberof module:ipc
     * @method initServer
     */
    initServer(options) {
        if (this.__init || !app.isOk("ipc", options)) return;
        this.__init = 1;
        this.role = "server";

        cache.initClients();
        queue.initClients();

        // Translate cluster lifecycle events into regular IPC messages so they go through the same handler
        cluster.on("exit", (worker, code, signal) => {
            this.handleServerMessages(worker, this.newMsg("cluster:exit", { id: worker.id, pid: worker.process.pid, code: code || undefined, signal: signal || undefined }));
        });

        cluster.on("disconnect", (worker) => {
            this.handleServerMessages(worker, this.newMsg("cluster:disconnect", { id: worker.id, pid: worker.process.pid }));
        });

        cluster.on('listening', (worker, address) => {
            this.handleServerMessages(worker, this.newMsg("cluster:listen", { id: worker.id, pid: worker.process.pid, port: address.port, address: address.address }));
        });

        // Handle messages from the workers
        cluster.on('fork', (worker) => {
            worker.pingTime = worker.startTime = Date.now();
            worker.on('message', (msg) => {
                this.handleServerMessages(worker, msg);
            });
            worker.on("error", (err) => {
                logger.error("server:", worker.id, worker.process.pid, err);
            });
        });

        // Subscribe to the system bus, messages from the bus have no worker to reply to
        for (const chan of [app.role, app.role + ":" + app.instance.tag]) {
            queue.subscribe(app.id + ":" + chan, { queueName: this.systemQueue }, (msg) => {
                this.handleServerMessages({ send: lib.noop }, this.newMsg(msg));
            });
        }

        // The timer always runs, handleIdleWorkers does nothing unless ping.interval is configured
        this._pingTimer = setInterval(this.handleIdleWorkers.bind(this), this.ping.interval/2 || 60000);

        logger.debug("initServer:", this.name, "started", app.role, app.workerId || process.pid, "ping:", this.ping, this.systemQueue, queue.getClient(this.systemQueue).name);
    }

    /**
     * This function is called by a worker process to setup IPC channels and support for cache and messaging.
     * It is a no-op if the module has already been initialized in this process.
     * @param {object} options
     * @memberof module:ipc
     * @method initWorker
     */
    initWorker(options) {
        if (this.__init || !app.isOk("ipc", options)) return;
        this.__init = 1;
        this.role = "worker";

        cache.initClients();
        queue.initClients();

        // Handle messages from the server
        process.on("message", this.handleWorkerMessages.bind(this));

        // Subscribe to the system bus
        for (const chan of [app.role, app.role + ":" + app.instance.tag]) {
            queue.subscribe(app.id + ":" + chan, { queueName: this.systemQueue }, (msg) => {
                this.handleWorkerMessages(this.newMsg(msg));
            });
        }

        // Ping several times per interval, but not more often than once a second
        if (this.ping.interval > 0) {
            this._pingTimer = setInterval(this.sendMsg.bind(this, "worker:ping"), Math.max(1000, this.ping.interval/5));
            this.sendMsg("worker:ping");
        }

        logger.debug("initWorker:", this.name, "started", app.role, app.workerId || process.pid, this.systemQueue, queue.getClient(this.systemQueue).name);
    }

    // Stop the ping timer and hand over to lib.deferShutdown, the callback is always called
    shutdown(options, callback) {
        clearInterval(this._pingTimer);
        delete this._pingTimer;
        lib.deferShutdown(this);
        lib.tryCall(callback);
    }

    // To be used in worker processes: passes the message to lib.runCallback, handles
    // the built-in operations and then emits the message as an event named by `msg.__op`
    handleWorkerMessages(msg) {
        if (!msg) return;
        logger.dev('handleWorkerMessages:', msg.__op, app.role, msg);
        lib.runCallback(this, msg);

        try {
            switch (msg.__op || "") {
            case "repl:init":
                if (msg.port) app.startRepl(msg.port, app.repl.bind);
                break;

            case "repl:shutdown":
                if (app.repl.server) app.repl.server.end();
                delete app.repl.server;
                break;

            case "ipc:lru:del":
                msg.data = cache.lru.del(msg.name);
                logger.debug("ipc:lru:", msg);
                break;

            case "config:init":
                app.checkConfig();
                break;
            }

            this.emit(msg.__op, msg);
        } catch (e) {
            logger.error('handleWorkerMessages:', e.stack, msg);
        }
    }

    // To be used in messages processing that came from the clients or other way,
    // `worker` is the sender, it must at least provide the `send` method.
    // After the built-in operations are handled the message is emitted as an event named by `msg.__op`
    handleServerMessages(worker, msg) {
        if (!msg) return false;
        logger.dev('handleServerMessages:', msg.__op, app.role, worker.id, msg);

        try {
            switch (msg.__op) {
            case "api:restart":
                // Start gracefull restart of all api workers, wait till a new worker starts and then send a signal to another in the list.
                // This way we keep active processes responding to the requests while restarting
                if (this.restarting.length) break;
                this.restarting = lib.getWorkers({ worker_type: "web" });
                // falls through

            case "api:ready":
                // Restart the next worker from the list, skip workers that cannot be reached anymore
                while (this.restarting.length > 0) {
                    const next = this.restarting.pop();
                    let notified = false;
                    try {
                        next.send({ __op: "api:restart" });
                        notified = true;
                    } catch (e) {
                        // the worker is gone, try the next one in the list
                    }
                    if (notified) break;
                }
                worker.worker_type = "web";
                this.sendReplPort("web", worker);
                break;

            case "worker:restart":
                lib.notifyWorkers(msg, { worker_type: null });
                break;

            case "worker:ready":
                this.sendReplPort("worker", worker);
                break;

            case "worker:ping":
                worker.pingTime = Date.now();
                break;

            case "cluster:disconnect":
                // Negative ping time excludes the worker from the dead worker checks
                worker.pingTime = -1;
                break;

            case "cluster:exit":
                // Release every port owned by the exited process, a worker may own
                // more than one (a listening port and a REPL port)
                for (const p in this.ports) {
                    if (this.ports[p] == worker.process?.pid) {
                        this.ports[p] = 0;
                    }
                }
                worker.pingTime = -1;
                break;

            case "cluster:listen":
                this.ports[msg.port] = worker.process?.pid;
                break;

            case "ipc:limiter":
                worker.send(cache.localLimiter(msg));
                break;

            case "ipc:lru:del":
                msg.data = cache.lru.del(msg.name);
                logger.debug("ipc:lru:", msg);
                break;

            case "config:init":
                app.checkConfig();
                lib.notifyWorkers(msg);
                break;
            }

            this.emit(msg.__op, msg, worker);
        } catch (e) {
            logger.error('handleServerMessages:', e.stack, msg);
        }
    }

    // Check for dead workers if pings are enabled: a worker without a ping for longer than `ping.interval`
    // is reported, if `ping.kill` is set and exceeded the worker is sent SIGTERM first and SIGKILL on the next check
    handleIdleWorkers() {
        if (!this.ping.interval) return;
        const now = Date.now();
        for (const w of lib.getWorkers()) {
            const pingTime = w.pingTime || 0;
            const elapsed = now - pingTime;
            // Negative ping time means the worker has disconnected or exited already
            if (pingTime < 0 || elapsed <= this.ping.interval) continue;

            logger.error("initServer:", app.role, "dead worker detected", w.id, w.process.pid, "ping:", this.ping, "last ping:", elapsed, "started:", now - w.startTime);
            if (this.ping.kill > 0 && elapsed > this.ping.kill) {
                try {
                    process.kill(w.process.pid, w.killTime ? "SIGKILL" : "SIGTERM");
                } catch (e) {
                    // Best effort, the process may have exited already
                    logger.debug("handleIdleWorkers:", app.role, w.id, w.process.pid, e);
                }
                w.killTime = Date.now();
            }
        }
    }

    // Send REPL port to a worker if needed: starting from the configured `app.repl.<role>Port`
    // pick the first port that is not owned by another process, nothing is sent if the worker
    // already owns a port in that range or no port is configured for the role
    sendReplPort(role, worker) {
        if (!worker?.process) return;
        let port = app.repl[role + "Port"];
        if (!port) return;
        // Keys are strings, the order must be numeric for the scan below to work across
        // different number of digits, i.e. 9999 must come before 10000
        const ports = Object.keys(this.ports).sort((a, b) => a - b);
        for (const p of ports) {
            const diff = p - port;
            if (diff > 0) break;
            if (diff === 0) {
                if (this.ports[port] == worker.process.pid) return;
                if (!this.ports[port]) break;
                port++;
            }
        }
        this.ports[port] = worker.process.pid;
        logger.debug("sendReplPort:", port, role, worker.id, this.ports);
        worker.send({ __op: "repl:init", port: port });
    }

    /**
     * Returns an IPC message object.
     *
     * - if `op` is an object with the `__op` property it is returned as is
     * - if `op` is a string that looks like a JSON object it is parsed and returned
     * - otherwise `msg`, an object or a JSON string, is extended with the `__op` property set to `op`
     * @param {string|object} op
     * @param {object|string} [msg]
     * @param {object} [options] - not used at the moment
     * @memberof module:ipc
     * @method newMsg
     */
    newMsg(op, msg, options) {
        if (op?.__op) return op;
        if (typeof op === "string" && op[0] == "{" && op[op.length-1] == "}") {
            return lib.jsonParse(op, { datatype: "obj" });
        }
        if (typeof msg === "string") msg = lib.jsonParse(msg, { logger: "info" });
        return lib.objExtend(msg, { __op: String(op) });
    }

    /**
     * Wrapper around EventEmitter `emit` call to send unified IPC messages in the same format, this is for in-process
     * messaging only between modules.
     * @param {string} op
     * @param {object|string} msg
     * @param {object} [options]
     * @memberof module:ipc
     * @method emitMsg
     * @example <caption>mymod.js</caption>
     * const { ipc } = require("backendjs");
     * ...
     * ipc.on("user:subscribed", (msg) => {
     *    console.log(msg);
     * });
     *
     * @example <caption>othermod.js</caption>
     * const { api, ipc } = require("backendjs");
     * ...
     * api.users.add({ ... }, (err, user) => {
     *    if (!err) ipc.emitMsg("user:subscribed", user);
     * })
     *
     */
    emitMsg(op, msg, options) {
        if (op) this.emit(op, this.newMsg(op, msg, options));
    }

    /**
    * Send a message to the server process via IPC messages, callback is used for commands that return value back
    *
    * - the `timeout` property can be used to specify a timeout for how long to wait the reply, if not given the default is used
    * - the rest of the properties are optional and depend on the operation.
    *
    * If called inside the server, it process the message directly, reply is passed in the callback if given.
    * @param {string|object} op
    * @param {object|string} msg
    * @param {object} [options]
    * @param {function} [callback]
    * @example
    *
    * ipc.sendMsg("op1", { data: "data" }, { timeout: 100 })
    * ipc.sendMsg("op1", { name: "name", value: "data" }, (data) => { console.log(data); })
    * ipc.sendMsg("op1", { 1: 1, 2: 2 }, { timeout: 100 })
    * ipc.sendMsg("op1", { 1: 1, 2: 2 }, (data) => { console.log(data); })
    * ipc.sendMsg({ __op: "op1", name: "test" })
    * @memberof module:ipc
    * @method sendMsg
    */
    sendMsg(op, msg, options, callback) {
        if (typeof options === "function") {
            callback = options;
            options = null;
        }

        msg = this.newMsg(op, msg, options);
        logger.dev("sendMsg:", msg.__op, app.role, msg);

        // Not a cluster worker: process locally if running as the server, the same message is passed to the callback
        if (!cluster.isWorker) {
            if (this.role == "server") this.handleServerMessages({ send: lib.noop }, msg);
            return typeof callback === "function" ? callback(msg) : null;
        }

        // Register the callback before sending so the reply can be matched to it
        if (typeof callback === "function") {
            msg.__res = true;
            lib.deferCallback(this, msg, callback, options?.timeout);
        }
        try {
            process.send(msg);
        } catch (e) {
            logger.error('send:', e, msg);
        }
    }

    /**
     * Send a message to a channel, this is high level routine that uses the corresponding queue, it uses eventually queue.publish.
     * If no client or queue is provided in the options it uses default `systemQueue`.
     * If the channel starts with : the `app.id` will be prepended automatically.
     * @param {string} channel
     * @param {object|string} msg
     * @param {object} [options]
     * @param {function} [callback]
     * @memberof module:ipc
     * @method broadcast
     * @example
     * ipc.broadcast(":worker", "worker:restart")
     */
    broadcast(channel, msg, options, callback) {
        if (typeof options === "function") {
            callback = options;
            options = null;
        }
        if (!options?.queueName) {
            options = lib.objExtend(options, { queueName: this.systemQueue });
        }
        if (typeof channel !== "string") channel = String(channel);
        if (channel[0] == ":") channel = app.id + channel;
        logger.debug("broadcast:", channel, msg, options, queue.getClient(options?.queueName).name);
        queue.publish(channel, msg, options, callback);
    }

}

/**
 * # IPC communications between processes and support for subscriptions via queues.
 *
 * The module is EventEmitter and emits messages received.
 *
 * A special system queue can be configured and it will be used by all processes to listen for
 * messages on the channel __bkjs:role__, where the role is the process role, the same messages
 * that are processed by the server/worker message handlers like api:restart, config:init,....
 *
 * All instances will be listening and processing these messages at once. The most
 * useful use case is refreshing the DB config on demand or restarting without configuring
 * any other means like SSH, keys....
 *
 * ## Redis system bus
 *
 * If configured all processes subscribe to it and listen for system messages.
 * Websockets in the API server also use the system bus to send broadcasts between multiple api instances.
 * ```
 * queue-system=redis://
 * ipc-system-queue=system
 * ```
 */

module.exports = new Ipc();