queue/nats.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2021
 */

const app = require(__dirname + '/../app');
const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const QueueClient = require(__dirname + "/client");
const nats = require("nats");
const sc = nats.StringCodec();

/**
 * Queue client using NATS server
 *
 * @example
 *
 * To enable install the npm module:
 *
 * npm i nats
 *
 * @example
 * To create a jetstream
 *
 * natscli stream add --subjects nats --defaults nats
 *
 * @example
 *
 * queue-nats=nats://
 * queue-nats=nats://host:4222
 *
 * @memberOf module:queue
 */

class NatsQueueClient extends QueueClient {

    constructor(options) {
        super(options);
        this.name = "nats";
        this.applyOptions();
        this._subs = {};
        this._pending = [];
        this._running = new Set();
        this._mtime = this._ptime = 0;
        this.connect();
    }

    applyOptions(options) {
        super.applyOptions(options);
        this.options.servers = lib.split(this.options.servers);
        this.options.maxReconnectAttempts = lib.toNumber(this.options.maxReconnectAttempts, { dflt: -1 });
        this.options.expires = lib.toNumber(this.options.expires, { min: 0 });
        this.options.no_wait = lib.toBool(this.options.no_wait, true);
        this.options.name = this.options.name && lib.toTemplate(this.options.name, [this.options, app]) || `${app.env.id}-${app.env.pid}`;
        if (typeof this.options.waitOnFirstConnect == "undefined") this.options.waitOnFirstConnect = true;
    }

    connect() {
        nats.connect(this.options).then((client) => {
            this.client = client;
            this.emit("ready");
            for (const client of this._pending) {
                this[client[0]].apply(this, client.slice(1));
            }
        }).catch((err) => {
            logger.error("connect:", this.name, err, this.options);
            setTimeout(this.connect.bind(this), this.options.reconnectTimeWait || 2000);
        });
    }

    close() {
        super.close();
        if (!this.client) return;
        this.client.drain().then(() => {}).catch((err) => {
            logger.error("close:", this.name, err, this.options);
        });
    }

    subscribe(subject, options, callback) {
        if (!this.client) {
            return this._pending.push(["subscribe", subject, options, callback]);
        }
        super.subscribe(subject, options, callback);
        if (this._subs[subject]) return;
        var opts = {
            max: options.max,
            timeout: options.timeout,
            queue: options.queue,
            callback: (err, msg) => {
                if (err) return logger.error("subscribe:", this.name, subject, err, msg);
                logger.dev("onMessage:", this.name, subject, msg);
                var data = sc.decode(msg.data);
                if (options.raw) data = { subject: msg.subject, data: data, sid: msg.sid, headers: msg.headers };
                if (msg.reply) {
                    this.emit(subject, data, (err, rc) => {
                        if (!err) msg.respond(sc.encode(rc));
                    });
                } else {
                    this.emit(subject, data);
                }
            }
        };
        this._subs[subject] = this.client.subscribe(subject, opts);
    }

    unsubscribe(subject, options, callback) {
        super.unsubscribe(subject, options, callback);
        if (!callback && this._subs[subject]) {
            this._subs[subject].unsubscribe();
            delete this._subs[subject];
        }
    }

    publish(subject, msg, options, callback) {
        if (!this.client) {
            return this._pending.push(["publish", msg, options, callback]);
        }
        var opts;
        if (options.headers) opts = { headers: options.headers };
        if (options.reply) opts = Object.assign(opts, { reply: options.reply });
        this.client.publish(subject, sc.encode(msg), opts);
        return typeof callback == "function" && callback();
    }

    purge(options, callback) {
        if (!this.client) {
            return lib.tryCall(callback, { status: 404, message: "not connected" })
        }
        var subject = this.subject(options);
        this.client.jetstreamManager().then(mgr => {
            mgr.streams.purge(subject).
                        then(() => lib.tryCall(callback)).
                        catch(e => lib.tryCall(callback, e));
        }).catch(err => lib.tryCall(callback, err));
    }

