db/sqlpool.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const sql = require(__dirname + '/sql');
const DbPool = require(__dirname + '/pool');

exports.defaults = {
    config: {
        sql: true,
        schema: [],
        features: {
            limit: 25,

        },
        typesMap: {
            real: "numeric", number: "numeric", bigint: "bigint", smallint: "smallint", int: "bigint",
            now: "bigint", mtime: "bigint", ttl: "bigint", timeout: "bigint", random: "bigint", counter: "bigint",
            list: "varchar", set: "varchar", obj: "json", array: "json", object: "json",
            bool: "boolean",
            text: "varchar", string: "varchar", str: "varchar", keyword: "varchar",
            uuid: "varchar", suuid: "varchar", sfuuid: "varchar",
            "": "varchar",
        },
        opsMap: {
            begins_with: 'like%', ne: "<>", eq: '=', le: '<=', lt: '<', ge: '>=', gt: '>'
        },
        keywords: new Set([
            'ABORT','ACTION','ADD','AFTER','ALL','ALTER','ANALYZE','AND','AS','ASC','ATTACH','AUTOINCREMENT','BEFORE','BEGIN','BETWEEN',
            'BY','CASCADE','CASE','CAST','CHECK','COLLATE','COLUMN','COMMIT','CONFLICT','CONSTRAINT','CREATE','CROSS','CURRENT_DATE',
            'CURRENT_TIME','CURRENT_TIMESTAMP','DATABASE','DEFAULT','DEFERRABLE','DEFERRED','DELETE','DESC','DETACH','DISTINCT','DROP',
            'EACH','ELSE','END','ESCAPE','EXCEPT','EXCLUSIVE','EXISTS','EXPLAIN','FAIL','FOR','FOREIGN','FROM','FULL','GLOB','GROUP',
            'HAVING','IF','IGNORE','IMMEDIATE','IN','INDEX','INDEXED','INITIALLY','INNER','INSERT','INSTEAD','INTERSECT','INTO',
            'IS','ISNULL','JOIN','KEY','LEFT','LIKE','LIMIT','MATCH','NATURAL','NO','NOT','NOTNULL','NULL','OF','OFFSET','ON','OR',
            "ORDER","OUTER","PLAN","PRAGMA","PRIMARY","QUERY","RAISE","RECURSIVE","REFERENCES","REGEXP","REINDEX","RELEASE","RENAME",
            "REPLACE","RESTRICT","RIGHT","ROLLBACK","ROW","SAVEPOINT","SELECT","SET","TABLE","TEMP","TEMPORARY","THEN","TO","TRANSACTION",
            "TRIGGER","UNION","UNIQUE","UPDATE","USER","USING","VACUUM","VALUES","VIEW","VIRTUAL","WHEN","WHERE","WITH","WITHOUT"
        ])
    },
};

/**
 * Create a database pool for SQL like databases, see {@link dbPool}
 * @param {object} options - an object defining the pool, the following properties define the pool:
 * @param {string} options.pool - pool name/type, if not specified the SQLite is used
 * @param {int} options.max - max number of clients to be allocated in the pool
 * @param {int} options.idle - after how many milliseconds an idle client will be destroyed
 * @param {object} [defaults] - an object with default pool methods for init and shutdown and other properties, see {@link Pool}
 */
class SqlPool extends DbPool {

    constructor(options, defaults)
    {
        // SQL databases cannot support unlimited connections, keep reasonable default to keep it from overloading
        if (!lib.isPositive(options.max)) options.max = 25;

        super(options, lib.extend({}, exports.defaults, defaults));
    }

