module:queue
- Description:
Queue module for jobs, events and pub/sub.
The class module:queue.QueueClient defines the methods that a driver can implement.
All methods below use options.queueName for a specific queue.
If it is an array then a client will be picked sequentially by maintaining internal sequence number.
Empty default client always exists, it can be overridden to make default some other driver using
queue-defaultconfig parameter.The url query parameters that start with bk- will be extracted from the url and placed in the class's *options object, this is a way to pass special properties without using queue-options, the rest of the url parameters will be passed to the driver.
Job queues are designed by using SQS visibility timeout logic.
- Source:
Parameters:
| Name | Type | Description | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
options |
object |
Properties
|
Example
queue-default=redis://
queue-redis=redis://?bk-visibilityTimeout=30000&bk-queueCount=2
queue-account=sqs://sqs.us-east-1.amazonaws.com/566932855105/account?bk-queueCount=2
queue-messages=sqs://messages?bk-pollInterval=60000
queue-config={ "account": "https://sqs.us-east-1.amazonaws.com/566932855105/account?bk-queueCount=2" }
Classes
- DBQueueClient
- EventBridgeQueueClient
- JSONQueueClient
- LocalQueueClient
- NatsQueueClient
- QueueClient
- RedisQueueClient
- SNSQueueClient
- SQSQueueClient
- WorkerQueueClient
Members
(static) clients :object
(static) modules :object
Methods
(async, static) apublish(subject, msg, optionsopt)
- Description:
Async version of module:queue.publish
- Source:
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
subject |
string | ||
msg |
string | object | ||
options |
object |
<optional> |
Example
const { err, data } = await queue.apublish("events", { alert, detail });
(async, static) asubmit(msg, optionsopt)
- Description:
Async version of module:queue.submit
- Source:
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
msg |
string | ||
options |
object |
<optional> |
Example
const { err, data } = await queue.asubmit({ alert, detail });
(static) checkConfig()
- Description:
Initialize missing or new clients, existing clients stay the same
- Source:
(static) createClient(options)
- Description:
Return a new client for the given url or null if not supported
- Source:
Parameters:
| Name | Type | Description |
|---|---|---|
options |
object |
(static) getClient(options) → {QueueClient}
- Description:
Return a queue client by name if specified in the options or use default client which always exists, use queueName to specify a specific driver. Ignores subject (@) and group(#) if present. If it is an array it will rotate items sequentially.
- Source:
Parameters:
| Name | Type | Description | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
options |
object | string | Array.<string> |
Properties
|
Returns:
| Type | Description |
|---|---|
| QueueClient |
(static) initClients()
- Description:
Reinitialize all clients for queue purposes, previous clients will be closed.
- Source:
(static) listen(optionsopt, callback)
- Description:
Listen for messages from the given queue, the callback will be called only on new message received. The callback accepts 2 arguments, a message and optional next callback, if it is provided it must be called at the end to confirm or reject the message processing. Only errors with code>=500 will result in rejection, not all drivers support the next callback if the underlying queue does not support message acknowledgement.
Depending on the implementation, this can work as fan-out, delivering messages to all subscribed to the same channel or can implement job queue model where only one subscriber receives a message.
For cases when the next callback is provided this means the queue implementation requires an acknowledgement of successful processing, returning an error with err.status >= 500 will keep the message in the queue to be processed later. Special code 600 means to keep the job in the queue and report as warning in the log.
- Source:
Parameters:
| Name | Type | Attributes | Description | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
options |
object |
<optional> |
Properties
|
||||||||
callback |
function |
Example
queue.listen({ queueName: "jobs" }, (msg, next) => {
req.res.json(data);
if (next) next();
}, req);
(static) publish(subject, msg, optionsopt, callbackopt)
- Description:
Publish an event with the subject to be delivered to all subscribers. If the msg is not a string it will be stringified. The module:events uses this.
- Source:
Parameters:
| Name | Type | Attributes | Description | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
subject |
string | ||||||||||
msg |
string | object | ||||||||||
options |
object |
<optional> |
Properties
|
||||||||
callback |
function |
<optional> |
Example
queue.publish("events", { id, name, data }, { queueName: "logs" })
(static) purge(optionsopt, callbackopt)
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
options |
object |
<optional> |
|
callback |
function |
<optional> |
(async, static) queue.apurge()
(async, static) queue.astats()
(static) shutdown()
(static) stats(optionsopt, callback)
- Description:
Returns the queue statistics, the format depends on the queue type used
- Source:
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
options |
object |
<optional> |
|
callback |
function |
(static) submit(msg, callbackopt)
- Description:
Submit a message to the queue, msg can be a string or an object or an array of objects if the queue supports it.
- For job processing at least one delivery is expected, the module:jobs uses this.
- For events it depends on the queue being used, NATS supports multiple consumers usng the same queue.
- Source:
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
msg |
object | string | ||
options.queueName |
string |
<optional> |
defines the queue, if not specified then it is sent to the default queue |
callback |
function |
<optional> |
Example
queue.submit({ alert, detail }, { queueName: "sqs" }, lib.log)
(static) subscribe(subject, options, callbackopt)
- Description:
Subscribe to receive messages for the given subject, the callback will be called only on new message received.
- Source:
Parameters:
| Name | Type | Attributes | Description | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
subject |
string | ||||||||||
options |
object |
Properties
|
|||||||||
callback |
function |
<optional> |
Example
queue.subscribe("alerts", (msg) => {
req.res.json(data);
}, req);
(static) unlisten(optionsopt, callbackopt)
- Description:
Stop listening for message, if no callback is provided all listeners for the key will be unsubscribed, otherwise only the specified listener.
The callback will not be called.
It keeps a count how many subscribe/unsubscribe calls been made and stops any internal listeners once nobody is subscribed. This is specific to a queue which relies on polling.
- Source:
Parameters:
| Name | Type | Attributes | Description | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
options |
object |
<optional> |
Properties
|
||||||||
callback |
function |
<optional> |
(static) unsubscribe(subject, optionsopt, callbackopt)
- Description:
Close a subscription for the given subject, no more messages will be delivered.
- Source:
Parameters:
| Name | Type | Attributes | Description | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
subject |
string | ||||||||||
options |
object |
<optional> |
Properties
|
||||||||
callback |
function |
<optional> |