Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Error Handling and Cleanup during Insert Bulk Process #2887

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ runs:
run: |
cd test/dotnet
dotnet build
VSTEST_DISABLE_STANDARD_OUTPUT_CAPTURING=1 \
babel_URL=localhost \
babel_port=1433 \
babel_databaseName=master \
Expand Down
221 changes: 141 additions & 80 deletions contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "postgres.h"

#include "access/xact.h"
#include "utils/guc.h"
#include "lib/stringinfo.h"
#include "pgstat.h"
Expand Down Expand Up @@ -102,6 +103,12 @@ do \
FetchMoreBcpPlpData(message, dataLen); \
} while(0)

void
TdsResetBcpOffset()
{
offset = 0;
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
}

static void
FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData)
{
Expand Down Expand Up @@ -899,6 +906,60 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message)
return message;
}

static void
CleanupBCPDuringError(bool internal_sp_started,
volatile int before_subtxn_id,
volatile int before_lxid,
ResourceOwner oldowner,
MemoryContext oldcontext)
{
int ret = 0;

HOLD_CANCEL_INTERRUPTS();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this to its parent function add a comment

hold interrupts during cleanup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What difference does it make though? For both cases of cleanup we need to hold interrupts so it will merely avoid redundancy in code lines

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup function doesn't care whether you're pausing the interrupts or not. It's rather from where you're calling the function, it matters. In this case, since you're calling in catch block, you want to hold the interrupts.
Yeah, the comment part looks redundant. Please ignore.

HOLD_INTERRUPTS();

/*
* Discard remaining TDS_BULK_LOAD packets only if End of
* Message has not been reached for the current request.
* Otherwise we have no TDS_BULK_LOAD packets left for the
* current request that need to be discarded.
*/
if (!TdsGetRecvPacketEomStatus())
ret = TdsDiscardAllPendingBcpRequest();
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved

RESUME_CANCEL_INTERRUPTS();

if (ret < 0)
TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request";

if (internal_sp_started && before_lxid == MyProc->lxid && before_subtxn_id == GetCurrentSubTransactionId())
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
{
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
elog(LOG, "TSQL TXN PG semantics : Rollback internal savepoint");
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
RollbackAndReleaseCurrentSubTransaction();
CurrentResourceOwner = oldowner;
}
else if (!IsTransactionBlockActive())
{
AbortCurrentTransaction();
StartTransactionCommand();
}
else
{
/*
* In the case of an error and transaction is active but the earlier savepoint
* did not match, then we shall rollback the current transaction and let the
* the actual error be relayed to the customer.
*/
elog(LOG, "The current transaction is rolled back since it "
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
"was in inconsistent state during Bulk Copy");
pltsql_plugin_handler_ptr->pltsql_rollback_txn_callback();
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
}
MemoryContextSwitchTo(oldcontext);
RESUME_INTERRUPTS();
TdsResetBcpOffset();
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
}

/*
* ProcessBCPRequest - Processes the request and calls the bulk_load_callback
* for futher execution.
Expand All @@ -910,11 +971,31 @@ ProcessBCPRequest(TDSRequest request)
uint64 retValue = 0;
TDSRequestBulkLoad req = (TDSRequestBulkLoad) request;
StringInfo message = req->firstMessage;
volatile bool internal_sp_started = false;
volatile int before_subtxn_id = 0;
volatile int before_lxid = MyProc->lxid;
ResourceOwner oldowner = CurrentResourceOwner;
MemoryContext oldcontext = CurrentMemoryContext;

set_ps_display("active");
TdsErrorContext->err_text = "Processing Bulk Load Request";
pgstat_report_activity(STATE_RUNNING, "Processing Bulk Load Request");

/*
* If a transaction is active then start a Savepoint to rollback
* later in case of error.
*/
if (IsTransactionBlockActive())
{
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
elog(LOG, "TSQL TXN Start internal savepoint");
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
BeginInternalSubTransaction(NULL);
internal_sp_started = true;
before_subtxn_id = GetCurrentSubTransactionId();
}
else
internal_sp_started = false;

