module:queue
- Description:
Queue module for jobs, events, and subscriptions.
All methods accept options.queueName to select a non-default queue.
If queueName is an array, a client is picked from it sequentially using an internal sequence number.
To specify a channel within a queue, use the format queueName#channelName. For drivers that support multiple channels, such as NATS and Redis, the channel is used for another subscription within the same connection.
For drivers that support multiple consumers (NATS), the full queue syntax is queueName#channelName@groupName or queueName@groupName; the group can also be set with the groupName property in the subscribe options.
An empty default client always exists; it can be overridden to make another driver the default.
Stats collection for a queue must be enabled in the config, for example: queue-redis-options-metrics=1
The class module:queue.QueueClient defines the methods that a driver may or may not implement.
URL query parameters that start with bk- are extracted from the URL and placed in the client options object. This is a way to pass special properties without using cache-options; the rest of the URL parameters are passed to the driver.
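The queue name syntax described above can be illustrated with a small parser. This is only a sketch: `parseQueueName` is a hypothetical helper written for this example, not the module's own code, and only the `queueName#channelName@groupName` format itself comes from the description.

```javascript
// Hypothetical helper: splits "queueName#channelName@groupName" into parts.
// Not the library's parser; only the syntax comes from the description.
function parseQueueName(spec) {
    const parts = { queueName: spec, channelName: "", groupName: "" };
    // The group, if any, follows the last-level "@" separator
    const at = parts.queueName.indexOf("@");
    if (at > -1) {
        parts.groupName = parts.queueName.slice(at + 1);
        parts.queueName = parts.queueName.slice(0, at);
    }
    // The channel, if any, follows the "#" separator
    const hash = parts.queueName.indexOf("#");
    if (hash > -1) {
        parts.channelName = parts.queueName.slice(hash + 1);
        parts.queueName = parts.queueName.slice(0, hash);
    }
    return parts;
}

console.log(parseQueueName("nats#orders@workers"));
console.log(parseQueueName("nats@workers"));
console.log(parseQueueName("redis#alerts"));
```

The names `nats`, `orders`, `workers`, and `alerts` are placeholders chosen for the example.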
Example
queue-default=redis://
queue-redis=redis://?bk-threshold=5&bk-visibilityTimeout=30000&bk-count=2
queue-account=sqs://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2
queue-messages=sqs://messages?bk-interval=60000
queue-config={ "account": "https://sqs.us-east-1.amazonaws.com/566932855105/account?bk-count=2" }
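To make the bk- convention in these URLs concrete, the sketch below separates bk- prefixed query parameters from the ones left for the driver. `splitQueueUrl` is a hypothetical helper, and keeping the values as strings is an assumption; the module may well convert them to numbers or other types.

```javascript
// Hypothetical helper: separates bk- prefixed query parameters from the rest.
// Values are kept as strings here; the real module may convert them.
function splitQueueUrl(url) {
    const idx = url.indexOf("?");
    const base = idx > -1 ? url.slice(0, idx) : url;
    const query = new URLSearchParams(idx > -1 ? url.slice(idx + 1) : "");
    const options = {};
    const driverParams = new URLSearchParams();
    for (const [name, value] of query) {
        if (name.startsWith("bk-")) {
            // Strip the prefix and keep the property for the client options
            options[name.slice(3)] = value;
        } else {
            // Everything else stays in the URL for the driver
            driverParams.append(name, value);
        }
    }
    const rest = driverParams.toString();
    return { url: rest ? `${base}?${rest}` : base, options };
}

const parsed = splitQueueUrl("redis://?bk-threshold=5&bk-visibilityTimeout=30000&bk-count=2");
console.log(parsed.url); // redis://
console.log(parsed.options);
```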
Classes
- EventBridgeClient
- JSONClient
- LocalClient
- NatsClient
- QueueClient
- RedisClient
- SNSClient
- SQSClient
- WorkerClient
Members
(static) clients :object
(static) modules :object
Methods
(async, static) apublish(channel, msg, optionsopt)
- Description:
Async version of module:queue.publish
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| channel | string | | |
| msg | string \| object | | |
| options | object | <optional> | |
Example
const { err, data } = await queue.apublish("events", { alert, detail });
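The `{ err, data }` result shape suggests that the async variants resolve with the error instead of rejecting. A minimal sketch of that pattern, assuming a callback-style function with a trailing `(err, data)` callback, could look like this; `toAsync`, `fakePublish`, and `apublishSketch` are hypothetical names and are not part of the module.

```javascript
// Hypothetical wrapper: turns a callback-style function into one that
// always resolves with { err, data } instead of rejecting.
function toAsync(fn) {
    return (...args) => new Promise((resolve) => {
        fn(...args, (err, data) => resolve({ err, data }));
    });
}

// Stand-in for a callback-style publish; it is not the library function.
function fakePublish(channel, msg, options, callback) {
    if (!channel) return callback(new Error("channel is required"));
    callback(null, { channel, msg });
}

const apublishSketch = toAsync(fakePublish);

apublishSketch("events", { id: 1 }, {}).then(({ err, data }) => {
    console.log(err, data);
});
```

With this shape the caller checks `err` after `await` rather than wrapping the call in try/catch.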
(async, static) asubmit(msg, optionsopt)
- Description:
Async version of module:queue.submit
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| msg | string | | |
| options | object | <optional> | |
Example
const { err, data } = await queue.asubmit({ alert, detail });
(static) checkConfig()
- Description:
Initialize missing or new clients; existing clients stay the same.
(static) createClient(options)
- Description:
Return a new client for the given host or null if not supported
Parameters:
| Name | Type | Description |
|---|---|---|
| options | object | |
(static) drop(msg, optionsopt, callbackopt)
- Description:
Queue-specific deletion of a message from the queue in case of an abnormal shutdown or a job running too long, so that it is not re-run after the restart. This is for queues that require manual message deletion after execution (SQS). Each queue client must maintain a mapping or other means to identify messages; options is the message passed to the listener.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| msg | string \| object | | |
| options | object | <optional> | |
| callback | function | <optional> | |
(static) getClient(options) → {QueueClient}
- Description:
Return a queue client by name if specified in the options, otherwise the default client, which always exists. Use queueName to select a specific driver; if it is an array, its items are rotated sequentially.
Parameters:
| Name | Type | Description |
|---|---|---|
| options | object \| string \| Array.<string> | |

Returns:
| Type | Description |
|---|---|
| QueueClient | |
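The sequential rotation over an array of queue names can be sketched as a simple round-robin counter. `createRotator` is a hypothetical helper, and falling back to the name `default` is an assumption based on the statement that a default client always exists; the actual selection logic may differ.

```javascript
// Hypothetical rotation: picks names from an array in order using an
// internal sequence number, wrapping around at the end.
function createRotator() {
    let seq = 0;
    return function pick(queueName) {
        // A plain string (or nothing) needs no rotation
        if (!Array.isArray(queueName)) return queueName || "default";
        if (!queueName.length) return "default";
        return queueName[seq++ % queueName.length];
    };
}

const pick = createRotator();
const names = ["redis", "account", "messages"];
console.log(pick(names), pick(names), pick(names), pick(names));
// redis account messages redis
```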
(static) initClients()
- Description:
Reinitialize all clients for queue purposes, previous clients will be closed.
(static) listen(optionsopt, callback)
- Description:
Listen for messages from the given queue; the callback is called only when a new message is received. The callback accepts two arguments, a message and an optional next callback. If next is provided, it must be called at the end to confirm or reject the message processing. Only errors with status >= 500 result in rejection. Not all drivers support the next callback, because the underlying queue may not support message acknowledgement.
Depending on the implementation, this can work as fan-out, delivering messages to all listeners subscribed to the same channel, or as a job queue where only one subscriber receives each message. For some drivers, such as Redis, this is the same as subscribe.
When the next callback is provided, the queue implementation requires an acknowledgement of successful processing. Returning an error with err.status >= 500 keeps the message in the queue to be processed later. The special status 600 keeps the job in the queue and reports it as a warning in the log.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| options | object | <optional> | |
| callback | function | | |
Example
queue.listen({ queueName: "jobs" }, (msg, next) => {
    console.log("received:", msg);
    // confirm the message when the driver provides the next callback
    if (next) next();
});
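The acknowledgement rules for the next callback can be summarized as a small decision function. This is a sketch of the thresholds stated in the description, not the driver code: `ackDecision` and the labels it returns are hypothetical.

```javascript
// Hypothetical classification of the error passed to next(err).
// The thresholds come from the description; the returned labels do not.
function ackDecision(err) {
    const status = err && err.status;
    // 600 keeps the job in the queue and is reported as a warning
    if (status === 600) return "keep-and-warn";
    // any other status >= 500 keeps the message for later processing
    if (status >= 500) return "keep";
    // no error, or an error below 500, confirms the message
    return "remove";
}

console.log(ackDecision(null));            // remove
console.log(ackDecision({ status: 400 })); // remove
console.log(ackDecision({ status: 503 })); // keep
console.log(ackDecision({ status: 600 })); // keep-and-warn
```

A listener that wants a retry would therefore call `next({ status: 500, message: "..." })`, and one that wants the message dropped despite a failure would pass a lower status.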
(static) monitor(optionsopt)
- Description:
Queue-specific monitor services that must run in the master process. This is intended to perform queue cleanup or to deal with stuck messages (Redis).
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| options | object | <optional> | |
(static) publish(channel, msg, optionsopt, callbackopt)
- Description:
Publish an event to the channel to be delivered to all subscribers. If the msg is not a string it will be stringified. The module:events uses this.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| channel | string | | |
| msg | string \| object | | |
| options | object | <optional> | |
| callback | function | <optional> | |
Example
queue.publish("events", { id, name, data }, { queueName: "logs" })
(static) shutdown()
(static) stats(optionsopt, callback)
- Description:
Returns the queue statistics, the format depends on the queue type used
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| options | object | <optional> | |
| callback | function | | |
(static) submit(msg, optionsopt, callbackopt)
- Description:
Submit a message to the queue. The msg can be a string, an object, or an array of objects if the queue supports it. This is for job processing; at least one delivery is expected. The module:jobs uses this.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| msg | object \| string | | |
| options.queueName | string | <optional> | defines the queue, if not specified then it is sent to the default queue |
| options.stime | int | <optional> | defines when the message should be processed, it will be held in the queue until the time comes |
| options.etime | int | <optional> | defines when the message expires, i.e. will be dropped if not executed before this time. |
| callback | function | <optional> | |
Example
queue.submit({ alert, detail }, { queueName: "sqs" })
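The effect of options.stime and options.etime can be pictured with a small state check. This sketch assumes both values are epoch timestamps in milliseconds, which the parameter table does not state, and `messageState` is a hypothetical helper rather than anything the module exposes.

```javascript
// Hypothetical check of the scheduling options; assumes stime and etime
// are epoch timestamps in milliseconds, which the text does not state.
function messageState(options, now = Date.now()) {
    const { stime, etime } = options || {};
    // Past the expiration time: the message is dropped, not executed
    if (etime > 0 && etime <= now) return "expired";
    // Before the start time: the message is held in the queue
    if (stime > 0 && stime > now) return "held";
    return "ready";
}

const now = Date.now();
console.log(messageState({ stime: now + 60000 }, now)); // held
console.log(messageState({ etime: now - 1 }, now));     // expired
console.log(messageState({}, now));                     // ready
```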
(static) subscribe(channel, options, callbackopt)
- Description:
Subscribe to receive messages from the given channel. The callback is called only when a new message is received.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| channel | string | | |
| options | object | | |
| callback | function | <optional> | |
Example
queue.subscribe("alerts", { queueName: "redis" }, (msg) => {
    console.log("alert:", msg);
});
(static) unlisten(optionsopt, callbackopt)
- Description:
Stop listening for messages. If no callback is provided, all listeners for the key are unsubscribed; otherwise only the specified listener is.
The callback will not be called.
It keeps a count of how many subscribe/unsubscribe calls have been made and stops any internal listeners once nobody is subscribed. This is specific to queues that rely on polling.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| options | object | <optional> | |
| callback | function | <optional> | |
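The listener bookkeeping described for polling queues can be sketched with a small registry. `ListenerRegistry` is a hypothetical class; it tracks listeners in a Set and uses its size as the count, which is one possible way to get the described behavior and not necessarily how the drivers do it.

```javascript
// Hypothetical listener registry for a polling queue: polling starts with
// the first listener and stops once the last one is removed.
class ListenerRegistry {
    constructor() {
        this.listeners = new Set();
        this.polling = false;
    }

    listen(callback) {
        this.listeners.add(callback);
        this.polling = true;
    }

    // Without a callback all listeners are removed, otherwise only the given one
    unlisten(callback) {
        if (callback) this.listeners.delete(callback);
        else this.listeners.clear();
        if (!this.listeners.size) this.polling = false;
    }
}

const registry = new ListenerRegistry();
const onJob = (msg) => console.log("job:", msg);
const onAudit = (msg) => console.log("audit:", msg);
registry.listen(onJob);
registry.listen(onAudit);
registry.unlisten(onJob);
console.log(registry.polling); // true
registry.unlisten(onAudit);
console.log(registry.polling); // false
```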
(static) unsubscribe(channel, optionsopt, callbackopt)
- Description:
Close a subscription for the given channel, no more messages will be delivered.
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| channel | string | | |
| options | object | <optional> | |
| callback | function | <optional> | |