Improve error logging around cluster health #33

Closed · wants to merge 4 commits
13 changes: 10 additions & 3 deletions .eslintrc
@@ -1,11 +1,18 @@
 {
-  "extends": ["eslint:recommended", "eslint-config-prettier"],
-  "plugins": ["eslint-plugin-prettier"],
+  "extends": [
+    "eslint:recommended",
+    "plugin:@typescript-eslint/recommended",
+    "eslint-config-prettier"
+  ],
+  "plugins": ["@typescript-eslint", "eslint-plugin-prettier"],
+  "parser": "@typescript-eslint/parser",
+  "ignorePatterns": ["**/dist"],
   "env": {
     "node": true,
     "es2022": true
   },
   "rules": {
-    "prettier/prettier": "error"
+    "prettier/prettier": "error",
+    "@typescript-eslint/no-var-requires": "warn"
   }
 }
1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@
 node_modules
 yarn-debug.log*
 yarn-error.log*
+dist

 # Miscellaneous
 .DS_Store
5 changes: 5 additions & 0 deletions .prettierignore
@@ -0,0 +1,5 @@
+# Node files
+node_modules
+
+# Built files
+dist
13 changes: 10 additions & 3 deletions Dockerfile
@@ -1,15 +1,22 @@
 FROM node:18-buster AS builder
 WORKDIR /app
+COPY package.json yarn.lock tsconfig.json ./
+RUN yarn install --frozen-lockfile
+COPY src /app/src
+RUN yarn build
+
+FROM node:18-buster AS dependencies
+WORKDIR /app
 COPY package.json yarn.lock ./
 RUN yarn install --frozen-lockfile --production

 # Distroless production image
 FROM gcr.io/distroless/nodejs:18
 WORKDIR /app
-COPY --from=builder /app/node_modules /app/node_modules
-COPY src /app/src
+COPY --from=builder /app/dist /app/dist
+COPY --from=dependencies /app/node_modules /app/node_modules
 COPY migrations /app/migrations
 COPY package.json knexfile.js /app/

 EXPOSE 8080
-CMD ["/app/src/server.js"]
+CMD ["/app/dist/server.js"]
12 changes: 11 additions & 1 deletion package.json
@@ -6,6 +6,8 @@
   "private": true,
   "license": "ISC",
   "scripts": {
+    "build": "tsc",
+    "start": "yarn build && node dist/server.js",
     "format": "prettier --write .",
     "lint": "eslint --fix .",
     "check:format": "prettier --check .",
@@ -29,13 +31,21 @@
     "zod": "^3.22.4"
   },
   "devDependencies": {
+    "@tsconfig/node18": "^18.2.4",
+    "@types/bluebird": "^3.5.42",
+    "@types/express": "^4.17.21",
+    "@types/lodash": "^4.17.0",
     "@types/node": "^18.11.12",
+    "@types/uuid": "^9.0.8",
+    "@typescript-eslint/eslint-plugin": "^7.7.1",
+    "@typescript-eslint/parser": "^7.7.1",
     "eslint": "^8.53.0",
     "eslint-config-prettier": "^9.0.0",
     "eslint-plugin-prettier": "^5.0.1",
     "husky": "^8.0.3",
     "lint-staged": "^15.0.2",
-    "prettier": "^3.0.3"
+    "prettier": "^3.0.3",
+    "typescript": "^5.4.5"
   },
   "lint-staged": {
     "**/*.js": [
11 changes: 7 additions & 4 deletions seeds/initialize.js → seeds/initialize.mjs
@@ -1,13 +1,16 @@
-const uuidv4 = require("uuid").v4;
-const { CLUSTER_STATUS } = require("../src/lib/trino");
+import { v4 as uuidv4 } from "uuid";
+const CLUSTER_STATUS = {
+  ENABLED: "ENABLED",
+  DISABLED: "DISABLED",
+};

 const now = new Date();

 /**
  * @param { import("knex").Knex } knex
  * @returns { Promise<void> }
  */
-exports.seed = async function (knex) {
+export async function seed(knex) {
   await knex("user").del();
   await knex("user").insert({
     id: uuidv4(),
@@ -98,4 +101,4 @@ exports.seed = async function (knex) {
     updated_at: now,
     created_at: now,
   });
-};
+}
38 changes: 24 additions & 14 deletions src/lib/helpers.js → src/lib/helpers.ts
@@ -1,7 +1,14 @@
-const _ = require("lodash");
-const { getQueryHeaderInfo } = require("./query");
+import _ from "lodash";
+import { getQueryHeaderInfo, QUERY_STATUS } from "./query";

-function getProxiedBody(clusterBody, proxyId, proxyHost) {
+import type { IncomingHttpHeaders } from "http";
+import type { ClusterResponse } from "../types/trino";
+
+export function getProxiedBody(
+  clusterBody: ClusterResponse,
+  proxyId: string,
+  proxyHost: string
+) {
   const newBody = _.cloneDeep(clusterBody);
   // Save cluster's queryId for string replacement
   const clusterQueryId = newBody.id;
@@ -25,17 +32,26 @@ function getProxiedBody(clusterBody, proxyId, proxyHost) {
   return newBody;
 }

-async function getAuthorizationHeader(headers) {
-  const traceToken = headers["x-trino-trace-token"];
+export async function getAuthorizationHeader(headers: IncomingHttpHeaders) {
+  const traceTokenHeader = headers["x-trino-trace-token"];
+  const traceToken = Array.isArray(traceTokenHeader)
+    ? traceTokenHeader[0]
+    : traceTokenHeader;
+
   const headerUser = await getQueryHeaderInfo(traceToken);
-  const authorizationHeader = headerUser
-    ? "Basic " + Buffer.from(headerUser).toString("base64")
+  const authorizationHeader = headerUser?.user
+    ? "Basic " + Buffer.from(headerUser.user).toString("base64")
     : null;

   return authorizationHeader;
 }

-function createErrorResponseBody(queryId, uuid, tempHost, queryStatus) {
+export function createErrorResponseBody(
+  queryId: string,
+  uuid: string,
+  tempHost: string,
+  queryStatus: keyof typeof QUERY_STATUS
+) {
   return {
     data: {
       id: uuid,
@@ -74,9 +90,3 @@ function createErrorResponseBody(queryId, uuid, tempHost, queryStatus) {
     },
   };
 }
-
-module.exports = {
-  getAuthorizationHeader,
-  getProxiedBody,
-  createErrorResponseBody,
-};
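Note on getAuthorizationHeader above: Node's IncomingHttpHeaders types every value as string | string[] | undefined, which is why the rewritten code narrows x-trino-trace-token with Array.isArray before handing it to getQueryHeaderInfo. A minimal standalone sketch of that narrowing, using the header name from this diff (the helper name itself is illustrative, not part of the PR):

import type { IncomingHttpHeaders } from "http";

// Return the first value of a possibly repeated header, or undefined.
function firstHeaderValue(
  headers: IncomingHttpHeaders,
  name: string
): string | undefined {
  const value = headers[name.toLowerCase()]; // Node lowercases incoming header names
  return Array.isArray(value) ? value[0] : value;
}

// firstHeaderValue(req.headers, "x-trino-trace-token")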
4 changes: 3 additions & 1 deletion src/lib/knex.js → src/lib/knex.ts
@@ -1,3 +1,5 @@
+import Knex from "knex";
+
 const { DB_URL, KNEX_POOL_MIN, KNEX_POOL_MAX, KNEX_CONNECTION_TIMEOUT } =
   process.env;

@@ -11,7 +13,7 @@ const connectionTimeout = KNEX_CONNECTION_TIMEOUT
   ? parseInt(KNEX_CONNECTION_TIMEOUT)
   : 10000;

-exports.knex = require("knex")({
+export const knex = Knex({
   client: "pg",
   connection: DB_URL,
   /** It is recommended to set min to 0 so idle connections can be terminated */
4 changes: 2 additions & 2 deletions src/lib/logger.js → src/lib/logger.ts
@@ -1,4 +1,4 @@
-const winston = require("winston");
+import winston from "winston";

 const logLevel = process.env.LOG_LEVEL || "info";
 const logFormat = winston.format.printf(
@@ -21,4 +21,4 @@ logger.add(
   })
 );

-module.exports = logger;
+export default logger;
12 changes: 4 additions & 8 deletions src/lib/memcache.js → src/lib/memcache.ts
@@ -1,4 +1,5 @@
-const { LRUCache } = require("lru-cache");
+import { LRUCache } from "lru-cache";
+import { QueryUser, Trace } from "../types/models";

 /**
  * This cache stores mappings of `x-trino-trace-token` headers
@@ -9,7 +10,7 @@ const { LRUCache } = require("lru-cache");
  * There is no TTL on this cache as some queries are long-running
  * and we need to keep the mapping around for a while.
  */
-const traceCache = new LRUCache({
+export const traceCache = new LRUCache<string, Trace>({
   max: process.env.TRACE_CACHE_SIZE
     ? parseInt(process.env.TRACE_CACHE_SIZE)
     : 1000,
@@ -23,16 +24,11 @@ const traceCache = new LRUCache({
  * There is a short TTL on this cache to ensure that updates to the
  * user's cluster tags are fetched quickly by the service.
  */
-const userCache = new LRUCache({
+export const userCache = new LRUCache<string, QueryUser>({
   max: process.env.USER_CACHE_SIZE
     ? parseInt(process.env.USER_CACHE_SIZE)
     : 1000,
   ttl: process.env.USER_CACHE_TTL_MS
     ? parseInt(process.env.USER_CACHE_TTL_MS)
     : 1000 * 60 * 3, // 3min
 });
-
-module.exports = {
-  traceCache,
-  userCache,
-};
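The QueryUser and Trace types imported from ../types/models are not part of this changeset, so their exact shape is not shown. For illustration only, the typed caches behave roughly like the sketch below; the Trace fields here are assumptions, not the project's real definitions:

import { LRUCache } from "lru-cache";

// Hypothetical shape; the real type lives in src/types/models.
interface Trace {
  user: string | null;
  tags: string[];
}

const traceCache = new LRUCache<string, Trace>({ max: 1000 });

traceCache.set("trace-123", { user: "alice", tags: ["etl"] });
const trace = traceCache.get("trace-123"); // typed as Trace | undefined, no casts needed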
38 changes: 17 additions & 21 deletions src/lib/query.js → src/lib/query.ts
@@ -1,9 +1,11 @@
-const { knex } = require("./knex");
-const { traceCache } = require("./memcache");
-const logger = require("./logger");
-const stats = require("./stats");
+import { knex } from "./knex";
+import { traceCache } from "./memcache";
+import logger from "./logger";
+import stats from "./stats";

-const QUERY_STATUS = {
+import type { Parsers, Query } from "../types/models";
+
+export const QUERY_STATUS = {
   // Trino Proxy states
   AWAITING_SCHEDULING: "AWAITING_SCHEDULING",
   CANCELLED: "CANCELLED",
@@ -20,20 +22,22 @@ const QUERY_STATUS = {
   QUEUED: "QUEUED",
   RUNNING: "RUNNING",
   STARTING: "STARTING",
-};
+} as const;

-async function getQueryById(newQueryId) {
+export async function getQueryById(newQueryId: string) {
   return knex("query").where({ id: newQueryId }).first();
 }

-async function getFirstQueryByTraceId(traceId) {
+export async function getFirstQueryByTraceId(traceId: string) {
   return knex("query")
     .where({ trace_id: traceId })
     .orderBy("created_at", "asc")
     .first();
 }

-async function getQueryHeaderInfo(traceId) {
+export async function getQueryHeaderInfo(
+  traceId: string | undefined
+): Promise<{ user: string | null; tags: string[] } | null> {
   if (!traceId) {
     return null;
   }
@@ -54,7 +58,7 @@ async function getQueryHeaderInfo(traceId) {
   return null;
 }

-async function updateQuery(queryId, data = {}) {
+export async function updateQuery(queryId: string, data: Partial<Query> = {}) {
   try {
     await knex("query").where({ id: queryId }).update(data);
   } catch (err) {
@@ -65,10 +69,10 @@ async function updateQuery(queryId, data = {}) {
   stats.increment("query_updated", [`status:${data.status}`]);
 }

-function parseFirstQueryHeader(query, parsers = {}) {
+export function parseFirstQueryHeader(query: string, parsers: Parsers = {}) {
   const parsedInfo = {
-    user: null,
-    tags: [],
+    user: null as string | null,
+    tags: [] as string[],
   };

   if (parsers?.user) {
@@ -88,11 +92,3 @@ function parseFirstQueryHeader(query, parsers = {}) {

   return parsedInfo;
 }
-
-module.exports = {
-  getQueryById,
-  getQueryHeaderInfo,
-  parseFirstQueryHeader,
-  QUERY_STATUS,
-  updateQuery,
-};
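The as const added to QUERY_STATUS is what lets helpers.ts type createErrorResponseBody's queryStatus parameter as keyof typeof QUERY_STATUS. A small illustration of the pattern, with a trimmed-down status object rather than the full list from this file:

const QUERY_STATUS = {
  AWAITING_SCHEDULING: "AWAITING_SCHEDULING",
  CANCELLED: "CANCELLED",
  FAILED: "FAILED",
} as const;

// Resolves to "AWAITING_SCHEDULING" | "CANCELLED" | "FAILED"
type QueryStatus = keyof typeof QUERY_STATUS;

function describe(status: QueryStatus): string {
  return `query is ${QUERY_STATUS[status]}`;
}

describe("FAILED"); // ok
// describe("BOGUS"); // compile-time error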
14 changes: 0 additions & 14 deletions src/lib/stats.js

This file was deleted.

13 changes: 13 additions & 0 deletions src/lib/stats.ts
@@ -0,0 +1,13 @@
+import StatsD from "hot-shots";
+
+const host = process.env.STATSD_HOST || "localhost";
+const port = process.env.STATSD_PORT ? parseInt(process.env.STATSD_PORT) : 8125;
+const prefix = process.env.STATSD_PREFIX || "trino_proxy";
+const env = process.env.NODE_ENV || "production";
+
+export default new StatsD({
+  host,
+  port,
+  prefix: prefix + ".",
+  globalTags: { env },
+});
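The replacement stats.ts keeps the default-export client the rest of the code already consumes (query.ts calls stats.increment with a tag array in this same diff). A usage sketch, assuming a caller somewhere under src/ such as a hypothetical src/server.ts:

import stats from "./lib/stats";

// Emits a counter named trino_proxy.query_updated with the global env tag
// plus the per-call status tag.
stats.increment("query_updated", ["status:FAILED"]);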