aws/dynamodb.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const modules = require(__dirname + '/../modules');
const logger = require(__dirname + '/../logger');
const app = require(__dirname + '/../app');
const lib = require(__dirname + '/../lib');
const aws = require(__dirname + '/../aws');

app.describeArgs("aws", [
    { name: "ddb-endpoint", descr: "Default endpoint to use, for local DynamoDB use" },
    { name: "ddb-read-capacity", type: "int", min: 0, descr: "Default DynamoDB read capacity for all tables" },
    { name: "ddb-write-capacity", type: "int", min: 0, descr: "Default DynamoDB write capacity for all tables" },
    { name: "ddb-retry-count", type: "int", min: 5, descr: "Default DynamoDB number of retries in case of throttling event" },
    { name: "ddb-retry-timeout", type: "int", min: 200, descr: "Default DynamoDB min timeout for retry backoff in case of throttling event" },
    { name: "ddb-retry-status", type: "regexp", descr: "Default DynamoDB HTTP statuses to retry in case of throttling event or an error" },
    { name: "ddb-table-map", type: "map", obj: "ddb-table-map", merge: 1, descr: "Table mappings" },
]);

// DynamoDB reserved keywords
aws.ddbReserved = {
    ABORT: 1, ABSOLUTE: 1, ACTION: 1, ADD: 1, AFTER: 1, AGENT: 1, AGGREGATE: 1, ALL: 1, ALLOCATE: 1, ALTER: 1, ANALYZE: 1, AND: 1, ANY: 1, ARCHIVE: 1, ARE: 1, ARRAY: 1, AS: 1, ASC: 1,
    ASCII: 1, ASENSITIVE: 1, ASSERTION: 1, ASYMMETRIC: 1, AT: 1, ATOMIC: 1, ATTACH: 1, ATTRIBUTE: 1, AUTH: 1, AUTHORIZATION: 1, AUTHORIZE: 1, AUTO: 1, AVG: 1, BACK: 1,
    BACKUP: 1, BASE: 1, BATCH: 1, BEFORE: 1, BEGIN: 1, BETWEEN: 1, BIGINT: 1, BINARY: 1, BIT: 1, BLOB: 1, BLOCK: 1, BOOLEAN: 1, BOTH: 1, BREADTH: 1,
    BUCKET: 1, BULK: 1, BY: 1, BYTE: 1, CALL: 1, CALLED: 1, CALLING: 1, CAPACITY: 1, CASCADE: 1, CASCADED: 1, CASE: 1, CAST: 1, CATALOG: 1, CHAR: 1, CHARACTER: 1, CHECK: 1,
    CLASS: 1, CLOB: 1, CLOSE: 1, CLUSTER: 1, CLUSTERED: 1, CLUSTERING: 1, CLUSTERS: 1, COALESCE: 1, COLLATE: 1, COLLATION: 1, COLLECTION: 1, COLUMN: 1, COLUMNS: 1, COMBINE: 1,
    COMMENT: 1, COMMIT: 1, COMPACT: 1, COMPILE: 1, COMPRESS: 1, CONDITION: 1, CONFLICT: 1, CONNECT: 1, CONNECTION: 1, CONSISTENCY: 1, CONSISTENT: 1, CONSTRAINT: 1,
    CONSTRAINTS: 1, CONSTRUCTOR: 1, CONSUMED: 1, CONTINUE: 1, CONVERT: 1, COPY: 1, CORRESPONDING: 1, COUNT: 1, COUNTER: 1, CREATE: 1, CROSS: 1, CUBE: 1, CURRENT: 1, CURSOR: 1, CYCLE: 1,
    DATA: 1, DATABASE: 1, DATE: 1, DATETIME: 1, DAY: 1, DEALLOCATE: 1, DEC: 1, DECIMAL: 1, DECLARE: 1, DEFAULT: 1, DEFERRABLE: 1, DEFERRED: 1, DEFINE: 1, DEFINED: 1, DEFINITION: 1,
    DELETE: 1, DELIMITED: 1, DEPTH: 1, DEREF: 1, DESC: 1, DESCRIBE: 1, DESCRIPTOR: 1, DETACH: 1, DETERMINISTIC: 1, DIAGNOSTICS: 1, DIRECTORIES: 1, DISABLE: 1, DISCONNECT: 1,
    DISTINCT: 1, DISTRIBUTE: 1, DO: 1, DOMAIN: 1, DOUBLE: 1, DROP: 1, DUMP: 1, DURATION: 1, DYNAMIC: 1, EACH: 1, ELEMENT: 1, ELSE: 1, ELSEIF: 1, EMPTY: 1, ENABLE: 1, END: 1, EQUAL: 1,
    EQUALS: 1, ERROR: 1, ESCAPE: 1, ESCAPED: 1, EVAL: 1, EVALUATE: 1, EXCEEDED: 1, EXCEPT: 1, EXCEPTION: 1, EXCEPTIONS: 1, EXCLUSIVE: 1, EXEC: 1, EXECUTE: 1, EXISTS: 1, EXIT: 1, EXPLAIN: 1,
    EXPLODE: 1, EXPORT: 1, EXPRESSION: 1, EXTENDED: 1, EXTERNAL: 1, EXTRACT: 1, FAIL: 1, FALSE: 1, FAMILY: 1, FETCH: 1, FIELDS: 1, FILE: 1, FILTER: 1, FILTERING: 1, FINAL: 1,
    FINISH: 1, FIRST: 1, FIXED: 1, FLATTERN: 1, FLOAT: 1, FOR: 1, FORCE: 1, FOREIGN: 1, FORMAT: 1, FORWARD: 1, FOUND: 1, FREE: 1, FROM: 1, FULL: 1, FUNCTION: 1, FUNCTIONS: 1,
    GENERAL: 1, GENERATE: 1, GET: 1, GLOB: 1, GLOBAL: 1, GO: 1, GOTO: 1, GRANT: 1, GREATER: 1, GROUP: 1, GROUPING: 1, HANDLER: 1, HASH: 1, HAVE: 1, HAVING: 1, HEAP: 1, HIDDEN: 1, HOLD: 1,
    HOUR: 1, IDENTIFIED: 1, IDENTITY: 1, IF: 1, IGNORE: 1, IMMEDIATE: 1, IMPORT: 1, IN: 1, INCLUDING: 1, INCLUSIVE: 1, INCREMENT: 1, INCREMENTAL: 1, INDEX: 1, INDEXED: 1,
    INDEXES: 1, INDICATOR: 1, INFINITE: 1, INITIALLY: 1, INLINE: 1, INNER: 1, INNTER: 1, INOUT: 1, INPUT: 1, INSENSITIVE: 1, INSERT: 1, INSTEAD: 1, INT: 1, INTEGER: 1, INTERSECT: 1,
    INTERVAL: 1, INTO: 1, INVALIDATE: 1, IS: 1, ISOLATION: 1, ITEM: 1, ITEMS: 1, ITERATE: 1, JOIN: 1, KEY: 1, KEYS: 1, LAG: 1, LANGUAGE: 1, LARGE: 1, LAST: 1, LATERAL: 1, LEAD: 1,
    LEADING: 1, LEAVE: 1, LEFT: 1, LENGTH: 1, LESS: 1, LEVEL: 1, LIKE: 1, LIMIT: 1, LIMITED: 1, LINES: 1, LIST: 1, LOAD: 1, LOCAL: 1, LOCALTIME: 1, LOCALTIMESTAMP: 1,
    LOCATION: 1, LOCATOR: 1, LOCK: 1, LOCKS: 1, LOG: 1, LOGED: 1, LONG: 1, LOOP: 1, LOWER: 1, MAP: 1, MATCH: 1, MATERIALIZED: 1, MAX: 1, MAXLEN: 1, MEMBER: 1, MERGE: 1, METHOD: 1,
    METRICS: 1, MIN: 1, MINUS: 1, MINUTE: 1, MISSING: 1, MOD: 1, MODE: 1, MODIFIES: 1, MODIFY: 1, MODULE: 1, MONTH: 1, MULTI: 1, MULTISET: 1, NAME: 1, NAMES: 1, NATIONAL: 1, NATURAL: 1,
    NCHAR: 1, NCLOB: 1, NEW: 1, NEXT: 1, NO: 1, NONE: 1, NOT: 1, NULL: 1, NULLIF: 1, NUMBER: 1, NUMERIC: 1, OBJECT: 1, OF: 1, OFFLINE: 1, OFFSET: 1, OLD: 1, ON: 1, ONLINE: 1, ONLY: 1,
    OPAQUE: 1, OPEN: 1, OPERATOR: 1, OPTION: 1, OR: 1, ORDER: 1, ORDINALITY: 1, OTHER: 1, OTHERS: 1, OUT: 1, OUTER: 1, OUTPUT: 1, OVER: 1, OVERLAPS: 1, OVERRIDE: 1, OWNER: 1,
    PAD: 1, PARALLEL: 1, PARAMETER: 1, PARAMETERS: 1, PARTIAL: 1, PARTITION: 1, PARTITIONED: 1, PARTITIONS: 1, PATH: 1, PERCENT: 1, PERCENTILE: 1, PERMISSION: 1,
    PERMISSIONS: 1, PIPE: 1, PIPELINED: 1, PLAN: 1, POOL: 1, POSITION: 1, PRECISION: 1, PREPARE: 1, PRESERVE: 1, PRIMARY: 1, PRIOR: 1, PRIVATE: 1, PRIVILEGES: 1, PROCEDURE: 1,
    PROCESSED: 1, PROJECT: 1, PROJECTION: 1, PROPERTY: 1, PROVISIONING: 1, PUBLIC: 1, PUT: 1, QUERY: 1, QUIT: 1, QUORUM: 1, RAISE: 1, RANDOM: 1, RANGE: 1, RANK: 1, RAW: 1, READ: 1,
    READS: 1, REAL: 1, REBUILD: 1, RECORD: 1, RECURSIVE: 1, REDUCE: 1, REF: 1, REFERENCE: 1, REFERENCES: 1, REFERENCING: 1, REGEXP: 1, REGION: 1, REINDEX: 1, RELATIVE: 1, RELEASE: 1,
    REMAINDER: 1, RENAME: 1, REPEAT: 1, REPLACE: 1, REQUEST: 1, RESET: 1, RESIGNAL: 1, RESOURCE: 1, RESPONSE: 1, RESTORE: 1, RESTRICT: 1, RESULT: 1, RETURN: 1, RETURNING: 1,
    RETURNS: 1, REVERSE: 1, REVOKE: 1, RIGHT: 1, ROLE: 1, ROLES: 1, ROLLBACK: 1, ROLLUP: 1, ROUTINE: 1, ROW: 1, ROWS: 1, RULE: 1, RULES: 1, SAMPLE: 1, SATISFIES: 1,
    SAVE: 1, SAVEPOINT: 1, SCAN: 1, SCHEMA: 1, SCOPE: 1, SCROLL: 1, SEARCH: 1, SECOND: 1, SECTION: 1, SEGMENT: 1, SEGMENTS: 1, SELECT: 1, SELF: 1, SEMI: 1, SENSITIVE: 1, SEPARATE: 1,
    SEQUENCE: 1, SERIALIZABLE: 1, SESSION: 1, SET: 1, SETS: 1, SHARD: 1, SHARE: 1, SHARED: 1, SHORT: 1, SHOW: 1, SIGNAL: 1, SIMILAR: 1, SIZE: 1, SKEWED: 1, SMALLINT: 1, SNAPSHOT: 1,
    SOME: 1, SOURCE: 1, SPACE: 1, SPACES: 1, SPARSE: 1, SPECIFIC: 1, SPECIFICTYPE: 1, SPLIT: 1, SQL: 1, SQLCODE: 1, SQLERROR: 1, SQLEXCEPTION: 1, SQLSTATE: 1, SQLWARNING: 1, START: 1,
    STATE: 1, STATIC: 1, STATUS: 1, STORAGE: 1, STORE: 1, STORED: 1, STREAM: 1, STRING: 1, STRUCT: 1, STYLE: 1, SUB: 1, SUBMULTISET: 1, SUBPARTITION: 1, SUBSTRING: 1, SUBTYPE: 1,
    SUM: 1, SUPER: 1, SYMMETRIC: 1, SYNONYM: 1, SYSTEM: 1, TABLE: 1, TABLESAMPLE: 1, TEMP: 1, TEMPORARY: 1, TERMINATED: 1, TEXT: 1, THAN: 1, THEN: 1, THROUGHPUT: 1, TIME: 1,
    TIMESTAMP: 1, TIMEZONE: 1, TINYINT: 1, TO: 1, TOKEN: 1, TOTAL: 1, TOUCH: 1, TRAILING: 1, TRANSACTION: 1, TRANSFORM: 1, TRANSLATE: 1, TRANSLATION: 1, TREAT: 1, TRIGGER: 1, TRIM: 1,
    TRUE: 1, TRUNCATE: 1, TTL: 1, TUPLE: 1, TYPE: 1, UNDER: 1, UNDO: 1, UNION: 1, UNIQUE: 1, UNIT: 1, UNKNOWN: 1, UNLOGGED: 1, UNNEST: 1, UNPROCESSED: 1, UNSIGNED: 1, UNTIL: 1, UPDATE: 1,
    UPPER: 1, URL: 1, USAGE: 1, USE: 1, USER: 1, USERS: 1, USING: 1, UUID: 1, VACUUM: 1, VALUE: 1, VALUED: 1, VALUES: 1, VARCHAR: 1, VARIABLE: 1, VARIANCE: 1, VARINT: 1, VARYING: 1, VIEW: 1,
    VIEWS: 1, VIRTUAL: 1, VOID: 1, WAIT: 1, WHEN: 1, WHENEVER: 1, WHERE: 1, WHILE: 1, WINDOW: 1, WITH: 1, WITHIN: 1, WITHOUT: 1, WORK: 1, WRAPPED: 1, WRITE: 1, YEAR: 1, ZONE: 1,
};