while (1)
{
int nargs = 0;
Expand All @@ -929,24 +1010,8 @@ ProcessBCPRequest(TDSRequest request)
}
PG_CATCH();
{
int ret = 0;

HOLD_CANCEL_INTERRUPTS();

/*
* Discard remaining TDS_BULK_LOAD packets only if End of Message
* has not been reached for the current request. Otherwise we have
* no TDS_BULK_LOAD packets left for the current request that need
* to be discarded.
*/
if (!TdsGetRecvPacketEomStatus())
ret = TdsDiscardAllPendingBcpRequest();

RESUME_CANCEL_INTERRUPTS();

if (ret < 0)
TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request";

CleanupBCPDuringError(internal_sp_started, before_subtxn_id,
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
before_lxid, oldowner, oldcontext);
PG_RE_THROW();
}
PG_END_TRY();
Expand All @@ -955,79 +1020,75 @@ ProcessBCPRequest(TDSRequest request)
* If the row-count is 0 then there are no rows left to be inserted.
* We should begin with cleanup.
*/
if (req->rowCount == 0)
if (req->rowCount > 0)
{
/* Using Same callback function to do the clean-up. */
pltsql_plugin_handler_ptr->bulk_load_callback(0, 0, NULL, NULL);
break;
}
nargs = req->colCount * req->rowCount;
values = palloc0(nargs * sizeof(Datum));
nulls = palloc0(nargs * sizeof(bool));

nargs = req->colCount * req->rowCount;
values = palloc0(nargs * sizeof(Datum));
nulls = palloc0(nargs * sizeof(bool));

/* Flaten and create a 1-D array of Value & Datums */
foreach(lc, req->rowData)
{
BulkLoadRowData *row = (BulkLoadRowData *) lfirst(lc);

for (int currentColumn = 0; currentColumn < req->colCount; currentColumn++)
/* Flaten and create a 1-D array of Value & Datums */
foreach(lc, req->rowData)
{
if (row->isNull[currentColumn]) /* null */
nulls[count] = row->isNull[currentColumn];
else
values[count] = row->columnValues[currentColumn];
count++;
BulkLoadRowData *row = (BulkLoadRowData *) lfirst(lc);

for (int currentColumn = 0; currentColumn < req->colCount; currentColumn++)
{
if (row->isNull[currentColumn]) /* null */
nulls[count] = row->isNull[currentColumn];
else
values[count] = row->columnValues[currentColumn];
count++;
}
}
}

if (req->rowData) /* If any row exists then do an insert. */
PG_TRY();
{
retValue += pltsql_plugin_handler_ptr->bulk_load_callback(req->rowCount ? req->colCount : 0,
req->rowCount, values, nulls);
}
PG_CATCH();
{
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should hold interrupts before doing any logging in catch blocks.

ereport(LOG,
(errmsg("Bulk Load Request. Number of Rows: %d and Number of columns: %d.",
req->rowCount, req->colCount),
errhidestmt(true)));

CleanupBCPDuringError(internal_sp_started, before_subtxn_id,
before_lxid, oldowner, oldcontext);
PG_RE_THROW();
}
PG_END_TRY();
/* Free the List of Rows. */
if (req->rowData)
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
{
PG_TRY();
{
retValue += pltsql_plugin_handler_ptr->bulk_load_callback(req->colCount,
req->rowCount, values, nulls);
}
PG_CATCH();
{
int ret = 0;

HOLD_CANCEL_INTERRUPTS();
HOLD_INTERRUPTS();

/*
* Discard remaining TDS_BULK_LOAD packets only if End of
* Message has not been reached for the current request.
* Otherwise we have no TDS_BULK_LOAD packets left for the
* current request that need to be discarded.
*/
if (!TdsGetRecvPacketEomStatus())
ret = TdsDiscardAllPendingBcpRequest();

RESUME_CANCEL_INTERRUPTS();

if (ret < 0)
TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request";

if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("Bulk Load Request. Number of Rows: %d and Number of columns: %d.",
req->rowCount, req->colCount),
errhidestmt(true)));

RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
/* Free the List of Rows. */
list_free_deep(req->rowData);
req->rowData = NIL;
if (values)
pfree(values);
if (nulls)
pfree(nulls);
}
/* If there we no rows then we have reached the end of the loop. */
else
break;

if (values)
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
pfree(values);
if (nulls)
pfree(nulls);
}

/* If we Started an internal savepoint then release it. */
if (internal_sp_started && before_subtxn_id == GetCurrentSubTransactionId())
{
elog(DEBUG5, "TSQL TXN Release internal savepoint");
ReleaseCurrentSubTransaction();
CurrentResourceOwner = oldowner;
MemoryContextSwitchTo(oldcontext);
}
/* Unlikely case where Transaction is active but the savepoints do not match. */
else if (internal_sp_started && before_subtxn_id != GetCurrentSubTransactionId())
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("The current Transaction was found to be in inconsisten state")));

