Skip to content

Commit

Permalink
Merge pull request #136 from philnach/UseRBACBlob
Browse files Browse the repository at this point in the history
Add UseRbacAuth and EnableInteractiveCredentials to Azure Blob Storage Extension
  • Loading branch information
markjbrown authored Sep 9, 2024
2 parents 71f4b0e + 5a3e897 commit 8ca1b93
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Core" Version="1.38.0" />
<PackageReference Include="Azure.Core" Version="1.40.0" />
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.2.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.UserSecrets" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
Expand Down
3 changes: 3 additions & 0 deletions CosmosDbDataMigrationTool.sln
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Cosmos.DataTransfer.Common", "Interfaces\Cosmos.DataTransfer.Common\Cosmos.DataTransfer.Common.csproj", "{0FAD9D89-2E41-4D65-8440-5C01885D9292}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AzureBlob", "AzureBlob", "{9627A42A-BEB0-4A39-B49C-C3C6D54E705A}"
ProjectSection(SolutionItems) = preProject
Extensions\AzureBlob\README.md = Extensions\AzureBlob\README.md
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "AwsS3", "AwsS3", "{502197E4-F554-4B5B-9235-FBFE7E49EBEF}"
EndProject
Expand Down
193 changes: 113 additions & 80 deletions ExampleConfigs.md
Original file line number Diff line number Diff line change
@@ -1,123 +1,156 @@
# Example `migrationsettings.json` Files

## JSON to Cosmos-NoSQL

```json
{
"Source": "json",
"Sink": "cosmos-nosql",
"SourceSettings": {
"FilePath": "https://mytestfiles.local/sales-data.json"
},
"SinkSettings": {
"ConnectionString": "AccountEndpoint=https://...",
"Database": "myDb",
"Container": "myContainer",
"PartitionKeyPath": "/id",
"RecreateContainer": true,
"WriteMode": "Insert",
"CreatedContainerMaxThroughput": 5000,
"IsServerlessAccount": false
}
"SourceSettings": {
"FilePath": "https://mytestfiles.local/sales-data.json"
},
"SinkSettings": {
"ConnectionString": "AccountEndpoint=https://...",
"Database": "myDb",
"Container": "myContainer",
"PartitionKeyPath": "/id",
"RecreateContainer": true,
"WriteMode": "Insert",
"CreatedContainerMaxThroughput": 5000,
"IsServerlessAccount": false
}
}
```

## Cosmos-NoSQL to JSON

```json
{
"Source": "Cosmos-NoSql",
"Sink": "JSON",
"SourceSettings":
{
"ConnectionString": "AccountEndpoint=https://...",
"Database":"cosmicworks",
"Container":"customers",
"IncludeMetadataFields": true
},
"SinkSettings":
{
"FilePath": "c:\\data\\cosmicworks\\customers.json",
"Indented": true
}
"SourceSettings":
{
"ConnectionString": "AccountEndpoint=https://...",
"Database":"cosmicworks",
"Container":"customers",
"IncludeMetadataFields": true
},
"SinkSettings":
{
"FilePath": "c:\\data\\cosmicworks\\customers.json",
"Indented": true
}
}
```

## MongoDB to Cosmos-NoSQL

```json
{
"Source": "mongodb",
"Sink": "cosmos-nosql",
"SourceSettings": {
"ConnectionString": "mongodb://...",
"DatabaseName": "sales",
"Collection": "person"
},
"SinkSettings": {
"ConnectionString": "AccountEndpoint=https://...",
"Database": "users",
"Container": "migrated",
"PartitionKeyPath": "/id",
"SourceSettings": {
"ConnectionString": "mongodb://...",
"DatabaseName": "sales",
"Collection": "person"
},
"SinkSettings": {
"ConnectionString": "AccountEndpoint=https://...",
"Database": "users",
"Container": "migrated",
"PartitionKeyPath": "/id",
"ConnectionMode": "Direct",
"WriteMode": "UpsertStream",
"CreatedContainerMaxThroughput": 8000,
"UseAutoscaleForCreatedContainer": false
}
"WriteMode": "UpsertStream",
"CreatedContainerMaxThroughput": 8000,
"UseAutoscaleForCreatedContainer": false
}
}
```

