#1521: Configure which fields are indexed in the Ditto search index per namespace pattern #1870

Merged
merged 8 commits into from
Jan 24, 2024
2 changes: 1 addition & 1 deletion .run/SearchService.run.xml
Expand Up @@ -15,4 +15,4 @@
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
</component>
2 changes: 1 addition & 1 deletion deployment/helm/ditto/Chart.yaml
Expand Up @@ -16,7 +16,7 @@ description: |
A digital twin is a virtual, cloud based, representation of its real world counterpart
(real world “Things”, e.g. devices like sensors, smart heating, connected cars, smart grids, EV charging stations etc).
type: application
version: 3.4.4 # chart version is effectively set by release-job
version: 3.4.5 # chart version is effectively set by release-job
appVersion: 3.4.4
keywords:
- iot-chart
9 changes: 9 additions & 0 deletions deployment/helm/ditto/templates/thingssearch-deployment.yaml
Expand Up @@ -136,6 +136,15 @@ spec:
{{- if .Values.global.logging.customConfigFile.enabled }}
-Dlogback.configurationFile=/opt/ditto/{{ .Values.global.logging.customConfigFile.fileName }}
{{- end }}
{{- if .Values.thingsSearch.config.indexedFieldsLimiting.enabled }}
-Dditto.extensions.caching-signal-enrichment-facade-provider=org.eclipse.ditto.thingsearch.service.persistence.write.streaming.SearchIndexingSignalEnrichmentFacadeProvider
{{- range $index, $value := .Values.thingsSearch.config.indexedFieldsLimiting.items }}
"{{ printf "%s%d%s=%s" "-Dditto.search.namespace-indexed-fields." $index ".namespace-pattern" $value.namespacePattern }}"
{{- range $fieldIndex, $indexedField := $value.indexedFields }}
"{{ printf "%s%d%s%d=%s" "-Dditto.search.namespace-indexed-fields." $index ".indexed-fields." $fieldIndex $indexedField }}"
{{- end }}
{{- end }}
{{- end }}
{{- range $key, $value := .Values.thingsSearch.config.operatorMetrics.customMetrics }}
"{{ printf "%s%s%s=%t" "-Dditto.search.operator-metrics.custom-metrics." $key ".enabled" $value.enabled }}"
"{{ printf "%s%s%s=%s" "-Dditto.search.operator-metrics.custom-metrics." $key ".scrape-interval" $value.scrapeInterval }}"
14 changes: 14 additions & 0 deletions deployment/helm/ditto/values.yaml
Expand Up @@ -1077,6 +1077,20 @@ thingsSearch:
throughput: 100
# period the throttle period
period: 30s
# indexedFieldsLimiting by default, Ditto indexes all fields of things in its search index.
# However, this behavior can be customized by providing configuration to index only certain fields for specified namespaces.
indexedFieldsLimiting:
# enabled whether field index limiting should be enabled or not
enabled: false
# items contains the list of per-namespace configurations of which fields to include in the search index
items:
# - # namespacePattern holds the namespace pattern to which this limiting configuration entry applies.
# # Wildcards `*` (matches any number of any characters) and `?` (matches any single character) are supported.
# namespacePattern: "org.eclipse.*"
# # indexedFields holds a list of fields that will be explicitly included in the search index
# indexedFields:
# - "attributes"
# - "features/included"
# operatorMetrics contains configuration for operator defined custom metrics, using a search "count" with namespaces and filter
operatorMetrics:
# enabled configures whether operator metrics should be enabled or not
Expand Up @@ -324,6 +324,69 @@ entities (things/policies) and no-one other:

These system properties would have to be configured for the "things" and "policies" services.

## Limiting Indexed Fields

The default behavior of Ditto is to index the complete JSON of a thing, which includes all its attributes and features. This may be undesirable for certain use cases, for example when:
* Indexing everything increases the load on the search database, degrading performance and raising database cost.
* Only a few fields are ever used for searching.

Since Ditto *3.5.0*, there is a configuration to specify, by a namespace pattern, which fields will be included in the search database.

Enabling this functionality requires two new options in the `thing-search.conf` configuration:

```hocon
ditto {
  //...
  extensions {
    // same key as the system property -Dditto.extensions.caching-signal-enrichment-facade-provider
    caching-signal-enrichment-facade-provider = org.eclipse.ditto.thingsearch.service.persistence.write.streaming.SearchIndexingSignalEnrichmentFacadeProvider
  }
  //...
  search {
    namespace-indexed-fields = [
      {
        namespace-pattern = "org.eclipse.test"
        indexed-fields = [
          "attributes",
          "features/info/properties",
          "features/info/other"
        ]
      },
      {
        namespace-pattern = "org.eclipse*"
        indexed-fields = [
          "attributes",
          "features/info"
        ]
      }
    ]
  }
}
```

Enabling this functionality requires configuring the new implementation of the caching signal enrichment facade
provider shown above.

For each namespace pattern, only the selected fields are included in the search database. In the example above, for
things in the "org.eclipse.test" namespace, only "attributes", "features/info/properties", and "features/info/other"
are indexed in the search database.
For things in any other namespace matching the "org.eclipse*" pattern, only the "attributes" and "features/info" paths
are indexed in the search database.

Important notes:
* Ditto uses the namespace of the thing and applies the FIRST namespace-pattern that matches. List more specific
  patterns before more general ones, as in the example above.
* Ditto automatically adds the system-level fields it needs to operate, so these do not need to be configured
  manually.
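
The first-match rule can be illustrated with a small standalone sketch. It is not Ditto code: the class `NamespacePatternSketch` and its helpers are hypothetical, and the translation of `*` and `?` into a regular expression is an assumption about how such wildcards are commonly handled, not a statement about Ditto's internals.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical illustration class, not part of Ditto.
final class NamespacePatternSketch {

    // Assumed wildcard semantics: '*' matches any number of characters,
    // '?' matches exactly one character, everything else is literal.
    static Pattern toRegex(final String wildcardPattern) {
        final StringBuilder regex = new StringBuilder();
        final StringBuilder literal = new StringBuilder();
        for (final char c : wildcardPattern.toCharArray()) {
            if (c == '*' || c == '?') {
                if (literal.length() > 0) {
                    regex.append(Pattern.quote(literal.toString()));
                    literal.setLength(0);
                }
                regex.append(c == '*' ? ".*" : ".");
            } else {
                literal.append(c);
            }
        }
        if (literal.length() > 0) {
            regex.append(Pattern.quote(literal.toString()));
        }
        return Pattern.compile(regex.toString());
    }

    // Returns the indexed fields of the FIRST entry (in insertion order)
    // whose pattern matches the whole namespace.
    static Optional<List<String>> firstMatch(final Map<String, List<String>> orderedConfig,
            final String namespace) {
        return orderedConfig.entrySet().stream()
                .filter(entry -> toRegex(entry.getKey()).matcher(namespace).matches())
                .map(Map.Entry::getValue)
                .findFirst();
    }

    public static void main(final String[] args) {
        // LinkedHashMap keeps the configured order, which decides the winner.
        final Map<String, List<String>> config = new LinkedHashMap<>();
        config.put("org.eclipse.test",
                List.of("attributes", "features/info/properties", "features/info/other"));
        config.put("org.eclipse*", List.of("attributes", "features/info"));

        System.out.println(firstMatch(config, "org.eclipse.test"));
        System.out.println(firstMatch(config, "org.eclipse.ditto"));
        System.out.println(firstMatch(config, "com.acme"));
    }
}
```

If the two entries were swapped, "org.eclipse*" would also win for "org.eclipse.test", and the more specific entry would never apply. A namespace that matches no entry, such as "com.acme" here, yields an empty result in this sketch.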

The same configuration can be set via system properties for the `things-search` service:

```shell
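-Dditto.extensions.caching-signal-enrichment-facade-provider=org.eclipse.ditto.thingsearch.service.persistence.write.streaming.SearchIndexingSignalEnrichmentFacadeProvider
-Dditto.search.namespace-indexed-fields.0.namespace-pattern=org.eclipse.test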
-Dditto.search.namespace-indexed-fields.0.indexed-fields.0=attributes
-Dditto.search.namespace-indexed-fields.0.indexed-fields.1=features/info/properties
-Dditto.search.namespace-indexed-fields.0.indexed-fields.2=features/info/other
-Dditto.search.namespace-indexed-fields.1.namespace-pattern=org.eclipse*
-Dditto.search.namespace-indexed-fields.1.indexed-fields.0=attributes
-Dditto.search.namespace-indexed-fields.1.indexed-fields.1=features/info
```
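
The numbered property names follow mechanically from the list structure: the first index is the position of the entry, the second the position of the field within that entry. The sketch below generates the `namespace-indexed-fields` properties from an ordered map, much as the Helm template in this change does with `printf`. The class `IndexedFieldsSystemPropertiesSketch` and its method are hypothetical helpers for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class, not part of Ditto.
final class IndexedFieldsSystemPropertiesSketch {

    private static final String PREFIX = "-Dditto.search.namespace-indexed-fields.";

    // Key: namespace pattern, value: indexed fields. Iteration order defines the entry index.
    static List<String> toSystemProperties(final Map<String, List<String>> orderedItems) {
        final List<String> result = new ArrayList<>();
        int itemIndex = 0;
        for (final Map.Entry<String, List<String>> item : orderedItems.entrySet()) {
            result.add(PREFIX + itemIndex + ".namespace-pattern=" + item.getKey());
            final List<String> fields = item.getValue();
            for (int fieldIndex = 0; fieldIndex < fields.size(); fieldIndex++) {
                result.add(PREFIX + itemIndex + ".indexed-fields." + fieldIndex + "=" + fields.get(fieldIndex));
            }
            itemIndex++;
        }
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> items = new LinkedHashMap<>();
        items.put("org.eclipse.test",
                List.of("attributes", "features/info/properties", "features/info/other"));
        items.put("org.eclipse*", List.of("attributes", "features/info"));

        // Prints one system property per line, in configuration order.
        toSystemProperties(items).forEach(System.out::println);
    }
}
```

Because the entry index encodes the order, and the first matching pattern wins, reordering the map changes which entry applies to a namespace matched by several patterns.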

## Logging

Gathering logs for a running Ditto installation can be achieved by:
Expand Up @@ -28,11 +28,11 @@
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.WithResource;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
Expand All @@ -52,15 +52,15 @@
* Instantiated once per cluster node so that it builds up a cache across all signal enrichments on a local cluster
* node.
*/
public final class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {
public class DittoCachingSignalEnrichmentFacade implements CachingSignalEnrichmentFacade {

private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory
.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(DittoCachingSignalEnrichmentFacade.class);
private static final String CACHE_NAME_SUFFIX = "_signal_enrichment_cache";

private final Cache<SignalEnrichmentCacheKey, JsonObject> extraFieldsCache;
protected final Cache<SignalEnrichmentCacheKey, JsonObject> extraFieldsCache;

private DittoCachingSignalEnrichmentFacade(final SignalEnrichmentFacade cacheLoaderFacade,
protected DittoCachingSignalEnrichmentFacade(
final SignalEnrichmentFacade cacheLoaderFacade,
final CacheConfig cacheConfig,
final Executor cacheLoaderExecutor,
final String cacheNamePrefix) {
Expand Down Expand Up @@ -96,14 +96,17 @@ public CompletionStage<JsonObject> retrieveThing(final ThingId thingId, final Li
final long minAcceptableSeqNr) {

final DittoHeaders dittoHeaders = DittoHeaders.empty();

final JsonFieldSelector fieldSelector = determineSelector(thingId.getNamespace());

if (minAcceptableSeqNr < 0) {
final var cacheKey =
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, null));
SignalEnrichmentCacheKey.of(thingId, SignalEnrichmentContext.of(dittoHeaders, fieldSelector));
extraFieldsCache.invalidate(cacheKey);
return doCacheLookup(cacheKey, dittoHeaders);
} else {
final var cachingParameters =
new CachingParameters(null, events, false, minAcceptableSeqNr);
new CachingParameters(fieldSelector, events, false, minAcceptableSeqNr);

return doRetrievePartialThing(thingId, dittoHeaders, cachingParameters);
}
Expand Down Expand Up @@ -157,9 +160,9 @@ public CompletionStage<JsonObject> retrievePartialThing(final EntityId thingId,
.thenApply(jsonObject -> applyJsonFieldSelector(jsonObject, jsonFieldSelector));
}

private CompletionStage<JsonObject> doRetrievePartialThing(final EntityId thingId,
final DittoHeaders dittoHeaders,
final CachingParameters cachingParameters) {
protected CompletionStage<JsonObject> doRetrievePartialThing(final EntityId thingId,
final DittoHeaders dittoHeaders,
final CachingParameters cachingParameters) {

final var fieldSelector = cachingParameters.fieldSelector;
final JsonFieldSelector enhancedFieldSelector = enhanceFieldSelectorWithRevision(fieldSelector);
Expand Down Expand Up @@ -278,8 +281,8 @@ private static DittoHeaders getLastDittoHeaders(final List<? extends Signal<?>>
}
}

private CompletableFuture<JsonObject> doCacheLookup(final SignalEnrichmentCacheKey cacheKey,
final DittoHeaders dittoHeaders) {
protected CompletableFuture<JsonObject> doCacheLookup(final SignalEnrichmentCacheKey cacheKey,
final DittoHeaders dittoHeaders) {
LOGGER.withCorrelationId(dittoHeaders).debug("Looking up cache entry for <{}>", cacheKey);

return extraFieldsCache.get(cacheKey)
Expand Down Expand Up @@ -446,17 +449,23 @@ private JsonObject enhanceJsonObject(final JsonObject jsonObject, final List<Thi
return applyJsonFieldSelector(jsonObjectBuilder.build(), enhancedFieldSelector);
}

private static final class CachingParameters {
@Nullable
protected JsonFieldSelector determineSelector(final String namespace) {
// By default, we do not return a field selector.
return null;
}

protected static final class CachingParameters {

@Nullable private final JsonFieldSelector fieldSelector;
private final List<ThingEvent<?>> concernedEvents;
private final boolean invalidateCacheOnPolicyChange;
private final long minAcceptableSeqNr;

private CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
final List<ThingEvent<?>> concernedEvents,
final boolean invalidateCacheOnPolicyChange,
final long minAcceptableSeqNr) {
public CachingParameters(@Nullable final JsonFieldSelector fieldSelector,
final List<ThingEvent<?>> concernedEvents,
final boolean invalidateCacheOnPolicyChange,
final long minAcceptableSeqNr) {

this.fieldSelector = fieldSelector;
this.concernedEvents = concernedEvents;
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.models.signalenrichment;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.regex.Pattern;

import org.apache.pekko.japi.Pair;
import org.eclipse.ditto.internal.utils.cache.config.CacheConfig;
import org.eclipse.ditto.json.JsonFieldSelector;

/**
* Extension of {@code DittoCachingSignalEnrichmentFacade} that selects, per namespace pattern, the fields to be
* loaded into the signal enrichment cache for search indexing.
*/
public final class SearchIndexingSignalEnrichmentFacade extends DittoCachingSignalEnrichmentFacade {

private final List<Pair<Pattern, JsonFieldSelector>> selectedIndexes;
private final Map<String, JsonFieldSelector> selectedIndexesCache;

private SearchIndexingSignalEnrichmentFacade(
final List<Pair<Pattern, JsonFieldSelector>> selectedIndexes,
final SignalEnrichmentFacade cacheLoaderFacade,
final CacheConfig cacheConfig,
final Executor cacheLoaderExecutor,
final String cacheNamePrefix) {

super(cacheLoaderFacade, cacheConfig, cacheLoaderExecutor, cacheNamePrefix);

this.selectedIndexes = List.copyOf(selectedIndexes);
selectedIndexesCache = new HashMap<>();
}

/**
* Returns a new {@code SearchIndexingSignalEnrichmentFacade} instance.
*
* @param selectedIndexes The selected indexes to be loaded into the search context
* @param cacheLoaderFacade the facade whose argument-result-pairs we are caching.
* @param cacheConfig the cache configuration to use for the cache.
* @param cacheLoaderExecutor the executor to use in order to asynchronously load cache entries.
* @param cacheNamePrefix the prefix to use as cacheName of the cache.
* @throws NullPointerException if any argument is null.
*/
public static SearchIndexingSignalEnrichmentFacade newInstance(
final List<Pair<Pattern, JsonFieldSelector>> selectedIndexes,
final SignalEnrichmentFacade cacheLoaderFacade,
final CacheConfig cacheConfig,
final Executor cacheLoaderExecutor,
final String cacheNamePrefix) {

return new SearchIndexingSignalEnrichmentFacade(
checkNotNull(selectedIndexes, "selectedIndexes"),
checkNotNull(cacheLoaderFacade, "cacheLoaderFacade"),
checkNotNull(cacheConfig, "cacheConfig"),
checkNotNull(cacheLoaderExecutor, "cacheLoaderExecutor"),
checkNotNull(cacheNamePrefix, "cacheNamePrefix"));
}

@Override
protected JsonFieldSelector determineSelector(final String namespace) {

if (!selectedIndexesCache.containsKey(namespace)) {
// We iterate through the list and return the first JsonFieldSelector that matches the namespace pattern.
selectedIndexes.stream()
.filter(pair -> pair.first().matcher(namespace).matches())
.findFirst()
.ifPresent(pair -> selectedIndexesCache.put(namespace, pair.second()));
}
return selectedIndexesCache.get(namespace);
}
}
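
The per-namespace memoization in `determineSelector` above can be sketched in isolation. The class in this change uses a plain `HashMap`. Whether the method can be reached from several threads is not visible in this diff, so the `ConcurrentHashMap` below is a precaution under that assumption rather than a confirmed fix. All names in the sketch are hypothetical, and a `String` stands in for `JsonFieldSelector`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

// Hypothetical illustration class, not part of Ditto.
final class MemoizedSelectorLookupSketch {

    // Ordered (pattern, selector) pairs; the selector is a plain String stand-in.
    private final List<Map.Entry<Pattern, String>> selectedIndexes;
    // Optional values allow caching "no match" as well, since the map cannot hold null.
    private final Map<String, Optional<String>> cache = new ConcurrentHashMap<>();

    MemoizedSelectorLookupSketch(final List<Map.Entry<Pattern, String>> selectedIndexes) {
        this.selectedIndexes = List.copyOf(selectedIndexes);
    }

    // Scans the pattern list at most once per distinct namespace and returns the first match.
    Optional<String> determineSelector(final String namespace) {
        return cache.computeIfAbsent(namespace, ns -> selectedIndexes.stream()
                .filter(entry -> entry.getKey().matcher(ns).matches())
                .map(Map.Entry::getValue)
                .findFirst());
    }

    int cachedNamespaces() {
        return cache.size();
    }

    public static void main(final String[] args) {
        final MemoizedSelectorLookupSketch lookup = new MemoizedSelectorLookupSketch(List.of(
                Map.entry(Pattern.compile("org\\.eclipse\\.test"), "attributes,features/info/properties"),
                Map.entry(Pattern.compile("org\\.eclipse.*"), "attributes,features/info")));

        System.out.println(lookup.determineSelector("org.eclipse.test"));
        System.out.println(lookup.determineSelector("org.eclipse.ditto"));
        System.out.println(lookup.determineSelector("com.acme"));
    }
}
```

Unlike the class above, this sketch also caches misses, so a namespace without a matching pattern is scanned only once. The cache still grows with the number of distinct namespaces seen, which is the same trade-off the original makes.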