Merge pull request #1120 from jetstreamapp/feat/add-ip-database

Add IP database scripts

paustint authored Dec 21, 2024
2 parents 0d19e97 + 2682489 commit 29344a3
Showing 10 changed files with 297 additions and 159 deletions.
6 changes: 3 additions & 3 deletions apps/api/src/app/routes/route.middleware.ts
@@ -62,7 +62,7 @@ export function notFoundMiddleware(req: express.Request, res: express.Response,
export function blockBotByUserAgentMiddleware(req: express.Request, res: express.Response, next: express.NextFunction) {
const userAgent = req.get('User-Agent');
if (userAgent?.toLocaleLowerCase().includes('python')) {
- logger.debug(
+ (res.log || logger).debug(
{
blocked: true,
method: req.method,
@@ -129,7 +129,7 @@ export async function checkAuth(req: express.Request, res: express.Response, nex
req.log.error(`[AUTH][UNAUTHORIZED] User-Agent mismatch: ${req.session.userAgent} !== ${userAgent}`);
req.session.destroy((err) => {
if (err) {
- logger.error({ ...getExceptionLog(err) }, '[AUTH][UNAUTHORIZED][ERROR] Error destroying session');
+ (res.log || logger).error({ ...getExceptionLog(err) }, '[AUTH][UNAUTHORIZED][ERROR] Error destroying session');
}
// TODO: Send email to user about potential suspicious activity
next(new AuthenticationError('Unauthorized'));
@@ -322,7 +322,7 @@ export function verifyCaptcha(req: express.Request, res: express.Response, next:
if (res.success) {
return next();
}
- logger.warn({ token, res }, '[CAPTCHA][FAILED]');
+ (res.log || logger).warn({ token, res }, '[CAPTCHA][FAILED]');
throw new InvalidCaptcha();
})
.catch(() => {
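The three changes above all follow the same pattern: prefer the request-scoped logger when one is available (res.log) and fall back to the shared application logger otherwise, so blocked requests and auth failures keep per-request context. A minimal sketch of that pattern, assuming a pino logger and pino-http-style request logging middleware; the getLogger helper name is illustrative and not part of this commit:

import pino from 'pino';
import type { Response } from 'express';

// Shared application logger, used when no request-scoped logger exists.
const logger = pino();

// Hypothetical helper showing the fallback used in route.middleware.ts:
// request-logging middleware such as pino-http attaches a child logger to
// res.log that carries request context (request id, url, etc.); prefer it.
function getLogger(res: Response) {
  return (res as Response & { log?: typeof logger }).log || logger;
}

// Usage, mirroring the blocked-bot branch above:
//   getLogger(res).debug({ blocked: true, method: req.method }, '[BLOCKED]');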
4 changes: 4 additions & 0 deletions apps/cron-tasks/project.json
@@ -14,6 +14,10 @@
{
"entryName": "save-analytics-summary",
"entryPath": "apps/cron-tasks/src/save-analytics-summary.ts"
+ },
+ {
+ "entryName": "geo-ip-updater",
+ "entryPath": "apps/cron-tasks/src/geo-ip-updater.ts"
}
],
"tsConfig": "apps/cron-tasks/tsconfig.app.json",
2 changes: 1 addition & 1 deletion apps/cron-tasks/src/config/db.config.ts
@@ -1,6 +1,6 @@
- import { getExceptionLog } from '@jetstream/api-config';
import { Prisma, PrismaClient } from '@prisma/client';
import { Pool } from 'pg';
+ import { getExceptionLog } from '../utils/utils';
import { ENV } from './env-config';
import { logger } from './logger.config';

9 changes: 5 additions & 4 deletions apps/cron-tasks/src/config/env-config.ts
@@ -1,5 +1,3 @@
- import { getExceptionLog } from '@jetstream/api-config';
- import { ensureBoolean } from '@jetstream/shared/utils';
import * as dotenv from 'dotenv';
import { readFileSync } from 'fs-extra';
import { join } from 'path';
@@ -10,7 +8,7 @@ try {
VERSION = readFileSync(join(__dirname, '../../VERSION'), 'utf-8').trim();
console.info(`APP VERSION ${VERSION}`);
} catch (ex) {
- console.warn('COULD NOT READ VERSION FILE', getExceptionLog(ex));
+ console.warn('COULD NOT READ VERSION FILE', ex);
}

export const ENV = {
@@ -21,7 +19,7 @@ export const ENV = {
ROLLBAR_SERVER_TOKEN: process.env.ROLLBAR_SERVER_TOKEN,
// FIXME: there was a typo in env variables, using both temporarily as a safe fallback
JETSTREAM_POSTGRES_DBURI: process.env.JETSTREAM_POSTGRES_DBURI || process.env.JESTREAM_POSTGRES_DBURI,
- PRISMA_DEBUG: ensureBoolean(process.env.PRISMA_DEBUG),
+ PRISMA_DEBUG: process.env.PRISMA_DEBUG && process.env.PRISMA_DEBUG.toLocaleLowerCase().startsWith('t'),

// MAILGUN
MAILGUN_API_KEY: process.env.MAILGUN_API_KEY,
@@ -30,4 +28,7 @@ export const ENV = {

AMPLITUDE_API_KEY: process.env.AMPLITUDE_API_KEY,
AMPLITUDE_SECRET_KEY: process.env.AMPLITUDE_SECRET_KEY,

+ MAX_MIND_ACCOUNT_ID: process.env.MAX_MIND_ACCOUNT_ID,
+ MAX_MIND_LICENSE_KEY: process.env.MAX_MIND_LICENSE_KEY,
};
227 changes: 227 additions & 0 deletions apps/cron-tasks/src/geo-ip-updater.ts
@@ -0,0 +1,227 @@
import { exec } from 'child_process';
import * as fs from 'fs';
import * as path from 'path';
import { Open } from 'unzipper';
import { promisify } from 'util';
import { ENV } from './config/env-config';
import { logger } from './config/logger.config';
import { getExceptionLog } from './utils/utils';

/**
CREATE TABLE IF NOT EXISTS geo_ip.network (
network cidr NOT NULL,
geoname_id int,
registered_country_geoname_id int,
represented_country_geoname_id int,
is_anonymous_proxy bool,
is_satellite_provider bool,
postal_code text,
latitude numeric,
longitude numeric,
accuracy_radius int,
is_anycast bool
);
CREATE TABLE IF NOT EXISTS geo_ip.location (
geoname_id int NOT NULL,
locale_code text NOT NULL,
continent_code text,
continent_name text,
country_iso_code text,
country_name text,
subdivision_1_iso_code text,
subdivision_1_name text,
subdivision_2_iso_code text,
subdivision_2_name text,
city_name text,
metro_code int,
time_zone text,
is_in_european_union bool NOT NULL,
PRIMARY KEY (geoname_id, locale_code)
);
CREATE TABLE IF NOT EXISTS geo_ip.organization (
network cidr NOT NULL,
autonomous_system_number int,
autonomous_system_organization text
);
CREATE INDEX idx_geoip2_network_network ON geo_ip.network USING gist (network inet_ops);
CREATE INDEX idx_geoip2_network_geoname_id ON geo_ip.network(geoname_id);
CREATE INDEX idx_geoip2_location_locale_code ON geo_ip.location (locale_code);
CREATE INDEX idx_geoip2_organization_network ON geo_ip.organization USING gist (network inet_ops);
*/

const execAsync = promisify(exec);

if (!ENV.MAX_MIND_ACCOUNT_ID || !ENV.MAX_MIND_LICENSE_KEY) {
logger.error('Missing MaxMind credentials');
process.exit(1);
}

const ASN_URL = 'https://download.maxmind.com/geoip/databases/GeoLite2-ASN-CSV/download?suffix=zip';
const ASN_FILENAME = 'GeoLite2-ASN.zip';
const ASN_FILENAMES = ['GeoLite2-ASN-Blocks-IPv4.csv', 'GeoLite2-ASN-Blocks-IPv6.csv'];

const CITY_URL = 'https://download.maxmind.com/geoip/databases/GeoLite2-City-CSV/download?suffix=zip';
const CITY_FILENAME = 'GeoLite2-City.zip';
const CITY_FILENAMES = ['GeoLite2-City-Locations-en.csv', 'GeoLite2-City-Blocks-IPv4.csv', 'GeoLite2-City-Blocks-IPv6.csv'];

async function importCSVToTable(csvPath: string, tableName: string, schema: string): Promise<void> {
const tempTableName = `${tableName}_temp`;
const fullTempTableName = `${schema}.${tempTableName}`;
const fullTableName = `${schema}.${tableName}`;

try {
// Create temporary table with same structure
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"`);

// Import CSV data
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "\\COPY ${fullTempTableName} FROM '${csvPath}' WITH (FORMAT CSV, HEADER)"`);

// Atomic swap
await execAsync(`
psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "
BEGIN;
DROP TABLE IF EXISTS ${fullTableName};
ALTER TABLE ${fullTempTableName} RENAME TO ${tableName};
COMMIT;
"
`);

logger.info(`Successfully imported ${csvPath} to ${fullTableName}`);
} catch (error) {
logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message);
// Cleanup temp table if it exists
await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`).catch(() => {
// Ignore errors
logger.warn(`Failed to drop table ${fullTempTableName}`);
});
throw error;
}
}

async function processFile(
url: string,
zipFileName: string,
filenames: string[],
processor: (filename: string, filePath: string) => Promise<void>
) {
const downloadDir = path.join(__dirname, '../../downloads');
const zipFilePath = path.join(downloadDir, zipFileName);

// Create downloads directory if it doesn't exist
if (!fs.existsSync(downloadDir)) {
fs.mkdirSync(downloadDir, { recursive: true });
}

let buffer: Buffer;

// Check if file exists and is less than 24 hours old
if (fs.existsSync(zipFilePath)) {
const stats = fs.statSync(zipFilePath);
const fileAge = Date.now() - stats.mtime.getTime();
const oneDayInMs = 24 * 60 * 60 * 1000;

if (fileAge < oneDayInMs) {
logger.info(`Using existing file ${zipFilePath}`);
buffer = fs.readFileSync(zipFilePath);
} else {
buffer = await downloadFile(url, zipFilePath);
}
} else {
buffer = await downloadFile(url, zipFilePath);
}

const directory = await Open.buffer(buffer);

for (const entry of directory.files) {
const currentFilename = entry.path.split('/').reverse()[0];
if (filenames.includes(currentFilename)) {
logger.info(`Extracting ${entry.path}...`);
const csvPath = path.join(downloadDir, currentFilename);
const writeStream = fs.createWriteStream(csvPath);
await new Promise((resolve, reject) => {
entry.stream().pipe(writeStream).on('finish', resolve).on('error', reject);
});
await processor(currentFilename, csvPath);
// Cleanup CSV file
fs.unlinkSync(csvPath);
}
}
}

async function downloadFile(url: string, savePath: string): Promise<Buffer> {
logger.info(`Downloading from ${url}...`);

const response = await fetch(url, {
headers: {
Authorization: `Basic ${Buffer.from(`${ENV.MAX_MIND_ACCOUNT_ID}:${ENV.MAX_MIND_LICENSE_KEY}`).toString('base64')}`,
},
});

if (!response.ok) {
throw new Error(`Failed to download: ${response.statusText}`);
}

const buffer = await streamToBuffer(response.body!);
fs.writeFileSync(savePath, buffer);
return buffer;
}

async function streamToBuffer(stream: ReadableStream): Promise<Buffer> {
const chunks: Buffer[] = [];
const reader = stream.getReader();

// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(Buffer.from(value));
}

return Buffer.concat(chunks);
}

async function processNetwork(csvPath: string) {
await importCSVToTable(csvPath, 'network', 'geo_ip');
}

async function processLocation(csvPath: string) {
await importCSVToTable(csvPath, 'location', 'geo_ip');
}

async function processASN(csvPath: string) {
await importCSVToTable(csvPath, 'organization', 'geo_ip');
}

async function main() {
try {
logger.info('Starting GeoIP database update...');

// Process ASN data
await processFile(ASN_URL, ASN_FILENAME, ASN_FILENAMES, async (filename, csvPath) => {
await processASN(csvPath);
});

// Process City data
await processFile(CITY_URL, CITY_FILENAME, CITY_FILENAMES, async (filename, csvPath) => {
if (filename.includes('Blocks')) {
await processNetwork(csvPath);
} else if (filename.includes('Locations')) {
await processLocation(csvPath);
}
});

logger.info('GeoIP database update completed successfully');
} catch (error) {
logger.error(getExceptionLog(error), 'Error updating GeoIP database: %s', error.message);
throw error;
}
}

main().catch((error) => {
logger.error(getExceptionLog(error), 'Fatal error: %s', error.message);
process.exit(1);
});
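For reference, a minimal sketch of how the tables populated by this script might be queried to geolocate a single address, assuming the geo_ip schema shown in the comment block above and a Postgres connection string in JETSTREAM_POSTGRES_DBURI; the lookupIp helper and the sample address are illustrative and not part of this commit:

import { Pool } from 'pg';

const pool = new Pool({ connectionString: process.env.JETSTREAM_POSTGRES_DBURI });

// Find the most specific network block containing the address, then join the
// English locale row from geo_ip.location. The >>= (contains or equals)
// operator is supported by the GiST index on geo_ip.network (inet_ops).
async function lookupIp(ip: string) {
  const { rows } = await pool.query(
    `SELECT n.network, l.country_iso_code, l.city_name, n.latitude, n.longitude
       FROM geo_ip.network n
       LEFT JOIN geo_ip.location l
         ON l.geoname_id = n.geoname_id AND l.locale_code = 'en'
      WHERE n.network >>= $1::inet
      ORDER BY masklen(n.network) DESC
      LIMIT 1`,
    [ip]
  );
  return rows[0] ?? null;
}

// Example usage:
//   lookupIp('8.8.8.8').then((row) => console.log(row));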
2 changes: 1 addition & 1 deletion apps/cron-tasks/src/save-analytics-summary.ts
@@ -1,7 +1,7 @@
- import { getExceptionLog } from '@jetstream/api-config';
import { prisma } from './config/db.config';
import { logger } from './config/logger.config';
import { getAmplitudeChart } from './utils/amplitude-dashboard-api';
+ import { getExceptionLog } from './utils/utils';

const CHART_IDS = {
LOAD: {