From aaaaa92e31645880ece7ac3eb625cbf69e045def Mon Sep 17 00:00:00 2001 From: Hasan Savran Date: Mon, 8 Jan 2024 10:58:51 -0500 Subject: [PATCH 1/6] OpenAI embedding is in test PostgreSQL changes are in dev and not ready --- .../Cosmos.DataTransfer.Core.csproj | 6 +- .../migrationsettings.json | 10 ++- CosmosDbDataMigrationTool.sln | 13 +++- ...Cosmos.DataTransfer.CosmosExtension.csproj | 2 +- .../Cosmos.DataTransfer.MongoExtension.csproj | 1 + .../MongoDataSinkExtension.cs | 36 ++++++++- .../Settings/MongoSinkSettings.cs | 10 +++ ...os.DataTransfer.PostgresqlExtension.csproj | 21 ++++++ Extensions/PostgreSQL/PostgreDataCol.cs | 71 ++++++++++++++++++ .../PostgreSQL/PostgreDictionaryDataItem.cs | 27 +++++++ .../PostgreSQL/PostgresqlDataSinkExtension.cs | 74 +++++++++++++++++++ Extensions/PostgreSQL/Program.cs | 1 + .../PublishProfiles/FolderProfile.pubxml | 15 ++++ ...mos.DataTransfer.SqlServerExtension.csproj | 2 +- 14 files changed, 276 insertions(+), 13 deletions(-) create mode 100644 Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj create mode 100644 Extensions/PostgreSQL/PostgreDataCol.cs create mode 100644 Extensions/PostgreSQL/PostgreDictionaryDataItem.cs create mode 100644 Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs create mode 100644 Extensions/PostgreSQL/Program.cs create mode 100644 Extensions/PostgreSQL/Properties/PublishProfiles/FolderProfile.pubxml diff --git a/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj b/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj index c9d8254..b1eebe7 100644 --- a/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj +++ b/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj @@ -19,15 +19,15 @@ - - + + - + diff --git a/Core/Cosmos.DataTransfer.Core/migrationsettings.json b/Core/Cosmos.DataTransfer.Core/migrationsettings.json index 82ea817..b138979 100644 --- a/Core/Cosmos.DataTransfer.Core/migrationsettings.json +++ 
b/Core/Cosmos.DataTransfer.Core/migrationsettings.json @@ -1,9 +1,11 @@ { - "Source": null, - "Sink": null, - "SourceSettings": { + "Source": "Cosmos-nosql", + "Sink": "Mongo", + "SourceSettings": { + }, - "SinkSettings": { + "SinkSettings": { + }, "Operations": [ //{ diff --git a/CosmosDbDataMigrationTool.sln b/CosmosDbDataMigrationTool.sln index 92b59f9..95ff2ba 100644 --- a/CosmosDbDataMigrationTool.sln +++ b/CosmosDbDataMigrationTool.sln @@ -87,7 +87,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Csv", "Csv", "{39930280-DA2 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.CsvExtension", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension\Cosmos.DataTransfer.CsvExtension.csproj", "{6A3FB90C-B837-4724-A406-214D4CEA686F}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cosmos.DataTransfer.CsvExtension.UnitTests", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension.UnitTests\Cosmos.DataTransfer.CsvExtension.UnitTests.csproj", "{40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.CsvExtension.UnitTests", "Extensions\Csv\Cosmos.DataTransfer.CsvExtension.UnitTests\Cosmos.DataTransfer.CsvExtension.UnitTests.csproj", "{40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}" +EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{BCBBAF22-0CB5-416B-8C80-03AB2FC4D0A0}" ProjectSection(SolutionItems) = preProject Contributing.md = Contributing.md @@ -95,6 +96,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution README.md = README.md EndProjectSection EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.PostgresqlExtension", "Extensions\PostgreSQL\Cosmos.DataTransfer.PostgresqlExtension.csproj", "{85820167-DB94-458B-B09B-9E823996C692}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PostgreSQL", "PostgreSQL", "{1B927C5F-50FC-42A6-BAF6-B00E6D760543}" 
+EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -181,6 +186,10 @@ Global {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}.Debug|Any CPU.Build.0 = Debug|Any CPU {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}.Release|Any CPU.ActiveCfg = Release|Any CPU {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E}.Release|Any CPU.Build.0 = Release|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Debug|Any CPU.Build.0 = Debug|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Release|Any CPU.ActiveCfg = Release|Any CPU + {85820167-DB94-458B-B09B-9E823996C692}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -212,6 +221,8 @@ Global {39930280-DA29-4814-837B-FA7F252EB3EC} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201} {6A3FB90C-B837-4724-A406-214D4CEA686F} = {39930280-DA29-4814-837B-FA7F252EB3EC} {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E} = {39930280-DA29-4814-837B-FA7F252EB3EC} + {85820167-DB94-458B-B09B-9E823996C692} = {1B927C5F-50FC-42A6-BAF6-B00E6D760543} + {1B927C5F-50FC-42A6-BAF6-B00E6D760543} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {662B3F27-70D8-45E6-A1C0-1438A9C8A542} diff --git a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj index 06801d6..277532c 100644 --- a/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj +++ b/Extensions/Cosmos/Cosmos.DataTransfer.CosmosExtension/Cosmos.DataTransfer.CosmosExtension.csproj @@ -8,7 +8,7 @@ - + diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj 
b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj index f2e3cac..37735d2 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj @@ -8,6 +8,7 @@ + diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs index c7beeb3..2122e33 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs @@ -1,4 +1,6 @@ using System.ComponentModel.Composition; +using Azure.AI.OpenAI; +using Azure; using Cosmos.DataTransfer.Interfaces; using Cosmos.DataTransfer.MongoExtension.Settings; using Microsoft.Extensions.Configuration; @@ -18,21 +20,49 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati if (!string.IsNullOrEmpty(settings.ConnectionString) && !string.IsNullOrEmpty(settings.DatabaseName) && !string.IsNullOrEmpty(settings.Collection)) { + var Isembeddingsetsvalid = false; + var client = new OpenAIClient(""); + if (settings.GenerateEmbedding.HasValue && settings.GenerateEmbedding.Value && settings.SourcePropEmbedding != null && settings.DestPropEmbedding != null) + { + if (!string.IsNullOrEmpty(settings.OpenAIUrl) && !string.IsNullOrEmpty(settings.OpenAIKey) && !string.IsNullOrEmpty(settings.OpenAIDeploymentName)) + { + client = new OpenAIClient(new Uri(settings.OpenAIUrl), new AzureKeyCredential(settings.OpenAIKey)); + Isembeddingsetsvalid = true; + logger.LogInformation("OpenAI Embedding settings are valid."); + } + } + var context = new Context(settings.ConnectionString, settings.DatabaseName); var repo = context.GetRepository(settings.Collection); - var batchSize = settings.BatchSize ?? 
1000; - var objects = new List(); int itemCount = 0; await foreach (var item in dataItems.WithCancellation(cancellationToken)) { var dict = item.BuildDynamicObjectTree(); + + if (Isembeddingsetsvalid) + { + var valtoemb = item.GetValue(settings.SourcePropEmbedding)?.ToString(); + if (!string.IsNullOrEmpty(valtoemb) && valtoemb?.Length < 8192) + { + var options = new EmbeddingsOptions() + { + DeploymentName = settings.OpenAIDeploymentName, + Input = { valtoemb } + }; + var vector = await client.GetEmbeddingsAsync(options,cancellationToken); + if (vector != null) + { + dict?.TryAdd(settings.DestPropEmbedding, vector.Value.Data[0].Embedding.ToArray()); + } + } + } objects.Add(new BsonDocument(dict)); itemCount++; if (objects.Count == batchSize) - { + { await repo.AddRange(objects); logger.LogInformation("Added {ItemCount} items to collection '{Collection}'", itemCount, settings.Collection); objects.Clear(); diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs index 3c64b1b..722e532 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs @@ -7,4 +7,14 @@ public class MongoSinkSettings : MongoBaseSettings public string? Collection { get; set; } public int? BatchSize { get; set; } + + public bool? GenerateEmbedding { get; set; } + + public string? OpenAIUrl { get; set; } + public string? OpenAIKey { get; set; } + + // name of the deployment for text-embedding-ada-002 + public string? OpenAIDeploymentName { get; set; } + public string? SourcePropEmbedding { get; set; } + public string? 
DestPropEmbedding { get; set; } } diff --git a/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj b/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj new file mode 100644 index 0000000..7ad263a --- /dev/null +++ b/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj @@ -0,0 +1,21 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + + diff --git a/Extensions/PostgreSQL/PostgreDataCol.cs b/Extensions/PostgreSQL/PostgreDataCol.cs new file mode 100644 index 0000000..c0435ee --- /dev/null +++ b/Extensions/PostgreSQL/PostgreDataCol.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Cosmos.DataTransfer.PostgresqlExtension +{ + public class PostgreDataCol + { + public string ColumnName { get; set; } + public Type ColumnType { get; set; } + + public NpgsqlTypes.NpgsqlDbType PostgreType { get; set; } + + public PostgreDataCol(string colname, Type coltype) + { + ColumnName = colname; + ColumnType = coltype; + PostgreType = Convert(coltype); + } + + public Dictionary SparseColumnData { get; } = new Dictionary(); + + + + + public void AddColumnValue(long row, object? 
value) + { + if (value == null) + { + return; + } + + SparseColumnData[row] = value; + } + + public NpgsqlTypes.NpgsqlDbType Convert(Type coltype) + { + if (coltype.Name == "Missing") + { + return NpgsqlTypes.NpgsqlDbType.Unknown; + } + return coltype switch + { + var type when type == typeof(string) => NpgsqlTypes.NpgsqlDbType.Varchar, + var type when type == typeof(int) => NpgsqlTypes.NpgsqlDbType.Integer, + var type when type == typeof(long) => NpgsqlTypes.NpgsqlDbType.Bigint, + var type when type == typeof(bool) => NpgsqlTypes.NpgsqlDbType.Boolean, + var type when type == typeof(DateTime) => NpgsqlTypes.NpgsqlDbType.Timestamp, + var type when type == typeof(double) => NpgsqlTypes.NpgsqlDbType.Double, + var type when type == typeof(float) => NpgsqlTypes.NpgsqlDbType.Real, + var type when type == typeof(decimal) => NpgsqlTypes.NpgsqlDbType.Numeric, + var type when type == typeof(byte[]) => NpgsqlTypes.NpgsqlDbType.Bytea, + var type when type == typeof(Guid) => NpgsqlTypes.NpgsqlDbType.Uuid, + var type when type == typeof(char) => NpgsqlTypes.NpgsqlDbType.Char, + var type when type == typeof(TimeSpan) => NpgsqlTypes.NpgsqlDbType.Interval, + var type when type == typeof(DateTimeOffset) => NpgsqlTypes.NpgsqlDbType.TimestampTz, + var type when type == typeof(short) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(uint) => NpgsqlTypes.NpgsqlDbType.Integer, + var type when type == typeof(ushort) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(ulong) => NpgsqlTypes.NpgsqlDbType.Bigint, + var type when type == typeof(sbyte) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(byte) => NpgsqlTypes.NpgsqlDbType.Smallint, + var type when type == typeof(char[]) => NpgsqlTypes.NpgsqlDbType.Text, + var type when type == typeof(char?) 
=> NpgsqlTypes.NpgsqlDbType.Char, + _ => NpgsqlTypes.NpgsqlDbType.Unknown, + }; + } + } +} diff --git a/Extensions/PostgreSQL/PostgreDictionaryDataItem.cs b/Extensions/PostgreSQL/PostgreDictionaryDataItem.cs new file mode 100644 index 0000000..a668304 --- /dev/null +++ b/Extensions/PostgreSQL/PostgreDictionaryDataItem.cs @@ -0,0 +1,27 @@ +using Cosmos.DataTransfer.Interfaces; + +namespace Cosmos.DataTransfer.PostgresqlExtension +{ + public class PostgreDictionaryDataItem : IDataItem + { + public IDictionary Columns { get; set; } + + public PostgreDictionaryDataItem(IDictionary columns) + { + Columns = columns; + } + public IEnumerable GetFieldNames() + { + return Columns.Keys; + } + + public object? GetValue(string fieldName) + { + if (!Columns.TryGetValue(fieldName, out var value)) + { + return null; + } + return value; + } + } +} diff --git a/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs b/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs new file mode 100644 index 0000000..a239d39 --- /dev/null +++ b/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs @@ -0,0 +1,74 @@ +using System.ComponentModel.Composition; +using Cosmos.DataTransfer.Interfaces; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Npgsql; + +namespace Cosmos.DataTransfer.PostgresqlExtension +{ + [Export(typeof(IDataSinkExtension))] + public class PostgresqlDataSinkExtension : IDataSinkExtensionWithSettings + { + public string DisplayName => "PostgreSQL"; + + public IEnumerable GetSettings() + { + throw new NotImplementedException(); + } + + public async Task WriteAsync(IAsyncEnumerable dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default) + { + await MapDataTypes(dataItems); + /* + NpgsqlConnection con = new NpgsqlConnection(""); + using (var writer = con.BeginBinaryImport("COPY teachers (first_name, last_name, subject, salary) FROM STDIN (FORMAT BINARY)")) + { + await 
foreach (var item in dataItems) + { + await writer.StartRowAsync().ConfigureAwait(false); + await writer.WriteAsync("Firstname", NpgsqlTypes.NpgsqlDbType.Varchar).ConfigureAwait(false); + + + + await writer.WriteAsync(teacher.LastName, NpgsqlTypes.NpgsqlDbType.Varchar).ConfigureAwait(false); + await writer.WriteAsync(teacher.Subject, NpgsqlTypes.NpgsqlDbType.Varchar).ConfigureAwait(false); + await writer.WriteAsync(teacher.Salary, NpgsqlTypes.NpgsqlDbType.Integer).ConfigureAwait(false); + } + await writer.CompleteAsync().ConfigureAwait(false); + } + throw new NotImplementedException();*/ + } + + public async Task MapDataTypes(IAsyncEnumerable dataItems, CancellationToken cancellationToken = default) + { + List postgreDataCols = new List(); + await foreach (var item in dataItems) + { + var fieldNames = item.GetFieldNames(); + long row = 0; + foreach (var col in fieldNames) + { + var current = postgreDataCols.FirstOrDefault(c => c.ColumnName == col); + var colval = item.GetValue(col); + var coltype = Type.Missing.GetType(); + if (colval != null) + { + coltype = colval.GetType(); + } + if (current == null) + { + var newcol = new PostgreDataCol(col, coltype); + newcol.AddColumnValue(row, colval); + postgreDataCols.Add(newcol); + row++; + } + } + } + } + + + + + + } +} diff --git a/Extensions/PostgreSQL/Program.cs b/Extensions/PostgreSQL/Program.cs new file mode 100644 index 0000000..864ddc4 --- /dev/null +++ b/Extensions/PostgreSQL/Program.cs @@ -0,0 +1 @@ +Console.WriteLine(); \ No newline at end of file diff --git a/Extensions/PostgreSQL/Properties/PublishProfiles/FolderProfile.pubxml b/Extensions/PostgreSQL/Properties/PublishProfiles/FolderProfile.pubxml new file mode 100644 index 0000000..d6854c4 --- /dev/null +++ b/Extensions/PostgreSQL/Properties/PublishProfiles/FolderProfile.pubxml @@ -0,0 +1,15 @@ + + + + + Release + Any CPU + ..\..\Core\Cosmos.DataTransfer.Core\bin\Debug\net6.0\Extensions + FileSystem + <_TargetId>Folder + net6.0 + false + + \ No newline at 
end of file diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj index 3031dea..af02e09 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj @@ -8,7 +8,7 @@ - + From 5b98a5eb836abc6a6c520b4bb03898fca033093b Mon Sep 17 00:00:00 2001 From: Hasan Savran Date: Mon, 12 Feb 2024 16:38:28 -0500 Subject: [PATCH 2/6] Postgresql & Vector feature for Mongo Sink --- .../migrationsettings.json | 4 +- CosmosDbDataMigrationTool.sln | 3 + .../MongoDataSinkExtension.cs | 6 +- Extensions/Mongo/README.md | 15 +- ...os.DataTransfer.PostgresqlExtension.csproj | 1 + Extensions/PostgreSQL/PostgreDataCol.cs | 74 +++++++- .../PostgreSQL/PostgresqlDataSinkExtension.cs | 175 +++++++++++++++--- .../PostgresqlDataSourceExtension.cs | 47 +++++ Extensions/PostgreSQL/README.md | 35 ++++ .../Settings/PostgreBaseSettings.cs | 16 ++ .../Settings/PostgreSinkSettings.cs | 14 ++ .../Settings/PostgreSourceSettings.cs | 15 ++ .../DictionaryDataItem.cs | 4 +- PostgreBaseSettings.cs | 16 ++ PostgreSinkSettings.cs | 13 ++ 15 files changed, 402 insertions(+), 36 deletions(-) create mode 100644 Extensions/PostgreSQL/PostgresqlDataSourceExtension.cs create mode 100644 Extensions/PostgreSQL/README.md create mode 100644 Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs create mode 100644 Extensions/PostgreSQL/Settings/PostgreSinkSettings.cs create mode 100644 Extensions/PostgreSQL/Settings/PostgreSourceSettings.cs create mode 100644 PostgreBaseSettings.cs create mode 100644 PostgreSinkSettings.cs diff --git a/Core/Cosmos.DataTransfer.Core/migrationsettings.json b/Core/Cosmos.DataTransfer.Core/migrationsettings.json index b138979..a5a6272 100644 --- 
a/Core/Cosmos.DataTransfer.Core/migrationsettings.json +++ b/Core/Cosmos.DataTransfer.Core/migrationsettings.json @@ -1,6 +1,6 @@ { - "Source": "Cosmos-nosql", - "Sink": "Mongo", + "Source": "", + "Sink": "", "SourceSettings": { }, diff --git a/CosmosDbDataMigrationTool.sln b/CosmosDbDataMigrationTool.sln index 95ff2ba..c9a7e5e 100644 --- a/CosmosDbDataMigrationTool.sln +++ b/CosmosDbDataMigrationTool.sln @@ -99,6 +99,9 @@ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.PostgresqlExtension", "Extensions\PostgreSQL\Cosmos.DataTransfer.PostgresqlExtension.csproj", "{85820167-DB94-458B-B09B-9E823996C692}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PostgreSQL", "PostgreSQL", "{1B927C5F-50FC-42A6-BAF6-B00E6D760543}" + ProjectSection(SolutionItems) = preProject + Extensions\PostgreSQL\README.md = Extensions\PostgreSQL\README.md + EndProjectSection EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs index 2122e33..a81d364 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs @@ -15,7 +15,7 @@ public class MongoDataSinkExtension : IDataSinkExtensionWithSettings public async Task WriteAsync(IAsyncEnumerable dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default) { - var settings = config.Get(); + var settings = config.Get(); settings.Validate(); if (!string.IsNullOrEmpty(settings.ConnectionString) && !string.IsNullOrEmpty(settings.DatabaseName) && !string.IsNullOrEmpty(settings.Collection)) @@ -36,7 +36,7 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati var repo = context.GetRepository(settings.Collection); 
var batchSize = settings.BatchSize ?? 1000; var objects = new List(); - int itemCount = 0; + int itemCount = 0; await foreach (var item in dataItems.WithCancellation(cancellationToken)) { var dict = item.BuildDynamicObjectTree(); @@ -51,7 +51,7 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati DeploymentName = settings.OpenAIDeploymentName, Input = { valtoemb } }; - var vector = await client.GetEmbeddingsAsync(options,cancellationToken); + var vector = await client.GetEmbeddingsAsync(options,cancellationToken); if (vector != null) { dict?.TryAdd(settings.DestPropEmbedding, vector.Value.Data[0].Embedding.ToArray()); diff --git a/Extensions/Mongo/README.md b/Extensions/Mongo/README.md index 621027b..9d5952c 100644 --- a/Extensions/Mongo/README.md +++ b/Extensions/Mongo/README.md @@ -20,11 +20,24 @@ Source and sink settings require both `ConnectionString` and `DatabaseName` para ### Sink +- `GenerateEmbedding`: If set to true, the sink will generate embeddings for the records before writing them to the database. The sink requires the `OpenAIUrl`, `OpenAIKey`, and `OpenAIDeploymentName` parameters to be set.
The following parameters are required if this is true: +- `OpenAIUrl`: The URL of the OpenAI API +- `OpenAIKey`: The API key for the OpenAI API +- `OpenAIDeploymentName`: The name of the OpenAI deployment (for example, a text-embedding-ada-002 deployment) used to generate the embeddings +- `SourcePropEmbedding`: The property in the source data that should be used to generate the embeddings +- `DestPropEmbedding`: New property name that will be added to the source data with the generated embeddings + ```json { "ConnectionString": "", "DatabaseName: "", "Collection": "", - "BatchSize: 100 + "BatchSize": 100, + "GenerateEmbedding": true | false, + "OpenAIUrl": "", + "OpenAIKey": "", + "OpenAIDeploymentName": "", + "SourcePropEmbedding": "", + "DestPropEmbedding": "" } ``` diff --git a/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj b/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj index 7ad263a..d96d74c 100644 --- a/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj +++ b/Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj @@ -8,6 +8,7 @@ + diff --git a/Extensions/PostgreSQL/PostgreDataCol.cs b/Extensions/PostgreSQL/PostgreDataCol.cs index c0435ee..9cf23ff 100644 --- a/Extensions/PostgreSQL/PostgreDataCol.cs +++ b/Extensions/PostgreSQL/PostgreDataCol.cs @@ -1,4 +1,5 @@ -using System; +using Microsoft.VisualBasic; +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -20,6 +21,24 @@ public PostgreDataCol(string colname, Type coltype) PostgreType = Convert(coltype); } + public PostgreDataCol(string colname, NpgsqlTypes.NpgsqlDbType postgreType) + { + ColumnName = colname; + PostgreType = postgreType; + ColumnType = Convert(postgreType); + } + + public PostgreDataCol(string colname, string postgredatatye) + { + ColumnName = colname; + PostgreType = Convert(postgredatatye); + ColumnType = Convert(PostgreType); + } + + public PostgreDataCol() + { + } + public 
Dictionary SparseColumnData { get; } = new Dictionary(); @@ -35,11 +54,62 @@
public void AddColumnValue(long row, object? value) SparseColumnData[row] = value; } + public Type Convert(NpgsqlTypes.NpgsqlDbType coltype) + { + return coltype switch + { + NpgsqlTypes.NpgsqlDbType.Varchar => typeof(string), + NpgsqlTypes.NpgsqlDbType.Integer => typeof(int), + NpgsqlTypes.NpgsqlDbType.Bigint => typeof(long), + NpgsqlTypes.NpgsqlDbType.Boolean => typeof(bool), + NpgsqlTypes.NpgsqlDbType.Timestamp => typeof(DateTime), + NpgsqlTypes.NpgsqlDbType.Double => typeof(double), + NpgsqlTypes.NpgsqlDbType.Real => typeof(float), + NpgsqlTypes.NpgsqlDbType.Numeric => typeof(decimal), + NpgsqlTypes.NpgsqlDbType.Bytea => typeof(byte[]), + NpgsqlTypes.NpgsqlDbType.Uuid => typeof(Guid), + NpgsqlTypes.NpgsqlDbType.Char => typeof(char), + NpgsqlTypes.NpgsqlDbType.Interval => typeof(TimeSpan), + NpgsqlTypes.NpgsqlDbType.TimestampTz => typeof(DateTimeOffset), + NpgsqlTypes.NpgsqlDbType.Smallint => typeof(short), + NpgsqlTypes.NpgsqlDbType.Unknown => typeof(DBNull), + _ => typeof(DBNull), + }; + } + + public NpgsqlTypes.NpgsqlDbType Convert(string postgredattype) + { + return postgredattype.ToLower() switch + { + "varchar" =>NpgsqlTypes.NpgsqlDbType.Varchar, + "int8" => NpgsqlTypes.NpgsqlDbType.Bigint, + "int4" => NpgsqlTypes.NpgsqlDbType.Integer, + "int2" => NpgsqlTypes.NpgsqlDbType.Smallint, + "bool" => NpgsqlTypes.NpgsqlDbType.Boolean, + "timestamp" => NpgsqlTypes.NpgsqlDbType.Timestamp, + "timestamptz" => NpgsqlTypes.NpgsqlDbType.TimestampTz, + "float8" => NpgsqlTypes.NpgsqlDbType.Double, + "float4" => NpgsqlTypes.NpgsqlDbType.Real, + "numeric" => NpgsqlTypes.NpgsqlDbType.Numeric, + "bytea" => NpgsqlTypes.NpgsqlDbType.Bytea, + "char" => NpgsqlTypes.NpgsqlDbType.Char, + "interval" => NpgsqlTypes.NpgsqlDbType.Interval, + "int2vector"=> NpgsqlTypes.NpgsqlDbType.Array, + "jsonb" => NpgsqlTypes.NpgsqlDbType.Jsonb, + "name" => NpgsqlTypes.NpgsqlDbType.Name, + "oid" => NpgsqlTypes.NpgsqlDbType.Oid, + "text" => NpgsqlTypes.NpgsqlDbType.Text, + "unknown" 
=>NpgsqlTypes.NpgsqlDbType.Unknown, + _ => NpgsqlTypes.NpgsqlDbType.Unknown, + }; + } + public NpgsqlTypes.NpgsqlDbType Convert(Type coltype) { if (coltype.Name == "Missing") { - return NpgsqlTypes.NpgsqlDbType.Unknown; + return //NpgsqlTypes.NpgsqlDbType.Varchar; + NpgsqlTypes.NpgsqlDbType.Unknown; } return coltype switch { diff --git a/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs b/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs index a239d39..8f16dce 100644 --- a/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs +++ b/Extensions/PostgreSQL/PostgresqlDataSinkExtension.cs @@ -1,5 +1,7 @@ using System.ComponentModel.Composition; +using System.Data; using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.PostgresqlExtension.Settings; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using Npgsql; @@ -9,43 +11,61 @@ namespace Cosmos.DataTransfer.PostgresqlExtension [Export(typeof(IDataSinkExtension))] public class PostgresqlDataSinkExtension : IDataSinkExtensionWithSettings { - public string DisplayName => "PostgreSQL"; + public string DisplayName => "PostgreSQL"; public IEnumerable GetSettings() { - throw new NotImplementedException(); + yield return new PostgreSinkSettings(); } public async Task WriteAsync(IAsyncEnumerable dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default) { - await MapDataTypes(dataItems); - /* - NpgsqlConnection con = new NpgsqlConnection(""); - using (var writer = con.BeginBinaryImport("COPY teachers (first_name, last_name, subject, salary) FROM STDIN (FORMAT BINARY)")) - { - await foreach (var item in dataItems) - { - await writer.StartRowAsync().ConfigureAwait(false); - await writer.WriteAsync("Firstname", NpgsqlTypes.NpgsqlDbType.Varchar).ConfigureAwait(false); - - + var settings = config.Get(); + settings.Validate(); + + var cols = await FindPostgreDataTypes(dataItems, cancellationToken); + NpgsqlConnection con 
= new(settings.ConnectionString); - await writer.WriteAsync(teacher.LastName, NpgsqlTypes.NpgsqlDbType.Varchar).ConfigureAwait(false); - await writer.WriteAsync(teacher.Subject, NpgsqlTypes.NpgsqlDbType.Varchar).ConfigureAwait(false); - await writer.WriteAsync(teacher.Salary, NpgsqlTypes.NpgsqlDbType.Integer).ConfigureAwait(false); + if (settings.AppendDataToTable == true && !string.IsNullOrEmpty(settings.TableName)) + { + var destcols = LoadTableSchema(con, settings.TableName); + cols = MapDataTypes(destcols, cols); + } + else if (settings.DropAndCreateTable == true) + { + DropTable(con, settings.TableName); + CreateTable(con, settings.TableName, cols); + } + con.Open(); + using (var writer = con.BeginBinaryImport(GenerateInsertCommand(settings.TableName, cols))) + { + await foreach (var row in dataItems) + { + await writer.StartRowAsync(cancellationToken).ConfigureAwait(false); + foreach (var item in cols) + { + try + { + await writer.WriteAsync(row.GetValue(item.ColumnName), item.PostgreType, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + logger.LogError(ex, "Error writing to database"); + } + } } - await writer.CompleteAsync().ConfigureAwait(false); + await writer.CompleteAsync(cancellationToken).ConfigureAwait(false); } - throw new NotImplementedException();*/ + con.Close(); } - public async Task MapDataTypes(IAsyncEnumerable dataItems, CancellationToken cancellationToken = default) + private async Task> FindPostgreDataTypes(IAsyncEnumerable dataItems, CancellationToken cancellationToken = default) { - List postgreDataCols = new List(); + List postgreDataCols = new(); await foreach (var item in dataItems) { var fieldNames = item.GetFieldNames(); - long row = 0; + int row = 0; foreach (var col in fieldNames) { var current = postgreDataCols.FirstOrDefault(c => c.ColumnName == col); @@ -57,18 +77,121 @@ public async Task MapDataTypes(IAsyncEnumerable dataItems, Cancellati } if (current == null) { - var newcol = new PostgreDataCol(col, 
coltype); + var newcol = new PostgreDataCol(col, coltype); newcol.AddColumnValue(row, colval); postgreDataCols.Add(newcol); - row++; + } + else + { + if (current.PostgreType == NpgsqlTypes.NpgsqlDbType.Unknown && coltype?.Name != "Missing") + { + var newcol = new PostgreDataCol(col, coltype); + postgreDataCols[row] = newcol; + } + } + row++; + } + } + return postgreDataCols; + } + + private List MapDataTypes(List dest, List source) + { + var temp = new List(); + foreach (var item in dest) + { + bool found = false; + foreach (var col in source) + { + if (item.ColumnName.ToLower() == col.ColumnName.ToLower()) + { + temp.Add(new PostgreDataCol() + { + ColumnName = col.ColumnName, + ColumnType = item.ColumnType, + PostgreType = item.PostgreType + }); + found = true; + break; + } + } + if (!found) + { + throw new Exception($"Column '{item.ColumnName}' does not exist in the source."); + } + } + return temp; + } + + private static void CreateTable(NpgsqlConnection con, string tableName, List cols) + { + //NpgsqlConnection con = new(connectionString); + var createtxt = $"CREATE TABLE {tableName}("; + foreach (var item in cols) + { + createtxt += $"{item.ColumnName} {item.PostgreType},"; + if (cols.Last() == item) + { + createtxt = createtxt.TrimEnd(','); + } + } + createtxt += ")"; + con.Open(); + using (var cmd = new NpgsqlCommand(createtxt, con)) + { + cmd.ExecuteNonQuery(); + } + con.Close(); + } + + private static void DropTable(NpgsqlConnection con, string tableName) + { + con.Open(); + using (var cmd = new NpgsqlCommand($"DROP TABLE IF EXISTS {tableName}", con)) + { + cmd.ExecuteNonQuery(); + } + con.Close(); + } + + private static List LoadTableSchema(NpgsqlConnection con, string tableName) + { + var temp = new List(); + con.Open(); + var dt = new DataTable(); + using (var cmd = new NpgsqlCommand($"SELECT column_name, udt_name FROM information_schema.columns WHERE table_name = '{tableName}'", con)) + using (var reader = cmd.ExecuteReader()) + { + dt.Load(reader); + 
} + foreach (DataRow row in dt.Rows) + { + if (row != null) + { + var newcol = new PostgreDataCol(row["column_name"]?.ToString(), row["udt_name"]?.ToString()); + temp.Add(newcol); + } + } + + con.Close(); + return temp; + } + + private static string GenerateInsertCommand(string tablename, List cols) + { + var colstxt = ""; + foreach (var item in cols) + { + colstxt += $"{item.ColumnName},"; + if (cols.Last() == item) + { + colstxt = colstxt.TrimEnd(','); } } + return $"COPY {tablename}({colstxt}) FROM STDIN(FORMAT BINARY)"; } - - - } } diff --git a/Extensions/PostgreSQL/PostgresqlDataSourceExtension.cs b/Extensions/PostgreSQL/PostgresqlDataSourceExtension.cs new file mode 100644 index 0000000..3ca9095 --- /dev/null +++ b/Extensions/PostgreSQL/PostgresqlDataSourceExtension.cs @@ -0,0 +1,47 @@ +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.PostgresqlExtension.Settings; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Npgsql; +using System.ComponentModel.Composition; +using System.Runtime.CompilerServices; + +namespace Cosmos.DataTransfer.PostgresqlExtension; +[Export(typeof(IDataSourceExtension))] + +internal class PostgresqlDataSourceExtension : IDataSourceExtensionWithSettings +{ + public string DisplayName => "PostgreSQL"; + + public IEnumerable GetSettings() + { + yield return new PostgreSourceSettings(); + } + + public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var settings = config.Get(); + settings.Validate(); + + await using var connection = new NpgsqlConnection(settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + await using var command = new NpgsqlCommand(settings.QueryText, connection); + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + while (await reader.ReadAsync(cancellationToken)) + { + var columns = await 
reader.GetColumnSchemaAsync(cancellationToken); + Dictionary fields = new(); + foreach (var column in columns) + { + var value = column.ColumnOrdinal.HasValue ? reader[column.ColumnOrdinal.Value] : reader[column.ColumnName]; + if (value == DBNull.Value) + { + value = null; + } + fields[column.ColumnName] = value; + } + yield return new DictionaryDataItem(fields); + } + } +} + diff --git a/Extensions/PostgreSQL/README.md b/Extensions/PostgreSQL/README.md new file mode 100644 index 0000000..b15dea8 --- /dev/null +++ b/Extensions/PostgreSQL/README.md @@ -0,0 +1,35 @@ +# PostgreSQL Extension + +The PostgreSQL data transfer extension provides source and sink capabilities for reading from and writing to table data in PostgreSQL Server. + +> **Note**: When specifying the PostgreSQL extension as the Source or Sink property in configuration, utilize the name **PostgreSQL**. + +## Settings + +Source and sink settings both require a `ConnectionString` parameter. Specify the database name in the connection string. + +Source settings also require a `QueryText` parameter to define the data to select from SQL. This can combine data from multiple tables, views, etc. but should produce a single result set. + +### Source + +```json +{ + "ConnectionString": "", + "QueryText": "" +} +``` + +Sink settings require a `TableName` to define where to insert data. +- `AppendDataToTable`: Set to true to use table's schema and append data to the table. +- `DropAndCreateTable`: Set to true to drop and recreate the table. Schema will be guessed from the source data. 
+ +### Sink + +```json +{ + "ConnectionString": "", + "TableName": "", + "AppendDataToTable": true | false, + "DropAndCreateTable": true | false +} +``` \ No newline at end of file diff --git a/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs b/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs new file mode 100644 index 0000000..90f140e --- /dev/null +++ b/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs @@ -0,0 +1,16 @@ +using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.Interfaces.Manifest; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + public class PostgreBaseSettings : IDataExtensionSettings + { + [Required] + [SensitiveValue] + public string? ConnectionString { get; set; } + + //[Required] + //public string? Database { get; set; } + } +} \ No newline at end of file diff --git a/Extensions/PostgreSQL/Settings/PostgreSinkSettings.cs b/Extensions/PostgreSQL/Settings/PostgreSinkSettings.cs new file mode 100644 index 0000000..3ab0bde --- /dev/null +++ b/Extensions/PostgreSQL/Settings/PostgreSinkSettings.cs @@ -0,0 +1,14 @@ +using System.ComponentModel.DataAnnotations; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + + public class PostgreSinkSettings : PostgreBaseSettings + { + [Required] + public string TableName { get; set; } + public bool? AppendDataToTable { get; set; } + public bool? 
DropAndCreateTable { get; set; } + + } +} \ No newline at end of file diff --git a/Extensions/PostgreSQL/Settings/PostgreSourceSettings.cs b/Extensions/PostgreSQL/Settings/PostgreSourceSettings.cs new file mode 100644 index 0000000..96549f3 --- /dev/null +++ b/Extensions/PostgreSQL/Settings/PostgreSourceSettings.cs @@ -0,0 +1,15 @@ +using Cosmos.DataTransfer.Interfaces.Manifest; +using System.ComponentModel.DataAnnotations; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + public class PostgreSourceSettings:PostgreBaseSettings + { + [Required] + [SensitiveValue] + public string? ConnectionString { get; set; } + + [Required] + public string? QueryText { get; set; } + } +} diff --git a/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs b/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs index 4336ac7..ede1e70 100644 --- a/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs +++ b/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs @@ -6,11 +6,11 @@ public class DictionaryDataItem : IDataItem public DictionaryDataItem(IDictionary items) { - Items = items; + Items = items; } public IEnumerable GetFieldNames() - { + { return Items.Keys; } diff --git a/PostgreBaseSettings.cs b/PostgreBaseSettings.cs new file mode 100644 index 0000000..6812b75 --- /dev/null +++ b/PostgreBaseSettings.cs @@ -0,0 +1,16 @@ +using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.Interfaces.Manifest; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + public class PostgreBaseSettings : IDataExtensionSettings + { + [Required] + [SensitiveValue] + public string? ConnectionString { get; set; } + + [Required] + public string? 
DatabaseName { get; set; } + } +} \ No newline at end of file diff --git a/PostgreSinkSettings.cs b/PostgreSinkSettings.cs new file mode 100644 index 0000000..23d9ddb --- /dev/null +++ b/PostgreSinkSettings.cs @@ -0,0 +1,13 @@ +using System.ComponentModel.DataAnnotations; + +namespace Cosmos.DataTransfer.PostgresqlExtension.Settings +{ + + public class PostgreSinkSettings : PostgreBaseSettings + { + public string TableName { get; set; }; + public bool? CreateTableIfNotExist { get; set; } + public bool? DropTabelIfExists { get; set; } + + } +} From 74b2417981553b5f02f62a1f22b09796fb12346c Mon Sep 17 00:00:00 2001 From: John Bowen Date: Fri, 29 Mar 2024 15:52:21 -0700 Subject: [PATCH 3/6] Updating to new stable package version --- .../Cosmos.DataTransfer.Core.csproj | 2 +- .../PostgreSQL/Settings/PostgreBaseSettings.cs | 3 --- ...Cosmos.DataTransfer.SqlServerExtension.csproj | 2 +- .../DictionaryDataItem.cs | 4 ++-- PostgreBaseSettings.cs | 16 ---------------- PostgreSinkSettings.cs | 13 ------------- 6 files changed, 4 insertions(+), 36 deletions(-) delete mode 100644 PostgreBaseSettings.cs delete mode 100644 PostgreSinkSettings.cs diff --git a/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj b/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj index b1eebe7..67838d7 100644 --- a/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj +++ b/Core/Cosmos.DataTransfer.Core/Cosmos.DataTransfer.Core.csproj @@ -20,7 +20,7 @@ - + diff --git a/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs b/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs index 90f140e..dc02d49 100644 --- a/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs +++ b/Extensions/PostgreSQL/Settings/PostgreBaseSettings.cs @@ -9,8 +9,5 @@ public class PostgreBaseSettings : IDataExtensionSettings [Required] [SensitiveValue] public string? ConnectionString { get; set; } - - //[Required] - //public string? 
Database { get; set; } } } \ No newline at end of file diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj index af02e09..5800558 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/Cosmos.DataTransfer.SqlServerExtension.csproj @@ -8,7 +8,7 @@ - + diff --git a/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs b/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs index ede1e70..4336ac7 100644 --- a/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs +++ b/Interfaces/Cosmos.DataTransfer.Interfaces/DictionaryDataItem.cs @@ -6,11 +6,11 @@ public class DictionaryDataItem : IDataItem public DictionaryDataItem(IDictionary items) { - Items = items; + Items = items; } public IEnumerable GetFieldNames() - { + { return Items.Keys; } diff --git a/PostgreBaseSettings.cs b/PostgreBaseSettings.cs deleted file mode 100644 index 6812b75..0000000 --- a/PostgreBaseSettings.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.ComponentModel.DataAnnotations; -using Cosmos.DataTransfer.Interfaces; -using Cosmos.DataTransfer.Interfaces.Manifest; - -namespace Cosmos.DataTransfer.PostgresqlExtension.Settings -{ - public class PostgreBaseSettings : IDataExtensionSettings - { - [Required] - [SensitiveValue] - public string? ConnectionString { get; set; } - - [Required] - public string? 
DatabaseName { get; set; } - } -} \ No newline at end of file diff --git a/PostgreSinkSettings.cs b/PostgreSinkSettings.cs deleted file mode 100644 index 23d9ddb..0000000 --- a/PostgreSinkSettings.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System.ComponentModel.DataAnnotations; - -namespace Cosmos.DataTransfer.PostgresqlExtension.Settings -{ - - public class PostgreSinkSettings : PostgreBaseSettings - { - public string TableName { get; set; }; - public bool? CreateTableIfNotExist { get; set; } - public bool? DropTabelIfExists { get; set; } - - } -} From 3bd4579784c4edc9c7ce40f6f2aded25e8bd1854 Mon Sep 17 00:00:00 2001 From: John Bowen Date: Fri, 29 Mar 2024 16:01:46 -0700 Subject: [PATCH 4/6] Moving Vector functionality for Mongo to new beta extension --- CosmosDbDataMigrationTool.sln | 2 +- .../Context.cs | 2 +- ...osmos.DataTransfer.MongoVectorExtension.csproj} | 0 .../IRepository.cs | 2 +- .../MongoDataItem.cs | 2 +- .../MongoRepository.cs | 2 +- .../MongoVectorDataSinkExtension.cs} | 14 +++++++------- .../MongoVectorDataSourceExtension.cs} | 8 ++++---- .../Program.cs | 0 .../PublishToExtensionsFolder.pubxml | 0 .../Settings/MongoBaseSettings.cs | 2 +- .../Settings/MongoSourceSettings.cs | 2 +- .../Settings/MongoVectorSinkSettings.cs} | 4 ++-- 13 files changed, 20 insertions(+), 20 deletions(-) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => Cosmos.DataTransfer.MongoVectorExtension}/Context.cs (96%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj => Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj} (100%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => Cosmos.DataTransfer.MongoVectorExtension}/IRepository.cs (90%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => Cosmos.DataTransfer.MongoVectorExtension}/MongoDataItem.cs (90%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => 
Cosmos.DataTransfer.MongoVectorExtension}/MongoRepository.cs (95%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs => Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs} (90%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs => Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs} (88%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => Cosmos.DataTransfer.MongoVectorExtension}/Program.cs (100%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => Cosmos.DataTransfer.MongoVectorExtension}/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml (100%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => Cosmos.DataTransfer.MongoVectorExtension}/Settings/MongoBaseSettings.cs (84%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension => Cosmos.DataTransfer.MongoVectorExtension}/Settings/MongoSourceSettings.cs (61%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs => Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs} (81%) diff --git a/CosmosDbDataMigrationTool.sln b/CosmosDbDataMigrationTool.sln index c9a7e5e..4ed8f39 100644 --- a/CosmosDbDataMigrationTool.sln +++ b/CosmosDbDataMigrationTool.sln @@ -39,7 +39,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Mongo", "Mongo", "{F18E789A Extensions\Mongo\README.md = Extensions\Mongo\README.md EndProjectSection EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoExtension", "Extensions\Mongo\Cosmos.DataTransfer.MongoExtension\Cosmos.DataTransfer.MongoExtension.csproj", "{F6EAC33B-9F7D-433B-9328-622FB8938C24}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoVectorExtension", "Extensions\Mongo\Cosmos.DataTransfer.MongoVectorExtension\Cosmos.DataTransfer.MongoVectorExtension.csproj", 
"{F6EAC33B-9F7D-433B-9328-622FB8938C24}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.JsonExtension.UnitTests", "Extensions\Json\Cosmos.DataTransfer.JsonExtension.UnitTests\Cosmos.DataTransfer.JsonExtension.UnitTests.csproj", "{ED1E375E-A5A3-47EA-A7D5-07344C7E152F}" EndProject diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Context.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Context.cs similarity index 96% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Context.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Context.cs index 2271d46..715f07c 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Context.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Context.cs @@ -2,7 +2,7 @@ using MongoDB.Driver; using MongoDB.Driver.Core.Events; -namespace Cosmos.DataTransfer.MongoExtension; +namespace Cosmos.DataTransfer.MongoVectorExtension; public class Context { private readonly IMongoDatabase database = null!; diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj similarity index 100% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/IRepository.cs similarity index 90% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/IRepository.cs index 302c17b..03ea5aa 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs +++ 
b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/IRepository.cs @@ -1,6 +1,6 @@ using System.Linq.Expressions; -namespace Cosmos.DataTransfer.MongoExtension; +namespace Cosmos.DataTransfer.MongoVectorExtension; public interface IRepository { diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataItem.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoDataItem.cs similarity index 90% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataItem.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoDataItem.cs index 1f19a4b..8bec99b 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataItem.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoDataItem.cs @@ -1,7 +1,7 @@ using Cosmos.DataTransfer.Interfaces; using MongoDB.Bson; -namespace Cosmos.DataTransfer.MongoExtension; +namespace Cosmos.DataTransfer.MongoVectorExtension; public class MongoDataItem : IDataItem { private readonly BsonDocument record; diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoRepository.cs similarity index 95% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoRepository.cs index ec2f662..f104853 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoRepository.cs @@ -1,7 +1,7 @@ using System.Linq.Expressions; using MongoDB.Driver; -namespace Cosmos.DataTransfer.MongoExtension; +namespace Cosmos.DataTransfer.MongoVectorExtension; public class MongoRepository : IRepository { diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs similarity index 90% 
rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs index a81d364..fd4638c 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs @@ -1,21 +1,21 @@ using System.ComponentModel.Composition; -using Azure.AI.OpenAI; using Azure; +using Azure.AI.OpenAI; using Cosmos.DataTransfer.Interfaces; -using Cosmos.DataTransfer.MongoExtension.Settings; +using Cosmos.DataTransfer.MongoVectorExtension.Settings; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using MongoDB.Bson; -namespace Cosmos.DataTransfer.MongoExtension; +namespace Cosmos.DataTransfer.MongoVectorExtension; [Export(typeof(IDataSinkExtension))] -public class MongoDataSinkExtension : IDataSinkExtensionWithSettings +public class MongoVectorDataSinkExtension : IDataSinkExtensionWithSettings { - public string DisplayName => "MongoDB"; + public string DisplayName => $"MongoDB-Vector{ExtensionExtensions.BetaExtensionTag}"; public async Task WriteAsync(IAsyncEnumerable dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default) { - var settings = config.Get(); + var settings = config.Get(); settings.Validate(); if (!string.IsNullOrEmpty(settings.ConnectionString) && !string.IsNullOrEmpty(settings.DatabaseName) && !string.IsNullOrEmpty(settings.Collection)) @@ -83,6 +83,6 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati public IEnumerable GetSettings() { - yield return new MongoSinkSettings(); + yield return new MongoVectorSinkSettings(); } } diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs 
similarity index 88% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs index 5768800..4c844bc 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs @@ -1,16 +1,16 @@ using System.ComponentModel.Composition; using System.Runtime.CompilerServices; using Cosmos.DataTransfer.Interfaces; -using Cosmos.DataTransfer.MongoExtension.Settings; +using Cosmos.DataTransfer.MongoVectorExtension.Settings; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using MongoDB.Bson; -namespace Cosmos.DataTransfer.MongoExtension; +namespace Cosmos.DataTransfer.MongoVectorExtension; [Export(typeof(IDataSourceExtension))] -internal class MongoDataSourceExtension : IDataSourceExtensionWithSettings +internal class MongoVectorDataSourceExtension : IDataSourceExtensionWithSettings { - public string DisplayName => "MongoDB"; + public string DisplayName => $"MongoDB-Vector{ExtensionExtensions.BetaExtensionTag}"; public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default) { diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Program.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Program.cs similarity index 100% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Program.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Program.cs diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml similarity index 100% rename from 
Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoBaseSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoBaseSettings.cs similarity index 84% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoBaseSettings.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoBaseSettings.cs index 1c60153..0e9af71 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoBaseSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoBaseSettings.cs @@ -2,7 +2,7 @@ using Cosmos.DataTransfer.Interfaces; using Cosmos.DataTransfer.Interfaces.Manifest; -namespace Cosmos.DataTransfer.MongoExtension.Settings; +namespace Cosmos.DataTransfer.MongoVectorExtension.Settings; public class MongoBaseSettings : IDataExtensionSettings { [Required] diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoSourceSettings.cs similarity index 61% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoSourceSettings.cs index 95a9828..0cbbf36 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoSourceSettings.cs @@ -1,4 +1,4 @@ -namespace Cosmos.DataTransfer.MongoExtension.Settings; +namespace Cosmos.DataTransfer.MongoVectorExtension.Settings; public class MongoSourceSettings : MongoBaseSettings { public string? 
Collection { get; set; } diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs similarity index 81% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs index 722e532..6e6e53f 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs @@ -1,7 +1,7 @@ using System.ComponentModel.DataAnnotations; -namespace Cosmos.DataTransfer.MongoExtension.Settings; -public class MongoSinkSettings : MongoBaseSettings +namespace Cosmos.DataTransfer.MongoVectorExtension.Settings; +public class MongoVectorSinkSettings : MongoBaseSettings { [Required] public string? Collection { get; set; } From 77a81c87f576fa91d42a39943b93e521f9d43f9a Mon Sep 17 00:00:00 2001 From: John Bowen Date: Fri, 29 Mar 2024 16:16:46 -0700 Subject: [PATCH 5/6] Setting up Mongo Vector as an extension of the primary Mongo extension --- CosmosDbDataMigrationTool.sln | 7 +++ .../Context.cs | 2 +- .../Cosmos.DataTransfer.MongoExtension.csproj | 24 ++++++++ .../IRepository.cs | 2 +- .../MongoDataItem.cs | 2 +- .../MongoDataSinkExtension.cs | 58 +++++++++++++++++++ .../MongoDataSourceExtension.cs} | 8 +-- .../MongoRepository.cs | 2 +- .../Program.cs | 1 + .../PublishToExtensionsFolder.pubxml | 24 ++++++++ .../Settings/MongoBaseSettings.cs | 2 +- .../Settings/MongoSinkSettings.cs | 10 ++++ .../Settings/MongoSourceSettings.cs | 2 +- ...s.DataTransfer.MongoVectorExtension.csproj | 1 + .../MongoVectorDataSinkExtension.cs | 1 + .../Settings/MongoVectorSinkSettings.cs | 1 + Extensions/Mongo/README.md | 23 ++++++++ 17 files changed, 160 insertions(+), 10 deletions(-) rename 
Extensions/Mongo/{Cosmos.DataTransfer.MongoVectorExtension => Cosmos.DataTransfer.MongoExtension}/Context.cs (96%) create mode 100644 Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj rename Extensions/Mongo/{Cosmos.DataTransfer.MongoVectorExtension => Cosmos.DataTransfer.MongoExtension}/IRepository.cs (90%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoVectorExtension => Cosmos.DataTransfer.MongoExtension}/MongoDataItem.cs (90%) create mode 100644 Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs rename Extensions/Mongo/{Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs => Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs} (88%) rename Extensions/Mongo/{Cosmos.DataTransfer.MongoVectorExtension => Cosmos.DataTransfer.MongoExtension}/MongoRepository.cs (95%) create mode 100644 Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Program.cs create mode 100644 Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml rename Extensions/Mongo/{Cosmos.DataTransfer.MongoVectorExtension => Cosmos.DataTransfer.MongoExtension}/Settings/MongoBaseSettings.cs (84%) create mode 100644 Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs rename Extensions/Mongo/{Cosmos.DataTransfer.MongoVectorExtension => Cosmos.DataTransfer.MongoExtension}/Settings/MongoSourceSettings.cs (61%) diff --git a/CosmosDbDataMigrationTool.sln b/CosmosDbDataMigrationTool.sln index 4ed8f39..10126ef 100644 --- a/CosmosDbDataMigrationTool.sln +++ b/CosmosDbDataMigrationTool.sln @@ -103,6 +103,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PostgreSQL", "PostgreSQL", Extensions\PostgreSQL\README.md = Extensions\PostgreSQL\README.md EndProjectSection EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.MongoExtension", 
"Extensions\Mongo\Cosmos.DataTransfer.MongoExtension\Cosmos.DataTransfer.MongoExtension.csproj", "{31BC84E1-55E5-45AA-BFAC-90732F20588B}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -193,6 +195,10 @@ Global {85820167-DB94-458B-B09B-9E823996C692}.Debug|Any CPU.Build.0 = Debug|Any CPU {85820167-DB94-458B-B09B-9E823996C692}.Release|Any CPU.ActiveCfg = Release|Any CPU {85820167-DB94-458B-B09B-9E823996C692}.Release|Any CPU.Build.0 = Release|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {31BC84E1-55E5-45AA-BFAC-90732F20588B}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -226,6 +232,7 @@ Global {40AD8890-BD78-48F5-AE76-2C2FC6F15B7E} = {39930280-DA29-4814-837B-FA7F252EB3EC} {85820167-DB94-458B-B09B-9E823996C692} = {1B927C5F-50FC-42A6-BAF6-B00E6D760543} {1B927C5F-50FC-42A6-BAF6-B00E6D760543} = {A8A1CEAB-2D82-460C-9B86-74ABD17CD201} + {31BC84E1-55E5-45AA-BFAC-90732F20588B} = {F18E789A-D32D-48D3-B75F-1196D7215F74} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {662B3F27-70D8-45E6-A1C0-1438A9C8A542} diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Context.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Context.cs similarity index 96% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Context.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Context.cs index 715f07c..2271d46 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Context.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Context.cs @@ -2,7 +2,7 @@ using MongoDB.Driver; using MongoDB.Driver.Core.Events; -namespace 
Cosmos.DataTransfer.MongoVectorExtension; +namespace Cosmos.DataTransfer.MongoExtension; public class Context { private readonly IMongoDatabase database = null!; diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj new file mode 100644 index 0000000..f2e3cac --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Cosmos.DataTransfer.MongoExtension.csproj @@ -0,0 +1,24 @@ + + + + net6.0 + enable + enable + Exe + + + + + + + + + + + + + + + + + diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/IRepository.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs similarity index 90% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/IRepository.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs index 03ea5aa..302c17b 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/IRepository.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/IRepository.cs @@ -1,6 +1,6 @@ using System.Linq.Expressions; -namespace Cosmos.DataTransfer.MongoVectorExtension; +namespace Cosmos.DataTransfer.MongoExtension; public interface IRepository { diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoDataItem.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataItem.cs similarity index 90% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoDataItem.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataItem.cs index 8bec99b..1f19a4b 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoDataItem.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataItem.cs @@ -1,7 +1,7 @@ using Cosmos.DataTransfer.Interfaces; using MongoDB.Bson; -namespace Cosmos.DataTransfer.MongoVectorExtension; +namespace 
Cosmos.DataTransfer.MongoExtension; public class MongoDataItem : IDataItem { private readonly BsonDocument record; diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs new file mode 100644 index 0000000..c7beeb3 --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSinkExtension.cs @@ -0,0 +1,58 @@ +using System.ComponentModel.Composition; +using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.MongoExtension.Settings; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using MongoDB.Bson; + +namespace Cosmos.DataTransfer.MongoExtension; +[Export(typeof(IDataSinkExtension))] +public class MongoDataSinkExtension : IDataSinkExtensionWithSettings +{ + public string DisplayName => "MongoDB"; + + public async Task WriteAsync(IAsyncEnumerable dataItems, IConfiguration config, IDataSourceExtension dataSource, ILogger logger, CancellationToken cancellationToken = default) + { + var settings = config.Get(); + settings.Validate(); + + if (!string.IsNullOrEmpty(settings.ConnectionString) && !string.IsNullOrEmpty(settings.DatabaseName) && !string.IsNullOrEmpty(settings.Collection)) + { + var context = new Context(settings.ConnectionString, settings.DatabaseName); + var repo = context.GetRepository(settings.Collection); + + var batchSize = settings.BatchSize ?? 
1000; + + var objects = new List(); + int itemCount = 0; + await foreach (var item in dataItems.WithCancellation(cancellationToken)) + { + var dict = item.BuildDynamicObjectTree(); + objects.Add(new BsonDocument(dict)); + itemCount++; + + if (objects.Count == batchSize) + { + await repo.AddRange(objects); + logger.LogInformation("Added {ItemCount} items to collection '{Collection}'", itemCount, settings.Collection); + objects.Clear(); + } + } + + if (objects.Any()) + { + await repo.AddRange(objects); + } + + if (itemCount > 0) + logger.LogInformation("Added {ItemCount} total items to collection '{Collection}'", itemCount, settings.Collection); + else + logger.LogWarning("No items added to collection '{Collection}'", settings.Collection); + } + } + + public IEnumerable GetSettings() + { + yield return new MongoSinkSettings(); + } +} diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs similarity index 88% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs index 4c844bc..5768800 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSourceExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoDataSourceExtension.cs @@ -1,16 +1,16 @@ using System.ComponentModel.Composition; using System.Runtime.CompilerServices; using Cosmos.DataTransfer.Interfaces; -using Cosmos.DataTransfer.MongoVectorExtension.Settings; +using Cosmos.DataTransfer.MongoExtension.Settings; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; using MongoDB.Bson; -namespace Cosmos.DataTransfer.MongoVectorExtension; +namespace Cosmos.DataTransfer.MongoExtension; [Export(typeof(IDataSourceExtension))] -internal class MongoVectorDataSourceExtension : 
IDataSourceExtensionWithSettings +internal class MongoDataSourceExtension : IDataSourceExtensionWithSettings { - public string DisplayName => $"MongoDB-Vector{ExtensionExtensions.BetaExtensionTag}"; + public string DisplayName => "MongoDB"; public async IAsyncEnumerable ReadAsync(IConfiguration config, ILogger logger, [EnumeratorCancellation] CancellationToken cancellationToken = default) { diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoRepository.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs similarity index 95% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoRepository.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs index f104853..ec2f662 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoRepository.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/MongoRepository.cs @@ -1,7 +1,7 @@ using System.Linq.Expressions; using MongoDB.Driver; -namespace Cosmos.DataTransfer.MongoVectorExtension; +namespace Cosmos.DataTransfer.MongoExtension; public class MongoRepository : IRepository { diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Program.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Program.cs new file mode 100644 index 0000000..90fe8a7 --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Program.cs @@ -0,0 +1 @@ +Console.WriteLine("Starting Mongo extension"); diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml new file mode 100644 index 0000000..789090b --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Properties/PublishProfiles/PublishToExtensionsFolder.pubxml @@ -0,0 +1,24 @@ + + + + + Debug + Any CPU + 
..\..\..\Core\Cosmos.DataTransfer.Core\bin\Debug\net6.0\Extensions + FileSystem + <_TargetId>Folder + net6.0 + false + + + Release + Any CPU + ..\..\..\Core\Cosmos.DataTransfer.Core\bin\Release\net6.0\Extensions + FileSystem + <_TargetId>Folder + net6.0 + false + + \ No newline at end of file diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoBaseSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoBaseSettings.cs similarity index 84% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoBaseSettings.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoBaseSettings.cs index 0e9af71..1c60153 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoBaseSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoBaseSettings.cs @@ -2,7 +2,7 @@ using Cosmos.DataTransfer.Interfaces; using Cosmos.DataTransfer.Interfaces.Manifest; -namespace Cosmos.DataTransfer.MongoVectorExtension.Settings; +namespace Cosmos.DataTransfer.MongoExtension.Settings; public class MongoBaseSettings : IDataExtensionSettings { [Required] diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs new file mode 100644 index 0000000..3c64b1b --- /dev/null +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSinkSettings.cs @@ -0,0 +1,10 @@ +using System.ComponentModel.DataAnnotations; + +namespace Cosmos.DataTransfer.MongoExtension.Settings; +public class MongoSinkSettings : MongoBaseSettings +{ + [Required] + public string? Collection { get; set; } + + public int? 
BatchSize { get; set; } +} diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoSourceSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs similarity index 61% rename from Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoSourceSettings.cs rename to Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs index 0cbbf36..95a9828 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoSourceSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoExtension/Settings/MongoSourceSettings.cs @@ -1,4 +1,4 @@ -namespace Cosmos.DataTransfer.MongoVectorExtension.Settings; +namespace Cosmos.DataTransfer.MongoExtension.Settings; public class MongoSourceSettings : MongoBaseSettings { public string? Collection { get; set; } diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj index 37735d2..9ef48ac 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Cosmos.DataTransfer.MongoVectorExtension.csproj @@ -16,6 +16,7 @@ + diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs index fd4638c..c9d671f 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/MongoVectorDataSinkExtension.cs @@ -2,6 +2,7 @@ using Azure; using Azure.AI.OpenAI; using Cosmos.DataTransfer.Interfaces; +using Cosmos.DataTransfer.MongoExtension; using Cosmos.DataTransfer.MongoVectorExtension.Settings; using 
Microsoft.Extensions.Configuration; using Microsoft.Extensions.Logging; diff --git a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs index 6e6e53f..4148263 100644 --- a/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs +++ b/Extensions/Mongo/Cosmos.DataTransfer.MongoVectorExtension/Settings/MongoVectorSinkSettings.cs @@ -1,4 +1,5 @@ using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.MongoExtension.Settings; namespace Cosmos.DataTransfer.MongoVectorExtension.Settings; public class MongoVectorSinkSettings : MongoBaseSettings diff --git a/Extensions/Mongo/README.md b/Extensions/Mongo/README.md index 9d5952c..f491f2e 100644 --- a/Extensions/Mongo/README.md +++ b/Extensions/Mongo/README.md @@ -20,6 +20,28 @@ Source and sink settings require both `ConnectionString` and `DatabaseName` para ### Sink +```json +{ + "ConnectionString": "", + "DatabaseName": "", + "Collection": "" +} +``` + +# MongoDB Vector Extension (Beta) + +The MongoDB Vector extension is a Sink only extension that builds on the MongoDB extension by providing additional capabilities for generating embeddings using Azure OpenAI APIs. + +> **Note**: When specifying the MongoDB Vector extension as the Sink property in configuration, utilize the name **MongoDB-Vector(beta)**. + +## Settings + +The settings are based on the MongoDB extension settings with additional parameters for generating embeddings. + +### Additional Sink Settings + +The sink settings require the following additional parameters: + - `GenerateEmbedding`: If set to true, the sink will generate embeddings for the records before writing them to the database. The sink requires the `OpenAIUrl`, `OpenAIKey`, and `OpenAIDeploymentModel` parameters to be set.
Following paramaters are required if this is true - `OpenAIUrl`: The URL of the OpenAI API - `OpenAIKey`: The API key for the OpenAI API @@ -41,3 +63,4 @@ Source and sink settings require both `ConnectionString` and `DatabaseName` para "DestPropEmbedding": "" } ``` + From 3be9df4a7df4715583436cdbbedb281fb4159c84 Mon Sep 17 00:00:00 2001 From: John Bowen Date: Fri, 29 Mar 2024 16:37:13 -0700 Subject: [PATCH 6/6] Adding new extension to build --- .github/actions/build-with-plugins/action.yml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/.github/actions/build-with-plugins/action.yml b/.github/actions/build-with-plugins/action.yml index 9aa8f88..f68a76d 100644 --- a/.github/actions/build-with-plugins/action.yml +++ b/.github/actions/build-with-plugins/action.yml @@ -151,6 +151,21 @@ runs: -p:PublishReadyToRun=false \ -p:PublishTrimmed=false \ -p:Version=${{ inputs.build-version }} + - name: Build PostgreSQL Extension + shell: bash + run: | + dotnet publish \ + Extensions/PostgreSQL/Cosmos.DataTransfer.PostgresqlExtension.csproj \ + --configuration Release \ + --output ${{ inputs.platform-short }}/Extensions \ + --self-contained false \ + --runtime ${{ inputs.runtime }} \ + -p:PublishSingleFile=false \ + -p:DebugType=embedded \ + -p:EnableCompressionInSingleFile=true \ + -p:PublishReadyToRun=false \ + -p:PublishTrimmed=false \ + -p:Version=${{ inputs.build-version }} - name: Upload package uses: actions/upload-artifact@v3 with: