Skip to content

Commit

Permalink
GH1190 Add byte[] to string conversion to CloudEventMessageUtils
Browse files Browse the repository at this point in the history
Resolves #1190
  • Loading branch information
olegz committed Oct 17, 2024
1 parent dbafb30 commit b30b2c8
Showing 1 changed file with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.function.cloudevent;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -170,7 +171,11 @@ private CloudEventMessageUtils() {

public static String getId(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + MessageHeaders.ID);
Object value = message.getHeaders().get(prefix + MessageHeaders.ID);
if (value instanceof byte[] v) {
value = toString(v);
}
return (String) value;
}

public static URI getSource(Message<?> message) {
Expand All @@ -180,17 +185,29 @@ public static URI getSource(Message<?> message) {

public static String getSpecVersion(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + _SPECVERSION);
Object value = message.getHeaders().get(prefix + _SPECVERSION);
if (value instanceof byte[] v) {
value = toString(v);
}
return (String) value;
}

public static String getType(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + _TYPE);
Object value = message.getHeaders().get(prefix + _TYPE);
if (value instanceof byte[] v) {
value = toString(v);
}
return (String) value;
}

public static String getDataContentType(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + _DATACONTENTTYPE);
Object value = message.getHeaders().get(prefix + _DATACONTENTTYPE);
if (value instanceof byte[] v) {
value = toString(v);
}
return (String) value;
}

public static URI getDataSchema(Message<?> message) {
Expand All @@ -200,7 +217,11 @@ public static URI getDataSchema(Message<?> message) {

public static String getSubject(Message<?> message) {
String prefix = determinePrefixToUse(message.getHeaders());
return (String) message.getHeaders().get(prefix + _SUBJECT);
Object value = message.getHeaders().get(prefix + _SUBJECT);
if (value instanceof byte[] v) {
value = toString(v);
}
return (String) value;
}

public static OffsetDateTime getTime(Message<?> message) {
Expand Down Expand Up @@ -435,11 +456,20 @@ private static Message<?> buildBinaryMessageFromStructuredMap(Map<String, Object
private static URI safeGetURI(Map<String, Object> map, String key) {
Object uri = map.get(key);
if (uri != null && uri instanceof String) {
uri = URI.create((String) uri);
if (uri instanceof String) {
uri = URI.create((String) uri);
}
else if (uri instanceof byte[] u) {
uri = URI.create(toString(u));
}
}
return (URI) uri;
}

private static String toString(byte[] value) {
return new String(value, StandardCharsets.UTF_8);
}

public static class Protocols {
static String AMQP = "amqp";
static String AVRO = "avro";
Expand Down

0 comments on commit b30b2c8

Please sign in to comment.