Skip to content

Commit

Permalink
Add CancellationToken support (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
HowardvanRooijen authored Mar 6, 2021
1 parent ca31a20 commit 1577263
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 8 deletions.
12 changes: 11 additions & 1 deletion Solutions/Ais.Net.Receiver.Host.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Ais.Net.Receiver.Host.Console
{
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

using Ais.Net.Models;
Expand Down Expand Up @@ -46,10 +47,19 @@ from vesselLocationAndName in vesselLocationsWithNames
{
(uint mmsi, IVesselNavigation navigation, IVesselName name) = navigationWithName;
string positionText = navigation.Position is null ? "unknown position" : $"{navigation.Position.Latitude},{navigation.Position.Longitude}";
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"[{mmsi}: '{name.VesselName.CleanVesselName()}'] - [{positionText}] - [{navigation.CourseOverGroundDegrees ?? 0}]");
Console.ResetColor();
});

await receiverHost.StartAsync();
var cts = new CancellationTokenSource();

Task task = receiverHost.StartAsync(cts.Token);

// If you wanted to cancel the long running process:
// cts.Cancel();

await task;
}
}
}
8 changes: 5 additions & 3 deletions Solutions/Ais.Net.Receiver/Receiver/NmeaReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Ais.Net.Receiver.Receiver
using System.Collections.Generic;
using System.IO;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public class NmeaReceiver
Expand All @@ -32,7 +34,7 @@ public NmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retry

public TimeSpan RetryPeriodicity { get; }

public async IAsyncEnumerable<string> GetAsync()
public async IAsyncEnumerable<string> GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
await this.tcpClient.ConnectAsync(this.Host, this.Port);
await using NetworkStream stream = this.tcpClient.GetStream();
Expand All @@ -42,14 +44,14 @@ public async IAsyncEnumerable<string> GetAsync()

while (this.tcpClient.Connected)
{
while (stream.DataAvailable)
while (stream.DataAvailable && !cancellationToken.IsCancellationRequested)
{
string? line = await reader.ReadLineAsync().ConfigureAwait(false);
if (line is not null) { yield return line; }
retryAttempt = 0;
}

if (retryAttempt == this.RetryAttemptLimit)
if (cancellationToken.IsCancellationRequested || retryAttempt == this.RetryAttemptLimit)
{
break;
}
Expand Down
10 changes: 6 additions & 4 deletions Solutions/Ais.Net.Receiver/Receiver/ReceiverHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ namespace Ais.Net.Receiver.Receiver
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Ais.Net.Models.Abstractions;
Expand Down Expand Up @@ -35,14 +37,14 @@ public ReceiverHost(AisConfig configuration)

public IObservable<IAisMessage> Messages => this.messages;

public async Task StartAsync()
public async Task StartAsync(CancellationToken cancellationToken = default)
{
var processor = new NmeaToAisMessageTypeProcessor();
var adapter = new NmeaLineToAisStreamAdapter(processor);

processor.Messages.Subscribe(this.messages);

await foreach (string? message in this.GetAsync())
await foreach (string? message in this.GetAsync(cancellationToken).WithCancellation(cancellationToken))
{
static void ProcessLineNonAsync(string line, INmeaLineStreamProcessor lineStreamProcessor)
{
Expand All @@ -59,9 +61,9 @@ static void ProcessLineNonAsync(string line, INmeaLineStreamProcessor lineStream
}
}

private async IAsyncEnumerable<string> GetAsync()
private async IAsyncEnumerable<string> GetAsync([EnumeratorCancellation]CancellationToken cancellationToken = default)
{
await foreach (string? message in this.receiver.GetAsync())
await foreach (string? message in this.receiver.GetAsync().WithCancellation(cancellationToken))
{
if (message.IsMissingNmeaBlockTags())
{
Expand Down

0 comments on commit 1577263

Please sign in to comment.