db/dynamodb.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const lib = require(__dirname + '/../lib');
const aws = require(__dirname + '/../aws');
const db = require(__dirname + '/../db');
const logger = require(__dirname + '/../logger');
const DbPool = require(__dirname + '/pool');

const defaults = {
    name: "dynamodb",
    type: "dynamodb",
    configOptions: {
        noConcat: 1,
        skipNull: { add: /.*/, put: /.*/ },
        skipEmpty: { add: /^(list|set)$/, put: /^(list|set)$/ },
        noNulls: 1,
        emptyValue: "",
        concurrency: 3,
        requireCapacity: 1,
        maxIndexes: 20,
        batchSize: 100,
        bulkSize: 50,
        selectSize: 25,
        opsMap: { "!=": "<>", "": "=", eq: "=", ne: "<>", lt: "<", le: "<=", gt: ">", ge: ">=" }
    },
}
module.defaults = defaults;

/**
 * Create a database pool that works with AWS DynamoDB
 * is stored in its own index.
 *
 */

class DynamoDBPool extends DbPool {

    constructor(options)
    {
        super(options, defaults);

        this.dbprojections = {};
        for (const table in db.tables) {
            const { keys, local, global, projections } = this.prepareCreate(db.tables[table]);
            this.dbkeys[table] = keys;
            if (!lib.isEmpty(projections)) {
                this.dbprojections[table] = projections;
            }
            for (const i in local) {
                if (!this.dbcolumns[table]) this.dbcolumns[table] = {};
                for (const p in local[i]) {
                    lib.objSet(this.dbcolumns[table], [p, "lsi"], 1);
                }
            }
            Object.assign(local, global);
            if (!lib.isEmpty(local)) {
                this.dbindexes[table] = Object.keys(local).reduce((a, b) => { a[b] = Object.keys(local[b]); return a }, {});
            }
        }
    }

    _parseTable(table, rc, schema)
    {
        if (!rc || !rc.Table) return;
        (rc.Table.AttributeDefinitions || []).forEach(function(x) {
            if (!schema.dbcolumns[table]) schema.dbcolumns[table] = {};
            var data_type = x.AttributeType == "N" ? "number" : x.AttributeType.length == 2 ? "array" : "text";
            schema.dbcolumns[table][x.AttributeName] = { data_type: data_type };
        });
        (rc.Table.KeySchema || []).forEach(function(x) {
            schema.dbcolumns[table][x.AttributeName].primary = 1;
            lib.objSet(schema.dbkeys, [table], x.AttributeName, { push: 1, unique: 1 });
            lib.objSet(schema.dbcapacity, [table, table], { read: rc.Table.ProvisionedThroughput && rc.Table.ProvisionedThroughput.ReadCapacityUnits || 0,
                write: rc.Table.ProvisionedThroughput && rc.Table.ProvisionedThroughput.WriteCapacityUnits || 0 });
        });
        (rc.Table.LocalSecondaryIndexes || []).forEach(function(x) {
            if (x.Projection && Array.isArray(x.Projection.NonKeyAttributes)) {
                lib.objSet(schema.dbprojections, [table, x.IndexName], x.Projection.NonKeyAttributes);
            }
            (x.KeySchema || []).forEach(function(y) {
                lib.objSet(schema.dbindexes, [table, x.IndexName], y.AttributeName, { push: 1, unique: 1 });
                schema.dbcolumns[table][y.AttributeName].lsi = 1;
            });
        });
        (rc.Table.GlobalSecondaryIndexes || []).forEach(function(x) {
            if (x.Projection && Array.isArray(x.Projection.NonKeyAttributes)) {
                lib.objSet(schema.dbprojections, [table, x.IndexName], x.Projection.NonKeyAttributes);
            }
            (x.KeySchema || []).forEach(function(y) {
                lib.objSet(schema.dbindexes, [table, x.IndexName], y.AttributeName, { push: 1, unique: 1 });
                schema.dbcolumns[table][y.AttributeName].gsi = 1;
            });
            lib.objSet(schema.dbcapacity, [table, x.IndexName], { read: x.ProvisionedThroughput && x.ProvisionedThroughput.ReadCapacityUnits || 0,
              write: x.ProvisionedThroughput && x.ProvisionedThroughput.WriteCapacityUnits || 0 });
        });
    }

    exists(table)
    {
        return !!this.dbcolumns[aws.ddbTable(table)];
    }

    cacheColumns(client, options, callback)
    {
        options = lib.clone(options, {
            endpoint: this.url,
            endpoint_protocol: options.endpoint_protocol || this.configOptions.endpoint_protocol,
            region: options.region || this.configOptions.region,
            concurrency: options.concurrency || this.configOptions.concurrency || db.concurrency,
        });
        var schema = { dbcolumns: {}, dbkeys: {}, dbindexes: {}, dbcapacity: {}, dbprojections: {} };

        lib.series([
            (next) => {
                aws.ddbListTables(options, (err, rc) => {
                    next(err, rc.TableNames);
                });
            },
            (next, tables) => {
                lib.forEachLimit(tables, options.concurrency, (table, next) => {
                    aws.ddbDescribeTable(table, options, (err, rc) => {
                        if (err || rc.Table.TableStatus == "DELETING") return next();
                        this._parseTable(table, rc, schema);
                        next();
                    });
                }, next, true);
            },
            (next) => {
                // Replace all at once
                for (const p in schema) this[p] = schema[p];
                next();
            },
        ], callback, true);
    }

    // Convert into recognizable error codes
    convertError(req, err)
    {
        switch (req.op) {
        case "get":
            if (err.message === "The provided key element does not match the schema") err.code = "NotFound";
            break;
        case "add":
        case "put":
            if (err.code === "ConditionalCheckFailedException") err.code = "AlreadyExists";
            break;
        case "incr":
        case "update":
        case "del":
            if (err.code == "ConditionalCheckFailedException") err.code = "NotFound";
            break;
        case "select":
        case "search":
            if (err.message === "The provided starting key is invalid") err.code = "NotMatched"; else
            if (err.message === "The provided starting key is outside query boundaries based on provided conditions") err.code = "NotMatched";
            break;
        }
        if (err.code === "ProvisionedThroughputExceededException") err.code = "OverCapacity";
        return err;
    }

    prepareOptions(options)
    {
        options.region = options.region || this.configOptions.region;
        options.endpoint = options.endpoint || this.url;
        options.endpoint_protocol = options.endpoint_protocol || this.configOptions.endpoint_protocol;
        options.retryCount = options.retryCount || this.configOptions.retryCount;
        options.retryTimeout = this.configOptions.retryTimeout;
        options.httpTimeout = this.configOptions.httpTimeout;
    }

    // Simulate query as in SQL driver but performing AWS call, text will be a table name and values will be request options
    query(client, req, callback)
    {
        this.prepareOptions(req.options);
        switch (req.op) {
        case "create":
            this.queryCreate(client, req, callback);
            break;

        case "upgrade":
            this.queryUpgrade(client, req, callback);
            break;

        case "drop":
            this.queryDrop(client, req, callback);
            break;

        case "get":
            this.queryGet(client, req, callback);
            break;

        case "select":
        case "search":
            this.queryPrepareSelect(client, req, callback);
            break;

        case "list":
            this.queryList(client, req, callback);
            break;

        case "add":
            this.queryAdd(client, req, callback);
            break;

        case "put":
            this.queryPut(client, req, callback);
            break;

        case "incr":
        case "update":
            this.queryUpdate(client, req, callback);
            break;

        case "del":
            this.queryDel(client, req, callback);
            break;

        case "bulk":
            if (req.options.transaction) {
                this.queryTransact(client, req, callback);
            } else {
                this.queryBulk(client, req, callback);
            }
            break;

        case "sql":
            this.querySql(client, req, callback);
            break;

        default:
            callback(lib.newError("invalid op: " + req.op), []);
        }
    }

