Skip to content

Commit

Permalink
Update storage SDK to use Azure.Storage.Blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
HowardvanRooijen committed Feb 21, 2021
1 parent 8c74793 commit 8240301
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 44 deletions.
16 changes: 8 additions & 8 deletions Solutions/Ais.Net.Receiver/Ais.Net.Receiver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net5.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

Expand All @@ -16,14 +16,14 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Ais.Net" Version="0.3.0" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.3" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.2" />
<PackageReference Include="Ais.Net" Version="0.4.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.8.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="morelinq" Version="3.3.2" />
<PackageReference Include="System.Linq.Async" Version="4.0.0" />
<PackageReference Include="System.Reactive" Version="4.3.2" />
<PackageReference Include="System.Linq.Async" Version="5.0.0" />
<PackageReference Include="System.Reactive" Version="5.0.0" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion Solutions/Ais.Net.Receiver/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ static async Task Main(string[] args)
.Build();

storageClient = new StorageClient(config);
storageClient.InitialiseConnection();
await storageClient.InitialiseConnectionAsync().ConfigureAwait(false);

var receiver = new NmeaReceiver("153.44.253.27", 5631);
receiver.Items.Buffer(100).SelectMany(OnMessageReceivedAsync).Subscribe();
Expand Down
48 changes: 13 additions & 35 deletions Solutions/Ais.Net.Receiver/StorageClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,42 @@

namespace Endjin.Ais.Receiver
{
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Azure.Storage.Blobs.Specialized;

using Microsoft.Extensions.Configuration;

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;

public class StorageClient
{
private CloudBlobContainer container;
private AppendBlobClient container;
private IConfiguration configuration;
private Stream stream;

public StorageClient(IConfiguration configuration)
{
this.configuration = configuration;
}

public async Task AppendMessages(IList<string> messages)
public async Task AppendMessages(IEnumerable<string> messages)
{
CloudAppendBlob appendBlob;

try
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(messages.Aggregate(new StringBuilder(), (sb, a) => sb.AppendLine(String.Join(",", a)), sb => sb.ToString()))))
{
appendBlob = await GetAppendBlobAsync().ConfigureAwait(false);
await this.container.AppendBlockAsync(stream).ConfigureAwait(false);
}
catch
{
this.InitialiseConnection();

appendBlob = await GetAppendBlobAsync().ConfigureAwait(false);
}

await appendBlob.AppendTextAsync(messages.Aggregate(new StringBuilder(), (sb, a) => sb.AppendLine(String.Join(",", a)), sb => sb.ToString())).ConfigureAwait(false);
}

public void InitialiseConnection()
{
CloudStorageAccount cloudStorageAccount = CloudStorageAccount.Parse(this.configuration["connectionString"]);
CloudBlobClient client = cloudStorageAccount.CreateCloudBlobClient();

this.container = client.GetContainerReference(this.configuration["containerName"]);
this.container.CreateIfNotExists();
}

private async Task<CloudAppendBlob> GetAppendBlobAsync()
public async Task InitialiseConnectionAsync()
{
CloudAppendBlob blob = this.container.GetAppendBlobReference($"raw/{DateTimeOffset.Now.ToString("yyyy")}/{DateTimeOffset.Now.ToString("MM")}/{DateTimeOffset.Now.ToString("dd")}/{DateTimeOffset.Now.ToString("yyyyMMddTHH")}.nm4");

if (!blob.Exists())
{
await blob.CreateOrReplaceAsync().ConfigureAwait(false);
}

return blob;
string blobName = $"raw/{DateTimeOffset.Now.ToString("yyyy")}/{DateTimeOffset.Now.ToString("MM")}/{DateTimeOffset.Now.ToString("dd")}/{DateTimeOffset.Now.ToString("yyyyMMddTHH")}.nm4";
this.container = new AppendBlobClient(this.configuration["connectionString"], this.configuration["containerName"], blobName);

await this.container.CreateIfNotExistsAsync().ConfigureAwait(false);
}
}
}

0 comments on commit 8240301

Please sign in to comment.