Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How do we listen to Server Sent Event(SSE) Stream using Dio package #1279

Closed
dewbambs opened this issue Sep 20, 2021 · 15 comments
Closed

How do we listen to Server Sent Event(SSE) Stream using Dio package #1279

dewbambs opened this issue Sep 20, 2021 · 15 comments

Comments

@dewbambs
Copy link

dewbambs commented Sep 20, 2021

Issue Info

Info Value
Platform Name flutter
Platform Version 2.5
Dio Version 4.0.0
Android Studio 4.1

Issue Description

I was trying to subscribe to server sent event(SSE) stream. I was able to connect to the stream using this code which uses HTTP package. It works totally as expected and returns steam of data. But generally I use Dio package. Hence I wanted to implement the same using dio flutter. But unfortunately I'm unable to do that.

subscribe() async {
    print("Subscribing..");
    try {
      _client = http.Client();

      var request = new http.Request("GET", Uri.parse("http://192.168.1.11:8080/myserver/events"));
      request.headers["Cache-Control"] = "no-cache";
      request.headers["Accept"] = "text/event-stream";

      Future<http.StreamedResponse> response = _client.send(request);
      print("Subscribed!");

      response.asStream().listen((streamedResponse) {
        print("Received streamedResponse.statusCode:${streamedResponse.statusCode}");
        streamedResponse.stream.listen((data) {
          print("Received data:${utf8.decode(data)}");
        });
      });
    } catch(e) {
      print("Caught $e");
    }
  }

This is what I tried with Dio package. But this returns 404 request not found. Mostly I'm not able to figure out how do I get a StreamedResponse using dio package.

var response = dio.request(url, options: Options(headers: {"Accept": "*/*", "Cache-Control": "no-cache"}));
    print("Subscribed!");

    response.asStream().listen((streamedResponse) {
      print("Received streamedResponse.statusCode:${streamedResponse.statusCode}");
      print("data:----------------------> ${streamedResponse.data}");
    });

I did search for example implementing SSE streams using dio but, I'm unable to find it.

@jgoyvaerts
Copy link
Contributor

I don't think dio allows for this use case at the moment

@dewbambs
Copy link
Author

That's very unfortunate . It's an important feature. As many apps use sse events now a days. e.g. stock broking apps etc. So if dio can add the feature it will be helpful.

@jgoyvaerts
Copy link
Contributor

Do you have a free example api which offers this kind of stream?

@dewbambs
Copy link
Author

I don't have a free example. I'm testing on my servers stream

@acn001
Copy link

acn001 commented Sep 23, 2021

That's very unfortunate . It's an important feature. As many apps use sse events now a days. e.g. stock broking apps etc. So if dio can add the feature it will be helpful.

I agree with it. SSE is a very important and popular feature, haha.

@alphamanorg
Copy link

You can use Socket.IO for server-side events for now.

@dewbambs
Copy link
Author

dewbambs commented Oct 7, 2021

Yes, I'm using http package itself for now. As it gives expected result.

@vhdrjb
Copy link

vhdrjb commented Jun 9, 2022

You can use SSE with ResponseType.stream in dio :

  Response<ResponseBody>  rs = await Dio().get<ResponseBody>(
      "https://server/stream",
      options: Options(headers: {
        "Authorization":
            'vhdrjb token"',
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache",
      }, responseType: ResponseType.stream), // set responseType to `stream`
    );

it gets result as Uint8List and you should first transform it to List<int> :

    StreamTransformer<Uint8List, List<int>> unit8Transformer =
        StreamTransformer.fromHandlers(
      handleData: (data, sink) {
        sink.add(List<int>.from(data));
      },
    );

and then you can transform it to json :

    rs.data?.stream
        .transform(unit8Transformer)
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .listen((event) {
      log(event);
    });

This worked for me and now i'm getting data from server

@blopker
Copy link

blopker commented Jun 9, 2022

Wow, this would be amazing to have built into Dio. The other solutions to this problem in Dart don't handle errors well, like retrying disconnections. For testing, there's a free SSE endpoint at https://hacker-news.firebaseio.com/v0/topstories.json. the documentation is at https://github.com/HackerNews/API

@dewbambs
Copy link
Author

Yup that will be great we need to include all the functionality that eventsource package provides for js.

Wow, this would be amazing to have built into Dio. The other solutions to this problem in Dart don't handle errors well, like retrying disconnections. For testing, there's a free SSE endpoint at https://hacker-news.firebaseio.com/v0/topstories.json. the documentation is at https://github.com/HackerNews/API

