lib/file.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const fs = require('fs');
const util = require('util');
const path = require('path');
const child = require("child_process");
const logger = require(__dirname + '/../logger');
const lib = require(__dirname + '/../lib');
const os = require("os");

/**
 * Safe path.normalize, no exceptions
 * @param {...any} args
 * @memberof module:lib
 * @method normalize
 */
lib.normalize = function(...args)
{
    try {
        return path.normalize(path.join.apply(path, args.map((x) => (typeof x == "string" ? x : String(x))))).replace(/\\/g, "/");
    } catch (e) {
        logger.error("lib.normalize:", e, args);
        return "";
    }
}

/**
 * Call callback for each line in the file options may specify the following parameters:
 * @param {string} file
 * @param {object} options
 * - sync - read file synchronously and call callback for every line
 * - abort - signal to stop processing
 * - limit - number of lines to process and exit
 * - length - amount of data in bytes to process and exit
 * - count - accumulate up to this number of lines in an array before sending a batch for processing
 * - batchsize - accumulate up to this size before sending a batch for processing
 * - skip - number of lines to skip from the start
 * - progress - if > 0 report how many lines processed so far every specified lines
 * - until - skip lines until this regexp matches
 * - ignore - skip lines that match this regexp
 * - header - if true then skip first line because it is the a header, if `options.header` it is a function
 *   it will be called with the first line as an argument and must return true if this line needs to be skipped
 * - json - each line represents an JSON object, convert and pass it to the line callback if not null
 * - split - split every line before calling the callback, it uses phraseSplit
 * - keepempty - by default is enabled if split is set to keep empty fields in the line array
 * - separator - a string with characters to be used for splitting, default is `,`
 * - rxLine - a Regexp for line splitting, default is `lib.rxLine`
 * - quotes - a string with characters to be used for phrase splitting, default is `"'`
 * - quiet - do not report about open file errors
 * - direct - to pass to lib.forEachLimit for true async processing
 * - concurrency - how many lines to process at the same time
 * - buflength - size of the internal buffer, 4096 default
 * - tail - if > 0 async version will keep trying to read from file after this number of ms
 *
 * Properties updated and returned in the options:
 * - nlines - number of lines read from the file
 * - nbytes - amount of data passed to line callback
 * - ncalls - number of lines/batches passed to the line callback
 * - npos - current position in the file
 * - nskip - number of lines skipped
 * @param {function} lineCallback
 * @param {function} [endCallback]
 * @memberof module:lib
 * @method forEachLine
 * @example
 * lib.forEachLine("file.csv" , { split: 1 }, (line, next) => {
 *     console.log(line[0], line[1]);
 * }, lib.log)
 */
lib.forEachLine = function(file, options, lineCallback, endCallback)
{
    if (!options) options = {};
    if (options.sync) {
        lib.forEachLineSync(file, options, lineCallback);
        return lib.tryCall(endCallback, null, options);
    }

    options.nlines = options.ncalls = options.nbytes = options.nskip = 0;
    if (options.split) {
        options.keepempty = true;
        if (!options.separator) options.separator = ",";
    }

    var ctx = {
        file: file,
        fd: typeof file == "number" ? file : null,
        batch: options.count > 0 ? [] : null,
        bsize: 0,
        buffer: Buffer.alloc(options.buflength || 4096),
        data: "",
        pos: options.start > 0 ? options.start : null,
    };
    options.npos = ctx.pos || 0;

    lib.series([
        function(next) {
            if (ctx.fd === file) return next();
            fs.open(file, 'r', (err, fd) => {
                if (err && !options.quiet) logger.error('forEachLine:', file, err);
                ctx.fd = fd;
                next(err, options);
            });
        },
        function(next) {
            const start = Date.now();
            lib.readLines(ctx, options, lineCallback, (err) => {
                if (ctx.fd !== file) fs.close(ctx.fd, lib.noop);
                options.elapsed = Date.now() - start;
                next(err, options);
            });
        },
    ], endCallback, true);
}