aws.ddbNameRx = /^[a-zA-Z][a-zA-Z0-9]+$/;
aws.ddbRetryCount = 11;
aws.ddbRetryTimeout = 200;
aws.ddbRetryRx = /(InternalServerError|ProvisionedThroughputExceededException|ThrottlingException|SerializationException|UnrecognizedClientException|LimitExceededException|Syntax error;)/;
aws.ddbRetryStatus = /(405|429|500|503|529)/;
aws.ddbTableMap = {};

/**
 * DynamoDB requests
 * @memberof module:aws
 */
aws.queryDDB = function(action, obj, options, callback)
{
    if (typeof options == "function") callback = options, options = null;

    var region = options?.region || this.region || 'us-east-1';
    var uri = lib.rxUrl.test(options?.endpoint) ? options.endpoint :
              lib.rxUrl.test(this.ddbEndpoint) ? this.ddbEndpoint :
              ((options?.endpoint_protocol || 'https') + `://dynamodb.${region}.amazonaws.com/`);
    var target = (aws.ddbTarget || 'DynamoDB_20120810') + "." + action;
    var headers = { 'content-type': 'application/x-amz-json-1.0; charset=utf-8', 'x-amz-target': target };

    // All capitalized options are passed as is and take priority because they are in native format
    for (const p in options) if (p[0] >= 'A' && p[0] <= 'Z') obj[p] = options[p];

    logger.logger(options?.logger_db || "debug", 'queryDDB:', action, uri, 'OBJ:', obj, 'OPTS:', options);

    var json = lib.stringify(obj);
    var opts = this.queryOptions("POST", json, headers, options);
    opts.retryCount = opts.retryCount || this.ddbRetryCount;
    opts.retryTimeout = opts.retryTimeout || this.ddbRetryTimeout;
    opts.retryOnError = this.ddbRetryOnError;
    opts.datatype = "obj";
    opts.region = region;
    opts.signer = this.ddbQuerySigner;
    aws.fetch(uri, opts, (err, params) => {
        if (!params.obj) params.obj = {};
        if (params.status != 200) {
            if (!err) {
                err = lib.newError(params.obj.message || params.obj.Message || ("Error " + params.status + " " + params.data),
                                   params.status,
                                   lib.split(params.obj.__type || params.obj.code, "#").pop());
            }
            err.action = action;
            var level = err.code == "ConditionalCheckFailedException" ? "debug" : options?.logger_error || "error";
            logger.logger(level, 'queryDDB:', err, action, obj, params.toJSON());
        } else {
            logger.logger(options?.logger_db || "debug", 'queryDDB:', action, 'finished:', params.elapsed, 'ms', params.size, "bytes", params.obj.Item ? '1 row' : params.obj.Count ? params.obj.Count + ' rows' : "", params.obj.ConsumedCapacity || "");
        }
        if (params.retryCount < params.retryTotal) {
            params.obj.retry_count = params.retryTotal - Math.min(0, params.retryCount);
        }
        if (typeof callback == "function") callback(err, params.obj);
    });
}

