events

module:events

Description:
  • Event queue processor

    This module implements simple event publishing and processing logic, useful for logging events for post-processing by backendjs workers or by other systems using shared queues.

    All events will have the same structure:

    {
       subject: "string",        // event subject
       data: { ... },            // event payload, must be an object
       id: "string",             // auto-generated: lib.uuid()
       time: bigint,             // auto-generated: lib.clock()
       origin: "string",         // auto-generated: app.origin()
    }
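To make the field layout concrete, here is a minimal sketch of a function that produces an object with the same five fields. The helper name `makeEnvelope` and the default origin value are hypothetical and not part of backendjs; `crypto.randomUUID()` and `process.hrtime.bigint()` are only stand-ins for `lib.uuid()` and `lib.clock()`, whose exact output may differ.

```javascript
const { randomUUID } = require("node:crypto");

// Hypothetical helper, not part of backendjs: builds an object with the
// same five fields as the event structure described above.
function makeEnvelope(subject, data, origin = "example-host") {
    if (typeof subject !== "string" || !subject) {
        throw new TypeError("subject must be a non-empty string");
    }
    if (!data || typeof data !== "object") {
        throw new TypeError("data must be an object");
    }
    return {
        subject,
        data,
        id: randomUUID(),               // stand-in for lib.uuid()
        time: process.hrtime.bigint(),  // stand-in for lib.clock()
        origin,                         // stand-in for app.origin()
    };
}
```

In real code only `subject` and `data` are supplied by the caller; the remaining fields are filled in by the module.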
    

    Feature support:

    • publishing and processing: SQS, NATS
    • publishing only: EventBridge, SNS

    If any of the events-worker-queue-XXX parameters are defined, workers subscribe to the configured event queues and listen for events.

    Each event queue can run multiple functions independently, but it will ack/nack once for all of them. To deal with duplicates on replay, it is advised to split the functions between multiple consumers using the syntax: queue#channel@consumer
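As an illustration of the queue#channel@consumer syntax, the fragment below attaches each function to its own consumer, so that each consumer acks or nacks for a single function. The consumer names sync and audit and the function mymod.auditUserEvents are hypothetical, and the exact key form should be checked against the configuration reference of the backendjs version in use.

```
events-worker-queue-events#user@sync = mymod.syncUserEvents
events-worker-queue-events#user@audit = mymod.auditUserEvents
```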

    Multiple event queues can be defined and processed at the same time.

    An event processing function takes 2 arguments: an event and a callback to call on finish.
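A minimal sketch of such a function is shown here. The name `countEvents` and the in-memory counter are hypothetical, and the error-first callback convention (no argument means success) is an assumption based on common Node.js practice, not something this page states.

```javascript
// Hypothetical processor: counts events per subject in memory.
const counts = new Map();

function countEvents(event, callback) {
    // Reject anything that does not look like the event structure above.
    if (!event || typeof event.subject !== "string") {
        return callback(new Error("invalid event"));
    }
    counts.set(event.subject, (counts.get(event.subject) || 0) + 1);
    // Signal completion; calling back without an error is assumed to mean success.
    callback();
}
```

A function of this shape would be referenced from the configuration by its module and method name, in the same way as mymod.syncUserEvents.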

Examples

The bkjs.conf settings below configure a NATS server as the events queue, route events whose subject starts with the prefix USER-EVENT: to it, and register an event processor for user events.

queue-events = nats://localhost:4222

events-routing-events#user = ^USER-EVENT:

events-worker-queue-events#user = mymod.syncUserEvents
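The routing value above reads as a regular expression matched against the event subject. The sketch below only demonstrates which subjects that pattern matches; the helper `matchesUserRoute` is hypothetical, and how backendjs applies the pattern internally is an assumption.

```javascript
// The pattern from events-routing-events#user, written as a JavaScript RegExp.
const userRoute = /^USER-EVENT:/;

// Hypothetical helper: returns true when a subject matches the route pattern.
function matchesUserRoute(subject) {
    return userRoute.test(subject);
}

console.log(matchesUserRoute("USER-EVENT:LOGIN"));     // true
console.log(matchesUserRoute("ORDER-SHIPPED"));        // false
console.log(matchesUserRoute("MY-USER-EVENT:LOGIN"));  // false, the pattern is anchored at the start
```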

The module below logs all user events in the queue and defines an event processor function to sync such events with an external service:

  • /user/... endpoints for managing users
  • mymod.syncUserEvents is an event processor function run by a worker process; it can run on a different host

const { app, api, lib, events } = require("backendjs");

module.exports = {
    name: "mymod",

    configureWeb(options, callback)
    {
        api.app.post(/^\/user\/(login|update|view)/, this.handleUsers);

        callback();
    },

    handleUsers(req, res) {

        // ... endpoint processing logic, assume req.user contains currently logged in user ...

        // Payload to publish; it becomes the data property of the event
        const event = {
            type: req.params[0],
            id: req.user.id,
            name: req.user.name,
            access_time: Date.now()
        };
        events.putEvent("USER-EVENT:" + req.params[0].toUpperCase(), event);
    },

    syncUserEvents(event, callback)
    {
        // .. syncing last access time with external database

        lib.fetch("https://some.api.com", { method: "POST", postdata: event }, callback)
    }
}

app.start({ server: true });

Start the server

node mymod.js -jobs-workers 1

Methods

(static) putEvent(subject, data, [options]) → {int}

Description:
  • Place an event into a queue by subject and type

Parameters:
Name      Type      Attributes    Description
subject   string                  event subject, topic, ID, ...
data      object                  an object to be placed as the data property
options   object    <optional>    queue specific properties

Returns:
Type   Description
int    number of queues the event was sent to, -1 on error
Example
events.putEvent("USER-LOGIN", { id: ..., name: ... })
events.putEvent("ORDER-SHIPPED", { id: ... })
events.putEvent("social.post.like", { id: ..., liked: ... })
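Since putEvent reports -1 on error, callers may want to check the result. The wrapper below is a sketch only: the name `publish` is hypothetical, the `events` object is passed in as a parameter so the function also works with a stub, and treating 0 as "no queue matched the subject" is an assumption drawn from the description of the return value.

```javascript
// Hypothetical wrapper around putEvent; `events` is the backendjs events
// module or any object exposing a compatible putEvent(subject, data) method.
function publish(events, subject, data) {
    const sent = events.putEvent(subject, data);
    if (sent < 0) {
        // -1 is documented as the error result
        console.error("putEvent failed for", subject);
        return false;
    }
    if (sent === 0) {
        // Assumption: 0 means the event was not sent to any queue
        console.warn("no queue matched", subject);
    }
    return sent > 0;
}
```

With the real module this would be called as publish(events, "USER-LOGIN", data) after requiring events from backendjs.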