## SqlServer to AzureTableAPI

```json
{
"Source": "SqlServer",
"Sink": "AzureTableApi",
"SourceSettings": {
"ConnectionString": "Server=...",
"QueryText": "SELECT Id, Date, Amount FROM dbo.Payments WHERE Status = 'open'"
},
"SinkSettings": {
"ConnectionString": "DefaultEndpointsProtocol=https;AccountName=...",
"Table": "payments",
"RowKeyFieldName": "Id"
}
"SourceSettings": {
"ConnectionString": "Server=...",
"QueryText": "SELECT Id, Date, Amount FROM dbo.Payments WHERE Status = 'open'"
},
"SinkSettings": {
"ConnectionString": "DefaultEndpointsProtocol=https;AccountName=...",
"Table": "payments",
"RowKeyFieldName": "Id"
}
}
```

## Cosmos-NoSQL to SqlServer

```json
{
"Source": "cosmos-nosql",
"Sink": "sqlserver",
"SourceSettings":
{
"ConnectionString": "AccountEndpoint=https://...",
"Database":"operations",
"Container":"alerts",
"PartitionKeyValue": "jan",
"SourceSettings":
{
"ConnectionString": "AccountEndpoint=https://...",
"Database":"operations",
"Container":"alerts",
"PartitionKeyValue": "jan",
"Query": "SELECT a.name, a.description, a.count, a.id, a.isSet FROM a"
},
"SinkSettings":
{
"ConnectionString": "Server=...",
"TableName": "Import",
"ColumnMappings": [
{
"ColumnName": "Name"
},
{
"ColumnName": "Description"
},
{
"ColumnName": "Count",
"SourceFieldName": "number"
},
{
"ColumnName": "Id"
},
{
"ColumnName": "IsSet",
"AllowNull": false,
"DefaultValue": false
}
]
}
},
"SinkSettings":
{
"ConnectionString": "Server=...",
"TableName": "Import",
"ColumnMappings": [
{
"ColumnName": "Name"
},
{
"ColumnName": "Description"
},
{
"ColumnName": "Count",
"SourceFieldName": "number"
},
{
"ColumnName": "Id"
},
{
"ColumnName": "IsSet",
"AllowNull": false,
"DefaultValue": false
}
]
}
}
```

## Cosmos-NoSQL to Json-AzureBlob (Using RBAC)

```json
{
"Source": "Cosmos-nosql",
"Sink": "Json-AzureBlob",
"SourceSettings": {
"UseRbacAuth": true,
"Database": "operations",
"Container": "alerts",
"PartitionKeyValue": "jan",
"AccountEndpoint": "https://<databaseaccount>.documents.azure.com",
"EnableInteractiveCredentials": true,
"IncludeMetadataFields": false,
"Query": "SELECT a.name, a.description, a.count, a.id, a.isSet FROM a"
},
"SinkSettings": {
"UseRbacAuth": true,
"ContainerName": "operations-archive",
"AccountEndpoint": "https://<storage-account>.blob.core.windows.net",
"EnableInteractiveCredentials": true,
"BlobName": "jan-alerts"
},
"Operations": [
]
}
```
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using Azure.Storage.Blobs;
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Blobs.Models;

namespace Cosmos.DataTransfer.AzureBlobStorage
{
Expand All @@ -14,12 +15,31 @@ public async Task WriteToTargetAsync(Func<Stream, Task> writeToStream, IConfigur
var settings = config.Get<AzureBlobSinkSettings>();
settings.Validate();

logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);
BlobContainerClient account;
if (settings.UseRbacAuth)
{
logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}'", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials));

var credential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials);
#pragma warning disable CS8604 // Validate above ensures AccountEndpoint is not null
var baseUri = new Uri(settings.AccountEndpoint);
var blobContainerUri = new Uri(baseUri, settings.ContainerName);
#pragma warning restore CS8604 // Restore warning

account = new BlobContainerClient(blobContainerUri, credential);
}
else
{
logger.LogInformation("Connecting to Storage account using {ConnectionString}'", nameof(AzureBlobSourceSettings.ConnectionString));

account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
}