    prepareCreate(table)
    {
        var local = {}, global = {}, attrs = {}, projections = {};
        var keys = Object.keys(table).
                   filter((x, i) => (table[x].primary)).
                   sort((a,b) => (table[a].primary - table[b].primary)).
                   filter((x, i) => (i < 2)).
                   map((x) => { attrs[x] = 1; return x });

        var hash = keys[0];
        for (const type of ["index", "unique"]) {
            Array(this.configOptions.maxIndexes).fill(0, 0).forEach((_, n, t) => {
                n = n || "";
                t = type + n;
                const index = Object.keys(table).
                              filter((x) => (table[x][t])).
                              sort((a,b) => (table[a][t] - table[b][t])).
                              filter((x, i) => (i < 2));
                if (!index.length) return;

                var name = index.join("_");
                if (name.length < 3) name = "i_" + name;

                // Index starts with the same hash, local unless explicitly defined as global
                if (index.length == 2 && index[0] == hash && !table[index[0]].dynamodb?.global) {
                    local[name] = { [index[0]]: 'HASH', [index[1]]: 'RANGE' };
                } else
                // Global if does not start with the primary hash
                if (index.length == 2) {
                    global[name] = { [index[0]]: 'HASH', [index[1]]: 'RANGE' };
                } else {
                    global[name] = { [index[0]]: 'HASH' };
                }
                index.forEach((y) => { attrs[y] = 1 });

                projections[name] = table[index[0]].dynamodb?.["projections" + n];
            });
        }
        return { keys, local, global, attrs, projections };
    }

    queryCreate(client, req, callback)
    {
        const { keys, local, global, attrs, projections } = this.prepareCreate(req.query);

        // All native properties for options from the key columns
        Object.keys(attrs).forEach((x) => {
            attrs[x] = !req.query[x].join && lib.rxNumericType.test(req.query[x].type) ? "N" : "S";
            for (const p in req.query[x].dynamodb) {
                req.options[p] = req.query[x].dynamodb[p];
            }
        });
        req.options.keys = keys;
        req.options.local = local;
        req.options.global = global;
        req.options.projections = projections;
        var ctable = req.table.substr(0,1).toUpperCase() + req.table.substr(1);
        var cap = lib.split(this.configOptions["capacity" + ctable], ",", { datatype: "int" });
        req.options.readCapacity = req.options.readCapacity || this.configOptions["readCapacity" + ctable] || cap[0] || this.configOptions.readCapacity;
        req.options.writeCapacity = req.options.writeCapacity || this.configOptions["writeCapacity" + ctable] || cap[1] || cap[0] || this.configOptions.writeCapacity;
        // Wait long enough for the table to be active
        if (typeof req.options.waitTimeout == "undefined") req.options.waitTimeout = 60000;

        var rc;
        lib.series([
            (next) => {
                aws.ddbCreateTable(req.table, attrs, req.options, (err, item) => {
                    rc = item;
                    if (!err) client.affected_rows = 1;
                    // Create table columns for cases when describeTable never called or errored, for example Rate limit
                    // happened during the cacheColumns stage
                    if (item && item.TableDescription && !this.dbindexes[req.table]) {
                        this._parseTable(req.table, { Table: item.TableDescription }, this);
                    }
                    next(err);
                });
            },
            (next) => {
                // Manage TTL column
                req.options.attribute = Object.keys(req.query).filter((x) => (req.query[x].type == "ttl")).pop();
                if (!req.options.attribute) return next();
                aws.ddbDescribeTimeToLive(req.table, req.options, (err, item) => {
                    if (err || item.TimeToLiveDescription.TimeToLiveStatus == "ENABLED") return next(err);
                    req.options.enabled = 1;
                    req.options.name = req.table;
                    aws.ddbUpdateTimeToLive(req.options, next);
                });
            },
        ], (err) => {
            callback(err, [], rc);
        }, true);
    }

