Skip to content

Commit

Permalink
fix: improve multicast discovery with CoRE Link-Format
Browse files Browse the repository at this point in the history
  • Loading branch information
JKRhb committed Oct 18, 2022
1 parent 3ced172 commit d1073b5
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 34 deletions.
50 changes: 37 additions & 13 deletions lib/src/binding_coap/coap_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,32 @@ class CoapClient extends ProtocolClient {
return response.content;
}

Future<DiscoveryContent> _sendDiscoveryRequest(
Uri uri,
coap.CoapCode method, {
Content? content,
required Form? form,
coap.CoapMediaType? format,
coap.CoapMediaType? accept,
int? block1Size,
int? block2Size,
coap.CoapMulticastResponseHandler? multicastResponseHandler,
}) async {
final responseContent = await _sendRequest(
uri,
method,
content: content,
form: form,
format: format,
accept: accept,
block1Size: block1Size,
block2Size: block2Size,
multicastResponseHandler: multicastResponseHandler,
);

return DiscoveryContent.fromContent(responseContent, uri);
}

Future<AuthServerRequestCreationHint?> _obtainCreationHintFromResourceServer(
Form form,
) async {
Expand Down Expand Up @@ -427,24 +453,22 @@ class CoapClient extends ProtocolClient {
@override
Future<void> stop() async {}

Stream<Content> _discoverFromMulticast(
Stream<DiscoveryContent> _discoverFromMulticast(
coap.CoapClient client,
Uri uri,
) async* {
// TODO(JKRhb): This method currently does not work with block-wise transfer
// due to a bug in the CoAP library.
final streamController = StreamController<Content>();
final streamController = StreamController<DiscoveryContent>();
final multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.resp.content);
streamController.add(data.resp.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
await streamController.close();
},
);

final content = _sendRequest(
final content = _sendDiscoveryRequest(
uri,
coap.CoapCode.get,
form: null,
Expand All @@ -455,11 +479,11 @@ class CoapClient extends ProtocolClient {
yield* streamController.stream;
}

Stream<Content> _discoverFromUnicast(
Stream<DiscoveryContent> _discoverFromUnicast(
coap.CoapClient client,
Uri uri,
) async* {
yield await _sendRequest(
yield await _sendDiscoveryRequest(
uri,
coap.CoapCode.get,
form: null,
Expand All @@ -468,7 +492,7 @@ class CoapClient extends ProtocolClient {
}

@override
Stream<Content> discoverDirectly(
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
Expand All @@ -485,15 +509,15 @@ class CoapClient extends ProtocolClient {
}

@override
Stream<Content> discoverWithCoreLinkFormat(Uri uri) async* {
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) async* {
coap.CoapMulticastResponseHandler? multicastResponseHandler;
final streamController = StreamController<Content>();
final streamController = StreamController<DiscoveryContent>();

// TODO: Replace once https://github.com/shamblett/coap/pull/129 is merged
if (uri.isMulticastAddress) {
multicastResponseHandler = coap.CoapMulticastResponseHandler(
(data) {
streamController.add(data.resp.content);
streamController.add(data.resp.determineDiscoveryContent(uri.scheme));
},
onError: streamController.addError,
onDone: () async {
Expand All @@ -502,7 +526,7 @@ class CoapClient extends ProtocolClient {
);
}

final content = await _sendRequest(
final content = await _sendDiscoveryRequest(
uri,
coap.CoapCode.get,
form: null,
Expand Down
7 changes: 7 additions & 0 deletions lib/src/binding_coap/coap_extensions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ extension ResponseExtension on CoapResponse {
return Content(_contentType, _payloadStream);
}

/// Extract the [Content] of this [CoapResponse].
DiscoveryContent determineDiscoveryContent(String scheme) {
// ignore: invalid_use_of_internal_member
final discoveryUri = Uri(scheme: scheme, host: source?.host);
return DiscoveryContent(_contentType, _payloadStream, discoveryUri);
}

/// Checks the [code] of this [CoapResponse] and throws an [Exception] if it
/// should indicate an error.
void checkResponseCode() {
Expand Down
9 changes: 5 additions & 4 deletions lib/src/binding_http/http_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -299,21 +299,22 @@ class HttpClient extends ProtocolClient {
throw UnimplementedError();
}

Future<Content> _sendDiscoveryRequest(
Future<DiscoveryContent> _sendDiscoveryRequest(
Request request, {
required String acceptHeaderValue,
}) async {
request.headers['Accept'] = acceptHeaderValue;
final response = await _client.send(request);
final finalResponse = await _handleResponse(request, response);
return Content(
return DiscoveryContent(
response.headers['Content-Type'] ?? acceptHeaderValue,
finalResponse.stream,
request.url,
);
}

@override
Stream<Content> discoverDirectly(
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
Expand All @@ -326,7 +327,7 @@ class HttpClient extends ProtocolClient {
}

@override
Stream<Content> discoverWithCoreLinkFormat(Uri uri) async* {
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) async* {
final request = Request(HttpRequestMethod.get.methodName, uri);

final encodedLinks = await _sendDiscoveryRequest(
Expand Down
15 changes: 10 additions & 5 deletions lib/src/binding_mqtt/mqtt_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,14 @@ class MqttClient extends ProtocolClient {
}

@override
Stream<Content> discoverDirectly(
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
}) async* {
final client = await _connect(uri, null);
const discoveryTopic = 'wot/td/#';

final streamController = StreamController<Content>();
final streamController = StreamController<DiscoveryContent>();

Timer(
_mqttConfig.discoveryTimeout,
Expand All @@ -229,8 +229,13 @@ class MqttClient extends ProtocolClient {
final publishedMessage = message.payload as MqttPublishMessage;
final payload = publishedMessage.payload.message;

streamController
.add(Content(discoveryContentType, Stream.value(payload)));
streamController.add(
DiscoveryContent(
discoveryContentType,
Stream.value(payload),
uri,
),
);
}
},
cancelOnError: false,
Expand All @@ -240,7 +245,7 @@ class MqttClient extends ProtocolClient {
}

@override
Stream<Content> discoverWithCoreLinkFormat(Uri uri) {
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri) {
// TODO: implement discoverWithCoreLinkFormat
throw UnimplementedError();
}
Expand Down
4 changes: 2 additions & 2 deletions lib/src/core/protocol_interfaces/protocol_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ abstract class ProtocolClient {
///
/// Allows the caller to explicitly [disableMulticast], overriding the
/// multicast settings in the config of the underlying binding implementation.
Stream<Content> discoverDirectly(
Stream<DiscoveryContent> discoverDirectly(
Uri uri, {
bool disableMulticast = false,
});
Expand All @@ -41,7 +41,7 @@ abstract class ProtocolClient {
///
/// [RFC 6690]: https://datatracker.ietf.org/doc/html/rfc6690
/// [RFC 9176]: https://datatracker.ietf.org/doc/html/rfc9176
Stream<Content> discoverWithCoreLinkFormat(Uri uri);
Stream<DiscoveryContent> discoverWithCoreLinkFormat(Uri uri);

/// Requests the client to perform a `readproperty` operation on a [form].
Future<Content> readResource(Form form);
Expand Down
18 changes: 8 additions & 10 deletions lib/src/core/thing_discovery.dart
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,12 @@ class ThingDiscovery extends Stream<ThingDescription>
}

Future<ThingDescription> _decodeThingDescription(
Content content,
Uri uri,
DiscoveryContent content,
) async {
final value = await _servient.contentSerdes.contentToValue(content, null);
if (value is! Map<String, dynamic>) {
throw DiscoveryException(
'Could not parse Thing Description obtained from $uri',
'Could not parse Thing Description obtained from ${content.sourceUri}',
);
}

Expand All @@ -106,7 +105,7 @@ class ThingDiscovery extends Stream<ThingDescription>

yield* client
.discoverDirectly(uri, disableMulticast: true)
.asyncMap((content) => _decodeThingDescription(content, uri));
.asyncMap(_decodeThingDescription);
}

Future<List<CoapWebLink>?> _getCoreWebLinks(Content content) async {
Expand All @@ -122,14 +121,14 @@ class ThingDiscovery extends Stream<ThingDescription>

Future<Iterable<Uri>> _filterCoreWebLinks(
String resourceType,
Content coreWebLink,
Uri baseUri,
DiscoveryContent coreWebLink,
) async {
final webLinks = await _getCoreWebLinks(coreWebLink);
final sourceUri = coreWebLink.sourceUri;

if (webLinks == null) {
throw DiscoveryException(
'Discovery from $baseUri returned no valid CoRE Link-Format Links.',
'Discovery from $sourceUri returned no valid CoRE Link-Format Links.',
);
}

Expand All @@ -141,7 +140,7 @@ class ThingDiscovery extends Stream<ThingDescription>
)
.map((weblink) => Uri.tryParse(weblink.uri))
.whereType<Uri>()
.map((uri) => uri.toAbsoluteUri(baseUri));
.map((uri) => uri.toAbsoluteUri(sourceUri));
}

Stream<ThingDescription> _discoverWithCoreLinkFormat(Uri uri) async* {
Expand Down Expand Up @@ -171,8 +170,7 @@ class ThingDiscovery extends Stream<ThingDescription>
final Iterable<Uri> parsedUris;

try {
parsedUris =
await _filterCoreWebLinks(resourceType, coreWebLink, discoveryUri);
parsedUris = await _filterCoreWebLinks(resourceType, coreWebLink);
} on Exception catch (exception) {
yield* Stream.error(exception);
continue;
Expand Down

0 comments on commit d1073b5

Please sign in to comment.