From bf8f48c8d1a65d8fa2531a684abc48a7acd222c1 Mon Sep 17 00:00:00 2001
From: TheExGenesis
Date: Wed, 4 Sep 2024 03:38:55 -0400
Subject: [PATCH] fix: retries for every insert request

---
Review notes (placed between "---" and the diffstat; not part of the commit
message):
 * retryOperation runs `operation` up to MAX_RETRIES times, waiting
   RETRY_DELAY ms between attempts, and returns what the operation resolves
   to. On the final failure it throws `${errorMessage}: <last error message>`.
   It is declared generic (<T>) so that result type reaches the caller; a bare
   `Promise` type does not compile.
 * Every wrapped callback returns `data`, so the archive upload call binds the
   helper's result directly instead of destructuring `.data` a second time.
 * NOTE(review): commit_temp_data and the insert_temp_* RPCs are now repeated
   on failure; confirm they are safe to run more than once.
 * NOTE(review): indentation of context/removed lines was restored from the
   file's formatting conventions; re-check against a6bd573 before applying.

 src/lib-server/db_insert.ts | 311 +++++++++++++++++++-----------------
 1 file changed, 168 insertions(+), 143 deletions(-)

diff --git a/src/lib-server/db_insert.ts b/src/lib-server/db_insert.ts
index a6bd573..7fc4f9d 100644
--- a/src/lib-server/db_insert.ts
+++ b/src/lib-server/db_insert.ts
@@ -14,6 +14,29 @@ if (process.env.NODE_ENV !== 'production') {
 
 const BATCH_SIZE = 1000 // Adjust as needed
 
+const MAX_RETRIES = 5
+const RETRY_DELAY = 1000 // 1 second
+
+const retryOperation = async <T>(
+  operation: () => Promise<T>,
+  errorMessage: string,
+): Promise<T> => {
+  let retries = 0
+  while (retries < MAX_RETRIES) {
+    try {
+      return await operation()
+    } catch (error) {
+      retries++
+      if (retries >= MAX_RETRIES) {
+        throw new Error(`${errorMessage}: ${(error as Error).message}`)
+      }
+      console.log(`Attempt ${retries} failed. Retrying in ${RETRY_DELAY}ms...`)
+      await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY))
+    }
+  }
+  throw new Error(`Max retries reached for operation: ${errorMessage}`)
+}
+
 const patchTweetsWithNoteTweets = (noteTweets: any[], tweets: any[]): any[] => {
   const startTime = performance.now()
   let changedTweets: any[] = []
@@ -72,18 +95,17 @@ const insertTempTweets = async (
     archive_upload_id: -1, // Placeholder value
   }))
 
-  const { error } = await supabase
-    .schema('temp')
-    .from(`tweets_${suffix}`)
-    .upsert(formattedTweets, {
-      onConflict: 'tweet_id',
-      ignoreDuplicates: false,
-    })
-
-  if (error)
-    throw new Error(
-      `Error inserting tweets: ${error.message} ${formattedTweets}`,
-    )
+  await retryOperation(async () => {
+    const { data, error } = await supabase
+      .schema('temp')
+      .from(`tweets_${suffix}`)
+      .upsert(formattedTweets, {
+        onConflict: 'tweet_id',
+        ignoreDuplicates: false,
+      })
+    if (error) throw error
+    return data
+  }, 'Error inserting tweets')
 }
 
 const processAndInsertTweetEntities = async (
@@ -164,16 +186,17 @@ const processAndInsertTweetEntities = async (
   for (const table of tables) {
     for (let i = 0; i < table.data.length; i += 1000) {
       const batch = table.data.slice(i, i + 1000)
-      const { error } = await supabase
-        .schema('temp')
-        .from(table.name)
-        .upsert(batch, {
-          onConflict: table.conflictTarget,
-          ignoreDuplicates: true,
-        })
-      if (error) {
-        throw new Error(`Error inserting ${table.name}: ${error.message}`)
-      }
+      await retryOperation(async () => {
+        const { data, error } = await supabase
+          .schema('temp')
+          .from(table.name)
+          .upsert(batch, {
+            onConflict: table.conflictTarget,
+            ignoreDuplicates: true,
+          })
+        if (error) throw error
+        return data
+      }, `Error inserting ${table.name}`)
     }
   }
 }
@@ -190,15 +213,17 @@ const insertTempFollowers = async (
     archive_upload_id: -1, // Placeholder value
   }))
 
-  const { error } = await supabase
-    .schema('temp')
-    .from(`followers_${suffix}`)
-    .upsert(formattedFollowers, {
-      onConflict: 'account_id,follower_account_id',
-      ignoreDuplicates: false,
-    })
-
-  if (error) throw new Error(`Error inserting followers: ${error.message}`)
+  await retryOperation(async () => {
+    const { data, error } = await supabase
+      .schema('temp')
+      .from(`followers_${suffix}`)
+      .upsert(formattedFollowers, {
+        onConflict: 'account_id,follower_account_id',
+        ignoreDuplicates: false,
+      })
+    if (error) throw error
+    return data
+  }, 'Error inserting followers')
 }
 
 const insertTempFollowing = async (
@@ -213,15 +238,17 @@ const insertTempFollowing = async (
     archive_upload_id: -1, // Placeholder value
   }))
 