    queryUpgrade(client, req, callback)
    {
        var global = {}, rc;
        var table = aws.ddbTable(req.table);

        (new Array(this.configOptions.maxIndexes)).fill(0, 0).forEach((_, n) => {
            n = n || "";
            const index = Object.keys(req.query).
                          filter((x, i) => (req.query[x]["index" + n])).
                          sort((a, b) => (req.query[a]["index" + n] - req.query[b]["index" + n])).
                          filter((x, i) => (i < 2));
            if (!index.length) return;

            var name = index.join("_");
            if (name.length < 3) name = "i_" + name;
            if (this.dbindexes[table] && this.dbindexes[table][name]) return;
            var add = {}, cap = this.dbcapacity[table] && this.dbcapacity[table][table] || lib.empty;

            index.forEach((x) => {
                const ddb = req.query[x].dynamodb;
                if (ddb?.readCapacity) add.readCapacity = ddb.readCapacity;
                if (ddb?.writeCapacity) add.writeCapacity = ddb.writeCapacity;
                if (!add.readCapacity && cap.read) add.readCapacity = cap.read;
                if (!add.writeCapacity && cap.write) add.writeCapacity = cap.write;
                add[x] = !req.query[x].join && lib.rxNumericType.test(req.query[x].type) ? "N" : "S";
            });

            add.projection = Object.keys(req.query).filter(x => {
                if (x == index[0] || x == index[1]) return 0;
                const ddb = req.query[x].dynamodb;
                return lib.toBool(ddb.projections) || lib.isFlag(ddb?.projections, lib.toNumber(n));
            });
            global[name] = add;
            return;
        });

        lib.series([
            function(next) {
                if (!Object.keys(global).length) return next();
                req.options.name = table;
                req.options.add = global;
                aws.ddbUpdateTable(req.options, (err, item) => {
                    rc = item;
                    if (!err) client.affected_rows = 1;
                    next(err);
                });
            },
            function(next) {
                // Manage TTL column
                req.options.attribute = Object.keys(req.query).filter((x) => (req.query[x].type == "ttl")).pop();
                if (!req.options.attribute) return next();
                aws.ddbDescribeTimeToLive(table, req.options, (err, item) => {
                    if (err || item.TimeToLiveDescription.TimeToLiveStatus == "ENABLED") return next(err);
                    req.options.enabled = 1;
                    req.options.name = table;
                    aws.ddbUpdateTimeToLive(req.options, next);
                });
            },
        ], (err) => {
            callback(err, [], rc);
        }, true);
    }

    queryDrop(client, req, callback)
    {
        if (typeof req.options.waitTimeout == "undefined") req.options.waitTimeout = 60000;
        aws.ddbDeleteTable(req.table, req.options, (err, rc) => {
            callback(err, [], rc);
        });
    }

    queryGet(client, req, callback)
    {
        var keys = db.getQueryForKeys(req.keys, req.query);
        if (!Object.keys(keys).length) return callback();
        aws.ddbGetItem(req.table, keys, req.options, (err, rc) => {
            if (!err && rc.ConsumedCapacity) rc.consumed_capacity = rc.ConsumedCapacity.CapacityUnits;
            callback(err, rc.Item ? [rc.Item] : [], rc);
        });
    }

