Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/RDMP-28 Add BeginLoadData & EndLoadData to Datatables #1598

Merged
merged 10 commits into from
Aug 28, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private DataTable GetDataTable(int timeout, IDataLoadEventListener listener)
$"Connection opened, ready to send the following SQL (with Timeout {Timeout}s):{Environment.NewLine}{sql}"));

var dt = new DataTable();

dt.BeginLoadData();
using (var cmd = server.GetCommand(sql, con))
{
cmd.CommandTimeout = timeout;
Expand All @@ -96,6 +96,7 @@ private DataTable GetDataTable(int timeout, IDataLoadEventListener listener)


dt.TableName = TableName;
dt.EndLoadData();

listener?.OnNotify(this, new NotifyEventArgs(ProgressEventType.Information,
$"successfully read {dt.Rows.Count} rows from source"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,12 @@ private DataTable GetDataTable(IDataLoadEventListener listener)
throw new Exception(
"CohortIdentificationCriteria execution resulted in an empty dataset (there were no cohorts matched by the query?)");

var dt = execution.Identifiers;

DataTable dt = execution.Identifiers;
dt.BeginLoadData();
foreach (DataColumn column in dt.Columns)
column.ReadOnly = false;

dt.EndLoadData();
return dt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
namespace Rdmp.Core.CohortCommitting.Pipeline.Sources;

/// <summary>
/// Pipeline source component that generates a DataTable containing all the unique patient identifiers in the column referenced by the <see cref="IPipelineRequirement{T}"/>
/// Pipeline source component that generates a DataTable containing all the unique patient identifiers in the column referenced by the <see cref="IPipelineRequirement{T}"/>
/// <see cref="ExtractionInformation"/>.
/// </summary>
public class PatientIdentifierColumnSource : IPluginDataFlowSource<DataTable>,
Expand Down Expand Up @@ -54,7 +54,8 @@ private DataTable GetDataTable(int timeout, int? topX)

var colName = _extractionInformation.GetRuntimeName();

var dt = new DataTable();
DataTable dt = new DataTable();
dt.BeginLoadData();
dt.Columns.Add(colName);

using (var con = server.GetConnection())
Expand All @@ -72,7 +73,7 @@ private DataTable GetDataTable(int timeout, int? topX)
}
}
}

dt.EndLoadData();
return dt;
}

Expand Down
16 changes: 10 additions & 6 deletions Rdmp.Core/Curation/Data/Aggregation/AggregateConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ namespace Rdmp.Core.Curation.Data.Aggregation;
/// Entry point for the aggregation system. This class describes what a given aggregation is supposed to achieve (e.g. summarise the number of records in a
/// dataset by region over time since 2001 to present). An AggregateConfiguration belongs to a given Catalogue and is the hanging-off point for the rest of
/// the configuration (e.g. AggregateDimension / AggregateFilter)
///
///
/// <para>AggregateConfigurations can be used with an AggregateBuilder to produce runnable SQL which will return a DataTable containing results appropriate to the
/// query being built.</para>
///
///
/// <para>There are Three types of AggregateConfiguration (these are configurations - not separate classes):</para>
/// <para>1. 'Aggregate Graph' - Produce summary information about a dataset designed to be displayed in a graph e.g. number of records each year by healthboard</para>
/// <para>2. 'Cohort Aggregate' - Produce a list of unique patient identifiers from a dataset (e.g. 'all patients with HBA1c test code > 50 in biochemistry')</para>
/// <para>3. 'Joinable PatientIndex Table' - Produce a patient identifier fact table for joining to other Cohort Aggregates during cohort building (See JoinableCohortAggregateConfiguration)</para>
/// <para>The above labels are informal terms. Use IsCohortIdentificationAggregate and IsJoinablePatientIndexTable to determine what type a given
/// AggregateConfiguration is. </para>
///
///
/// <para>If your Aggregate is part of cohort identification (Identifier List or Patient Index Table) then its name will start with cic_X_ where X is the ID of the cohort identification
/// configuration. Depending on the user interface though this might not appear (See ToString implementation).</para>
/// </summary>
Expand Down Expand Up @@ -113,7 +113,7 @@ public DateTime dtCreated
/// <summary>
/// Indicates the AggregateDimension (if any) that will result in a pivot graph being generated. E.g. if your AggregateConfiguration is a graph of records by year between
/// 2001 and 2018 then specifying a pivot on healthboard would result in 1 line in the graph per healthboard instead of a single line for the count of all (the default).
///
///
/// <para>If an AggregateConfiguration is a Cohort or Patient index table then it cannot have a Pivot</para>
/// </summary>
public int? PivotOnDimensionID
Expand Down Expand Up @@ -227,9 +227,13 @@ public static void AdjustGraphDataTable(DataTable dt)
if (dt.Rows.Count == 0) return;