aws.ddbRetryOnError = function()
{
    return aws.ddbRetryStatus.test(this.status) || aws.ddbRetryRx.test(this.data);
}

aws.ddbQuerySigner = function()
{
    aws.querySign(this.region, "dynamodb", this.hostname, "POST", this.pathname, this.postdata, this.headers, this.credentials);
}

aws.ddbTable = function(name)
{
    return aws.ddbTableMap[name] || name;
}

/**
 * Convert a Javascript object into DynamoDB object
 * @memberof module:aws
 */
aws.toDynamoDB = function(value, level)
{
    var res;
    switch (lib.typeName(value)) {
    case 'null':
        return { "NULL": 'true' };

    case 'boolean':
        return { "BOOL": value.toString() };

    case 'number':
        return { "N": isNaN(value) ? 0 : value.toString() };

    case 'buffer':
        return { "B": value.toString("base64") };

    case "date":
        return { "N": Math.round(value.getTime()/1000) };

    case "set":
        return { "SS": Array.from(value) };

    case 'array':
        if (!value.length) return level ? { "L": value } : value;
        var types = { number: 0, string: 0 };
        for (let i = 0; i < value.length; i++) types[typeof value[i]]++;
        if (types.number == value.length) return { "NS": value };
        if (types.string == value.length) return { "SS": value };
        res = [];
        for (const i in value) {
            if (value[i] !== undefined) res.push(this.toDynamoDB(value[i], 1));
        }
        return level ? { "L": res } : res;

    case 'object':
        res = {};
        for (const p in value) {
            if (value[p] !== undefined) res[p] = this.toDynamoDB(value[p], 1);
        }
        return level ? { "M": res } : res;

    default:
        return { "S": String(value) };
    }
}

/**
 * Convert a DynamoDB object into Javascript object
 * @memberof module:aws
 */
aws.fromDynamoDB = function(value, level)
{
    var res;
    switch (lib.typeName(value)) {
    case 'array':
        res = [];
        for (var i in value) {
            res.push(this.fromDynamoDB(value[i], level));
        }
        return res;

    case 'object':
        if (level) {
            for (var p in value) {
                switch (p) {
                case 'NULL':
                    return null;
                case 'BOOL':
                    return lib.toBool(value[p]);
                case 'L':
                    return this.fromDynamoDB(value[p], 1);
                case 'M':
                    return this.fromDynamoDB(value[p]);
                case 'S':
                case 'SS':
                    return value[p];
                case 'B':
                    return Buffer.from(value[p].B, "base64");
                case 'BS':
                    res = [];
                    for (let j = 0; j < value[p].length; j++) {
                        res[j] = Buffer.from(value[p][j], "base64");
                    }
                    return res;
                case 'N':
                    return lib.toNumber(value[p]);
                case 'NS':
                    res = [];
                    for (let j = 0; j < value[p].length; j++) {
                        res[j] = lib.toNumber(value[p][j]);
                    }
                    return res;
                }
            }
            return null;
        }
        res = {};
        for (const p in value) {
            if (!value.hasOwnProperty(p)) continue;
            res[p] = this.fromDynamoDB(value[p], 1);
        }
        return res;

    default:
        return value;
    }
}

function _checkName(params, name)
{
    if (!aws.ddbNameRx.test(name) || aws.ddbReserved[name.toUpperCase()]) {
        if (name.includes(".")) {
            if (!params.ExpressionAttributeNames) params.ExpressionAttributeNames = {};
            name = name.split(".").map((x) => {
                for (const n in params.ExpressionAttributeNames) {
                    if (params.ExpressionAttributeNames[n] == x) return n;
                }
                const h = lib.objKeys(params.ExpressionAttributeNames).length;
                params.ExpressionAttributeNames["#n" + h] = x;
                return "#n" + h;
            }).join(".");
        } else {
            for (const n in params.ExpressionAttributeNames) {
                if (params.ExpressionAttributeNames[n] == name) {
                    name = params.ExpressionAttributeNames[n];
                }
            }
            if (name[0] != "#") {
                if (!params.ExpressionAttributeNames) params.ExpressionAttributeNames = {};
                const h = lib.objKeys(params.ExpressionAttributeNames).length;
                params.ExpressionAttributeNames["#n" + h] = name;
                name = "#n" + h;
            }
        }
    }
    return name;
}

function _addValue(params, val)
{
    if (!params.ExpressionAttributeValues) params.ExpressionAttributeValues = {};
    const len = lib.objKeys(params.ExpressionAttributeValues).length;
    params.ExpressionAttributeValues[":v" + len] = aws.toDynamoDB(val);
    return len;
}

// Build a condition expression for the given object, all properties in the query are used
function getQueryExpression(params, query, options, join)
{
    var req = modules.db.prepareRequest({ table: params.TableName, query, options });
    var expr = [];

    for (let name in query) {
        var val = query[name];
        var d = name.match(modules.db.rxOrAnd);
        if (d) {
            var e = getQueryExpression(params, val, options, d[1]);
            if (e) {
                expr.push("(" + e + ")");
            }
            continue;
        }

        const col = modules.db.prepareColumn(req, name, val);
        val = col.value;

        switch (col.op) {
        case 'not between':
        case 'between':
            if (val.length < 2) continue;
            name = _checkName(params, col.name);
            expr.push((col.op[0] == 'n' ? "not " : "") + name + " between :v" + _addValue(params, val[0]) + " and :v" + _addValue(params, val[1]));
            break;

        case 'not null':
            name = _checkName(params, col.name);
            expr.push("attribute_exists(" + name + ")");
            break;

        case 'null':
            name = _checkName(params, col.name);
            expr.push("attribute_not_exists(" + name + ")");
            break;

        case 'not in':
        case 'in':
            if (Array.isArray(val)) {
                if (!val.length) break;
                name = _checkName(params, col.name);
                const vals = [];
                for (let i = 0; i < val.length; i++) {
                    if (val[i]) vals.push(":v" + _addValue(params, val[i]));
                }
                if (!vals.length) break;
                expr.push((col.op[0] == 'n' ? "not " : "") + name + " in (" + vals + ")");
            } else
            if (val) {
                name = _checkName(params, col.name);
                expr.push(name + " " + (col.op[0] == 'n' ? "<>" : "=") + " :v" + _addValue(params, val));
            }
            break;

        case 'all in':
            if (Array.isArray(val)) {
                if (!val.length) break;
                name = _checkName(params, col.name);
                const vals = [];
                for (let i = 0; i < val.length; i++) {
                    if (val[i]) vals.push(":v" + _addValue(params, val[i]) + " in (" + name + ")");
                }
                if (!vals.length) break;
                expr.push("(" + vals.join(` ${col.join} || 'and'} `) + ")");
            } else
            if (val) {
                name = _checkName(params, col.name);
                expr.push(name + " " + (col.op[0] == 'n' ? "<>" : "=") + " :v" + _addValue(params, val));
            }
            break;

        case 'not contains':
            if (!val && ["string","object","undefined"].indexOf(typeof val) > -1) break;
            if (Array.isArray(val)) {
                if (!val.length) break;
                name = _checkName(params, col.name);
                const vals = [];
                for (let i = 0; i < val.length; i++) {
                    if (val[i]) vals.push("not contains(" + name + ",:v" + _addValue(params, val[i]) + ")");
                }
                if (!vals.length) break;
                expr.push("(" + vals.join(` ${col.join || 'or'} `) + ")");
            } else {
                name = _checkName(params, col.name);
                expr.push("not contains(" + name + ", :v" + _addValue(params, val) + ")");
            }
            break;

        case 'contains':
            if (!val && ["string","object","undefined"].indexOf(typeof val) > -1) break;
            if (Array.isArray(val)) {
                if (!val.length) break;
                name = _checkName(params, col.name);
                const vals = [];
                for (let i = 0; i < val.length; i++) {
                    if (val[i]) vals.push("contains(" + name + ", :v" + _addValue(params, val[i]) + ")");
                }
                if (!vals.length) break;
                expr.push("(" + vals.join(` ${col.join || 'or'} `) + ")");
            } else {
                name = _checkName(params, col.name);
                expr.push("contains(" + name + ", :v" + _addValue(params, val) + ")");
            }
            break;

        case '=':
        case '<>':
        case '>':
        case '>=':
        case '<':
        case '<=':
            if (!val && ["string","object","undefined"].indexOf(typeof val) > -1) break;
            name = _checkName(params, col.name);
            expr.push(name + " " + col.op + " :v" + _addValue(params, val));
            break;

        case 'like%':
        case 'begins with':
        case 'not like%':
        case 'not begins with':
            if (!val && ["string","object","number","undefined"].indexOf(typeof val) > -1) continue;
            name = _checkName(params, col.name);
            expr.push((col.op[0] == "n" ? "not " : "") + "begins_with(" + name + ", :v" + _addValue(params, val) + ")");
            break;
        }
    }
    return expr.join(" " + (join || "and") + " ");
}