-  const { error } = await supabase
-    .schema('temp')
-    .from(`following_${suffix}`)
-    .upsert(formattedFollowing, {
-      onConflict: 'account_id,following_account_id',
-      ignoreDuplicates: false,
-    })
-
-  if (error) throw new Error(`Error inserting following: ${error.message}`)
+  await retryOperation(async () => {
+    const { data, error } = await supabase
+      .schema('temp')
+      .from(`following_${suffix}`)
+      .upsert(formattedFollowing, {
+        onConflict: 'account_id,following_account_id',
+        ignoreDuplicates: false,
+      })
+    if (error) throw error
+    return data
+  }, 'Error inserting following')
 }
 
 const insertTempLikes = async (
@@ -241,18 +268,17 @@ const insertTempLikes = async (
     full_text: like.full_text || '',
   }))
 
-  const { error: likedTweetsError } = await supabase
-    .schema('temp')
-    .from(`liked_tweets_${suffix}`)
-    .upsert(validLikedTweets, {
-      onConflict: 'tweet_id',
-      ignoreDuplicates: false,
-    })
-
-  if (likedTweetsError) {
-    console.error('Error details:', likedTweetsError, { validLikedTweets })
-    throw new Error(`Error inserting liked tweets: ${likedTweetsError.message}`)
-  }
+  await retryOperation(async () => {
+    const { data, error } = await supabase
+      .schema('temp')
+      .from(`liked_tweets_${suffix}`)
+      .upsert(validLikedTweets, {
+        onConflict: 'tweet_id',
+        ignoreDuplicates: false,
+      })
+    if (error) throw error
+    return data
+  }, 'Error inserting liked tweets')
 
   const likeRelations = likes.map((like) => ({
     account_id: accountId,
@@ -260,18 +286,17 @@ const insertTempLikes = async (
     archive_upload_id: -1, // Placeholder value
   }))
 
-  const { error: likesError } = await supabase
-    .schema('temp')
-    .from(`likes_${suffix}`)
-    .upsert(likeRelations, {
-      onConflict: 'account_id,liked_tweet_id',
-      ignoreDuplicates: false,
-    })
-
-  if (likesError) {
-    console.error('Error details:', likesError, { likeRelations })
-    throw new Error(`Error inserting likes: ${likesError.message}`)
-  }
+  await retryOperation(async () => {
+    const { data, error } = await supabase
+      .schema('temp')
+      .from(`likes_${suffix}`)
+      .upsert(likeRelations, {
+        onConflict: 'account_id,liked_tweet_id',
+        ignoreDuplicates: false,
+      })
+    if (error) throw error
+    return data
+  }, 'Error inserting likes')
 }
 
 export const processTwitterArchive = async (
@@ -286,9 +311,13 @@ export const processTwitterArchive = async (
   const suffix = accountId
 
   console.log('Dropping temporary tables...')
-  await supabase
-    .schema(getSchemaName())
-    .rpc('drop_temp_tables', { p_suffix: suffix })
+  await retryOperation(async () => {
+    const { data, error } = await supabase
+      .schema(getSchemaName())
+      .rpc('drop_temp_tables', { p_suffix: suffix })
+    if (error) throw error
+    return data
+  }, 'Error dropping temporary tables')
 
   try {
     // Calculate latest tweet date
@@ -308,75 +337,61 @@ export const processTwitterArchive = async (
 
     // Create temporary tables
     console.log('Creating temporary tables...')
-    await supabase
-      .schema(getSchemaName())
-      .rpc('create_temp_tables', { p_suffix: suffix })
-
-    // check if tables are created by selecting from temp.likes, if there's an empty list then it exists, if there's an error, then it doesn't exist
-    const maxRetries = 5
-    let retries = 0
-    let tablesExist = false
-
-    while (retries < maxRetries && !tablesExist) {
-      try {
-        const { data, error: checkTableError } = await supabase
-          .schema('temp')
-          .from(`likes_${suffix}`)
-          .select('*')
-        if (checkTableError) {
-          throw checkTableError
-        }
-        tablesExist = true
-      } catch (checkTableError) {
-        console.log(
-          `Attempt ${retries + 1}: Temporary tables not ready yet. Retrying...`,
-        )
-        retries++
-        if (retries < maxRetries) {
-          await new Promise((resolve) => setTimeout(resolve, 1000)) // Wait for 1 second before retrying
-        } else {
-          throw new Error(
-            `Failed to verify temporary tables after ${maxRetries} attempts: ${
-              (checkTableError as Error).message
-            }`,
-          )
-        }
-      }
-    }
+    await retryOperation(async () => {
+      const { data, error } = await supabase
+        .schema(getSchemaName())
+        .rpc('create_temp_tables', { p_suffix: suffix })
+      if (error) throw error
+      return data
+    }, 'Error creating temporary tables')
+
+    // Verify tables are created
+    await retryOperation(async () => {
+      const { data, error } = await supabase
+        .schema('temp')
+        .from(`likes_${suffix}`)
+        .select('*')
+      if (error) throw error
+      return data
+    }, 'Failed to verify temporary tables')
 
     console.log('Inserting account data...')
-    const { error: accountError } = await supabase
-      .schema(getSchemaName())
-      .rpc('insert_temp_account', {
-        p_account: archiveData.account[0].account,
-        p_suffix: suffix,
-      })
-    if (accountError)
-      throw new Error(`Error inserting account data: ${accountError.message}`)
+    await retryOperation(async () => {
+      const { data, error } = await supabase
+        .schema(getSchemaName())
+        .rpc('insert_temp_account', {
+          p_account: archiveData.account[0].account,
+          p_suffix: suffix,
+        })
+      if (error) throw error
+      return data
+    }, 'Error inserting account data')
 
     console.log('Inserting archive upload data...')
-    const { data: archiveUploadId, error: archiveUploadError } = await supabase
-      .schema(getSchemaName())
-      .rpc('insert_temp_archive_upload', {
-        p_account_id: accountId,
-        p_archive_at: latestTweetDate,
-        p_suffix: suffix,
-      })
-    if (archiveUploadError)
-      throw new Error(
-        `Error inserting archive upload data: ${archiveUploadError.message}`,
-      )
+    const archiveUploadId = await retryOperation(async () => {
+      const { data, error } = await supabase
+        .schema(getSchemaName())
+        .rpc('insert_temp_archive_upload', {
+          p_account_id: accountId,
+          p_archive_at: latestTweetDate,
+          p_suffix: suffix,
+        })
+      if (error) throw error
+      return data
+    }, 'Error inserting archive upload data')
 
     console.log('Inserting profile data...')
-    const { error: profileError } = await supabase
-      .schema(getSchemaName())
-      .rpc('insert_temp_profiles', {
-        p_profile: archiveData.profile[0].profile,
-        p_account_id: accountId,
-        p_suffix: suffix,
-      })
-    if (profileError)
-      throw new Error(`Error inserting profile data: ${profileError.message}`)
+    await retryOperation(async () => {
+      const { data, error } = await supabase
+        .schema(getSchemaName())
+        .rpc('insert_temp_profiles', {
+          p_profile: archiveData.profile[0].profile,
+          p_account_id: accountId,
+          p_suffix: suffix,
+        })
+      if (error) throw error
+      return data
+    }, 'Error inserting profile data')
 
     // Patch tweets with note tweets
     console.log('Patching tweets with note tweets...')
@@ -464,15 +479,17 @@ export const processTwitterArchive = async (
       percent: 50,
     })
     const commitStartTime = Date.now()
-    const { error: commitError } = await supabase
-      .schema(getSchemaName())
-      .rpc('commit_temp_data', {
-        p_suffix: suffix,
-      })
+    await retryOperation(async () => {
+      const { data, error } = await supabase
+        .schema(getSchemaName())
+        .rpc('commit_temp_data', {
+          p_suffix: suffix,
+        })
+      if (error) throw error
+      return data
+    }, 'Error committing data')
     const commitEndTime = Date.now()
     console.log(`Commit processing time: ${commitEndTime - commitStartTime}ms`)
-    if (commitError)
-      throw new Error(`Error committing data: ${commitError.message}`)
 
     console.log('Twitter archive processing completed successfully.')
     progressCallback({
@@ -487,9 +504,13 @@ export const processTwitterArchive = async (
     // Attempt to drop temporary tables
     try {
       console.log('Attempting to drop temporary tables...')
-      await supabase
-        .schema(getSchemaName())
-        .rpc('drop_temp_tables', { p_suffix: suffix })
+      await retryOperation(async () => {
+        const { data, error } = await supabase
+          .schema(getSchemaName())
+          .rpc('drop_temp_tables', { p_suffix: suffix })
+        if (error) throw error
+        return data
+      }, 'Error dropping temporary tables')
       console.log('Temporary tables dropped successfully.')
     } catch (dropError: any) {
       console.error('Error dropping temporary tables:', dropError)
@@ -500,9 +521,13 @@ export const processTwitterArchive = async (
   }
   try {
     console.log('Attempting to drop temporary tables...')
-    await supabase
-      .schema(getSchemaName())
-      .rpc('drop_temp_tables', { p_suffix: suffix })
+    await retryOperation(async () => {
+      const { data, error } = await supabase
+        .schema(getSchemaName())
+        .rpc('drop_temp_tables', { p_suffix: suffix })
+      if (error) throw error
+      return data
+    }, 'Error dropping temporary tables')
     console.log('Temporary tables dropped successfully.')
   } catch (dropError: any) {
     console.error('Error dropping temporary tables:', dropError)