Skip to content

Commit

Permalink
Merge pull request #22 from dynamicweb/mss/16878
Browse files Browse the repository at this point in the history
added missing Close-function in the DestinationWriter plus logic for …
  • Loading branch information
frederik5480 authored Apr 8, 2024
2 parents e451ba4 + 7f21f7e commit 371e881
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<Version>10.0.13</Version>
<Version>10.0.14</Version>
<AssemblyVersion>1.0.0.0</AssemblyVersion>
<Title>Order Provider</Title>
<Description>Order Provider</Description>
Expand Down
44 changes: 34 additions & 10 deletions src/OrderDestinationWriter.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using Dynamicweb.DataIntegration.Integration;
using Dynamicweb.DataIntegration.ProviderHelpers;
using Dynamicweb.Logging;
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
Expand All @@ -18,13 +19,15 @@ internal class OrderDestinationWriter : BaseSqlWriter
private DataSet DataToWrite { get; } = new DataSet();
private string TempTablePrefix { get; }
private bool SkipFailingRows { get; }
private bool DiscardDuplicates { get; }
internal SqlCommand SqlCommand { get; }
internal int RowsToWriteCount { get; set; }
private int LastLogRowsCount { get; set; }
protected DuplicateRowsHandler duplicateRowsHandler;
private readonly ColumnMappingCollection _columnMappings;
private readonly IEnumerable<ColumnMapping> _activeColumnMappings;

public OrderDestinationWriter(Mapping mapping, SqlConnection connection, ILogger logger, bool skipFailingRows)
public OrderDestinationWriter(Mapping mapping, SqlConnection connection, ILogger logger, bool skipFailingRows, bool discardDuplicates)
{
Mapping = mapping;
_columnMappings = Mapping.GetColumnMappings();
Expand All @@ -33,6 +36,7 @@ public OrderDestinationWriter(Mapping mapping, SqlConnection connection, ILogger
SqlCommand.CommandTimeout = 1200;
Logger = logger;
SkipFailingRows = skipFailingRows;
DiscardDuplicates = discardDuplicates;
TempTablePrefix = $"TempTableForBulkImport{mapping.GetId()}";
SqlBulkCopier = new SqlBulkCopy(connection);
SqlBulkCopier.DestinationTableName = mapping.DestinationTable.Name + TempTablePrefix;
Expand All @@ -57,6 +61,10 @@ public OrderDestinationWriter(Mapping mapping, SqlConnection connection, ILogger
{
TableToWrite.Columns.Add(column.Name, column.Type);
}
if (DiscardDuplicates)
{
duplicateRowsHandler = new DuplicateRowsHandler(Logger, Mapping);
}
}

internal void FinishWriting()
Expand Down Expand Up @@ -98,18 +106,34 @@ internal void MoveDataToMainTable(SqlTransaction sqlTransaction, bool updateOnly
}
}

// if 10k write table to db, empty table
if (TableToWrite.Rows.Count >= 1000)
if (!DiscardDuplicates || !duplicateRowsHandler.IsRowDuplicate(_columnMappings, Mapping, dataRow, row))
{
RowsToWriteCount = RowsToWriteCount + TableToWrite.Rows.Count;
SkippedFailedRowsCount = SqlBulkCopierWriteToServer(SqlBulkCopier, TableToWrite, SkipFailingRows, Mapping, Logger);
RowsToWriteCount = RowsToWriteCount - SkippedFailedRowsCount;
TableToWrite.Clear();
if (RowsToWriteCount >= LastLogRowsCount + 10000)
TableToWrite.Rows.Add(dataRow);
// if 10k write table to db, empty table
if (TableToWrite.Rows.Count >= 1000)
{
LastLogRowsCount = RowsToWriteCount;
Logger.Log("Added " + RowsToWriteCount + " rows to temporary table for " + Mapping.DestinationTable.Name + ".");
RowsToWriteCount = RowsToWriteCount + TableToWrite.Rows.Count;
SkippedFailedRowsCount = SqlBulkCopierWriteToServer(SqlBulkCopier, TableToWrite, SkipFailingRows, Mapping, Logger);
RowsToWriteCount = RowsToWriteCount - SkippedFailedRowsCount;
TableToWrite.Clear();
if (RowsToWriteCount >= LastLogRowsCount + 10000)
{
LastLogRowsCount = RowsToWriteCount;
Logger.Log("Added " + RowsToWriteCount + " rows to temporary table for " + Mapping.DestinationTable.Name + ".");
}
}
}
}

public new void Close()
{
string text = Mapping.DestinationTable.Name + TempTablePrefix;
SqlCommand.CommandText = "if exists (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'" + text + "') AND type in (N'U')) drop table " + text;
SqlCommand.ExecuteNonQuery();
((IDisposable)SqlBulkCopier).Dispose();
if (duplicateRowsHandler != null)
{
duplicateRowsHandler.Dispose();
}
}
}
2 changes: 1 addition & 1 deletion src/OrderProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public override bool RunJob(Job job)
Logger.Log("Starting import to temporary table for " + mapping.DestinationTable.Name + ".");
using (var reader = job.Source.GetReader(mapping))
{
var writer = new OrderDestinationWriter(mapping, Connection, Logger, SkipFailingRows);
var writer = new OrderDestinationWriter(mapping, Connection, Logger, SkipFailingRows, DiscardDuplicates);
var columnMappings = mapping.GetColumnMappings();
while (!reader.IsDone())
{
Expand Down

0 comments on commit 371e881

Please sign in to comment.