if (!UserSettings.IncludeZeroSeriesInGraphs)
{
dt.BeginLoadData();
foreach (var col in dt.Columns.Cast<DataColumn>().ToArray())
if (dt.Rows.Cast<DataRow>().All(r => IsBasicallyZero(r[col.ColumnName])))
dt.Columns.Remove(col);
dt.EndLoadData();
}
}

private static bool IsBasicallyZero(object v) => v == null || v == DBNull.Value ||
Expand Down Expand Up @@ -257,7 +261,7 @@ private static bool IsBasicallyZero(object v) => v == null || v == DBNull.Value
/// When an AggregateConfiguration is used in a cohort identification capacity it can have one or more 'patient index tables' defined e.g.
/// 'Give me all prescriptions for morphine' (Prescribing) 'within 6 months of patient being discharged from hospital' (SMR01). In this case
/// a join is done against the secondary dataset.
///
///
/// <para>This property returns all such 'patient index table' AggregateConfigurations which are currently being used by this AggregateConfiguration
/// for building its join.</para>
/// </summary>
Expand Down Expand Up @@ -304,7 +308,7 @@ public AggregateConfiguration()

/// <summary>
/// Only relevant for AggregateConfigurations that are being used in a cohort identification capacity (See <see cref="IsCohortIdentificationAggregate"/>).
///
///
/// <para>The order location of an AggregateConfiguration within its parent <see cref="CohortAggregateContainer"/> (if it has one). This is mostly irrelevant for UNION /
/// INTERSECT operations (other than helping the user viewing the system) but is vital for EXCEPT containers where the first AggregateConfiguration in the container is
/// run producing a dataset and all subsequent AggregateConfigurations are then removed from that patient set.</para>
Expand Down
26 changes: 16 additions & 10 deletions Rdmp.Core/DataExport/Data/ExtractableCohort.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,13 @@ public IExternalCohortDefinitionData GetExternalData(int timeoutInSeconds = -1)
var syntax = db.Server.GetQuerySyntaxHelper();

var sql =
$@"Select
$@"Select
{syntax.EnsureWrapped("projectNumber")},
{syntax.EnsureWrapped("description")},
{syntax.EnsureWrapped("version")},
{syntax.EnsureWrapped("dtCreated")}
from {ExternalCohortTable.DefinitionTableName}
where
from {ExternalCohortTable.DefinitionTableName}
where
{syntax.EnsureWrapped("id")} = {OriginID}";

if (timeoutInSeconds != -1) db.Server.TestConnection(timeoutInSeconds * 1000);
Expand Down Expand Up @@ -227,7 +227,7 @@ public IExternalCohortDefinitionData GetExternalData(int timeoutInSeconds = -1)
private int _originID;

/// <summary>
/// Creates a new cohort reference in the data export database. This must resolve (via <paramref name="originalId"/>) to
/// Creates a new cohort reference in the data export database. This must resolve (via <paramref name="originalId"/>) to
/// a row in the external cohort database (<paramref name="externalSource"/>).
/// </summary>
/// <param name="repository"></param>
Expand Down Expand Up @@ -288,11 +288,12 @@ public DataTable FetchEntireCohort()
var sql = $"SELECT DISTINCT * FROM {cohortTable.GetFullyQualifiedName()} WHERE {WhereSQL()}";

var da = cohortTable.Database.Server.GetDataAdapter(sql, con);
var dtReturn = new DataTable();
DataTable dtReturn = new DataTable();
dtReturn.BeginLoadData();
da.Fill(dtReturn);

