lib/pool.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');

/**
 * Create a resource pool, `create` and `close` callbacks must be given which perform allocation and deallocation of the resources like db connections.
 *
 * @param {object} options - defines the following properties
 * @param {function} [options.create] - method to be called to return a new resource item, takes 1 argument, a callback as `function(err, item)`
 * @param {function} [options.destroy] - method to be called to destroy a resource item
 * @param {function} [options.reset] - method to be called just before releasing an item back to the resource pool, this is a chance to reset the item to the initial state
 * @param {function} [options.validate] - method to verify active resource item, return false if it needs to be destroyed
 * @param {function} [options.init] - method to cal on pool.init, it may be called multiple times
 * @param {function} [options.shutdown] - method to call on pool shutdown to clear other resources
 * @param {int} [options.min] - min number of active resource items
 * @param {int} [options.max] - max number of active resource items
 * @param {int} [options.max_queue] - how big the waiting queue can be, above this all requests will be rejected immediately
 * @param {int} [options.timeout] - number of milliseconds to wait for the next available resource item, cannot be 0
 * @param {int} [options.idle] - number of milliseconds before starting to destroy all active resources above the minimum, 0 to disable.
 *
 * If no create implementation callback is given then all operations are basically noop still calling the callbacks.
 *
 * @example
 * var pool = new lib.Pool({
 *     min: 1,
 *     max: 5,
 *     create: function(cb) {
 *         someDb.connect((err) => { cb(err, this) }
 *     },
 *     destroy: function(client) {
 *         client.close() }
 *     })
 * });
 *
 * pool.use((err, client) => {
 *     ...
 *     pool.release(client);
 * });
 *
 * const { err, client } = await pool.ause();
 * if (!err) {
 *     ...
 *     pool.release(client);
 * }
 * @class Pool
 */
