// stats.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const app = require(__dirname + '/app');
const lib = require(__dirname + '/lib');
const logger = require(__dirname + '/logger');
const db = require(__dirname + '/db');
const fs = require("fs");

/**
 * @module stats
 */

// Module descriptor: config args, the schema of every metric column that can be
// reported, and the process roles allowed to report (empty = all roles).
const stats = {
    name: "stats",
    args: [
        { name: "flags", type: "list", descr: "Feature flags" },
        { name: "interval", type: "int", descr: "Interval for process stats collection in ms" },
        { name: "target", type: "json", descr: "Target options, one of file, url, log..." },
        { name: "roles", type: "list", descr: "Process roles that report stats only" },
        { name: "filter", obj: "filter", type: "map", maptype: "regexp", merge: 1, noempty: 1, descr: "For each metric prefix provide regexp to keep only matched stats", example: "-stats-filter db:dynamodb" },
    ],
    // Column metadata: `stat` tells queryElasticsearch which aggregation to use (default avg)
    columns: {
        timestamp: { type: "mtime" },
        id: { keyword: 1, label: "Instance/task ID" },
        pid: { type: "int", label: "Process pid" },
        host: { keyword: 1, label: "Full hostname name" },
        hostname: { keyword: 1, label: "Host name" },
        tag: { keyword: 1, label: "Instance tag" },
        zone: { keyword: 1, label: "Availability zone" },
        ip: { keyword: 1, label: "IP address" },
        role: { keyword: 1, label: "Process role" },
        arch: { keyword: 1, label: "System architecture" },
        eventloop_util: { type: "number", label: "Event loop utilization as %" },
        proc_cpu_util: { type: "number", label: "Process cpu utilization as %" },
        proc_mem_util: { type: "number", label: "Process memory used as %" },
        proc_mem_rss: { type: "bigint", label: "Process memory used in bytes" },
        proc_heap_total: { type: "bigint", label: "Number of bytes V8 has allocated for the heap" },
        proc_heap_used: { type: "bigint", label: "Number of bytes currently being used by V8’s JavaScript objects" },
        proc_heap_malloc: { type: "bigint", label: "Number of bytes allocated through malloc by V8" },
        proc_heap_external: { type: "bigint", label: "Number of bytes for array buffers and external strings" },
        proc_heap_native: { type: "bigint", label: "Number of the top-level contexts currently active" },
        proc_heap_detached: { type: "bigint", label: "Number of contexts that were detached and not yet garbage collected" },
        proc_heap_new_space: { type: "bigint", label: "Number of bytes for new objects" },
        proc_heap_old_space: { type: "bigint", label: "Number of bytes for old objects" },
        proc_gc_count: { type: "bigint", label: "Number of GC runs" },
        proc_gc_time: { type: "bigint", label: "GC run time" },
        host_cpu_util: { type: "number", label: "System cpu utilization as %" },
        // NOTE(review): this is a percent like proc_mem_util which is "number" - confirm bigint is intended
        host_mem_util: { type: "bigint", label: "System memory used as %" },
        host_mem_used: { type: "bigint", label: "System memory used in bytes" },
        net_rx_bytes: { type: "bigint", label: "Network bytes received", stat: "sum" },
        net_tx_bytes: { type: "bigint", label: "Network bytes sent", stat: "sum" },
        net_rx_rate: { type: "number", label: "Network receive rate as bytes per sec" },
        net_tx_rate: { type: "number", label: "Network transmit rate as bytes per sec" },
        net_rx_packets: { type: "bigint", label: "Received network packets", stat: "sum" },
        net_tx_packets: { type: "bigint", label: "Transmitted network packets", stat: "sum" },
        net_rx_errors: { type: "bigint", label: "Receive network errors", stat: "sum" },
        net_tx_errors: { type: "bigint", label: "Transmit network error", stat: "sum" },
        net_rx_dropped: { type: "bigint", label: "Dropped receive packets", stat: "sum" },
        net_tx_dropped: { type: "bigint", label: "Dropped transmit packets", stat: "sum" },
        api_200_count: { type: "number", label: "API requests with 200 status", stat: "sum" },
        api_400_count: { type: "number", label: "API requests with 400 status", stat: "sum" },
        api_401_count: { type: "number", label: "API requests with 401 status", stat: "sum" },
        api_403_count: { type: "number", label: "API requests with 403 status", stat: "sum" },
        api_404_count: { type: "number", label: "API requests with 404 status", stat: "sum" },
        api_409_count: { type: "number", label: "API requests with 409 status", stat: "sum" },
        api_417_count: { type: "number", label: "API requests with 417 status", stat: "sum" },
        api_429_count: { type: "number", label: "API requests with 429 status", stat: "sum" },
        api_bad_count: { type: "number", label: "API requests with 4XX status", stat: "sum" },
        api_err_count: { type: "number", label: "API requests with 5XX status", stat: "sum" },
        api_req_rate: { type: "number", label: "API request rate per second" },
        api_que_size: { type: "number", label: "API request queue size", stat: "sum" },
        api_res_time: { type: "number", label: "API response time in ms" },
        // Fix: label typo "queue siae" -> "queue size"
        jobs_que_size: { type: "number", label: "Jobs running queue size", stat: "sum" },
        jobs_err_count: { type: "number", label: "Jobs errors", stat: "sum" },
        jobs_task_count: { type: "number", label: "Jobs tasks executed", stat: "sum" },
        jobs_run_time: { type: "number", label: "Jobs running time" },
        cache_default_req_count: { type: "number", label: "Cache request count", stat: "sum" },
        cache_default_req_rate: { type: "number", label: "Cache request rate" },
        // Fix: removed stray trailing space in the label
        cache_default_res_time: { type: "number", label: "Cache response time" },
        queue_default_req_count: { type: "number", label: "Queue jobs count", stat: "sum" },
        queue_default_req_rate: { type: "number", label: "Queue job request rate" },
        queue_default_res_time: { type: "number", label: "Queue job run time" },
        db_dynamodb_req_count: { type: "number", label: "DB pool queries executed", stat: "sum" },
        db_dynamodb_res_time: { type: "number", label: "DB pool query response time" },
        db_dynamodb_que_size: { type: "number", label: "DB pool query queue size", stat: "sum" },
        db_dynamodb_default_read_count: { type: "number", label: "DB pool table read count", stat: "sum" },
        db_dynamodb_default_read_rate: { type: "number", label: "DB pool table read rate" },
        db_dynamodb_default_write_count: { type: "number", label: "DB pool table write count", stat: "sum" },
        db_dynamodb_default_write_rate: { type: "number", label: "DB pool table write rate" },
    },
    roles: [],
};