function setProjectionExpression(params, names)
{
    var n = 0, list = [];
    lib.split(names).forEach((name) => {
        if (name.includes(".")) {
            if (!params.ExpressionAttributeNames) params.ExpressionAttributeNames = {};
            name = name.split(".").map((x) => {
                for (const n in params.ExpressionAttributeNames) {
                    if (params.ExpressionAttributeNames[n] == x) return n;
                }
                params.ExpressionAttributeNames["#n" + n] = x;
                return "#n" + n++;
            }).join(".");
        } else
        if (!aws.ddbNameRx.test(name) || aws.ddbReserved[name.toUpperCase()]) {
            if (!params.ExpressionAttributeNames) params.ExpressionAttributeNames = {};
            params.ExpressionAttributeNames["#n" + n] = name;
            name = "#n" + n++;
        }
        list.push(name);
    });
    if (list.length) {
        params.ProjectionExpression = list.join(",");
    }
}

/**
 * Return list of tables in .TableNames property of the result
 *
 * @example:
 *
 * { TableNames: [ name, ...] }
 * @memberOf module:aws
 */
aws.ddbListTables = function(options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    var q = {}, rc = { TableNames: [] };
    lib.doWhilst(
        function(next) {
            aws.queryDDB('ListTables', q, options, (err, res) => {
                logger.debug("ListTables:", err, res);
                if (!err) {
                    q.ExclusiveStartTableName = res.LastEvaluatedTableName;
                    rc.TableNames.push.apply(rc.TableNames, res.TableNames);
                }
                next(err);
            });
    },
    function() {
        return q.ExclusiveStartTableName;
    },
    function(err) {
        if (typeof callback == "function") callback(err, rc);
    }, true);
}

/**
 * Return table definition and parameters in the result structure with property of the given table name
 *
 * @example
 *
 *          { name: { AttributeDefinitions: [], KeySchema: [] ...} }
 * @memberOf module:aws
 */