lib.Pool = class Pool {
    #min = 0
    #max = 10
    #max_queue = 100
    #timeout = 5000
    #idle = 300000
    #queue_count = 0
    #queue = {}
    #avail = []
    #mtime = []
    #busy = []
    #interval
    #time


    constructor(options)
    {
        this.init(options);
    }

    // Initialize pool properties, this can be run anytime even on the active pool to override some properties
    init(options)
    {
        if (!options) return;
        var idle = this.#idle;

        if (typeof options.min != "undefined") this.#min = lib.toNumber(options.min, { float: 0, flt: 0, min: 0 });
        if (typeof options.max != "undefined") this.#max = lib.toNumber(options.max, { float: 0, dflt: 10, min: 0, max: 9999 });
        if (typeof options.interval != "undefined") this.#max_queue = lib.toNumber(options.interval, { float: 0, dflt: 100, min: 0 });
        if (typeof options.timeout != "undefined") this.#timeout = lib.toNumber(options.timeout, { float: 0, dflt: 5000, min: 1 });
        if (typeof options.idle != "undefined") this.#idle = lib.toNumber(options.idle, { float: 0, dflt: 300000, min: 0 });

        if (typeof options.create == "function") this._create = options.create;
        if (typeof options.destroy == "function") this._destroy = options.destroy;
        if (typeof options.reset == "function") this._reset = options.reset;
        if (typeof options.validate == "function") this._validate = options.validate;

        if (typeof options.shutdown == "function") this._shutdown = options.shutdown;
        if (typeof options.init == "function") this._init = options.init;

        // Periodic housekeeping if interval is set
        if (this._create && this.#idle > 0 && (idle != this.#idle || !this.#interval)) {
            clearInterval(this.#interval);
            this.#interval = setInterval(this.#timer.bind(this), this.#idle/2);
            setImmediate(this.#timer.bind(this));
        }
        if (this.#idle == 0) clearInterval(this.#interval);

        this.#call("_init", options);
        return this;
    }

    /**
    * Return next available resource item, if not available immediately wait for defined amount of time before calling the
    * callback with an error. The callback second argument is active resource item.
    */
    use(callback)
    {
        if (typeof callback != "function") throw lib.newError("callback is required");
        if (!this._create) return callback(null, {});

        // We have idle items
        if (this.#avail.length) {
            this.#mtime.shift();
            var item = this.#avail.shift();
            this.#busy.push(item);
            return callback.call(this, null, item);
        }
        // Put into waiting queue
        if (this.#busy.length >= this.#max) {
            if (this.#queue_count >= this.#max_queue) return callback(lib.newError("no more resources"));

            this.#queue_count++;
            return lib.deferCallback(this.#queue, {}, function(m) {
                callback(m.item ? null : lib.newError("timeout waiting for the resource"), m.item);
            }, this.#timeout);
        }
        // New item
        this.#call("_create", (err, item) => {
            if (err) {
                logger.error("pool: use:", this.name, lib.traceError(err));
            } else {
                if (!item) item = {};
                this.#busy.push(item);
                logger.dev('pool: use', this.name, 'avail:', this.#avail.length, 'busy:', this.#busy.length);
            }
            callback(err, item);
        });
    }

    /**
     * Async version of use
     * @returns {object} as { err, client }
     */
    async ause()
    {
        return new Promise((resolve, reject) => {
            this.use((err, client) => {
                resolve({ err, client });
            })
        })
    }

    // Destroy the resource item calling the provided close callback
    destroy(item, callback)
    {
        if (!item) return;
        if (!this._create) return typeof callback == "function" && callback();

        logger.dev('pool: destroy', this.name, 'avail:', this.#avail.length, 'busy:', this.#busy.length);

        var idx = this.#busy.indexOf(item);
        if (idx > -1) {
            this.#call("_destroy", item, callback);
            this.#busy.splice(idx, 1);
            return;
        }
        idx = this.#avail.indexOf(item);
        if (idx > -1) {
            this.#call("_destroy", item, callback);
            this.#avail.splice(idx, 1);
            this.#mtime.splice(idx, 1);
            return;
        }
    }

    // Return the resource item back to the list of available resources.
    release(item)
    {
        if (!item) return;
        if (!this._create) return;

        var idx = this.#busy.indexOf(item);
        if (idx == -1) {
            logger.error('pool: release:', 'not known', item);
            return;
        }
        logger.dev('pool: release', this.name, 'avail:', this.#avail.length, 'busy:', this.#busy.length, 'max:', this.#max);

        // Pass it to the next waiting item
        for (const id in this.#queue) {
            this.#queue_count--;
            this.#queue[id].item = item;
            return lib.runCallback(this.#queue, this.#queue[id]);
        }

        // Destroy if above the limit or invalid
        if (this.#avail.length > this.#max || this.#call("_validate", item) === false) {
            this.#call("_destroy", item);
        } else {
            // Add to the available list
            this.#avail.unshift(item);
            this.#mtime.unshift(Date.now());
            this.#call("_reset", item);
        }
        // Remove from the busy list at the end to keep the object referenced all the time
        this.#busy.splice(idx, 1);
    }

    // Close all active items
    destroyAll()
    {
        while (this.#avail.length > 0) this.destroy(this.#avail[0]);
    }

    // Return an object with stats
    stats()
    {
        return {
            avail: this.#avail.length,
            busy: this.#busy.length,
            queue: this.#queue_count,
            min: this.#min,
            max: this.#max,
            max_queue: this.#max_queue,
            idle: this.#idle,
        };
    }

    /**
    * Close all connections and shutdown the pool, no more items will be open and the pool cannot be used without re-initialization,
    * if callback is provided then wait until all items are released and call it, `options.timeout` can be used to retsrict how long to wait for
    * all items to be released, when expired the callback will be called
    */
    shutdown(options, callback)
    {
        logger.debug('pool.shutdown:', this.name, 'avail:', this.#avail.length, 'busy:', this.#busy.length);
        this.#max = -1;
        this.destroyAll();
        this.#queue = {};
        clearInterval(this.#interval);
        this.#interval = null;
        this.#call("_shutdown");
        if (typeof callback != "function") return;
        this.#time = Date.now();
        const timer = setInterval(() => {
            if (this.#busy.length && (!options?.timeout || Date.now() - this.#time < options?.timeout)) return;
            clearInterval(timer);
            callback();
        }, 50);
    }

    // Call registered method and catch exceptions, pass it to the callback if given
    #call(name, callback)
    {
        if (typeof this[name] != "function") {
            if (typeof callback == "function") return callback();
            return;
        }
        try {
            return this[name].call(this, callback);
        } catch (e) {
            logger.error('pool:', this.name, name, e);
            if (typeof callback == "function") callback(e);
        }
    }

    // Timer to ensure pool integrity
    #timer()
    {
        var now = Date.now();

        // Expire idle items
        if (this.#idle > 0) {
            for (let i = 0; i < this.#avail.length; i++) {
                if (now - this.#mtime[i] > this.#idle && this.#avail.length + this.#busy.length > this.#min) {
                    logger.dev('pool: timer:', this.name, 'idle', i, 'avail:', this.#avail.length, 'busy:', this.#busy.length);
                    this.destroy(this.#avail[i]);
                    i--;
                }
            }
        }

        // Ensure min number of items
        var min = this.#min - this.#avail.length - this.#busy.length;
        for (let i = 0; i < min; i++) {
            this.#call("_create", (err, item) => {
                if (err) return;
                this.#avail.push(item);
                this.#mtime.push(now);
            });
        }
    }

}