queue/redis.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const QueueClient = require(__dirname + "/client");
const redis = require("redis");

// Lua sources executed on the Redis server via EVAL.
// In every script KEYS[1] is the queue sorted set and KEYS[1] .. '#' is its companion sorted set;
// KEYS[2], where used, carries a numeric timestamp rather than a key name.
const scripts = (() => {
    // Glue Lua source lines together with the given separator
    const lua = (separator, ...lines) => lines.join(separator);

    return {
        // Moves every member of the '#' set scored between 0 and KEYS[2] back into the queue set
        // (scored KEYS[2] + position) and returns the resulting size of the queue set
        monitor: lua("\n",
            "local time = tonumber(KEYS[2]);",
            "local vals = redis.call('zrangebyscore', KEYS[1] .. '#', 0, time);",
            "redis.call('zremrangebyscore', KEYS[1] .. '#', 0, time);",
            "for i, val in ipairs(vals) do redis.call('zadd', KEYS[1], time+i, (val)); end;",
            "return redis.call('zcount', KEYS[1], '-inf', '+inf');"),

        // Removes the lowest-ranked member from the queue set, stores it in the '#' set
        // with KEYS[2] as its score and returns the member
        vpoller: lua("",
            "local val = redis.call('zrange', KEYS[1], 0, 0)[1];",
            "if val then ",
            "  redis.call('zremrangebyrank', KEYS[1], 0, 0);",
            "  redis.call('zadd', KEYS[1] .. '#', tonumber(KEYS[2]), val);",
            "end;",
            "return val;"),

        // Removes the lowest-ranked member from the queue set and returns it
        // together with the number of members left
        poller: lua("",
            "local val = redis.call('zrange', KEYS[1], 0, 0)[1];",
            "if val then redis.call('zremrangebyrank', KEYS[1], 0, 0); end;",
            "local count = redis.call('zcount', KEYS[1], '-inf', '+inf');",
            "return {val,count};"),

        // Returns the sizes of the queue set and of the '#' set
        stats: lua("",
            "local count1 = redis.call('zcount', KEYS[1], '-inf', '+inf');",
            "local count2 = redis.call('zcount', KEYS[1] .. '#', '-inf', '+inf');",
            "return {count1,count2};"),
    };
})();

/**
 * Queue client using Redis server
 *
 * @param {boolean|int|object} [options.tls] can be true or 1 to just enable default TLS properties
 *
 * @example
 *  -queue-default=redis://host1
 *  -queue-default-options-interval=1000
 *  -queue-redis=redis://host1?bk-visibilityTimeout=30000&bk-count=2
 *  -queue-default=redis://host1?bk-tls=1
 *
 * @memberOf module:queue
 */

class RedisQueueClient extends QueueClient {

    /**
     * Normalizes the options, installs the reconnect policy and opens the main connection.
     * @param {object} [options] passed as is to the base QueueClient constructor
     */
    constructor(options) {
        super(options);
        this.name = "redis";
        this.applyOptions();

        // `true` or `1` only asks for TLS with default properties
        if (this.options.tls === true || this.options.tls === 1) {
            this.options.tls = {};
        }

        // For reconnect or failover to work need retry policy
        this.options.retry_strategy = (options) => {
            logger.logger(options.attempt == 2 ? "error": "dev", "connect:", this.url, options);
            // A non-number result makes the redis client give up reconnecting,
            // this is how the max_attempts limit is enforced
            if (this.options.max_attempts > 0 && options.attempt > this.options.max_attempts) return undefined;
            // Linear backoff, 200ms per attempt, capped by retry_max_delay
            return Math.min(options.attempt * 200, this.options.retry_max_delay);
        }
        this.client = this.connect(this.hostname, this.port);
    }

    /**
     * Quits both connections (main and subscriber, if open) and removes the retry policy from the options.
     */
    close() {
        super.close();
        if (this.client) this.client.quit();
        if (this.subclient) this.subclient.quit();
        delete this.client;
        delete this.subclient;
        delete this.options.retry_strategy;
    }

    /**
     * Converts Redis specific options to their expected types after the base class processed them.
     * @param {object} [options]
     */
    applyOptions(options) {
        super.applyOptions(options);
        this.options.threshold = lib.toNumber(this.options.threshold, { min: 0 });
        this.options.enable_offline_queue = lib.toBool(this.options.enable_offline_queue);
        this.options.retry_max_delay = lib.toNumber(this.options.retry_max_delay, { min: 1000, dflt: 30000 });
        this.options.max_attempts = lib.toNumber(this.options.max_attempts, { min: 0 });
    }

    /**
     * Creates a new Redis connection with `this.options` and wires its events to this client.
     * @param {string} hostname host name, may be in the form `host:port`
     * @param {number|string} [port] used when the hostname has no port part, then `options.port`, then 6379
     * @returns {object} the new redis client
     */
    connect(hostname, port) {
        var host = String(hostname).split(":");
        // createClient is a factory, it must be called without `new`
        var client = redis.createClient(host[1] || port || this.options.port || 6379, host[0] || "127.0.0.1", this.options);
        client.on("error", (err) => { logger.error("redis:", this.queueName, this.url, err) });
        client.on("ready", this.emit.bind(this, "ready"));
        client.on("message", this.onMessage.bind(this));
        logger.debug("connect:", this.url, host, port, this.options);
        return client;
    }