var account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
await account.CreateIfNotExistsAsync(cancellationToken: cancellationToken);
var blob = account.GetBlockBlobClient(settings.BlobName);

logger.LogInformation("Saving file '{File}' to Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);

await using var blobStream = await blob.OpenWriteAsync(true, new BlockBlobOpenWriteOptions
{
BufferSize = settings.MaxBlockSizeinKB * 1024L,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
using System.Runtime.CompilerServices;
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Cosmos.DataTransfer.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Blobs.Models;
using System.Runtime.CompilerServices;

namespace Cosmos.DataTransfer.AzureBlobStorage;

Expand All @@ -15,14 +16,33 @@ public class AzureBlobDataSource : IComposableDataSource
var settings = config.Get<AzureBlobSourceSettings>();
settings.Validate();

logger.LogInformation("Reading file '{File}' from Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);
BlobContainerClient account;
if (settings.UseRbacAuth)
{
logger.LogInformation("Connecting to Storage account {AccountEndpoint} using {UseRbacAuth} with {EnableInteractiveCredentials}'", settings.AccountEndpoint, nameof(AzureBlobSourceSettings.UseRbacAuth), nameof(AzureBlobSourceSettings.EnableInteractiveCredentials));

var account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
var credential = new DefaultAzureCredential(includeInteractiveCredentials: settings.EnableInteractiveCredentials);
#pragma warning disable CS8604 // Validate above ensures AccountEndpoint is not null
var baseUri = new Uri(settings.AccountEndpoint);
var blobContainerUri = new Uri(baseUri, settings.ContainerName);
#pragma warning restore CS8604 // Restore warning

account = new BlobContainerClient(blobContainerUri, credential);
}
else
{
logger.LogInformation("Connecting to Storage account using {ConnectionString}'", nameof(AzureBlobSourceSettings.ConnectionString));

account = new BlobContainerClient(settings.ConnectionString, settings.ContainerName);
}

var blob = account.GetBlockBlobClient(settings.BlobName);
var existsResponse = await blob.ExistsAsync(cancellationToken: cancellationToken);
if (!existsResponse)
yield break;

logger.LogInformation("Reading file '{File}' from Azure Blob Container '{ContainerName}'", settings.BlobName, settings.ContainerName);

var readStream = await blob.OpenReadAsync(new BlobOpenReadOptions(false)
{
BufferSize = settings.ReadBufferSizeInKB,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
using Cosmos.DataTransfer.Interfaces;
using System.ComponentModel.DataAnnotations;
using Cosmos.DataTransfer.Interfaces.Manifest;
using System.ComponentModel.DataAnnotations;

namespace Cosmos.DataTransfer.AzureBlobStorage
{
public class AzureBlobSinkSettings : IDataExtensionSettings
public class AzureBlobSinkSettings : IDataExtensionSettings, IValidatableObject
{
[Required]
[SensitiveValue]
public string ConnectionString { get; set; } = null!;
public string? ConnectionString { get; set; } = null!;

public string? AccountEndpoint { get; set; } = null!;

[Required]
public string ContainerName { get; set; } = null!;
Expand All @@ -17,5 +18,22 @@ public class AzureBlobSinkSettings : IDataExtensionSettings
public string BlobName { get; set; } = null!;

public int? MaxBlockSizeinKB { get; set; }

public bool UseRbacAuth { get; set; }

public bool EnableInteractiveCredentials { get; set; }

public virtual IEnumerable<ValidationResult> Validate(ValidationContext validationContext)
{
if (!UseRbacAuth && string.IsNullOrEmpty(ConnectionString))
{
yield return new ValidationResult($"{nameof(ConnectionString)} must be specified unless {nameof(UseRbacAuth)} is true", new[] { nameof(ConnectionString) });
}

if (UseRbacAuth && string.IsNullOrEmpty(AccountEndpoint))
{
yield return new ValidationResult($"{nameof(AccountEndpoint)} must be specified unless {nameof(UseRbacAuth)} is false", new[] { nameof(AccountEndpoint) });
}
}
}
}
Loading

0 comments on commit 8ca1b93

Please sign in to comment.