    queryPrepareSelect(client, req, callback)
    {
        var dbkeys = req.keys;
        var dbindexes = this.dbindexes[req.table] || db.indexes[req.table] || lib.empty;

        // Sorting by the default range key is default
        if (req.options.sort && req.options.sort == dbkeys[1]) {
            req.options.sort = null;
        }
        if (req.options.sort && req.options.sort.length < 3) {
            req.options.sort = "i_" + req.options.sort;
        }

        // Use primary keys from the secondary index
        if (req.options.sort) {
            // Use index by name, mostly global indexes
            if (dbindexes[req.options.sort]) {
                dbkeys = dbindexes[req.options.sort];
            } else {
                // Local sorting order by range key
                for (const p in dbindexes) {
                    const idx = dbindexes[p];
                    if (idx && idx.length == 2 && (idx[0] == req.options.sort || idx[1] == req.options.sort)) {
                        req.options.sort = p;
                        dbkeys = dbindexes[p];
                        break;
                    }
                }
            }
        } else

        // Find a global index if any hash key for it provided, prefer index with hash and sort values
        if (!req.query[dbkeys[0]] && !req.options.fullscan) {
            let dbmax = 0;
            for (const p in dbindexes) {
                const idx = dbindexes[p];
                if (!idx || !req.query[idx[0]]) continue;
                const max = idx[1] && req.query[idx[1]] !== undefined ? 2 : 1;
                if (max < dbmax) continue;
                req.options.sort = p;
                dbkeys = dbindexes[p];
                dbmax = max;
            }
        }

        var query = db.getQueryForKeys(dbkeys, req.query);

        // Operation depends on the primary keys in the query, for Scan we can let the DB to do all the filtering
        var op = query[dbkeys[0]] !== undefined && !req.options.fullscan ? 'ddbQueryTable' : 'ddbScanTable';

        this.queryRunSelect(op, client, req, dbkeys, query, callback);
    }

    queryRunSelect(op, client, req, keys, query, callback)
    {
        // Capacity rate limiter
        var inFilter, inKey, inList, inListKey;

        // Scans explicitely disabled
        if (op == 'ddbScanTable' && req.options.noscan) {
            logger.info('select:', this.name, req.table, op, keys, 'QUERY:', query, "OPTS:", req.options, "NO EMPTY SCANS ENABLED");
            return callback(null, []);
        }

        var cap = req.options.capacity || db.getCapacity(req.table, { useCapacity: req.options.useCapacity || "read", factorCapacity: req.options.factorCapacity || 0.9 });

        req.options.ReturnConsumedCapacity = "TOTAL";
        if (!req.options.count) {
            req.options.count = this.configOptions.selectSize;
        }

        logger.debug('select:', this.name, req.table, op, keys, 'QUERY:', query, "OPTS:", req.options, 'CAP:', cap.rateCapacity, cap.useCapacity);

        for (const p in req.options.ops) {
            // IN is not supported for key condition, move it into the query
            if (req.options.ops[p] == "in" && (p == keys[1] || p == req.keys[1])) {
                if (Array.isArray(query[p])) {
                    if (query[p].length == 1) {
                        delete req.options.ops[p];
                        query[p] = query[p][0];
                    } else {
                        inKey = p;
                        inFilter = query[p];
                        delete query[p];
                    }
                }
            }
            // Full scan on multiple hash keys
            if (req.options.ops[p] == "in" && lib.isArray(query[p]) && (p == keys[0] || p == req.keys[0])) {
                if (query[p].length == 1) {
                    query[p] = query[p][0];
                    delete req.options.ops[p];
                } else {
                    op = 'ddbScanTable';
                }
            }
            // Noop for a hash key
            if (req.options.ops[p] && op == "ddbQueryTable" && (p == keys[0] || (!req.options.sort && p == req.keys[0]))) {
                req.options.ops[p] = '';
            }
            // Large IN lists, iterate, only one at a time
            if (req.options.ops[p] == "in" && lib.arrayLength(query[p]) > 100 && !inList) {
                inListKey = p;
                inList = query[p].slice(0);
                query[p] = inList.splice(0, 99);
            }
        }
        // Make sure we have valid start key
        if (req.options.start) {
            for (const key of keys) {
                if (!req.options.start[key]) {
                    delete req.options.start;
                    break;
                }
            }
        }

        req.options.keys = keys;

        var rows = [], info = { consumed_capacity: 0, total: 0, retry_count: 0 };
        // Keep retrieving items until we reach the end or our limit
        lib.doWhilst(
            function(next) {
                aws[op](req.table, query, req.options, (err, item) => {
                    if (req.options.total) {
                        if (!rows.length) {
                            rows.push({ count: 0 });
                        }
                        rows[0].count += item.Count;
                    } else {
                        if (inFilter) {
                            item.Items = item.Items.filter(x => (inFilter.includes(x[inKey])));
                        }
                        rows.push.apply(rows, item.Items);
                    }
                    if (item.retry_count) {
                        info.retry_count += item.retry_count;
                    }
                    client.next_token = item.LastEvaluatedKey ? aws.fromDynamoDB(item.LastEvaluatedKey) : null;
                    req.options.count -= item.Items.length;

                    // Deal with abrupt stops, no way to know but only to retry
                    if (!err && !item.Count && req.options.scanRetry > 0 && op == 'ddbScanTable') {
                        logger.info("ddbScanTable:", "retry:", req.options.start, cap);
                        req.options.scanRetry--;
                        client.next_token = req.options.start;
                        return db.checkCapacity(cap, next);
                    }

                    if (!err && item.ConsumedCapacity) {
                        info.consumed_capacity += item.ConsumedCapacity.CapacityUnits;
                        if (cap) {
                            return db.checkCapacity(cap, item.ConsumedCapacity.CapacityUnits, next);
                        }
                    }
                    next(err);
                });
            },

            function() {
                if (client.next_token == null || req.options.count <= 0) {
                    if (inList && inList.length) {
                        query[inListKey] = inList.splice(0, 99);
                        return true;
                    }
                    return false;
                }
                req.options.start = client.next_token;
                return true;
            }, (err) => {
                callback(err, rows, info);
            }, true);
    }

    queryList(client, req, callback)
    {
        var info = { consumed_capacity: 0, retry_count: 0 }, rows = [];
        // Capacity rate limiter
        var cap = req.options.capacity || db.getCapacity(req.table, { useCapacity: req.options.useCapacity || "read", factorCapacity: req.options.factorCapacity });
        req.options.ReturnConsumedCapacity = "TOTAL";

        // Keep retrieving items until we reach the end or our limit
        var dbkeys = req.keys;
        var chunks = [];
        for (let i = 0; i < req.query.length; i+= this.configOptions.batchSize) {
            chunks.push(req.query.slice(i, i + this.configOptions.batchSize).filter((x) => (dbkeys.every((y) => (x[y] !== "")))));
        }

        lib.forEachLimit(chunks, req.options.concurrency, (chunk, next) => {
            const batch = {
                [req.table]: {
                    keys: chunk,
                    select: db.getSelectedColumns(req),
                    consistent: req.options.consistent
                }
            };
            aws.ddbBatchGetItem(batch, req.options, (err, item) => {
                if (item.retry_count) info.retry_count += item.retry_count;
                if (err) return next(err);

                rows.push.apply(rows, item.Responses && item.Responses[req.table] || lib.emptylist);

                // Keep retrieving items until we get all items from this batch
                var moreKeys = item.UnprocessedKeys || null;
                lib.whilst(
                    () => (moreKeys && Object.keys(moreKeys).length),

                    (next2) => {
                        const opts = lib.clone(req.options);
                        opts.RequestItems = moreKeys;
                        aws.ddbBatchGetItem({}, opts, (err, item) => {
                            if (item.retry_count) info.retry_count += item.retry_count;
                            if (err) return next2(err);
                            rows.push.apply(rows, item.Responses[req.table] || lib.emptylist);
                            moreKeys = item.UnprocessedKeys || null;
                            if (item.ConsumedCapacity) {
                                info.consumed_capacity += item.ConsumedCapacity.CapacityUnits;
                                if (cap) {
                                    return db.checkCapacity(cap, item.ConsumedCapacity.CapacityUnits, next2);
                                }
                            }
                            next2();
                        });
                    },
                    next, true);
            });
        },
        function(err) {
            callback(err, rows, info);
     }, true);
    }

