/*
* Author: Vlad Seryakov vseryakov@gmail.com
* backendjs 2018
*/
const lib = require(__dirname + '/../lib');
const db = require(__dirname + '/../db');
const logger = require(__dirname + '/../logger');
const DbPool = require(__dirname + '/pool');
// Default settings for the Elasticsearch pool; passed to the DbPool constructor together with the
// user supplied options. Besides `configOptions` it carries driver-private lookup tables
// (`_params`, `_mappings`) and regular expressions used when building query strings.
const defaults = {
name: "elasticsearch",
type: "elasticsearch",
configOptions: {
// Document type segment used in document paths (get/add) and when reading legacy typed mappings
docType: "_doc",
// Field type used when a column type has no entry in `typesMap`
defaultType: "text",
// Fallback values for native query parameters that are not present in the request options
defaultParams: {},
// Scroll keep-alive used for `fullscan` selects
scroll_timeout: "60000ms",
// Default retry settings for every HTTP request, can be overridden per request
retryCount: 5,
retryTimeout: 100,
// Default `retry_on_conflict` for updates
retryOnConflict: 3,
// Base delay (ms) and multiplier for re-running node discovery after an error
refreshInterval: 2000,
refreshBackoff: 1.2,
// Delay (ms) between successful node discoveries and the delay before the first one
discoveryInterval: 300000,
discoveryDelay: 500,
// Default page size for selects when no `count` is given
selectSize: 25,
// NOTE(review): the bulk settings are presumably consumed by queryBulk, which is not visible here
bulkSize: 500,
bulkRetryCount: 10,
bulkRetryTimeout: 250,
searchable: 1,
// Index settings used when a new index is created, can be overridden per index
shards: 5,
date_detection: true,
numeric_detection: true,
// Default geo search distance
distance: 10000,
replicas: 1,
// When set, only nodes without any roles are used by the discovery
clientNodes: 0,
// Maps column types to Elasticsearch field types, a string is a plain type name, an object is a full field mapping
typesMap: {
float: "float", real: "float", number: "float",
ttl: "long", counter: "long", random: "long",
long: "long", bigint: "long", int: "integer",
now: { type: "date", format: "epoch_millis" },
mtime: { type: "date", format: "epoch_millis" },
datetime: { type: "text" },
bool: "boolean", keyword: "keyword", symbol: "keyword",
text: "text", string: "text", json: "text",
object: "object",
array: "nested", obj: "nested",
geohash: "geo_point", geopoint: "geo_point",
email: { type: "text", analyzer: "keyword_lower" },
keyword_lower: { type: "text", analyzer: "keyword_lower" },
whitespace_lower: { type: "text", analyzer: "whitespace_lower" },
},
// NOTE(review): operator aliases, presumably applied by db.prepareColumn - confirm against the db module
opsMap: { 'like%': 'begins_with', "": "==", "=": "==", 'eq': '==', 'ne': '!=', 'le': '<=', 'lt': '<', 'ge': '>=', 'gt': '>' },
// Maps logical table names to actual index names
tableMap: {},
},
// Native query parameters for each operation
_params: {
index: ["op_type","version","version_type","routing","parent","timestamp","ttl","consistency","include_type_name",
"retry_on_conflict","refresh","timeout","replication","wait_for_active_shards","pipeline"],
del: ["version","routing","parent","consistency","refresh","timeout","replication"],
delall: ["routing","conflicts","slices","scroll_size","refresh","wait_for_completion","wait_for_active_shards",
"timeout","scroll","requests_per_second","q"],
updateall: ["allow_no_indices","analyzer","analyze_wildcard","default_operator","df","expand_wildcards","ignore_unavailable",
"lenient","max_docs","pipeline","preference","request_cache","search_type","search_timeout","sort",
"terminate_after",
"routing","conflicts","slices","scroll","scroll_size","refresh","wait_for_completion","wait_for_active_shards",
"timeout","scroll","requests_per_second","q"],
get: ["version","fields","routing","realtime","preference","refresh","_source","_source_include","_source_exclude"],
list: ["version","fields","routing","_source","_source_include","_source_exclude"],
qs: ["default_field","default_operator","analyzer","quote_analyzer","allow_leading_wildcard",
"enable_position_increments","fuzzy_max_expansions","fuzziness","fuzzy_prefix_length","fuzzy_transpositions",
"phrase_slop","fields","auto_generate_phrase_queries","analyze_wildcard","max_determinized_states","minimum_should_match",
"lenient","time_zone","quote_field_suffix","auto_generate_synonyms_phrase_query","all_fields"],
query: ["search_type", "request_cache"],
sql: ["format","delimiter"],
},
// Field mapping options that can be set on nested object params when creating an index
_mappings: ["index","store","boost","format","doc_values","null_value","normalizer","analyzer","search_analyzer","norms","fields","fielddata","similarity"],
// Characters that must be escaped with a backslash inside a query string value
escapeRx: /([:+=&|><!(){}[\]^"~*?/\\-])/g,
// Values matching this are wrapped in double quotes
quoteRx: /[ @/]/,
// Splits a `subfield:value` pair so only the value part is escaped
subfieldRx: /^([a-z0-9_]+:)(.+)/i,
};
// NOTE(review): this sets a property on the `module` object itself, not on `module.exports` - confirm it is intended
module.defaults = defaults;
/**
* Create a database pool that works with ElasticSearch server, only the hostname and port will be used, by default each table
* is stored in its own index.
*
* To define shards and replicas per index:
* - `-db-elasticsearch-pool-options-shards-INDEX_NAME=NUM`
* - `-db-elasticsearch-pool-options-replicas-INDEX_NAME=NUM`
*
* To support multiple seed nodes a parameter `-db-elasticsearch-pool-options-servers=1.1.1.1,2.2.2.2` can be specified, if the primary node
* fails it will switch to other configured nodes. To control the switch retries and timeout there are options:
* - `-db-elasticsearch-pool-options-retry-count=3`
* - `-db-elasticsearch-pool-options-retry-timeout=250`
*
* On successful connect to any node the driver retrieves the full list of nodes in the cluster and switches to a random node, this happens
* every `discovery-interval` in milliseconds, default is 5 minutes (300000), it can be specified as `-db-elasticsearch-pool-options-discovery-interval=300000`
*/
module.exports = class ElasticsearchPool extends DbPool {
constructor(options)
{
super(options, defaults);
this._nodes = [];
this._nodesTimer = setTimeout(this._getNodes.bind(this), this.configOptions.discoveryDelay || 500);
}
shutdown(options, callback) {
clearTimeout(this._nodesTimer);
super.shutdown(options, callback);
}
_getNodes(callback)
{
if (this._nodesTimer === true) return;
clearTimeout(this._nodesTimer);
// If the driver is not setup yet fully we keep waiting
if (!this.url) {
this._nodesTimer = setTimeout(this._getNodes.bind(this), this.configOptions.discoveryDelay || 500);
return;
}
if (this.configOptions.noDiscovery) return;
logger.debug("getNodes:", this.name, this.url);
this._nodesTimer = true;
this.doQuery("GET", "/_nodes", "", "", { quiet: 1, logger_db: "none" }, (err, data, params) => {
if (err) logger.error("getNodes:", this.name, err, this._nodes, params.toJSON());
if (!err && typeof data?.nodes == "object") {
var nodes = [];
for (const p in data.nodes) {
var node = data.nodes[p];
if (!this._version) this._version = lib.toVersion(node.version);
if (lib.isEmpty(node.roles) || (!this.configOptions.clientNodes && lib.isFlag(node.roles, "data"))) {
nodes.push(lib.objGet(node, "http.publish_address") || node.ip);
}
}
this._nodes = lib.shuffle(nodes);
// Pick a random node
if (this._nodes.length) this._node = this._nodes[this._nodes.length - 1];
}
// Increase with every consequetive error
this._nodesInterval = ((err ? this._nodesInterval : 0) || this.configOptions.refreshInterval) * (err ? this.configOptions.refreshBackoff : 1);
var interval = !err && this._nodes.length ? this.configOptions.discoveryInterval || 300000 : this._nodesInterval;
this._nodesTimer = setTimeout(this._getNodes.bind(this), interval);
logger.debug("getNodes:", this.name, this._node, this._version, "nodes:", this._nodes, "interval:", this._nodesInterval, interval);
if (typeof callback == "function") callback();
});
}
_getNodeUrl(node)
{
if (!node) node = this.url;
if (node == "default") node = "http://127.0.0.1:9200";
if (!lib.rxUrl.test(node)) node = "http://" + node;
var h = URL.parse(node) || "";
if (!h.port) h.host += ":9200";
if (!this._node) this._node = h.host;
return h.protocol + "//" + h.host;
}
// Called before every retry: rotate to the next known node (or configured seed server) and
// point the request to it
_retryPrepare(params)
{
    params.retryNodes++;
    let next = this._nodes.shift();
    if (!next) {
        // No discovered nodes, rotate over the configured seed servers instead
        const configured = lib.split(this.configOptions.servers);
        const stale = !this._servers || this._servers.some((s) => configured.indexOf(s) == -1);
        if (stale) this._servers = configured;
        next = this._servers.shift();
        if (next) this._servers.push(next);
    } else {
        this._nodes.push(next);
        // After we went over all nodes we try the default url again in case all nodes are replaced with new ones
        if (params.retryNodes > this._nodes.length) {
            params.retryNodes = 0;
            next = this.url;
        }
    }
    // If no nodes yet we need to keep trying the default url assuming the server will be online soon or
    // it will resolve to some other running node via DNS balancing
    next = next || this.url;
    this._node = next;
    params.nodeHost = this._getNodeUrl(next);
    params._uri = params.nodeHost + params.nodePath;
    logger.debug("retryPrepare:", this.name, params.nodeHost, params.nodePath, "trying node:", this._node, "count:", params.retryCount, params.retryTotal, params.retryNodes, "nodes:", this._nodes, this._servers);
}
// Send a request to the current node and pass the parsed JSON response to the callback.
// - `method`, `path`, `postdata`, `query` describe the HTTP request, the path is relative to the node url
// - `options` may contain `quiet`, `headers`, `retryCount`, `retryTimeout` and `logger_db`
// - `callback(err, obj, params)` receives the parsed body and the fetch request object
// Requests are retried on 429 and 5xx statuses, each retry is redirected to another node by `_retryPrepare`.
// When retries happened the number of attempts used is stored in the response as `retry_count`.
// Responses with status >= 400 are converted into an error built from the `error` property of the body.
doQuery(method, path, postdata, query, options, callback)
{
    var self = this;
    const settings = options || lib.empty;
    var opts = {
        nodeHost: this._getNodeUrl(this._node),
        nodePath: (path[0] == "/" ? "" : "/") + path,
        method,
        postdata,
        query,
        datatype: "obj",
        quiet: settings.quiet,
        headers: settings.headers,
        retryCount: settings.retryCount || this.configOptions.retryCount,
        retryTimeout: settings.retryTimeout || this.configOptions.retryTimeout,
        // Regular functions: `this` is the request object here
        retryOnError: function() { return this.status == 429 || this.status >= 500 },
        retryPrepare: function() { self._retryPrepare(this) },
        retryNodes: 0,
    };
    lib.fetch(opts.nodeHost + opts.nodePath, opts, (err, params) => {
        logger.logger(settings.logger_db || "debug", "elasticsearchQuery:", params.elapsed, 'ms', opts, "hits:", params.obj?.hits?.hits?.length, err);
        if (params.obj && params.retryCount < params.retryTotal) {
            // Retries used = total minus what is left, the remaining counter may drop below zero when exhausted
            params.obj.retry_count = params.retryTotal - Math.max(0, params.retryCount);
        }
        if (err || !params.status || params.status >= 400) {
            if (!err) {
                var error = params.obj?.error;
                err = lib.newError({
                    message: (error ? lib.objDescr(error) : "") || (method + " Error " + params.status + " " + params.data),
                    code: error?.type,
                    status: params.status || 500
                });
                logger[settings.quiet || params.status == 404 ? "debug": "error"]("elasticsearchQuery:", opts, "ERR:", params.status, params.data);
            } else {
                params.obj = null;
            }
        }
        callback(err, params.obj, params);
    });
}
prepareQuery(req)
{
switch (req.op) {
case "search":
case "select":
// Always allow full text search across all columns
if (!req.custom) req.custom = {};
req.custom.q = { allow: 1 };
break;
}
}
convertError(req, err)
{
if (err.status == 429) err.code = "OverCapacity";
return err;
}
nextToken(client, req, rows)
{
return req.options?.count && rows.length == req.options.count ? lib.toNumber(req.options.start) + lib.toNumber(req.options.count) : null;
}
_getKey(keys, obj)
{
return keys.filter((x) => (obj[x])).map((x) => (obj[x])).join("|");
}
_getTable(name)
{
return this.configOptions.tableMap[name] || name;
}
exists(table)
{
return !!this.dbcolumns[this._getTable(table)];
}
_escape(val, options, name)
{
var prefix = "", noescape;
var opts = options || lib.empty;
switch (typeof val) {
case "number":
case "boolean":
break;
case "string":
noescape = opts.noescape === 1 ||
opts.noescape === true ||
opts.noescape === name ||
lib.isFlag(opts.noescape, name);
if (!noescape) {
if (opts.subfields) {
var d = val.match(defaults.subfieldRx);
if (d) {
prefix = d[1];
val = d[2];
}
}
val = val.replace(defaults.escapeRx, '\\$1');
}
if (val == "OR" || val == "AND" || val == "NOT" || defaults.quoteRx.test(val)) val = '"' + val + '"';
break;
default:
val = null;
}
val = val !== null ? prefix + val : val;
if (name) {
if (opts._fuzziness && opts._fuzziness[name]) val += "~" + opts._fuzziness[name];
if (opts._boost && opts._boost[name]) val += "^" + opts._boost[name];
if (lib.isFlag(opts._ends_with, name) && val.endsWith("\\*")) val = val.slice(0, -2) + "*";
if (lib.isFlag(opts._begins_with, name) && val.startsWith("\\*")) val = "*" + val.substr(2);
}
return val;
}
parseQueryString(q, options)
{
return lib.phraseSplit(q).map((y) => (this._escape(y, options))).join(" AND ");
}
getQueryString(req, query, join)
{
var options = req.options || lib.empty;
query = query || req.query;
return Object.keys(query).map((name) => {
var val = query[name];
if (val === undefined) return 0;
var d = name.match(db.rxOrAnd);
if (d) {
val = this.getQueryString(req, val, d[1]);
if (val) val = "(" + val + ")";
return val;
}
const col = db.prepareColumn(req, name, val);
name = col.name && col.name != "q" ? col.name + ":" : "";
val = col.value;
// Cannot use empty values
if (typeof val == "string") {
val = col.value.trim();
if (!val && col.op != "null" && col.op != "not_null") return 0;
} else
if (lib.isArray(val)) {
val = val.filter((x) => (!lib.isEmpty(x)));
}
const cjoin = ` ${col.join || "AND"} `;
switch (col.op) {
case "null":
return lib.isArray(val) ? '(' + val.map((y) => (`(NOT _exists_:${col.name})`)).join(cjoin) + ')' : `(NOT _exists_:${col.name})`;
case "not_null":
return lib.isArray(val) ? '(' + val.map((y) => (`_exists_:${col.name}`)).join(cjoin) + ')' : "_exists_:" + col.name;
case "!=":
case "ne":
return lib.isArray(val) ? '(' + val.map((y) => ("-" + name + this._escape(y, options, col.name))).join(cjoin) + ')' : '-' + name + this._escape(val, options, col.name);
case ">":
case "gt":
return lib.isArray(val) ? '(' + val.map((y) => (name + ">" + this._escape(y, options, col.name))).join(cjoin) + ')' : name + '>' + this._escape(val, options, col.name);
case "<":
case "lt":
return lib.isArray(val) ? '(' + val.map((y) => (name + "<" + this._escape(y, options, col.name))).join(cjoin) + ')' : name + '<' + this._escape(val, options, col.name);
case ">=":
case "ge":
return lib.isArray(val) ? '(' + val.map((y) => (name + ">=" + this._escape(y, options, col.name))).join(cjoin) + ')' : name + '>=' + this._escape(val, options, col.name);
case "<=":
case "le":
return lib.isArray(val) ? '(' + val.map((y) => (name + "<=" + this._escape(y, options, col.name))).join(cjoin) + ')' : name + '<=' + this._escape(val, options, col.name);
case "in":
case "all_in":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? '(' + val.map((y) => (name + this._escape(y, options, col.name))).join(` ${col.join || col.op == "in" ? "OR" : "AND"} `) + ')' : name + this._escape(val, options, col.name);
case "not_in":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? name + '(' + val.map((y) => ("NOT " + this._escape(y, options, col.name))).join(cjoin) + ')' : "NOT " + name + this._escape(val, options, col.name);
case "between":
if (lib.isEmpty(val)) break;
return name + (val.length == 2 ? `[${this._escape(val[0], options, col.name)} TO ${this._escape(val[1], options, col.name)}]` : this._escape(val, options, col.name));
case "not_between":
if (lib.isEmpty(val)) break;
return "NOT " + name + (val.length == 2 ? `[${this._escape(val[0], options, col.name)} TO ${this._escape(val[1], options, col.name)}]` : this._escape(val, options, col.name));
case "begins_with":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? val.map((y, i) => (name + this._escape(y, options, col.name) + '*')).join(cjoin) : name + this._escape(val, options, col.name) + '*';
case "not_begins_with":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? val.map((y, i) => (`NOT ${name}${this._escape(y, options, col.name)}*`)).join(cjoin) : `NOT ${name}${this._escape(val, options, col.name)}*`;
case "ends_with":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? val.map((y, i) => (name + "*" + this._escape(y, options, col.name))).join(cjoin) : name + "*" + this._escape(val, options, col.name);
case "not_ends_with":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? val.map((y, i) => (`NOT ${name}*${this._escape(y, options, col.name)}`)).join(cjoin) : `NOT ${name}*${this._escape(val, options, col.name)}`;
case "like":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? val.map((y, i) => (name + this._escape(y, options, col.name) + '~')).join(cjoin) : name + this._escape(val, options, col.name) + '~';
case "contains":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? val.map((y) => (`${name}*${this._escape(y, options, col.name)}*`)).join(cjoin) : `${name}*${this._escape(val, options, col.name)}*`;
case "not_contains":
if (lib.isEmpty(val)) break;
return lib.isArray(val) ? name + '(' + val.map((y) => (`NOT *${this._escape(y, options, col.name)}*`)).join(cjoin) + ')' : `NOT ${name}*${this._escape(val, options, col.name)}*`;
case "=":
case "eq":
default:
if (val === "") break;
return Array.isArray(val) ? val.map((y) => (name + this._escape(y, options, col.name))).join(cjoin) : name + this._escape(val, options, col.name);
}
return null;
}).filter((x) => (x)).
join(" " + (join || options.join || "AND").toUpperCase() + " ");
}
getGeoQuery(table, query, options)
{
options.sort_names = ["distance"];
var q = typeof query == "string" ? query : this.getQueryString({ table, query, options });
var n = options.geopoint_name || Object.entries(db.tables[table]).filter(x => (x[1].type == "geopoint")).map(x => x[0])[0] || "latlon";
var ll = lib.isArray(options.latlon) ? [options.latlon[1], options.latlon[0]] :
lib.isArray(options.lonlat) ? options.lonlat : [options.longitude, options.latitude];
var d = options.distance || this.configOptions.distance;
if (options.distance_unit && lib.isNumeric(d)) d += options.distance_unit;
options.no_next_token = 1;
return {
query: {
bool: {
must: {
query_string: {
query: q
}
},
filter: {
geo_distance: {
distance: d,
[n]: ll,
}
}
}
},
sort: [
{
_geo_distance: {
[n]: ll,
order: options.desc ? "desc" : "asc",
unit: options.distance_unit || "m",
distance_type: options.distance_type || "plane"
}
}
]
};
}
_getQuery(name, options)
{
return defaults._params[name].reduce((x, y) => {
if (options[y]) x[y] = options[y]; else
if (this.configOptions.defaultParams[y]) x[y] = this.configOptions.defaultParams[y];
return x;
}, {});
}
_buildCondition(req, query, expected, join)
{
var expr = [];
for (const p in expected) {
const val = expected[p];
const d = p.match(db.rxOrAnd);
if (d) {
const e = this._buildCondition(req, query, val, d[1] == "or" ? "||" : "");
if (e) expr.push("(" + e + ")");
continue;
}
const col = db.prepareColumn(req, p, val);
var not = col.op.startsWith("not ") ? "!" : "";
switch (col.op) {
case "null":
expr.push(`!ctx._source.containsKey('${col.name}')`);
break;
case "not null":
expr.push(`ctx._source.containsKey('${col.name}')`);
break;
case "in":
case "not in":
if (!Array.isArray(col.value)) break;
expr.push(`${not}(ctx._source.containsKey('${col.name}') && params._${p}.contains(ctx._source.${col.name}))`);
query.script.params["_" + p] = col.value;
break;
case "between":
case "not between":
if (!Array.isArray(col.value) || col.value.length != 2) break;
expr.push(`${not}(ctx._source.containsKey('${col.name}') && ctx._source.${col.name} >= _${p} && ctx._source.${col.name} <= __${p})`);
query.script.params["_" + p] = col.value[0];
query.script.params["__" + p] = col.value[1];
break;
case "begins with":
case "not begins with":
expr.push(`${not}(ctx._source.containsKey('${col.name}') && ctx._source.${col.name}.startsWith(params._${p}))`);
query.script.params["_" + p] = col.value;
break;
case "contains":
case "not contains":
expr.push(`${not}(ctx._source.containsKey('${col.name}') && ctx._source.${col.name}.contains(params._${p}))`);
query.script.params["_" + p] = col.value;
break;
case "!=":
case ">":
case "<":
case ">=":
case "<=":
case "==":
expr.push(`(ctx._source.containsKey('${col.name}') && ctx._source.${col.name} ${col.op} params._${p})`);
query.script.params["_" + p] = col.value;
break;
}
}
return expr.join(" " + (join == "or" || join == "OR" ? "||" : "&&") + " ");
}
_buildUpdate(req, obj, query)
{
if (lib.isEmpty(obj)) return;
var rc;
if (!lib.isEmpty(req.options.ops) || !lib.isEmpty(query)) {
rc = { script: { source: "", params: {} } };
if (req.options.upsert) {
rc.scripted_upsert = true;
rc.upsert = {};
}
const expr = this._buildCondition(req, rc, query);
if (expr) {
rc.script.source = `if(!(${expr})){ctx.op="noop";return;}`;
}
for (const p in obj) {
let val = obj[p], op = req.options.ops?.[p], prop = p, param = p, src = "_source";
if (val === null || val === "") op = "remove";
// Handle dotted columns as nested maps
if (p.includes(".")) {
param = p.split(".");
for (let i = 0; i < param.length - 1; i++) {
rc.script.source += `if(!ctx.${src}.containsKey('${param[i]}') || ctx.${src}.${param[i]}==null) ctx.${src}.${param[i]} = new HashMap();`;
src += "." + param[i];
}
prop = param[param.length - 1];
param = param.join("");
}
switch (op) {
case "none":
break;
case "incr":
case "append":
rc.script.source += `if(!ctx.${src}.containsKey('${prop}') || ctx.${src}.${prop}==null) ctx.${src}.${prop}=params.${param}; else ctx.${src}.${prop}+=params.${param};`;
rc.script.params[param] = val;
if (req.options.upsert) lib.objSet(rc.upsert, p, 0);
break;
case "add":
val = Array.isArray(val) ? val : [ val ];
for (const i in val) {
rc.script.source += `if(!ctx.${src}.containsKey('${prop}') || ctx.${src}.${prop}==null) ctx.${src}.${prop}=[params.${param}${i}]; else if(!ctx.${src}.${prop}.contains(params.${param}${i})) ctx.${src}.${prop}.add(params.${param}${i});`;
rc.script.params[param + i] = val[i];
}
if (req.options.upsert) lib.objSet(rc.upsert, p, []);
break;
case "del":
val = Array.isArray(val) ? val : [ val ];
for (const i in val) {
rc.script.source += `if(ctx.${src}.containsKey('${prop}') && ctx.${src}.${prop}.indexOf(params.${param}${i})>-1) ctx.${src}.${prop}.remove(ctx.${src}.${prop}.indexOf(params.${param}${i}));`;
rc.script.params[param + i] = val[i];
}
break;
case "not_exists":
rc.script.source += `if(!ctx.${src}.containsKey('${prop}') || ctx.${src}.${prop}==null) ctx.${src}.${prop}=params.${param};`;
rc.script.params[param] = val;
if (req.options.upsert) lib.objSet(rc.upsert, p, val);
break;
case "unset":
case "remove":
rc.script.source += `ctx._source.remove('${param}');`;
break;
default:
var col = req.columns[p];
if (col?.type == "json" && typeof val != "string") val = lib.stringify(val);
if (!req.keys.includes(p)) {
rc.script.source += `ctx._source.${p} = params.${param};`;
rc.script.params[param] = val;
}
if (req.options.upsert) lib.objSet(rc.upsert, p, val);
}
}
if (!rc.script.source) return;
} else {
rc = { doc: obj };
if (!this.configOptions.keepEmpty) {
for (const p in obj) if (obj[p] === "") obj[p] = null;
}
if (req.options.upsert) rc.doc_as_upsert = true;
}
return rc;
}
// Load mappings of all indexes and cache them as columns, then schedule a node discovery
cacheColumns(client, options, callback)
{
    const docType = this.configOptions.docType;
    this.doQuery("GET", "/_mappings", "", "", this.configOptions, (err, data, params) => {
        if (err || !data) return callback(err);
        const columns = {};
        const indexes = {};
        for (const index in data) {
            if (!indexes[index]) indexes[index] = 1;
            const mappings = data[index].mappings;
            if (!mappings) continue;
            if (!columns[index]) columns[index] = {};
            // Typeless mappings first, then the legacy layout with the properties under the doc type
            const fields = mappings.properties || (mappings[docType] && mappings[docType].properties);
            for (const field in fields) {
                if (!columns[index][field]) columns[index][field] = {};
                const target = columns[index][field];
                const mapping = fields[field];
                for (const key in mapping) {
                    // The text type is the default, it is not stored
                    const isDefaultType = key == "type" && mapping[key] == "text";
                    if (!isDefaultType) target[key] = mapping[key];
                }
            }
        }
        this.dbcolumns = columns;
        this.dbindexes = indexes;
        callback();
    });
    const delay = this.configOptions.discoveryDelay || 500;
    setTimeout(this._getNodes.bind(this), delay);
}
// Run a `delall` operation for the table through the db module
delAll(table, query, options, callback)
{
    const req = db.prepare({ op: "delall", table, query, options, callback });
    db.query(req);
}
query(client, req, callback)
{
switch (req.op) {
case "create":
case "upgrade":
this.queryCreate(client, req, callback);
break;
case "drop":
// Only if one index per table or single table regexp
this.doQuery("DELETE", this._getTable(req.table), "", "", this.configOptions, callback);
break;
case "sql":
this.querySql(client, req, callback);
break;
case "select":
case "search":
this.querySelect(client, req, callback);
break;
case "list":
this.queryList(client, req, callback);
break;
case "get":
this.queryGet(client, req, callback);
break;
case "add":
req.options.op_type = "create";
case "put":
this.queryAdd(client, req, callback);
break;
case "incr":
req.options.upsert = true;
case "update":
this.queryUpdate(client, req, callback);
break;
case "updateall":
this.queryUpdateAll(client, req, callback);
break;
case "del":
this.queryDel(client, req, callback);
break;
case "delall":
this.queryDelAll(client, req, callback);
break;
case "bulk":
this.queryBulk(client, req, callback);
break;
default:
callback();
}
}
queryCreate(client, req, callback)
{
var table = this._getTable(req.table);
var ecols = this.dbcolumns[table], dynamic_templates = [];
var properties = {}, missing = 0;
var config = req.config;
for (const name in req.query) {
const col = req.query[name];
if (req.op == "upgrade" && ecols && ecols[name]) continue;
// All specific properties go as is
for (const f in col.elasticsearch) {
if (f == "dynamic_templates") {
dynamic_templates.push(...col.elasticsearch[f]);
} else {
if (!properties[name]) properties[name] = {};
properties[name][f] = col.elasticsearch[f];
}
}
if (col.keyword) {
if (!properties[name]) properties[name] = {};
properties[name].type = "keyword";
}
// Predefined properties with types and other field params
if (col.params && col.type == "object") {
properties[name] = { properties: {} };
for (const param in col.params) {
const prop = col.params[param];
const type = config.typesMap[prop.type] || config.defaultType;
properties[name].properties[param] = typeof type == "object" ? type: { type };
for (const m in this._mappings) {
if (prop[m]) properties[name].properties[param][m] = prop[m];
}
}
} else {
// Just the type mapping
const type = col.join ? "text" : config.typesMap[col.type] || config.defaultType;
if (type) properties[name] = typeof type == "object" ? type : { type };
}
if (!properties[name]) continue;
if (!properties[name].type && !properties[name].properties) {
properties[name].type = config.typesMap[col.type] || config.defaultType;
}
if (req.op == "upgrade" && properties[name].type == "text" && Object.keys(properties[name]).length == 1) {
delete properties[name];
continue;
}
if (!(ecols && ecols[name])) missing++;
}
if (!this.dbindexes[table]) {
this.dbcolumns[table] = properties;
this.dbindexes[table] = 1;
var query = {
settings: {
number_of_shards: config[lib.toCamel("shards-" + table, "-")] || config.shards,
number_of_replicas: config[lib.toCamel("replicas-" + table, "-")] || config.replicas,
analysis: {
filter: {
email: {
type: "pattern_capture",
preserve_original: true,
patterns: [ "([^@]+)", "@(.+)" ]
}
},
normalizer: {
keyword_lower: {
type: "custom",
char_filter: [],
filter: ["lowercase", "asciifolding"]
}
},
analyzer: {
email: {
tokenizer: "uax_url_email",
filter: [ "email", "lowercase", "unique" ]
},
keyword_lower: {
type: "custom",
tokenizer: "keyword",
filter: "lowercase"
},
whitespace_lower: {
type: "custom",
tokenizer: "whitespace",
filter: "lowercase"
}
}
}
},
mappings: {
date_detection: config.date_detection,
numeric_detection: config.numeric_detection,
dynamic_templates,
},
};
query.mappings.properties = properties;
lib.series([
(next) => {
if (this._version) return next();
this._getNodes(next);
},
(next) => {
this.doQuery("PUT", table, query, "", config, next);
},
], callback, true);
return;
}
if (missing) {
return this.doQuery("PUT", table + "/_mappings", { properties: properties }, "", config, callback);
}
callback();
}
// Retrieve a single document by its primary key, a missing document results in an empty list
queryGet(client, req, callback)
{
    const index = this._getTable(req.table);
    const id = lib.escape(this._getKey(req.keys, req.query));
    const path = `${index}/${req.config.docType}/${id}`;
    const params = this._getQuery("get", req.options);
    if (req.options.select) params.fields = String(req.options.select);

    this.doQuery("GET", path, null, params, req.options, (err, res) => {
        // Not found is not an error
        if (err?.status == 404) err = null;
        const row = res?._source || res?.fields;
        if (err || !row) return callback(err, []);
        callback(null, [ row ], res);
    });
}
querySelect(client, req, callback)
{
var table = this._getTable(req.table);
var config = req.config;
var cols = db.getColumns(req.table, req.options);
var method = "POST", opts;
var path = table + "/" + (req.options.op || "_search");
var query = this._getQuery("query", req.options);
var obj;
if (lib.isObject(req.query)) {
if (req.query.aggs || req.query.aggregations || req.query.query) {
// Native JSON request
obj = req.query;
opts = obj;
} else {
if (req.query.q && typeof req.query.q == "string") {
req.query.q = lib.phraseSplit(req.query.q, req.options.splitOptions);
}
if (lib.isObject(req.options.filter)) {
obj = {
query: {
bool: {
must: {
query_string: {
query: this._getQuery("qs", req.options),
}
},
filter: req.options.filter
}
},
};
obj.query.bool.must.query_string.query = this.getQueryString(req);
if (!obj.query.bool.must.query_string.query) delete obj.query.bool.must.query_string;
} else {
obj = {
query: {
query_string: this._getQuery("qs", req.options),
}
};
obj.query.query_string.query = this.getQueryString(req);
if (!obj.query.query_string.query) delete obj.query;
if (req.options.noscan && lib.isEmpty(obj)) {
logger.info('select:', this.name, req, "NO EMPTY SCANS ENABLED");
return callback();
}
}
opts = obj;
}
} else
// Already formatted query string
if (typeof req.query == "string" && req.query) {
obj = {
query: {
query_string: this._getQuery("qs", req.options),
}
};
obj.query.query_string.query = req.query;
opts = obj;
} else {
return callback();
}
if (req.options.random_score) {
obj = {
query: {
function_score: {
query: obj.query,
random_score: {},
}
}
}
opts = obj;
}
if (req.options.count > 0) {
opts.size = req.options.count;
} else
if (config.selectSize > 0) {
opts.size = config.selectSize;
}
if (req.options.select) {
opts._source = lib.split(req.options.select);
} else
if (req.options.select_fields) {
opts.fields = lib.split(req.options.select_fields);
opts._source = false;
}
if (req.options.scroll_timeout > 0) {
query.scroll = req.options.scroll_timeout + "ms";
} else
if (req.options.fullscan) {
query.scroll = config.scroll_timeout;
}
if (req.options.explain) {
query.explain = true;
}
if (req.options.start) {
if (Array.isArray(req.options.start)) {
obj.search_after = req.options.start;
} else
if (lib.isNumeric(req.options.start)) {
opts.from = req.options.start;
} else
if (req.options.start?.scroll_id && req.options.start?.scroll) {
for (const p in query) delete query[p];
for (const p in opts) delete opts[p];
query.scroll_id = req.options.start.scroll_id;
if (!req.options.scroll) query.scroll = req.options.start.scroll;
path = "/_search/scroll";
method = "GET";
}
}
// Pass a string, or a list of strings/formatted sorting objects
if (req.options.sort && !query.scroll_id) {
var sort = lib.split(req.options.sort).map((x) => {
if (!x) return null;
if (typeof x == "object") return x;
if (typeof x != "string") return null;
if (x[0] == "_") {
if (x == "_random") {
return { _script: { script: "Math.random()", type: "number", order: "asc" } };
}
return x;
}
let desc;
if (x[0] == "!") {
desc = 1;
x = x.substr(1);
}
const col = cols[x];
if (col) {
if (desc) {
return col.raw ? { [x + ".raw"]: { order: "desc" } } : { [x]: { order: "desc" } };
}
return col.raw ? x + ".raw" : x;
}
// Nested object
if (x.indexOf(".") > -1) {
var y = x.split(".");
if (cols[y[0]]) {
return desc ? { [x]: { order: "desc" } } : x;
}
}
return null;
}).filter((x) => (x));
if (sort.length) {
if (!Array.isArray(opts.sort)) opts.sort = [];
for (const i in sort) {
if (req.options.desc || req.options.sort_missing) {
const o = typeof sort[i] == "string" ? { [sort[i]]: {} } : sort[i];
for (const p in o) {
if (req.options.desc) o[p].order = "desc";
if (req.options.sort_missing && !/_score|_doc/.test(p)) o[p].missing = req.options.sort_missing;
}
opts.sort.push(o);
} else {
opts.sort.push(sort[i]);
}
}
}
if (req.options.sort == "random") {
opts.sort = { _script: { script: "Math.random()", type: "number", order: "asc" } };
}
}
if (req.options.total) {
path = table + "/_count";
delete opts.sort;
}
if (req.options.track_total_hits && obj.query) {
obj.track_total_hits = true;
}
this.doQuery(method, path, obj, query, req.options, (err, res) => {
if (err?.status == 404) {
logger.info("elasticsearchQuery:", "notfound:", path, obj, query, err);
err = null;
}
if (err || !res) return callback(err, []);
if (res._scroll_id) {
res.next_token = { scroll_id: res._scroll_id, scroll: query.scroll };
}
var rows = [];
if (res.hits) {
if (!res._scroll_id &&
!req.options.no_next_token &&
Array.isArray(opts.sort) &&
res.hits.hits.length == opts.size) {
res.next_token = res.hits.hits[res.hits.hits.length - 1].sort;
}
rows = res.hits.hits.map((x) => {
if (Array.isArray(req.options.sort_names) && Array.isArray(x.sort)) {
for (var i = 0; i < req.options.sort_names.length; i++) {
x._source[req.options.sort_names[i]] = x.sort[i];
}
}
return x._source || x.fields || {};
});
if (!res.total) {
res.total = res.hits.total ? res.hits.total > 0 ? res.hits.total : res.hits.total.value || 0 : 0;
}
if (!req.options.debug) {
delete res.hits.hits;
}
}
if (req.options.info_query) res.query = obj.query;
if (req.options.total) rows = [{ count: res.count }];
if (res._scroll_id) {
if (!rows.length || (req.options.limit > 0 && req.options.limit_count + rows.length >= req.options.limit)) {
this.doQuery("DELETE", "/_search/scroll", { scroll_id: res._scroll_id }, "", config, lib.noop);
res.next_token = null;
}
}
callback(null, rows, res);
});
}
// Retrieve multiple documents by ids with a single `_mget` request
queryList(client, req, callback)
{
    const path = this._getTable(req.table) + "/_mget";
    const params = this._getQuery("list", req.options);
    if (req.options.select) {
        params._source = lib.split(req.options.select).join(",");
    }
    // Every record in the query produces one document id built from all its properties
    const ids = [];
    for (const i in req.query) {
        const record = req.query[i];
        ids.push(this._getKey(Object.keys(record), record));
    }
    this.doQuery("GET", path, { ids: ids }, params, req.options, (err, res) => {
        if (err?.status == 404) err = null;
        if (err || !res) return callback(err, []);
        let rows = [];
        if (res.docs) {
            rows = res.docs.map((doc) => doc._source || doc.fields || {});
        }
        if (!req.options.debug) delete res.docs;
        callback(null, rows, res);
    });
}
// Index a single record with a PUT to `<table>/<docType>/<id>`.
//  - the document id is built by `_getKey` from `req.keys` and `req.query`
//  - native query parameters are taken from `req.options` via `_getQuery("index", ...)`
// On success `affected_rows` is set to 1 in the response, a 409 status is reported
// with the error code `AlreadyExists`. The callback is invoked as (err, [], response).
queryAdd(client, req, callback)
{
    const index = this._getTable(req.table);
    const { docType } = req.config;
    const id = lib.escape(this._getKey(req.keys, req.query));
    const params = this._getQuery("index", req.options);

    this.doQuery("PUT", `${index}/${docType}/${id}`, req.query, params, req.options, (err, res) => {
        if (err) {
            // Version/create conflict
            if (err.status == 409) err.code = "AlreadyExists";
        } else if (res) {
            res.affected_rows = 1;
        }
        callback(err, [], res);
    });
}
queryUpdate(client, req, callback)
{
var table = this._getTable(req.table);
var keys = req.keys;
var config = req.config;
var path = table + "/_update/" + lib.escape(this._getKey(keys, req.query));
var query = this._getQuery("index", req.options);
var obj = this._buildUpdate(req, req.query, req.options.query);
if (!obj) return callback(null, [], {});
if (req.options.returning == "*") obj._source = true;
if (typeof req.options.retry_on_conflict == "undefined") query.retry_on_conflict = config.retryOnConflict;
this.doQuery("POST", path, obj, query, req.options, (err, res) => {
if (!err && res?.result == "updated") res.affected_rows = 1;
if (err?.status == 404) err = null;
if (err?.status == 409) err.code = "AlreadyExists";
var rows = obj._source && res.get?._source ? [res.get?._source] : [];
callback(err, rows, res);
});
}
// Update all records matching a condition with a POST to `<table>/_update_by_query`.
// The condition is taken from `req.options.query`:
//  - an object with a nested `query` object is sent as the request body as is
//  - any other object is converted into a `query_string` query by `getQueryString`,
//    an empty query string means `match_all`
//  - a string is passed as the `q` query parameter
// The update script is produced by `_buildUpdate` from `req.query` and attached to the body.
// The callback is invoked as (err, [], response) where `affected_rows` is the number
// of documents reported as updated.
queryUpdateAll(client, req, callback)
{
    const table = this._getTable(req.table);
    const path = table + "/_update_by_query";
    const query = this._getQuery("updateall", req.options);
    const filter = req.options.query;
    let obj;

    if (lib.isObject(filter)) {
        if (lib.isObject(filter.query)) {
            obj = filter;
        } else {
            if (filter.q) filter.q = lib.phraseSplit(filter.q);
            obj = {
                query: {
                    query_string: this._getQuery("qs", req.options),
                }
            };
            obj.query.query_string.query = this.getQueryString(req, filter);
            if (!obj.query.query_string.query) obj.query = { match_all: {} };
        }
    } else
    if (typeof filter == "string") {
        query.q = filter;
    }

    const u = this._buildUpdate(req, req.query);
    if (u?.script) {
        // No body is created for a string or missing filter, the script still needs one
        if (!obj) obj = {};
        obj.script = u.script;
    }

    this.doQuery("POST", path, obj, query, req.options, (err, res) => {
        // update-by-query reports modified documents in `updated`, not in `deleted`
        if (!err && res) res.affected_rows = res.updated;
        callback(err, [], res);
    });
}
// Remove a single record with a DELETE to `<table>/<docType>/<id>`.
//  - the document id is built by `_getKey` from `req.keys` and `req.query`
//  - native query parameters are taken from `req.options` via `_getQuery("del", ...)`
// On success `affected_rows` is set to 1 in the response, a 404 status is not
// treated as an error. The callback is invoked as (err, [], response).
queryDel(client, req, callback)
{
    const index = this._getTable(req.table);
    const { docType } = req.config;
    const id = lib.escape(this._getKey(req.keys, req.query));
    const params = this._getQuery("del", req.options);

    this.doQuery("DELETE", `${index}/${docType}/${id}`, null, params, req.options, (err, res) => {
        if (!err) {
            if (res) res.affected_rows = 1;
        } else if (err.status == 404) {
            // Nothing to delete is fine
            err = null;
        }
        callback(err, [], res);
    });
}
// Execute a list of operations (`req.query`) via the `/_bulk` endpoint in batches.
//  - the batch size is `req.options.bulkSize` or `config.bulkSize`
//  - every item is expected to provide `op`, `table`, `obj` and optional `options`
//  - items rejected with status 429 are put back in the queue and retried after a delay,
//    up to `config.bulkRetryCount` retry rounds
// The callback is invoked as (err, errors, info) where `errors` is the list of failed items
// and `info.affected_rows` is the total number of items minus the number of failed ones.
queryBulk(client, req, callback)
{
var keys = req.keys;
var config = req.config;
// Running totals: `_size` is the original number of items, `_retries` counts retry rounds,
// `_timeout` is the current retry delay
var info = { retry_count: 0, _size: req.query.length, _retries: 0, _timeout: config.bulkRetryTimeout };
var bulk = req.query, bulkSize = req.options.bulkSize || config.bulkSize;
// `map` links document ids to the submitted items so response entries can be matched back
var map = {}, errors = [];
lib.doWhilst(
(next) => {
var item, meta, data = "";
// Take the next batch off the front of the queue
var batch = bulk.slice(0, bulkSize);
bulk = bulk.slice(bulkSize);
if (!batch.length) return next();
for (const i in batch) {
item = batch[i];
// Items already marked as failed are reported and not sent
if (item.errstatus) {
errors.push(item);
continue;
}
keys = db.getKeys(item.table, item.options);
meta = { _id: this._getKey(keys, item.obj), _index: this._getTable(item.table) };
// Each operation is an action line optionally followed by a document line
switch (item.op) {
case "add":
data += lib.stringify({ create: meta }) + "\n";
data += lib.stringify(item.obj) + "\n";
break;
case "put":
data += lib.stringify({ index: meta }) + "\n";
data += lib.stringify(item.obj) + "\n";
break;
case "incr":
case "update":
// NOTE(review): the guard checks `item.options` but the value is read from `req.options`,
// possibly `item.options.retry_on_conflict` was intended - confirm
if (!item.options || typeof req.options.retry_on_conflict != "number") {
meta.retry_on_conflict = config.retryOnConflict;
}
var d = this._getUpdate(item);
// No update body could be produced, report the item with status 404
if (!d) {
item.errstatus = 404;
errors.push(item);
break;
}
data += lib.stringify({ update: meta }) + "\n";
data += lib.stringify(d) + "\n";
break;
case "del":
data += lib.stringify({ delete: meta }) + "\n";
break;
}
map[meta._id] = item;
}
// Nothing to send in this batch
if (!data) return next();
req.options.headers = { "content-type": "application/json" };
this.doQuery("POST", "/_bulk", data, "", req.options, (err, res) => {
if (!err && res) {
var retry, item;
for (const i in res.items) {
// The result is stored under the name of the action
item = res.items[i].create || res.items[i].index || res.items[i].update || res.items[i].delete;
if (item?.status >= 400 && map[item._id]) {
if (item.status == 429) {
// Rejected due to load, put it back at the front of the queue
bulk.unshift(map[item._id]);
retry = 1;
} else {
// Permanent failure, report the original item with the error status and reason,
// presumably without properties starting with `orig` or `options` - verify `lib.objExtend`
errors.push(lib.objExtend(map[item._id], { _id: item._id, errstatus: item.status, errmsg: item.error?.reason }, { del: /^(orig|options)/ }));
}
}
}
if (res.retry_count) info.retry_count += res.retry_count;
if (retry) {
info._retries++;
// Presumably doubles `info._timeout` and returns the previous value as the delay - verify `lib.objMult`
return setTimeout(next, lib.objMult(info, "_timeout", 2, "old"));
}
}
next(err);
});
},
// Continue while there are items left and the retry limit is not reached
() => (info._retries < config.bulkRetryCount && bulk.length > 0),
(err) => {
if (!err) info.affected_rows = info._size - errors.length;
callback(err, errors, info);
}, true);
}
// Delete all records matching a condition with a POST to `<table>/_delete_by_query`.
// The condition is taken from `req.query`:
//  - an object with a nested `query` object is sent as the request body as is
//  - any other object is converted into a `query_string` query by `getQueryString`,
//    an empty query string means `match_all`
//  - a string is passed as the `q` query parameter
// The callback is invoked as (err, [], response) where `affected_rows` is the number
// of documents reported as deleted.
queryDelAll(client, req, callback)
{
    const path = this._getTable(req.table) + "/_delete_by_query";
    const params = this._getQuery("delall", req.options);
    const filter = req.query;
    let body;

    if (lib.isObject(filter)) {
        if (lib.isObject(filter.query)) {
            // Native query, pass it through
            body = filter;
        } else {
            if (filter.q) {
                filter.q = lib.phraseSplit(filter.q);
            }
            const qs = this._getQuery("qs", req.options);
            qs.query = this.getQueryString(req);
            body = { query: qs.query ? { query_string: qs } : { match_all: {} } };
        }
    } else if (typeof filter == "string") {
        params.q = filter;
    }

    const onReply = (err, res) => {
        if (!err && res) {
            res.affected_rows = res.deleted;
        }
        callback(err, [], res);
    };
    this.doQuery("POST", path, body, params, req.options, onReply);
}
// Run an SQL statement (`req.text`) with a POST to `/_sql`, or `/_sql/translate`
// when `req.options.translate` is set.
// Supported options: `filter`, `count` (sent as `fetch_size`), `multi` (sent as
// `field_multi_value_leniency`), `columnar`, `no_next_token`, `debug`.
// `req.values`, if it is an array, is sent as the statement parameters.
// Returned rows are converted into objects keyed by the column names, a 404 status
// is treated as an empty result. The callback is invoked as (err, rows, response),
// `response.next_token` is set to the returned cursor unless `no_next_token` is given.
querySql(client, req, callback)
{
    const params = this._getQuery("sql", req.options);
    const body = { query: req.text };
    if (req.options.filter) {
        body.filter = req.options.filter;
    }
    if (lib.isArray(req.values)) {
        body.params = req.values;
    }
    if (req.options.count > 0) {
        body.fetch_size = req.options.count;
    }
    if (req.options.multi) {
        body.field_multi_value_leniency = true;
    }
    if (req.options.columnar) {
        body.columnar = true;
    }
    const path = req.options.translate ? "/_sql" + "/translate" : "/_sql";

    this.doQuery("POST", path, body, params, req.options, (err, res) => {
        if (err?.status == 404) {
            err = null;
        }
        if (err || !res) {
            return callback(err, []);
        }
        // Each row is a list of values in the same order as the columns
        const rows = lib.isArray(res.rows, []).map((values) => {
            const rec = {};
            values.forEach((val, pos) => {
                rec[res.columns[pos].name] = val;
            });
            return rec;
        });
        if (!req.options.no_next_token) {
            res.next_token = res.cursor;
        }
        if (!req.options.debug) {
            delete res.rows;
        }
        callback(err, rows, res);
    });
}
}