/*
* Author: Vlad Seryakov vseryakov@gmail.com
* backendjs 2018
*/
const path = require('path');
const stream = require('stream');
const fs = require('fs');
const domain = require('domain');
const qs = require("qs");
const http = require('http');
const https = require('https');
const app = require(__dirname + '/app');
const lib = require(__dirname + '/lib');
const ipc = require(__dirname + '/ipc');
const metrics = require(__dirname + '/metrics');
const logger = require(__dirname + '/logger');
/**
* @module api
*/
const api =
/**
* HTTP API to the server from the clients, this module implements the basic HTTP(S) API functionality with some common features. The API module
* incorporates the Express server which is exposed as __api.app__ object, the server spawns Web workers to perform actual operations, monitors
* the worker processes if they die and restart them automatically.
*
* How many processes to spawn can be configured via __-server-max-workers__ config parameter.
*
* When an HTTP request arrives it goes over Express middleware, but before processing any registered routes there are several steps performed:
* - the __req__ object which is by convention is a Request object, assigned with common backend properties to be used later:
* - user - an empty object which will be filled after by signature verification method, if successful, properties from the __bk_user__ table will be set
* - options - an object with internal state and control parameters. Every request always has an options object attached very
* early with some properties always present:
* - ip - cached IP address
* - host - cached host header from the request
* - path - parsed request url path
* - apath - an array with the path split by /
* - secure - if the request is encrypted, like https
* - appTimezone - milliseconds offset from the UTC provided in the header by the app
* - access verification, can the request be satisfied without proper signature, i.e. is this a public request
* - autherization, check the signature and other global or user specific checks
* - when a API route found by the request url, it is called as any regular Express middlware
* - if there are registered pre processing callback they will be called during access or autherization phases
* - if inside the route a response was returned using __api.sendJSON__ method, registered post process callbacks will be called for such response
*
* Every request has the __trace__ property, either fake or X-Ray depending on the config, see metrics for usage
*
* ### Health enquiry
*
* When running with AWS load balancer there should be a url that a load balancer polls all the time and this must be very quick and lightweight request. For this
* purpose there is an API endpoint __/ping__ that just responds with status 200. It is open by default in the default __api-allow-path__ config parameter.
*
*/
module.exports = {
name: "api",
/**
* @var {ConfigOptions[]} args
*/
args: [
{ name: "err-(.+)", descr: "Error messages for various cases" },
{ name: "cap-(.+)", type: "int", strip: "cap-", descr: "Capability parameters" },
{ name: "max-request-queue", type: "number", min: 0, descr: "Max number of requests in the processing queue, if exceeds this value server returns too busy error" },
{ name: "timeout", type: "number", min: 0, max: 3600000, descr: "HTTP request idle timeout for servers in ms, how long to keep the connection socket open, this does not affect Long Poll requests" },
{ name: "keep-alive-timeout", type: "int", descr: "Number of milliseconds to keep the HTTP conection alive" },
{ name: "request-timeout", type: "int", min: 0, descr: "Number of milliseconds to receive the entire request from the client" },
{ name: "max-requests-per-socket", type: "int", min: 0, descr: "The maximum number of requests a socket can handle before closing keep alive connection" },
{ name: "port", type: "number", min: 0, descr: "port to listen for the HTTP server, this is global default" },
{ name: "bind", descr: "Bind to this address only, if not specified listen on all interfaces" },
{ name: "backlog", type: "int", descr: "The maximum length of the queue of pending connections, used by HTTP server in listen." },
{ name: "reuse-port", type: "bool", descr: "Allow multiple sockets on the same host to bind to the same port" },
{ name: "ssl", type: "map", obj: 'ssl', merge: 1, descr: "SSL params: port, bind, key, cert, pfx, ca, passphrase, crl, ciphers" },
{ name: "accesslog-disable", obj: "accesslog", type: "bool", descr: "Disable access logging in both file or syslog" },
{ name: "accesslog-file", obj: "accesslog", descr: "File for access logging" },
{ name: "accesslog-level", obj: "accesslog", type: "int", descr: "Syslog level priority, default is local5.info, 21 * 8 + 6" },
{ name: "accesslog-fields", obj: "accesslog", array: 1, type: "list", descr: "Additional fields from the request or user to put in the access log, prefix defines where the field is lcoated: q: - query, h: - headers, u: - user otherwise from the request", example: "-api-log-fields h:Referer,u:name,q:action" },
{ name: "errlog-max", obj: "errlog", type: "int", descr: "How many error messages to put in the log before throttling kicks in" },
{ name: "errlog-interval", obj: "errlog", type: "int", descr: "Interval for error log limiter, max errors per this interval" },
{ name: "errlog-ignore", obj: "errlog", type: "regexpobj", descr: "Do not show errors that match the regexp" },
{ name: "errlog-codes", obj: "errlog", type: "regexpobj", descr: "Error codes in exceptions to return in the response to the user, if not matched the errlog.message will be returned" },
{ name: "qs-options-(.+)", autotype: 1, obj: "qsOptions", strip: "qs-options-", nocamel: 1, descr: "Options to pass to qs when parsing the body: depth, arrayLimit, allowDots, comma, plainObjects, allowPrototypes, parseArrays" },
{ name: "query-token-secret", descr: "Name of the property to be used for encrypting tokens for pagination or other sensitive data, any property from bk_user can be used, if empty no secret is used, if not a valid property then it is used as the secret" },
{ name: "access-token-secret", descr: "A generic secret to be used for API access or signatures" },
{ name: "allow-configure-(web|middleware)", type: "regexp", descr: "Modules allowed to call configureWeb or Middleware, i.e. only allowed endpoints" },
{ name: "express-options", type: "json", obj: "express-options", merge: 1, logger: "warn", descr: "Set Express config options during initialization", example: '-api-express-options { "trust proxy": 1, "strict routing": true }' },
{ name: "body-methods", type: "list", upper: 1, descr: "HTTP methods allowed to have body" },
{ name: "body-types", type: "regexpobj", descr: "Collect full request body in the req.body property for the given MIME types in addition to default json/form posts, this is for custom body processing" },
{ name: "body-raw", type: "regexpobj", descr: "Do not parse the collected body for the following MIME content types, keep it as a string" },
{ name: "body-multipart", type: "regexpobj", descr: "URLs that expect multipart/form-data payloads, parsing will happend after the signature processed" },
{ name: "cors-origin", descr: "Origin header for CORS requests" },
{ name: "cors-allow", type: "regexpobj", descr: "Enable CORS requests if a request host/path matches the given regexp" },
{ name: "tz-header", descr: "Name for the timezone offset header a client can send for time sensitive requests, the backend decides how to treat this offset" },
{ name: "server-header", descr: "Custom Server: header to return for all requests" },
{ name: "rlimits-([a-z]+)$", obj: "rlimits", make: "$1", autotype: 1, descr: "Default rate limiter parameters, default interval is 1s, `ttl` is to expire old cache entries, message for error" },
{ name: "rlimits-(rate|max|interval|ttl|ip|delay|multiplier|queue)-(.+)", autotype: 1, obj: "rlimitsMap.$2", make: "$1", descr: "Rate limiter parameters by type for Token Bucket algorithm. `queue` to use specific queue, ttl` is to expire cache entries, `ip` is to limit by IP address as well", example: "api-rlimits-ip-ip=10\napi-rlimits-rate-/path=1\napi-rlimits-rate-GET/path=1" },
{ name: "rlimits-map-(.+)", type: "map", obj: "rlimitsMap.$1", merge: 1, descr: "Rate limiter parameters for Token Bucket algorithm. set all at once", example: "api-rlimits-map-/url=rate:1,interval:2000\napi-rlimits-map-GET/url=rate:10" },
{ name: "(query|header|upload)-limit", type: "number", descr: "Max size for query/headers/uploads, bytes" },
{ name: "(files|fields)-limit", type: "number", descr: "Max number of files or fields in uploads" },
{ name: "limiter-cache", descr: "Name of a cache for API rate limiting" },
{ name: "response-headers", type: "regexpmap", json: 1, descr: "An JSON object with list of regexps to match against the location and set response headers defined as a ist of pairs name, value...", example: 'api-response-headers={ "^/": ["x-frame-options","sameorigin","x-xss-protection","1; mode=block"] }' },
{ name: "cleanup-rules-(.+)", obj: "cleanupRules.$1", type: "map", merge: 1, nocamel: 1, descr: "Rules for the cleanupResult per table, ex. api-cleanup-rules-bk_user=email:0,phone:1" },
{ name: "cleanup-strict", type: "bool", descr: "Default mode for cleanup results" },
{ name: "request-cleanup", type: "list", array: 1, descr: "List of fields to explicitely cleanup on request end" },
{ name: "query-defaults-([a-z0-9_]+)-(.+)", obj: "queryDefaults.$2", make: "$1", autotype: 1, descr: "Global query defaults for getQuery, can be path specific", example: "-api-query-defaults-max-name 128 -api-query-defaults-max-/endpoint-name 255" },
{ name: "delays-(.+)", type: "int", obj: "delays", nocamel: 1, descr: "Delays in ms by status and code, useful for delaying error responses to slow down brute force attacks", example: "-api-delays-401 1000 -api-delays-403:DENY -1" },
{ name: "restart-hours", type: "list", datatype: "int", descr: "List of hours when to restart api workers, only done once for each hour" },
{ name: "trace-options", type: "map", obj: "trace-options", merge: 1, descr: "Options for tracing, host where to send if not local, path:regexp for URLs to be traced, interval:Interval in ms how often to trace requests, must be > 0 to enable tracing" },
{ name: "exit-on-error", type: "bool", descr: "Exit on uncaught exception in the route handler" },
{ name: "restart", descr: "On address in use error condition restart the specified servers, this assumes an external monitor like monit to handle restarts" },
{ name: "proxy-(.+)", obj: "proxy", type: "regexp", make: "$1", nocamel: 1, descr: "Proxy matched requests by path to given host" },
],
/** @var {int} port - HTTP port to listen to for Express app, 8000 default */
port: process.env.BKJS_PORT || 8000,
/** @var {string} bind - listen on the specified local interfcae, 0.0.0.0 is default */
bind: '0.0.0.0',
backlog: 5000,
ssl: { port: 443, bind: '0.0.0.0' },
// Rate limits
rlimitsMap: {},
rlimits: {
ttl: 86400000,
message: "Access limit reached, please try again later in %s.",
},
delays: {},
responseHeaders: [],
traceOptions: {},
expressOptions: {},
bodyMethods: ["POST", "PUT", "PATCH"],
// All listening servers
servers: [],
// Incoming data limits, bytes
filesLimit: 10,
fieldsLimit: 100,
uploadLimit: 10*1024*1024,
queryLimit: 16*1024,
headerLimit: 16*1024,
// Connection timeouts
timeout: 30000,
keepAliveTimeout: 61000,
requestTimeout: 0,
qsOptions: {
plainObjects: true
},
tzHeader: "bk-tz",
accessTokenSecret: "",
corsAllow: null,
corsOrigin: "*",
corsCredentials: true,
corsMethods: ['OPTIONS', 'HEAD', 'GET', 'POST', 'PUT', 'DELETE'],
// Properties to be cleaned up on finish
requestCleanup: ["options", "user", "signature", "body", "raw_body", "trace"],
cleanupRules: {},
restart: "server,web,process",
// Metrics and stats
metrics: {
req: new metrics.Timer(),
que: new metrics.Histogram(),
running: 0,
busy_count: 0,
large_count: 0,
bad_count: 0,
err_count: 0,
},
maxRequestQueue: 0,
limiterCache: "local",
accesslog: {
fields: [],
level: 174,
},
// Error reporter throttle
errlog: {
max: 100,
interval: 30000,
ignore: lib.toRegexpObj(null, [ "Range Not Satisfiable", "Precondition Failed" ]),
},
// getQuery global defaults, pased as data
queryDefaults: {
"*": {
maxlist: 255,
},
"*.json": {
max: 512,
},
"*.token": {
max: 1024,
},
"*.string": {
max: 255,
},
"*.text": {
max: 255,
}
},
errInternalError: "Internal error occurred, please try again later",
errTooLarge: "Unable to process the request, it is too large",
};
/**
* Initialize API layer, this must be called before the `api` module can be used but it is called by the server module automatically so `api.init` is
* rearely need to called directly, only for new server implementation or if using in the shell for testing.
*
* During the init sequence, this function calls `configureMiddleware` and `configureWeb` methods of all modules.
*
* The api uses its own request parser that places query parameters into `req.query` or `req.body` depending on the method.
*
* For GET method, `req.query` contains all url-encoded parameters, for POST method `req.body`
* contains url-encoded parameters or parsed JSON payload or multipart payload.
*
* @memberof module:api
* @method init
*/
api.init = function(options, callback)
{
if (typeof options == "function") callback = options, options = null;
if (!options) options = {};
// Shutdown signal from the server process
if (app.isWorker) {
ipc.on("api:restart", () => {
api.shutdown(() => { process.exit(0) });
});
}
// These will not be used outside of this call
this.express = require('express');
this.app = this.express();
// Acccess logging, always goes into api.accessLog, it must be a stream
if (!this.accesslog.disable) {
this.configureAccessLog();
this.app.use(this.handleAccessLog.bind(this));
}
// Early request setup and checks
this.app.use(this.startServerRequest.bind(this));
// Proxies must be setup early before to keep all data in the stream
if (!lib.isEmpty(this.proxy)) {
api.app.use((req, res, next) => {
if (api.checkProxy("web", req, res)) return;
next();
});
}
this.app.use(this.handleHeaders.bind(this));
// Metrics starts early, always enabled
this.app.use(this.startMetrics.bind(this));
// Default parsers
this.app.use(this.handleBody.bind(this));
// Config options for Express
for (const p in this.expressOptions) {
this.app.set(p, this.expressOptions[p]);
}
lib.series([
function(next) {
// Assign custom middleware including security, if the signature is disabled then the middleware
// handler may install some other authentication module and in such case
// must setup `req.user` with the current user record
app.runMethods("configureMiddleware", options, { allow: api.allowConfigureMiddleware }, next);
},
function(next) {
// Parse multipart payload, depends on formidable availability, disable if not found
_formidable = lib.tryRequire('formidable', { logger: "debug", message: "multipart uploads are disabled" });
if (_formidable) {
api.app.use((req, res, next) => {
if (!req.is('multipart/form-data')) return next("route");
if (!lib.testRegexpObj(req.options.path, api.bodyMultipart)) return next("route");
api.handleMultipart(req, res, (err) => (next(err || "route")));
});
}
// Setup routes from the loaded modules
app.runMethods("configureWeb", options, { allow: api.allowConfigureWeb }, next);
},
function(next) {
// For health checks
api.app.get("/ping", (req, res) => {
api.sendStatus(res, { contentType: "text/plain" });
});
// Static paths and templating setup
app.runMethods("configureStaticWeb", options, { allow: api.allowConfigureMiddleware }, next);
},
function(next) {
// Default error handler to show errors in the log, throttle the output to keep the log from overflow
if (api.errlogLimiterMax && api.errlogLimiterInterval) {
api.errlogLimiterToken = new metrics.TokenBucket(api.errlogLimiterMax, 0, api.errlogLimiterInterval);
}
// The last route is to return an error
api.app.use((err, req, res, next) => {
api.sendReply(res, err);
});
api.configureWebServers();
// Notify the server about new worker server
ipc.sendMsg("api:ready", { id: app.workerId || process.pid, port: api.port, ready: true });
// Performs graceful web worker restart
api._restartInterval = setInterval(() => {
if (lib.isFlag(api.restartHours, new Date().getHours())) {
logger.info('restarting web workers:', api.restartHours);
ipc.sendMsg("api:restart");
}
}, 3600000);
next();
},
], callback, true);
}
/**
* Gracefully close all connections, call the callback after that
* @memberof module:api
* @method shutdown
*/
var _exiting;
api.shutdown = function(options, callback)
{
if (_exiting) return lib.tryCall(callback);
_exiting = true;
logger.debug("shutdown:", app.name, app.role);
clearInterval(api._restartInterval);
delete api._restartInterval;
api.metrics.req.end();
// Make workers not ready during the shutdown
ipc.sendMsg("api:shutdown", { id: app.workerId || process.pid, pid: process.pid, port: api.port });
app.runMethods("shutdownServer", { direct: 1, parallel: 1 }, () => {
delete api.app;
_exiting = false;
lib.tryCall(callback);
});
}
api.shutdownServer = function(options, callback)
{
lib.forEach([ "server", "sslServer" ], (name, next) => {
if (!api[name]) return next();
var err, server = api[name];
delete api[name];
try {
server.close();
server.closeAllConnections();
server.closeIdleConnections();
} catch (e) {
err = e
}
logger.log("shutdownServer:", api.name, app.role, name, "closed", err);
next();
}, callback, true);
}
api.configureWebServers = function(options, callback)
{
// Start http server
if (this.port) {
api.server = this.createWebServer({
name: "http",
port: this.port,
bind: this.bind,
restart: this.restart,
timeout: this.timeout,
keepAliveTimeout: this.keepAliveTimeout,
requestTimeout: this.requestTimeout,
maxRequestsPerSocket: this.maxRequestsPerSocket,
maxHeaderSize: this.headerLimit,
reusePort: this.reusePort,
}, this.handleServerRequest);
}
// Start SSL server
if (this.ssl?.port && (this.ssl.key || this.ssl.pfx)) {
api.sslServer = this.createWebServer({
name: "https",
ssl: this.ssl,
port: this.ssl.port,
bind: this.ssl.bind,
restart: this.restart,
timeout: this.timeout,
keepAliveTimeout: this.keepAliveTimeout,
requestTimeout: this.requestTimeout,
maxRequestsPerSocket: this.maxRequestsPerSocket,
maxHeaderSize: this.headerLimit,
reusePort: this.reusePort,
}, this.handleServerRequest);
}
app.runMethods("configureWebServer", options, { direct: 1 }, callback);
}
// Setup access log stream
api.configureAccessLog = function()
{
if (logger.syslog) {
this.accesslog.stream = new stream.Stream();
this.accesslog.stream.writable = true;
this.accesslog.stream.write = (data) => { logger.syslog.log(api.accesslog.level, data); return true; };
} else
if (this.accesslog.file) {
this.accesslog.stream = fs.createWriteStream(this.accesslog.file, { flags: 'a' });
this.accesslog.stream.on('error', (err) => { logger.error('accessLog:', err); api.accessLog = null; });
} else {
this.accesslog.stream = logger;
}
}
api.handleAccessLog = function(req, res, next)
{
var startTime = new Date();
var end = res.end;
res.end = function(chunk, encoding) {
res.end = end;
res.end(chunk, encoding);
if (req._accessLog || !api.accesslog.stream) return;
req._accessLog = true;
var now = new Date();
var line = req.options.ip + " - " +
(api.accesslog.file ? '[' + now.toUTCString() + ']' : "-") + " " +
req.method + " " +
(req.accessLogUrl || req.originalUrl || req.url) + " " +
(req.httpProtocol || "HTTP") + "/" + req.httpVersionMajor + "/" + req.httpVersionMinor + " " +
res.statusCode + " " +
(req.options.clength || '-') + " - " +
(now - startTime) + " ms - " +
(req.headers['user-agent'] || "-") + " " +
(req.user?.id || "-");
// Append additional fields
for (let v of api.accesslog.fields) {
switch (v[1] == ":" ? v[0] : "") {
case "q":
v = req.query[v.substr(2)];
break;
case "h":
v = req.get(v.substr(2));
break;
case "u":
v = req.user && req.user[v.substr(2)];
break;
default:
v = req[v];
}
if (typeof v == "object") v = "";
line += " " + (v || "-");
}
if (api.accesslog.file) line += "\n";
api.accesslog.stream.write(line);
}
next();
}
api.startServerRequest = function(req, res, next)
{
// Fake i18n methods
req.__ = res.__ = res.locals.__ = lib.__;
// Request queue size
if (api.maxRequestQueue && api.metrics.running >= api.maxRequestQueue) {
api.metrics.busy_count++;
return api.sendReply(res, 503, "Server is unavailable");
}
// Setup request common/required properties
api.prepareRequest(req);
// Perform internal routing
api.routing.check(req, "path");
// Rate limits by IP address and path, early before all other filters
api.checkRateLimits(req, { type: ["ip", "path"] }, (err) => {
if (err) {
metrics.incr(api.metrics, err.type + '_count');
return api.sendReply(res, err);
}
logger.debug("startServerRequest:", req.options);
next();
});
}
// Start Express middleware processing wrapped in the node domain
api.handleServerRequest = function(req, res)
{
logger.dev("handleServerRequest:", api.port, req.url);
if (!api.app) return res.end();
var d = domain.create();
d.on('error', (err) => {
logger.error('handleServerRequest:', api.port, req.path, lib.traceError(err));
if (!res.headersSent) api.sendReply(res, err);
api.shutdown({}, () => { process.exit(0); });
});
d.add(req);
d.add(res);
d.run(api.app, req, res);
}
/**
* Prepare request options that the API routes will merge with, can be used by pre process hooks, initialize
* required properties for subsequent use
* @memberof module:api
* @method prepareRequest
*/
api.prepareRequest = function(req)
{
// Cache the path so we do not need reparse it every time
var path = req.path || "/";
var apath = path.substr(1).split("/");
req.options = {
ops: {},
ip: req.ip,
host: (req.hostname || "").toLowerCase(),
domain: lib.domainName(req.hostname),
path: path,
apath: apath,
secure: req.secure ? "s": "",
mtime: Date.now(),
clength: lib.toNumber(req.get("content-length")),
ctype: req.get("content-type") || "",
};
var sc = req.options.ctype.indexOf(";");
if (sc > 0) req.options.ctype = req.options.ctype.substr(0, sc).trim();
req.__ = lib.__.bind(req);
if (req.res) {
if (!req.res.locals) req.res.locals = {};
req.res.locals.__ = req.res.__ = lib.__.bind(req.res);
}
this.prepareOptions(req);
logger.debug("prepareRequest:", req.options);
}
/**
* Parse or re-parse special headers about app version, language and timezone, it is called early to parse headers first and then
* right after the query parameters are available, query values have higher priority than headers.
* @memberof module:api
* @method prepareOptions
*/
api.prepareOptions = function(req)
{
// Timezone offset from UTC passed by the client, we just keep it, how to use it is up to the application
if (!req.options.appTimezone) {
req.options.appTimezone = lib.toNumber(req.query[this.tzHeader] || req.headers[this.tzHeader], { dflt: 0, min: -720, max: 720 }) * 60000;
}
// Authorization user or token
var auth = req.headers.authorization;
if (auth) {
let idx = auth.indexOf(" ");
req.options.auth_type = auth.substr(0, idx);
req.options.auth_user = auth.substr(idx + 1);
if (req.options.auth_type == "Basic") {
auth = Buffer.from(req.options.auth_user, 'base64').toString();
idx = auth.indexOf(':');
req.options.auth_user = auth.substr(0, idx);
req.options.auth_passwd = auth.substr(idx + 1);
}
}
}
api.handleHeaders = function(req, res, next)
{
var location = req.options.host + req.options.path;
if (!api.serverHeader) {
api.serverHeader = app.version;
}
res.header('Server', api.serverHeader);
// Allow cross site requests
if (lib.testRegexpObj(location, api.corsAllow)) {
res.header('Access-Control-Allow-Origin', api.corsOrigin);
res.header('Access-Control-Allow-Headers', ['content-type', api.signature.header, api.tzHeader].join(", "));
res.header('Access-Control-Allow-Methods', api.corsMethods.join(", "));
res.header('Access-Control-Allow-Credentials', api.corsCredentials);
// Return immediately for preflight requests
if (req.method == 'OPTIONS' && req.get('Access-Control-Request-Method')) {
return res.sendStatus(204);
}
}
// Set response header by location
for (const i in api.responseHeaders) {
const rule = api.responseHeaders[i];
if (!lib.isArray(rule.value)) continue;
if (lib.testRegexpObj(req.options.path, rule) || lib.testRegexpObj(location, rule)) {
for (let j = 0; j < rule.value.length - 1; j += 2) {
if (rule.value[j + 1]) res.setHeader(rule.value[j], rule.value[j + 1]); else res.removeHeader(rule.value[j]);
}
}
}
logger.debug('handleHeaders:', api.port, req.method, req.connection.remoteAddress, req.options, req.headers);
// Redirect before processing the request
location = api.redirect.check(req);
if (location) return api.sendStatus(res, location);
if (req.headers.cookie) {
req.cookies = lib.parseCookies(req.headers.cookie);
}
next();
}
/**
* This is supposed to be called at the beginning of request processing to start metrics and install the handler which
* will be called at the end to finalize the metrics and call the cleanup handlers.
* @memberof module:api
* @method startMetrics
*/
api.startMetrics = function(req, res, next)
{
req._timer = this.metrics.req.start();
this.metrics.que.update(++this.metrics.running);
var end = res.end;
res.end = function(chunk, encoding) {
res.end = end;
res.end(chunk, encoding);
api.handleMetrics(req);
api.handleCleanup(req);
}
// Register trace for the request, by default use fake tracer unless explicity marked to use real metrics
if (api.traceOptions?.interval > 0) {
if ((!api._traceTime || req.options.mtime - api._traceTime > api.traceOptions.interval) &&
api.traceOptions?.path?.test && api.traceOptions.path.test(req.options.path)) {
var opts = {
_host: api.traceOptions?.host,
service: {
version: app.version,
},
annotations: {
tag: app.instance.tag || app.id,
role: app.role,
}
};
if (app.instance.type == "aws") {
opts.aws = {};
if (app.instance.container) {
opts.aws.ecs = {
container: app.instance.container,
container_id: app.instance.container_id,
};
}
if (app.instance.image) {
opts.aws.ec2 = {
instance_id: app.instance.id,
ami_id: app.instance.image,
};
}
}
req.trace = new metrics.Trace(opts);
api._traceTime = req.options.mtime;
}
}
if (!req.trace) req.trace = new metrics.FakeTrace();
next();
}
/**
* Finish metrics collection about the current rquest
* @memberof module:api
* @method handleMetrics
*/
api.handleMetrics = function(req)
{
req.elapsed = req._timer?.end();
delete req._timer;
this.metrics.running--;
if (req.res.statusCode) {
metrics.incr(this.metrics, req.res.statusCode + "_count");
}
if (req.res.statusCode >= 400 && req.res.statusCode < 500) {
this.metrics.bad_count++;
}
if (req.res.statusCode >= 500) {
this.metrics.err_count++;
}
req.trace.stop(req);
req.trace.send();
req.trace.destroy();
}
/**
* Call registered cleanup hooks and clear the request explicitly
* @memberof module:api
* @method handleCleanup
*/
api.handleCleanup = function(req)
{
var hooks = this.hooks.find('cleanup', req.method, req.options.path);
lib.forEverySeries(hooks, (hook, next) => {
logger.debug('cleanup:', req.method, req.options.path, hook.path);
hook.callback(req, next);
}, () => {
for (const p in req) {
if (p.startsWith("__") || api.requestCleanup.includes(p)) {
for (const c in req[p]) delete req[p][c];
if (!lib.isObject(req[p])) delete req[p];
}
}
for (const p in req.files) {
if (req.files[p] && req.files[p].path) {
fs.unlink(req.files[p].path, (err) => { if (err) logger.error("cleanup:", err); });
}
}
}, true);
}
/**
* Parse incoming query parameters in the request body, this is default middleware called early before authenticatoion.
* Only methods in `-api-body-methods` processed, defaults are POST/PUT/PATCH.
* Store parsed parameters in the `req.body`.
* @memberof module:api
* @method handleBody
*/
api.handleBody = function(req, res, next)
{
if (req._body) return next();
switch (req.options.ctype) {
case "text/json":
case 'application/json':
case 'application/x-www-form-urlencoded':
case "text/xml":
case "application/xml":
req.setEncoding('utf8');
break;
default:
// Custom types to be collected
if (!lib.testRegexpObj(req.options.ctype, this.bodyTypes)) return next();
req.setEncoding('binary');
}
if (req.options.clength > 0 && req.options.clength >= this.queryLimit) {
this.metrics.large_count++;
logger.debug("handleBody:", "too large:", req.path, req.headers);
return next(lib.newError({ message: "too large", _msg: api.errTooLarge, status: 413, length: req.options.clength }));
}
req._body = true;
var buf = '', size = 0;
var sig = this.signature.get(req);
req.on('data', (chunk) => {
size += chunk.length;
if (size > api.queryLimit) {
this.metrics.large_count++;
logger.debug("handleBody:", "too large:", req.path, req.headers, buf);
return next(lib.newError({ message: "too large", _msg: api.errTooLarge, status: 413, maxsize: api.queryLimit, length: size }));
}
buf += chunk;
});
req.on('end', () => {
try {
if (size > api.queryLimit) {
this.metrics.large_count++;
logger.debug("handleBody:", "too large:", req.path, req.headers, buf);
return next(lib.newError({ message: "too large", _msg: api.errTooLarge, status: 413, maxsize: api.queryLimit, length: size }));
}
// Verify data checksum before parsing
if (sig?.checksum && lib.hash(buf) != sig.checksum) {
return next(lib.newError("invalid data checksum"));
}
switch (lib.testRegexpObj(req.options.path, api.bodyRaw) ? null : req.options.ctype) {
case "text/json":
case "application/json":
if (!api.bodyMethods.includes(req.method)) break;
req.body = lib.jsonParse(buf, { datatype: "object", logger: "debug" });
req.raw_body = buf;
break;
case "application/x-www-form-urlencoded":
if (!api.bodyMethods.includes(req.method)) break;
req.body = buf.length ? qs.parse(buf, api.qsOptions) : {};
req.raw_body = buf;
break;
default:
req.body = buf;
}
api.prepareOptions(req);
next();
} catch (err) {
err.status = 400;
err.title = "handleBody";
next(err);
}
});
}
var _formidable;
/**
* Parse multipart forms for uploaded files, this must be called explicitly by the endpoints that need uploads.
* The api module handles uploads automatically for configured paths via `-api-body-multipart` config parameter.
* @param {object} req - Express Request
* @param {object} res - Express Response
* @param {function} next
*
* @example
*
* api.app.post("/upload", api.handleMultipart, (req, res, next) => {
* if (req.files.file) ....
* })
*
* // Another global way to handle uploads for many endpoints is to call it for all known paths at once before the actual upload handlers.
*
* api.app.post(/^\/upload\//, api.handleMultipart, (req, res, next) => (next("route")));
* ...
* api.app.post("/upload/icon", (req, res, next) => {
* ...
* api.app.post("/upload/icon", (req, res, next) => {
*
*
* @memberof module:api
* @method handleMultipart
*/
api.handleMultipart = function(req, res, next)
{
if (!req.is('multipart/form-data')) return next();
const opts = {
uploadDir: app.tmpDir,
allowEmptyFiles: true,
keepExtensions: true,
maxFiles: api.filesLimit,
maxFileSize: api.uploadLimit,
maxFields: api.fieldsLimit,
maxFieldsSize: api.queryLimit,
};
const form = _formidable.formidable(opts);
const trace = req.trace.start("handleMultipart");
var data = {}, files = {};
form.on('field', (name, val) => {
if (Array.isArray(data[name])) {
data[name].push(val);
} else
if (data[name]) {
data[name] = [data[name], val];
} else {
data[name] = val;
}
});
form.on('file', (name, val) => {
val = val.toJSON();
val.path = val.filepath;
val.name = val.originalFilename;
files[name] = val;
});
form.on('progress', (bytesReceived, bytesExpected) => {
if (bytesExpected < api.uploadLimit) return;
this.metrics.large_count++;
form.emit("error", lib.newError({ message: "too large", _msg: api.errTooLarge, status: 413, maxsize: api.uploadLimit, length: bytesExpected }));
});
form.parse(req, (err) => {
logger.debug("handleMultipart:", err, req.path, req.headers, data, Object.keys(files));
if (err) {
if (err && /maxFile|maxField|maxTotal/.test(err.message)) {
this.metrics.large_count++;
err._msg = api.errTooLarge;
err.status = 413;
}
trace.stop(err);
return next(err);
}
try {
req.body = qs.parse(data, api.qsOptions);
req.files = files;
trace.stop();
next();
} catch (e) {
e.status = 400;
e.title = "handleMultipart";
trace.stop(e);
next(e);
}
});
}
/**
* Replace redirect placeholders
* @param {object} req - Express Request
* @param {string} pathname - redirect path, it may contain placeholders in the form: @name@:
* - HOST - full host name from header
* - IP - remote IP address
* - DOMAIN - domain from the hostname
* - PATH([1-9])? - full path or a part
* - URL - full url
* - BASE - basename from the path no extention
* - FILE - base file name with extention
* - DIR - directory name only
* - SUBDIR - last part of the directory path
* - EXT - file extention
* - QUERY - stringified query
* @return {string} possibly new path
* @memberof module:api
* @method checkRedirectPlaceholders
*/
api.checkRedirectPlaceholders = function(req, pathname)
{
return pathname.replace(/@(HOST|IP|DOMAIN|PATH([1-9])?|URL|BASE|FILE|DIR|SUBDIR|EXT|QUERY)@/g, function(_, m) {
switch (m.substr(0, 2)) {
case "HO": return req.options.host;
case "IP": return req.options.ip;
case "DO": return req.options.domain;
case "PA": return m[4] > 0 ? req.options.apath.slice(m[4]).join("/") : req.options.path;
case "UR": return req.url;
case "BA": return path.basename(req.options.path).split(".").shift();
case "FI": return path.basename(req.options.path);
case "DI": return path.dirname(req.options.path);
case "SU": return path.dirname(req.options.path).split("/").pop();
case "EX": return path.extname(req.options.path);
case "QU": return qs.stringify(req.query);
}
});
}
/**
* Web proxy: checkProxy("web", req, res)
* WS proxy: checkProxy("ws", req, socket, head)
* @returns {string} - a host matched or undefined
* @memberof module:api
* @method checkProxy
*/
api.checkProxy = function(type, ...args)
{
const req = args[0];
const path = req.path || req.url;
for (const host in api.proxy) {
if (!lib.testRegexp(path, api.proxy[host])) continue;
if (!api._proxy) {
api._proxy = require("http-proxy").createProxyServer({});
}
const opts = {
target: "https://" + host,
ws: true,
changeOrigin: true,
hostRewrite: true,
cookieDomainRewrite: "localhost",
headers: {
origin: host
}
}
logger.debug("proxy:", opts, req.options);
api._proxy[type](...args, opts);
return host;
}
}
/**
* For now just convert cookies into an object so logger can process
* @memberof module:api
* @method cleanupHeaders
*/
api.cleanupHeaders = function(headers)
{
if (!headers) return headers;
if (headers.cookie) {
headers.cookie = lib.parseCookies(headers.cookie);
}
if (headers["set-cookie"]) {
headers["set-cookie"] = lib.parseCookies(headers["set-cookie"]);
}
return headers;
}
/**
* Create a Web server with options and request handler, returns a server object.
*
* Options can have the following properties:
* @param {int} port - port number is required
* @param {string} [bind] - address to bind
* @param {string} [restart] - name of the processes to restart on address in use error, usually "web"
* @param {objext} [ssl] - an object with SSL options for TLS createServer call
* @param {int} [timeout] - number of milliseconds for the request timeout
* @param {int} [keepAliveTimeout] - number of milliseconds to keep the HTTP connecton alive
* @param {int} [requestTimeout] - number of milliseconds to receive the entire request from the client
* @param {int} [maxRequestsPerSocket] - number of requests a socket can handle before closing keep alive connection
* @param {int} [maxHeaderSize] - maximum length of request headers in bytes
* @param {boolean} [reusePort] - allows multiple sockets on the same host to bind to the same port
* @param {string} [name] - server name to be assigned
* @memberof module:api
* @method createWebServer
*/
api.createWebServer = function(options, callback)
{
if (!options?.port) {
logger.error('createWebServer:', 'invalid options', options);
return null;
}
var server;
if (options.ssl) {
var opts = lib.objClone(options.ssl);
for (const p in options) if (p != "ssl") opts[p] = options[p];
server = https.createServer(opts, callback);
} else {
server = http.createServer(options, callback);
}
if (options.timeout) {
server.timeout = options.timeout;
}
server.serverPort = options.port;
if (options.name) {
server.serverName = options.name;
}
if (options.keepAliveTimeout) {
server.keepAliveTimeout = options.keepAliveTimeout;
server.headersTimeout = Math.round(options.keepAliveTimeout * 1.25);
}
server.requestTimeout = options.requestTimeout || 0;
server.maxRequestsPerSocket = options.maxRequestsPerSocket || null;
server.on('error', (err) => {
logger.error(app.role + ':', 'port:', options.port, lib.traceError(err));
// Restart backend processes on address in use
if (err.code == 'EADDRINUSE' && options.restart) {
app.killBackend(options.restart, "SIGKILL", () => { process.exit(0) });
}
});
try {
server.listen({
port: options.port,
host: options.bind,
backlog: options.backlog,
reusePort: options.reusePort,
});
} catch (e) {
logger.error('server: listen:', options, e);
server = null;
}
logger.log("createWebServer:", options);
return server;
}
// API stats if running
api.configureCollectStats = function(options)
{
if (!api.app) return;
var m = metrics.toJSON(api.metrics, { reset: 1, take: /_count$/ });
options.stats.api_req_count = m.req.meter.count;
options.stats.api_req_rate = m.req.meter.rate;
options.stats.api_res_time = m.req.histogram.med;
options.stats.api_que_size = m.que.med;
for (const p in m) {
if (p.endsWith("_count")) options.stats["api_" + p] = m[p];
}
}
require(__dirname + "/api/util");