// Process lines asynchronously, both callbacks must be provided
lib.readLines = function(ctx, options, lineCallback, endCallback)
{
    fs.read(ctx.fd, ctx.buffer, 0, ctx.buffer.length, ctx.pos, (err, nread) => {
        ctx.pos = null;
        ctx.nread = nread;
        options.npos += nread;
        var lines;
        if (nread) {
            ctx.data += ctx.buffer.slice(0, nread).toString(options.encoding || 'utf8');
            lines = ctx.data.split(options.rxLine || this.rxLine);
            if (ctx.data.endsWith("\n")) {
                ctx.data = "";
                lines.length--;
            } else {
                ctx.data = lines.pop();
            }
        }
        lib.forEachLimit(lines, options.concurrency, (line, next) => {
            function doNext(err) {
                return options.nlines % 100 ? next(err) : setImmediate(next, err);
            }
            options.nlines++;
            if (options.nlines == 1 && options.header) {
                if (typeof options.header != "function") return doNext();
                if (options.header(line)) return doNext();
            }
            if ((options.length && options.nbytes >= options.length) ||
                (options.limit && options.nlines > options.limit) ||
                (options.skip && options.nlines <= options.skip)) {
                options.nskip++;
                return doNext();
            }

            if (options.progress && options.nlines % options.progress == 0) logger.info('forEachLine:', ctx.file, options);

            line = line.trim();
            if (!line) {
                options.nskip++;
                return doNext();
            }

            // Skip lines until we see our pattern
            if (options.until && !options.until_seen) {
                options.until_seen = line.match(options.until);
                options.nskip++;
                return doNext();
            }
            if (options.ignore && options.ignore.test(line)) {
                options.nskip++;
                return doNext();
            }
            if (options.json) {
                if (line[0] != '{' && line[0] != '[') {
                    options.nskip++;
                    return doNext();
                }
                const obj = lib.jsonParse(line, options);
                if (!obj) {
                    options.nskip++;
                    return doNext();
                }
                options.nbytes += line.length;
                if (ctx.batch) ctx.batch.push(obj); else {
                    options.ncalls++;
                    return lineCallback(obj, doNext);
                }
            } else
            if (options.split) {
                const obj = lib.phraseSplit(line.trim(), options);
                if (!obj.length) {
                    options.nskip++;
                    return doNext();
                }
                options.nbytes += line.length;
                if (ctx.batch) ctx.batch.push(obj); else {
                    options.ncalls++;
                    return lineCallback(obj, doNext);
                }
            } else {
                options.nbytes += line.length;
                if (ctx.batch) ctx.batch.push(line); else {
                    options.ncalls++;
                    return lineCallback(line, doNext);
                }
            }
            ctx.bsize += line.length;
            if (!ctx.batch || (ctx.batch.length < options.count && (!options.batchsize || ctx.bsize < options.batchsize))) return doNext();

            options.ncalls++;
            lineCallback(ctx.batch, (err) => {
                ctx.batch = [];
                ctx.bsize = 0;
                doNext(err);
            }, ctx);
        }, (err) => {
            // Stop on reaching limit or end of file
            if (err || options.abort ||
                (options.length && options.nbytes >= options.length) ||
                (options.limit && options.nlines >= options.limit) ||
                (!options.tail && nread < ctx.buffer.length)) {
                if (err || !ctx.batch?.length) return endCallback(err);
                options.ncalls++;
                return lineCallback(ctx.batch, endCallback, ctx);
            }
            // Keep trying to read more after a delay
            if (options.tail && nread < ctx.buffer.length) {
                setTimeout(() => {
                    lib.readLines(ctx, options, lineCallback, endCallback)
                }, options.tail);
            } else {
                lib.readLines(ctx, options, lineCallback, endCallback);
            }
        }, options.direct);
    });
}

/**
 * Sync version of the {@link module:lib.forEachLine}, read every line and call callback which may not do any async operations
 * because they will not be executed right away but only after all lines processed
 * @memberof module:lib
 * @method forEachLineSync
 */
lib.forEachLineSync = function(file, options, lineCallback)
{
    if (!options) options = {};
    try {
        var fd = typeof file == "number" ? file : fs.openSync(file, 'r');
    } catch (err) {
        if (!options.quiet) logger.error('forEachLine:', file, err);
        return err;
    }

    options.nlines = options.ncalls = options.nbytes = options.nskip = 0;
    if (options.split) {
        options.keepempty = true;
        if (!options.separator) options.separator = ",";
    }

    const start = Date.now();
    const buffer = Buffer.alloc(options.buflength || 4096);
    var batch = options.count > 0 ? [] : null, bsize = 0;
    var pos = options.start > 0 ? options.pos : null;
    var data = "", lines;

    options.npos = pos || 0;

    while (!options.abort) {
        const nread = fs.readSync(fd, buffer, 0, buffer.length, pos);
        pos = null;
        lines = null;
        if (nread) {
            options.npos += nread;
            data += buffer.slice(0, nread).toString(options.encoding || 'utf8');
            lines = data.split(options.rxLine || lib.rxLine);
            if (data.endsWith("\n")) {
                data = "";
                lines.length--;
            } else {
                data = lines.pop();
            }
        }

        for (let i = 0; i < lines.length; i++) {
            options.nlines++;
            if (!options.nlines == 1 && options.header) {
                if (typeof options.header != "function") continue;
                if (options.header(lines[i])) continue;
            }
            if ((options.length && options.nbytes >= options.length) ||
                (options.limit && options.nlines > options.limit) ||
                (options.skip && options.nlines <= options.skip)) {
                options.nskip++;
                continue;
            }

            if (options.progress && options.nlines % options.progress == 0) logger.info('forEachLine:', file, options);

            // Skip lines until we see our pattern
            if (options.until && !options.until_seen) {
                options.nskip++;
                options.until_seen = lines[i].match(options.until);
                continue;
            }
            if (options.ignore && options.ignore.test(lines[i])) {
                options.nskip++;
                continue;
            }
            if (options.json) {
                const obj = lib.jsonParse(lines[i], options);
                if (!obj) {
                    options.nskip++;
                    continue;
                }
                options.nbytes += lines[i].length;
                if (batch) batch.push(obj); else {
                    options.ncalls++;
                    lineCallback(obj);
                }
            } else
            if (options.split) {
                const line = lib.phraseSplit(lines[i].trim(), options);
                if (!line.length) {
                    options.nskip++;
                    continue;
                }
                options.nbytes += lines[i].length;
                if (batch) batch.push(line); else {
                    options.ncalls++;
                    lineCallback(line);
                }
            } else {
                const line = lines[i].trim();
                if (!line) {
                    options.nskip++;
                    continue;
                }
                options.nbytes += lines[i].length;
                if (batch) batch.push(line); else {
                    options.ncalls++;
                    lineCallback(line);
                }
            }
            bsize += lines[i].length;
            if (!batch || (batch.length < options.count && (!options.batchsize || bsize < options.batchsize))) continue;

            options.ncalls++;
            lineCallback(batch);
            batch = [];
            bsize = 0;
        }
        // Stop on reaching limit or end of file
        if (nread < buffer.length) break;
        if (options.length && options.nbytes >= options.length) break;
        if (options.limit && options.nlines >= options.limit) break;
    }
    if (batch?.length) {
        options.ncalls++;
        lineCallback(batch);
    }
    if (fd !== file) fs.close(fd, lib.noop);
    options.elapsed = Date.now() - start;
}

