From 571b0a7cc779bc6d6f0f310297bc7c9368f83b20 Mon Sep 17 00:00:00 2001 From: Austin Turner Date: Sat, 21 Dec 2024 16:40:51 -0700 Subject: [PATCH] Fix ip address table overwrite IPv4 and IPv6 are stored in different files and we were creating a new temp table for each file, thus not retaining all the results --- apps/cron-tasks/src/geo-ip-updater.ts | 75 ++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 14 deletions(-) diff --git a/apps/cron-tasks/src/geo-ip-updater.ts b/apps/cron-tasks/src/geo-ip-updater.ts index 22a957ae..18b870f1 100644 --- a/apps/cron-tasks/src/geo-ip-updater.ts +++ b/apps/cron-tasks/src/geo-ip-updater.ts @@ -68,18 +68,36 @@ const CITY_URL = 'https://download.maxmind.com/geoip/databases/GeoLite2-City-CSV const CITY_FILENAME = 'GeoLite2-City.zip'; const CITY_FILENAMES = ['GeoLite2-City-Locations-en.csv', 'GeoLite2-City-Blocks-IPv4.csv', 'GeoLite2-City-Blocks-IPv6.csv']; -async function importCSVToTable(csvPath: string, tableName: string, schema: string): Promise<void> { +async function importCSVToTable(csvPath: string, tableName: string, schema: string, createTemp: boolean): Promise<void> { const tempTableName = `${tableName}_temp`; const fullTempTableName = `${schema}.${tempTableName}`; const fullTableName = `${schema}.${tableName}`; try { - // Create temporary table with same structure - await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"`); + // Only create temp table if requested (first file) + if (createTemp) { + logger.info(`Creating temporary table ${fullTempTableName}`); + await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`); + await execAsync( + `psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "CREATE TABLE ${fullTempTableName} (LIKE ${fullTableName} INCLUDING ALL)"` + ); + } // Import CSV data + logger.info(`Importing ${csvPath} to ${fullTempTableName}`); await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c 
"\\COPY ${fullTempTableName} FROM '${csvPath}' WITH (FORMAT CSV, HEADER)"`); + } catch (error) { + logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message); + throw error; + } +} + +async function swapTables(tableName: string, schema: string): Promise<void> { + const tempTableName = `${tableName}_temp`; + const fullTempTableName = `${schema}.${tempTableName}`; + const fullTableName = `${schema}.${tableName}`; + try { // Atomic swap await execAsync(` psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c " @@ -89,13 +107,11 @@ async function importCSVToTable(csvPath: string, tableName: string, schema: stri COMMIT; " `); - - logger.info(`Successfully imported ${csvPath} to ${fullTableName}`); + logger.info(`Successfully swapped ${fullTempTableName} to ${fullTableName}`); } catch (error) { - logger.error(getExceptionLog(error), `Error importing ${csvPath}: %s`, error.message); + logger.error(getExceptionLog(error), `Error swapping tables: %s`, error.message); // Cleanup temp table if it exists await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${fullTempTableName}"`).catch(() => { - // Ignore errors logger.warn(`Failed to drop table ${fullTempTableName}`); }); throw error; @@ -184,35 +200,66 @@ async function streamToBuffer(stream: ReadableStream): Promise<Buffer> { return Buffer.concat(chunks); } -async function processNetwork(csvPath: string) { - await importCSVToTable(csvPath, 'network', 'geo_ip'); +const tempTablesNeedToBeCreated = { + network: true, + location: true, + organization: true, +}; + +async function processNetwork(filename: string, csvPath: string) { + await importCSVToTable(csvPath, 'network', 'geo_ip', tempTablesNeedToBeCreated.network); + tempTablesNeedToBeCreated.network = false; } async function processLocation(csvPath: string) { - await importCSVToTable(csvPath, 'location', 'geo_ip'); + await importCSVToTable(csvPath, 'location', 'geo_ip', tempTablesNeedToBeCreated.location); + tempTablesNeedToBeCreated.location = 
false; +} + +async function processASN(filename: string, csvPath: string) { + await importCSVToTable(csvPath, 'organization', 'geo_ip', tempTablesNeedToBeCreated.organization); + tempTablesNeedToBeCreated.organization = false; } -async function processASN(csvPath: string) { - await importCSVToTable(csvPath, 'organization', 'geo_ip'); +async function cleanupTempTables(): Promise<void> { + const tables = ['network', 'location', 'organization']; + + for (const table of tables) { + const tempTableName = `geo_ip.${table}_temp`; + try { + await execAsync(`psql "${ENV.JETSTREAM_POSTGRES_DBURI}" -c "DROP TABLE IF EXISTS ${tempTableName}"`); + logger.info(`Cleaned up temporary table ${tempTableName}`); + } catch (error) { + logger.warn(`Failed to drop temporary table ${tempTableName}: ${error.message}`); + } + } } async function main() { try { logger.info('Starting GeoIP database update...'); + // Clean up any leftover temp tables first + await cleanupTempTables(); + // Process ASN data await processFile(ASN_URL, ASN_FILENAME, ASN_FILENAMES, async (filename, csvPath) => { - await processASN(csvPath); + await processASN(filename, csvPath); }); + // Swap ASN tables after all files are processed + await swapTables('organization', 'geo_ip'); // Process City data await processFile(CITY_URL, CITY_FILENAME, CITY_FILENAMES, async (filename, csvPath) => { if (filename.includes('Blocks')) { - await processNetwork(csvPath); + await processNetwork(filename, csvPath); } else if (filename.includes('Locations')) { await processLocation(csvPath); } }); + // Swap network/location tables after all files are processed + await swapTables('location', 'geo_ip'); + await swapTables('network', 'geo_ip'); logger.info('GeoIP database update completed successfully'); } catch (error) {