jobs

module:jobs

Description:
  • Job queue processor

    When launched with the jobs-workers parameter equal to or greater than 0, the server spawns a number of workers which subscribe to the configured job queues and listen for messages.

    Multiple job queues can be defined and processed at the same time.

    By default the local and worker queues are always created and ready to be used: jobs sent to local always run inside the local process, while jobs sent to the worker queue run in a worker process.

    A job is an object that defines which method from which module to run, with the options as the first argument and a callback as the second.

    A job can be in the following formats:

    "module.method"
    
    {
      job: "module.method"
    }
    
    {
      job: { "module.method": { ... } }
    }
    
    {
     job: {
        "module.method": { ... },
        "module2.method2": { ... }
     }
    }
    

    Any task in string format "module.method" will be converted into { "module.method": {} } automatically
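The string-to-object conversion can be sketched as follows (a minimal illustration of the rule above, not the backendjs source):

```javascript
// Minimal sketch (not backendjs source) of how a string job spec
// is normalized into the object form shown above.
function normalizeJob(job) {
    // "module.method" becomes { "module.method": {} }
    if (typeof job == "string") return { [job]: {} };
    return job;
}

console.log(normalizeJob("module.method")); // { 'module.method': {} }
```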

    Once configured, all calls to module:jobs.submitJob push jobs to the provided or default queue for execution.

    If a backend server is running somewhere with -jobs-workers greater than 0 and connected to the same queue, it will pull jobs from the queue and execute them.

    The naming convention is that any function defined as function(options, callback) can be used as a job to be executed in one of the worker processes, assuming the module is available in module:modules.
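    As a minimal sketch of this convention (the function and the worker-style driver below are hypothetical illustrations, not backendjs internals):

```javascript
// A job-shaped function: options first, completion callback second.
function processUsers(options, callback) {
    const type = options.type || "user";
    // ... do the actual work here ...
    callback(null, "processed " + type);
}

// A worker would invoke it roughly like this, passing the jobspec
// options of "mymod.processUsers" as the first argument.
processUsers({ type: "admin" }, (err, result) => {
    console.log(result); // prints "processed admin"
});
```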

    Example:

    This SQS queue is shared via bkjs.conf between all processes

    queue-users = sqs://users
    

    The module below accepts requests via API and then process jobs in the background

    • /process/users endpoint just submits a job request into the SQS queue and returns immediately; for simplicity there is no validation in this example
    • mymod.processUsers is a job function which is run by a worker process; it may run on a different host
    const { app, api, db, jobs } = require("backendjs");
    
    module.exports = {
        name: "mymod",
    
        configureWeb(options, callback)
        {
            api.app.post("/process/users", (req, res) => {
    
                jobs.submitJob({ job: { "mymod.processUsers": { type: req.query.type } } }, { queueName: "users" }, (err) => {
                    api.sendReply(res, err);
                });
            });
            callback();
        },
    
        processUsers(options, callback)
        {
            db.select("bk_user", { type: options.type || "user" }, (err, rows) => {
                // ...
                callback();
            });
        }
    }
    
    app.start({ server: true });
    

    Start the server

    node mymod.js -jobs-workers 1
    

    Crontab

    Jobs can be run at intervals on a cron-like schedule, enabled with a JSON file or DB config.

    This requires installing the peer dependency: npm install --save croner

    1. Create file crontab.json with the following contents, reusing the example above:
    [
      { "cron": "0 1 1 * * 1,3", "job": { "mymod.processUsers": { "type": "admin" } } }
    ]
    
    2. Start the server with the cron parameters (on the command line or in the bkjs.conf config file)
    node mymod.js -jobs-workers 1 -jobs-cron-file crontab.json
    
Source:

Members

(static) runningJobs :Array.<object>

Description:
  • List of running jobs for a worker

Source:

Methods

(static) cancelJob(key, callback)

Description:
  • Send a cancellation request for given key to all workers

Source:
Parameters:
Name Type Attributes Description
key string
callback function <optional>

(static) isCancelled(key)

Description:
  • Returns true if a cancel key is set for the given job; this is meant to be called inside a running job

Source:
Parameters:
Name Type Description
key string
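The cooperative cancellation pattern used by cancelJob/isCancelled can be sketched in a self-contained form (an illustration of the pattern, not the backendjs implementation):

```javascript
// Self-contained sketch of cooperative cancellation: cancelJob
// records a key, and a running job checks isCancelled between steps.
const cancelled = new Set();

function cancelJob(key) { cancelled.add(key); }
function isCancelled(key) { return cancelled.has(key); }

// A job that processes work items one at a time and stops early
// when its cancel key has been set.
function runSteps(key, steps) {
    let done = 0;
    for (const step of steps) {
        if (isCancelled(key)) break; // exit between steps on cancel
        step();
        done++;
    }
    return done;
}
```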

(static) loadCronjobs()

Description:
  • Load crontab from JSON file as list of job specs:

    • cron - cron time interval spec: 'second' 'minute' 'hour' 'dayOfMonth' 'month' 'dayOfWeek'
    • croner - optional object with additional properties for the Croner object
    • job - a string as obj.method, or an object with the job name as the property name and an object with additional jobspec for the job as the value, passed as the first argument; a job callback always takes jobspec and callback as its 2 arguments
    • disabled - disable the job but keep in the cron file, it will be ignored
    • queueName - name of the queue where to submit this job, if not given it uses cron-queue
    • uniqueTtl - defines that this job must be the only one in the queue for the number of milliseconds specified, after that time another job with the same arguments can be submitted.

    The expressions used by Croner (https://croner.56k.guru) are very similar to those of Vixie Cron, but with a few additions and changes as outlined below:

    ┌──────────────── (optional) second (0 - 59)
    │ ┌────────────── minute (0 - 59)
    │ │ ┌──────────── hour (0 - 23)
    │ │ │ ┌────────── day of month (1 - 31)
    │ │ │ │ ┌──────── month (1 - 12, JAN-DEC)
    │ │ │ │ │ ┌────── day of week (0 - 6, SUN-SAT)
    │ │ │ │ │ │       (0 to 6 are Sunday to Saturday; 7 is Sunday, the same as 0)
    │ │ │ │ │ │
    * * * * * *
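A crontab.json entry combining the properties above might look like this (queue name, TTL, and schedule values are illustrative):

```
[
  {
    "cron": "0 */15 * * * *",
    "job": { "mymod.processUsers": { "type": "admin" } },
    "queueName": "users",
    "uniqueTtl": 60000
  },
  { "cron": "0 0 2 * * *", "job": "mymod.processUsers", "disabled": true }
]
```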
    
Source:
Example
[ { cron: "0 0 * * * *", job: "scraper.run" }, ... ]

(static) markCancelled(msg)

Description:
  • Mark all running jobs with the cancel key; it is up to each job to check for cancel keys and exit

Source:
Parameters:
Name Type Description
msg object
Properties
Name Type Description
key string

(static) scheduleCronjob(jobspec)

Description:
  • Create a new cron job. For remote jobs an additional property args can be used in the object to define arguments for the instance backend process; properties must start with -

Source:
Parameters:
Name Type Description
jobspec object
Example
{ "cron": "0 10 * * * *", "croner": { "maxRun": 3 }, "job": "server.processQueue" },
{ "cron": "0 30 * * * *", "job": { "server.processQueue": { "name": "queue1" } } },
{ "cron": "0 5 * * * *", "job": [ { "scraper.run": { "url": "host1" } }, { "scraper.run": { "url": "host2" } } ] }

(static) scheduleCronjobs(type, list) → {int}

Description:
  • Schedule a list of cron jobs; type is used to clean up previous jobs of the same type for cases when a new list needs to replace the existing jobs. An empty list does nothing; to reset the jobs for a particular type, a list with an empty invalid job must be passed, like: [ {} ]

Source:
Parameters:
Name Type Description
type string
list Array.<object>
Returns:
Type Description
int

number of cron jobs actually scheduled.

(static) submitJob(jobspec, options)

Description:
  • Submit a job for execution; it will be saved in a queue, picked up later and executed. The queue and the way the job is executed depend on the configured queue. See isJob for the format of the job objects.

Source:
Parameters:
Name Type Attributes Description
jobspec object

an object with jobs to run

options object <optional>
Properties
Name Type Attributes Description
uniqueTtl int <optional>

if greater than zero it defines the number of milliseconds for this job to stay in the queue or run; it creates a global lock using the job object as the hash key, and no other job can run until the TTL expires or the job finishes. Non-unique jobs are kept in the queue and repeated later according to the visibilityTimeout setting.

uniqueKey int <optional>

can define an alternative unique key for this job for cases when different jobs must be run sequentially

uniqueKeep int <optional>

if true, keep the unique lock after the job finishes; otherwise it is cleared

uniqueDrop int <optional>

if true, non-unique jobs will be silently dropped instead of being kept in the queue

logger int <optional>

defines the logger level which will be used to log when the job is finished, default is debug

maxRuntime int <optional>

defines max number of seconds this job can run, if not specified then the queue default is used

uniqueOnce int <optional>

if true then the visibility timeout is not kept alive while the job is running

noWait int <optional>

will run the job and delete it from the queue immediately, not at the end, for one-off jobs

noWaitTimeout int <optional>

number of seconds before deleting the job for one-off jobs, taking into account the uniqueKey and visibility timeout to give time to check for uniqueness and exit; can be used regardless of the noWait flag

noVisibility int <optional>

will always delete messages after processing, ignore 600 errors as well

visibilityTimeout int <optional>

custom timeout for how long to keep this job invisible, overrides the default timeout

retryVisibilityTimeout int <optional>

an object with custom timeouts for how long to keep this job invisible by error status which results in keeping tasks in the queue for retry

stopOnError int <optional>

will stop tasks processing on first error, otherwise all errors will be just logged. Errors with status >= 600 will stop the job regardless of this flag

startTime int <optional>

job must start only after this date, if started sooner it will be put back into the queue

endTime int <optional>

job must not start after this date

delay int <optional>

is only supported by SQS currently, it delays the job execution for the specified amount of ms

dedupTtl int <optional>

if set, it defines the number of ms to keep track of duplicate messages, trying to preserve only-once behaviour. To make a queue automatically use dedup mode, set it in the queue options: -queue[-NAME]-options-dedup-ttl 86400000. Note: uniqueTtl settings take precedence, and if present dedup is ignored.
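The dedupTtl semantics can be sketched in a self-contained form (an illustration of the idea, not the backendjs implementation):

```javascript
// Sketch of dedupTtl semantics: remember each message key for
// dedupTtl ms and report repeats seen within that window.
const seen = new Map();

function isDuplicate(key, dedupTtl, now = Date.now()) {
    const expires = seen.get(key);
    if (expires !== undefined && expires > now) return true;
    seen.set(key, now + dedupTtl); // (re)start the dedup window
    return false;
}
```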