    /**
     * Re-emits a message received from Redis as an event named after the channel.
     * @param {string} channel
     * @param {string} msg
     */
    onMessage(channel, msg) {
        logger.dev("onMessage:", this.url, channel, msg);
        this.emit(channel, msg);
    }

    /**
     * Subscribes to a channel, a dedicated subscriber connection is opened on first use.
     * @param {string} channel
     * @param {object} [options]
     * @param {function} [callback] passed to the base class
     */
    subscribe(channel, options, callback) {
        if (!this.subclient) {
            this.subclient = this.connect(this.hostname, this.port);
        }
        super.subscribe(channel, options, callback);
        // Force the offline queue on so the command is kept even if it is issued before the connection is ready
        if (!this.subclient.enable_offline_queue) this.subclient.enable_offline_queue = true;
        this.subclient.subscribe(channel);
    }

    /**
     * Unsubscribes from a channel, the Redis command is only sent if a subscriber connection exists.
     * @param {string} channel
     * @param {object} [options]
     * @param {function} [callback] passed to the base class
     */
    unsubscribe(channel, options, callback) {
        super.unsubscribe(channel, options, callback);
        if (this.subclient) {
            if (!this.subclient.enable_offline_queue) this.subclient.enable_offline_queue = true;
            this.subclient.unsubscribe(channel);
        }
    }

    /**
     * Publishes a message to a channel over the main connection.
     * @param {string} channel
     * @param {string} msg
     * @param {object} [options] not used
     * @param {function} [callback] called by the redis client with the command result
     */
    publish(channel, msg, options, callback) {
        if (!this.client.enable_offline_queue) this.client.enable_offline_queue = true;
        this.client.publish(channel, msg, callback);
    }

    /**
     * Returns the sizes of the queue set and of its `#` companion set.
     * @param {object} [options] used to resolve the channel
     * @param {function} [callback] called with (err, { queueCount, queueRunning }), the object is empty on error
     */
    stats(options, callback) {
        var rc = {};
        var chan = this.channel(options);
        this.client.eval(scripts.stats, 1, chan, (err, count) => {
            if (!err) {
                rc.queueCount = lib.toNumber(count[0]);
                rc.queueRunning = lib.toNumber(count[1]);
            }
            lib.tryCall(callback, err, rc);
        });
    }

    /**
     * Adds a job to the queue sorted set using the current time as the score.
     * @param {string|object} job non-strings are serialized with lib.stringify
     * @param {object} [options] used to resolve the channel
     * @param {function} [callback]
     */
    submit(job, options, callback) {
        var chan = this.channel(options);
        if (typeof job != "string") job = lib.stringify(job);
        this.client.zadd(chan, Date.now(), job, callback);
    }

    /**
     * Removes a message from the `#` set by its `__queueMessageId`,
     * a missing message or id is not an error, the callback is just called.
     * @param {object} msg
     * @param {object} [options] used to resolve the channel
     * @param {function} [callback]
     */
    drop(msg, options, callback) {
        if (!msg || !msg.__queueMessageId) return lib.tryCall(callback);
        var chan = this.channel(options);
        this.client.zrem(chan + "#", msg.__queueMessageId, callback);
    }

    /**
     * Deletes both the queue set and its `#` companion set.
     * @param {object} [options] used to resolve the channel
     * @param {function} [callback] called by the redis client once the DEL command completes
     */
    purge(options, callback) {
        var chan = this.channel(options);
        // One multi-key DEL so the caller is notified exactly once, after both sets are gone
        this.client.del(chan, chan + "#", callback);
    }

    /**
     * Takes the next item from the queue using the `vpoller` script when a visibility timeout
     * is configured and the `poller` script otherwise.
     * @param {object} options `visibilityTimeout` overrides the client level option
     * @param {function} callback called with (err, data), on success a truthy result is wrapped as `[ { data } ]`
     */
    _poll_get(options, callback) {
        const chan = this.channel(options);
        const visibilityTimeout = lib.validPositive(options.visibilityTimeout, this.options.visibilityTimeout);
        const script = visibilityTimeout ? scripts.vpoller : scripts.poller;

        // NOTE(review): vpoller scores the item in the '#' set with this timestamp, not with now + visibilityTimeout;
        // presumably the caller extends it via _poll_update - confirm against the base class
        this.client.eval(script, 2, chan, Date.now(), (err, data) => {
            if (err) logger.error("poll:", this.name, chan, lib.traceError(err));
            // NOTE(review): poller replies with a {val,count} list while vpoller replies with the bare value,
            // both are wrapped the same way here - confirm the consumer expects that
            if (!err && data) {
                data = [ { data } ];
            }
            callback(err, data);
        });
    }

    /**
     * Sets the score of an item in the `#` set to now plus the visibility timeout.
     * @param {object} options used to resolve the channel
     * @param {object} item polled item, `item.data` is the set member
     * @param {number} visibilityTimeout in the same units as Date.now(), i.e. milliseconds
     * @param {function} [callback]
     */
    _poll_update(options, item, visibilityTimeout, callback) {
        const chan = this.channel(options);
        this.client.zadd(chan + "#", Date.now() + visibilityTimeout, item.data, callback);
    }

    /**
     * Removes an item from the `#` set.
     * @param {object} options used to resolve the channel
     * @param {object} item polled item, `item.data` is the set member
     * @param {function} [callback]
     */
    _poll_del(options, item, callback) {
        const chan = this.channel(options);
        this.client.zrem(chan + "#", item.data, callback);
    }

}

module.exports = RedisQueueClient;