queue/redis.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const QueueClient = require(__dirname + "/client");
const redis = require("redis");

const scripts = {

    poller: [
        "local time = tonumber(KEYS[2]);",
        "local timeout = tonumber(KEYS[3]) + time;",
        "local val = redis.call('zrange', KEYS[1], 0, time, 'byscore', 'limit', 0, 1)[1];",
        "if val then redis.call('zadd', KEYS[1], timeout, val); end;",
        "return val;"
    ].join(""),

    stats: [
        "local time = tonumber(KEYS[2]);",
        "local count1 = redis.call('zcount', KEYS[1], '-inf', '+inf');",
        "local count2 = redis.call('zcount', KEYS[1], 0, time);",
        "return {count1,count2};"
    ].join(""),

};

/**
 * Queue client using Redis server
 *
 * @param {boolean|int|object} [options.tls] can be true or 1 to just enable default TLS properties
 *
 * @example
 * -queue-default=redis://host1
 * -queue-default-options-interval=1000
 * -queue-redis=redis://host1?bk-visibilityTimeout=30000&bk-count=2
 * -queue-default=redis://host1?bk-tls=1
 *
 * @memberOf module:queue
 */

class RedisQueueClient extends QueueClient {

    constructor(options) {
        super(options);
        this.name = "redis";
        this.applyOptions();

        if (this.options.tls === true || this.options.tls === 1) {
            this.options.tls = {};
        }

        // For reconnect or failover to work need retry policy
        this.options.retry_strategy = (options) => {
            logger.logger(options.attempt == 2 ? "error": "dev", "connect:", this.url, options);
            if (this.options.max_attempts > 0 && options.attempt > this.options.max_attempts) undefined;
            return Math.min(options.attempt * 200, this.options.retry_max_delay);
        }
        this.client = this.connect(this.hostname, this.port);
    }

    close() {
        super.close();
        if (this.client) this.client.quit();
        if (this.subclient) this.subclient.quit();
        delete this.client;
        delete this.subclient;
        delete this.options.retry_strategy;
    }

    applyOptions(options) {
        super.applyOptions(options);
        this.options.threshold = lib.toNumber(this.options.threshold, { min: 0 });
        this.options.enable_offline_queue = lib.toBool(this.options.enable_offline_queue);
        this.options.retry_max_delay = lib.toNumber(this.options.retry_max_delay, { min: 1000, dflt: 30000 });
        this.options.max_attempts = lib.toNumber(this.options.max_attempts, { min: 0 });
    }

    connect(hostname, port) {
        var host = String(hostname).split(":");
        var client = new redis.createClient(host[1] || port || this.options.port || 6379, host[0] || "127.0.0.1", this.options);
        client.on("error", (err) => { logger.error("redis:", this.queueName, this.url, err) });
        client.on("ready", this.emit.bind(this, "ready"));
        client.on("message", this.onMessage.bind(this));
        logger.debug("connect:", this.url, host, port, this.options);
        return client;
    }

    onMessage(subject, msg) {
        logger.dev("onMessage:", this.url, subject, msg);
        this.emit(subject, msg);
    }

    subscribe(subject, options, callback) {
        if (!this.subclient) {
            this.subclient = this.connect(this.hostname, this.port);
        }
        super.subscribe(subject, options, callback);
        if (!this.subclient.enable_offline_queue) this.subclient.enable_offline_queue = true;
        this.subclient.subscribe(subject);
    }

    unsubscribe(subject, options, callback) {
        super.unsubscribe(subject, options, callback);
        if (this.subclient) {
            if (!this.subclient.enable_offline_queue) this.subclient.enable_offline_queue = true;
            this.subclient.unsubscribe(subject);
        }
    }

    publish(subject, msg, options, callback) {
        if (!this.client.enable_offline_queue) this.client.enable_offline_queue = true;
        this.client.publish(subject, msg, callback);
    }

    stats(options, callback) {
        var rc = {};
        var subject = this.subject(options);
        this.client.eval(scripts.stats, 2, subject, Date.now(), (err, count) => {
            if (!err) {
                rc.queueCount = lib.toNumber(count[0]);
                rc.queueRunning = lib.toNumber(count[1]);
            }
            lib.tryCall(callback, err, rc);
        });
    }

    submit(job, options, callback) {
        var subject = this.subject(options);
        if (typeof job != "string") job = lib.stringify(job);
        this.client.zadd(subject, Date.now(), job, callback);
    }

    purge(options, callback) {
        var subject = this.subject(options);
        this.client.del(subject, callback);
    }

    poll(options) {
        this._poll_run(options);
    }

    _poll_get(options, callback) {
        const subject = this.subject(options);

        this.client.eval(scripts.poller, 3, subject, Date.now(), this.options.visibilityTimeout, (err, data) => {
            if (!err && data) {
                data = [ { data } ];
            }
            callback(err, data);
        });
    }

    _poll_update(options, item, visibilityTimeout, callback) {
        const subject = this.subject(options);
        this.client.zadd(subject, Date.now() + visibilityTimeout, item.data, callback);
    }

    _poll_del(options, item, callback) {
        const subject = this.subject(options);
        this.client.zrem(subject, item.data, callback);
    }

}

module.exports = RedisQueueClient;