dtReturn.TableName = cohortTable.GetRuntimeName();

dtReturn.EndLoadData();
return dtReturn;
}
}
Expand Down Expand Up @@ -398,13 +399,13 @@ public static DataTable GetImportableCohortDefinitionsTable(ExternalCohortTable
{
con.Open();
var sql =
$@"Select
$@"Select
{syntax.EnsureWrapped("description")},
{syntax.EnsureWrapped("id")},
{syntax.EnsureWrapped("version")},
{syntax.EnsureWrapped("projectNumber")}
from {externalSource.DefinitionTableName}
where
from {externalSource.DefinitionTableName}
where
exists (Select 1 from {externalSource.TableName} WHERE {externalSource.DefinitionTableForeignKeyField}=id)";

using (var da = server.GetDataAdapter(sql, con))
Expand All @@ -414,8 +415,10 @@ public static DataTable GetImportableCohortDefinitionsTable(ExternalCohortTable
versionMemberName = "version";
projectNumberMemberName = "projectNumber";

var toReturn = new DataTable();
DataTable toReturn = new DataTable();
toReturn.BeginLoadData();
da.Fill(toReturn);
toReturn.EndLoadData();
return toReturn;
}
}
Expand Down Expand Up @@ -535,6 +538,7 @@ public void ReverseAnonymiseDataTable(DataTable toProcess, IDataLoadEventListene
sw2.Start();

//fix values
toProcess.BeginLoadData();
foreach (DataRow row in toProcess.Rows)
try
{
Expand Down Expand Up @@ -580,6 +584,8 @@ public void ReverseAnonymiseDataTable(DataTable toProcess, IDataLoadEventListene
$"Substituted {substitutions} release identifiers for private identifiers in input data table (input data table contained {toProcess.Rows.Count} rows)"));

toProcess.Columns[releaseIdentifier].ColumnName = privateIdentifier;

toProcess.EndLoadData();
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ namespace Rdmp.Core.DataExport.DataExtraction;
/// <summary>
/// Applies Catalogue.ValidationXML to rows extracted during a Data Extraction Pipeline (See ExecuteDatasetExtractionSource). Because the columns which
/// are extracted can be a subset of the columns in the Catalogue and can include transforms the validation rules have to be adjusted (some are not applied).
///
///
/// <para>A count of the number of rows failing validation is stored in VerboseValidationResults (divided by column) and is available for writing to the word
/// metadata document that accompanies the extracted records (See WordDataWriter). </para>
///
///
/// <para>This is similar to CatalogueConstraintReport (DQE) but is applied to a researchers extract instead of the Catalogue as a whole.</para>
/// </summary>
public class ExtractionTimeValidator
Expand Down Expand Up @@ -53,7 +53,7 @@ public void Validate(DataTable dt, string validationColumnToPopulateIfAny)
{
if (!_initialized)
Initialize(dt);

dt.BeginLoadData();
foreach (DataRow r in dt.Rows)
{
//additive validation results, Results is a class that wraps DictionaryOfFailure which is an array of columns and each element is another array of consequences (with a row count for each consequence)
Expand All @@ -64,6 +64,7 @@ public void Validate(DataTable dt, string validationColumnToPopulateIfAny)
if (validationColumnToPopulateIfAny != null)
r[validationColumnToPopulateIfAny] = consequenceOnLastRowProcessed;
}
dt.EndLoadData();
}

private void Initialize(DataTable dt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ public virtual DataTable TryGetPreview()
var da = server.GetDataAdapter(Request.QueryBuilder.SQL, con);

//get up to 1000 records
toReturn.BeginLoadData();
da.Fill(0, 1000, toReturn);
toReturn.EndLoadData();

con.Close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,12 @@ public void AddWhile(IDbDataCommandDataFlowSource source, Func<DataRow, bool> eq

DataRow r;

chunk.BeginLoadData();
//while we are still successfully reading rows and those rows have the same release id
while ((r = source.ReadOneRow()) != null)
{
if (equalityFunc(r))
//add it to the current chunk
//add it to the current chunk
{
chunk.ImportRow(r);
}
Expand All @@ -87,5 +89,7 @@ public void AddWhile(IDbDataCommandDataFlowSource source, Func<DataRow, bool> eq
_peekedRecord = r;
break;
}
}
chunk.EndLoadData();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,17 @@ public DataTable GetDataTable()
var tbl = TableInfo.Discover(DataAccessContext.DataExport);
var server = tbl.Database.Server;

var dt = new DataTable();
DataTable dt = new DataTable();

using (var con = server.GetConnection())
{
con.Open();
using (var da = server.GetDataAdapter(
server.GetCommand(GetDataTableFetchSql(), con)))
{
dt.BeginLoadData();
da.Fill(dt);
dt.EndLoadData();
}
}

Expand Down
6 changes: 3 additions & 3 deletions Rdmp.Core/DataLoad/Engine/Pipeline/Components/CleanStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener
GracefulCancellationToken cancellationToken)
{
timer.Start();

StartAgain:
toProcess.BeginLoadData();
StartAgain:
foreach (DataRow row in toProcess.Rows)
{
for (var i = 0; i < columnsToClean.Count; i++)
Expand Down Expand Up @@ -85,7 +85,7 @@ public DataTable ProcessPipelineData(DataTable toProcess, IDataLoadEventListener
job.OnProgress(this,
new ProgressEventArgs(_taskDescription, new ProgressMeasurement(_rowsProcessed, ProgressType.Records),
timer.Elapsed));

toProcess.EndLoadData();
return toProcess;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ public DataTable GetChunk(IDataLoadEventListener job, GracefulCancellationToken
timer.Start();
try
{
var chunk = GetChunkSchema(_reader);

DataTable chunk = GetChunkSchema(_reader);
chunk.BeginLoadData();
while (_reader.HasRows && _reader.Read())
{
cancellationToken.ThrowIfCancellationRequested();
Expand All @@ -86,8 +86,12 @@ public DataTable GetChunk(IDataLoadEventListener job, GracefulCancellationToken

//we reached batch limit
if (readThisBatch == BatchSize)
{
chunk.EndLoadData();
return chunk;
}
}
chunk.EndLoadData();

//if data was read
if (readThisBatch > 0)
Expand Down
11 changes: 6 additions & 5 deletions Rdmp.Core/DataLoad/Modules/Attachers/FixedWidthFormatFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public FixedWidthFormatFile(FileInfo pathToFormatFile)
//now add values
for (var index = 0; index < readAllLines.Length - 1; index++)
{
//skip header line
//skip header line
var cellsOnRowAsSplitString = readAllLines[index + 1].Split(',');

FormatColumns[index].From = int.Parse(cellsOnRowAsSplitString[0]);
Expand All @@ -53,7 +53,7 @@ public FixedWidthFormatFile(FileInfo pathToFormatFile)
FormatColumns[index].DateFormat =
cellsOnRowAsSplitString[4]
.Replace("ccyy",
"yyyy"); //some people think that ccyy is a valid way of expressing year formats... they are wrong
"yyyy"); //some people think that ccyy is a valid way of expressing year formats... they are wrong

if (FormatColumns[index].From + FormatColumns[index].Size - 1 != FormatColumns[index].To)
throw new FlatFileLoadException(
Expand All @@ -77,9 +77,9 @@ public FixedWidthFormatFile(FileInfo pathToFormatFile)
public DataTable GetDataTableFromFlatFile(FileInfo f)
{
//setup the table
var toReturn = new DataTable();

DataTable toReturn = new DataTable();

toReturn.BeginLoadData();
foreach (var fixedWidthColumn in FormatColumns)
{
var dataColumn = toReturn.Columns.Add(fixedWidthColumn.Field);
Expand Down Expand Up @@ -115,7 +115,7 @@ public DataTable GetDataTableFromFlatFile(FileInfo f)
if (string.IsNullOrWhiteSpace(value))
dataRow[fixedWidthColumn.Field] = DBNull.Value;
else
//it is a date column
//it is a date column
if (!string.IsNullOrWhiteSpace(fixedWidthColumn.DateFormat))
try
{
Expand All @@ -132,6 +132,7 @@ public DataTable GetDataTableFromFlatFile(FileInfo f)
}
}

toReturn.EndLoadData();
return toReturn;
}

Expand Down
Loading
Loading