/*
* Author: Vlad Seryakov vseryakov@gmail.com
* backendjs 2021
*/
const app = require(__dirname + '/../app');
const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const QueueClient = require(__dirname + "/client");
const nats = require("nats");
const sc = nats.StringCodec();
/**
* Queue client using NATS server
*
* To enable install the npm modules:
*
* npm i -g nats
*
* @example
*
* queue-nats=nats://localhost:4222
*
* @memberOf module:queue
*/
class NatsClient extends QueueClient {
constructor(options) {
super(options);
this.name = "nats";
this.applyOptions();
if (!this.options.servers.length) {
this.options.servers = this.hostname + ":" + this.port;
}
this._subs = {};
this._pending = [];
this._running = new Set();
this._mtime = this._ptime = 0;
this.connect();
}
applyOptions(options) {
super.applyOptions(options);
this.options.servers = lib.split(this.options.servers);
this.options.maxReconnectAttempts = lib.toNumber(this.options.maxReconnectAttempts, { dflt: -1 });
this.options.visibilityTimeout = lib.toNumber(this.options.visibilityTimeout, { min: 0 });
this.options.interval = lib.toNumber(this.options.interval, { dflt: 1000, min: 0 });
this.options.count = lib.toNumber(this.options.count, { min: 1, max: 32 });
this.options.expires = lib.toNumber(this.options.expires, { min: 0 });
this.options.no_wait = lib.toBool(this.options.no_wait, true);
this.options.name = this.options.name && lib.toTemplate(this.options.name, [this.options, app]) || `${app.instance.id}-${app.instance.pid}`;
if (typeof this.options.waitOnFirstConnect == "undefined") this.options.waitOnFirstConnect = true;
}
connect() {
nats.connect(this.options).then((c) => {
this.client = c;
this.emit("ready");
for (const c of this._pending) {
this[c[0]].apply(this, c.slice(1));
}
}).catch((err) => {
logger.error("connect:", this.name, err, this.options);
setTimeout(this.connect.bind(this), this.options.reconnectTimeWait || 2000);
});
}
close() {
if (!this.client) return super.close();
this.client.drain().then(() => {}).catch((err) => {
logger.error("close:", this.name, err, this.options);
}).finally(() => {
super.close();
});
}
subscribe(channel, options, callback) {
if (!this.client) {
return this._pending.push(["subscribe", channel, options, callback]);
}
super.subscribe(channel, options, callback);
if (this._subs[channel]) return;
var opts = {
max: options.max,
timeout: options.timeout,
queue: options.queue,
callback: (err, msg) => {
if (err) return logger.error("subscribe:", this.name, channel, err, msg);
logger.dev("onMessage:", this.name, channel, msg);
var data = sc.decode(msg.data);
if (options.raw) data = { subject: msg.subject, data: data, sid: msg.sid, headers: msg.headers };
if (msg.reply) {
this.emit(channel, data, (err, rc) => {
if (!err) msg.respond(sc.encode(rc));
});
} else {
this.emit(channel, data);
}
}
};
this._subs[channel] = this.client.subscribe(channel, opts);
}
unsubscribe(channel, options, callback) {
super.unsubscribe(channel, options, callback);
if (!callback && this._subs[channel]) {
this._subs[channel].unsubscribe();
delete this._subs[channel];
}
}
publish(channel, msg, options, callback) {
if (!this.client) {
return this._pending.push(["publish", msg, options, callback]);
}
var opts;
if (options.headers) opts = { headers: options.headers };
if (options.reply) opts = lib.objExtend(opts, { reply: options.reply });
this.client.publish(channel, sc.encode(msg), opts);
return typeof callback == "function" && callback();
}
listen(options, callback) {
if (!this.client) {
return this._pending.push(["subscribeQueue", options, callback]);
}
if (!this.js) this.js = this.client.jetstream();
var sub = this.canonical(options);
if (this._subs[sub]) {
logger.debug("listen:", this.name, sub, "reusing:", options);
return super.listen(options, callback);
}
var chan = this.channel(options);
var opts = nats.consumerOpts(), p;
opts.deliverAll();
opts.callback(this._callback.bind(this, options));
for (const n of ["deliverAll", "deliverLastPerSubject", "deliverLast", "deliverNew",
"ackNone", "ackAll", "manualAck", "ackExplicit",
"replayInstantly", "replayOriginal", "flowControl", "orderedConsumer", "headersOnly"]) {
if (options[n] || this.options[n]) opts[n]();
}
for (const n of ["queue", "durable", "deliverTo", "deliverGroup", "idleHeartbeat", "limit", "filterSubject",
"maxAckPending", "maxWaiting", "maxMessages", "maxDeliver", "ackWait",
"startAtTimeDelta", "startTime", "startSequence"]) {
if (typeof options[n] != "undefined") opts[n](options[n]); else
if (typeof this.options[n] != "undefined") opts[n](this.options[n]);
}
if (options.pushSubscribe) {
p = this.js.subscribe(chan, opts);
} else {
delete this.options.ackNone;
delete options.ackNone;
opts.ackExplicit();
opts.maxAckPending(-1);
if (options.pullSubscribe) {
p = this.js.pullSubscribe(chan, opts);
} else {
var consumer = this.consumer(options) || chan;
opts.manualAck();
opts.queue(consumer);
opts.durable(consumer);
opts.ackWait(options.visibilityTimeout || this.options.visibilityTimeout);
p = this.js.pullSubscribe(chan, opts);
}
}
p.then((s) => {
this._subs[sub] = s;
super.listen(options, callback);
logger.debug("listen:", this.name, sub, opts);
}).catch((err) => {
logger.error("listen:", this.name, sub, opts, err);
});
}
unlisten(options, callback) {
super.unlisten(options, callback);
if (callback) return;
var sub = this.canonical(options);
if (this._subs[sub]) {
this._subs[sub].unsubscribe();
delete this._subs[sub];
}
}
submit(job, options, callback) {
if (!this.client) {
return this._pending.push(["publishQueue", options, callback]);
}
if (!this.js) this.js = this.client.jetstream();
var chan = this.channel(options);
var opts = options;
if (options.unique) {
opts = lib.objClone(options, { msgID: options.unique });
}
if (typeof job != "string") job = lib.stringify(job);
this.js.publish(chan, sc.encode(job), opts).then((rc) => {
lib.tryCall(callback, null, rc);
}).catch((err) => {
lib.tryCall(callback, err);
});
}
poll(options) {
var sub = this.canonical(options);
if (!this._subs[sub]) return;
var count = lib.validPositive(options.count, this.options.count);
if (this._running.size < count) {
this._subs[sub].pull({ batch: count - this._running.size,
expires: lib.validPositive(options.expire, this.options.expires),
no_wait: lib.validBool(options.no_wait, this.options.no_wait) });
}
this.schedule(options, lib.validPositive(options.interval, this.options.interval));
}
_callback(options, err, jsmsg) {
var sub = this.canonical(options);
if (err && err.code != 404 && err.code != 408) return logger.error("poll:", this.name, sub, this._running.size, err, jsmsg);
if (!jsmsg) return;
var vtimer, done;
var now = this._mtime = Date.now();
var running = this._running;
var ack = options.ackNext || this.options.ackNext ? "next": "ack";
var ackNone = options.ackNone || this.options.ackNone;
var raw = options.raw || this.options.raw;
var data = sc.decode(jsmsg.data);
var msg = raw ? data : lib.jsonParse(data, { logger: "info" });
logger.debug("poll:", this.name, sub, running.size, "MSG:", msg, "ITEM:", jsmsg);
if (!msg) {
if (!ackNone) jsmsg[ack]();
return;
}
if (!raw) {
if (msg.endTime > 0 && msg.endTime < now) {
if (!ackNone) jsmsg[ack]();
return;
}
if (msg.startTime > 0 && msg.startTime > now) {
if (!ackNone) jsmsg.working();
return;
}
if (!ackNone) {
if (msg.noWait) {
jsmsg[ack]();
} else
if (msg.noWaitTimeout > 0) {
setTimeout(() => { if (!done) { msg.noWait = 1; jsmsg[ack](); } }, msg.noWaitTimeout * 1000);
} else {
const vtimeout = msg.visibilityTimeout > 0 ? msg.visibilityTimeout : lib.validPositive(options.visibilityTimeout, this.options.visibilityTimeout);
if (vtimeout) {
vtimer = setInterval(() => { if (!done) jsmsg.working() }, vtimeout * 0.8);
}
}
}
}
function _end() {
done = 1;
if (!ackNone) running.delete(jsmsg);
clearInterval(vtimer);
}
if (!ackNone) running.add(jsmsg);
try {
var m = raw ? { subject: jsmsg.subject, data: msg, sid: jsmsg.sid, seq: jsmsg.seq, headers: jsmsg.headers } : msg;
if (!this.emit(sub, m, (err) => {
if (done) return;
_end();
if (ackNone) return;
if (!raw) {
if (!msg.noVisibility && (err && err.status >= 500 || msg.noWait)) {
if (!msg.noWait) jsmsg.working();
return;
}
}
jsmsg[ack]();
})) {
_end();
if (!ackNone) jsmsg.working();
}
} catch (e) {
_end();
if (!ackNone) try { jsmsg.working() } catch (e2) { e._errmsg = e2.message }
logger.error("poll:", this.name, sub, running.size, e, msg);
}
}
}
module.exports = NatsClient;