/**
 * Write given lines into a file, lines can be a string or list of strings or numbers
 * @param {string} file
 * @param {string|string[]} lines
 * @param {object} [options]
 * - size - rotate if the file is larger, keep 2 files
 * - ext - file ext to append on rotation, without dot, `old` is default, it can be in the `strftime` format to use date, like %w, %d, %m
 * - mode - open file mode, usually a or w
 * - newline - if true newlines are added for each line
 * @param {function} [callback]
 * @memberof module:lib
 * @method writeLines
 */
lib.writeLines = function(file, lines, options, callback)
{
    if (typeof options == "function") callback = options, options = null;

    lib.series([
        function(next) {
            if (!file) return next();
            fs.open(file, options?.mode || 'a', (err, fd) => {
                if (err) return next(err);
                if (typeof lines == "string") lines = [ lines ];
                lib.forEachSeries(lines, (line, next2) => {
                    if (typeof line != "string") line = String(line);
                    fs.write(fd, line + (options?.newline ? "\n" : ""), next2);
                }, (err) => {
                    fs.close(fd, lib.noop);
                    next(err);
                });
            });
        },
        function(next) {
            if (!file || !options?.size) return next();
            fs.stat(file, (err, st) => {
                if (err || st.size < options.size) return next(err);
                var ext = `${options?.ext || "old"}`;
                if (ext[0] == "%") ext = lib.strftime(Date.now(), ext);
                fs.rename(file, file + "." + ext, next);
            });
        },
    ], callback);
}

/**
 * Copy file and then remove the source, do not overwrite existing file
 * @param {string} src
 * @param {string} dst
 * @param {boolean} [overwrite] true to overwrite
 * @param {function} [callback]
 * @memberof module:lib
 * @method moveFile
 */
lib.moveFile = function(src, dst, overwrite, callback)
{
    if (typeof overwrite == "function") callback = overwrite, overwrite = false;

    function _copyIfFailed(err) {
        if (!err) return lib.tryCall(callback, null);
        lib.copyFile(src, dst, overwrite, (err2) => {
            if (!err2) {
                fs.unlink(src, (err) => { lib.tryCall(callback, err) });
            } else {
                lib.tryCall(callback, err2);
            }
        });
    }

    logger.debug('moveFile:', src, dst, overwrite);
    fs.stat(dst, (err) => {
        if (!err && !overwrite) {
            return lib.tryCall(callback, lib.newError("File " + dst + " exists."));
        }
        fs.rename(src, dst, _copyIfFailed);
    });
}

/**
 * Copy file, overwrite is optional flag, by default do not overwrite
 * @param {string} src
 * @param {string} dst
 * @param {boolean} [overwrite] true to overwrite
 * @param {function} [callback]
 * @memberof module:lib
 * @method copyFile
 */
lib.copyFile = function(src, dst, overwrite, callback)
{
    if (typeof overwrite == "function") callback = overwrite, overwrite = false;

    function _copyFile(err) {
        var ist, ost;
        if (!err && !overwrite) return lib.tryCall(callback, lib.newError("File " + dst + " exists."));
        fs.stat(src, (err2) => {
            if (err2) return lib.tryCall(callback, err2);
            ist = fs.createReadStream(src);
            ost = fs.createWriteStream(dst);
            ist.on('end', () => { lib.tryCall(callback) });
            ist.pipe(ost);
        });
    }
    logger.debug('copyFile:', src, dst, overwrite);
    fs.stat(dst, _copyFile);
}