    queryAdd(client, req, callback)
    {
        req.options.query = req.keys.reduce((x,y) => { x[y] = null; return x }, {});
        if (req.options.useCapacity || req.options.capacity) req.options.ReturnConsumedCapacity = "TOTAL";
        aws.ddbPutItem(req.table, req.query, req.options, (err, rc) => {
            if (!rc) rc = {};
            if (!err) rc.affected_rows = 1;
            if (!err && rc.ConsumedCapacity) rc.consumed_capacity = rc.ConsumedCapacity.CapacityUnits;
            callback(err, rc.Item ? [rc.Item] : [], rc);
        });
    }

    queryPut(client, req, callback)
    {
        if (req.options.useCapacity || req.options.capacity) req.options.ReturnConsumedCapacity = "TOTAL";
        aws.ddbPutItem(req.table, req.query, req.options, (err, rc) => {
            if (!rc) rc = {};
            if (!err) rc.affected_rows = 1;
            if (!err && rc.ConsumedCapacity) rc.consumed_capacity = rc.ConsumedCapacity.CapacityUnits;
            callback(err, rc.Item ? [rc.Item] : [], rc);
        });
    }

    queryUpdate(client, req, callback)
    {
        var keys = db.getQueryForKeys(req.keys, req.query);
        if (req.op == "update" && !req.options.upsert) {
            if (req.options.query) {
                for (const p in keys) if (!req.options.query[p]) req.options.query[p] = keys[p];
            } else
            if (!req.options.Expected && !req.options.expr && !req.options.ConditionExpression) {
                req.options.query = keys;
            }
        }
        if (req.options.useCapacity || req.options.capacity) req.options.ReturnConsumedCapacity = "TOTAL";
        aws.ddbUpdateItem(req.table, keys, req.query, req.options, (err, rc) => {
            if (!rc) rc = {};
            if (!err) rc.affected_rows = 1;
            if (!err && rc.ConsumedCapacity) rc.consumed_capacity = rc.ConsumedCapacity.CapacityUnits;
            if (err && err.code == "ConditionalCheckFailedException") err = null;
            callback(err, rc.Item ? [rc.Item] : [], rc);
        });
    }

    queryDel(client, req, callback)
    {
        var keys = db.getQueryForKeys(req.keys, req.query);
        if (req.options.useCapacity || req.options.capacity) req.options.ReturnConsumedCapacity = "TOTAL";
        if (req.options.query) {
            for (var p in keys) if (!req.options.query[p]) req.options.query[p] = keys[p];
        } else {
            if (!req.options.Expected && !req.options.expr && !req.options.ConditionExpression) req.options.query = keys;
        }
        aws.ddbDeleteItem(req.table, keys, req.options, (err, rc) => {
            if (!rc) rc = {};
            if (!err) rc.affected_rows = 1;
            if (!err && rc.ConsumedCapacity) rc.consumed_capacity = rc.ConsumedCapacity.CapacityUnits;
            if (err && err.code == "ConditionalCheckFailedException") err = null;
            callback(err, rc.Item ? [rc.Item] : [], rc);
        });
    }