@hurelhuyag
Copy link

SSE messages must be split by a double new line. Does anyone implement such a splitter?

@hurelhuyag
Copy link

Based on @vhdrjb's code, I have implemented one more transformer to extract the correct SSE message. But I believe there must be a more efficient transformer. But with my current dart/flutter knowledge, that is what I can do.

rs.data?.stream
        .transform(unit8Transformer)
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .transform(const SseTransformer())
        .listen((event) {
          debugPrint("Event: ${event.id}, ${event.event}, ${event.retry}, ${event.data}");
        });

class SseTransformer extends StreamTransformerBase<String, SseMessage> {
  const SseTransformer();
  @override
  Stream<SseMessage> bind(Stream<String> stream) {
    return Stream.eventTransformed(stream, (sink) => SseEventSink(sink));
  }
}

class SseEventSink extends EventSink<String> {

  final EventSink<SseMessage> _eventSink;

  String? _id;
  String _event = "message";
  String _data = "";
  int? _retry;

  SseEventSink(this._eventSink);

  @override
  void add(String event) {
    if (event.startsWith("id:")) {
      _id = event.substring(3);
      return;
    }
    if (event.startsWith("event:")) {
      _event = event.substring(6);
      return;
    }
    if (event.startsWith("data:")) {
      _data = event.substring(5);
      return;
    }
    if (event.startsWith("retry:")) {
      _retry = int.tryParse(event.substring(6));
      return;
    }
    if (event.isEmpty) {
      _eventSink.add(SseMessage(id: _id, event: _event, data: _data, retry: _retry));
      _id = null;
      _event = "message";
      _data = "";
      _retry = null;
    }
  }

  @override
  void addError(Object error, [StackTrace? stackTrace]) {
    _eventSink.addError(error, stackTrace);
  }

  @override
  void close() {
    _eventSink.close();
  }
}

class SseMessage {
  final String? id;
  final String event;
  final String data;
  final int? retry;

  const SseMessage({
    this.id,
    required this.event,
    required this.data,
    this.retry,
  });
}

@quangtuyen1993
Copy link

Based on @vhdrjb's code, I have implemented one more transformer to extract the correct SSE message. But I believe there must be a more efficient transformer. But with my current dart/flutter knowledge, that is what I can do.

rs.data?.stream
        .transform(unit8Transformer)
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .transform(const SseTransformer())
        .listen((event) {
          debugPrint("Event: ${event.id}, ${event.event}, ${event.retry}, ${event.data}");
        });

class SseTransformer extends StreamTransformerBase<String, SseMessage> {
  const SseTransformer();
  @override
  Stream<SseMessage> bind(Stream<String> stream) {
    return Stream.eventTransformed(stream, (sink) => SseEventSink(sink));
  }
}

class SseEventSink extends EventSink<String> {

  final EventSink<SseMessage> _eventSink;

  String? _id;
  String _event = "message";
  String _data = "";
  int? _retry;

  SseEventSink(this._eventSink);

  @override
  void add(String event) {
    if (event.startsWith("id:")) {
      _id = event.substring(3);
      return;
    }
    if (event.startsWith("event:")) {
      _event = event.substring(6);
      return;
    }
    if (event.startsWith("data:")) {
      _data = event.substring(5);
      return;
    }
    if (event.startsWith("retry:")) {
      _retry = int.tryParse(event.substring(6));
      return;
    }
    if (event.isEmpty) {
      _eventSink.add(SseMessage(id: _id, event: _event, data: _data, retry: _retry));
      _id = null;
      _event = "message";
      _data = "";
      _retry = null;
    }
  }

  @override
  void addError(Object error, [StackTrace? stackTrace]) {
    _eventSink.addError(error, stackTrace);
  }

  @override
  void close() {
    _eventSink.close();
  }
}

class SseMessage {
  final String? id;
  final String event;
  final String data;
  final int? retry;

  const SseMessage({
    this.id,
    required this.event,
    required this.data,
    this.retry,
  });
}

Can you share how to disconnect the stream connection?

@clotodex
Copy link

I am on flutter web and when I use this code, the stream only starts when the whole response was recieved. I get all events, but I would like to receive them live. Is there some missing support for Web, do I need configure something or is this expected?

@AlexV525
Copy link
Member

The stream reading based on the XHR is not available. As of workaround, see dart-lang/http#595.

@cfug cfug locked as resolved and limited conversation to collaborators Apr 19, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

10 participants