Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iot Hub sevice streaming (preview) = websocket aborts and crashes #986

Closed
maddentist opened this issue Jun 26, 2019 · 4 comments
Closed
Assignees
Labels
bug Something isn't working. device-streaming Issues related to Device Streaming feature (currently in preview). investigation-required Requires further investigation to root cause this. IoTSDK Tracks all IoT SDK issues across the board

Comments

@maddentist
Copy link

  • OS, version, SKU and CPU architecture used: Windows 10 Desktop x64 & Service Fabric
  • Application's .NET Target Framework : UWP & Service Fabric (.NET)
  • Device: Surface Book i7
  • SDK version used: Microsoft.Azure.Devices.Client 1.29.0-preview-002

Description of the issue:

The connection is initiated through a device connection to the iot hub which allows me to get the device ID, then, using code modified from the samples provided on Github, I try send data with iot hub device streaming which crashed on the server side (Azure service fabric) and app (UWP)
System.Net.WebSockets.WebSocketException (0x80004005): Unable to connect to the remote server ---> System.ArgumentException: The base stream is not writeable.
Parameter name: stream
at System.Net.WebSockets.WebSocket.CreateFromStream(Stream stream, Boolean isServer, String subProtocol, TimeSpan keepAliveInterval)
at System.Net.WebSockets.WebSocketHandle.ConnectAsyncCore(Uri uri, CancellationToken cancellationToken, ClientWebSocketOptions options)
at System.Net.WebSockets.WebSocketHandle.ConnectAsyncCore(Uri uri, CancellationToken cancellationToken, ClientWebSocketOptions options)
at System.Net.WebSockets.ClientWebSocket.ConnectAsyncCore(Uri uri, CancellationToken cancellationToken)
at V2Vehicle.Helpers.DeviceStreamingCommon.GetStreamingClientAsync(Uri url, String authorizationToken, CancellationToken cancellationToken) in C:\Users\Mathi\source\repos\AIRPROX backend (fabric)\V2Vehicle\Helpers\DeviceStreamingCommon.cs:line 24
at V2Vehicle.V2Vehicle.<>c__DisplayClass7_0.<b__0>d.MoveNext() in C:\Users\Mathi\source\repos\AIRPROX backend (fabric)\V2Vehicle\V2Vehicle.cs:line 188

On the app side, the thing crashed saying that the websocket is in aborted state.

Please help !

Code sample exhibiting the issue:

