module:events
- Description:
Event queue processor
This module implements simple event publishing and processing logic, useful for logging events for post-processing by backendjs workers or by other systems using shared queues.
All events will have the same structure:
{
    subject: "string",     // event subject
    data: { ... },         // event payload, must be an object
    id: "string",          // auto-generated: lib.uuid()
    time: bigint,          // auto-generated: lib.clock()
    origin: "string",      // auto-generated: app.origin()
}
Features support:
- publishing and processing: SQS, NATS
- publishing only: EventBridge, SNS
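The event structure described above can be sketched in plain Node.js. This is only an illustration: `makeEvent` is a hypothetical helper, and `crypto.randomUUID()`, `process.hrtime.bigint()` and `os.hostname()` are stand-ins assumed here for `lib.uuid()`, `lib.clock()` and `app.origin()`, not the module's actual code.

```javascript
const crypto = require("node:crypto");
const os = require("node:os");

// Hypothetical helper: wraps a payload into the event envelope.
// The three auto-generated fields use Node.js stand-ins (assumptions),
// the real module fills them with lib.uuid(), lib.clock() and app.origin().
function makeEvent(subject, data) {
    if (typeof subject !== "string" || !subject) {
        throw new TypeError("subject must be a non-empty string");
    }
    if (data === null || typeof data !== "object") {
        throw new TypeError("data must be an object");
    }
    return {
        subject,                        // event subject
        data,                           // event payload
        id: crypto.randomUUID(),        // stand-in for lib.uuid()
        time: process.hrtime.bigint(),  // stand-in for lib.clock()
        origin: os.hostname(),          // stand-in for app.origin()
    };
}

const event = makeEvent("USER-EVENT:LOGIN", { id: "u1", name: "Alice" });
console.log(event.subject, typeof event.id, typeof event.time);
```

Only `subject` and `data` come from the caller; the remaining fields are filled in automatically, which is why every event has the same shape regardless of who publishes it.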
If any of the events-worker-queue-XXX parameters are defined, workers subscribe to the configured event queues and listen for events.
Each event queue can run multiple functions independently, but it acks or nacks once for all of them. To deal with duplicates on replay, it is advised to split the functions between multiple consumers using the syntax:
queue#channel@consumer
Multiple event queues can be defined and processed at the same time.
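To make the queue#channel@consumer syntax concrete, here is a minimal sketch that splits such a name into its parts. `parseQueueSpec` is a hypothetical function written for illustration; it is an assumption about how the three parts relate, not the parser backendjs uses.

```javascript
// Hypothetical parser for names of the form queue#channel@consumer.
// The channel and consumer parts are treated as optional (an assumption).
function parseQueueSpec(spec) {
    const m = /^([^#@]+)(?:#([^@]*))?(?:@(.*))?$/.exec(spec);
    if (!m) throw new Error("invalid queue spec: " + spec);
    return {
        queue: m[1],            // configured queue name, e.g. "events"
        channel: m[2] || "",    // channel within the queue, e.g. "user"
        consumer: m[3] || "",   // consumer name, separates ack/nack state
    };
}

console.log(parseQueueSpec("events#user@sync"));
console.log(parseQueueSpec("events#user"));
```

Giving each function its own consumer name means a failure in one function only replays events for that consumer, instead of for every function attached to the queue.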
An event processing function takes two arguments: an event and a callback to call on finish.
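The sketch below shows two processors with the (event, callback) signature and a small dispatcher that reports one result for all of them. The dispatcher `processEvent` and both processors are hypothetical; how backendjs actually decides between ack and nack is assumed here, not taken from its source.

```javascript
// Two hypothetical event processors: each takes an event and a callback
function logEvent(event, callback) {
    console.log("event", event.subject);
    callback();
}

function requirePayload(event, callback) {
    const empty = Object.keys(event.data).length === 0;
    callback(empty ? new Error("empty payload") : null);
}

// Assumed dispatcher: runs every processor, then reports a single result.
// Any error means the whole event is treated as failed (nack).
function processEvent(event, processors, done) {
    let pending = processors.length, firstError = null;
    if (!pending) return done(null);
    for (const fn of processors) {
        fn(event, (err) => {
            if (err && !firstError) firstError = err;
            if (--pending === 0) done(firstError);
        });
    }
}

let outcome;
processEvent({ subject: "USER-EVENT:LOGIN", data: {} }, [logEvent, requirePayload], (err) => {
    outcome = err ? "nack" : "ack";
    console.log(outcome);
});
```

In this sketch `logEvent` succeeds but the event is still nacked because `requirePayload` fails, so on redelivery `logEvent` would see the same event again. That is the duplicate-on-replay situation that separate consumers are meant to avoid.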
Examples
The bkjs.conf below configures a NATS server for events, routing by the prefix USER-EVENT:, and an event processor for user events:
queue-events = nats://localhost:4222
events-routing-events#user = ^USER-EVENT:
events-worker-queue-events#user = mymod.syncUserEvents
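The routing line above pairs a queue name with a regular expression matched against the event subject. A minimal sketch of that idea follows; the `routing` table and `routeSubject` function are hypothetical and only assume that a subject is sent to every queue whose pattern matches.

```javascript
// Hypothetical routing table mirroring:
//   events-routing-events#user = ^USER-EVENT:
const routing = [
    { queue: "events#user", rx: /^USER-EVENT:/ },
];

// Returns the names of all queues whose pattern matches the subject
function routeSubject(subject) {
    return routing.filter((r) => r.rx.test(subject)).map((r) => r.queue);
}

console.log(routeSubject("USER-EVENT:LOGIN"));
console.log(routeSubject("ORDER-SHIPPED"));
```

With this configuration only subjects starting with USER-EVENT: reach the events#user queue; other subjects match no route in the sketch.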
The module below logs all user events in the queue and defines an event processor function to sync such events with an external service:
- /user/... endpoints for managing users
- mymod.syncUserEvents is an event processor function run by a worker process; it can run on a different host
const { app, api, lib, events } = require("backendjs");

module.exports = {
    name: "mymod",

    // register the /user/... endpoints
    configureWeb(options, callback)
    {
        api.app.post(/^\/user\/(login|update|view)/, this.handleUsers);
        callback();
    },

    handleUsers(req, res)
    {
        // ... endpoint processing logic, assume req.user contains currently logged in user ...
        const event = {
            type: req.params[0],
            id: req.user.id,
            name: req.user.name,
            access_time: Date.now()
        };
        // publish the event, the subject prefix matches the events-routing-events#user pattern
        events.putEvent("USER-EVENT:" + req.params[0].toUpperCase(), event);
    },

    // event processor, run by a worker process
    syncUserEvents(event, callback)
    {
        // .. syncing last access time with external database
        lib.fetch("https://some.api.com", { method: "POST", postdata: event }, callback);
    }
};

app.start({ server: true });
Start the server
node mymod.js -jobs-workers 1
Methods
(static) putEvent(subject, data, [options]) → {int}
Parameters:
| Name | Type | Attributes | Description |
|---|---|---|---|
| subject | string | | event subject, topic, ID, ... |
| data | object | | an object to be placed as the event payload |
| options | object | <optional> | queue specific properties |
Returns:
| Type | Description |
|---|---|
| int | |
Example
events.putEvent("USER-LOGIN", { id: ..., name: ... })
events.putEvent("ORDER-SHIPPED", { id: ... })
events.putEvent("social.post.like", { id: ..., liked: ... })