queue

module:queue

Description:
  • Queue module for jobs, events and subscriptions.

    All methods use options.queueName to select a non-default queue.

    If queueName is an array, a client is picked from it sequentially using an internal sequence number.

    To specify a channel within a queue, use the format queueName#channelName. For drivers that support multiple channels, such as NATS and Redis, the channel is used for another subscription within the same connection.

    For drivers that support multiple consumers (NATS), the full queue syntax is queueName#channelName@groupName or queueName@groupName; the group can also be set with the groupName property in the subscribe options.
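
    The name syntax above can be illustrated with a small parser. This is a minimal sketch only: parseQueueName is a hypothetical helper, not a function of this module, and the queue names used are made up.

```javascript
// Hypothetical helper (not part of module:queue): split a name of the form
// queueName#channelName@groupName into its parts.
function parseQueueName(spec) {
  let rest = String(spec);
  let channelName = "";
  let groupName = "";

  // The group, if any, follows the last part of the name after "@".
  const at = rest.indexOf("@");
  if (at > -1) {
    groupName = rest.slice(at + 1);
    rest = rest.slice(0, at);
  }

  // The channel, if any, sits between "#" and the group.
  const hash = rest.indexOf("#");
  if (hash > -1) {
    channelName = rest.slice(hash + 1);
    rest = rest.slice(0, hash);
  }
  return { queueName: rest, channelName, groupName };
}

console.log(parseQueueName("nats#orders@workers"));
console.log(parseQueueName("nats@workers"));
console.log(parseQueueName("redis#alerts"));
```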

    An empty default client always exists; it can be overridden to make some other driver the default.

    To collect stats for a queue, enable it in the config, for example: queue-redis-options-metrics=1

    The class module:queue.QueueClient defines the methods that a driver may or may not implement.

    URL query parameters that start with bk- are extracted from the URL and placed in the class options object; this is a way to pass special properties without using cache-options. The rest of the URL parameters are passed to the driver.
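
    The bk- extraction described above could look roughly like this. It is a sketch under assumptions: splitQueueUrl is a hypothetical helper, values are kept as strings (the module may convert them), and the region parameter is invented to stand in for a driver parameter.

```javascript
// Hypothetical helper (not part of module:queue): separate bk- parameters
// from the parameters that are left for the driver.
function splitQueueUrl(url) {
  const pos = url.indexOf("?");
  const base = pos === -1 ? url : url.slice(0, pos);
  const query = pos === -1 ? "" : url.slice(pos + 1);

  const options = {};
  const driverParams = new URLSearchParams();
  for (const [key, value] of new URLSearchParams(query)) {
    if (key.startsWith("bk-")) {
      options[key.slice(3)] = value;   // bk-count=2 becomes options.count
    } else {
      driverParams.append(key, value); // everything else stays in the url
    }
  }
  const rest = driverParams.toString();
  return { url: rest ? `${base}?${rest}` : base, options };
}

console.log(splitQueueUrl("redis://?bk-threshold=5&bk-visibilityTimeout=30000&bk-count=2"));
console.log(splitQueueUrl("sqs://messages?bk-interval=60000&region=us-east-1"));
```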

Source:
Parameters:
Name Type Description
options object
Properties
Name Type Attributes Description
count int <optional>

config property that specifies how many messages to process at the same time, default is 1.

interval int <optional>

config property that defines, in milliseconds, how often to check for new messages after processing a message, i.e. after a message is processed the client can poll immediately or after this amount of time, default is 1000 milliseconds.

retryInterval int <optional>

config property that defines, in milliseconds, how often to check for new messages after an error or no data, i.e. on an empty poll when no messages are processed the client can poll immediately or after this amount of time, default is 5000 milliseconds.

visibilityTimeout int <optional>

property specifies how long the messages being processed stay hidden, in milliseconds.

noVisibilityTimeout boolean <optional>

disables auto extending visibilityTimeout while the job is running, allows default queue visibility timeout to be used instead

startTime int <optional>

the time in the future when a message must actually be processed. The scheduling is implemented using the visibilityTimeout feature: scheduled messages are kept hidden until the actual time.

maxTimeout int <optional>

defines, in milliseconds, the max time a message can stay invisible while waiting for its scheduled date, default is 6 hours, the AWS max is 12 hours.
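
The interaction between startTime and maxTimeout can be sketched as follows. This assumes that a message scheduled further out than maxTimeout is simply hidden again on each pass until it is due; hiddenFor is a hypothetical helper, not part of the module.

```javascript
const SIX_HOURS = 6 * 3600 * 1000; // the documented default for maxTimeout

// Hypothetical helper: how long to keep a scheduled message hidden on this
// pass. Returns 0 when the message is due and can be processed now.
function hiddenFor(startTime, now, maxTimeout = SIX_HOURS) {
  const remaining = startTime - now;
  if (remaining <= 0) return 0;
  // Never hide for longer than maxTimeout in one step.
  return Math.min(remaining, maxTimeout);
}

const now = Date.now();
console.log(hiddenFor(now + 10 * 3600 * 1000, now)); // capped at maxTimeout
console.log(hiddenFor(now + 3600 * 1000, now));      // hidden for the remaining hour
console.log(hiddenFor(now - 1, now));                // already due
```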

Example
queue-default=redis://
queue-redis=redis://?bk-threshold=5&bk-visibilityTimeout=30000&bk-count=2
queue-account=sqs://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2
queue-messages=sqs://messages?bk-interval=60000
queue-config={ "account": "https://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2" }
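
The bk-interval value in the configuration above sets the interval property described earlier. A minimal sketch of how a polling driver might choose between interval and retryInterval; nextPollDelay and the shape of its result argument are assumptions made for illustration.

```javascript
// Hypothetical helper: pick the delay before the next poll.
// result.err is an error from the last poll, result.count is the number of
// messages it returned; both names are assumptions.
function nextPollDelay(result, options = {}) {
  const { interval = 1000, retryInterval = 5000 } = options;
  const idle = Boolean(result.err) || !result.count;
  return idle ? retryInterval : interval;
}

console.log(nextPollDelay({ count: 2 }, { interval: 60000 }));
console.log(nextPollDelay({ count: 0 }));
console.log(nextPollDelay({ err: new Error("timeout") }));
```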

Classes

DBQueueClient
EventBridgeQueueClient
JSONQueueClient
LocalQueueClient
NatsQueueClient
QueueClient
RedisQueueClient
SNSQueueClient
SQSQueueClient
WorkerQueueClient

Members

(static) clients :object

Description:
  • queue live clients by name

Source:

(static) modules :object

Description:
  • queue modules by type

Source:

Methods

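(async, static) apublish(channel, msg, [options])

Description:
  • Promise-based version of publish; as the example shows, the result is an object with err and data properties.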
Source:
Parameters:
Name Type Attributes Description
channel string
msg string | object
options object <optional>
Example
const { err, data } = await queue.apublish("events", { alert, detail });
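
The { err, data } result in the example suggests a wrapper around a callback-style method. One way such a wrapper can be written, as a sketch only: toAsync and fakePublish are hypothetical names, and fakePublish merely stands in for a callback-style publish.

```javascript
// Hypothetical wrapper: turn a callback-style function into one that
// resolves to { err, data } instead of rejecting.
function toAsync(fn) {
  return (...args) => new Promise((resolve) => {
    fn(...args, (err, data) => resolve({ err, data }));
  });
}

// Stand-in for a callback-style publish, for illustration only.
function fakePublish(channel, msg, callback) {
  if (!channel) return callback(new Error("channel required"));
  callback(null, { channel, msg });
}

const apublish = toAsync(fakePublish);

apublish("events", { alert: true }).then(({ err, data }) => {
  if (err) console.error("publish failed:", err.message);
  else console.log("published:", data);
});
```

With this shape the caller checks err instead of wrapping every call in try/catch.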

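(async, static) asubmit(msg, [options])

Description:
  • Promise-based version of submit; as the example shows, the result is an object with err and data properties.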
Source:
Parameters:
Name Type Attributes Description
msg string | object
options object <optional>
Example
const { err, data } = await queue.asubmit({ alert, detail });

(static) checkConfig()

Description:
  • Initialize missing or new clients, existing clients stay the same

Source:

(static) createClient(options)

Description:
  • Return a new client for the given host or null if not supported

Source:
Parameters:
Name Type Description
options object

(static) drop(msg, [options], [callback])

Description:
  • Queue-specific deletion of a message from the queue in case of an abnormal shutdown or a job running too long, so that it is not re-run after the restart. This is for queues that require manual message deletion after execution (SQS). Each queue client must maintain a mapping or other means to identify messages. The options is the message passed to the listener.

Source:
Parameters:
Name Type Attributes Description
msg string | object
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then the default queue is used

callback function <optional>

(static) getClient(options) → {QueueClient}

Description:
  • Return a queue client by name if specified in the options, or the default client, which always exists. Use queueName to select a specific driver. If queueName is an array, its items are rotated sequentially.

Source:
Parameters:
Name Type Description
options object | string | Array.<string>
Properties
Name Type Attributes Description
queueName string <optional>

queue to use

Returns:
Type Description
QueueClient
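
The rotation over an array of queue names can be pictured with a shared sequence number. A minimal sketch, assuming a single counter and a fallback name of "default"; makeSelector and pick are hypothetical, not the module's code.

```javascript
// Hypothetical selector: returns a function that picks a queue name,
// rotating through arrays with one internal sequence number.
function makeSelector() {
  let seq = 0;
  return function pick(queueName) {
    if (!Array.isArray(queueName)) return queueName || "default";
    if (!queueName.length) return "default";
    return queueName[seq++ % queueName.length];
  };
}

const pick = makeSelector();
const names = ["redis", "sqs"];
console.log(pick(names), pick(names), pick(names)); // redis sqs redis
console.log(pick("nats"));                          // nats
console.log(pick());                                // default
```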

(static) initClients()

Description:
  • Reinitialize all clients for queue purposes, previous clients will be closed.

Source:

(static) listen([options], callback)

Description:
  • Listen for messages from the given queue; the callback is called only when a new message is received. The callback accepts 2 arguments, a message and an optional next callback. If next is provided, it must be called at the end to confirm or reject the message processing. Only errors with a status code >= 500 result in rejection. Not all drivers pass the next callback, since the underlying queue may not support message acknowledgement.

    Depending on the implementation, this can work as fan-out, delivering messages to all listeners subscribed to the same channel, or it can implement a job queue model where only one subscriber receives a message. For some drivers, like Redis, this is the same as subscribe.

    When the next callback is provided, the queue implementation requires an acknowledgement of successful processing; returning an error with err.status >= 500 keeps the message in the queue to be processed later. The special code 600 means to keep the job in the queue and report it as a warning in the log.

Source:
Parameters:
Name Type Attributes Description
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then the default queue is used

callback function
Example
queue.listen({ queueName: "jobs" }, (msg, next) => {
  // process the message, then confirm it if the driver passed next
  console.log("job received:", msg);
  if (next) next();
});
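
The acknowledgement rules described for next can be summarized in a small decision function. This is a sketch of the documented rules only; ackDecision and the shape of its return value are hypothetical.

```javascript
// Hypothetical helper: what an error passed to next(err) means for the message.
// action is "ack" (processing confirmed) or "retry" (message stays in the queue).
function ackDecision(err) {
  if (!err || !(err.status >= 500)) return { action: "ack", warn: false };
  // 600 is the special code: keep the job and report it as a warning.
  return { action: "retry", warn: err.status === 600 };
}

console.log(ackDecision());                // no error
console.log(ackDecision({ status: 400 })); // below 500
console.log(ackDecision({ status: 503 })); // kept for later
console.log(ackDecision({ status: 600 })); // kept, logged as a warning
```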

(static) publish(channel, msg, [options], [callback])

Description:
  • Publish an event to the channel to be delivered to all subscribers. If the msg is not a string it will be stringified. The module:events uses this.

Source:
Parameters:
Name Type Attributes Description
channel string
msg string | object
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

callback function <optional>
Example
queue.publish("events", { id, name, data }, { queueName: "logs" })

(static) purge([options], [callback])

Description:
  • Purge all messages from the queue

Source:
Parameters:
Name Type Attributes Description
options object <optional>
callback function <optional>

(static) shutdown()

Description:
  • Close all existing clients except empty local client

Source:

(static) stats([options], callback)

Description:
  • Returns the queue statistics, the format depends on the queue type used

Source:
Parameters:
Name Type Attributes Description
options object <optional>
callback function

(static) submit(msg, [options], [callback])

Description:
  • Submit a message to the queue; msg can be a string, an object, or an array of objects if the queue supports it. This is for job processing, at-least-once delivery is expected. The module:jobs uses this.

Source:
Parameters:
Name Type Attributes Description
msg object | string
options object <optional>
options.queueName string <optional>

defines the queue, if not specified then it is sent to the default queue

options.stime int <optional>

defines when the message should be processed, it will be held in the queue until the time comes

options.etime int <optional>

defines when the message expires, i.e. will be dropped if not executed before this time.

callback function <optional>
Example
queue.submit({ alert, detail }, { queueName: "sqs" })
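
The stime and etime options describe a delivery window. A minimal sketch of how that window could be evaluated when a message is picked up, assuming both values are timestamps in milliseconds; deliveryState is a hypothetical helper.

```javascript
// Hypothetical helper: classify a message by its stime/etime window.
function deliveryState(options, now = Date.now()) {
  const { stime, etime } = options || {};
  if (etime && now >= etime) return "expired"; // dropped, never executed
  if (stime && now < stime) return "held";     // stays in the queue for now
  return "ready";
}

const now = Date.now();
console.log(deliveryState({ stime: now + 60000 }, now)); // held
console.log(deliveryState({ etime: now - 1 }, now));     // expired
console.log(deliveryState({}, now));                     // ready
```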

(static) subscribe(channel, options, [callback])

Description:
  • Subscribe to receive messages from the given channel, the callback will be called only on new message received.

Source:
Parameters:
Name Type Attributes Description
channel string
options object
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then the default queue is used

callback function <optional>
Example
queue.subscribe("alerts", (msg) => {
   // handle the published message
   console.log("alert received:", msg);
});

(static) unlisten([options], [callback])

Description:
  • Stop listening for messages. If no callback is provided, all listeners for the key are unsubscribed, otherwise only the specified listener.

    The callback will not be called.

    It keeps a count of how many subscribe/unsubscribe calls have been made and stops any internal listeners once nobody is subscribed. This is specific to queues that rely on polling.

Source:
Parameters:
Name Type Attributes Description
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then the default queue is used

callback function <optional>
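
The listener counting described above can be sketched with a small registry that reports when polling should start or stop. ListenerRegistry is hypothetical and only illustrates the bookkeeping, not the module's internals.

```javascript
// Hypothetical registry: tracks listeners per key and tells the caller
// when the first listener arrives or the last one leaves.
class ListenerRegistry {
  constructor() {
    this.listeners = new Map();
  }

  // Returns true when this is the first listener, i.e. polling should start.
  listen(key, callback) {
    const list = this.listeners.get(key) || [];
    list.push(callback);
    this.listeners.set(key, list);
    return list.length === 1;
  }

  // Without a callback all listeners for the key are removed.
  // Returns true when nobody is left, i.e. polling should stop.
  unlisten(key, callback) {
    const list = this.listeners.get(key) || [];
    const left = callback ? list.filter((cb) => cb !== callback) : [];
    if (left.length) this.listeners.set(key, left);
    else this.listeners.delete(key);
    return list.length > 0 && left.length === 0;
  }
}

const registry = new ListenerRegistry();
const onJob = (msg) => console.log("job:", msg);
console.log(registry.listen("jobs", onJob));   // true, start polling
console.log(registry.unlisten("jobs", onJob)); // true, stop polling
```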

(static) unsubscribe(channel, [options], [callback])

Description:
  • Close a subscription for the given channel, no more messages will be delivered.

Source:
Parameters:
Name Type Attributes Description
channel string
options object <optional>
Properties
Name Type Attributes Description
queueName string <optional>

defines the queue, if not specified then the default queue is used

callback function <optional>