Server code:
`
public async Task HandleStreamsRegistrationDataAsync(ActorStreamsRegistration data)
{
// Save data to statemanger to help recover in case of crash/restart...
// TODO : And populate variables (quicker)
VehicleSerialNumber = data.VehicleSerialNumber;
VehicleUniqueId = data.VehicleUniqueId;
VehicleGeneration = (SystemVersionConstants)data.VehicleGeneration;
VehicleDescription = data.VehicleDescription;
VehicleDeviceType = (AirproxObjectTypes)data.VehicleDeviceType;
vehicleUserGroup = data.VehicleUserGroup;
vehiclePrivateMode = data.VehiclePrivateMode;
AdditionalData = data.AdditionalData;
vehicleDeadManSignlInterval_ms = data.VehicleDeadManSignlInterval_ms;
VehicleMovementThreshold_m = data.VehicleMovementThreshold_m;
VehicleTimeMovementThreshold_ms = data.VehicleTimeMovementThreshold_ms;

        Debug.WriteLine($"Actor V2 handling registration data with {VehicleSerialNumber} / {VehicleIPAdress}");

        var returnMessageType = MessagesTypes.RegisteringAcknoledgement;
        object returnPackage = null;

        switch (VehicleGeneration)
        {
            // ALDRIN first gen device
            case SystemVersionConstants.IotStreams:
                returnMessageType = MessagesTypes.AutonomousRegisteringAcknoledgement;
                returnPackage = new AutonomousRegistrationReturnPackage(
                    VehicleUniqueId,
                    (short)VehicleDeviceType,
                    VehicleMovementThreshold_m,
                    VehicleTimeMovementThreshold_ms,
                    vehicleDeadManSignlInterval_ms,
                    VehicleDescription);
                break;
            default:
                throw new Exception($"Incorrect generation data {VehicleGeneration.ToString()}");
        }

        SB_Message aiMessage = null;
        if ((int)VehicleDeviceType < 200)
        {
            // Send message to AI for flying vehicles only
            // TODO : implement blackbox mechanisms
            aiMessage = await QueueInteractions.PrepareInterWorkerMessage(MessagesTypes.InterCloudRegistration, new InterWorkerRegistrationPackage(VehicleUniqueId, true, string.Empty, VehicleSerialNumber, Guid.Empty.ToString()));
            await Task.Delay(1000);
        }

        var tcpMessage = await QueueInteractions.PrepareTcpMessageAsync(returnMessageType, returnPackage, true);

        serviceClient = ServiceClient.CreateFromConnectionString(AirproxServerConstants.IotStreamingHubConnectionString, TransportType.Amqp);
        Debug.WriteLine($"Service client created");

        var deviceStreamRequest = new DeviceStreamRequest("airprox");

        var result = await serviceClient.CreateStreamAsync(data.VehicleSerialNumber, deviceStreamRequest, CancellationToken.None).ConfigureAwait(false);
        Debug.WriteLine($"Stream response received: Name={deviceStreamRequest.StreamName} IsAccepted={result.IsAccepted}");

        if (result.IsAccepted)
        {
            await Task.Run(async () =>
            {
                Debug.WriteLine($"Starting websocket loop");
                try
                {
                    using (var cancellationTokenSource = new CancellationTokenSource())
                    {
                        using (var remoteStream = await DeviceStreamingCommon.GetStreamingClientAsync(result.Url, result.AuthorizationToken, cancellationTokenSource.Token).ConfigureAwait(false))
                        {
                            Debug.WriteLine("Starting streaming");
                            _ = await Task.WhenAny(
                                HandleIncomingDataAsync(remoteStream, cancellationTokenSource.Token),
                                HandleOutgoingDataAsync(remoteStream, cancellationTokenSource.Token)).ConfigureAwait(false);
                        }
                    }
                    Debug.WriteLine("Done streaming");
                }
                catch (Exception ex)
                {
                    Debug.WriteLine("Got an exception in streaminloop : {0}", ex);
                }
            });
        }

        // Finalize
        SendMessages(
            aiMessage,
            tcpMessage,
            "UNATTACHED");
    }

    private async Task HandleOutgoingDataAsync(ClientWebSocket remoteStream, CancellationToken token)
    {
        while (true)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }
            if (messageHolder.TryPop(out var package) && remoteStream.State == WebSocketState.Open)
            {
                Debug.WriteLine($"Sending : {package.ToReadable(true)}");
                await remoteStream.SendAsync(new ArraySegment<byte>(package, 0, package.Length), WebSocketMessageType.Binary, true, token).ConfigureAwait(false);
            }
            await Task.Delay(250);
        }
    }
    private async Task HandleIncomingDataAsync(ClientWebSocket remoteStream, CancellationToken token)
    {
        var receiveBuffer = new byte[10240];
        while (true)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }
            if (remoteStream.State == WebSocketState.Open)
            {
                var receivedResult = await remoteStream.ReceiveAsync(receiveBuffer, token).ConfigureAwait(false);
                if (receivedResult != null)
                {
                    Debug.WriteLine($"Data received : {receiveBuffer.ToReadable(true)}");
                }
                else
                {
                    Debug.WriteLine("Received null data package");
                }
            }
        }
    }
    
    protected override async void SendMessages(object aiMessage, object tcpMessage, string _)
    {
        if (tcpMessage != null)
        {
            messageHolder.Push((byte[])tcpMessage);
        }
        if (aiMessage != null)
        {
            Debug.WriteLine($"Sending AI Message");
            await aiQueueClient.SendAsync((SB_Message)aiMessage);
        }
    }

`

