/*
* Author: Vlad Seryakov vseryakov@gmail.com
* backendjs 2018
*/
const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const CacheClient = require(__dirname + "/client");
const redis = require("redis");
const scripts = {
sget: [
"redis.replicate_commands()",
"local val = redis.call(KEYS[2], KEYS[1]);",
"local ttl = tonumber(KEYS[3]);",
"if KEYS[2] == 'spop' and ttl > 0 then",
" while (val) do",
" if redis.call('exists', val .. '#') == 0 then break; end;",
" val = redis.call('spop', KEYS[1]);",
" end;",
" if val then redis.call('psetex', val .. '#', ttl, ''); end;",
"end;",
"local size = redis.call('scard', KEYS[1]);",
"return {val,size};",
].join("\n"),
limiter: [
"local name = KEYS[1];",
"local rate = tonumber(KEYS[2]);",
"local max = tonumber(KEYS[3]);",
"local interval = tonumber(KEYS[4]);",
"local now = tonumber(KEYS[5]);",
"local ttl = tonumber(KEYS[6]);",
"local reset = tonumber(KEYS[7]);",
"local multi = tonumber(KEYS[8]);",
"local count = tonumber(redis.call('HGET', name, 'c'));",
"local mtime = tonumber(redis.call('HGET', name, 'm'));",
"local lastint = tonumber(redis.call('HGET', name, 'i'));",
"if multi and lastint then",
" interval = lastint;",
"end;",
"if not mtime then",
" count = max;",
" mtime = now;",
"end;",
"if now < mtime then",
" mtime = now - interval;",
"end;",
"local elapsed = now - mtime;",
"if count < max then",
" count = math.min(max, count + rate * (elapsed / interval));",
" redis.call('HSET', name, 'c', count);",
"end;",
"redis.call('HSET', name, 'm', now);",
"local total = redis.call('HINCRBY', name, 't', 1);",
"if ttl > 0 then",
" redis.call('PEXPIRE', name, ttl);",
"end;",
"if count < 1 then",
" if multi and multi ~= 0 then",
" interval = math.min(30000000000, interval * math.abs(multi));",
" redis.call('HSET', name, 'i', interval);",
" end;",
" if reset > 0 then",
" redis.call('DEL', name);",
" end;",
" return {interval - elapsed,count,total,elapsed,interval};",
"else",
" if reset > 1 and total >= reset then",
" redis.call('DEL', name);",
" else",
" redis.call('HSET', name, 'c', count - 1);",
" if multi and multi < 0 then",
" redis.call('HDEL', name, 'i');",
" end;",
" end;",
" return {0,count,total,elapsed,interval};",
"end"
].join(""),
getset: [
"local v = redis.call('get', KEYS[1]);",
"if not v then",
" redis.call('set', KEYS[1], KEYS[3]);",
" local ttl = tonumber(KEYS[2]);",
" if ttl > 0 then",
" redis.call('pexpire', KEYS[1], ttl);",
" end;",
"end;",
"return v;",
].join(""),
setmax: [
"local v = tonumber(redis.call('get', KEYS[1]));",
"if not v or v < tonumber(KEYS[2]) then",
" redis.call('set', KEYS[1], KEYS[2]);",
" local ttl = tonumber(KEYS[3]);",
" if ttl > 0 then",
" redis.call('pexpire', KEYS[1], ttl);",
" end;",
"end;",
"return v;",
].join(""),
hmsetmax: [
"local v = tonumber(redis.call('hget', KEYS[1], KEYS[2]));",
"if not v or v < tonumber(KEYS[3]) then",
" redis.call('hmset', KEYS[1], KEYS[2], KEYS[3]);",
" local ttl = tonumber(KEYS[4]);",
" if ttl > 0 then",
" redis.call('pexpire', KEYS[1], ttl);",
" end;",
"end;",
"return v;",
].join(""),
lock: [
"local ttl = tonumber(KEYS[2]);",
"if tonumber(KEYS[4]) == 1 then",
" redis.call('set', KEYS[1], KEYS[3]);",
" if ttl > 0 then",
" redis.call('pexpire', KEYS[1], ttl);",
" end;",
"end;",
"local v = redis.call('setnx', KEYS[1], KEYS[3]);",
"if v == 1 and ttl > 0 then",
" redis.call('pexpire', KEYS[1], ttl);",
"end;",
"return v;",
].join(""),
};
/**
* Cache client based on Redis server using https://github.com/NodeRedis/node_redis3
* @param {object} options
* @param {bolean|int|object} [options.tls] will use TLS to connect to Redis servers, this is required for RedisCache Serverless
*
* @example
* cache-redis=redis://host1
* cache-redis-options-enable_offline_queue=1000
* cache-redis=redis://host1?bk-visibilityTimeout=30000&bk-count=2
*
* @memberOf module:cache
*/
class RedisClient extends CacheClient {
constructor(options) {
super(options);
this.name = "redis";
this.applyOptions();
this.client = this.connect(this.hostname, this.port);
}
close() {
super.close();
if (this.client) this.client.quit();
delete this.client;
delete this.options.retry_strategy;
}
applyOptions(options) {
super.applyOptions(options);
this.options.enable_offline_queue = lib.toBool(this.options.enable_offline_queue);
this.options.retry_max_delay = lib.toNumber(this.options.retry_max_delay, { min: 1000, dflt: 30000 });
this.options.max_attempts = lib.toNumber(this.options.max_attempts, { min: 0 });
}
connect(host, port) {
host = String(host).split(":");
// For reconnect or failover to work need retry policy
this.options.retry_strategy = (options) => {
logger.logger(options.attempt == 2 ? "error": "dev", "connect:", this.url, options);
if (this.options.max_attempts > 0 && options.attempt > this.options.max_attempts) undefined;
return Math.min(options.attempt * 200, this.options.retry_max_delay);
}
if (this.options.tls === true || this.options.tls === 1) {
this.options.tls = {};
}
var client = new redis.createClient(host[1] || port || this.options.port || 6379, host[0] || "127.0.0.1", this.options);
client.on("error", (err) => { logger.error("redis:", this.cacheName, this.url, err) });
client.on("ready", this.emit.bind(this, "ready"));
logger.debug("connect:", this.url, host, port, this.options);
return client;
}
stats(options, callback) {
var rc = {};
this.client.info((err, str) => {
lib.split(str, "\n").filter((x) => (x.indexOf(":") > -1)).forEach((x) => {
x = x.split(":");
rc[x[0]] = x[1];
});
lib.tryCall(callback, err, rc);
});
}
clear(pattern, callback) {
if (pattern) {
this.client.keys(pattern, (e, keys) => {
for (var i in keys) {
this.client.del(keys[i], lib.noop);
}
lib.tryCall(callback, e);
});
} else {
this.client.flushall(callback);
}
}
get(key, options, callback) {
if (options.listName) {
if (key == "*") {
if (options.del) {
this.client.spop(options.listName, 9999999999, callback);
} else {
this.client.smembers(options.listName, callback);
}
} else
if (!key) {
this.client.eval(scripts.sget, 3, options.listName, options.del ? "spop" : "srandmember", lib.toNumber(options.ttl), callback);
} else {
this.client.sismember(options.listName, key, callback);
}
} else
if (options.mapName) {
if (key == "*") {
this.client.hgetall(options.mapName, callback);
} else
if (Array.isArray(key)) {
this.client.hmget(options.mapName, key, callback);
} else {
this.client.hget(options.mapName, key, callback);
}
} else
if (Array.isArray(key)) {
this.client.mget(key, callback);
} else
if (options.set) {
var ttl = lib.toNumber(options.ttl) || lib.toNumber(this.options.ttl);
this.client.eval(scripts.getset, 3, key, ttl, options.set, callback);
} else {
this.client[options.del ? "getdel" : "get"](key, callback);
}
}
put(key, val, options, callback) {
var ttl = lib.toNumber(options.ttl) || lib.toNumber(this.options.ttl);
switch (typeof val) {
case "boolean":
case "number":
case "string":
break;
default:
if (!(options.mapName && (!key || key == "*")) && !(options.listName && Array.isArray(val))) {
val = lib.stringify(val);
}
}
if (options.listName) {
if (lib.isEmpty(val)) return lib.tryCall(callback);
const multi = this.client.multi();
multi.sadd(options.listName, val);
multi.scard(options.listName);
if (ttl > 0) multi.pexpire(options.listName, ttl);
multi.exec(callback);
} else
if (options.mapName) {
if (options.setmax) {
this.client.eval(scripts.hmsetmax, 4, options.mapName, key, val, ttl, callback);
} else {
const multi = this.client.multi();
if (!key || key == "*") {
multi.hmset(options.mapName, val);
} else {
multi.hmset(options.mapName, key, val);
}
if (ttl > 0) multi.pexpire(options.mapName, ttl);
multi.exec(callback);
}
} else {
if (options.setmax) {
this.client.eval(scripts.setmax, 3, key, val, ttl, callback);
} else
if (ttl > 0) {
this.client.psetex([key, ttl, val], callback || lib.noop);
} else {
this.client.set([key, val], callback || lib.noop);
}
}
}
incr(key, val, options, callback) {
var ttl = lib.toNumber(options.ttl) || lib.toNumber(this.options.ttl);
var isO = lib.isObject(val);
var map = options.mapName || isO && key;
if (map) {
const multi = this.client.multi();
if (options.returning == "old") multi.hgetall(map);
if (typeof val == "number") {
multi.hincrby(map, key, val);
} else {
for (const k in val) {
if (typeof val[k] == "number") {
multi.hincrby(map, k, val[k]);
} else {
multi.hset(map, k, val[k]);
}
}
}
if (ttl > 0) multi.pexpire(map, ttl);
if (["new", "*"].includes(options.returning)) multi.hgetall(map);
multi.exec(callback);
} else
if (isO && !key) {
const ttls = options.ttl || "", vals = [];
const multi = this.client.multi();
let y = 0;
for (const k in val) {
if (typeof val[k] == "number") {
multi.incrby(k, val[k]);
} else {
multi.set(k, val[k]);
}
const t = lib.toNumber(ttls[k]) || ttl;
vals.push(y);
if (t > 0) {
multi.pexpire(k, t);
y++;
}
y++;
}
if (options.returning) {
multi.exec((e, v) => {
if (!e) v = vals.map((x) => v[x]);
if (callback) callback(e, v);
});
} else {
multi.exec(callback);
}
} else
if (lib.isArray(key)) {
val = lib.toNumber(val);
const ttls = options.ttl || "", vals = [];
const multi = this.client.multi();
for (let i = 0, y = 0; i < key.length; i++, y++) {
multi.incrby(key[i], val);
const t = lib.toNumber(ttls[i]) || ttl;
vals.push(y);
if (t > 0) {
multi.pexpire(key[i], t);
y++;
}
}
if (options.returning) {
multi.exec((e, v) => {
if (!e) v = vals.map((x, i) => v[x]);
if (callback) callback(e, v);
});
} else {
multi.exec(callback);
}
} else {
if (ttl > 0) {
this.client.multi().
incrby(key, lib.toNumber(val)).
pexpire(key, ttl).
exec((e, v) => {
if (callback) callback(e, v && v[0]);
});
} else {
this.client.incrby(key, lib.toNumber(val), callback);
}
}
}
del(key, options, callback) {
if (options.listName) {
this.client.srem(options.listName, key, callback || lib.noop)
} else
if (options.mapName) {
if (key == "*") {
this.client.del(options.mapName, callback || lib.noop)
} else {
this.client.hdel(options.mapName, key, callback || lib.noop)
}
} else {
this.client.del(key, callback || lib.noop);
}
}
lock(name, options, callback) {
var ttl = lib.toNumber(options.ttl);
var set = lib.toBool(options.set);
this.client.eval(scripts.lock, 4, name, ttl, process.pid, set ? 1 : 0, callback);
}
unlock(name, options, callback) {
this.client.del(name, callback || lib.noop);
}
limiter(options, callback) {
this.client.eval(scripts.limiter, 8,
options.name,
options.rate,
options.max,
options.interval,
Date.now(),
options.ttl,
options.reset,
options.multiplier,
(err, rc) => {
rc = rc || lib.empty;
if (err) logger.error("limiter:", this.url, lib.traceError(err));
callback(lib.toNumber(rc[0]), {
cacheName: this.cacheName,
delay: lib.toNumber(rc[0]),
count: lib.toNumber(rc[1]),
total: lib.toNumber(rc[2]),
elapsed: lib.toNumber(rc[3]),
interval: lib.toNumber(rc[4]),
});
});
}
}
module.exports = RedisClient;