/**
 * Non-exception version, returns empty object,
 * mtime is 0 in case file does not exist or number of seconds of last modified time
 * mdate is a Date object with last modified time
 * @param {string} file
 * @return {object}
 * @memberof module:lib
 * @method statSync
 */
lib.statSync = function(file)
{
    try {
        var stat = fs.statSync(file);
        stat.mdate = stat.mtime.toISOString();
        stat._mtime = stat.mtime.getTime();
        return stat;
    } catch (e) {
        if (e.code != "ENOENT") logger.error('statSync:', e, e.stack);
        return {
            size: 0,
            mdate: "",
            mtime: new Date(0),
            _mtime: 0,
            isFile: function() { return false },
            isSymbolicLink: function() { return false },
            isDirectory: function() { return false },
        };
    }
}

/**
 * Return contents of a file, empty if not exist or on error.
 * @param {string} file
 * @param {object} [options]
 * - cfg - parse file in config format, name=value per line, return a list of args
 * - json - parse file as JSON, return an object, in case of error an empty object
 * - xml - parse the file as XML, return an object
 * - list - split contents with the given separator
 * - encoding - file encoding when converting to string, "binary" to return Buffer
 * - logger - log level for error messages
 * - missingok - if set ENOENT will not be logged
 * - offset - read from the position in the file, if negative the offset is from the end of file
 * - length - read only this much of the data, otherwise read till the end of file
 * @return {string|Buffer}
 * @memberof module:lib
 * @method readFileSync
 */
lib.readFileSync = function(file, options)
{
    if (!file) return "";
    var binary = options?.encoding == "binary";
    try {
        var data = binary ? Buffer.from("") : "";
        var offset = this.toNumber(options && options.offset);
        var length = this.toNumber(options && options.length);
        if (offset || (offset === 0 && length > 0)) {
            var buf = Buffer.alloc(length > 0 ? Math.min(length, 4096) : 4096);
            var bufsize = buf.length;
            var fd = fs.openSync(file, "r");
            var size = fs.statSync(file).size;
            if (offset < 0) offset = Math.max(0, size + offset);
            while (offset < size) {
                var nread = fs.readSync(fd, buf, 0, bufsize, offset);
                if (nread <= 0) break;
                if (binary) {
                    data = Buffer.concat([data, buf.slice(0, nread)]);
                } else {
                    data += buf.slice(0, nread).toString(options.encoding || 'utf8');
                }
                offset += nread;
                if (length > 0) {
                    if (data.length >= length) break;
                    if (length - data.length < bufsize) bufsize = length - data.length;
                }
            }
            fs.closeSync(fd);
        } else {
            data = fs.readFileSync(file);
            if (!binary) data = data.toString(options && options.encoding ? options.encoding : "utf8");
        }
        if (options) {
            if (options.json) data = lib.jsonParse(data, options); else
            if (options.list) data = lib.split(data, options.list); else
            if (options.cfg) data = lib.configParse(data, options);
        }
        return data;
    } catch (e) {
        if (options) {
            if (options.logger && !(options.missingok && e.code == "ENOENT")) logger.logger(options.logger, 'readFileSync:', file, e.stack);
            if (options.json) return {};
            if (options.list || options.cfg) return [];
        }
        return "";
    }
}

/**
 * Same as {@link module:lib.readFileSync} but asynchronous
 * @param {string} file
 * @param {object} [options]
 * @param {function} [callback]
 * @memberof module:lib
 * @method readFile
 */
lib.readFile = function(file, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    var offset = this.toNumber(options && options.offset);
    var length = this.toNumber(options && options.length);
    var binary = options?.encoding == "binary";
    var fd;

    function onError(err) {
        var data = "";
        if (options) {
            if (options.logger && !(options.missingok && err.code == "ENOENT")) logger.logger(options.logger, 'readFile:', file, err.stack);
            if (options.json) data = {};
            if (options.list || options.cfg) data = [];
        }
        if (typeof fd == "number") fs.close(fd, lib.noop);
        lib.tryCall(callback, err, data);
    }
    function onEnd(data) {
        if (options) {
            if (options.json) data = lib.jsonParse(data, options); else
            if (options.list) data = lib.split(data, options.list); else
            if (options.cfg) data = lib.configParse(data, options);
        }
        if (typeof fd == "number") fs.close(fd, lib.noop);
        lib.tryCall(callback, null, data);
    }

    if (offset || (offset === 0 && length > 0)) {
        fs.open(file, 'r', function(err, handle) {
            if (err) return onError(err);
            fd = handle;
            var data = binary ? Buffer.from("") : "";
            var buf = Buffer.alloc(length > 0 ? Math.min(length, 4096) : 4096);
            var bufsize = buf.length;
            function onRead() {
                fs.read(fd, buf, 0, bufsize, offset, function(err, nread, buffer) {
                    if (nread <= 0) return onEnd(data);
                    if (binary) {
                        data = Buffer.concat([data, buffer.slice(0, nread)]);
                    } else {
                        data += buffer.slice(0, nread).toString(options && options.encoding || 'utf8');
                    }
                    if (nread < bufsize) return onEnd(data);
                    if (length > 0) {
                        if (data.length >= length) return onEnd(data);
                        if (length - data.length < bufsize) bufsize = length - data.length;
                    }
                    offset += nread;
                    onRead();
                });
            }
            if (offset < 0) {
                fs.fstat(fd, function(err, stats) {
                    if (err) return onError(err);
                    offset = Math.max(0, stats.size + offset);
                    onRead();
                });
            } else {
                onRead();
            }
        });
    } else {
        fs.readFile(file, function(err, data) {
            if (err) return onError(err);
            if (!binary) data = data.toString(options && options.encoding || 'utf8');
            onEnd(data);
        });
    }
}