/**
 * Collects metrics about host and processes like CPU, memory, network, API rates...
 * All metrics are stored in a single object and sent to the configured destination
 *
 * To use Elasticsearch run
 *     bkjs es-prepare-stats -table bk_stats
 * Then set in config `stats-target={ "url": "http://localhost:9200/bk_stats/_doc/" }`
 */

// Expose the stats module object
module.exports = stats;

// Reset the collection state and start the periodic timer that drives stats.run,
// using a small random jitter so processes do not all report at the same moment.
stats.configureModule = function(options, callback)
{
    this._runtime = 0;
    this._errors = 0;
    this._timer = setInterval(() => this.run(), lib.randomInt(4000, 5000));
    callback();
}

// Stop the periodic collection timer; the callback is optional.
stats.shutdown = function(options, callback)
{
    if (this._timer) clearInterval(this._timer);
    delete this._timer;
    lib.tryCall(callback);
}

// Timer handler: collect and ship stats if due. Guards against re-entrancy via
// this._running and honors the configured roles/interval restrictions.
stats.run = function()
{
    if (this._running) return;
    if (this.roles.length && !this.roles.includes(app.role)) return;
    var now = Date.now();
    if (!this.interval || now - this._runtime < this.interval) return;

    this._running = 1;
    this._runtime = now;
    this.collect((err, stats) => {
        // NOTE(review): assumes collect's callback receives the stats object as the
        // second argument - verify lib.parallel forwards task results
        if (err || !stats) {
            this._running = 0;
            this._error = err;
            return;
        }
        stats.stats_time = Date.now() - this._runtime;

        if (!this.target) {
            // Fix: reset the running flag here, the original returned without clearing
            // it so a missing target blocked every subsequent run permanently
            this._running = 0;
            return;
        }

        this.send(this.target, stats, (err) => {
            this._running = 0;
            this._stats = stats;
            this._error = err;
            // Fix: original read this.errors (undefined) so the counter became NaN on error
            this._errors = err ? this._errors + 1 : 0;
        });
    });
}