Client code:
` protected override async void StartMessageEmitterTask()
{
streamTokenSource = new CancellationTokenSource();
Debug.WriteLine("Starting emitter task");
await Task.Run(async () =>
{
while (true)
{
var streamRequest = await deviceClient.WaitForDeviceStreamRequestAsync(streamTokenSource.Token).ConfigureAwait(false);

                if (streamRequest != null)
                {
                    await deviceClient.AcceptDeviceStreamRequestAsync(streamRequest, streamTokenSource.Token).ConfigureAwait(false);

                    using (var webSocket = await DeviceStreamingCommon.GetStreamingClientAsync(streamRequest.Url, streamRequest.AuthorizationToken, streamTokenSource.Token).ConfigureAwait(false))
                    {
                        Console.WriteLine("Starting streaming");

                        _ = await Task.WhenAny(
                            HandleIncomingDataAsync(webSocket, streamTokenSource.Token),
                            HandleOutgoingDataAsync(webSocket, streamTokenSource.Token)).ConfigureAwait(false);

                        await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, streamTokenSource.Token).ConfigureAwait(false);
                    }
                }
            }
        });
    }

    private async Task HandleOutgoingDataAsync(ClientWebSocket webSocket, CancellationToken token)
    {
        while (true)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }
            if (messagesHolder.Pending && webSocket.State == WebSocketState.Open)
            {
                var package = messagesHolder.GetPendingMessage();
                Debug.WriteLine($"Sending : {package.Payload.ToReadable(true)}");
                await webSocket.SendAsync(new ArraySegment<byte>(package.Payload, 0, package.Payload.Length), WebSocketMessageType.Binary, true, token).ConfigureAwait(false);
            }
            else
            {
                Debug.WriteLine($"Outgoing - Pending : {messagesHolder.Pending} / State {webSocket.State.ToString()}");
            }
            await Task.Delay(500);
        }
    }

    private async Task HandleIncomingDataAsync(ClientWebSocket webSocket, CancellationToken token)
    {
        var receiveBuffer = new byte[10240];
        while (true)
        {
            if (token.IsCancellationRequested)
            {
                break;
            }
            if (webSocket.State == WebSocketState.Open)
            {
                var receivedResult = await webSocket.ReceiveAsync(new ArraySegment<byte>(receiveBuffer, 0, receiveBuffer.Length), token).ConfigureAwait(false);
                if (receivedResult != null)
                {
                    Debug.WriteLine($"Data received : {receiveBuffer.ToReadable(true)}");
                    MessageTriage(receiveBuffer);
                }
                else
                {
                    Debug.WriteLine("Received null data package");
                }
            }
            else
            {
                Debug.WriteLine($"Incoming - State {webSocket.State.ToString()}");
                await Task.Delay(500);
            }
        }
    }`
@prmathur-microsoft prmathur-microsoft added the investigation-required Requires further investigation to root cause this. label Jul 15, 2019
@ewertons
Copy link
Contributor

Hi @maddentist ,
so looks like this issue happened at the WebSocket client level.
Have you tried running the sample code as is against your IoT Hub? Using a standard Windows and one of the frameworks supported by Azure IoT C# SDK? That will help verify if you have any issues with the IoT Hub and Streaming Gateway. If they work, then there will be some work to verify the functionality using the platforms you used.
Could you please also post the version of the .NET framework you are using?

@maddentist
Copy link
Author

Hi, sample code is working against my IoT Hub,
Server is running under .NET Core 2.1 (inside service fabric)
Client is running in a UWP app targeting minimum version 1903 (build 18362)
The code itself is embedded in a .NET Standard 2.0 library

@CIPop CIPop added the device-streaming Issues related to Device Streaming feature (currently in preview). label Sep 13, 2019
@sharmasejal sharmasejal added the IoTSDK Tracks all IoT SDK issues across the board label Mar 19, 2020
@vinagesh
Copy link
Member

vinagesh commented Dec 8, 2020

@maddentist - Were you able to verify it works using one of the supported frameworks (UWP is not supported)? This will help us narrow down if the issue is in the service or the UWP environment and help you.

@vinagesh vinagesh added the bug Something isn't working. label Dec 9, 2020
@drwill-ms
Copy link
Contributor

Moving all device streaming issues to this discussion #1811.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working. device-streaming Issues related to Device Streaming feature (currently in preview). investigation-required Requires further investigation to root cause this. IoTSDK Tracks all IoT SDK issues across the board
Projects
None yet
Development

No branches or pull requests

7 participants