/**
 * Async version of {@link module:lib.readFile}
 * @param {string} file
 * @param {object} [options]
 * @return {object} in format { data, err }
 * @memberOf module:lib
 * @method areadFile
 * @async
 */

lib.areadFile = async function(file, options)
{
    return new Promise((resolve, reject) => {
        lib.readFile(file, options, (err, data) => {
            resolve({ data, err });
        });
    });
}

/**
 * Filter function to be used in findFile methods
 * @memberof module:lib
 * @method findFileFilter
 */
lib.findFilter = function(file, stat, options)
{
    if (!options) return 1;
    if (typeof options.filter == "function") return options.filter(file, stat);
    if (util.types.isRegExp(options.exclude) && options.exclude.test(file)) return 0;
    if (util.types.isRegExp(options.include) && !options.include.test(file)) return 0;
    if (options.types) {
        if (stat.isFile() && options.types.indexOf("f") == -1) return 0;
        if (stat.isDirectory() && options.types.indexOf("d") == -1) return 0;
        if (stat.isBlockDevice() && options.types.indexOf("b") == -1) return 0;
        if (stat.isCharacterDevice() && options.types.indexOf("c") == -1) return 0;
        if (stat.isSymbolicLink() && options.types.indexOf("l") == -1) return 0;
        if (stat.isFIFO() && options.types.indexOf("p") == -1) return 0;
        if (stat.isSocket() && options.types.indexOf("s") == -1) return 0;
    }
    return 1;
}

/**
 * Return list of files than match filter recursively starting with given path, dir is the starting path.
 * @param {string} dir
 * @param {object} [options]
 *   - include - a regexp with file pattern to include
 *   - exclude - a regexp with file pattern to exclude
 *   - filter - a function(file, stat) that return 1 if the given file matches, stat is a object returned by fs.statSync
 *   - depth - if a number it specifies max depth to go into the subfolders, starts with 1
 *   - types - a string with types of files to include: d - a dir, f - a file, l - a symlink, c - char dev, b - block dev, s - socket, p - a FIFO
 *   - base - if set only keep base file name in the result, not full path
 *   - details - return the whole stat structure instead of just names
 * @return {string[]|object[]}
 * @example
 * lib.findFileSync("modules/", { depth: 1, types: "f", include: /\.js$/ }).sort()
 * @memberof module:lib
 * @method findFileSync
 */
lib.findFileSync = function(dir, options)
{
    var list = [];
    var level = arguments[2];
    if (typeof level != "number") level = 0;

    try {
        var stat = this.statSync(dir);
        var name = options?.base ? path.basename(dir) : dir;
        if (options?.details) stat.file = dir;

        if (stat.isFile()) {
            if (this.findFilter(name, stat, options)) {
                list.push(options?.details ? stat : name);
            }
        } else
        if (stat.isDirectory()) {
            if (this.findFilter(name, stat, options)) {
                list.push(options?.details ? stat: name);
            }
            // We reached our directory depth
            if (typeof options?.depth == "number" && level >= options.depth) return list;
            var files = fs.readdirSync(dir);
            for (var i in files) {
                list = list.concat(this.findFileSync(path.join(dir, files[i]), options, level + 1));
            }
        }
    } catch (e) {
        logger.error('findFileSync:', dir, options, e.stack);
    }
    return list;
}

/**
 * Async version of {@link module:lib.findFileSync}, same options as in the sync version, the starting dir is not included
 * @memberof module:lib
 * @method findFile
 */