// Collect process stats
stats.collect = function(options, callback)
{
    if (typeof options == "function") callback = options, options = {};

    if (!options) options = {};

    // Basic sys/process stats
    var stats = options.stats = lib.cpuStats();
    Object.assign(stats, lib.memoryStats());
    Object.assign(stats, lib.heapStats());

    // GC stats if enabled
    var gc = lib.gcStats(lib.isFlag(this.flags, "gc"));
    stats.proc_gc_count = gc.count;
    stats.proc_gc_time = gc.time;

    // Dimensions
    stats.role = app.role;
    stats.host = app.host;
    stats.hostname = app.hostName;
    stats.arch = app.arch;
    stats.ip = app.ipaddr;
    stats.id = app.instance.id;
    stats.tag = app.instance.tag;
    stats.zone = app.instance.zone;

    lib.parallel([
        function(next) {
            if (!lib.isFlag(stats.flags, "net")) return next();
            lib.networkStats(app.instance.netdev, (err, net) => {
                Object.assign(stats, net);
                next(null, stats);
            });
        },
        function(next) {
            app.runMethods("configureCollectStats", options, { sync: 1 }, () => {
                logger.debug("collect:", stats.name, options);
                next(null, stats);
            });
        }
    ], callback);
}

// Deliver a collected stats object to one of the configured destinations:
// HTTP endpoint (target.url), the logger (target.log) or a file (target.file).
// Metrics equal to 0, negative or empty, and metrics rejected by the prefix
// filters are removed from the object (NOTE: mutates the caller's object).
stats.send = function(target, stats, callback)
{
    if (typeof callback != "function") callback = lib.noop;

    if (!target || !stats) return callback();

    // Keep only relevant metrics
    for (const p in stats) {
        if (stats[p] === 0 || stats[p] < 0 || lib.isEmpty(stats[p])) {
            delete stats[p];
            continue;
        }
        for (const f in this.filter) {
            if (p.startsWith(f) && this.filter[f] && !this.filter[f].test(p)) {
                delete stats[p];
                break;
            }
        }
    }

    if (target.url) {
        var url = lib.toTemplate(target.url, stats);
        var opts = lib.objExtend({ method: "POST", retryCount: 2, retryOnError: 1 }, target.options);
        // Fix: original tested `opts.posdata` (typo) so a custom postdata template
        // configured in target.options was never expanded
        if (opts.postdata) {
            opts.postdata = lib.toTemplate(opts.postdata, { stats, target });
        } else {
            opts.postdata = stats;
        }
        return lib.fetch(url, opts, callback);
    }

    if (target.log) {
        logger.logger(target.log, "stats:", stats);
        return callback();
    }

    if (target.file) {
        return fs.appendFile(target.file, lib.stringify(stats) + "\n", callback);
    }

    callback();
}

