Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix providing Ditto Adaptable information in the "_context" of an SSE event #1716

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.SignalEnrichmentFailedException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
Expand Down Expand Up @@ -75,6 +75,10 @@
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.Payload;
import org.eclipse.ditto.protocol.adapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
Expand Down Expand Up @@ -154,6 +158,8 @@ public final class ThingsSseRouteBuilder extends RouteDirectives implements SseR
private final StreamingConfig streamingConfig;
private final QueryFilterCriteriaFactory queryFilterCriteriaFactory;
private final ActorRef pubSubMediator;
private final HeaderTranslator headerTranslator;
private final DittoProtocolAdapter dittoProtocolAdapter;
private SseConnectionSupervisor sseConnectionSupervisor;
private SseEventSniffer eventSniffer;
private StreamingAuthorizationEnforcer sseAuthorizationEnforcer;
Expand All @@ -164,12 +170,16 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
final ActorRef streamingActor,
final StreamingConfig streamingConfig,
final QueryFilterCriteriaFactory queryFilterCriteriaFactory,
final ActorRef pubSubMediator) {
final ActorRef pubSubMediator,
final HeaderTranslator headerTranslator) {

this.streamingActor = streamingActor;
this.streamingConfig = streamingConfig;
this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
this.pubSubMediator = pubSubMediator;
this.headerTranslator = headerTranslator;

dittoProtocolAdapter = DittoProtocolAdapter.of(headerTranslator);

final Config config = actorSystem.settings().config();
final var dittoExtensionsConfig = ScopedConfig.dittoExtension(config);
Expand All @@ -183,16 +193,19 @@ private ThingsSseRouteBuilder(final ActorSystem actorSystem,
/**
* Returns an instance of this class.
*
* @param actorSystem the actor system.
* @param streamingActor is used for actual event streaming.
* @param streamingConfig the streaming configuration.
* @param pubSubMediator akka pub-sub mediator for error reporting by the search source.
* @param headerTranslator the header translator used for translating to external headers.
* @return the instance.
* @throws NullPointerException if {@code streamingActor} is {@code null}.
*/
public static ThingsSseRouteBuilder getInstance(final ActorSystem actorSystem,
final ActorRef streamingActor,
final StreamingConfig streamingConfig,
final ActorRef pubSubMediator) {
final ActorRef pubSubMediator,
final HeaderTranslator headerTranslator) {

checkNotNull(streamingActor, "streamingActor");
final var queryFilterCriteriaFactory =
Expand All @@ -201,7 +214,7 @@ public static ThingsSseRouteBuilder getInstance(final ActorSystem actorSystem,
TimePlaceholder.getInstance());

return new ThingsSseRouteBuilder(actorSystem, streamingActor, streamingConfig, queryFilterCriteriaFactory,
pubSubMediator);
pubSubMediator, headerTranslator);
}

@Override
Expand Down Expand Up @@ -685,7 +698,7 @@ private static boolean targetThingIdMatches(final ThingEvent<?> event,
return targetThingIds.isEmpty() || targetThingIds.contains(event.getEntityId());
}

private static Collection<JsonValue> toNonemptyValue(final Thing thing, final ThingEvent<?> event,
private Collection<JsonValue> toNonemptyValue(final Thing thing, final ThingEvent<?> event,
final JsonPointer fieldPointer,
@Nullable final JsonFieldSelector fields) {
final var jsonSchemaVersion = event.getDittoHeaders()
Expand Down Expand Up @@ -763,21 +776,24 @@ private static Counter getCounterFor(final String path) {
* {@code withDittoHeaders}.
*
* @param objectBuilder the JsonObject build to add the {@code _context} to.
* @param withDittoHeaders the object to extract the {@code DittoHeaders} from.
* @param thingEvent the event to add context from.
* @return the built JsonObject including the {@code _context}.
*/
private static JsonObject addContext(final JsonObjectBuilder objectBuilder,
final WithDittoHeaders withDittoHeaders) {
private JsonObject addContext(final JsonObjectBuilder objectBuilder,
final ThingEvent<?> thingEvent) {

final Adaptable adaptable = dittoProtocolAdapter.toAdaptable(thingEvent);
objectBuilder.set(CONTEXT, JsonObject.newBuilder()
.set("headers", dittoHeadersToJson(withDittoHeaders.getDittoHeaders()))
.set(JsonifiableAdaptable.JsonFields.TOPIC, adaptable.getTopicPath().getPath())
.set(Payload.JsonFields.PATH, adaptable.getPayload().getPath().toString())
.set(JsonifiableAdaptable.JsonFields.HEADERS, dittoHeadersToJson(thingEvent.getDittoHeaders()))
.build()
);
return objectBuilder.build();
}

private static JsonObject dittoHeadersToJson(final DittoHeaders dittoHeaders) {
return dittoHeaders.entrySet()
private JsonObject dittoHeadersToJson(final DittoHeaders dittoHeaders) {
return headerTranslator.toExternalHeaders(dittoHeaders).entrySet()
.stream()
.map(entry -> JsonFactory.newField(JsonKey.of(entry.getKey()), JsonFactory.newValue(entry.getValue())))
.collect(JsonCollectors.fieldsToObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,10 @@ private static Route createRoute(final ActorSystem actorSystem,
.devopsRoute(new DevOpsRoute(routeBaseProperties, devopsAuthenticationDirective))
.policiesRoute(new PoliciesRoute(routeBaseProperties,
OAuthTokenIntegrationSubjectIdFactory.of(authConfig.getOAuthConfig())))
.sseThingsRoute(
ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor, streamingConfig, pubSubMediator)
.withProxyActor(proxyActor)
.withSignalEnrichmentProvider(signalEnrichmentProvider))
.sseThingsRoute(ThingsSseRouteBuilder
.getInstance(actorSystem, streamingActor, streamingConfig, pubSubMediator, headerTranslator)
.withProxyActor(proxyActor)
.withSignalEnrichmentProvider(signalEnrichmentProvider))
.thingsRoute(new ThingsRoute(routeBaseProperties,
gatewayConfig.getMessageConfig(),
gatewayConfig.getClaimMessageConfig()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void setUp() {
.sseThingsRoute(ThingsSseRouteBuilder.getInstance(routeBaseProperties.getActorSystem(),
routeBaseProperties.getProxyActor(),
streamingConfig,
routeBaseProperties.getProxyActor()))
routeBaseProperties.getProxyActor(), httpHeaderTranslator))
.thingsRoute(new ThingsRoute(routeBaseProperties, messageConfig, claimMessageConfig))
.thingSearchRoute(new ThingSearchRoute(routeBaseProperties))
.whoamiRoute(new WhoamiRoute(routeBaseProperties))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.ditto.base.model.auth.AuthorizationModelFactory;
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.gateway.api.GatewayServiceUnavailableException;
import org.eclipse.ditto.gateway.service.endpoints.EndpointTestBase;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
Expand Down Expand Up @@ -101,7 +102,8 @@ public void setUp() {
() -> CompletableFuture.completedFuture(dittoHeaders);

final var sseRouteBuilder =
ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor.ref(), streamingConfig, proxyActor.ref());
ThingsSseRouteBuilder.getInstance(actorSystem, streamingActor.ref(), streamingConfig, proxyActor.ref(),
HeaderTranslator.empty());
sseRouteBuilder.withProxyActor(proxyActor.ref());
final Route sseRoute = extractRequestContext(ctx -> sseRouteBuilder.build(ctx, dittoHeadersSupplier));
underTest = testRoute(sseRoute);
Expand Down