lib.findFile = function(dir, options, callback)
{
    if (typeof options == "function") callback = options, options = {};
    if (!options) options = {}
    if (!Array.isArray(options.files)) options.files = [];

    var level = arguments[3];
    if (typeof level != "number") level = 0;
    if (typeof dir != "string") dir = "";

    fs.readdir(dir, (err, files) => {
        if (err) return lib.tryCall(callback, err, options.files);

        lib.forEachSeries(files, (file, next) => {
            if (options.done) return next();
            var full = path.join(dir, file);

            fs.stat(full, (err, stat) => {
                if (err) return next(err);
                if (options.details) stat.file = full;

                if (stat.isFile()) {
                    if (lib.findFilter(full, stat, options)) {
                        options.files.push(options.details ? stat : options.base ? file : full);
                    }
                    next();
                } else
                if (stat.isDirectory()) {
                    if (lib.findFilter(full, stat, options)) {
                        options.files.push(options.details ? stat : options.base ? file : full);
                    }
                    // We reached our directory depth
                    if (options && typeof options.depth == "number" && level >= options.depth) return next();
                    lib.findFile(full, options, next, level + 1);
                } else {
                    next();
                }
            });
        }, (err) => {
            lib.tryCall(callback, err, options.files);
        }, true);
    });
}

/**
 * Watch files in a dir for changes and call the callback
 * @param {object} options
 * - root - a string with root path
 * - files - a regexp to watch files individually, if omitted watch the whole dir
 * - match - a regexp to watch files when using the whole dir, only for matched files the callback will be called
 * - ignore - a regexp to ignore files
 * - recursive - watch files in root subfolders
 * - depth - how deep to look for files in case of individual files
 * @param {function} fileCallback - called on changes (file)
 * @param {function} [endCallback]
 * @example
 * lib.watchFiles({ root: "./", match: /\.js$/ }, (file) => {
 * });
 * @memberof module:lib
 * @method watchFiles
 */
lib.watchFiles = function(options, fileCallback, endCallback)
{
    logger.debug('watchFiles:', options);

    function watcher(event, file) {
        // Check stat if no file name, Mac OS X does not provide it
        fs.stat(file.file, (err, stat) => {
            if (err) return logger.error("watcher:", event, file.file, file.size, err);
            switch (event) {
            case "rename":
                file.watcher.close();
                file.watcher = fs.watch(file.file, (event) => { watcher(event, file); });
                break;
            default:
                if (stat.size == file.size && stat.mtime.getTime() == file.mtime.getTime()) return;
            }
            logger.log('watchFiles:', event, file.file, file.ino, stat.size, stat.mtime);
            for (const p in stat) file[p] = stat[p];
            fileCallback(file);
        });
    }

    var root = options.root;
    var ignore = options.ignore && lib.toRegexp(options.ignore) || null;

    if (options.files) {
        var opts = { details: 1, include: lib.toRegexp(options.files), exclude: ignore, depth: options.depth, types: options.types };
        lib.findFile(options.root, opts, (err, list) => {
            if (err) return lib.tryCall(endCallback, err);
            list.forEach((file) => {
                logger.debug('watchFiles:', file.file, file.ino, file.size);
                file.watcher = fs.watch(file.file, (event) => { watcher(event, file) });
            });
            lib.tryCall(endCallback, err, list);
        });
    } else {
        var match = options.match && lib.toRegexp(options.match) || null;
        try {
            fs.watch(root, { recursive: !!options.recursive }, (event, file) => {
                logger.dev('watcher:', event, root, file);
                file = path.join(root, file);
                if (ignore && ignore.test(file)) return;
                if (match && !match.test(file)) return;
                fs.stat(file, (err, stat) => {
                    if (err) return logger.error("watcher:", file, err);
                    logger.log('watchFiles:', event, file, stat.size, stat.mtime);
                    fileCallback({ name: file, stat: stat });
                });
            });
            lib.tryCall(endCallback);
        } catch (err) {
            lib.tryCall(endCallback, err);
        }
    }
}

/**
 * Recursively create all directories, return 1 if created or 0 on error or if exists, no exceptions are raised, error is logged only
 * @param {string} dir
 * @memberof module:lib
 * @method makePathSync
 */
lib.makePathSync = function(dir)
{
    try {
        fs.mkdirSync(path.normalize(dir), { recursive: true });
        return 1;
    } catch (e) {
        logger.error('makePathSync:', dir, e);
        return 0;
    }
}

/**
 * Async version of makePath, stops on first error
 * @param {string} dir
 * @param {function} [callback]
 * @memberof module:lib
 * @method makePath
 */
lib.makePath = function(dir, callback)
{
    fs.mkdir(path.normalize(dir), (err) => {
        if (err) logger.error('makePath:', err);
        lib.tryCall(callback, err);
    });
}

/**
 * Unlink a file, no error on non-existent file, callback is optional
 * @param {string} name
 * @param {function} [callback]
 * @memberof module:lib
 * @method unlink
 */
lib.unlink = function(name, callback)
{
    fs.unlink(name, (err) => {
        if (err?.code == "ENOENT") err = null;
        lib.tryCall(callback, err);
    });
}