// Query aggregated stats from an Elasticsearch index (default table "bk_stats").
// Builds a query with a terms (or multi_terms) aggregation over `options.groups`,
// a date_histogram over `timestamp`, and one metric sub-aggregation per field in
// `options.fields` (stat comes from the column config, default "avg").
// Options: fields, groups (both required), tags, since/before/age/timedelta for
// the time window, interval/unit for bucket size, tsize/ssize for terms sizing,
// columns for per-field overrides (stat, norm, diff, mult, div, incr, min, max,
// abs, key, trim), raw to skip post-processing, zeros to keep all-zero series.
// Callback receives (err, { data, timestamps, info, rows }) where data is
// { groupKey: { field: [values per time bucket] } }.
stats.queryElasticsearch = function(options, callback)
{
    var tsize = options.tsize || 50;
    var ssize = options.ssize || undefined;
    var tags = lib.split(options.tags);
    var groups = lib.split(options.groups);
    var interval = lib.toNumber(options.interval) || 60000;
    var age = lib.toNumber(options.age) || 300000;
    var timedelta = lib.toNumber(options.timedelta);
    // Window defaults to the last `age` ms, shifted back by `timedelta`
    var since = lib.toMtime(options.since, Date.now() - age - timedelta);
    var before = lib.toMtime(options.before, Date.now() - timedelta);
    var fields = lib.split(options.fields);
    var columns = Object.assign({}, options.columns);

    if (!fields.length || !groups.length) {
        return callback({ status: 400, message: "groups and fields must be provided" })
    }

    var q = {
        size: 0,
        query: {
            query_string: {
                 query: `timestamp:>=${since} AND timestamp:<=${before}`
            }
        },
        aggs: {
            stats: {
                aggs: {
                    fields: {
                        // Time buckets: calendar interval (e.g. "1d") if a unit is
                        // given, otherwise a fixed interval in whole seconds
                        date_histogram: {
                            field: "timestamp",
                            min_doc_count: options.min_doc_count,
                            calendar_interval: options.unit || undefined,
                            fixed_interval: options.unit ? undefined : `${Math.round(interval/1000)}s`,
                        },
                        aggs: {
                        }
                    }
                }
            }
        }
    }

    // Restrict to specific instance tags if provided
    if (tags.length) {
        q.query.query_string.query += ` AND (${tags.map((x) => (`tag:${x}`)).join(" OR ")})`;
    }

    // Group buckets by one field (terms) or several (multi_terms)
    if (groups.length > 1) {
        q.aggs.stats.multi_terms = {
            size: tsize,
            shard_size: ssize,
            terms: groups.map((x) => ({ field: x }))
        }
    } else {
        q.aggs.stats.terms = {
            size: tsize,
            shard_size: ssize,
            field: groups[0],
        }
    }

    for (const f of fields) {
        // Per-field metric aggregation, query options override the module schema
        const stat = columns[f]?.stat || this.columns[f]?.stat || "avg";
        if (!columns[f]) columns[f] = {};
        const col = Object.assign(columns[f], this.columns[f]);

        // methods: rescale_0_1, rescale_0_100, percent_of_sum, mean, zscore, softmax
        if (col.norm) {
            // Pipeline normalization: raw metric under "_<f>", normalized under "<f>"
            q.aggs.stats.aggs.fields.aggs["_" + f] = {
                [stat]: {
                    field: f
                }
            }
            q.aggs.stats.aggs.fields.aggs[f] = {
                normalize: {
                    buckets_path: "_" + f,
                    method: col.norm,
                }
            }
        } else
        if (col.diff) {
            // Serial differencing against the bucket `diff_lag` positions back
            q.aggs.stats.aggs.fields.aggs["_" + f] = {
                [stat]: {
                    field: f
                }
            }
            q.aggs.stats.aggs.fields.aggs[f] = {
                serial_diff: {
                    buckets_path: "_" + f,
                    lag: col.diff_lag || 1,
                }
            }
        } else {
            q.aggs.stats.aggs.fields.aggs[f] = {
                [stat]: {
                    field: f
                }
            }
        }
    }

    logger.debug("queryElasticsearch:", stats.name, options, "Q:", q);

    db.search(options.table || "bk_stats", q, { pool: options.pool, count: options.count }, (err, rows, info) => {
        if (!err) {
            // `var` hoists data/timestamps to function scope so the callback below
            // can reference them (undefined) on the error path as well
            var data = {}, timestamps = [];
            if (!options.raw) {
                for (const t of info.aggregations.stats.buckets) {
                    data[t.key] = {};
                    for (const f of fields) {
                        const col = columns[f], blen = t.fields.buckets.length - 1, trim = col?.trim || options.trim;
                        let sum = 0, ndocs = 0, key = f;
                        const d = t.fields.buckets.map((x, i) => {
                            let v = x[f] && x[f].value || 0;
                            if (i < blen) {
                                ndocs += x.doc_count || 0;
                            } else
                            if (trim > 0) {
                                // Ignore the last value if number of docs is less than given threshold due to not enough data yet
                                x.trim = x.doc_count*100/(ndocs/blen);
                                if (x.trim < trim) return "";
                            }
                            if (col) {
                                // Scaling/clamping adjustments from the column config
                                if (col?.mult) v *= col.mult; else
                                if (col?.div) v /= col.div; else
                                if (col?.incr) v += col.incr;
                                // serial_diff can produce negatives on counter resets
                                if (v < 0 && col?.diff && col?.diff > 0) v = 0;
                                if (typeof col?.min == "number" && v < col.min) v = col.min; else
                                if (typeof col?.max == "number" && v > col.max) v = col.max;
                                if (col?.abs) v = Math.abs(v);
                                if (col?.key) key = col.key;
                            }
                            sum += v;
                            return v;
                        });
                        // Drop all-zero series unless explicitly requested
                        if (sum || options.zeros) data[t.key][key] = d;

                        // All series share the same time axis, capture it once
                        if (!timestamps.length) {
                            timestamps = t.fields.buckets.map((x) => (x.key));
                        }
                    }
                }
            }
        }
        callback(err, { data, timestamps, info, rows });
    });

}