    queryBulk(client, req, callback)
    {
        var info = { consumed_capacity: 0, retry_count: 0, _size: req.query.length, _retries: 0 }, errors = [];
        var cap = req.options.capacity || db.getCapacity(req.table, { useCapacity: req.options.useCapacity || "write", factorCapacity: req.options.factorCapacity });
        var count = lib.toNumber(cap.writeCapacity || this.configOptions.bulkSize, { min: 2, max: this.configOptions.bulkSize });
        var breq, moreItems, objs = req.query;
        req.options.ReturnConsumedCapacity = "TOTAL";

        // Keep sending items until we reach the end or our limit
        lib.doWhilst(
            function(next) {
                breq = {};
                delete req.options.RequestItems;
                if (moreItems) {
                    req.options.RequestItems = moreItems;
                } else {
                    var list = objs.slice(0, count);
                    objs = objs.slice(count);
                    if (!list.length) return next();
                    for (var i in list) {
                        if (list[i].error) {
                            errors.push(list[i]);
                            continue;
                        }
                        switch (list[i].op) {
                        case "del":
                            list[i].query = db.getQueryForKeys(req.keys, list[i].obj);
                        case "put":
                            if (!breq[list[i].table]) breq[list[i].table] = [];
                            breq[list[i].table].push({ [list[i].op]: list[i].query });
                            break;
                        default:
                            list[i].error = "NotSupported";
                            errors.push(list[i]);
                        }
                    }
                }
                if (!Object.keys(breq).length) return next();

                aws.ddbBatchWriteItem(breq, req.options, (err, rc) => {
                    if (err) return callback(err, []);

                    var consumed = lib.toNumber(rc.ConsumedCapacity?.CapacityUnits);
                    info.consumed_capacity += consumed;
                    info._retries++;
                    if (rc.retry_count) info.retry_count += rc.retry_count;
                    moreItems = !lib.isEmpty(rc.UnprocessedKeys) ? rc.UnprocessedKeys : null;
                    if (!moreItems && !req.query.length) return next();
                    db.checkCapacity(cap, consumed, next);
                });
            },
            function() {
                return info._retries < info._size && (moreItems || objs.length > 0);
            },
            function(err) {
                if (!err) info.affected_rows = info._size - errors.length;
                delete req.options.RequestItems;
                callback(err, errors, info);
            }, true);
    }

    queryTransact(client, req, callback)
    {
        var info = { consumed_capacity: 0, count: req.query.length };
        var cap = req.options.capacity || db.getCapacity(req.table, { useCapacity: req.options.useCapacity || "write", factorCapacity: req.options.factorCapacity });
        var count = lib.toNumber(cap.writeCapacity || this.configOptions.bulkSize, { min: 2, max: this.configOptions.bulkSize });
        var objs = req.query, list;
        req.options.ReturnConsumedCapacity = "TOTAL";

        // Keep sending items until we reach the end or our limit
        lib.doWhilst(
            function(next) {
                list = objs.slice(0, count);
                objs = objs.slice(count);
                if (!list.length) return next();
                list.forEach((x) => {
                    switch (x.op) {
                    case "add":
                        x.options.query = req.keys.reduce((x,y) => { x[y] = null; return x }, {});
                        break;
                    case "incr":
                    case "update":
                        x.keys = db.getQueryForKeys(x.keys, x.query);
                        if (x.op == "update" && !x.options.upsert) {
                            if (x.options.query) {
                                for (const p in x.keys) {
                                    if (!x.options.query[p]) x.options.query[p] = x.keys[p];
                                }
                            } else
                            if (!x.options.Expected && !x.options.expr && !x.options.ConditionExpression) {
                                x.options.query = x.keys;
                            }
                        }
                        break;
                    case "del":
                        x.query = db.getQueryForKeys(x.keys, x.query);
                        break;
                    }
                    delete x.columns;
                    delete x.orig;
                });
                aws.ddbTransactWriteItems(list, req.options, (err, rc) => {
                    if (err) {
                        if (err.code == "TransactionCanceledException") {
                            const d = err.message.match(/reasons \[([^]+)\]/);
                            if (d) {
                                rc = d[1].split(",").map((msg, i) => {
                                    if (!msg || msg == "None") return 0;
                                    list[i].error = msg.trim();
                                    return list[i];
                                }).filter(x => x);
                            }
                        }
                        return callback(err, rc);
                    }

                    var consumed = lib.toNumber(rc?.ConsumedCapacity?.CapacityUnits);
                    info.consumed_capacity += consumed;
                    db.checkCapacity(cap, consumed, next);
                });
            },
            function() {
                return objs.length > 0;
            },
            function(err) {
                if (!err) info.affected_rows = info.count;
                callback(err, [], info);
            }, true);
    }

    querySql(client, req, callback)
    {
        req.options.params = lib.isArray(req.values);
        aws.ddbExecuteStatement(req.text, req.options, (err, rc) => {
           if (!err) client.next_token = rc.NextToken;
           callback(err, lib.isArray(rc.Items, []), {});
       });
    }

}

module.exports = DynamoDBPool;