From 15772630232b044b70013222834f6b6f4300b936 Mon Sep 17 00:00:00 2001 From: Howard van Rooijen Date: Sat, 6 Mar 2021 16:15:08 +0000 Subject: [PATCH] Add CancellationToken support (#96) --- Solutions/Ais.Net.Receiver.Host.Console/Program.cs | 12 +++++++++++- Solutions/Ais.Net.Receiver/Receiver/NmeaReceiver.cs | 8 +++++--- Solutions/Ais.Net.Receiver/Receiver/ReceiverHost.cs | 10 ++++++---- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/Solutions/Ais.Net.Receiver.Host.Console/Program.cs b/Solutions/Ais.Net.Receiver.Host.Console/Program.cs index a33e49e..43deffc 100644 --- a/Solutions/Ais.Net.Receiver.Host.Console/Program.cs +++ b/Solutions/Ais.Net.Receiver.Host.Console/Program.cs @@ -6,6 +6,7 @@ namespace Ais.Net.Receiver.Host.Console { using System; using System.Reactive.Linq; + using System.Threading; using System.Threading.Tasks; using Ais.Net.Models; @@ -46,10 +47,19 @@ from vesselLocationAndName in vesselLocationsWithNames { (uint mmsi, IVesselNavigation navigation, IVesselName name) = navigationWithName; string positionText = navigation.Position is null ? "unknown position" : $"{navigation.Position.Latitude},{navigation.Position.Longitude}"; + Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"[{mmsi}: '{name.VesselName.CleanVesselName()}'] - [{positionText}] - [{navigation.CourseOverGroundDegrees ?? 0}]"); + Console.ResetColor(); }); - await receiverHost.StartAsync(); + var cts = new CancellationTokenSource(); + + Task task = receiverHost.StartAsync(cts.Token); + + // If you wanted to cancel the long running process: + // cts.Cancel(); + + await task; } } } \ No newline at end of file diff --git a/Solutions/Ais.Net.Receiver/Receiver/NmeaReceiver.cs b/Solutions/Ais.Net.Receiver/Receiver/NmeaReceiver.cs index 99e6b37..1492af4 100644 --- a/Solutions/Ais.Net.Receiver/Receiver/NmeaReceiver.cs +++ b/Solutions/Ais.Net.Receiver/Receiver/NmeaReceiver.cs @@ -8,6 +8,8 @@ namespace Ais.Net.Receiver.Receiver using System.Collections.Generic; using System.IO; using System.Net.Sockets; + using System.Runtime.CompilerServices; + using System.Threading; using System.Threading.Tasks; public class NmeaReceiver @@ -32,7 +34,7 @@ public NmeaReceiver(string host, int port, TimeSpan? retryPeriodicity, int retry public TimeSpan RetryPeriodicity { get; } - public async IAsyncEnumerable GetAsync() + public async IAsyncEnumerable GetAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { await this.tcpClient.ConnectAsync(this.Host, this.Port); await using NetworkStream stream = this.tcpClient.GetStream(); @@ -42,14 +44,14 @@ public async IAsyncEnumerable GetAsync() while (this.tcpClient.Connected) { - while (stream.DataAvailable) + while (stream.DataAvailable && !cancellationToken.IsCancellationRequested) { string? line = await reader.ReadLineAsync().ConfigureAwait(false); if (line is not null) { yield return line; } retryAttempt = 0; } - if (retryAttempt == this.RetryAttemptLimit) + if (cancellationToken.IsCancellationRequested || retryAttempt == this.RetryAttemptLimit) { break; } diff --git a/Solutions/Ais.Net.Receiver/Receiver/ReceiverHost.cs b/Solutions/Ais.Net.Receiver/Receiver/ReceiverHost.cs index 0be4c1b..7a173b3 100644 --- a/Solutions/Ais.Net.Receiver/Receiver/ReceiverHost.cs +++ b/Solutions/Ais.Net.Receiver/Receiver/ReceiverHost.cs @@ -7,7 +7,9 @@ namespace Ais.Net.Receiver.Receiver using System; using System.Collections.Generic; using System.Reactive.Subjects; + using System.Runtime.CompilerServices; using System.Text; + using System.Threading; using System.Threading.Tasks; using Ais.Net.Models.Abstractions; @@ -35,14 +37,14 @@ public ReceiverHost(AisConfig configuration) public IObservable Messages => this.messages; - public async Task StartAsync() + public async Task StartAsync(CancellationToken cancellationToken = default) { var processor = new NmeaToAisMessageTypeProcessor(); var adapter = new NmeaLineToAisStreamAdapter(processor); processor.Messages.Subscribe(this.messages); - await foreach (string? message in this.GetAsync()) + await foreach (string? message in this.GetAsync(cancellationToken).WithCancellation(cancellationToken)) { static void ProcessLineNonAsync(string line, INmeaLineStreamProcessor lineStreamProcessor) { @@ -59,9 +61,9 @@ static void ProcessLineNonAsync(string line, INmeaLineStreamProcessor lineStream } } - private async IAsyncEnumerable GetAsync() + private async IAsyncEnumerable GetAsync([EnumeratorCancellation]CancellationToken cancellationToken = default) { - await foreach (string? message in this.receiver.GetAsync()) + await foreach (string? message in this.receiver.GetAsync().WithCancellation(cancellationToken)) { if (message.IsMissingNmeaBlockTags()) {