/**
 * Unlink a file, no expections
 * @param {string} name
 * @memberof module:lib
 * @method unlinkSync
 */
lib.unlinkSync = function(name)
{
    try {
        fs.unlinkSync(name);
    } catch (e) {
        if (e.code != 'ENOENT') logger.error('unlinkSync:', name, e);
    }
}


/**
 * Recursively remove all files and folders in the given path, returns an error to the callback if any
 * @param {string} dir
 * @param {function} [callback]
 * @memberof module:lib
 * @method unlinkPath
 */
lib.unlinkPath = function(dir, callback)
{
    fs.stat(dir, (err, stat) => {
        if (err) return lib.tryCall(callback, err);
        if (stat.isDirectory()) {
            fs.readdir(dir, (err, files) => {
                if (err) return lib.tryCall(callback, err);
                lib.forEachSeries(files, (f, next) => {
                    lib.unlinkPath(path.join(dir, f), next);
                }, (err) => {
                    if (err) return lib.tryCall(callback, err);
                    fs.rmdir(dir, callback);
                }, true);
            });
        } else {
            fs.unlink(dir, callback);
        }
    });
}

/**
 * Recursively remove all files and folders in the given path, stops on first error
 * @param {string} dir
 * @memberof module:lib
 * @method unlinkPathSync
 */
lib.unlinkPathSync = function(dir)
{
    var files = this.findFileSync(dir);
    // Start from the end to delete files first, then folders
    for (var i = files.length - 1; i >= 0; i--) {
        try {
            var stat = this.statSync(files[i]);
            if (stat.isDirectory()) {
                fs.rmdirSync(files[i]);
            } else {
                fs.unlinkSync(files[i]);
            }
        } catch (e) {
            logger.error("unlinkPath:", dir, e);
            return 0;
        }
    }
    return 1;
}

/**
 * Return a list of processes
 * @param {object} options
 * @param {regexp} [options.filter] - return only matching
 * @param {function} callback - in format (err, list) where list is { pid, cmd }
 * @memberof module:lib
 * @method findProcess
 */
lib.findProcess = function(options, callback)
{
    if (os.platform() == "linux") {
        lib.findFile("/proc", { include: /^\/proc\/[0-9]+$/, exclude: new RegExp("^/proc/" + process.pid + "$"), depth: 0, base: 1 }, (err, files) => {
            if (!err) {
                files = files.map((x) => ({ pid: x, cmd: lib.readFileSync(`/proc/${x}/cmdline`).replace(/\0/g," ").trim() })).
                        filter((x) => (options.filter ? x.cmd.match(options.filter) : x.cmd));
            }
            callback(err, files);
        });
    } else {
        lib.execProcess("/bin/ps agx -o pid,args", (err, stdout, stderr) => {
            var list = stdout.split("\n").
                              filter((x) => (lib.toNumber(x) != process.pid && (options.filter ? x.match(options.filter) : 1))).
                              map((x) => ({ pid: lib.toNumber(x), cmd: x.replace(/^[0-9]+/, "").trim() }));

            callback(err, list);
        });
    }
}

/**
 * Async version of {@link module:lib.findProcess}
 * @param {object} [options]
 * @return {object} in format { data, err }
 * @example
 * const { data } = await lib.afindProcess({ filter: "bkjs" });
 * console.log(data)
 * [
 *  { pid: 65841, cmd: 'bkjs: watcher' },
 *  { pid: 65867, cmd: 'bkjs: master' },
 *  { pid: 65868, cmd: 'bkjs: worker' },
 *  { pid: 65869, cmd: 'bkjs: web' }
 * ]
 * @memberOf module:lib
 * @method afindProcess
 * @async
 */

lib.afindProcess = async function(options)
{
    return new Promise((resolve, reject) => {
        lib.findProcess(options, (err, data) => {
            resolve({ data, err });
        });
    });
}

/**
 * Run the process and return all output to the callback, this a simple wrapper around child_processes.exec so the lib.runProcess
 * can be used without importing the child_processes module. All fatal errors are logged.
 * @param {string} cmd
 * @param {object} [options]
 * - options.merge if set append stdout to stderr and return combined single value separated by 2 newlines
 * @param {function} [callback] - (err, stdout, stderr)
 * @example
 * lib.execProcess("ls -ls", lib.log)
 * @memberof module:lib
 * @method execProcess
 */
lib.execProcess = function(cmd, options, callback)
{
    if (typeof options == "function") callback = options, options = null;

    return child.exec(cmd, (err, stdout, stderr) => {
        logger.debug('execProcess:', cmd, err, stderr);
        if (options?.merge) {
            stdout = lib.toString(stderr) + "\n\n" + lib.toString(stdout);
            stderr = "";
        }
        lib.tryCall(callback, err, typeof stdout == "string" ? stdout : "", typeof stderr == "string" ? stderr : "");
    });
}

