jobs.js

/*
 *  Author: Vlad Seryakov vseryakov@gmail.com
 *  backendjs 2018
 */

const cluster = require('cluster');
const domain = require('domain');
const util = require('util');
const modules = require(__dirname + '/modules');
const app = require(__dirname + '/app');
const lib = require(__dirname + '/lib');
const logger = require(__dirname + '/logger');
const ipc = require(__dirname + '/ipc');
const queue = require(__dirname + '/queue');
const cache = require(__dirname + '/cache');
const metrics = require(__dirname + '/metrics');

/**
 * @module jobs
 */

const jobs =

/**
 * # Job queue processor
 *
 * When launched with the __jobs-workers__ parameter equal to or greater than 0, the server spawns a number of workers which subscribe to
 * the configured job queues and listen for messages.
 *
 * Multiple job queues can be defined and processed at the same time.
 *
 * By default the __local__ and __worker__ queues are always created and ready to be used; jobs sent to the local queue always run inside the local process,
 * while jobs sent to the worker queue run in a worker process.
 *
 * A job is an object that defines which method from which module to run, with the options as the first argument and a callback as the second.
 *
 * A job can be in the following formats:
 *
 *  ```js
 * "module.method"
 *
 * {
 *    job: "module.method"
 * }
 *
 * {
 *    job: { "module.method": { ... } }
 * }
 *
 * {
 *   job: {
 *      "module.method": { ... },
 *      "module2.method2": { ... }
 *   }
 * }
 * ```
 *
 * Any task in string format "module.method" will be converted into { "module.method": {} } automatically.
 *
 * Once configured, all calls to {@link module:jobs.submitJob} will push jobs to be executed to the provided or default queue.
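 *
 * For example, a minimal sketch (the `mymod.cleanup` task name is hypothetical):
 *
 * ```js
 * // String form, runs inside the local process via the built-in local queue
 * jobs.submitJob("mymod.cleanup", { queueName: "local" }, (err) => { if (err) console.error(err) });
 *
 * // Object form with parameters, runs in a worker process via the built-in worker queue
 * jobs.submitJob({ job: { "mymod.cleanup": { days: 30 } } }, { queueName: "worker" }, (err) => {});
 * ```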
 *
 * If a backend server is running somewhere with __-jobs-workers__ greater than 0 and connected to the same queue,
 * it will pull jobs from the queue and execute them.
 *
 * The naming convention is that any function defined as __function(options, callback)__ can be used as a job to
 * be executed in one of the worker processes, assuming the module is available in {@link module:modules}.
 *
 * ## Example:
 *
 * ### This SQS queue is shared via bkjs.conf between all processes
 * ```
 * queue-users = sqs://users
 * ```
 *
 * ### The module below accepts requests via the API and then processes jobs in the background
 *
 * -  the /process/users endpoint just submits a job request into the SQS queue and returns immediately; for simplicity there is no validation in this example
 * -  mymod.processUsers is a job function which is run by a worker process; it can run on a different host
 *
 * ```js
 * const { app, api, db, jobs } = require("backendjs");
 *
 * module.exports = {
 *     name: "mymod",
 *
 *     configureWeb(options, callback)
 *     {
 *         api.app.post("/process/users", (req, res) => {
 *
 *             jobs.submitJob({ job: { "mymod.processUsers": { type: req.query.type } } }, { queueName: "users" }, (err) => {
 *                 api.sendReply(res, err);
 *             });
 *         });
 *         callback();
 *     },
 *
 *     processUsers(options, callback)
 *     {
 *         db.select("bk_user", { type: options.type || "user" }, (err, rows) => {
 *             ...
 *             callback();
 *         });
 *     }
 * }
 *
 * app.start({ server: true });
 * ```
 *
 * Start the server
 * ```shell
 * node mymod.js -jobs-workers 1
 * ```
 *
 * ## Crontab
 * Jobs can be run at intervals on a cron-like schedule, enabled with a JSON file or DB config.
 *
 * This requires installing the peer dependency: `npm install --save croner`
 *
 * 1. Create a file crontab.json with the following contents, reusing the example above:
 *
 *   ```json
 *   [
 *     { "cron": "0 1 1 * * 1,3", "job": { "mymod.processUsers": { "type": "admin" } } }
 *   ]
 *   ```
 *
 * 2. Start the server with cron parameters (in cmdline or in bkjs.conf config file)
 *
 *   ```shell
 *   node mymod.js -jobs-workers 1 -jobs-cron-file crontab.json
 *   ```
 *
 */

module.exports = {
    name: "jobs",
    // Config parameters
    args: [
        { name: "cap-(.+)", type: "int", strip: "cap-", descr: "Capability parameters" },
        { name: "workers", type: "number", min: -1, max: 32, descr: "How many worker processes to launch to process the job queue, -1 disables jobs, 0 means launch as many as the CPUs available" },
        { name: "worker-cpu-factor", type: "real", min: 0, descr: "A number to multiply the number of CPUs available to make the total number of workers to launch, only used if `workers` is 0" },
        { name: "worker-env", type: "map", logger: "warn", descr: "Environment to be passed to the worker via fork, see `cluster.fork`" },
        { name: "worker-settings", type: "json", logger: "warn", descr: "Worker fork setting, see cluster.setupPrimary" },
        { name: "worker-delay", type: "int", descr: "Delay in milliseconds for a worker before it will start accepting jobs, for cases when other dependencies may take some time to start" },
        { name: "worker-queue", type: "list", onupdate: function() { if (ipc.role=="worker"&&app.role=="worker") this.subscribeWorker()}, descr: "Queue(s) to subscribe for workers, multiple queues can be processes at the same time, i.e. more than one job can run from different queues" },
        { name: "worker-options-(.+)", obj: "workerOptions", make: "$1", type: "json", descr: "Custom parameters by queue name, passed to `queue.subscribeQueue` on worker start, useful with channels", example: '-jobs-worker-options-nats#events {"count":10}' },
        { name: "max-runtime", type: "int", min: 0, descr: "Max number of seconds a job can run before being killed" },
        { name: "max-lifetime", type: "int", min: 0, descr: "Max number of seconds a worker can live, after that amount of time it will exit once all the jobs are finished, 0 means indefinitely" },
        { name: "shutdown-timeout", type: "int", min: 0, descr: "Max number of milliseconds to wait for the graceful shutdown sequence to finish, after this timeout the process just exits" },
        { name: "cron-queue", type: "list", min: 1, descr: "Default queue to use for cron jobs" },
        { name: "global-queue", type: "list", min: 1, descr: "Default queue for all jobs, the queueName is ignored" },
        { name: "global-ignore", type: "list", array: 1, descr: "Queue names which ignore the global setting, the queueName is used as usual, local and worker are ignored by default" },
        { name: "cron-file", descr: "File with cron jobs in JSON format" },
        { name: "cron", type: "json", onupdate: function(v) { if (app.role == "server") this.scheduleCronjobs("config", v) }, logger: "error", descr: "Cron jobs to be scheduled, the JSON must be in the same format as crontab file, cron format by https://croner.56k.guru" },
        { name: "unique-cache", descr: "Default cache name to use for keeping track of unique jobs" },
        { name: "unique-ignore", type: "regexp", descr: "Ignore all unique parameters if a job's uniqueKey matches" },
        { name: "unique-set-ttl-([0-9]+)", type: "regexp", obj: "uniqueSetTtl", make: "$1", descr: "Override unique TTL to a new value if matches the unique key", example: "-jobs-unique-ttl-100 KEY" },
        { name: "unique-logger", descr: "Log level for unique error conditions" },
        { name: "retry-visibility-timeout", type: "map", maptype: "int", descr: "Visibility timeout by error code >= 500 for queues that support it" },
        { name: "task-ignore", type: "regexp", descr: "Ignore matched tasks" },
    ],

    jobRx: /^[a-z0-9_.]+\.[a-z0-9_]+$/i,

    /** @var {object[]} - List of running jobs for a worker */
    runningJobs: [],

    exiting: 0,
    // Time of the last update on jobs and tasks
    runTime: 0,
    // Scheduled cron jobs
    crontab: [],
    subscribed: new Set(),
    maxRuntime: 900,
    checkRuntime: 0,
    maxLifetime: 3600 * 12,
    shutdownTimeout: 50,
    uniqueError: "non-unique condition",
    workers: -1,
    workerDelay: 50,
    workerQueue: [],
    workerCpuFactor: 2,
    workerEnv: {},
    workerOptions: {},
    globalIgnore: ["local", "worker"],
    properties: [
        "noWait", "noWaitTimeout",
        "noVisibility", "visibilityTimeout", "retryVisibilityTimeout",
        "stopOnError",
        "startTime", "endTime",
        "uniqueTtl", "uniqueKey", "uniqueKeep",
        "uniqueLogger", "uniqueDrop", "uniqueOnce",
        "maxRuntime"
    ],
    metrics: {
        que: new metrics.Histogram(),
        running: 0,
        err_count: 0,
    },
};

// Initialize jobs processing in the server process
jobs.configureServer = function(options, callback)
{
    if (!app.isOk("jobs", options)) return callback();
    this.initServer(options, callback);
}

// Initialize a worker to be ready for jobs to execute; in instance mode set up timers to exit on inactivity.
jobs.configureWorker = function(options, callback)
{
    if (!app.isOk("jobs", options)) return callback();
    this.initWorker(options, callback);
}

jobs.shutdown = function(options, callback)
{
    clearInterval(jobs._checkTimer);
    lib.tryCall(callback);
}

// Perform graceful worker shutdown, to be used for workers restart
jobs.shutdownWorker = function(options, callback)
{
    logger.log("shutdownWorker:", this.name, "queue:", this.workerQueue, "max-runtime:", this.maxRuntime, "max-lifetime:", this.maxLifetime, options);

    // Stop accepting messages from the queues
    for (const q of this.workerQueue) queue.unlisten({ queueName: q });

    setTimeout(callback, options?.shutdownTimeout || this.shutdownTimeout);
}

// Perform graceful worker shutdown and then exit the process
jobs.exitWorker = function(options)
{
    if (this.exiting++) return;
    app.runMethods("shutdownWorker", options, { parallel: 1, direct: 1 }, () => {
        process.exit(99);
    });
}

// Initialize a server that will manage jobs workers
jobs.initServer = function(options, callback)
{
    // Setup background tasks from the crontab
    if (this.cron || this.cronFile) {
        if (this.workers < 0) ipc.initWorker();
        this.loadCronjobs();
        this.scheduleCronjobs("config", this.cron);
    }

    if (this.workers < 0) return typeof callback == "function" && callback();

    ipc.initServer(options);

    // Start queue monitors if needed
    for (const name of this.workerQueue) {
        queue.monitor(lib.objExtend(jobs.workerOptions[name], { queueName: name }));
    }

    if (this.workerSettings) {
        cluster.setupPrimary(this.workerSettings);
    }

    // Launch the workers
    var workers = this.workers || Math.round(app.maxCPUs * (this.workerCpuFactor || 1));
    for (let i = 0; i < workers; i++) {
        cluster.fork(this.workerEnv);
    }

    logger.log("initServer:", this.name, "started", app.role, app.workerId || process.pid, "workers:", workers, "cron:", this.cron);

    if (typeof callback == "function") callback();
}

// Initialize a worker for processing jobs
jobs.initWorker = function(options, callback)
{
    ipc.initWorker(options);

    this._checkTimer = setInterval(this.checkTimes.bind(this), 30000);

    // Mark jobs for cancellation
    ipc.on('jobs:cancel', jobs.markCancelled.bind(jobs));

    // Restart signal from the server process
    ipc.on("worker:restart", () => {
        jobs.exitWorker({ shutdownReason: "restart" });
    });

    // A job to process from the server (worker driver)
    ipc.on("worker:job", (msg) => {
        jobs.processJobMessage("#worker", msg);
    });

    // Randomize subscription when multiple workers start at the same time, some queue drivers use polling
    setTimeout(() => {
        this.subscribeWorker();
        logger.log("initWorker:", this.name, "started", app.role, app.workerId || process.pid, "queue:", this.subscribed, "maxRuntime:", this.maxRuntime, "maxLifetime:", this.maxLifetime);
    }, this.workerDelay);

    if (typeof callback == "function") callback();
}

jobs.subscribeWorker = function()
{
    // Always use the default queue if nothing specified but a job worker is running
    if (!this.workerQueue.length) this.workerQueue.push("queue");

    for (const name of this.workerQueue) {
        // Unsubscribe if the name starts with ! or -
        if (/^[!-]/.test(name)) {
            this.unsubscribeQueue(name.substr(1));
            continue;
        }
        // Prevent subscribing to the same queue more than once, e.g. in case of invalid or nonexistent queue names
        var q = queue.getQueue(name);
        if (this.subscribed.has(q.canonical(name))) continue;
        var qopts = lib.objExtend({ queueName: name }, this.workerOptions[name]);
        queue.listen(qopts, this.processJobMessage.bind(this, name));
        this.subscribed.add(q.canonical(name));
        logger.info("subscribeWorker:", this.name, q.name, name);
    }
}

jobs.unsubscribeQueue = function(name)
{
    const q = queue.getClient(name);
    if (!this.subscribed.delete(q.canonical(name))) return;
    queue.unlisten({ queueName: name });
    logger.info("unsubscribeQueue:", this.name, q.name, name);
}

jobs.processJobMessage = function(name, msg, next)
{
    if (typeof next != "function") next = lib.noop;
    var opts = { queue: name, message: msg, stopOnError: 1, direct: true, stime: Date.now() };
    app.runMethods("configureJob", opts, (err) => {
        if (err) return next(err);

        const _timer = queue.getQueue(name).metrics.start();
        jobs.runJob(opts.message, { queueName: name }, (err) => {
            _timer.end();
            opts.error = err;
            opts.parallel = 1;
            opts.etime = Date.now();
            opts.elapsed = opts.etime - opts.stime;
            if (err) {
                logger.logger(err.status >= 600 ? err.logger || "warn" : !err.status || err.status < 200 || err.status > 299 ? "error" : "info", "endJob:", name, lib.traceError(err), opts.message, opts.elapsed, "ms");
            } else {
                logger.logger(opts.message.logger || "debug", "endJob:", name, opts.message, opts.elapsed, "ms");
            }
            app.runMethods("finishJob", opts, () => { next(err) });
            // Mark end of last message processed
            jobs.runTime = Date.now();
            jobs.checkTimes();
        });
    });
}

/**
 * Mark all running jobs with the cancel key, it is up to any job to check for cancel keys and exit
 * @param {object} msg
 * @param {string} msg.key
 * @memberof module:jobs
 * @method markCancelled
 */
jobs.markCancelled = function(msg)
{
    if (!msg?.key) return;
    for (const job of this.runningJobs) {
        job.cancelKey = lib.toFlags("add", job.cancelKey, msg.key);
    }
    logger.info("markCancelled:", this.runningJobs.length, msg);
}

/**
 * Returns true if a cancel job key is set, this is called inside a job
 * @param {string} key
 * @memberof module:jobs
 * @method isCancelled
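 * @example
 * // A minimal sketch of cooperative cancellation: inside a long-running task, check between
 * // batches of work whether a cancel request was broadcast from another process via
 * // jobs.cancelJob("scan"); the "scan" key is arbitrary
 * if (jobs.isCancelled("scan")) return callback({ status: 200, message: "cancelled" });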
 */
jobs.isCancelled = function(key)
{
    if (!key) return this.exiting;
    for (const job of this.runningJobs) {
        if (lib.isFlag(job?.cancelKey, key)) return 1;
    }
    return this.exiting;
}

// Return the max runtime allowed across all running jobs, in milliseconds
jobs.getMaxRuntime = function()
{
    return this.runningJobs.reduce((m, x) => (Math.max(m, x.maxRuntime || 0)), this.maxRuntime) * 1000;
}

// Return a list of unique job names currently running
jobs.getRunningJobs = function()
{
    var jobs = {};
    for (const job of this.runningJobs) {
        for (const p in job.job) jobs[p] = 1;
    }
    return Object.keys(jobs);
}

/**
 * Check how long jobs have been running and force an exit if the max runtime is exceeded; when idle, check if the total lifetime is exceeded.
 *
 * If an exit is required, the `shutdownWorker` methods will receive options with the `shutdownReason` property
 * set and the name-sake property will contain the exceeded value.
 */
jobs.checkTimes = function()
{
    var now = Date.now();
    if (this.runningJobs.length) {
        var maxRuntime = this.getMaxRuntime();
        if (maxRuntime > 0) {
            var badJobs = this.runningJobs.filter((x) => (now - x.jobTime > maxRuntime));
            // Stuck jobs can run much longer if we are still processing other small jobs
            if (badJobs.length) {
                if (this.runningJobs.length == badJobs.length) {
                    logger.warn('checkLifetime:', 'jobs: exceeded max run time', maxRuntime, badJobs);

                    // Notify all queues about bad jobs to be dropped completely
                    for (const job of badJobs) queue.unlisten(job, { queueName: job.jobQueue });
                    return this.exitWorker({ jobs: badJobs, shutdownReason: "maxRuntime", maxRuntime: maxRuntime });
                } else
                if (now - this.checkRuntime > maxRuntime) {
                    logger.warn('checkLifetime:', 'jobs: exceeded max run time but other jobs still running', maxRuntime, badJobs);
                }
                this.checkRuntime = Date.now();
            }
        }
    } else {
        // Idle mode, check max life time
        if (this.maxLifetime > 0 && now - app.ctime + lib.randomShort() > this.maxLifetime * 1000) {
            logger.log('checkLifetime:', 'jobs: exceeded max life time', this.maxLifetime);
            return this.exitWorker({ shutdownReason: "maxLifetime", maxLifetime: this.maxLifetime * 1000 });
        }
    }
}

function _badJob(jobspec)
{
    return lib.newError('Invalid job: ' + lib.objDescr(jobspec), 400, "InvalidJob");
}

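// Validate and normalize a job spec: a string "module.method" or { job: "module.method" }
// is converted into { job: { "module.method": {} } }, returns an Error object if the spec is invalid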
jobs.isJob = function(jobspec)
{
    if (typeof jobspec == "string" && this.jobRx.test(jobspec)) {
        jobspec = { job: { [jobspec]: {} } };
    }
    if (!lib.isObject(jobspec)) {
        return _badJob(jobspec);
    }

    if (typeof jobspec.job == "string") {
        jobspec.job = { [jobspec.job]: {} };
    }

    if (lib.isObject(jobspec.job)) {
        if (!Object.keys(jobspec.job).every((y) => (jobs.jobRx.test(y)))) {
            return _badJob(jobspec);
        }
    } else {
        return _badJob(jobspec);
    }
    return jobspec;
}

// Apply special job properties from the options
jobs.checkOptions = function(jobspec, options)
{
    if (!jobspec || !options) return;
    for (const p of this.properties) {
        if (typeof jobspec[p] == "undefined" && typeof options[p] != "undefined") jobspec[p] = options[p];
    }
}

/**
 * Submit a job for execution, it will be saved in a queue and will be picked up later and executed.
 * How and where it runs depends on the configured queue. See `isJob` for
 * the format of the job objects.
 * @param {object} jobspec - an object with jobs to run
 * @param {object} [options]
 * @param {int} [options.uniqueTtl] - if greater than zero it defines the number of milliseconds for this job to stay in the queue or run,
 * it creates a global lock using the job object as the hash key, no other job can be run until the ttl expires or the job
 * finishes; non-unique jobs will be kept in the queue and repeated later according to the `visibilityTimeout` setting.
 *
 * @param {string} [options.uniqueKey] - can define an alternative unique key for this job for cases when different jobs must be run sequentially
 *
 * @param {int} [options.uniqueKeep] - if true then keep the unique lock after the jobs finished, otherwise it is cleared
 *
 * @param {int} [options.uniqueDrop] - if true will make non-unique jobs to be silently dropped instead of keeping them in the queue
 *
 * @param {string} [options.logger] - defines the logger level which will be used to log when the job is finished, default is debug
 *
 * @param {int} [options.maxRuntime] - defines max number of seconds this job can run, if not specified then the queue default is used
 *
 * @param {int} [options.uniqueOnce] - if true then the visibility timeout is not kept alive while the job is running
 *
 * @param {int} [options.noWait] - will run the job and delete it from the queue immediately, not at the end, for one-off jobs
 *
 * @param {int} [options.noWaitTimeout] - number of seconds to wait before deleting the job for one-off jobs, taking into account the uniqueKey and visibility timeout to give time
 *  to check for uniqueness and exit, can be used regardless of the noWait flag
 *
 * @param {int} [options.noVisibility] - will always delete messages after processing, ignore 600 errors as well
 *
 * @param {int} [options.visibilityTimeout] - custom timeout for how long to keep this job invisible, overrides the default timeout
 *
 * @param {object} [options.retryVisibilityTimeout] - an object with custom timeouts, keyed by error status, for how long to keep this job invisible, which results in keeping tasks in the queue for retry
 *
 * @param {int} [options.stopOnError] - will stop tasks processing on first error, otherwise all errors will be just logged. Errors with status >= 600 will
 *  stop the job regardless of this flag
 *
 * @param {int} [options.startTime] - job must start only after this date, if started sooner it will be put back into the queue
 * @param {int} [options.endTime] - job must not start after this date
 *
 * @param {int} [options.delay] - is only supported by SQS currently, it delays the job execution for the specified amount of ms
 * @param {int} [options.dedupTtl] - if set it defines the number of ms to keep track of duplicate messages, it tries to preserve only-once behaviour. To make
 *  a queue automatically use dedup mode it can be set in the queue options: `-queue[-NAME]-options-dedup-ttl 86400000`.
 *  Note: `uniqueTtl` settings take precedence and if present dedup is ignored.
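 *
 * @example
 * // A sketch: submit a report job (the task name and the "reports" queue are hypothetical)
 * // with a unique lock so the same report is not queued or run twice within an hour
 * jobs.submitJob({ job: { "mymod.nightlyReport": { day: "2024-01-01" } } },
 *     { queueName: "reports", uniqueTtl: 3600000, uniqueDrop: 1 },
 *     (err) => { if (err) logger.error("nightlyReport:", err) });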
 * @param {function} [callback] - called after the job has been submitted, with an error if the submission failed
 * @memberOf module:jobs
 * @method submitJob
 */
jobs.submitJob = function(jobspec, options, callback)
{
    if (typeof options == "function") callback = options, options = null;

    jobspec = this.isJob(jobspec);
    if (util.types.isNativeError(jobspec)) {
        return lib.tryCall(callback, jobspec);
    }

    var qname = options?.queueName;

    /*
     * We deal with queue lists here due to the round-robin processing, cannot call getClient multiple
     * times with a list because it returns the next queue with every call, so we get the next queue here
     * and pass just the name
     */
    if (this.globalQueue && !lib.isFlag(this.globalIgnore, qname)) {
        qname = this.globalQueue;
    }
    var q = queue.getQueue(qname);

    // Ignore duplicate messages
    var ttl = lib.toNumber(q.options.dedupTtl || options?.dedupTtl);
    if (ttl > 0) {
        jobspec.dedup = `${ttl}-${lib.uuid()}`;
    }

    // Keep track of where the job originated
    jobspec.origin = app.origin();
    logger.debug("submitJob:", jobspec, "OPTS:", options, "Q:", q.name);
    this.checkOptions(jobspec, options);

    // Use global timeouts if not specified
    if (lib.isEmpty(jobspec.retryVisibilityTimeout) && this.retryVisibilityTimeout) {
        jobspec.retryVisibilityTimeout = this.retryVisibilityTimeout;
    }
    // Queue unique ttl
    if (lib.isEmpty(jobspec.uniqueTtl) && q.options.uniqueTtl) {
        jobspec.uniqueTtl = q.options.uniqueTtl;
    }

    options = Object.assign({}, options, { queueName: q.queueName });
    queue.submit(jobspec, options, callback);
}

// Run all tasks in the job object
jobs.runJob = function(jobspec, options, callback)
{
    var q = options?.queueName;

    logger.debug("runJob:", q, jobspec);

    jobspec = this.isJob(jobspec);
    if (util.types.isNativeError(jobspec)) {
        return lib.tryCall(callback, jobspec);
    }
    var timer, ttl, key;

    lib.series([
        function(next) {
            // Make sure we do not have this job in the queue
            ttl = lib.toNumber(jobspec.uniqueTtl, { min: 0 });
            if (ttl && lib.testRegexp(jobspec.uniqueKey, jobs.uniqueIgnore)) ttl = 0;
            if (ttl && jobs.uniqueSetTtl) {
                // Managing throughput by changing ttl
                for (const p in jobs.uniqueSetTtl) {
                    if (lib.testRegexp(jobspec.uniqueKey, jobs.uniqueSetTtl[p])) {
                        ttl = lib.toNumber(p);
                        break;
                    }
                }
            }
            if (!ttl) {
                // Use dedup if present, simulate unique properties
                if (jobspec.dedup) {
                    ttl = lib.toNumber(jobspec.dedup);
                    jobspec.uniqueKey = jobspec.dedup;
                    jobspec.uniqueDrop = jobspec.uniqueKeep = jobspec.uniqueOnce = 1;
                }
                if (!ttl) return next();
            }
            key = jobspec.uniqueKey = jobspec.uniqueKey || lib.hash(lib.stringify(jobspec.job), "sha256");
            cache.lock("JOB:" + key, { ttl: ttl, cacheName: jobs.uniqueCache }, (err, locked) => {
                // If the queue service is down keep all messages in the queue until it is up again
                if (!locked) {
                    if (!err && jobspec.uniqueDrop) {
                        logger.logger(jobspec.uniqueLogger || jobs.uniqueLogger || "info", "runJob:", "dropped", q, jobspec);
                        ipc.emitMsg("jobs:dropped", { job: jobspec, queueName: q });
                        return lib.tryCall(callback, { status: 200, message: "dropped" });
                    }
                    err = { status: 600, message: err || jobs.uniqueError, logger: jobspec.uniqueLogger || jobs.uniqueLogger || "debug" };
                    ipc.emitMsg("jobs:nolock", { job: jobspec, queueName: q, err: err });
                } else
                if (!err && !jobspec.uniqueOnce) {
                    // Keep the lock active while the job is running
                    timer = setInterval(function() {
                        cache.lock("JOB:" + key, { ttl: ttl, cacheName: jobs.uniqueCache, set: 1 });
                    }, Math.max(ttl * 0.7, 1000));
                }
                logger.debug("runJob:", q, cache.getCache(jobs.uniqueCache).name, "locked:", locked, "ttl:", ttl, "key:", key, "JOB:", jobspec)
                next(err);
            });
        },
        function(next) {
            ipc.emitMsg("jobs:started", { job: jobspec, queueName: q });

            jobspec.jobQueue = q;
            jobspec.jobTime = Date.now();
            jobs.runningJobs.push(jobspec);
            if (cluster.isWorker) process.title = `${app.id}: worker ${jobs.getRunningJobs()}`;

            lib.forEvery(Object.keys(jobspec.job), (task, next2) => {
                _runTask(task, jobspec, options, (err) => {
                    // Keep the first error but wait till all tasks stop to avoid race conditions.
                    // All 600+ errors are propagated regardless of the stopOnError flag
                    if (!jobspec.error || err?.status >= 600) jobspec.error = err;
                    next2();
                });
            }, () => {
                var idx = jobs.runningJobs.indexOf(jobspec);
                if (idx > -1) jobs.runningJobs.splice(idx, 1);
                if (cluster.isWorker) process.title = `${app.id}: worker ${jobs.getRunningJobs()}`;

                clearInterval(timer);
                if (ttl && key && !jobspec.uniqueKeep) {
                    cache.unlock("JOB:" + key, { cacheName: jobs.uniqueCache });
                }
                ipc.emitMsg("jobs:stopped", { job: jobspec, queueName: q });

                next(jobspec.error);
            });
        },
    ], callback, true);
}

/**
 * Send a cancellation request for given key to all workers
 * @param {string} key
 * @param {function} [callback]
 * @memberof module:jobs
 * @method cancelJob
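 * @example
 * // A sketch: ask all workers to mark currently running jobs with the "scan" cancel key,
 * // tasks must cooperate by periodically calling jobs.isCancelled("scan") and exiting
 * jobs.cancelJob("scan");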
 */
jobs.cancelJob = function(key, callback)
{
    ipc.broadcast(":worker", ipc.newMsg("jobs:cancel", { key: key }), callback);
}

// Execute a task by name, the task's parameters object is passed to the method as the first argument, calls the callback on finish or error
function _runTask(name, jobspec, options, callback)
{
    var job = jobspec.job[name];

    if (jobs.taskIgnore && jobs.taskIgnore.test(name)) {
        logger.error("runTask:", options?.queueName, name, "task ignored", job, "RX:", jobs.taskIgnore);
        return callback(lib.newError("Task ignored: " + name, 499, "TaskIgnored"));
    }

    var parts = name.split('.');
    var path = parts.slice(0, -1).join(".");
    var method = parts.at(-1);
    var context = modules[path];

    if (!context || typeof context[method] != "function") {
        logger.error("runTask:", options?.queueName, name, "unknown method", job);
        return callback(lib.newError("Unknown method: " + name, 499, "UnknownMethod"));
    }
    if (!lib.isObject(job)) job = {};

    jobs.metrics.running++;

    var d = domain.create();
    d.on("error", (err) => {
        _finishTask(err, name, jobspec, options, callback);
    });
    d.run(function() {
        logger.debug('runTask:', 'started', name, job);
        jobs.runTime = Date.now();
        ipc.emitMsg("jobs:task:started", { name, job, queueName: options?.queueName });

        context[method](job, (err) => {
            _finishTask(err, name, jobspec, options, callback);
        });
    });
}

// Complete task execution, cleanup and update the status
function _finishTask(err, name, jobspec, options, callback)
{
    var job = jobspec.job[name];

    if (err && !(err.status >= 200 && err.status < 300)) {
        jobs.metrics.err_count++;
        logger.logger(err.status >= 400 || util.types.isNativeError(err) ? "error" : "info", 'endTask:', options.queueName, name, lib.traceError(err), job);
    } else {
        logger.debug('endTask:', options.queueName, name, err, job);
    }
    jobs.metrics.que.update(Date.now() - jobspec.jobTime);
    jobs.metrics.running--;
    jobs.runTime = Date.now();

    ipc.emitMsg("jobs:task:stopped", { name, job, queueName: options.queueName, err });
    callback(err);
}

// Jobs run time stats
jobs.configureCollectStats = function(options)
{
    var que = this.metrics.que.toJSON({ reset: 1 });
    if (que?.count) {
        options.stats.jobs_que_size = this.metrics.running;
        options.stats.jobs_err_count = metrics.take(this.metrics, "err_count");
        options.stats.jobs_task_count = que.count;
        options.stats.jobs_run_time = que.med;
    }
}