/*
* Send Done Token if rows processed is a positive number. Command type -
Expand Down
11 changes: 10 additions & 1 deletion contrib/babelfishpg_tds/src/backend/tds/tdscomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,16 @@ TdsReadNextPendingBcpRequest(StringInfo message)

if (TdsReadNextBuffer() == EOF)
return EOF;
Assert(TdsRecvMessageType == TDS_BULK_LOAD);

/*
* The driver could send an attention packet even in the middle of a
* large TDS request. In that case we should abort the entire request which
* has been read.
*/
if (TdsRecvMessageType != TDS_BULK_LOAD)
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("Unexpected out of band packet while fetching a Bulk Load Request.")));


readBytes = TdsLeftInPacket;
Expand Down
1 change: 1 addition & 0 deletions contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ ResetTDSConnection(void)
TdsProtocolInit();
TdsResetCache();
TdsResponseReset();
TdsResetBcpOffset();
SetConfigOption("default_transaction_isolation", isolationOld,
PGC_BACKEND, PGC_S_CLIENT);

Expand Down
3 changes: 3 additions & 0 deletions contrib/babelfishpg_tds/src/include/tds_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,4 +380,7 @@ extern coll_info_t TdsLookupCollationTableCallback(Oid oid);
extern Datum TdsBytePtrToDatum(StringInfo buf, int datatype, int scale);
extern Datum TdsDateTimeTypeToDatum(uint64 time, int32 date, int datatype, int scale);

/* Functions in tdsbulkload.c */
extern void TdsResetBcpOffset(void);

#endif /* TDS_INT_H */
16 changes: 0 additions & 16 deletions contrib/babelfishpg_tsql/src/pl_exec-2.c
Original file line number Diff line number Diff line change
Expand Up @@ -3632,27 +3632,11 @@ execute_bulk_load_insert(int ncol, int nrow,
* In an error condition, the caller calls the function again to do
* the cleanup.
*/
MemoryContext oldcontext;
KushaalShroff marked this conversation as resolved.
Show resolved Hide resolved

/* Cleanup cstate. */
EndBulkCopy(cstmt->cstate, true);

if (ActiveSnapshotSet() && GetActiveSnapshot() == snap)
PopActiveSnapshot();
oldcontext = CurrentMemoryContext;

/*
* If a transaction block is already in progress then abort it, else
* rollback entire transaction.
*/
if (!IsTransactionBlockActive())
{
AbortCurrentTransaction();
StartTransactionCommand();
}
else
pltsql_rollback_txn();
MemoryContextSwitchTo(oldcontext);

/* Reset Insert-Bulk Options. */
insert_bulk_keep_nulls = prev_insert_bulk_keep_nulls;
Expand Down
1 change: 1 addition & 0 deletions contrib/babelfishpg_tsql/src/pl_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -4675,6 +4675,7 @@ _PG_init(void)
(*pltsql_protocol_plugin_ptr)->sp_unprepare_callback = &sp_unprepare;
(*pltsql_protocol_plugin_ptr)->reset_session_properties = &reset_session_properties;
(*pltsql_protocol_plugin_ptr)->bulk_load_callback = &execute_bulk_load_insert;
(*pltsql_protocol_plugin_ptr)->pltsql_rollback_txn_callback = &pltsql_rollback_txn;
(*pltsql_protocol_plugin_ptr)->pltsql_declare_var_callback = &pltsql_declare_variable;
(*pltsql_protocol_plugin_ptr)->pltsql_read_out_param_callback = &pltsql_read_composite_out_param;
(*pltsql_protocol_plugin_ptr)->sqlvariant_set_metadata = common_utility_plugin_ptr->TdsSetMetaData;
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pltsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -1768,6 +1768,8 @@ typedef struct PLtsql_protocol_plugin
uint64 (*bulk_load_callback) (int ncol, int nrow,
Datum *Values, bool *Nulls);

void (*pltsql_rollback_txn_callback) (void);

int (*pltsql_get_generic_typmod) (Oid funcid, int nargs, Oid declared_oid);

const char *(*pltsql_get_logical_schema_name) (const char *physical_schema_name, bool missingOk);
Expand Down
Loading
Loading