aws.ddbDescribeTable = function(name, options, callback)
{
    var params = { TableName: aws.ddbTable(name) };
    this.queryDDB('DescribeTable', params, options, (err, rc) => {
        logger.debug('DescribeTable:', name, err, rc);
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Create a table
 * - attrs can be an array in native DDB JSON format or an object with name:type properties, type is one of S, N, NN, NS, BS
 * - options may contain any valid native property if it starts with capital letter and the following:
 *   - waitTimeout - number of milliseconds to wait for ACTIVE status
 *   - waitDelay - how often to pool for table status, default is 250ms
 *   - keys is an array of column ids used for the primary key or a string with the hash key. if omitted, the first attribute will be used for the primary key
 *   - local - an object with each property for a local secondary index name defining key format the same way as for primary keys, all Uppercase properties are added to the top index object
 *   - global - an object for global secondary indexes, same format as for local indexes
 *   - projections - an object with index name and list of projected properties to be included in the index or "ALL" for all properties, if omitted then default KEYS_ONLY is assumed
 *   - readCapacity - read capacity units for provisioned throughput
 *   - writeCapacity - write capacity units
 *   - onDemand - billing mode, auto provision capacity and pay per request, if no read/write capacity is configured on-demand is the default
 *   - stream - enable stream support
 *
 *
 * @example
 *
 *          ddbCreateTable('users', { id: 'S', mtime: 'N', name: 'S'},
 *                                  { keys: ["id", "name"],
 *                                    local: { mtime: { mtime: "HASH" } },
 *                                    global: { name: { name: 'HASH', ProvisionedThroughput: { ReadCapacityUnits: 50 } } },
 *                                    projections: { mtime: ['gender','age'],
 *                                                   name: ['name','gender'] },
 *                                    stream: "NEW_IMAGE",
 *                                    readCapacity: 10,
 *                                    writeCapacity: 10 });
 * @memberOf module:aws
 */
aws.ddbCreateTable = function(name, attrs, options, callback)
{
    if (typeof options == "function") callback = options, options = {};
    if (!options) options = {};
    var r = options.readCapacity || aws.ddbReadCapacity, w = options.writeCapacity || aws.ddbWriteCapacity;
    var params = {
        TableName: aws.ddbTable(name),
        AttributeDefinitions: [],
        KeySchema: [],
    };
    if (options.stream) {
        params.StreamSpecification = { StreamEnabled: true, StreamViewType: options.stream };
    }
    if (options.onDemand || !(r && w)) {
        params.BillingMode = "PAY_PER_REQUEST";
    } else
    if (r && w) {
        params.ProvisionedThroughput = { ReadCapacityUnits: r, WriteCapacityUnits: w };
    }
    if (Array.isArray(attrs) && attrs.length) {
        params.AttributeDefinitions = attrs;
    } else {
        for (var p in attrs) {
            params.AttributeDefinitions.push({ AttributeName: p, AttributeType: String(attrs[p]).toUpperCase() });
        }
    }
    if (Array.isArray(options.keys)) {
        options.keys.forEach((x, i) => {
            params.KeySchema.push({ AttributeName: x, KeyType: !i ? "HASH" : "RANGE" });
        });
    } else
    if (typeof options.keys == "string" && options.keys) {
        params.KeySchema.push({ AttributeName: options.keys, KeyType: "HASH" });
    }
    if (!params.KeySchema.length && params.AttributeDefinitions.length) {
        params.KeySchema.push({ AttributeName: params.AttributeDefinitions[0].AttributeName, KeyType: "HASH" });
    }

    ["local","global"].forEach(t => {
        for (const name in options[t]) {
            var obj = options[t][name];

            var index = {
                IndexName: name,
                KeySchema: []
            };

            for (const p in obj) {
                if (p[0] >= 'A' && p[0] <= 'Z') {
                    index[p] = obj[p];
                } else {
                    index.KeySchema.push({ AttributeName: p, KeyType: String(obj[p]).toUpperCase() })
                }
            }

            if (!lib.isEmpty(options.projections?.[name])) {
                index.Projection = {
                    ProjectionType: Array.isArray(options.projections[name]) ? "INCLUDE" : String(options.projections[name]).toUpperCase()
                };
                if (index.Projection.ProjectionType == "INCLUDE") {
                    index.Projection.NonKeyAttributes = options.projections[name];
                }
            } else {
                index.Projection = { ProjectionType: "KEYS_ONLY" };
            }

            switch (t) {
            case "local":
                if (!params.LocalSecondaryIndexes) params.LocalSecondaryIndexes = [];
                params.LocalSecondaryIndexes.push(index);
                break;

            case "global":
                if (params.ProvisionedThroughput) {
                    if (!index.ProvisionedThroughput) index.ProvisionedThroughput = {};
                    if (!index.ProvisionedThroughput.ReadCapacityUnits) {
                        index.ProvisionedThroughput.ReadCapacityUnits = params.ProvisionedThroughput.ReadCapacityUnits;
                    }
                    if (!index.ProvisionedThroughput.WriteCapacityUnits) {
                        index.ProvisionedThroughput.WriteCapacityUnits = params.ProvisionedThroughput.WriteCapacityUnits;
                    }
                }
                if (!params.GlobalSecondaryIndexes) params.GlobalSecondaryIndexes = [];
                params.GlobalSecondaryIndexes.push(index);
                break;
            }
        }
    });

    this.queryDDB('CreateTable', params, options, (err, item) => {
        if (err || options.nowait) return typeof callback == "function" && callback(err, err ? { TableDescription: params } : item);

        // Wait because DynamoDB cannot create multiple tables at once especially with indexes
        options.waitStatus = "CREATING";
        aws.ddbWaitForTable(name, item, options, callback);
    });
}

/**
 * Update tables provisioned throughput settings, options is used instead of table name so this call can be used directly in the cron jobs to adjust
 * provisionined throughput on demand.
 * Options must provide the following properties:
 *  - name - table name
 *  - readCapacity and writeCapacity - new povisioned throughtput settings, both must be specified
 *  - stream - null to disable or one of the NEW_IMAGE | OLD_IMAGE | NEW_AND_OLD_IMAGES | KEYS_ONLY
 *  - add - an object with indexes to create
 *  - del - delete a global secondary index by name, a string or a list with multiple indexes
 *  - update - an object with indexes to update
 *  - waitTimeout - how long to wait in ms until the table is active again
 *  - onDemand - true to switch to pat per request mode, false to switch to provisioning mode
 *
 * @example
 *
 * aws.ddbUpdateTable({ name: "users", add: { name_id: { name: "S", id: 'N', readCapacity: 20, writeCapacity: 20, projections: ["mtime","email"] } })
 * aws.ddbUpdateTable({ name: "users", add: { name: { name: "S", readCapacity: 20, writeCapacity: 20, projections: ["mtime","email"] } })
 * aws.ddbUpdateTable({ name: "users", del: "name" })
 * aws.ddbUpdateTable({ name: "users", update: { name: { readCapacity: 10, writeCapacity: 10 } })
 *
 * @example of crontab job in etc/crontab:
 *
 * [
 *   { "cron": "0 0 1 * * *", "job": { "aws.ddbUpdateTable": { "name": "bk_user", "readCapacity": 1000, "writeCapacity": 1000 } } },
 *   { "cron": "0 0 6 * * *", "job": { "aws.ddbUpdateTable": { "name": "bk_user", "readCapacity": 2000, "writeCapacity": 2000 } } }
 * ]
 * @memberOf module:aws
 */
aws.ddbUpdateTable = function(options, callback)
{

    if (typeof options == "function") callback = options, options = null;
    if (!options) options = {};
    var params = {
        TableName: aws.ddbTable(options.name),
    };
    if (typeof options.onDemand == "boolean") {
        params.BillingMode = options.onDemand ? "PAY_PER_REQUEST" : "PROVISIONED";
    }

    if (typeof options.stream != "undefined") {
        params.StreamSpecification = { StreamEnabled: options.stream ? true : false };
        if (options.stream) {
            params.StreamSpecification.StreamViewType = options.stream;
        }
    } else

    if (options.BillingMode != "PAY_PER_REQUEST" && options.readCapacity && options.writeCapacity) {
        params.ProvisionedThroughput = { ReadCapacityUnits: options.readCapacity, WriteCapacityUnits: options.writeCapacity };
    } else

    if (options.add) {
        if (!params.AttributeDefinitions) params.AttributeDefinitions = [];
        if (!params.GlobalSecondaryIndexUpdates) params.GlobalSecondaryIndexUpdates = [];
        for (const name in options.add) {
            var obj = options.add[name];
            var index = {
                IndexName: name,
                KeySchema: [],
                Projection: {
                    ProjectionType: "KEYS_ONLY"
                }
            };
            for (const p in obj) {
                if (lib.isEmpty(obj[p])) continue;
                switch (p) {
                case "readCapacity":
                    if (!index.ProvisionedThroughput) index.ProvisionedThroughput = {};
                    index.ProvisionedThroughput.ReadCapacityUnits = obj[p];
                    break;
                case "writeCapacity":
                    if (!index.ProvisionedThroughput) index.ProvisionedThroughput = {};
                    index.ProvisionedThroughput.WriteCapacityUnits = obj[p];
                    break;
                case "projection":
                    index.Projection = { ProjectionType: Array.isArray(obj[p]) ? "INCLUDE" : String(obj[p]).toUpperCase() };
                    if (index.Projection.ProjectionType == "INCLUDE") {
                        index.Projection.NonKeyAttributes = obj[p];
                    }
                    break;
                default:
                    index.KeySchema.push({ AttributeName: p, KeyType: index.KeySchema.length ? "RANGE" : "HASH" })
                    if (!params.AttributeDefinitions.find(x => (x.AttributeName == p))) {
                        params.AttributeDefinitions.push({ AttributeName: p, AttributeType: obj[p] || "S" });
                    }
                }
            }
            params.GlobalSecondaryIndexUpdates.push({ Create: index });
            break;
        }
    } else

    if (options.del) {
        if (!params.GlobalSecondaryIndexUpdates) params.GlobalSecondaryIndexUpdates = [];
        lib.split(options.del).forEach(name => {
            params.GlobalSecondaryIndexUpdates.push({ Delete: { IndexName: name } });
        });
    } else

    if (options.update) {
        if (!params.GlobalSecondaryIndexUpdates) params.GlobalSecondaryIndexUpdates = [];
        for (const name in options.update) {
            params.GlobalSecondaryIndexUpdates.push({
                Update: {
                    IndexName: name,
                    ProvisionedThroughput: {
                        ReadCapacityUnits: options.update[name].readCapacity,
                        WriteCapacityUnits: options.update[name].writeCapacity,
                    }
                }
            });
        }
    }

    this.queryDDB('UpdateTable', params, options, (err, item) => {
        logger.debug('UpdateTable:', options, err, item);
        if (err || options.nowait) return typeof callback == "function" && callback(err, item);
        options.waitStatus = "UPDATING";
        aws.ddbWaitForTable(options.name, item, options, callback);
    });
}

/**
 * Update TTL attribute.
 * The options properties:
 * - name - table name
 * - attribute - the attribute name
 * - enabled - true or false
 * @memberOf module:aws
 */
aws.ddbUpdateTimeToLive = function(options, callback)
{
    var params = {
        TableName: aws.ddbTable(options.name),
        TimeToLiveSpecification: {
            AttributeName: options.attribute,
            Enabled: lib.toBool(options.enabled)
        }
    };
    this.queryDDB('UpdateTimeToLive', params, options, callback);
}

/**
 * Returns status of Time to live attribute for a table
 * @memberOf module:aws
 */
aws.ddbDescribeTimeToLive = function(name, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    var params = { TableName: aws.ddbTable(name) };
    this.queryDDB('DescribeTimeToLive', params, options, (err, rc) => {
        logger.debug('DescribeTimeToLive:', name, rc);
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Remove a table from the database.
 * By default the callback will ba callled only after the table is deleted, specifying `options.nowait` will return immediately
 * @memberOf module:aws
 */
aws.ddbDeleteTable = function(name, options, callback)
{
    var params = { TableName: aws.ddbTable(name) };
    this.queryDDB('DeleteTable', params, options, (err, item) => {
        if (err || options.nowait) return typeof callback == "function" && callback(err, item);
        options.waitStatus = "DELETING";
        aws.ddbWaitForTable(name, item, options, callback);
    });
}

/**
 * Call the callback after specified period of time or when table status become different from the given waiting status.
 * if options.waitTimeout is not specified calls the callback immediately. options.waitStatus is checked if given and keeps waiting
 * while the status is equal to it. options.waitDelay can be specified how often to request new status, default is 250ms.
 * @memberOf module:aws
 */
aws.ddbWaitForTable = function(name, item, options, callback)
{
    if (typeof callback != "function") callback = lib.noop;
    if (!options.waitTimeout) return typeof callback == "function" && callback(null, item);

    var expires = Date.now() + options.waitTimeout;
    var status = item.TableDescription.TableStatus;
    options = lib.objClone(options);
    options.quiet = 1;
    lib.whilst(
        function() {
            return status == options.waitStatus && Date.now() < expires;
        },
        function(next) {
            aws.ddbDescribeTable(name, options, (err, rc) => {
                if (err) {
                    // Table deleted, does not exist anymore
                    if (err.code == "ResourceNotFoundException" && options.waitStatus == "DELETING") {
                        status = err = null;
                    }
                    return next(err);
                }
                status = rc.Table.TableStatus;
                setTimeout(next, options.waitDelay || 1000);
            });
        },
        function(err) {
            if (typeof callback == "function") callback(err, item);
        }, true);
}

/**
 * Put or add an item
 * - item is an object, type will be inferred from the native js type.
 * - options may contain any valid native property if it starts with capital letter or special properties:
 *    - query - an object with column names to be used in ConditionExpression clause and value as null to set condition to { Exists: false } or
 *          any other exact value to be checked against which corresponds to { Exists: true, Value: value }
 *    - expr - condition expression
 *    - values - an object with values map to be used for in the update and/or condition expressions, to be used
 *          for ExpressionAttributeValues parameters
 *    - names - an object with a map to be used for attribute names in condition and update expressions, to be used
 *          for ExpressionAttributeNames parameter
 *    - returning - values to be returned on success, any value means ALL_OLD
 *
 * @example
 *
 * ddbPutItem("users", { id: 1, name: "john", mtime: 11233434 }, { query: { name: null } })
 * @memberOf module:aws
 */
aws.ddbPutItem = function(name, item, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { TableName: aws.ddbTable(name), Item: aws.toDynamoDB(item) };
    if (options.expr) {
        params.ConditionExpression = options.expr;
    } else
    if (options.query) {
        params.ConditionExpression = getQueryExpression(params, options.query, options);
    }
    if (options.names) {
        params.ExpressionAttributeNames = aws.toDynamoDB(options.names);
    }
    if (options.values) {
        params.ExpressionAttributeValues = aws.toDynamoDB(options.values);
    }
    if (options.returning) {
        params.ReturnValues = "ALL_OLD";
    }
    if (options.return_params) return params;

    this.queryDDB('PutItem', params, options, function(err, rc) {
        rc.Item = rc.Attributes ? aws.fromDynamoDB(rc.Attributes) : {};
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Update an item
 * - keys is an object with primary key attributes name and value.
 * - item is an object with properties where value can be:
 *      - number/string/array - action PUT, replace or add new value
 *      - null/empty string - action DELETE
 * - item can be a string with Update expression
 * - options may contain any valid native property if it starts with capital letter or special properties:
 *      - expr - condition expression
 *      - values - an object with values map to be used for in the update and/or condition expressions, to be used
 *          for ExpressionAttributeValues parameters
 *      - names - an object with a map to be used for attribute names in condition and update expressions, to be used
 *          for ExpressionAttributeNames parameter
 *      - ops - an object with operators to be used for properties, one of the: set, remove, unset, delete, incr, add, append, prepend, not_exists
 *      - query - an object with columns to be used in ConditionExpression, value null means the attribute does not exists,
 *          any other value to be checked against using regular compare rules. The conditional comparison operator is taken
 *          from `options.ops` the same way as for queries.
 *      - returning - values to be returned on success, `*` or `new` means ALL_NEW, `old` means ALL_OLD,
 *                    `updated` means UPDATED_NEW, `old_updated` means UPDATED_OLD
 *
 * @example
 *
 * ddbUpdateItem("users", { id: 1, name: "john" }, { gender: 'male', icons: '1.png' }, { action: { icons: 'add' }, query: { id: 1 }, returning: "*" })
 * ddbUpdateItem("users", { id: 1, name: "john" }, { gender: 'male', icons: '1.png' }, { action: { icons: 'incr' }, query: { id: null } })
 * ddbUpdateItem("users", { id: 1, name: "john" }, { gender: 'male', icons: '1.png', num: 1 }, { action: { num: 'add', icons: 'add' }, query: { id: null, num: 0 }, ops: { num: "gt" } })
 * @memberOf module:aws
 */
aws.ddbUpdateItem = function(name, keys, item, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { TableName: aws.ddbTable(name), Key: {} };
    for (const p in keys) {
        params.Key[p] = aws.toDynamoDB(keys[p]);
    }
    if (options.expr) {
        params.ConditionExpression = options.expr;
    } else
    if (options.query) {
        params.ConditionExpression = getQueryExpression(params, options.query, options);
    }
    if (options.names) {
        params.ExpressionAttributeNames = aws.toDynamoDB(options.names);
    }
    if (options.values) {
        params.ExpressionAttributeValues = aws.toDynamoDB(options.values);
    }
    if (options.returning) {
        params.ReturnValues = options.returning == "*" || options.returning == "new" ? "ALL_NEW" :
                              options.returning == "updated" ? "UPDATED_NEW" :
                              options.returning == "old" ? "ALL_OLD" :
                              options.returning == "old_updated" ? "UPDATED_OLD" :
                              options.returning;
    }
    if (typeof item == "string") {
        params.UpdateExpression = item;
    } else
    if (typeof item == "object") {
        var c = 0, d = 0, names = {}, values = {};
        var actions = { SET: [], REMOVE: [], ADD: [], DELETE: [] };

        for (let p in item) {
            if (params.Key[p]) continue;
            var val = item[p], colname = p;

            var op = options.ops?.[colname];
            if (val === null || val === undefined) {
                op = "remove";
            } else
            if (Array.isArray(val) || typeof val == "string") {
                if (!val.length) {
                    if (op) continue;
                    op = "remove";
                }
            }

            if (p.includes(".")) {
                p = p.split(".").map((x) => {
                    for (const n in names) {
                        if (names[n] == x) return n;
                    }
                    names["#c" + c] = x;
                    return "#c" + c++;
                }).join(".");
            } else
            if (!aws.ddbNameRx.test(p) || this.ddbReserved[p.toUpperCase()]) {
                names["#c" + c] = p;
                p = "#c" + c++;
            }

            switch (op) {
            case "add":
            case "incr":
                actions.ADD.push(p + " :d" + d);
                values[":d" + d++] = val;
                break;

            case "del":
                actions.DELETE.push(p + " :d" + d);
                values[":d" + d++] = val;
                break;

            case "unset":
            case "remove":
                actions.REMOVE.push(p);
                break;

            case "append":
                actions.SET.push(p + "=list_append(" + p + ",:d" + d + ")");
                values[":d" + d++] = val;
                break;

            case "prepend":
                actions.SET.push(p + "=list_append(:d" + d + "," + p + ")");
                values[":d" + d++] = val;
                break;

            case "not_exists":
                actions.SET.push(p + "=if_not_exists(" + p + ",:d" + d + ")");
                values[":d" + d++] = val;
                break;

            default:
                actions.SET.push(p + "= :d" + d);
                values[":d" + d++] = val;
            }
        }

        params.UpdateExpression = "";
        for (const p in actions) {
            var expr = actions[p].join(",");
            if (expr) params.UpdateExpression += " " + p + " " + expr;
        }
        if (c) {
            if (!params.ExpressionAttributeNames) params.ExpressionAttributeNames = {};
            for (const p in names) {
                params.ExpressionAttributeNames[p] = names[p];
            }
        }
        if (d) {
            if (!params.ExpressionAttributeValues) params.ExpressionAttributeValues = {};
            for (const p in values) {
                params.ExpressionAttributeValues[p] = this.toDynamoDB(values[p], 1);
            }
        }
    }
    if (options.return_params) return params;

    this.queryDDB('UpdateItem', params, options, (err, rc) => {
        rc.Item = rc.Attributes ? aws.fromDynamoDB(rc.Attributes) : {};
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Delete an item from a table
 * - keys is an object with name: value for hash/range attributes
 * - options may contain any valid native property if it starts with capital letter and the following special options:
 *      - expr - condition expression
 *      - values - an object with values map to be used for in the update and/or condition expressions, to be used
 *          for ExpressionAttributeValues parameters
 *      - names - an object with a map to be used for attribute names in condition and update expressions, to be used
 *          for ExpressionAttributeNames parameter
 *      - returning - values to be returned on success, any value means ALL_OLD
 *
 * @example
 *
 *          ddbDeleteItem("users", { id: 1, name: "john" }, {})
 * @memberOf module:aws
 */
aws.ddbDeleteItem = function(name, keys, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { TableName: aws.ddbTable(name), Key: {} };
    for (const p in keys) {
        params.Key[p] = aws.toDynamoDB(keys[p]);
    }
    if (options.expr) {
        params.ConditionExpression = options.expr;
    } else
    if (options.query) {
        params.ConditionExpression = getQueryExpression(params, options.query, options);
    }
    if (options.names) {
        params.ExpressionAttributeNames = aws.toDynamoDB(options.names);
    }
    if (options.values) {
        params.ExpressionAttributeValues = aws.toDynamoDB(options.values);
    }
    if (options.returning) {
        params.ReturnValues = "ALL_OLD";
    }
    if (options.return_params) return params;
    this.queryDDB('DeleteItem', params, options, (err, rc) => {
        rc.Item = rc.Attributes ? aws.fromDynamoDB(rc.Attributes) : {};
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Update items from the list at the same time
 * - items is a list of objects with table name as property and list of operations, an operation can be PutRequest or DeleteRequest
 * - options may contain any valid native property if it starts with capital letter.
 *
 * @example
 *
 *          { table: [ { put: { id: 1, name: "tt" } }, { del: { id: 2 } }] }
 * @memberOf module:aws
 */
aws.ddbBatchWriteItem = function(items, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { RequestItems: {} };
    for (const p in items) {
        const table = aws.ddbTable(p);
        params.RequestItems[table] = [];
        items[p].forEach((x) => {
            var item = {};
            for (const op in x) {
                switch (op) {
                case "add":
                case "put":
                    item.PutRequest = { Item: aws.toDynamoDB(x[op]) };
                    break;
                case "del":
                    item.DeleteRequest = { Key: aws.toDynamoDB(x[op]) };
                    break;
                }
            }
            params.RequestItems[table].push(item);
        });
    }
    this.queryDDB('BatchWriteItem', params, options, callback);
}

/**
 * Retrieve all items for given list of keys
 * - items is an object with table name as property name and list of options for GetItem request
 * - options may contain any valid native property if it starts with capital letter.
 *
 * @example
 *
 *          { users: { keys: [{ id: 1, name: "john" },{ id: .., name: .. }], select: ['name','id'], consistent: true }, ... }
 * @memberOf module:aws
 */
aws.ddbBatchGetItem = function(items, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { RequestItems: {} }, map = {};
    for (const p in items) {
        const table = aws.ddbTable(p);
        var obj = {};
        obj.Keys = items[p].keys.map(x => aws.toDynamoDB(x));
        if (items[p].select) {
            setProjectionExpression(obj, items[p].select);
        }
        if (items[p].consistent) obj.ConsistentRead = true;
        params.RequestItems[table] = obj;
        map[table] = p;
    }
    this.queryDDB('BatchGetItem', params, options, (err, rc) => {
        for (const p in rc.Responses) {
            rc.Responses[map[p]] = aws.fromDynamoDB(rc.Responses[p]);
            if (p != map[p]) delete rc.Responses[p];
        }
        if (typeof callback == "function") callback(err, rc);
    });
}


/**
 * Retrieve one item by primary key
 *  - keys - an object with primary key attributes name and value.
 *  - select - list of columns to return, otherwise all columns will be returned
 *  - options may contain any native property allowed in the request or special properties:
 *    - consistent - set consistency level for the request
 *    - names - an object with a map to be used for attribute names in condition and update expressions, to be used
 *        for ExpressionAttributeNames parameter
 * @example
 *
 *       ddbGetItem("users", { id: 1, name: "john" }, { select: 'id,name' })
 * @memberOf module:aws
 */
aws.ddbGetItem = function(name, keys, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { TableName: aws.ddbTable(name), Key: {} };
    if (options.select) {
        setProjectionExpression(params, options.select);
    }
    if (options.projection) {
        params.ProjectionExpression = options.projection;
    }
    if (options.names) {
        params.ExpressionAttributeNames = aws.toDynamoDB(options.names);
    }
    if (options.consistent) {
        params.ConsistentRead = true;
    }
    for (const p in keys) {
        params.Key[p] = aws.toDynamoDB(keys[p]);
    }
    this.queryDDB('GetItem', params, options, (err, rc) => {
        if (!options.debug) rc.Item = rc.Item ? aws.fromDynamoDB(rc.Item) : null;
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Query on a table, return all matching items
 * - condition is an object with name: value pairs, by default EQ opeartor is used for comparison
 * - options may contain any valid native property if it starts with capital letter or special property:
 *      - start - defines starting primary key when paginating, can be a string/number for hash or an object with hash/range properties
 *      - consistent - set consistency level for the request
 *      - select - list of attributes to get only
 *      - total - return number of matching records
 *      - count - limit number of record in result
 *      - desc - descending order
 *      - sort - index name to use, indexes are named the same as the corresponding column, with index primary keys for Keycondition will be used
 *      - ops - an object with operators to be used for properties if other than EQ.
 *      - keys - list of primary key columns, if there are other properties in the condition then they will be
 *               put into QueryFilter instead of KeyConditions. If keys are absent, all properties in the condition are treated as primary keys.
 *      - projection - projection expression
 *      - values - an object with values map to be used for in the update and/or condition expressions, to be used
 *          for ExpressionAttributeValues parameters
 *      - names - an object with a map to be used for attribute names in condition and update expressions, to be used
 *          for ExpressionAttributeNames parameter
 *      - expr - filtering expression
 *
 * @example
 *
 * aws.ddbQueryTable("users", { id: 1, name: "john" }, { select: 'id,name', ops: { name: 'gt' } })
 * aws.ddbQueryTable("users", { id: 1, name: "john", status: "ok" }, { keys: ["id"], select: 'id,name', ops: { name: 'gt' } })
 * aws.ddbQueryTable("users", { id: 1 }, { expr: "status=:s", values: { s: "status" } })
 * @memberOf module:aws
 */
aws.ddbQueryTable = function(name, condition, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { TableName: aws.ddbTable(name) };
    if (options.names) {
        params.ExpressionAttributeNames = aws.toDynamoDB(options.names);
    }
    if (options.values) {
        params.ExpressionAttributeValues = aws.toDynamoDB(options.values);
    }
    if (options.projection) {
        params.ProjectionExpression = options.projection;
    }
    if (options.expr) {
        params.FilterExpression = options.expr;
    }
    if (options.consistent) {
        params.ConsistentRead = true;
    }
    if (options.start) {
        params.ExclusiveStartKey = aws.toDynamoDB(options.start);
    }
    if (options.sort) {
        params.IndexName = options.sort;
    }
    if (options.desc) {
        params.ScanIndexForward = false;
    }
    if (options.select) {
        setProjectionExpression(params, options.select);
    }
    if (options.count > 0) {
        params.Limit = options.count;
    }
    if (options.total) {
        params.Select = "COUNT";
    }
    if (typeof condition == "string") {
        params.KeyConditionExpression = condition;
    } else
    if (Array.isArray(options.keys)) {
        var keys = {}, filter = {};
        for (const p in condition) {
            if (options.keys.includes(p)) keys[p] = condition[p]; else filter[p] = condition[p];
        }
        params.KeyConditionExpression = getQueryExpression(params, keys, options);
        if (filter) {
            params.FilterExpression = getQueryExpression(params, filter, options);
        }
    } else
    if (lib.isObject(options.keys)) {
        params.KeyConditionExpression = getQueryExpression(params, options.keys, options);
    } else {
        params.KeyConditionExpression = getQueryExpression(params, condition, options);
    }

    this.queryDDB('Query', params, options, (err, rc) => {
        if (!options.debug) rc.Items = rc.Items ? aws.fromDynamoDB(rc.Items) : [];
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Scan a table for all matching items
 * - condition is an object with name: value pairs or a string with FilterExpression
 * - options may contain any valid native property if it starts with capital letter or special property:
 *      - start - defines starting primary key
 *      - ops - an object with operators to be used for properties if other than EQ.
 *      - projection - projection expression
 *      - values - an object with values map to be used for in the update and/or condition expressions, to be used
 *          for ExpressionAttributeValues parameters
 *      - names - an object with a map to be used for attribute names in condition and update expressions, to be used
 *          for ExpressionAttributeNames parameter
 *
 * @example
 *
 *          aws.ddbScanTable("users", { id: 1, name: 'a' }, { ops: { name: 'gt' }})
 *          aws.ddbScanTable("users", "id=:id AND name=:name", { values: { id: 1, name: 'a' } });
 * @memberOf module:aws
 */
aws.ddbScanTable = function(name, condition, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { TableName: aws.ddbTable(name) };
    if (options.projection) {
        params.ProjectionExpression = options.projection;
    }
    if (options.names) {
        params.ExpressionAttributeNames = aws.toDynamoDB(options.names);
    }
    if (options.sort) {
        params.IndexName = options.sort;
    }
    if (options.values) {
        params.ExpressionAttributeValues = aws.toDynamoDB(options.values);
    }
    if (options.consistent) {
        params.ConsistentRead = true;
    }
    if (options.start) {
        params.ExclusiveStartKey = aws.toDynamoDB(options.start);
    }
    if (options.select) {
        setProjectionExpression(params, options.select);
    }
    if (options.count > 0) {
        params.Limit = options.count;
    }
    if (options.total) {
        params.Select = "COUNT";
    }
    params.FilterExpression = lib.isString(condition) || getQueryExpression(params, condition, options);

    this.queryDDB('Scan', params, options, (err, rc) => {
        if (!options.debug) rc.Items = rc.Items ? aws.fromDynamoDB(rc.Items) : [];
        if (typeof callback == "function") callback(err, rc);
    });
}

/**
 * Update items from the list at the same time in one transaction, on any failure everything is rolled back
 * - items is a list of operations to be performed in the same format as for aws.ddbPutItem, aws.ddbUpdateItem, aws.ddbDeleteItem and aws.ddbQueryItem
 * - options may contain any valid native property if it starts with capital letter.
 *
 * @example
 *
 *          { op: "put": table: "table-name", query: { id: 1, name: "tt" } },
 *          { op: "del": table: "table-name", query: { id: 2 } },
 *          { op: "update": table: "table-name", query: { id: 1, name: "test" }, options: { query: { status: "ok" } } },
 *          { op: "check": table: "table-name", query: { id: 1 }, options: { query: { status: "ok" } } }
 * @memberOf module:aws
 */
aws.ddbTransactWriteItems = function(items, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    var params = { TransactItems: [] }, ret = { return_params: 1 };
    lib.isArray(items, []).forEach(x => {
        var item, opts = x.options ? Object.assign(x.options, ret) : ret;
        switch (x.op) {
        case "get":
        case "check":
            item = { TableName: aws.ddbTable(x.table), Key: {} };
            for (const p in x.query) {
                item.Key[p] = aws.toDynamoDB(x.query[p]);
            }
            if (opts.query) {
                item.ConditionExpression = getQueryExpression(item, opts.query, opts);
            }
            if (opts.expr) {
                item.ConditionExpression = opts.expr;
            }
            if (opts.names) {
                item.ExpressionAttributeNames = aws.toDynamoDB(opts.names);
            }
            if (opts.values) {
                item.ExpressionAttributeValues = aws.toDynamoDB(opts.values);
            }
            item = { ConditionCheck: item };
            break;
        case "incr":
        case "update":
            item = { Update: aws.ddbUpdateItem(x.table, x.keys, x.query, opts) };
            if (item.Update.ReturnValues) item.Update.ReturnValuesOnConditionCheckFailure = "ALL_OLD";
            delete item.Update.ReturnValues;
            break;
        case "add":
        case "put":
            item = { Put: aws.ddbPutItem(x.table, x.query, opts) };
            if (item.Put.ReturnValues) item.Put.ReturnValuesOnConditionCheckFailure = "ALL_OLD";
            delete item.Put.ReturnValues;
            break;
        case "del":
            item = { Delete: aws.ddbDeleteItem(x.table, x.query, opts) };
            if (item.Delete.ReturnValues) item.Update.ReturnValuesOnConditionCheckFailure = "ALL_OLD";
            delete item.Delete.ReturnValues;
            break;
        default:
            return;
        }
        for (const p in opts) if (p[0] >= 'A' && p[0] <= 'Z') item[p] = opts[p];
        params.TransactItems.push(item);
    });
    this.queryDDB('TransactWriteItems', params, options, callback);
}

aws.ddbExecuteStatement = function(text, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { Statement: text };
    if (options.consistent) {
        params.ConsistentRead = true;
    }
    if (options.start) {
        params.NextToken = options.start;
    }
    if (lib.isArray(options.params)) {
        params.Parameters = aws.toDynamoDB(options.params);
    }

    this.queryDDB('ExecuteStatement', params, options, function(err, rc) {
        if (!options.debug) rc.Items = rc.Items ? aws.fromDynamoDB(rc.Items) : [];
        if (typeof callback == "function") callback(err, rc);
    });
}

aws.ddbExecuteTransaction = function(items, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { TransactStatements: [] };
    if (options.start) {
        params.ClientRequestToken = options.start;
    }
    lib.isArray(items, []).forEach(function(x) {
        var o = { Statement: typeof x == "string" ? x : x.text };
        if (lib.isArray(x.params)) {
            o.Parameters = aws.toDynamoDB(x.params);
        }
        params.TransactStatements.push(o);
    });

    this.queryDDB('ExecuteTransaction', params, options, function(err, rc) {
        if (!options.debug) rc.Responses = rc.Responses ? aws.fromDynamoDB(rc.Responses) : [];
        if (typeof callback == "function") callback(err, rc);
    });
}

aws.ddbBatchExecuteStatement = function(items, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = lib.empty;
    var params = { Statements: [] };
    lib.isArray(items, []).forEach(function(x) {
        var o = { Statement: typeof x == "string" ? x : x.text };
        if (x.consistent) {
            o.ConsistentRead = true;
        }
        if (lib.isArray(x.params)) {
            o.Parameters = aws.toDynamoDB(x.params);
        }
        params.Statements.push(o);
    });

    this.queryDDB('BatchExecuteStatement', params, options, function(err, rc) {
        if (!options.debug) rc.Responses = lib.isArray(rc.Responses, []).map((x) => { x.Item = aws.fromDynamoDB(x.Item); return x });
        if (typeof callback == "function") callback(err, rc);
    });
}

aws.ddbExportTableToPointInTime = function(query, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    var table = aws.ddbTableMap[query.table] || query.table || "";
    if (!table.includes(":")) table = `arn:aws:dynamodb:${aws.region}:${aws.accountId}:table/${table}`;

    var req = {
        ExportType: query.incr ? "INCREMENTAL_EXPORT" : "FULL_EXPORT",
        S3Bucket: query.bucket,
        S3Prefix: query.prefix,
        TableArn: table,
        ClientToken: options.token || `${query.table}:${query.stime}:${query.etime}`,
    }
    if (query.incr) {
        req.IncrementalExportSpecification = {
            ExportFromTime: Math.round(lib.toMtime(query.stime)/1000),
            ExportToTime: Math.round(lib.toMtime(query.etime)/1000),
            ExportViewType: query.new ? "NEW_IMAGE" : "NEW_AND_OLD_IMAGES",
        }
    }
    this.queryDDB('ExportTableToPointInTime', req, options, callback);
}