queue

module:queue

Description:
  • Queue module for jobs, events and subscriptions.

    All methods accept options.queueName to select a non-default queue.

    If queueName is an array, a client is picked from it sequentially using an internal sequence number.

    To specify a channel within a queue, use the format queueName#channelName. For drivers that support multiple channels, like NATS and Redis, the channel is used for another subscription within the same connection.

    For drivers that support multiple consumers (NATS), the full queue syntax is queueName#channelName@groupName or queueName@groupName. The group can also be set with the groupName property in the subscribe options.

    An empty default client always exists. It can be overridden to make some other driver the default.

    Stats collection for a queue must be enabled in the config, for example: queue-redis-options-metrics=1

    The class module:queue.QueueClient defines the methods that a driver may or may not implement.

    The url query parameters that start with bk- are extracted from the url and placed in the class options object. This is a way to pass special properties without using cache-options. The rest of the url parameters are passed to the driver.
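The queue name syntax described above can be sketched as a small parser. This is only an illustration of the queueName#channelName@groupName format; the helper name parseQueueSpec and the shape of its result are assumptions, not part of the module.

```javascript
// Hypothetical helper, not part of module:queue: splits a queue spec of the
// form queueName[#channelName][@groupName] into its three parts.
function parseQueueSpec(spec) {
    const m = /^([^#@]*)(?:#([^@]*))?(?:@(.*))?$/s.exec(String(spec));
    return {
        queueName: m[1] || "",
        channelName: m[2] || "",
        groupName: m[3] || "",
    };
}

console.log(parseQueueSpec("nats#alerts@workers"));
console.log(parseQueueSpec("nats@workers"));
console.log(parseQueueSpec("redis#alerts"));
```

Missing parts come back as empty strings, so a caller can fall back to the groupName property in the subscribe options when the spec carries no group.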

Source:
Example
queue-default=redis://
queue-redis=redis://?bk-threshold=5&bk-visibilityTimeout=30000&bk-count=2
queue-account=sqs://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2
queue-messages=sqs://messages?bk-interval=60000
queue-config={ "account": "https://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2" }
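As a rough illustration of how the bk- parameters in the urls above could be separated from the driver parameters, here is a minimal sketch that uses the standard URL class. The function name splitQueueUrl, the result shape, and the non-bk parameter in the usage line are assumptions; the module may also convert values to numbers, which this sketch does not do.

```javascript
// Hypothetical helper, not part of module:queue: moves bk- prefixed query
// parameters into an options object and leaves the rest for the driver.
function splitQueueUrl(url) {
    const u = new URL(url);
    const options = {};
    const driverParams = {};
    for (const [key, value] of u.searchParams) {
        if (key.startsWith("bk-")) {
            options[key.slice(3)] = value; // values stay strings in this sketch
        } else {
            driverParams[key] = value;
        }
    }
    return { protocol: u.protocol, options, driverParams };
}

// "someDriverParam" is a made-up parameter used only to show the split.
console.log(splitQueueUrl("sqs://messages?bk-interval=60000&someDriverParam=1"));
```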

Classes

EventBridgeClient
JSONClient
LocalClient
NatsClient
QueueClient
RedisClient
SNSClient
SQSClient
WorkerClient

Members

(static) clients :object

Description:
  • queue live clients by name

Source:

(static) modules :object

Description:
  • queue modules by type

Source:

Methods

(async, static) apublish(channel, msg, [options])

Description:
  • Async version of publish, resolves with an object containing err and data.

Source:
Parameters:
Name Type Attributes Description
channel string
msg string | object
options object <optional>
Example
const { err, data } = await queue.apublish("events", { alert, detail });

(async, static) asubmit(msg, [options])

Description:
  • Async version of submit, resolves with an object containing err and data.

Source:
Parameters:
Name Type Attributes Description
msg string | object
options object <optional>
Example
const { err, data } = await queue.asubmit({ alert, detail });
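The examples for apublish and asubmit destructure err and data from the awaited result. One common way to get that calling shape from a callback-based function is sketched below. The wrapper toResult and the stand-in fakeSubmit are hypothetical and only show the pattern; they are not the module's implementation.

```javascript
// Hypothetical wrapper: turns a Node-style callback function into one that
// resolves with { err, data } and never rejects.
function toResult(fn) {
    return (...args) => new Promise((resolve) => {
        fn(...args, (err, data) => resolve({ err, data }));
    });
}

// Stand-in for a callback-based submit, used only in this sketch.
function fakeSubmit(msg, options, callback) {
    if (!msg) return callback(new Error("empty message"));
    callback(null, { queued: true, queueName: options.queueName || "default" });
}

const asubmit = toResult(fakeSubmit);

asubmit({ alert: "disk" }, {}).then(({ err, data }) => {
    if (err) console.error("submit failed:", err.message);
    else console.log("submitted:", data);
});
```

Because the promise never rejects, the caller checks err explicitly instead of wrapping the await in try/catch.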

(static) checkConfig()

Description:
  • Initialize missing or new clients, existing clients stay the same

Source:

(static) createClient(options)

Description:
  • Return a new client for the given host or null if not supported

Source:
Parameters:
Name Type Description
options object

(static) drop(msg, [options], [callback])

Description:
  • Queue-specific message deletion, for cases of abnormal shutdown or a job running too long, so that the message is not re-run after a restart. This is for queues that require manual message deletion after execution (SQS). Each queue client must maintain a mapping or other means to identify messages. The options is the message passed to the listener.

Source:
Parameters:
Name Type Attributes Description
msg string | object
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional> defines the queue; if not specified, the default queue is used

callback function <optional>

(static) getClient(options) → {QueueClient}

Description:
  • Return a queue client by name if queueName is specified in the options, otherwise the default client, which always exists. Use queueName to select a specific driver. If queueName is an array, its items are rotated sequentially.

Source:
Parameters:
Name Type Description
options object | string | Array.<string>
Properties
Name Type Attributes Description
queueName string <optional> queue to use

Returns:
Type Description
QueueClient
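The sequential rotation over an array of queue names can be pictured with a small closure that keeps an internal sequence number. This is a sketch under assumptions: makeRotator and pick are made-up names, and the real client lookup is not shown.

```javascript
// Hypothetical sketch of sequential rotation; not the module's code.
function makeRotator() {
    let seq = 0; // internal sequence number shared by all calls
    return function pick(queueName) {
        if (!Array.isArray(queueName)) return queueName;
        if (!queueName.length) return undefined;
        return queueName[seq++ % queueName.length];
    };
}

const pick = makeRotator();
const names = ["redis", "account", "messages"];
console.log(pick(names), pick(names), pick(names), pick(names));
// prints: redis account messages redis
```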

(static) initClients()

Description:
  • Reinitialize all clients for queue purposes, previous clients will be closed.

Source:

(static) listen([options], callback)

Description:
  • Listen for messages from the given queue; the callback is called only when a new message is received. The callback accepts 2 arguments, a message and an optional next callback. If next is provided, it must be called at the end to confirm or reject the message processing. Only errors with a status code >= 500 result in rejection. Not all drivers pass the next callback, because the underlying queue may not support message acknowledgement.

    Depending on the implementation, this can work as fan-out, delivering messages to everyone subscribed to the same channel, or as a job queue where only one subscriber receives each message. In some cases, like Redis, this is the same as subscribe.

    When the next callback is provided, the queue implementation requires an acknowledgement of successful processing. Returning an error with err.status >= 500 keeps the message in the queue to be processed later. The special code 600 means to keep the job in the queue and report it as a warning in the log.

Source:
Parameters:
Name Type Attributes Description
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional> defines the queue; if not specified, the default queue is used

callback function
Example
queue.listen({ queueName: "jobs" }, (msg, next) => {
    console.log("job received:", msg);
    // confirm the message when the queue supports acknowledgement
    if (next) next();
});
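The acknowledgement rules from the description (status >= 500 keeps the message, 600 keeps it and logs a warning) can be summarized with a small classifier. The function classifyError and its result shape are assumptions made for illustration; how each driver acts on the error is not shown here.

```javascript
// Hypothetical classifier for the error passed to next(err); not module code.
// keep: the message stays in the queue to be processed later
// warn: the failure is reported as a warning (special code 600)
function classifyError(err) {
    const status = Number(err && err.status) || 0;
    return { keep: status >= 500, warn: status === 600 };
}

console.log(classifyError());                // no error, message is confirmed
console.log(classifyError({ status: 400 })); // below 500, message is confirmed
console.log(classifyError({ status: 503 })); // kept for a later retry
console.log(classifyError({ status: 600 })); // kept and reported as a warning
```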

(static) monitor([options])

Description:
  • Queue-specific monitor services that must be run in the master process. This is intended for queue cleanup or for dealing with stuck messages (Redis).

Source:
Parameters:
Name Type Attributes Description
options object <optional>

(static) publish(channel, msg, [options], [callback])

Description:
  • Publish an event to the channel to be delivered to all subscribers. If the msg is not a string it will be stringified. The module:events uses this.

Source:
Parameters:
Name Type Attributes Description
channel string
msg string | object
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional> defines the queue, if not specified then it is sent to the default queue

callback function <optional>
Example
queue.publish("events", { id, name, data }, { queueName: "logs" })

(static) shutdown()

Description:
  • Close all existing clients except empty local client

Source:

(static) stats([options], callback)

Description:
  • Returns the queue statistics, the format depends on the queue type used

Source:
Parameters:
Name Type Attributes Description
options object <optional>
callback function

(static) submit(msg, [options], [callback])

Description:
  • Submit a message to the queue. The msg can be a string, an object, or an array of objects if the queue supports it. This is for job processing, where at-least-once delivery is expected. The module:jobs uses this.

Source:
Parameters:
Name Type Attributes Description
msg object | string
options.queueName string <optional> defines the queue, if not specified then it is sent to the default queue
options.stime int <optional> defines when the message should be processed, it will be held in the queue until the time comes
options.etime int <optional> defines when the message expires, i.e. it will be dropped if not executed before this time

callback function <optional>
Example
queue.submit({ alert, detail }, { queueName: "sqs" })
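The effect of options.stime and options.etime can be illustrated with a small state function. This is a sketch only: it assumes both values are timestamps in milliseconds since the epoch, which the description does not state, and messageState is a made-up name.

```javascript
// Hypothetical illustration of stime/etime semantics; not module code.
// Assumption: stime and etime are millisecond timestamps.
function messageState(options, now = Date.now()) {
    const { stime, etime } = options || {};
    if (etime && now >= etime) return "expired"; // dropped, never executed
    if (stime && now < stime) return "held";     // stays queued until stime
    return "ready";                              // can be processed now
}

const now = Date.now();
console.log(messageState({ stime: now + 60000 }, now)); // held
console.log(messageState({ etime: now - 1 }, now));     // expired
console.log(messageState({}, now));                     // ready
```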

(static) subscribe(channel, options, [callback])

Description:
  • Subscribe to receive messages from the given channel; the callback is called only when a new message is received.

Source:
Parameters:
Name Type Attributes Description
channel string
options object
Properties
Name Type Attributes Description
queueName string <optional> defines the queue; if not specified, the default queue is used

callback function <optional>
Example
queue.subscribe("alerts", { queueName: "redis" }, (msg) => {
    console.log("alert received:", msg);
});

(static) unlisten([options], [callback])

Description:
  • Stop listening for messages. If no callback is provided, all listeners for the key are unsubscribed, otherwise only the specified listener.

    The callback will not be called.

    It keeps a count of how many subscribe/unsubscribe calls have been made and stops any internal listeners once nobody is subscribed. This is specific to queues that rely on polling.

Source:
Parameters:
Name Type Attributes Description
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional> defines the queue; if not specified, the default queue is used

callback function <optional>
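The counting behaviour described for polling queues can be sketched as a reference counter around a timer: the poller starts with the first listener and stops when the last one is removed. The class PollingListeners and its members are hypothetical and do not mirror the module's internals.

```javascript
// Hypothetical reference-counted poller; not module code.
class PollingListeners {
    constructor(pollFn, intervalMs) {
        this.pollFn = pollFn;
        this.intervalMs = intervalMs;
        this.count = 0;    // number of active listeners
        this.timer = null; // internal polling timer
    }
    listen() {
        // start polling only for the first listener
        if (++this.count === 1) this.timer = setInterval(this.pollFn, this.intervalMs);
    }
    unlisten() {
        if (this.count === 0) return;
        // stop polling once nobody is subscribed
        if (--this.count === 0) {
            clearInterval(this.timer);
            this.timer = null;
        }
    }
    get polling() {
        return this.timer !== null;
    }
}

const listeners = new PollingListeners(() => console.log("poll"), 1000);
listeners.listen();
listeners.listen();
listeners.unlisten();
console.log(listeners.polling); // true, one listener is left
listeners.unlisten();
console.log(listeners.polling); // false, the timer is cleared
```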

(static) unsubscribe(channel, [options], [callback])

Description:
  • Close a subscription for the given channel, no more messages will be delivered.

Source:
Parameters:
Name Type Attributes Description
channel string
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional> defines the queue; if not specified, the default queue is used

callback function <optional>