/**
 * Async version of {@link module:lib.execProcess}
 * @param {string} cmd
 * @param {object} [options]
 * @return {object} in format { stdout, stderr, err }
 * @example
 * const { stdout } = lib.aexecProcess("ls -ls")
 * @memberOf module:lib
 * @method aexecProcess
 * @async
 */

lib.aexecProcess = async function(file, options)
{
    return new Promise((resolve, reject) => {
        lib.execProcess(file, options, (err, stdout, stderr) => {
            resolve({ stdout, stderr, err });
        });
    });
}


/**
 * Run specified command with the optional arguments, this is similar to
 * child_process.spawn with callback being called after the process exited
 * @param {string} cmd
 * @param {string|string[]} args
 * @param {object} [options] - options for the child_processes.spawn
 * @param {boolean} [options.stdio] - if pipe then capture and return stdout/stderr in callback
 * @param {function} [callback] - (err, stdout, stderr)
 * @return {ChildProcess}
 * @example
 * lib.spawProcess("ls", "-ls", { cwd: "/tmp" }, lib.log)
 * @memberof module:lib
 * @method spawnProcess
 */
lib.spawnProcess = function(cmd, args, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) options = { stdio: "inherit", env: process.env, cwd: process.cwd() };
    if (!options.stdio) options.stdio = "inherit";
    if (!Array.isArray(args)) args = [ args ];
    var proc = child.spawn(cmd, args, options);
    proc.on("error", (err) => {
        logger.error("spawnProcess:", cmd, args, err);
        lib.tryCall(callback, err, stdout, stderr);
    });
    proc.on('exit', (code, signal) => {
        logger.debug("spawnProcess:", cmd, args, "exit", code || signal);
        lib.tryCall(callback, code || signal, stdout, stderr);
    });
    var stdout = "", stderr = "";
    if (proc.stdout) {
        proc.stdout.on('data', (data) => { stdout += data.toString() });
    }
    if (proc.stderr) {
        proc.stderr.on('data', (data) => { stderr += data.toString() });
    }
    return proc;
}

/**
 * Async version of {@link module:lib.spawnProcess}
 * @param {string} cmd
 * @param {object} [options]
 * @return {object} in format { proc, err }
 * @memberOf module:lib
 * @method aspawnProcess
 * @async
 */

lib.aspawnProcess = async function(cmd, options)
{
    return new Promise((resolve, reject) => {
        var proc = lib.spawnProcess(cmd, options, (err, stdout, stderr) => {
            resolve({ proc, stdout, stderr, err });
        });
    });
}

/**
 * Run a series of commands, if stdio is a pipe then output from all commands is concatenated.
 * @param {object} cmds is an object where a property name is a command to execute and the value is an array of arguments or null.
 * @param {object} [options]
 * if `options.error` is 1, then stop on first error or if non-zero status on a process exit.
 * @param {function} [callback] - (err, stdout, stderr)
 * @example
 * lib.spawnSeries({"ls": "-la",
 *                  "ps": "augx",
 *                  "du": { argv: "-sh", stdio: "inherit", cwd: "/tmp" },
 *                  "uname": ["-a"] },
 * lib.log)
 * @memberof module:lib
 * @method spawnSeries
 */
lib.spawnSeries = function(cmds, options, callback)
{
    if (typeof options == "function") callback = options, options = null;
    if (!options) {
        options = { stdio: "inherit", env: process.env, cwd: process.cwd() };
    }
    var stdout = "", stderr = "";
    this.forEachSeries(Object.keys(cmds), function(cmd, next) {
        var argv = cmds[cmd], opts = options;
        switch (lib.typeName(argv)) {
        case "null":
            argv = [];
            break;

        case "object":
            opts = argv;
            argv = opts.argv;
            break;

        case "array":
        case "string":
            break;

        default:
            logger.error("spawnSeries:", "invalid arguments", cmd, argv);
            return next(options.error ? lib.newError("invalid args", cmd) : null);
        }
        if (!opts.stdio) opts.stdio = "inherit";
        if (typeof argv == "string") argv = [ argv ];
        lib.spawnProcess(cmd, argv, opts, (err, o, e) => {
            stdout += o;
            stderr += e;
            next(options.error ? err : null);
        });
    }, (err) => {
        lib.tryCall(callback, err, stdout, stderr);
    });
}

/**
 * Async version of {@link module:lib.spawnSeries}
 * @param {object} cmds
 * @param {object} [options]
 * @return {object} in format { stdout, stderr, err }
 * @example
 * const { stdout, stderr } = await lib.aspawnSeries({"ls": "-l", "ps": "agx" }, { stdio:"pipe" })
 * @memberOf module:lib
 * @method areadFile
 * @async
 */

lib.aspawnSeries = async function(cmds, options)
{
    return new Promise((resolve, reject) => {
        lib.spawnSeries(cmds, options, (err, stdout, stderr) => {
            resolve({ stdout, stderr, err });
        });
    });
}