Skip to content

Commit

Permalink
Fix Error Handling and Cleanup during Insert Bulk Process (#2887)
Browse files Browse the repository at this point in the history
This commit fixes an issue with the error handling and cleanup phase of the Insert Bulk Process.

1. For Error Handling there was a scenario where bulk_load_callback(0, 0, NULL, NULL) call for cleanup would flush the remaining rows during EndBulkCopy and could result in an error. To fix this, we move the transaction rollback logic from TSQL extension to TDS. We also improved it by using Savepoints in case of active transaction which is aligned with TSQL behaviour.
2. In this case, if we have a reset-connection after the Bulk Load TDS packet we werent cleaning up the Bulk Load state. To do so we reset the offset.
3. During Reset Connection TDS is not resetting any TSQL transaction semantic. To resolve this we introduce a wrapper of AbortOutOfAnyTransaction to reset NestedTranCount.
Issues Resolved
BABEL-5200, BABEL-5199, BABEL-5220

Authored-by: Kushaal Shroff [email protected]
Signed-off-by: Kushaal Shroff [email protected]
  • Loading branch information
KushaalShroff committed Aug 30, 2024
1 parent 84282f3 commit 36899b1
Show file tree
Hide file tree
Showing 13 changed files with 1,531 additions and 180 deletions.
382 changes: 226 additions & 156 deletions contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion contrib/babelfishpg_tds/src/backend/tds/tdscomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,16 @@ TdsReadNextPendingBcpRequest(StringInfo message)

if (TdsReadNextBuffer() == EOF)
return EOF;
Assert(TdsRecvMessageType == TDS_BULK_LOAD);

/*
* The driver could send an attention packet even in the middle of a
* large TDS request. In that case we should abort the entire request which
* has been read.
*/
if (TdsRecvMessageType != TDS_BULK_LOAD)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("Unexpected out of band packet while fetching a Bulk Load Request.")));


readBytes = TdsLeftInPacket;
Expand Down
3 changes: 2 additions & 1 deletion contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ ResetTDSConnection(void)
TdsErrorContext->err_text = "Resetting the TDS connection";

/* Make sure we've killed any active transaction */
AbortOutOfAnyTransaction();
pltsql_plugin_handler_ptr->pltsql_abort_any_transaction_callback();

/*
* Save the transaction isolation level that should be restored after
Expand All @@ -153,6 +153,7 @@ ResetTDSConnection(void)
TdsProtocolInit();
TdsResetCache();
TdsResponseReset();
TdsResetBcpOffset();
SetConfigOption("default_transaction_isolation", isolationOld,
PGC_BACKEND, PGC_S_CLIENT);

Expand Down
3 changes: 3 additions & 0 deletions contrib/babelfishpg_tds/src/include/tds_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,4 +367,7 @@ extern int tds_parse_xml_decl(const xmlChar *str, size_t *lenp,
extern char *TdsEncodingConversion(const char *s, int len, pg_enc src_encoding, pg_enc dest_encoding, int *encodedByteLen);
extern coll_info_t TdsLookupCollationTableCallback(Oid oid);

/* Functions in tdsbulkload.c */
extern void TdsResetBcpOffset(void);

#endif /* TDS_INT_H */
17 changes: 1 addition & 16 deletions contrib/babelfishpg_tsql/src/pl_exec-2.c
Original file line number Diff line number Diff line change
Expand Up @@ -3014,6 +3014,7 @@ execute_bulk_load_insert(int ncol, int nrow,
pfree(cstmt->relation);
}
pfree(cstmt);
cstmt = NULL;
}

/* Reset Insert-Bulk Options. */
Expand Down Expand Up @@ -3047,27 +3048,11 @@ execute_bulk_load_insert(int ncol, int nrow,
* In an error condition, the caller calls the function again to do
* the cleanup.
*/
MemoryContext oldcontext;

/* Cleanup cstate. */
EndBulkCopy(cstmt->cstate, true);

if (ActiveSnapshotSet() && GetActiveSnapshot() == snap)
PopActiveSnapshot();
oldcontext = CurrentMemoryContext;

/*
* If a transaction block is already in progress then abort it, else
* rollback entire transaction.
*/
if (!IsTransactionBlockActive())
{
AbortCurrentTransaction();
StartTransactionCommand();
}
else
pltsql_rollback_txn();
MemoryContextSwitchTo(oldcontext);

/* Reset Insert-Bulk Options. */
insert_bulk_keep_nulls = prev_insert_bulk_keep_nulls;
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -3665,6 +3665,8 @@ _PG_init(void)
(*pltsql_protocol_plugin_ptr)->sp_unprepare_callback = &sp_unprepare;
(*pltsql_protocol_plugin_ptr)->reset_session_properties = &reset_session_properties;
(*pltsql_protocol_plugin_ptr)->bulk_load_callback = &execute_bulk_load_insert;
(*pltsql_protocol_plugin_ptr)->pltsql_rollback_txn_callback = &pltsql_rollback_txn;
(*pltsql_protocol_plugin_ptr)->pltsql_abort_any_transaction_callback = &pltsql_abort_any_transaction;
(*pltsql_protocol_plugin_ptr)->pltsql_declare_var_callback = &pltsql_declare_variable;
(*pltsql_protocol_plugin_ptr)->pltsql_read_out_param_callback = &pltsql_read_composite_out_param;
(*pltsql_protocol_plugin_ptr)->sqlvariant_set_metadata = common_utility_plugin_ptr->TdsSetMetaData;
Expand Down
5 changes: 5 additions & 0 deletions contrib/babelfishpg_tsql/src/pltsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,10 @@ typedef struct PLtsql_protocol_plugin
uint64 (*bulk_load_callback) (int ncol, int nrow,
Datum *Values, bool *Nulls);

void (*pltsql_rollback_txn_callback) (void);

void (*pltsql_abort_any_transaction_callback) (void);

int (*pltsql_get_generic_typmod) (Oid funcid, int nargs, Oid declared_oid);

const char *(*pltsql_get_logical_schema_name) (const char *physical_schema_name, bool missingOk);
Expand Down Expand Up @@ -1982,6 +1986,7 @@ extern void PLTsqlRollbackTransaction(char *txnName, QueryCompletion *qc, bool c
extern void pltsql_start_txn(void);
extern void pltsql_commit_txn(void);
extern void pltsql_rollback_txn(void);
extern void pltsql_abort_any_transaction(void);
extern bool pltsql_get_errdata(int *tsql_error_code, int *tsql_error_severity, int *tsql_error_state);
extern void pltsql_eval_txn_data(PLtsql_execstate *estate, PLtsql_stmt_execsql *stmt, CachedPlanSource *cachedPlanSource);
extern bool is_sysname_column(ColumnDef *coldef);
Expand Down
7 changes: 7 additions & 0 deletions contrib/babelfishpg_tsql/src/pltsql_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,13 @@ pltsql_rollback_txn(void)
StartTransactionCommand();
}

void
pltsql_abort_any_transaction(void)
{
NestedTranCount = 0;
AbortOutOfAnyTransaction();
}

bool
pltsql_get_errdata(int *tsql_error_code, int *tsql_error_severity, int *tsql_error_state)
{
Expand Down
Loading

0 comments on commit 36899b1

Please sign in to comment.