events

module:events

Description:
  • Event queue processor

    This module implement simple event publishing and processing logic, useful for logging events for post-processing by backendjs workers or by other systems using shared queues.

    All events will have the same structure:

    {
       subject: "string",        // event subject
       data: { ... },            // event payload, must be an object
       id: "string",             // unique event id, auto-generated: lib.uuid()
       time: bigint,             // auto-generated: lib.clock()
       origin: "string",         // auto-generated: app.origin()
       sent: "string",           // sent queue name as queueName[@subject][#groupName]
       received: "string",       // revceived queue name as queueName[@subject][#groupName]
       seq: "int",               // sequence per queue: 1...N
    }
    

    Features support:

    • publishing and processing: SQS, NATS
    • publishing only: EventBridge, SNS

    If any of events-worker-queue-XXX parameters are defined then workers subscribe to configured event queues and listen for events.

    Drivers like NATS support multiple consumers in the same queue using subject/group syntax:

    • queueName.subject
    • queueName#groupName
    • queueName.subject#groupName

    The options.groupName property can be used as a group as well when passsed to listen.

    Multiple event queues can be defined and processed at the same time.

    An event processing function takes 2 arguments, an event and callback to call on finish

Source:
Examples

Create a stream

nats stream add --subjects 'events,events@*' --defaults events

Configured below in bkjs.conf: NATS server for events, routing by prefixes COMPANY-EVENT: or USER-EVENT: and event processor for corresponding events

queue-events = nats://

events-routing = events@user:^EVENT.USER
events-worker-queue = events@user: mymod.syncUserEvents, events@user#log: mymod.logUserEvents

events-routing = events@company:^EVENT.COMPANY
events-worker-queue = events@company: mymod.syncCompanyEvents

The module below logs all user events in the queue and defines an event processor function to sync such events with external service: - /user/... endpoints for managing users - mymod.syncUserEvents is an event processor function which is run by a worker process, it can run on a different host

const { app, api, lib, events } = require("backendjs");

module.exports = {
    name: "mymod",

    configureWeb(options, callback)
    {
        api.app.post(/^\/user\/(login|update|view)/, this.handleUsers);

        callback();
    }

    handleUsers(req, res) {

       ... endpoint processing logic, assume req.user contains currently logged in user ...

        const event = {
            type: req.params[0],
            id: req.user.id,
            name: req.user.name,
            access_time: Date.now()
        }
        events.putEvent("EVENT.USER." + req.params[0].toUpperCase(), event);
    }

    syncUserEvents(event, callback)
    {
       ...
    }

    logUserEvents(event, callback)
    {
        ...
    }
}

app.start({ server: true });

Start the server

node mymod.js -jobs-workers 1

Methods

(async, static) aputEvent(subject, data, optionsopt) → {object}

Description:
Source:
Parameters:
Name Type Attributes Description
subject string

event subject, topic, ID, ...

data object

an object to be placed as the data property

options object <optional>

queue specific properties

Returns:
Type Description
object
  • { err, data }
Example
const { err, data } = await events.aputEvent("USER-LOGIN", { id: ..., name: ... })
console.log("Sent to:", data.map(x => x.event))

(static) putEvent(subject, data, optionsopt, callbackopt)

Description:
  • Place an event into a queue by subject and type

Source:
Parameters:
Name Type Attributes Description
subject string

event subject, topic, ID, ...

data object | Array

an object to be placed as the data property

options object <optional>

queue specific properties

callback function <optional>

(err, data) - where data is a list of objects with event, error status and options sent to each queue: { err, event, options }, it is empty if nothing was sent.

Example
events.putEvent("USER-LOGIN", { id: ..., name: ... })

events.putEvent("ORDER-SHIPPED", { id: ... })

events.putEvent("social.post.like", { id: ..., liked: ... }, (err, data) => {
    if (!err) {
        console.log("Sent:", data?.filter(x => !x.err))
        console.log("Errors:", data?.filter(x => x.err))
    }
})