queue

module:queue

Description:
  • Queue module for jobs, events and pub/sub.

    The class module:queue.QueueClient defines the methods that a driver can implement.

    All methods below use options.queueName for a specific queue.

    If it is an array then a client will be picked sequentially by maintaining internal sequence number.

    Empty default client always exists, it can be overridden to make default some other driver using queue-default config parameter.

    The url query parameters that start with bk- will be extracted from the url and placed in the class's *options object, this is a way to pass special properties without using queue-options, the rest of the url parameters will be passed to the driver.

    Job queues are designed by using SQS visibility timeout logic.

Source:
Parameters:
Name Type Description
options object
Properties
Name Type Attributes Description
queueCount int <optional>

config property specifies how many messages to process at the same time, default is 1. Not all drivers support multiple messages.

pollInterval int <optional>

config property defines in ms how often to check for new messages, i.e. after messages processed it can poll immediately or after this amount of time, default is 1000 milliseconds.

visibilityTimeout int <optional>

property specifies how long the messages being processed stay hidden, in milliseconds.

noVisibilityTimeout boolean <optional>

disables auto extending visibilityTimeout while the job is running, allows default queue visibility timeout to be used instead

startTime int <optional>

property which is the time in the future when a message must be actually processed there, The scheduling is implemented using visibilityTimeout feature, keep scheduled messages hidden until the actual time.

maxVisibilityTimeout int <optional>

which defines in milliseconds the max time a messsage can stay invisible while waiting for its scheduled date, default is 6 hours, the AWS max is 12 hours.

To enable stats collection for a queue it must be enabled with config: queue-NAME-options-metrics=1

Example
queue-default=redis://
queue-redis=redis://?bk-visibilityTimeout=30000&bk-queueCount=2
queue-account=sqs://sqs.us-east-1.amazonaws.com/566932855105/account?bk-queueCount=2
queue-messages=sqs://messages?bk-pollInterval=60000
queue-config={ "account": "https://sqs.us-east-1.amazonaws.com/566932855105/account?bk-queueCount=2" }

Classes

DBQueueClient
EventBridgeQueueClient
JSONQueueClient
LocalQueueClient
NatsQueueClient
QueueClient
RedisQueueClient
SNSQueueClient
SQSQueueClient
WorkerQueueClient

Members

(static) clients :object

Description:
  • queue live clients by name

Source:

(static) modules :object

Description:
  • queue modules by type

Source:

Methods

(async, static) apublish(subject, msg, optionsopt)

Description:
Source:
Parameters:
Name Type Attributes Description
subject string
msg string | object
options object <optional>
Example
const { err, data } = await queue.apublish("events", { alert, detail });

(async, static) asubmit(msg, optionsopt)

Description:
Source:
Parameters:
Name Type Attributes Description
msg string
options object <optional>
Example
const { err, data } = await queue.asubmit({ alert, detail });

(static) checkConfig()

Description:
  • Initialize missing or new clients, existing clients stay the same

Source:

(static) createClient(options)

Description:
  • Return a new client for the given url or null if not supported

Source:
Parameters:
Name Type Description
options object

(static) getClient(options) → {QueueClient}

Description:
  • Return a queue client by name if specified in the options or use default client which always exists, use queueName to specify a specific driver. Ignores subject (@) and group(#) if present. If it is an array it will rotate items sequentially.

Source:
Parameters:
Name Type Description
options object | string | Array.<string>
Properties
Name Type Attributes Description
queueName string <optional>

queue to use

Returns:
Type Description
QueueClient

(static) initClients()

Description:
  • Reinitialize all clients for queue purposes, previous clients will be closed.

Source:

(static) listen(optionsopt, callback)

Description:
  • Listen for messages from the given queue, the callback will be called only on new message received. The callback accepts 2 arguments, a message and optional next callback, if it is provided it must be called at the end to confirm or reject the message processing. Only errors with code>=500 will result in rejection, not all drivers support the next callback if the underlying queue does not support message acknowledgement.

    Depending on the implementation, this can work as fan-out, delivering messages to all subscribed to the same channel or can implement job queue model where only one subscriber receives a message.

    For cases when the next callback is provided this means the queue implementation requires an acknowledgement of successful processing, returning an error with err.status >= 500 will keep the message in the queue to be processed later. Special code 600 means to keep the job in the queue and report as warning in the log.

Source:
Parameters:
Name Type Attributes Description
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

callback function
Example
queue.listen({ queueName: "jobs" }, (msg, next) => {
  req.res.json(data);
  if (next) next();
 }, req);

(static) publish(subject, msg, optionsopt, callbackopt)

Description:
  • Publish an event with the subject to be delivered to all subscribers. If the msg is not a string it will be stringified. The module:events uses this.

Source:
Parameters:
Name Type Attributes Description
subject string
msg string | object
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

callback function <optional>
Example
queue.publish("events", { id, name, data }, { queueName: "logs" })

(static) purge(optionsopt, callbackopt)

Description:
  • Purge all messages form the queue

Source:
Parameters:
Name Type Attributes Description
options object <optional>
callback function <optional>

(async, static) queue.apurge()

Description:
  • Async version of purge

Source:

(async, static) queue.astats()

Description:
  • Async version of stats

Source:

(static) shutdown()

Description:
  • Close all existing clients except empty local client

Source:

(static) stats(optionsopt, callback)

Description:
  • Returns the queue statistics, the format depends on the queue type used

Source:
Parameters:
Name Type Attributes Description
options object <optional>
callback function

(static) submit(msg, callbackopt)

Description:
  • Submit a message to the queue, msg can be a string or an object or an array of objects if the queue supports it.

    • For job processing at least one delivery is expected, the module:jobs uses this.
    • For events it depends on the queue being used, NATS supports multiple consumers usng the same queue.
Source:
Parameters:
Name Type Attributes Description
msg object | string
options.queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

callback function <optional>
Example
queue.submit({ alert, detail }, { queueName: "sqs" }, lib.log)

(static) subscribe(subject, options, callbackopt)

Description:
  • Subscribe to receive messages for the given subject, the callback will be called only on new message received.

Source:
Parameters:
Name Type Attributes Description
subject string
options object
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

callback function <optional>
Example
queue.subscribe("alerts", (msg) => {
   req.res.json(data);
}, req);

(static) unlisten(optionsopt, callbackopt)

Description:
  • Stop listening for message, if no callback is provided all listeners for the key will be unsubscribed, otherwise only the specified listener.

    The callback will not be called.

    It keeps a count how many subscribe/unsubscribe calls been made and stops any internal listeners once nobody is subscribed. This is specific to a queue which relies on polling.

Source:
Parameters:
Name Type Attributes Description
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

callback function <optional>

(static) unsubscribe(subject, optionsopt, callbackopt)

Description:
  • Close a subscription for the given subject, no more messages will be delivered.

Source:
Parameters:
Name Type Attributes Description
subject string
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

callback function <optional>