    // Call column caching callback with our pool name
    cacheColumns(client, options, callback)
    {
        // Use current database name for schema if not specified
        if (!this.config.schema.length) {
            this.config.schema.push(client.name);
        }

        client.query("SELECT c.table_name,c.column_name,LOWER(c.data_type) AS data_type,c.column_default,c.ordinal_position,c.is_nullable " +
                     "FROM information_schema.columns c,information_schema.tables t " +
                     "WHERE c.table_schema IN (" + sql.valueIn(this.config.schema) + ") AND c.table_name=t.table_name " +
                     (lib.isArray(options.tables) ? `AND t.table_name IN (${sql.valueIn(options.tables)})` : "") +
                     "ORDER BY 5", (err, rows) => {
            this.dbcolumns = {};
            for (const i in rows) {
                const table = rows[i].table_name.toLowerCase()
                if (!this.dbcolumns[table]) this.dbcolumns[table] = {};

                // Split type cast and ignore some functions in default value expressions
                var isserial = false, val = rows[i].column_default ? String(rows[i].column_default).replace(/'/g,"").split("::")[0] : null;
                if (val && val.indexOf("nextval") == 0) val = null, isserial = true;
                if (val && val.indexOf("ARRAY") == 0) val = val.replace("ARRAY", "").replace("[", "{").replace("]", "}");

                this.dbcolumns[table][rows[i].column_name.toLowerCase()] = {
                    id: rows[i].ordinal_position,
                    value: val,
                    data_type: rows[i].data_type,
                    isnull: rows[i].is_nullable == "YES",
                    isserial: isserial
                };
            }
            lib.tryCall(callback, err);
        });
    }

    /**
     * Prepare for execution, return an object with formatted or transformed SQL query for the database driver of this pool
     * @param {DbRequest} req
     */
    prepare(req)
    {
        switch (req.op) {
        case "list":
        case "select":
        case "search":
            sql.select(req);
            break;

        case "create":
            sql.create(req);
            break;

        case "upgrade":
            sql.upgrade(req);
            break;

        case "drop":
            sql.drop(req);
            break;

        case "get":
            sql.get(req);
            break;

        case "add":
        case "put":
            sql.insert(req);
            break;

        case "incr":
            if (req.config.upsert) {
                sql.insert(req);
            } else {
                sql.update(req);
            }
            break;

        case "update":
            if (req.options.upsert && req.config.upsert) {
                sql.insert(req);
                break;
            }
        case "updateall":
            sql.update(req);
            break;

        case "del":
        case "delall":
            sql.delete(req);
            break;
        }
    }

    /**
     * Execute a query in req.text
     * @param {object} client
     * @param {DbRequest} req
     * @param {function} callback
     */
    query(client, req, callback)
    {
        logger.debug("sqlQuery:", req);

        if (req.op == "bulk") {
            return req.options.transaction ?
                        this.queryTransaction(client, req, callback) :
                        this.queryBulk(client, req, callback);
        }

        if (typeof req.text == "string" && req.text.length) {
            client.query(req, callback);
        } else

        if (lib.isArray(req.text)) {
            // run all at once, driver handles it
            if (req.config.features?.bulk) {
                return client.query(req, callback);
            }
            // or run each separately
            lib.forEachLimit(req.text, req.options.concurrency, (text, next) => {
                client.query({ text, options: req.options }, next);
            }, callback, true);
        } else {
            callback(req.op != "upgrade" ? lib.newError("sql text missing") : null, []);
        }
    }

    /**
     * Support for pagination, for SQL this is the OFFSET for the next request
     * @param {object} client
     * @param {DbRequest} req
     * @param {object[]} rows
     * @return {object}
     */
    nextToken(client, req, rows)
    {
        return req.options?.count && rows.length == req.options.count ? lib.toNumber(req.options.start) + lib.toNumber(req.options.count) : null;
    }

    /**
     * Return full placeholder for the given index, i.e. $1, ?, ....
     * @param {DbRequest} req
     * @param {int} [index] - if not provided req.values.length is used
     * @return {string}
     */
    placeholder(req, index)
    {
        return "$" + (index || req.values.length);
    }

    /**
     * Build expression
     * @param {DbRequest} req
     * @param {string} name - property name
     * @param {any} value - current value
     * @param {DbRequestColumn} [column] - column definition returned by {@link module.db:prepareColumn}
     * @return {string}
     */
    prepareExpr(req, name, value, column)
    {
    }

    /**
    * SQL SET expression for a column
    * @param {DbRequest} req
    * @param {object} expr
    * @param {string} expr.name - property name
    * @param {string} expr.type - column type
    * @param {string} expr.column - quoted column name to use in SQL
    * @param {any} expr.value - current value
    * @param {string} expr.op - default op
    * @param {string} expr.placeholder - placeholder position, like $1,
    *    unsetting this property means the value should not be added to req.values
    * @param {string} expr.text - must be set to complete SET statement, if not set this property is skipped
    */
    prepareUpdateExpr(req, expr)
    {
        switch (expr.op) {
        case "unset":
        case "remove":
            expr.text = expr.column + "=NULL";
            delete expr.placeholder;
            break;

        case "not_exists":
            // Update only if the value is null, otherwise skip
            expr.text = `${expr.column}=COALESCE(${expr.column},${expr.placeholder})`;
            break;

        case "incr":
            // Increment a number
            expr.text = `${expr.column}=COALESCE(${expr.column},0)+${expr.placeholder}`;
            break;

        case "add":
            // Add to a list
            expr.text = `${expr.column}=TRIM(COALESCE(${expr.column},'')||','||${expr.placeholder},',')`;
            break;

        case "del":
            // Delete from a list
            expr.text = `${expr.column}=TRIM(REPLACE(${expr.column},${expr.placeholder},''),',')`;
            break;

        case "append":
            // Append to a value
            expr.text = `${expr.column}=COALESCE(${expr.column},'')||${expr.placeholder}`;
            break;

        case "prepend":
            // Append to a value
            expr.text = `${expr.column}=${expr.placeholder}||COALESCE(${expr.column},'')`;
            break;

        default:
            expr.text = `${expr.column}=${expr.placeholder}`;
        }
    }

    queryTransaction(client, req, callback)
    {
        if (!/BEGIN.+TRANSACTION/i.test(req.query[0]?.text)) {
            req.query.splice(0, 0, { text: `BEGIN TRANSACTION` });
        }

        var info = { affected_rows: 0 }, errors = [];

        lib.forEachSeries(req.query, (item, next) => {
            client.query(item, (err, rc, meta) => {
                if (err) {
                    item.error = err;
                    errors.push(item);
                } else
                if (meta?.affected_rows) {
                    info.affected_rows += meta.affected_rows;
                }
                next(err);
            });
        }, (err) => {
            const text = `${err || errors?.length ? "ROLLBACK" : "COMMIT"} TRANSACTION`;
            client.query(text, (e) => {
                if (e) errors.push({ text, error: e });
                callback(err, errors, info);
            });
        }, true);
    }

    queryBulk(client, req, callback)
    {
        var info = { affected_rows: 0 }, errors = [];

        lib.forEachLimit(req.query, req.options.concurrency, (item, next) => {
            client.query(item, (err, rc, meta) => {
                if (err) {
                    item.error = err;
                    errors.push(item);
                } else
                if (meta?.affected_rows) {
                    info.affected_rows += meta.affected_rows;
                }
                next();
            });
        }, (err) => {
            callback(err, errors, info);
        }, true);
    }
}

exports.Pool = SqlPool;