    listen(options, callback) {
        if (!this.client) {
            return this._pending.push(["listen", options, callback]);
        }
        if (!this.js) {
            this.js = this.client.jetstream();
        }

        const sub = this.subscription(options);
        if (this._subs[sub]) {
            logger.debug("listen:", this.name, sub, "reusing:", options);
            return super.listen(options, callback);
        }

        const subject = this.subject(options);
        const group = this.group(options) || subject;

        const opts = nats.consumerOpts();
        opts.ackExplicit();
        opts.ackWait(this.options.visibilityTimeout);
        opts.deliverAll();
        opts.manualAck();
        opts.maxAckPending(-1);
        opts.queue(group);
        opts.durable(group);
        opts.callback(this._callback.bind(this, options));

        for (const n of [ 'ackAll', 'ackExplicit', 'ackNone', 'deliverAll', 'deliverLast', 'deliverLastPerSubject', 'deliverNew',
                          'flowControl', 'headersOnly', 'manualAck', 'orderedConsumer', 'replayInstantly', 'replayOriginal' ]) {
            if (this.options[n]) opts[n]();
        }
        for (const n of [ 'ackWait', 'deliverGroup', 'deliverTo', 'durable', 'filterSubject', 'idleHeartbeat', 'limit',
                          'maxAckPending', 'maxDeliver',  'maxMessages', 'maxWaiting', 'queue',
                          'startAtTimeDelta', 'startSequence', 'startTime' ]) {
            if (this.options[n] !== undefined) opts[n](this.options[n]);
        }

        const jsSub = this.options.pushSubscribe ? this.js.subscribe(subject, opts) :
                              this.js.pullSubscribe(subject, opts);

        jsSub.then((s) => {
            this._subs[sub] = s;
            super.listen(options, callback);
            logger.debug("listen:", this.name, sub, opts);
        }).catch((err) => {
            logger.error("listen:", this.name, sub, opts, err);
        });
    }

    unlisten(options, callback) {
        super.unlisten(options, callback);
        if (callback) return;
        var sub = this.subscription(options);
        if (this._subs[sub]) {
            this._subs[sub].unsubscribe();
            delete this._subs[sub];
        }
    }

    submit(job, options, callback) {
        if (!this.client) {
            return this._pending.push(["publishQueue", options, callback]);
        }
        if (!this.js) {
            this.js = this.client.jetstream();
        }
        var subject = this.subject(options);
        var opts = options;
        if (options.unique) {
            opts = lib.clone(options, { msgID: options.unique });
        }
        if (typeof job != "string") {
            job = lib.stringify(job);
        }
        this.js.publish(subject, sc.encode(job), opts).then((rc) => {
            lib.tryCall(callback, null, rc);
        }).catch((err) => {
            lib.tryCall(callback, err);
        });
    }

    poll(options) {
        var sub = this.subscription(options);
        if (!this._subs[sub]) return;

        if (this._running.size < this.options.queueCount) {
            this._subs[sub].pull({
                batch: this.options.queueCount - this._running.size,
                expires: this.options.expires,
                no_wait: this.options.no_wait,
            });
        }
        this.schedule(options);
    }

    _poll_update(options, item, visibilityTimeout, callback) {
        if (!this.options.ackNone) {
            item.msg.working();
        }
        lib.tryCall(callback)
    }

    _poll_del(options, item, callback) {
        if (!this.options.ackNone) {
            item.msg[this.options.ackNext ? "next": "ack"]();
        }
        lib.tryCall(callback)
    }

    _callback(options, err, msg) {
        if (err && err.code != 404 && err.code != 408) {
            return logger.error("poll:", this.name, options, this._running.size, err, msg);
        }
        if (!msg) return;
        const item = {
            msg,
            data: sc.decode(msg.data),
        };
        this._running.add(item);
        this._poll_run_item(options, item, () => {
            this._running.delete(item);
        });
    }

}

module.exports = NatsQueueClient;