module:events
- Description:
Event queue processor
This module implement simple event publishing and processing logic, useful for logging events for post-processing by backendjs workers or by other systems using shared queues.
All events will have the same structure:
{ subject: "string", // event subject data: { ... }, // event payload, must be an object id: "string", // unique event id, auto-generated: lib.uuid() time: bigint, // auto-generated: lib.clock() origin: "string", // auto-generated: app.origin() sent: "string", // sent queue name as queueName[@subject][#groupName] received: "string", // revceived queue name as queueName[@subject][#groupName] seq: "int", // sequence per queue: 1...N }Features support:
- publishing and processing: SQS, NATS
- publishing only: EventBridge, SNS
If any of
events-worker-queue-XXXparameters are defined then workers subscribe to configured event queues and listen for events.Drivers like NATS support multiple consumers in the same queue using subject/group syntax:
queueName.subjectqueueName#groupNamequeueName.subject#groupName
The
options.groupNameproperty can be used as a group as well when passsed to listen.Multiple event queues can be defined and processed at the same time.
An event processing function takes 2 arguments, an event and callback to call on finish
- Source:
Examples
Create a stream
nats stream add --subjects 'events,events@*' --defaults events
Configured below in bkjs.conf: NATS server for events, routing by prefixes COMPANY-EVENT: or USER-EVENT: and event processor for corresponding events
queue-events = nats://
events-routing = events@user:^EVENT.USER
events-worker-queue = events@user: mymod.syncUserEvents, events@user#log: mymod.logUserEvents
events-routing = events@company:^EVENT.COMPANY
events-worker-queue = events@company: mymod.syncCompanyEvents
The module below logs all user events in the queue and defines an event processor function to sync such events with external service: - /user/... endpoints for managing users - mymod.syncUserEvents is an event processor function which is run by a worker process, it can run on a different host
const { app, api, lib, events } = require("backendjs");
module.exports = {
name: "mymod",
configureWeb(options, callback)
{
api.app.post(/^\/user\/(login|update|view)/, this.handleUsers);
callback();
}
handleUsers(req, res) {
... endpoint processing logic, assume req.user contains currently logged in user ...
const event = {
type: req.params[0],
id: req.user.id,
name: req.user.name,
access_time: Date.now()
}
events.putEvent("EVENT.USER." + req.params[0].toUpperCase(), event);
}
syncUserEvents(event, callback)
{
...
}
logUserEvents(event, callback)
{
...
}
}
app.start({ server: true });
Start the server
node mymod.js -jobs-workers 1
Methods
(async, static) aputEvent(subject, data, optionsopt) → {object}
- Description:
Async version of module:events.putEvent
- Source:
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
subject |
string | event subject, topic, ID, ... |
|
data |
object | an object to be placed as the |
|
options |
object |
<optional> |
queue specific properties |
Returns:
| Type | Description |
|---|---|
| object |
|
Example
const { err, data } = await events.aputEvent("USER-LOGIN", { id: ..., name: ... })
console.log("Sent to:", data.map(x => x.event))
(static) putEvent(subject, data, optionsopt, callbackopt)
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
subject |
string | event subject, topic, ID, ... |
|
data |
object | Array | an object to be placed as the |
|
options |
object |
<optional> |
queue specific properties |
callback |
function |
<optional> |
(err, data) - where data is a list of objects with event, error status and options sent to each queue: { err, event, options }, it is empty if nothing was sent. |
Example
events.putEvent("USER-LOGIN", { id: ..., name: ... })
events.putEvent("ORDER-SHIPPED", { id: ... })
events.putEvent("social.post.like", { id: ..., liked: ... }, (err, data) => {
if (!err) {
console.log("Sent:", data?.filter(x => !x.err))
console.log("Errors:", data?.filter(x => x.err))
}
})