From a054fd786db677d62f052fd472e12f113fd0a6d2 Mon Sep 17 00:00:00 2001 From: Kushaal Shroff <51415286+KushaalShroff@users.noreply.github.com> Date: Fri, 30 Aug 2024 15:24:37 +0530 Subject: [PATCH] Fix Error Handling and Cleanup during Insert Bulk Process (#2887) This commit fixes an issue with the error handling and cleanup phase of the Insert Bulk Process. 1. For Error Handling there was a scenario where bulk_load_callback(0, 0, NULL, NULL) call for cleanup would flush the remaining rows during EndBulkCopy and could result in an error. To fix this, we move the transaction rollback logic from TSQL extension to TDS. We also improved it by using Savepoints in case of active transaction which is aligned with TSQL behaviour. 2. In this case, if we have a reset-connection after the Bulk Load TDS packet we werent cleaning up the Bulk Load state. To do so we reset the offset. 3. During Reset Connection TDS is not resetting any TSQL transaction semantic. To resolve this we introduce a wrapper of AbortOutOfAnyTransaction to reset NestedTranCount. Issues Resolved BABEL-5200, BABEL-5199, BABEL-5220 Authored-by: Kushaal Shroff kushaal@amazon.com Signed-off-by: Kushaal Shroff kushaal@amazon.com --- .../install-and-run-dotnet/action.yml | 1 + .../src/backend/tds/tdsbulkload.c | 382 +++++----- .../babelfishpg_tds/src/backend/tds/tdscomm.c | 11 +- .../src/backend/tds/tdsprotocol.c | 3 +- contrib/babelfishpg_tds/src/include/tds_int.h | 3 + contrib/babelfishpg_tsql/src/pl_exec-2.c | 17 +- contrib/babelfishpg_tsql/src/pl_handler.c | 2 + contrib/babelfishpg_tsql/src/pltsql.h | 5 + contrib/babelfishpg_tsql/src/pltsql_utils.c | 7 + .../ExpectedOutput/insertBulkErrors.out | 657 ++++++++++++++++++ .../input/InsertBulk/insertBulkErrors.txt | 558 +++++++++++++++ test/dotnet/src/BatchRun.cs | 11 +- test/dotnet/utils/ConfigSetup.cs | 9 +- test/dotnet/utils/TestUtils.cs | 46 +- 14 files changed, 1532 insertions(+), 180 deletions(-) create mode 100644 test/dotnet/ExpectedOutput/insertBulkErrors.out create mode 100644 test/dotnet/input/InsertBulk/insertBulkErrors.txt diff --git a/.github/composite-actions/install-and-run-dotnet/action.yml b/.github/composite-actions/install-and-run-dotnet/action.yml index bdd3eb366a..bfd0aad508 100644 --- a/.github/composite-actions/install-and-run-dotnet/action.yml +++ b/.github/composite-actions/install-and-run-dotnet/action.yml @@ -34,6 +34,7 @@ runs: run: | cd test/dotnet dotnet build + VSTEST_DISABLE_STANDARD_OUTPUT_CAPTURING=1 \ babel_URL=localhost \ babel_port=1433 \ babel_databaseName=master \ diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c index d5080b13da..d95dfe630d 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "access/xact.h" #include "utils/guc.h" #include "lib/stringinfo.h" #include "pgstat.h" @@ -35,7 +36,7 @@ static void FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMe static void FetchMoreBcpPlpData(StringInfo *message, int dataLenToRead); static int ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request); static void FreePlpToken(ParameterToken token); -uint64_t offset = 0; +uint64_t volatile bcpOffset = 0; #define COLUMNMETADATA_HEADER_LEN sizeof(uint32_t) + sizeof(uint16) + 1 #define FIXED_LEN_TYPE_COLUMNMETADATA_LEN 1 @@ -79,7 +80,7 @@ do \ #define CheckMessageHasEnoughBytesToReadColMetadata(message, dataLen) \ do \ { \ - if ((*message)->len - offset < dataLen) \ + if ((*message)->len - bcpOffset < dataLen) \ FetchMoreBcpData(message, dataLen, false); \ } while(0) @@ -90,7 +91,7 @@ do \ #define CheckMessageHasEnoughBytesToReadRows(message, dataLen) \ do \ { \ - if ((*message)->len - offset < dataLen) \ + if ((*message)->len - bcpOffset < dataLen) \ FetchMoreBcpData(message, dataLen, true); \ } while(0) @@ -98,10 +99,16 @@ do \ #define CheckPlpMessageHasEnoughBytesToRead(message, dataLen) \ do \ { \ - if ((*message)->len - offset < dataLen) \ + if ((*message)->len - bcpOffset < dataLen) \ FetchMoreBcpPlpData(message, dataLen); \ } while(0) +void +TdsResetBcpOffset() +{ + bcpOffset = 0; +} + static void FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) { @@ -133,12 +140,12 @@ FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) if (freeMessageData) { temp = makeStringInfo(); - appendBinaryStringInfo(temp, (*message)->data + offset, (*message)->len - offset); + appendBinaryStringInfo(temp, (*message)->data + bcpOffset, (*message)->len - bcpOffset); if ((*message)->data) pfree((*message)->data); pfree((*message)); - offset = 0; + bcpOffset = 0; } else temp = *message; @@ -146,7 +153,7 @@ FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) /* * Keep fetching for additional packets until we have enough data to read. */ - while (dataLenToRead + offset > temp->len) + while (dataLenToRead + bcpOffset > temp->len) { /* * We should hold the interrupts until we read the next request frame. @@ -173,7 +180,7 @@ FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) /* * Incase of PLP data we should not discard the previous packet since we - * first store the offset of the PLP Chunks first and then read the data later. + * first store the bcpOffset of the PLP Chunks first and then read the data later. */ static void FetchMoreBcpPlpData(StringInfo *message, int dataLenToRead) @@ -198,7 +205,7 @@ FetchMoreBcpPlpData(StringInfo *message, int dataLenToRead) /* * Keep fetching for additional packets until we have enough data to read. */ - while (dataLenToRead + offset > (*message)->len) + while (dataLenToRead + bcpOffset > (*message)->len) { /* * We should hold the interrupts until we read the next request frame. @@ -240,33 +247,34 @@ GetBulkLoadRequest(StringInfo message) request->rowData = NIL; request->reqType = TDS_REQUEST_BULK_LOAD; - if (unlikely((uint8_t) message->data[offset] != TDS_TOKEN_COLMETADATA)) + TdsResetBcpOffset(); + if (unlikely((uint8_t) message->data[bcpOffset] != TDS_TOKEN_COLMETADATA)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("The incoming tabular data stream (TDS) Bulk Load Request (BulkLoadBCP) protocol stream is incorrect. " "unexpected token encountered processing the request."))); - offset++; + bcpOffset++; - memcpy(&colCount, &message->data[offset], sizeof(uint16)); + memcpy(&colCount, &message->data[bcpOffset], sizeof(uint16)); colmetadata = palloc0(colCount * sizeof(BulkLoadColMetaData)); request->colCount = colCount; request->colMetaData = colmetadata; - offset += sizeof(uint16); + bcpOffset += sizeof(uint16); for (int currentColumn = 0; currentColumn < colCount; currentColumn++) { CheckMessageHasEnoughBytesToReadColMetadata(&message, COLUMNMETADATA_HEADER_LEN); /* UserType */ - memcpy(&colmetadata[currentColumn].userType, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&colmetadata[currentColumn].userType, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); /* Flags */ - memcpy(&colmetadata[currentColumn].flags, &message->data[offset], sizeof(uint16)); - offset += sizeof(uint16); + memcpy(&colmetadata[currentColumn].flags, &message->data[bcpOffset], sizeof(uint16)); + bcpOffset += sizeof(uint16); /* TYPE_INFO */ - colmetadata[currentColumn].columnTdsType = message->data[offset++]; + colmetadata[currentColumn].columnTdsType = message->data[bcpOffset++]; /* Datatype specific Column Metadata. */ switch (colmetadata[currentColumn].columnTdsType) @@ -278,14 +286,14 @@ GetBulkLoadRequest(StringInfo message) case TDS_TYPE_DATETIMEN: case TDS_TYPE_UNIQUEIDENTIFIER: CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].maxLen = message->data[offset++]; + colmetadata[currentColumn].maxLen = message->data[bcpOffset++]; break; case TDS_TYPE_DECIMALN: case TDS_TYPE_NUMERICN: CheckMessageHasEnoughBytesToReadColMetadata(&message, NUMERIC_COLUMNMETADATA_LEN); - colmetadata[currentColumn].maxLen = message->data[offset++]; - colmetadata[currentColumn].precision = message->data[offset++]; - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].maxLen = message->data[bcpOffset++]; + colmetadata[currentColumn].precision = message->data[bcpOffset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; break; case TDS_TYPE_CHAR: case TDS_TYPE_VARCHAR: @@ -293,12 +301,12 @@ GetBulkLoadRequest(StringInfo message) case TDS_TYPE_NVARCHAR: { CheckMessageHasEnoughBytesToReadColMetadata(&message, STRING_COLUMNMETADATA_LEN); - memcpy(&colmetadata[currentColumn].maxLen, &message->data[offset], sizeof(uint16)); - offset += sizeof(uint16); + memcpy(&colmetadata[currentColumn].maxLen, &message->data[bcpOffset], sizeof(uint16)); + bcpOffset += sizeof(uint16); - memcpy(&collation, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); - colmetadata[currentColumn].sortId = message->data[offset++]; + memcpy(&collation, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); + colmetadata[currentColumn].sortId = message->data[bcpOffset++]; colmetadata[currentColumn].encoding = TdsGetEncoding(collation); } break; @@ -309,53 +317,53 @@ GetBulkLoadRequest(StringInfo message) uint16_t tableLen = 0; CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint32_t)); - memcpy(&colmetadata[currentColumn].maxLen, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&colmetadata[currentColumn].maxLen, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); /* Read collation(LICD) and sort-id for TEXT and NTEXT. */ if (colmetadata[currentColumn].columnTdsType == TDS_TYPE_TEXT || colmetadata[currentColumn].columnTdsType == TDS_TYPE_NTEXT) { CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint32_t) + 1); - memcpy(&collation, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); - colmetadata[currentColumn].sortId = message->data[offset++]; + memcpy(&collation, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); + colmetadata[currentColumn].sortId = message->data[bcpOffset++]; colmetadata[currentColumn].encoding = TdsGetEncoding(collation); } CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint16_t)); - memcpy(&tableLen, &message->data[offset], sizeof(uint16_t)); - offset += sizeof(uint16_t); + memcpy(&tableLen, &message->data[bcpOffset], sizeof(uint16_t)); + bcpOffset += sizeof(uint16_t); /* Skip table name for now. */ CheckMessageHasEnoughBytesToReadColMetadata(&message, tableLen * 2); - offset += tableLen * 2; + bcpOffset += tableLen * 2; } break; case TDS_TYPE_XML: { CheckMessageHasEnoughBytesToReadColMetadata(&message, 1); - colmetadata[currentColumn].maxLen = message->data[offset++]; + colmetadata[currentColumn].maxLen = message->data[bcpOffset++]; } break; case TDS_TYPE_DATETIME2: { CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; colmetadata[currentColumn].maxLen = 8; } break; case TDS_TYPE_TIME: { CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; colmetadata[currentColumn].maxLen = 5; } break; case TDS_TYPE_DATETIMEOFFSET: { CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; colmetadata[currentColumn].maxLen = 10; } break; @@ -365,8 +373,8 @@ GetBulkLoadRequest(StringInfo message) uint16 plp; CheckMessageHasEnoughBytesToReadColMetadata(&message, BINARY_COLUMNMETADATA_LEN); - memcpy(&plp, &message->data[offset], sizeof(uint16)); - offset += sizeof(uint16); + memcpy(&plp, &message->data[bcpOffset], sizeof(uint16)); + bcpOffset += sizeof(uint16); colmetadata[currentColumn].maxLen = plp; } break; @@ -375,8 +383,8 @@ GetBulkLoadRequest(StringInfo message) break; case TDS_TYPE_SQLVARIANT: CheckMessageHasEnoughBytesToReadColMetadata(&message, SQL_VARIANT_COLUMNMETADATA_LEN); - memcpy(&colmetadata[currentColumn].maxLen, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&colmetadata[currentColumn].maxLen, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); break; /* @@ -471,15 +479,15 @@ GetBulkLoadRequest(StringInfo message) /* Column Name */ CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint8_t)); - memcpy(&colmetadata[currentColumn].colNameLen, &message->data[offset++], sizeof(uint8_t)); + memcpy(&colmetadata[currentColumn].colNameLen, &message->data[bcpOffset++], sizeof(uint8_t)); CheckMessageHasEnoughBytesToReadColMetadata(&message, colmetadata[currentColumn].colNameLen * 2); colmetadata[currentColumn].colName = (char *) palloc0(colmetadata[currentColumn].colNameLen * sizeof(char) * 2 + 1); - memcpy(colmetadata[currentColumn].colName, &message->data[offset], + memcpy(colmetadata[currentColumn].colName, &message->data[bcpOffset], colmetadata[currentColumn].colNameLen * 2); colmetadata[currentColumn].colName[colmetadata[currentColumn].colNameLen * 2] = '\0'; - offset += colmetadata[currentColumn].colNameLen * 2; + bcpOffset += colmetadata[currentColumn].colNameLen * 2; } request->firstMessage = makeStringInfo(); appendBinaryStringInfo(request->firstMessage, message->data, message->len); @@ -506,7 +514,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, 1); /* Loop over each row. */ - while ((uint8_t) message->data[offset] == TDS_TOKEN_ROW + while ((uint8_t) message->data[bcpOffset] == TDS_TOKEN_ROW && request->currentBatchSize < pltsql_plugin_handler_ptr->get_insert_bulk_kilobytes_per_batch() * 1024 && request->rowCount < pltsql_plugin_handler_ptr->get_insert_bulk_rows_per_batch()) { @@ -518,7 +526,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) rowData->columnValues = palloc0(request->colCount * sizeof(Datum)); rowData->isNull = palloc0(request->colCount * sizeof(bool)); - offset++; + bcpOffset++; request->currentBatchSize++; while (i != request->colCount) /* Loop over each column. */ @@ -544,7 +552,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) else { CheckMessageHasEnoughBytesToReadRows(&message, 1); - len = message->data[offset++]; + len = message->data[bcpOffset++]; request->currentBatchSize++; if (len == 0) /* null */ @@ -559,7 +567,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -606,7 +614,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) break; } - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -623,7 +631,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, 1); - len = message->data[offset++]; + len = message->data[bcpOffset++]; request->currentBatchSize++; if (len == 0) /* null */ { @@ -637,7 +645,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -648,7 +656,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) */ rowData->columnValues[i] = TdsTypeNumericToDatum(temp, colmetadata[i].scale); - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -663,8 +671,8 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) if (colmetadata[i].maxLen != 0xffff) { CheckMessageHasEnoughBytesToReadRows(&message, sizeof(short)); - memcpy(&len, &message->data[offset], sizeof(short)); - offset += sizeof(short); + memcpy(&len, &message->data[bcpOffset], sizeof(short)); + bcpOffset += sizeof(short); request->currentBatchSize += sizeof(short); if (len != 0xffff) { @@ -673,12 +681,12 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; - offset += len; + bcpOffset += len; request->currentBatchSize += len; } else /* null */ @@ -750,7 +758,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) * Ignore the Data Text Ptr since its currently of no * use. */ - dataTextPtrLen = message->data[offset++]; + dataTextPtrLen = message->data[bcpOffset++]; request->currentBatchSize++; if (dataTextPtrLen == 0) /* null */ { @@ -761,14 +769,14 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, dataTextPtrLen + 8 + sizeof(uint32_t)); - offset += dataTextPtrLen; + bcpOffset += dataTextPtrLen; request->currentBatchSize += dataTextPtrLen; - offset += 8; /* TODO: Ignored the Data Text + bcpOffset += 8; /* TODO: Ignored the Data Text * TimeStamp for now. */ request->currentBatchSize += 8; - memcpy(&len, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&len, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); request->currentBatchSize += sizeof(uint32_t); if (len == 0) /* null */ { @@ -782,7 +790,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -804,7 +812,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) break; } - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -843,8 +851,8 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) { CheckMessageHasEnoughBytesToReadRows(&message, sizeof(uint32_t)); - memcpy(&len, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&len, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); request->currentBatchSize += sizeof(uint32_t); if (len == 0) /* null */ @@ -859,7 +867,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -870,7 +878,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) */ rowData->columnValues[i] = TdsTypeSqlVariantToDatum(temp); - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -888,17 +896,75 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, 1); if (request->rowCount < pltsql_plugin_handler_ptr->get_insert_bulk_rows_per_batch() && request->currentBatchSize < pltsql_plugin_handler_ptr->get_insert_bulk_kilobytes_per_batch() * 1024 - && (uint8_t) message->data[offset] != TDS_TOKEN_DONE) + && (uint8_t) message->data[bcpOffset] != TDS_TOKEN_DONE) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("The incoming tabular data stream (TDS) Bulk Load Request (BulkLoadBCP) protocol stream is incorrect. " "Row %d, unexpected token encountered processing the request. %d", - request->rowCount, (uint8_t) message->data[offset]))); + request->rowCount, (uint8_t) message->data[bcpOffset]))); pfree(temp); return message; } +static void +CleanupBCPDuringError(bool internal_sp_started, + volatile int before_subtxn_id, + volatile int before_lxid, + ResourceOwner oldowner, + MemoryContext oldcontext) +{ + int ret = 0; + + /* Reset BCP bcpOffset. */ + TdsResetBcpOffset(); + + HOLD_INTERRUPTS(); + + /* + * Discard remaining TDS_BULK_LOAD packets only if End of + * Message has not been reached for the current request. + * Otherwise we have no TDS_BULK_LOAD packets left for the + * current request that need to be discarded. + */ + if (!TdsGetRecvPacketEomStatus()) + { + HOLD_CANCEL_INTERRUPTS(); + ret = TdsDiscardAllPendingBcpRequest(); + RESUME_CANCEL_INTERRUPTS(); + } + + if (ret < 0) + TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request"; + + if (internal_sp_started && before_lxid == MyProc->lxid && before_subtxn_id == GetCurrentSubTransactionId()) + { + if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + elog(LOG, "TSQL TXN PG semantics : Rollback internal savepoint"); + RollbackAndReleaseCurrentSubTransaction(); + CurrentResourceOwner = oldowner; + } + else if (!IsTransactionBlockActive()) + { + AbortCurrentTransaction(); + StartTransactionCommand(); + } + else + { + /* + * In the case of an error and transaction is active but the earlier savepoint + * did not match, then we shall rollback the current transaction and let the + * the actual error be relayed to the customer. + */ + elog(LOG, "The current transaction is rolled back since it " + "was in inconsistent state during Bulk Copy"); + pltsql_plugin_handler_ptr->pltsql_rollback_txn_callback(); + } + + MemoryContextSwitchTo(oldcontext); + RESUME_INTERRUPTS(); +} + /* * ProcessBCPRequest - Processes the request and calls the bulk_load_callback * for futher execution. @@ -910,12 +976,33 @@ ProcessBCPRequest(TDSRequest request) uint64 retValue = 0; TDSRequestBulkLoad req = (TDSRequestBulkLoad) request; StringInfo message = req->firstMessage; + volatile bool internal_sp_started = false; + volatile int before_subtxn_id = 0; + volatile int before_lxid = MyProc->lxid; + ResourceOwner oldowner = CurrentResourceOwner; + MemoryContext oldcontext = CurrentMemoryContext; + bool endOfMessage = false; set_ps_display("active"); TdsErrorContext->err_text = "Processing Bulk Load Request"; pgstat_report_activity(STATE_RUNNING, "Processing Bulk Load Request"); - while (1) + /* + * If a transaction is active then start a Savepoint to rollback + * later in case of error. + */ + if (IsTransactionBlockActive()) + { + if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + elog(LOG, "TSQL TXN Start internal savepoint"); + BeginInternalSubTransaction(NULL); + internal_sp_started = true; + before_subtxn_id = GetCurrentSubTransactionId(); + } + else + internal_sp_started = false; + + while (!endOfMessage) { int nargs = 0; Datum *values = NULL; @@ -929,24 +1016,8 @@ ProcessBCPRequest(TDSRequest request) } PG_CATCH(); { - int ret = 0; - - HOLD_CANCEL_INTERRUPTS(); - - /* - * Discard remaining TDS_BULK_LOAD packets only if End of Message - * has not been reached for the current request. Otherwise we have - * no TDS_BULK_LOAD packets left for the current request that need - * to be discarded. - */ - if (!TdsGetRecvPacketEomStatus()) - ret = TdsDiscardAllPendingBcpRequest(); - - RESUME_CANCEL_INTERRUPTS(); - - if (ret < 0) - TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request"; - + CleanupBCPDuringError(internal_sp_started, before_subtxn_id, + before_lxid, oldowner, oldcontext); PG_RE_THROW(); } PG_END_TRY(); @@ -955,79 +1026,79 @@ ProcessBCPRequest(TDSRequest request) * If the row-count is 0 then there are no rows left to be inserted. * We should begin with cleanup. */ - if (req->rowCount == 0) + if (req->rowCount > 0) { - /* Using Same callback function to do the clean-up. */ - pltsql_plugin_handler_ptr->bulk_load_callback(0, 0, NULL, NULL); - break; - } + nargs = req->colCount * req->rowCount; + values = palloc0(nargs * sizeof(Datum)); + nulls = palloc0(nargs * sizeof(bool)); - nargs = req->colCount * req->rowCount; - values = palloc0(nargs * sizeof(Datum)); - nulls = palloc0(nargs * sizeof(bool)); - - /* Flaten and create a 1-D array of Value & Datums */ - foreach(lc, req->rowData) - { - BulkLoadRowData *row = (BulkLoadRowData *) lfirst(lc); - - for (int currentColumn = 0; currentColumn < req->colCount; currentColumn++) + /* Flaten and create a 1-D array of Value & Datums */ + foreach(lc, req->rowData) { - if (row->isNull[currentColumn]) /* null */ - nulls[count] = row->isNull[currentColumn]; - else - values[count] = row->columnValues[currentColumn]; - count++; + BulkLoadRowData *row = (BulkLoadRowData *) lfirst(lc); + + for (int currentColumn = 0; currentColumn < req->colCount; currentColumn++) + { + if (row->isNull[currentColumn]) /* null */ + nulls[count] = row->isNull[currentColumn]; + else + values[count] = row->columnValues[currentColumn]; + count++; + } } } - if (req->rowData) /* If any row exists then do an insert. */ + PG_TRY(); { - PG_TRY(); + retValue += pltsql_plugin_handler_ptr->bulk_load_callback(req->rowCount ? req->colCount : 0, + req->rowCount, values, nulls); + + /* Free the List of Rows. */ + if (req->rowData) { - retValue += pltsql_plugin_handler_ptr->bulk_load_callback(req->colCount, - req->rowCount, values, nulls); + list_free_deep(req->rowData); + req->rowData = NIL; } - PG_CATCH(); - { - int ret = 0; + /* If there we no rows then we have reached the end of the loop. */ + else + endOfMessage = true; - HOLD_CANCEL_INTERRUPTS(); - HOLD_INTERRUPTS(); - - /* - * Discard remaining TDS_BULK_LOAD packets only if End of - * Message has not been reached for the current request. - * Otherwise we have no TDS_BULK_LOAD packets left for the - * current request that need to be discarded. - */ - if (!TdsGetRecvPacketEomStatus()) - ret = TdsDiscardAllPendingBcpRequest(); - - RESUME_CANCEL_INTERRUPTS(); - - if (ret < 0) - TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request"; - - if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) - ereport(LOG, - (errmsg("Bulk Load Request. Number of Rows: %d and Number of columns: %d.", - req->rowCount, req->colCount), - errhidestmt(true))); - - RESUME_INTERRUPTS(); - PG_RE_THROW(); - } - PG_END_TRY(); - /* Free the List of Rows. */ - list_free_deep(req->rowData); - req->rowData = NIL; if (values) pfree(values); if (nulls) pfree(nulls); } + PG_CATCH(); + { + if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + ereport(LOG, + (errmsg("Bulk Load Request. Number of Rows: %d and Number of columns: %d.", + req->rowCount, req->colCount), + errhidestmt(true))); + + CleanupBCPDuringError(internal_sp_started, before_subtxn_id, + before_lxid, oldowner, oldcontext); + PG_RE_THROW(); + } + PG_END_TRY(); + } + /* Reset the offset at the end of the request. */ + TdsResetBcpOffset(); + + /* If we Started an internal savepoint then release it. */ + if (internal_sp_started && before_subtxn_id == GetCurrentSubTransactionId()) + { + elog(DEBUG5, "TSQL TXN Release internal savepoint"); + ReleaseCurrentSubTransaction(); + CurrentResourceOwner = oldowner; + MemoryContextSwitchTo(oldcontext); } + /* Unlikely case where Transaction is active but the savepoints do not match. */ + else if (unlikely(internal_sp_started && before_subtxn_id != GetCurrentSubTransactionId())) + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("The current Transaction was found to be in inconsisten state " + "during Bulk Copy"))); /* * Send Done Token if rows processed is a positive number. Command type - @@ -1054,7 +1125,6 @@ ProcessBCPRequest(TDSRequest request) pltsql_plugin_handler_ptr->stmt_needs_logging = false; error_context_stack = plerrcontext; } - offset = 0; } static int @@ -1066,8 +1136,8 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) unsigned long lenCheck = 0; CheckPlpMessageHasEnoughBytesToRead(message, sizeof(plpTok)); - memcpy(&plpTok, &(*message)->data[offset], sizeof(plpTok)); - offset += sizeof(plpTok); + memcpy(&plpTok, &(*message)->data[bcpOffset], sizeof(plpTok)); + bcpOffset += sizeof(plpTok); request->currentBatchSize += sizeof(plpTok); temp->plp = NULL; @@ -1083,11 +1153,11 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) uint32_t tempLen; CheckPlpMessageHasEnoughBytesToRead(message, sizeof(tempLen)); - if (offset + sizeof(tempLen) > (*message)->len) + if (bcpOffset + sizeof(tempLen) > (*message)->len) return STATUS_ERROR; - memcpy(&tempLen, &(*message)->data[offset], sizeof(tempLen)); - offset += sizeof(tempLen); + memcpy(&tempLen, &(*message)->data[bcpOffset], sizeof(tempLen)); + bcpOffset += sizeof(tempLen); request->currentBatchSize += sizeof(tempLen); /* PLP Terminator */ @@ -1096,7 +1166,7 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) plpTemp = palloc0(sizeof(PlpData)); plpTemp->next = NULL; - plpTemp->offset = offset; + plpTemp->offset = bcpOffset; plpTemp->len = tempLen; if (plpPrev == NULL) { @@ -1110,10 +1180,10 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) } CheckPlpMessageHasEnoughBytesToRead(message, plpTemp->len); - if (offset + plpTemp->len > (*message)->len) + if (bcpOffset + plpTemp->len > (*message)->len) return STATUS_ERROR; - offset += plpTemp->len; + bcpOffset += plpTemp->len; request->currentBatchSize += plpTemp->len; lenCheck += plpTemp->len; } diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c b/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c index 810656613a..5b2ef60e75 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c @@ -867,7 +867,16 @@ TdsReadNextPendingBcpRequest(StringInfo message) if (TdsReadNextBuffer() == EOF) return EOF; - Assert(TdsRecvMessageType == TDS_BULK_LOAD); + + /* + * The driver could send an attention packet even in the middle of a + * large TDS request. In that case we should abort the entire request which + * has been read. + */ + if (TdsRecvMessageType != TDS_BULK_LOAD) + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Unexpected out of band packet while fetching a Bulk Load Request."))); readBytes = TdsLeftInPacket; diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c b/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c index 038c244dda..68c4be8e4d 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c @@ -127,7 +127,7 @@ ResetTDSConnection(void) TdsErrorContext->err_text = "Resetting the TDS connection"; /* Make sure we've killed any active transaction */ - AbortOutOfAnyTransaction(); + pltsql_plugin_handler_ptr->pltsql_abort_any_transaction_callback(); /* * Save the transaction isolation level that should be restored after @@ -153,6 +153,7 @@ ResetTDSConnection(void) TdsProtocolInit(); TdsResetCache(); TdsResponseReset(); + TdsResetBcpOffset(); SetConfigOption("default_transaction_isolation", isolationOld, PGC_BACKEND, PGC_S_CLIENT); diff --git a/contrib/babelfishpg_tds/src/include/tds_int.h b/contrib/babelfishpg_tds/src/include/tds_int.h index 3bb79320b5..4c5ec4d111 100644 --- a/contrib/babelfishpg_tds/src/include/tds_int.h +++ b/contrib/babelfishpg_tds/src/include/tds_int.h @@ -380,4 +380,7 @@ extern coll_info_t TdsLookupCollationTableCallback(Oid oid); extern Datum TdsBytePtrToDatum(StringInfo buf, int datatype, int scale); extern Datum TdsDateTimeTypeToDatum(uint64 time, int32 date, int datatype, int scale); +/* Functions in tdsbulkload.c */ +extern void TdsResetBcpOffset(void); + #endif /* TDS_INT_H */ diff --git a/contrib/babelfishpg_tsql/src/pl_exec-2.c b/contrib/babelfishpg_tsql/src/pl_exec-2.c index c9b713c647..12d2e42ff3 100644 --- a/contrib/babelfishpg_tsql/src/pl_exec-2.c +++ b/contrib/babelfishpg_tsql/src/pl_exec-2.c @@ -3599,6 +3599,7 @@ execute_bulk_load_insert(int ncol, int nrow, pfree(cstmt->relation); } pfree(cstmt); + cstmt = NULL; } /* Reset Insert-Bulk Options. */ @@ -3632,27 +3633,11 @@ execute_bulk_load_insert(int ncol, int nrow, * In an error condition, the caller calls the function again to do * the cleanup. */ - MemoryContext oldcontext; - /* Cleanup cstate. */ EndBulkCopy(cstmt->cstate, true); if (ActiveSnapshotSet() && GetActiveSnapshot() == snap) PopActiveSnapshot(); - oldcontext = CurrentMemoryContext; - - /* - * If a transaction block is already in progress then abort it, else - * rollback entire transaction. - */ - if (!IsTransactionBlockActive()) - { - AbortCurrentTransaction(); - StartTransactionCommand(); - } - else - pltsql_rollback_txn(); - MemoryContextSwitchTo(oldcontext); /* Reset Insert-Bulk Options. */ insert_bulk_keep_nulls = prev_insert_bulk_keep_nulls; diff --git a/contrib/babelfishpg_tsql/src/pl_handler.c b/contrib/babelfishpg_tsql/src/pl_handler.c index 3d04e0bf76..a415998b5d 100644 --- a/contrib/babelfishpg_tsql/src/pl_handler.c +++ b/contrib/babelfishpg_tsql/src/pl_handler.c @@ -4709,6 +4709,8 @@ _PG_init(void) (*pltsql_protocol_plugin_ptr)->sp_unprepare_callback = &sp_unprepare; (*pltsql_protocol_plugin_ptr)->reset_session_properties = &reset_session_properties; (*pltsql_protocol_plugin_ptr)->bulk_load_callback = &execute_bulk_load_insert; + (*pltsql_protocol_plugin_ptr)->pltsql_rollback_txn_callback = &pltsql_rollback_txn; + (*pltsql_protocol_plugin_ptr)->pltsql_abort_any_transaction_callback = &pltsql_abort_any_transaction; (*pltsql_protocol_plugin_ptr)->pltsql_declare_var_callback = &pltsql_declare_variable; (*pltsql_protocol_plugin_ptr)->pltsql_read_out_param_callback = &pltsql_read_composite_out_param; (*pltsql_protocol_plugin_ptr)->sqlvariant_set_metadata = common_utility_plugin_ptr->TdsSetMetaData; diff --git a/contrib/babelfishpg_tsql/src/pltsql.h b/contrib/babelfishpg_tsql/src/pltsql.h index 57b90ab097..4849535a7a 100644 --- a/contrib/babelfishpg_tsql/src/pltsql.h +++ b/contrib/babelfishpg_tsql/src/pltsql.h @@ -1768,6 +1768,10 @@ typedef struct PLtsql_protocol_plugin uint64 (*bulk_load_callback) (int ncol, int nrow, Datum *Values, bool *Nulls); + void (*pltsql_rollback_txn_callback) (void); + + void (*pltsql_abort_any_transaction_callback) (void); + int (*pltsql_get_generic_typmod) (Oid funcid, int nargs, Oid declared_oid); const char *(*pltsql_get_logical_schema_name) (const char *physical_schema_name, bool missingOk); @@ -2146,6 +2150,7 @@ extern void PLTsqlRollbackTransaction(char *txnName, QueryCompletion *qc, bool c extern void pltsql_start_txn(void); extern void pltsql_commit_txn(void); extern void pltsql_rollback_txn(void); +extern void pltsql_abort_any_transaction(void); extern bool pltsql_get_errdata(int *tsql_error_code, int *tsql_error_severity, int *tsql_error_state); extern void pltsql_eval_txn_data(PLtsql_execstate *estate, PLtsql_stmt_execsql *stmt, CachedPlanSource *cachedPlanSource); extern bool is_sysname_column(ColumnDef *coldef); diff --git a/contrib/babelfishpg_tsql/src/pltsql_utils.c b/contrib/babelfishpg_tsql/src/pltsql_utils.c index 8d27110bcb..22b7a99eb2 100644 --- a/contrib/babelfishpg_tsql/src/pltsql_utils.c +++ b/contrib/babelfishpg_tsql/src/pltsql_utils.c @@ -855,6 +855,13 @@ pltsql_rollback_txn(void) StartTransactionCommand(); } +void +pltsql_abort_any_transaction(void) +{ + NestedTranCount = 0; + AbortOutOfAnyTransaction(); +} + bool pltsql_get_errdata(int *tsql_error_code, int *tsql_error_severity, int *tsql_error_state) { diff --git a/test/dotnet/ExpectedOutput/insertBulkErrors.out b/test/dotnet/ExpectedOutput/insertBulkErrors.out new file mode 100644 index 0000000000..34fcef8f76 --- /dev/null +++ b/test/dotnet/ExpectedOutput/insertBulkErrors.out @@ -0,0 +1,657 @@ +#Q#Create table sourceTable(a int, b int not null) +#Q#Create table destinationTable(a int, b int not null) +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#select @@trancount; +#D#int +1 +#Q#select @@trancount +#D#int +1 +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#Create table sourceTable(a int, b int not null) +#Q#Create table destinationTable(a int, b int not null) +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#select @@trancount; +#D#int +1 +#Q#select @@trancount +#D#int +1 +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#Select * from destinationTable +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#Create table sourceTable(a int, b int not null) +#Q#Create table destinationTable(a int, b int not null) +#Q#create index idx on destinationTable(a); +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#Create table sourceTable(a int, b int not null) +#Q#Create table destinationTable(a int, b int not null) +#Q#create index idx on destinationTable(a); +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select * from destinationTable +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select * from destinationTable +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +#Q#INSERT INTO destinationTable VALUES(1001, 'Foo') +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int unique, c2 CHAR(1024)) +#Q#INSERT INTO destinationTable VALUES(1001, 'Foo') +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO sourceTable VALUES (NULL, NULL) +#Q#create table destinationTable(c1 int NOT NULL, c2 CHAR(1024)) +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(NULL, NULL) +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 999, 1), 'Foo' +#Q#Select count(*) from sourceTable +#D#int +1001 +#Q#select count(*) from sourceTable1 +#D#int +1000 +#Q#Select count(*) from destinationTable +#D#int +0 +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +0 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +0 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable VALUES (1, 'Foo'), (2, 'Foo') +#Q#create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +#Q#INSERT INTO destinationTable VALUES(2, 'Foo') +#Q#Select * from sourceTable +#D#int#!#char +1#!#Foo +2#!#Foo +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select c1 from destinationTable +#D#int +2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select c1 from destinationTable +#D#int +2 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q# SET implicit_transactions ON +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#E#table "sourcetable" does not exist +#Q#drop table sourceTable1 +#E#table "sourcetable1" does not exist +#Q#drop table destinationTable +#E#table "destinationtable" does not exist +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q# SET implicit_transactions OFF diff --git a/test/dotnet/input/InsertBulk/insertBulkErrors.txt b/test/dotnet/input/InsertBulk/insertBulkErrors.txt new file mode 100644 index 0000000000..35780887fb --- /dev/null +++ b/test/dotnet/input/InsertBulk/insertBulkErrors.txt @@ -0,0 +1,558 @@ +########################################################## +#################### TEST DETAILS ######################## +### 1. Testing explicit transaction (error case handled in 5.) +### a. Commit without error +### b. Rollback without error +### 2. Index with without transaction +### 3. Primary Key error case +### 4. Unique constraint with error case +### 5. Check constraint with error case +### a. transaction testing during error scenarios +### b. @@trancount test - error should not terminate transaction +### c. Test CheckConstraint BCP Option Enabled +### d. Test Reusing the same connection for BCP even after error scenarios +### 6. Reset-connection testing with Primary Key error +### 7. Savepoint rollback and commit in error and non-error case. +### 8. implicit_transactions have no role to play here but we have still added tests. +### The above tests test the seq and index. +########################################################## + +####### Testing explicit transaction ####### +# commit and then check for inserts +Create table sourceTable(a int, b int not null) +Create table destinationTable(a int, b int not null) +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); +txn#!#begin +select @@trancount; +traninsertbulk#!#sourceTable#!#destinationTable + +select @@trancount +txn#!#commit + +Select * from sourceTable +Select * from destinationTable +drop table sourceTable +drop table destinationTable + +# rollback and then check for inserts +Create table sourceTable(a int, b int not null) +Create table destinationTable(a int, b int not null) +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); +txn#!#begin +select @@trancount; +# int +traninsertbulk#!#sourceTable#!#destinationTable + +select @@trancount +txn#!#rollback + +Select * from sourceTable +Select * from destinationTable +drop table sourceTable +drop table destinationTable + +# Index without transaction +Create table sourceTable(a int, b int not null) +Create table destinationTable(a int, b int not null) +create index idx on destinationTable(a); +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); +insertbulk#!#sourceTable#!#destinationTable +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select * from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select * from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table destinationTable + +####### Index with transaction ####### +Create table sourceTable(a int, b int not null) +Create table destinationTable(a int, b int not null) +create index idx on destinationTable(a); +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); + +# transaction rollback test with index +txn#!#begin +traninsertbulk#!#sourceTable#!#destinationTable +txn#!#rollback +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select * from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select * from destinationTable + +# transaction commit test with index +txn#!#begin +traninsertbulk#!#sourceTable#!#destinationTable +txn#!#commit +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select * from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select * from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table destinationTable + + +####### Primary Key error ####### + +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +INSERT INTO destinationTable VALUES(1001, 'Foo') + +insertbulk#!#sourceTable#!#destinationTable + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +insertbulk#!#sourceTable1#!#destinationTable + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + + +####### Unique ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int unique, c2 CHAR(1024)) +INSERT INTO destinationTable VALUES(1001, 'Foo') + +insertbulk#!#sourceTable#!#destinationTable + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +insertbulk#!#sourceTable1#!#destinationTable + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + +####### Not Null ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo' +INSERT INTO sourceTable VALUES (NULL, NULL) +create table destinationTable(c1 int NOT NULL, c2 CHAR(1024)) + +insertbulk#!#sourceTable#!#destinationTable + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(NULL, NULL) +INSERT INTO sourceTable1 SELECT generate_series(1, 999, 1), 'Foo' + +insertbulk#!#sourceTable1#!#destinationTable + +Select count(*) from sourceTable +select count(*) from sourceTable1 + +Select count(*) from destinationTable + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + +####### Check ####### +##### THESE TESTS ALSO TEST REUSING THE SAME CONNECTION +##### ON WHICH WE ERROR OUT AND NEED TO RESET TDS STATE +##### WE ALSO SEE THAT TRANSACTION IS NOT ROLLED BACK FOR +##### ANY ERROR DURING BULK OPERATION + +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) + + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +txn#!#commit + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + +####### Reset-connection with error (retry the insert bulk in a loop) ####### +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable VALUES (1, 'Foo'), (2, 'Foo') +create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +INSERT INTO destinationTable VALUES(2, 'Foo') + +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable + +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select c1 from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select c1 from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table destinationTable + +####### Savepoint rollback with and without error ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable + +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#rollback#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +txn#!#rollback + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + + +####### Savepoint commit with and without error ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#commit#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + + SET implicit_transactions ON +####### implicit_transactions rollback with and without error ####### + +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable + +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#rollback#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +txn#!#rollback + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + +####### implicit_transactions commit with and without error ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#commit#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + + SET implicit_transactions OFF \ No newline at end of file diff --git a/test/dotnet/src/BatchRun.cs b/test/dotnet/src/BatchRun.cs index d424a36e28..c2184e7897 100644 --- a/test/dotnet/src/BatchRun.cs +++ b/test/dotnet/src/BatchRun.cs @@ -217,6 +217,15 @@ bool BatchRunner(DbConnection bblCnn, string queryFilePath, Serilog.Core.Logger string destinationTable = result[2]; testFlag &= testUtils.insertBulkCopy(bblCnn, bblCmd, sourceTable, destinationTable, logger, ref stCount); } + else if (strLine.ToLowerInvariant().StartsWith("traninsertbulk")) + { + var result = strLine.Split("#!#", StringSplitOptions.RemoveEmptyEntries); + testUtils.PrintToLogsOrConsole( + $"########################## INSERT BULK:- {strLine} ##########################", logger, "information"); + string sourceTable = result[1]; + string destinationTable = result[2]; + testFlag &= testUtils.insertBulkCopyWithTransaction(bblCnn, bblCmd, sourceTable, destinationTable, bblTransaction, logger, ref stCount); + } /* Case for sp_customtype RPC. */ else if (strLine.ToLowerInvariant().StartsWith("storedp")) { @@ -290,7 +299,7 @@ bool BatchRunner(DbConnection bblCnn, string queryFilePath, Serilog.Core.Logger } else if (query.ToLowerInvariant().StartsWith("insert") || query.ToLowerInvariant().StartsWith("update") || query.ToLowerInvariant().StartsWith("alter") || query.ToLowerInvariant().StartsWith("delete") || query.ToLowerInvariant().StartsWith("begin") || query.ToLowerInvariant().StartsWith("commit") - || query.ToLowerInvariant().StartsWith("rollback") || query.ToLowerInvariant().StartsWith("save") || query.ToLowerInvariant().StartsWith("use") + || query.ToLowerInvariant().StartsWith("rollback") || query.ToLowerInvariant().StartsWith("save") || query.ToLowerInvariant().StartsWith("use") || query.ToLowerInvariant().StartsWith(" set") || query.ToLowerInvariant().StartsWith("create") || query.ToLowerInvariant().StartsWith("drop") || query.ToLowerInvariant().StartsWith("exec") || query.ToLowerInvariant().StartsWith("declare")) { bblCmd?.Dispose(); diff --git a/test/dotnet/utils/ConfigSetup.cs b/test/dotnet/utils/ConfigSetup.cs index 611e5c2049..bb3c74296d 100644 --- a/test/dotnet/utils/ConfigSetup.cs +++ b/test/dotnet/utils/ConfigSetup.cs @@ -9,6 +9,8 @@ public static class ConfigSetup /* Declaring variables required for a Test Run. */ static readonly Dictionary Dictionary = LoadConfig(); public static readonly string BblConnectionString = Dictionary["bblConnectionString"]; + + public static readonly string BCPConnectionString = Dictionary["BCPConnectionString"]; public static readonly string QueryFolder = Dictionary["queryFolder"]; public static readonly string TestName = Dictionary["testName"]; public static readonly bool RunInParallel = bool.Parse(Dictionary["runInParallel"]); @@ -45,6 +47,9 @@ public static Dictionary LoadConfig() /* Creating Server Connection String and Query. */ dictionary["bblConnectionString"] = BuildConnectionString(dictionary["babel_URL"], dictionary["babel_port"], + dictionary["babel_databaseName"], + dictionary["babel_user"], dictionary["babel_password"]) + "pooling=false;"; + dictionary["BCPConnectionString"] = BuildConnectionString(dictionary["babel_URL"], dictionary["babel_port"], dictionary["babel_databaseName"], dictionary["babel_user"], dictionary["babel_password"]); return dictionary; @@ -56,10 +61,10 @@ static string BuildConnectionString(string url, string port, string db, string u { case "oledb": return @"Provider = " + ConfigSetup.Provider + ";Data Source = " + url + "," + port + "; Initial Catalog = " + db - + "; User ID = " + uid + "; Password = " + pwd + ";Pooling=false;"; + + "; User ID = " + uid + "; Password = " + pwd + ";"; case "sql": return @"Data Source = " + url + "," + port + "; Initial Catalog = " + db - + "; User ID = " + uid + "; Password = " + pwd + ";Pooling=false;"; + + "; User ID = " + uid + "; Password = " + pwd + ";"; default: throw new Exception("Driver Not Supported"); } diff --git a/test/dotnet/utils/TestUtils.cs b/test/dotnet/utils/TestUtils.cs index 243ee2e517..dd95723b53 100644 --- a/test/dotnet/utils/TestUtils.cs +++ b/test/dotnet/utils/TestUtils.cs @@ -39,10 +39,50 @@ public bool insertBulkCopy(DbConnection bblCnn, DbCommand bblCmd, String sourceT DbDataReader reader = null; try { + /* To Enforce Reset Connection. */ reader = bblCmd.ExecuteReader(); - SqlBulkCopy bulkCopy = new SqlBulkCopy(ConfigSetup.BblConnectionString); + using (SqlConnection destinationConnection = + new SqlConnection(ConfigSetup.BCPConnectionString)) + { + destinationConnection.Open(); + + SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection); + bulkCopy.DestinationTableName = destinationTable; + bulkCopy.WriteToServer(reader); + } + } + catch (Exception e) + { + PrintToLogsOrConsole("#################################################################", logger, "information"); + PrintToLogsOrConsole( + $"############# ERROR IN EXECUTING WITH BABEL ####################\n{e}\n", + logger, "information"); + stCount--; + return false; + } + finally + { + reader.Close(); + } + return true; + } + + public bool insertBulkCopyWithTransaction(DbConnection bblCnn, DbCommand bblCmd, String sourceTable, String destinationTable, DbTransaction transaction, Logger logger, ref int stCount) + { + bblCmd.CommandText = "Select * from " + sourceTable; + bblCmd.Transaction = transaction; + DbDataReader reader = null; + DataTable dataTable = new DataTable(); + try + { + reader = bblCmd.ExecuteReader(); + dataTable.Load(reader); + reader.Close(); + + /* Set CheckConstraints default for this API since this is the only mechanism to use BCP Options. */ + SqlBulkCopy bulkCopy = new SqlBulkCopy((SqlConnection)bblCnn, SqlBulkCopyOptions.CheckConstraints, (SqlTransaction) transaction); bulkCopy.DestinationTableName = destinationTable; - bulkCopy.WriteToServer(reader); + bulkCopy.WriteToServer(dataTable); } catch (Exception e) { @@ -403,7 +443,7 @@ public string AuthHelper(string strLine) dictionary["others"] = result[i].Split("|-|")[1]; } return @"Data Source = " + dictionary["url"] + "; Initial Catalog = " + dictionary["db"] + - "; User ID = " + dictionary["user"] + "; Password = " + dictionary["pwd"] + ";Pooling=false;" + dictionary["others"]; + "; User ID = " + dictionary["user"] + "; Password = " + dictionary["pwd"] + ";" + dictionary["others"]; } /* Depending on the OS we use the appropriate diff command. */