diff --git a/CHANGELOG.md b/CHANGELOG.md index 224ea1ff62b53..453ad7b1bbba3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993)) +- [BUG] Fix the thread context that is not properly cleared and messes up the traces ([#10873](https://github.com/opensearch-project/OpenSearch/pull/10873)) +- Handle canMatchSearchAfter for frozen context scenario ([#11249](https://github.com/opensearch-project/OpenSearch/pull/11249)) ### Security @@ -105,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670)) - Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069)) - [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672)) +- Add template snippets support for field and target_field in KV ingest processor ([#10040](https://github.com/opensearch-project/OpenSearch/pull/10040)) - Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307)) ### Dependencies @@ -118,10 +121,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `org.codehaus.woodstox:stax2-api` from 4.2.1 to 4.2.2 ([#10639](https://github.com/opensearch-project/OpenSearch/pull/10639)) - Bump `com.google.http-client:google-http-client` from 1.43.2 to 1.43.3 ([#10635](https://github.com/opensearch-project/OpenSearch/pull/10635)) - Bump `com.squareup.okio:okio` from 3.5.0 to 3.6.0 ([#10637](https://github.com/opensearch-project/OpenSearch/pull/10637)) -- Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.21.1 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000)) +- Bump `org.apache.logging.log4j:log4j-core` from 2.20.0 to 2.22.0 ([#10858](https://github.com/opensearch-project/OpenSearch/pull/10858), [#11000](https://github.com/opensearch-project/OpenSearch/pull/11000), [#11270](https://github.com/opensearch-project/OpenSearch/pull/11270)) - Bump `aws-actions/configure-aws-credentials` from 2 to 4 ([#10504](https://github.com/opensearch-project/OpenSearch/pull/10504)) - Bump `stefanzweifel/git-auto-commit-action` from 4 to 5 ([#11171](https://github.com/opensearch-project/OpenSearch/pull/11171)) - Bump `actions/github-script` from 6 to 7 ([#11271](https://github.com/opensearch-project/OpenSearch/pull/11271)) +- Bump `jackson` and `jackson_databind` from 2.15.2 to 2.16.0 ([#11273](https://github.com/opensearch-project/OpenSearch/pull/11273)) ### Changed - Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840)) @@ -132,12 +136,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642)) - Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395)) - Add instrumentation for indexing in transport bulk action and transport shard bulk action. ([#10273](https://github.com/opensearch-project/OpenSearch/pull/10273)) -- [BUG] Disable sort optimization for HALF_FLOAT ([#10999](https://github.com/opensearch-project/OpenSearch/pull/10999)) - Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023)) - Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057)) - Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087)) - Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528)) -- Use iterative approach to evaluate Regex.simpleMatch ([#11060](https://github.com/opensearch-project/OpenSearch/pull/11060)) ### Deprecated @@ -151,7 +153,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix some test methods in SimulatePipelineRequestParsingTests never run and fix test failure ([#10496](https://github.com/opensearch-project/OpenSearch/pull/10496)) - Fix passing wrong parameter when calling newConfigurationException() in DotExpanderProcessor ([#10737](https://github.com/opensearch-project/OpenSearch/pull/10737)) - Fix SuggestSearch.testSkipDuplicates by forceing refresh when indexing its test documents ([#11068](https://github.com/opensearch-project/OpenSearch/pull/11068)) -- Adding version condition while adding geoshape doc values to the index, to ensure backward compatibility.([#11095](https://github.com/opensearch-project/OpenSearch/pull/11095)) - Fix per request latency last phase not tracked ([#10934](https://github.com/opensearch-project/OpenSearch/pull/10934)) ### Security diff --git a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle index dca2bce94ea6d..f24b61ef0d165 100644 --- a/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle +++ b/buildSrc/src/testKit/thirdPartyAudit/sample_jars/build.gradle @@ -17,7 +17,7 @@ repositories { } dependencies { - implementation "org.apache.logging.log4j:log4j-core:2.21.1" + implementation "org.apache.logging.log4j:log4j-core:2.22.0" } ["0.0.1", "0.0.2"].forEach { v -> diff --git a/buildSrc/version.properties b/buildSrc/version.properties index f19437979c852..1b2377cc7fe57 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -7,8 +7,8 @@ bundled_jdk = 21.0.1+12 # optional dependencies spatial4j = 0.7 jts = 1.15.0 -jackson = 2.15.2 -jackson_databind = 2.15.2 +jackson = 2.16.0 +jackson_databind = 2.16.0 snakeyaml = 2.1 icu4j = 70.1 supercsv = 2.4.0 diff --git a/client/sniffer/licenses/jackson-core-2.15.2.jar.sha1 b/client/sniffer/licenses/jackson-core-2.15.2.jar.sha1 deleted file mode 100644 index ec6781b968eed..0000000000000 --- a/client/sniffer/licenses/jackson-core-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a6fe1836469a69b3ff66037c324d75fc66ef137c \ No newline at end of file diff --git a/client/sniffer/licenses/jackson-core-2.16.0.jar.sha1 b/client/sniffer/licenses/jackson-core-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..c2b70fb4ae202 --- /dev/null +++ b/client/sniffer/licenses/jackson-core-2.16.0.jar.sha1 @@ -0,0 +1 @@ +899e5cf01be55fbf094ad72b2edb0c5df99111ee \ No newline at end of file diff --git a/distribution/tools/upgrade-cli/licenses/jackson-annotations-2.15.2.jar.sha1 b/distribution/tools/upgrade-cli/licenses/jackson-annotations-2.15.2.jar.sha1 deleted file mode 100644 index f63416ddb8ceb..0000000000000 --- a/distribution/tools/upgrade-cli/licenses/jackson-annotations-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4724a65ac8e8d156a24898d50fd5dbd3642870b8 \ No newline at end of file diff --git a/distribution/tools/upgrade-cli/licenses/jackson-annotations-2.16.0.jar.sha1 b/distribution/tools/upgrade-cli/licenses/jackson-annotations-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..79ed9e0c63fc8 --- /dev/null +++ b/distribution/tools/upgrade-cli/licenses/jackson-annotations-2.16.0.jar.sha1 @@ -0,0 +1 @@ +dc30995f7428c0a405eba9b8c619b20d2b3b9905 \ No newline at end of file diff --git a/distribution/tools/upgrade-cli/licenses/jackson-databind-2.15.2.jar.sha1 b/distribution/tools/upgrade-cli/licenses/jackson-databind-2.15.2.jar.sha1 deleted file mode 100644 index f16d80af8dce6..0000000000000 --- a/distribution/tools/upgrade-cli/licenses/jackson-databind-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9353b021f10c307c00328f52090de2bdb4b6ff9c \ No newline at end of file diff --git a/distribution/tools/upgrade-cli/licenses/jackson-databind-2.16.0.jar.sha1 b/distribution/tools/upgrade-cli/licenses/jackson-databind-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..da00d281934b1 --- /dev/null +++ b/distribution/tools/upgrade-cli/licenses/jackson-databind-2.16.0.jar.sha1 @@ -0,0 +1 @@ +3a6b7f8ff7b30d518bbd65678e9c30cd881f19a7 \ No newline at end of file diff --git a/libs/core/licenses/jackson-core-2.15.2.jar.sha1 b/libs/core/licenses/jackson-core-2.15.2.jar.sha1 deleted file mode 100644 index ec6781b968eed..0000000000000 --- a/libs/core/licenses/jackson-core-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a6fe1836469a69b3ff66037c324d75fc66ef137c \ No newline at end of file diff --git a/libs/core/licenses/jackson-core-2.16.0.jar.sha1 b/libs/core/licenses/jackson-core-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..c2b70fb4ae202 --- /dev/null +++ b/libs/core/licenses/jackson-core-2.16.0.jar.sha1 @@ -0,0 +1 @@ +899e5cf01be55fbf094ad72b2edb0c5df99111ee \ No newline at end of file diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java index a5d515443b54d..decbf49f795c4 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultSpanScope.java @@ -44,23 +44,23 @@ private DefaultSpanScope(Span span, SpanScope previousSpanScope, TracerContextSt public static SpanScope create(Span span, TracerContextStorage tracerContextStorage) { final SpanScope beforeSpanScope = spanScopeThreadLocal.get(); SpanScope newSpanScope = new DefaultSpanScope(span, beforeSpanScope, tracerContextStorage); - spanScopeThreadLocal.set(newSpanScope); return newSpanScope; } @Override public void close() { detach(); - spanScopeThreadLocal.set(previousSpanScope); } @Override public SpanScope attach() { + spanScopeThreadLocal.set(this); tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, this.span); return this; } private void detach() { + spanScopeThreadLocal.set(previousSpanScope); if (previousSpanScope != null) { tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, previousSpanScope.getSpan()); } else { diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index a3bb64ea392a9..8f1a26d99e725 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -12,7 +12,7 @@ import java.io.Closeable; import java.io.IOException; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Optional; @@ -53,7 +53,6 @@ public Span startSpan(SpanCreationContext context) { parentSpan = getCurrentSpanInternal(); } Span span = createSpan(context, parentSpan); - setCurrentSpanInContext(span); addDefaultAttributes(span); return span; } @@ -94,10 +93,6 @@ private Span createSpan(SpanCreationContext spanCreationContext, Span parentSpan return tracingTelemetry.createSpan(spanCreationContext, parentSpan); } - private void setCurrentSpanInContext(Span span) { - tracerContextStorage.put(TracerContextStorage.CURRENT_SPAN, span); - } - /** * Adds default attributes in the span * @param span the current active span @@ -107,7 +102,7 @@ protected void addDefaultAttributes(Span span) { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { Optional propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers); return startSpan(spanCreationContext.parent(propagatedSpan.map(SpanContext::new).orElse(null))); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java index 8257d251e9560..9b49ca7668992 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java @@ -9,7 +9,7 @@ package org.opensearch.telemetry.tracing; import org.opensearch.common.annotation.ExperimentalApi; -import org.opensearch.telemetry.tracing.http.HttpTracer; +import org.opensearch.telemetry.tracing.transport.TransportTracer; import java.io.Closeable; @@ -22,7 +22,7 @@ * @opensearch.experimental */ @ExperimentalApi -public interface Tracer extends HttpTracer, Closeable { +public interface Tracer extends TransportTracer, Closeable { /** * Starts the {@link Span} with given {@link SpanCreationContext} * diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java index 5fbc5d329e227..d7d48d1db10d6 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java @@ -10,7 +10,7 @@ import org.opensearch.common.annotation.ExperimentalApi; -import java.util.List; +import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -36,7 +36,7 @@ public interface TracingContextPropagator { * @param headers request headers to extract the context from * @return current span */ - Optional extractFromHeaders(Map> headers); + Optional extractFromHeaders(Map> headers); /** * Injects tracing context diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java index 50452ff5fe3b4..c57eaccf1f3df 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java @@ -16,7 +16,7 @@ import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; -import java.util.List; +import java.util.Collection; import java.util.Map; /** @@ -65,7 +65,7 @@ public void close() { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> header) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> header) { return NoopSpan.INSTANCE; } } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java similarity index 64% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java index 50d18c0a0d040..5883d7de8e83a 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/TransportTracer.java @@ -6,31 +6,31 @@ * compatible open source license. */ -package org.opensearch.telemetry.tracing.http; +package org.opensearch.telemetry.tracing.transport; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.SpanCreationContext; -import java.util.List; +import java.util.Collection; import java.util.Map; /** - * HttpTracer helps in creating a {@link Span} which reads the incoming tracing information - * from the HttpRequest header and propagate the span accordingly. + * TransportTracer helps in creating a {@link Span} which reads the incoming tracing information + * from the HTTP or TCP transport headers and propagate the span accordingly. *

* All methods on the Tracer object are multi-thread safe. * * @opensearch.experimental */ @ExperimentalApi -public interface HttpTracer { +public interface TransportTracer { /** * Start the span with propagating the tracing info from the HttpRequest header. * * @param spanCreationContext span name. - * @param header http request header. - * @return span. + * @param headers transport headers + * @return the span instance */ - Span startSpan(SpanCreationContext spanCreationContext, Map> header); + Span startSpan(SpanCreationContext spanCreationContext, Map> headers); } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java similarity index 65% rename from libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java rename to libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java index 9feb862a4e010..87ffcc43184bb 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/transport/package-info.java @@ -7,6 +7,6 @@ */ /** - * Contains No-op implementations + * Contains HTTP or TCP transport related tracer capabilities */ -package org.opensearch.telemetry.tracing.http; +package org.opensearch.telemetry.tracing.transport; diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java index 2a791f1ae4164..2182b3ea28ac8 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java @@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -35,7 +37,6 @@ public class DefaultTracerTests extends OpenSearchTestCase { private Span mockSpan; private Span mockParentSpan; - private SpanScope mockSpanScope; private ThreadPool threadPool; private ExecutorService executorService; private SpanCreationContext spanCreationContext; @@ -102,11 +103,11 @@ public void testCreateSpanWithAttributes() { Span span = defaultTracer.startSpan(spanCreationContext); - assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(1.0, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key1")); - assertEquals(2l, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key2")); - assertEquals(true, ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key3")); - assertEquals("key4", ((MockSpan) defaultTracer.getCurrentSpan().getSpan()).getAttribute("key4")); + assertThat(defaultTracer.getCurrentSpan(), is(nullValue())); + assertEquals(1.0, ((MockSpan) span).getAttribute("key1")); + assertEquals(2l, ((MockSpan) span).getAttribute("key2")); + assertEquals(true, ((MockSpan) span).getAttribute("key3")); + assertEquals("key4", ((MockSpan) span).getAttribute("key4")); span.endSpan(); } @@ -121,16 +122,18 @@ public void testCreateSpanWithParent() { Span span = defaultTracer.startSpan(spanCreationContext, null); - SpanContext parentSpan = defaultTracer.getCurrentSpan(); - - SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan()); + try (final SpanScope scope = defaultTracer.withSpanInScope(span)) { + SpanContext parentSpan = defaultTracer.getCurrentSpan(); - Span span1 = defaultTracer.startSpan(spanCreationContext1); + SpanCreationContext spanCreationContext1 = buildSpanCreationContext("span_name_1", Attributes.EMPTY, parentSpan.getSpan()); - assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); - span1.endSpan(); - span.endSpan(); + try (final ScopedSpan span1 = defaultTracer.startScopedSpan(spanCreationContext1)) { + assertEquals("span_name_1", defaultTracer.getCurrentSpan().getSpan().getSpanName()); + assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); + } + } finally { + span.endSpan(); + } } @SuppressWarnings("unchecked") @@ -155,8 +158,7 @@ public void testCreateSpanWithNullParent() { Span span = defaultTracer.startSpan(spanCreationContext); - assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); - assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan()); + assertThat(defaultTracer.getCurrentSpan(), is(nullValue())); span.endSpan(); } @@ -403,7 +405,6 @@ private void setupMocks() { mockTracingTelemetry = mock(TracingTelemetry.class); mockSpan = mock(Span.class); mockParentSpan = mock(Span.class); - mockSpanScope = mock(SpanScope.class); mockTracerContextStorage = mock(TracerContextStorage.class); when(mockSpan.getSpanName()).thenReturn("span_name"); when(mockSpan.getSpanId()).thenReturn("span_id"); diff --git a/libs/x-content/licenses/jackson-core-2.15.2.jar.sha1 b/libs/x-content/licenses/jackson-core-2.15.2.jar.sha1 deleted file mode 100644 index ec6781b968eed..0000000000000 --- a/libs/x-content/licenses/jackson-core-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -a6fe1836469a69b3ff66037c324d75fc66ef137c \ No newline at end of file diff --git a/libs/x-content/licenses/jackson-core-2.16.0.jar.sha1 b/libs/x-content/licenses/jackson-core-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..c2b70fb4ae202 --- /dev/null +++ b/libs/x-content/licenses/jackson-core-2.16.0.jar.sha1 @@ -0,0 +1 @@ +899e5cf01be55fbf094ad72b2edb0c5df99111ee \ No newline at end of file diff --git a/libs/x-content/licenses/jackson-dataformat-cbor-2.15.2.jar.sha1 b/libs/x-content/licenses/jackson-dataformat-cbor-2.15.2.jar.sha1 deleted file mode 100644 index 0022265a84b68..0000000000000 --- a/libs/x-content/licenses/jackson-dataformat-cbor-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -baafc85c70765594add14bd93f3efd68e1945b76 \ No newline at end of file diff --git a/libs/x-content/licenses/jackson-dataformat-cbor-2.16.0.jar.sha1 b/libs/x-content/licenses/jackson-dataformat-cbor-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..8da478fc6013d --- /dev/null +++ b/libs/x-content/licenses/jackson-dataformat-cbor-2.16.0.jar.sha1 @@ -0,0 +1 @@ +35e8b7bf4fc1d078766bb155103d433ed5bb1627 \ No newline at end of file diff --git a/libs/x-content/licenses/jackson-dataformat-smile-2.15.2.jar.sha1 b/libs/x-content/licenses/jackson-dataformat-smile-2.15.2.jar.sha1 deleted file mode 100644 index 2b8caad846fec..0000000000000 --- a/libs/x-content/licenses/jackson-dataformat-smile-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -16d1dd22f7d641459ed056399d4f7df0220f1176 \ No newline at end of file diff --git a/libs/x-content/licenses/jackson-dataformat-smile-2.16.0.jar.sha1 b/libs/x-content/licenses/jackson-dataformat-smile-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..3e952ffe92418 --- /dev/null +++ b/libs/x-content/licenses/jackson-dataformat-smile-2.16.0.jar.sha1 @@ -0,0 +1 @@ +3c422d7f3901c9a1becf9df3cf41efc68a5ab95c \ No newline at end of file diff --git a/libs/x-content/licenses/jackson-dataformat-yaml-2.15.2.jar.sha1 b/libs/x-content/licenses/jackson-dataformat-yaml-2.15.2.jar.sha1 deleted file mode 100644 index 4ad7255e2318f..0000000000000 --- a/libs/x-content/licenses/jackson-dataformat-yaml-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -58194ff9f51915ad6bf6b6f24818232d7566418a \ No newline at end of file diff --git a/libs/x-content/licenses/jackson-dataformat-yaml-2.16.0.jar.sha1 b/libs/x-content/licenses/jackson-dataformat-yaml-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..d62b5874ab023 --- /dev/null +++ b/libs/x-content/licenses/jackson-dataformat-yaml-2.16.0.jar.sha1 @@ -0,0 +1 @@ +2033e2c5f531785d17f3a2bc31842e3bbb7983b2 \ No newline at end of file diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java index 0f8422ea474d2..a2a51d968e078 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/IngestCommonModulePlugin.java @@ -98,7 +98,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)); processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory()); processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory()); - processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory()); + processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService)); processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()); processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory()); processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); diff --git a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java index ff3cca4ce111f..73f03b3cb2e0f 100644 --- a/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java +++ b/modules/ingest-common/src/main/java/org/opensearch/ingest/common/KeyValueProcessor.java @@ -33,10 +33,13 @@ package org.opensearch.ingest.common; import org.opensearch.common.util.set.Sets; +import org.opensearch.core.common.Strings; import org.opensearch.ingest.AbstractProcessor; import org.opensearch.ingest.ConfigurationUtils; import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.Processor; +import org.opensearch.script.ScriptService; +import org.opensearch.script.TemplateScript; import java.util.Collections; import java.util.List; @@ -56,24 +59,24 @@ public final class KeyValueProcessor extends AbstractProcessor { private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)"); - private final String field; + private final TemplateScript.Factory field; private final String fieldSplit; private final String valueSplit; private final Set includeKeys; private final Set excludeKeys; - private final String targetField; + private final TemplateScript.Factory targetField; private final boolean ignoreMissing; private final Consumer execution; KeyValueProcessor( String tag, String description, - String field, + TemplateScript.Factory field, String fieldSplit, String valueSplit, Set includeKeys, Set excludeKeys, - String targetField, + TemplateScript.Factory targetField, boolean ignoreMissing, String trimKey, String trimValue, @@ -106,10 +109,10 @@ public final class KeyValueProcessor extends AbstractProcessor { private static Consumer buildExecution( String fieldSplit, String valueSplit, - String field, + TemplateScript.Factory field, Set includeKeys, Set excludeKeys, - String targetField, + TemplateScript.Factory targetField, boolean ignoreMissing, String trimKey, String trimValue, @@ -130,41 +133,62 @@ private static Consumer buildExecution( keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false; } } - final String fieldPathPrefix; - String keyPrefix = prefix == null ? "" : prefix; - if (targetField == null) { - fieldPathPrefix = keyPrefix; - } else { - fieldPathPrefix = targetField + "." + keyPrefix; - } - final Function keyPrefixer; - if (fieldPathPrefix.isEmpty()) { - keyPrefixer = val -> val; - } else { - keyPrefixer = val -> fieldPathPrefix + val; - } - final Function fieldSplitter = buildSplitter(fieldSplit, true); - Function valueSplitter = buildSplitter(valueSplit, false); - final Function keyTrimmer = buildTrimmer(trimKey); - final Function bracketStrip; - if (stripBrackets) { - bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll(""); - } else { - bracketStrip = val -> val; - } - final Function valueTrimmer = buildTrimmer(trimValue); + return document -> { - String value = document.getFieldValue(field, String.class, ignoreMissing); + final String fieldPathPrefix; + String keyPrefix = prefix == null ? "" : prefix; + if (targetField != null) { + String targetFieldPath = document.renderTemplate(targetField); + if (!Strings.isNullOrEmpty((targetFieldPath))) { + fieldPathPrefix = targetFieldPath + "." + keyPrefix; + } else { + fieldPathPrefix = keyPrefix; + } + } else { + fieldPathPrefix = keyPrefix; + } + + final Function keyPrefixer; + if (fieldPathPrefix.isEmpty()) { + keyPrefixer = val -> val; + } else { + keyPrefixer = val -> fieldPathPrefix + val; + } + final Function fieldSplitter = buildSplitter(fieldSplit, true); + Function valueSplitter = buildSplitter(valueSplit, false); + final Function keyTrimmer = buildTrimmer(trimKey); + final Function bracketStrip; + if (stripBrackets) { + bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll(""); + } else { + bracketStrip = val -> val; + } + final Function valueTrimmer = buildTrimmer(trimValue); + + String path = document.renderTemplate(field); + final boolean fieldPathNullOrEmpty = Strings.isNullOrEmpty(path); + if (fieldPathNullOrEmpty || document.hasField(path, true) == false) { + if (ignoreMissing) { + return; + } else if (fieldPathNullOrEmpty) { + throw new IllegalArgumentException("field path cannot be null nor empty"); + } else { + throw new IllegalArgumentException("field [" + path + "] doesn't exist"); + } + } + + String value = document.getFieldValue(path, String.class, ignoreMissing); if (value == null) { if (ignoreMissing) { return; } - throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs."); + throw new IllegalArgumentException("field [" + path + "] is null, cannot extract key-value pairs. "); } + for (String part : fieldSplitter.apply(value)) { String[] kv = valueSplitter.apply(part); if (kv.length != 2) { - throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]"); + throw new IllegalArgumentException("field [" + path + "] does not contain value_split [" + valueSplit + "]"); } String key = keyTrimmer.apply(kv[0]); if (keyFilter.test(key)) { @@ -193,7 +217,7 @@ private static Function buildSplitter(String split, boolean fi } } - String getField() { + TemplateScript.Factory getField() { return field; } @@ -213,7 +237,7 @@ Set getExcludeKeys() { return excludeKeys; } - String getTargetField() { + TemplateScript.Factory getTargetField() { return targetField; } @@ -241,6 +265,12 @@ public String getType() { } public static class Factory implements Processor.Factory { + private final ScriptService scriptService; + + public Factory(ScriptService scriptService) { + this.scriptService = scriptService; + } + @Override public KeyValueProcessor create( Map registry, @@ -249,7 +279,13 @@ public KeyValueProcessor create( Map config ) throws Exception { String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field"); + TemplateScript.Factory fieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService); String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field"); + TemplateScript.Factory targetFieldTemplate = null; + if (!Strings.isNullOrEmpty(targetField)) { + targetFieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "target_field", targetField, scriptService); + } + String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split"); String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split"); String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key"); @@ -270,12 +306,12 @@ public KeyValueProcessor create( return new KeyValueProcessor( processorTag, description, - field, + fieldTemplate, fieldSplit, valueSplit, includeKeys, excludeKeys, - targetField, + targetFieldTemplate, ignoreMissing, trimKey, trimValue, diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java index 62060a682c0cb..78972ff8d5dea 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorFactoryTests.java @@ -35,7 +35,9 @@ import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchParseException; import org.opensearch.common.util.set.Sets; +import org.opensearch.ingest.TestTemplateService; import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; import java.util.Arrays; import java.util.Collections; @@ -48,8 +50,14 @@ public class KeyValueProcessorFactoryTests extends OpenSearchTestCase { + private KeyValueProcessor.Factory factory; + + @Before + public void init() { + factory = new KeyValueProcessor.Factory(TestTemplateService.instance()); + } + public void testCreateWithDefaults() throws Exception { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("field_split", "&"); @@ -57,7 +65,7 @@ public void testCreateWithDefaults() throws Exception { String processorTag = randomAlphaOfLength(10); KeyValueProcessor processor = factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); - assertThat(processor.getField(), equalTo("field1")); + assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1")); assertThat(processor.getFieldSplit(), equalTo("&")); assertThat(processor.getValueSplit(), equalTo("=")); assertThat(processor.getIncludeKeys(), is(nullValue())); @@ -66,7 +74,6 @@ public void testCreateWithDefaults() throws Exception { } public void testCreateWithAllFieldsSet() throws Exception { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("field_split", "&"); @@ -78,17 +85,16 @@ public void testCreateWithAllFieldsSet() throws Exception { String processorTag = randomAlphaOfLength(10); KeyValueProcessor processor = factory.create(null, processorTag, null, config); assertThat(processor.getTag(), equalTo(processorTag)); - assertThat(processor.getField(), equalTo("field1")); + assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1")); assertThat(processor.getFieldSplit(), equalTo("&")); assertThat(processor.getValueSplit(), equalTo("=")); assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b"))); assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet())); - assertThat(processor.getTargetField(), equalTo("target")); + assertThat(processor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target")); assertTrue(processor.isIgnoreMissing()); } public void testCreateWithMissingField() { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); String processorTag = randomAlphaOfLength(10); OpenSearchException exception = expectThrows( @@ -99,7 +105,6 @@ public void testCreateWithMissingField() { } public void testCreateWithMissingFieldSplit() { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); String processorTag = randomAlphaOfLength(10); @@ -111,7 +116,6 @@ public void testCreateWithMissingFieldSplit() { } public void testCreateWithMissingValueSplit() { - KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory(); Map config = new HashMap<>(); config.put("field", "field1"); config.put("field_split", "&"); diff --git a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java index 685a78e2e769b..5f71ea6f16a4f 100644 --- a/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/opensearch/ingest/common/KeyValueProcessorTests.java @@ -36,6 +36,7 @@ import org.opensearch.ingest.IngestDocument; import org.opensearch.ingest.Processor; import org.opensearch.ingest.RandomDocumentPicks; +import org.opensearch.ingest.TestTemplateService; import org.opensearch.test.OpenSearchTestCase; import java.util.ArrayList; @@ -51,7 +52,7 @@ public class KeyValueProcessorTests extends OpenSearchTestCase { - private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(); + private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(TestTemplateService.instance()); public void test() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random()); @@ -123,7 +124,12 @@ public void testMissingField() throws Exception { IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap()); Processor processor = createKvProcessor("unknown", "&", "=", null, null, "target", false); IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument)); - assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]")); + assertThat(exception.getMessage(), equalTo("field [unknown] doesn't exist")); + + // when using template snippet, the resolved field path maybe empty + Processor processorWithEmptyFieldPath = createKvProcessor("", "&", "=", null, null, "target", false); + exception = expectThrows(IllegalArgumentException.class, () -> processorWithEmptyFieldPath.execute(ingestDocument)); + assertThat(exception.getMessage(), equalTo("field path cannot be null nor empty")); } public void testNullValueWithIgnoreMissing() throws Exception { diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml index 836243652b2e0..30a0a520b5c40 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/150_kv.yml @@ -39,3 +39,151 @@ teardown: id: 1 - match: { _source.goodbye: "everybody" } - match: { _source.hello: "world" } + +--- +"Test KV Processor with template snippets": + - skip: + version: " - 2.11.99" + reason: "KV Processor with template snippets is only supported since 2.12.0" + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=" + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "foo", + target: "zoo", + foo: "goodbye=everybody hello=world" + } + + - do: + get: + index: test + id: 1 + - match: { _source.zoo.goodbye: "everybody" } + - match: { _source.zoo.hello: "world" } + +--- +"Test KV Processor with non-existing field and without ignore_missing": + - skip: + version: " - 2.11.99" + reason: "KV Processor with template snippets is only supported since 2.12.0" + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=" + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: /field path cannot be null nor empty/ + index: + index: test + id: 1 + pipeline: "1" + body: { + target: "zoo", + foo: "goodbye=everybody hello=world" + } + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=" + } + } + ] + } + - match: { acknowledged: true } + + - do: + catch: /field \[unknown\] doesn\'t exist/ + index: + index: test + id: 1 + pipeline: "1" + body: { + source: "unknown", + target: "zoo", + foo: "goodbye=everybody hello=world" + } + +--- +"Test KV Processor with non-existing field and ignore_missing": + - skip: + version: " - 2.11.99" + reason: "KV Processor with template snippets is only supported since 2.12.0" + + - do: + ingest.put_pipeline: + id: "1" + body: > + { + "processors": [ + { + "kv" : { + "field" : "{{source}}", + "target_field" : "{{target}}", + "field_split": " ", + "value_split": "=", + "ignore_missing": true + } + } + ] + } + - match: { acknowledged: true } + + - do: + index: + index: test + id: 1 + pipeline: "1" + body: { + target: "zoo", + foo: "goodbye=everybody hello=world" + } + + - do: + get: + index: test + id: 1 + - match: { _source: { target: "zoo", foo: "goodbye=everybody hello=world"}} diff --git a/modules/ingest-geoip/licenses/jackson-annotations-2.15.2.jar.sha1 b/modules/ingest-geoip/licenses/jackson-annotations-2.15.2.jar.sha1 deleted file mode 100644 index f63416ddb8ceb..0000000000000 --- a/modules/ingest-geoip/licenses/jackson-annotations-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4724a65ac8e8d156a24898d50fd5dbd3642870b8 \ No newline at end of file diff --git a/modules/ingest-geoip/licenses/jackson-annotations-2.16.0.jar.sha1 b/modules/ingest-geoip/licenses/jackson-annotations-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..79ed9e0c63fc8 --- /dev/null +++ b/modules/ingest-geoip/licenses/jackson-annotations-2.16.0.jar.sha1 @@ -0,0 +1 @@ +dc30995f7428c0a405eba9b8c619b20d2b3b9905 \ No newline at end of file diff --git a/modules/ingest-geoip/licenses/jackson-databind-2.15.2.jar.sha1 b/modules/ingest-geoip/licenses/jackson-databind-2.15.2.jar.sha1 deleted file mode 100644 index f16d80af8dce6..0000000000000 --- a/modules/ingest-geoip/licenses/jackson-databind-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9353b021f10c307c00328f52090de2bdb4b6ff9c \ No newline at end of file diff --git a/modules/ingest-geoip/licenses/jackson-databind-2.16.0.jar.sha1 b/modules/ingest-geoip/licenses/jackson-databind-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..da00d281934b1 --- /dev/null +++ b/modules/ingest-geoip/licenses/jackson-databind-2.16.0.jar.sha1 @@ -0,0 +1 @@ +3a6b7f8ff7b30d518bbd65678e9c30cd881f19a7 \ No newline at end of file diff --git a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java deleted file mode 100644 index 604c233ca49c4..0000000000000 --- a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.index.codec; - -import org.opensearch.action.admin.indices.flush.FlushResponse; -import org.opensearch.action.admin.indices.refresh.RefreshResponse; -import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; -import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.opensearch.action.support.ActiveShardCount; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.engine.Segment; -import org.opensearch.index.reindex.BulkByScrollResponse; -import org.opensearch.index.reindex.ReindexAction; -import org.opensearch.index.reindex.ReindexModulePlugin; -import org.opensearch.index.reindex.ReindexRequestBuilder; -import org.opensearch.index.reindex.ReindexTestCase; -import org.opensearch.plugins.Plugin; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.util.stream.Collectors.toList; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; - -public class MultiCodecReindexIT extends ReindexTestCase { - - @Override - protected Collection> nodePlugins() { - return List.of(ReindexModulePlugin.class); - } - - public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { - internalCluster().ensureAtLeastNumDataNodes(1); - Map codecMap = Map.of( - "best_compression", - "BEST_COMPRESSION", - "zlib", - "BEST_COMPRESSION", - "default", - "BEST_SPEED", - "lz4", - "BEST_SPEED" - ); - - for (Map.Entry codec : codecMap.entrySet()) { - assertReindexingWithMultipleCodecs(codec.getKey(), codec.getValue(), codecMap); - } - - } - - private void assertReindexingWithMultipleCodecs(String destCodec, String destCodecMode, Map codecMap) - throws ExecutionException, InterruptedException { - - final String index = "test-index" + destCodec; - final String destIndex = "dest-index" + destCodec; - - // creating source index - createIndex( - index, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", "default") - .put("index.merge.policy.max_merged_segment", "1b") - .build() - ); - ensureGreen(index); - - final int nbDocs = randomIntBetween(2, 5); - - // indexing with all 4 codecs - for (Map.Entry codec : codecMap.entrySet()) { - useCodec(index, codec.getKey()); - ingestDocs(index, nbDocs); - } - - assertTrue( - getSegments(index).stream() - .flatMap(s -> s.getAttributes().values().stream()) - .collect(Collectors.toSet()) - .containsAll(codecMap.values()) - ); - - // creating destination index with destination codec - createIndex( - destIndex, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", destCodec) - .build() - ); - - BulkByScrollResponse bulkResponse = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index) - .destination(destIndex) - .refresh(true) - .waitForActiveShards(ActiveShardCount.ONE) - .get(); - - assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated()); - assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal()); - assertEquals(0, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); - } - - private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { - assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1)); - - assertAcked( - client().admin() - .indices() - .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) - .get() - ); - - assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1)); - } - - private void flushAndRefreshIndex(String index) { - - // Request is not blocked - for (String blockSetting : Arrays.asList( - SETTING_BLOCKS_READ, - SETTING_BLOCKS_WRITE, - SETTING_READ_ONLY, - SETTING_BLOCKS_METADATA, - SETTING_READ_ONLY_ALLOW_DELETE - )) { - try { - enableIndexBlock(index, blockSetting); - // flush - FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet(); - assertNoFailures(flushResponse); - - // refresh - RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet(); - assertNoFailures(refreshResponse); - } finally { - disableIndexBlock(index, blockSetting); - } - } - } - - private void ingestDocs(String index, int nbDocs) throws InterruptedException { - - indexRandom( - randomBoolean(), - false, - randomBoolean(), - IntStream.range(0, nbDocs) - .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) - .collect(toList()) - ); - flushAndRefreshIndex(index); - } - - private ArrayList getSegments(String index) { - - return new ArrayList<>( - client().admin() - .indices() - .segments(new IndicesSegmentsRequest(index)) - .actionGet() - .getIndices() - .get(index) - .getShards() - .get(0) - .getShards()[0].getSegments() - ); - } - -} diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/MultiCodecReindexTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/MultiCodecReindexTests.java new file mode 100644 index 0000000000000..53a0545fd2ff7 --- /dev/null +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/MultiCodecReindexTests.java @@ -0,0 +1,160 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.reindex; + +import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.MergePolicyProvider; +import org.opensearch.index.engine.Segment; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalSettingsPlugin; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +public class MultiCodecReindexTests extends ReindexTestCase { + final static Map codecMap = Map.of( + "best_compression", + "BEST_COMPRESSION", + "zlib", + "BEST_COMPRESSION", + "default", + "BEST_SPEED", + "lz4", + "BEST_SPEED" + ); + final static String[] codecChoices = codecMap.keySet().toArray(String[]::new); + + @Override + protected Collection> nodePlugins() { + return List.of(InternalSettingsPlugin.class, ReindexModulePlugin.class); + } + + public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { + for (Map.Entry candidate : codecMap.entrySet()) { + final int nbDocs = randomIntBetween(2, 5); + + final String destCodec = candidate.getKey(); + final String destCodecMode = candidate.getValue(); + + final String index = "test-index-" + destCodec; + final String destIndex = "dest-index-" + destCodec; + + // create source index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", randomFrom(codecChoices)) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) + .build() + ); + ensureGreen(index); + + // index using all codecs + for (String codec : codecMap.keySet()) { + useCodec(index, codec); + ingestDocs(index, nbDocs); + } + + assertTrue( + getSegments(index).stream() + .flatMap(s -> s.getAttributes().values().stream()) + .collect(Collectors.toSet()) + .containsAll(codecMap.values()) + ); + + // create destination index with destination codec + createIndex( + destIndex, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", destCodec) + .build() + ); + ensureGreen(destIndex); + + // perform reindex + BulkByScrollResponse response = reindex().source(index) + .destination(destIndex) + .refresh(true) + .waitForActiveShards(ActiveShardCount.ONE) + .get(); + final int expectedResponseSize = codecMap.size() * nbDocs; + + // assertions + assertEquals(0, response.getNoops()); + assertEquals(1, response.getBatches()); + assertEquals(0, response.getDeleted()); + assertEquals(0, response.getVersionConflicts()); + assertEquals(0, response.getBulkFailures().size()); + assertEquals(0, response.getSearchFailures().size()); + + assertEquals(expectedResponseSize, response.getTotal()); + assertEquals(expectedResponseSize, response.getCreated()); + + assertTrue(response.getTook().getMillis() > 0); + assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); + } + } + + private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { + assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1)); + + assertAcked( + client().admin() + .indices() + .updateSettings(new UpdateSettingsRequest(index).settings(Settings.builder().put("index.codec", codec))) + .get() + ); + + assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1)); + } + + private void ingestDocs(String index, int nbDocs) throws InterruptedException { + indexRandom( + randomBoolean(), + false, + randomBoolean(), + IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) + .collect(toList()) + ); + + flushAndRefresh(index); + } + + private ArrayList getSegments(String index) { + return new ArrayList<>( + client().admin() + .indices() + .segments(new IndicesSegmentsRequest(index)) + .actionGet() + .getIndices() + .get(index) + .getShards() + .get(0) + .getShards()[0].getSegments() + ); + } +} diff --git a/plugins/crypto-kms/licenses/jackson-annotations-2.15.2.jar.sha1 b/plugins/crypto-kms/licenses/jackson-annotations-2.15.2.jar.sha1 deleted file mode 100644 index f63416ddb8ceb..0000000000000 --- a/plugins/crypto-kms/licenses/jackson-annotations-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4724a65ac8e8d156a24898d50fd5dbd3642870b8 \ No newline at end of file diff --git a/plugins/crypto-kms/licenses/jackson-annotations-2.16.0.jar.sha1 b/plugins/crypto-kms/licenses/jackson-annotations-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..79ed9e0c63fc8 --- /dev/null +++ b/plugins/crypto-kms/licenses/jackson-annotations-2.16.0.jar.sha1 @@ -0,0 +1 @@ +dc30995f7428c0a405eba9b8c619b20d2b3b9905 \ No newline at end of file diff --git a/plugins/crypto-kms/licenses/jackson-databind-2.15.2.jar.sha1 b/plugins/crypto-kms/licenses/jackson-databind-2.15.2.jar.sha1 deleted file mode 100644 index f16d80af8dce6..0000000000000 --- a/plugins/crypto-kms/licenses/jackson-databind-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9353b021f10c307c00328f52090de2bdb4b6ff9c \ No newline at end of file diff --git a/plugins/crypto-kms/licenses/jackson-databind-2.16.0.jar.sha1 b/plugins/crypto-kms/licenses/jackson-databind-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..da00d281934b1 --- /dev/null +++ b/plugins/crypto-kms/licenses/jackson-databind-2.16.0.jar.sha1 @@ -0,0 +1 @@ +3a6b7f8ff7b30d518bbd65678e9c30cd881f19a7 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/jackson-annotations-2.15.2.jar.sha1 b/plugins/discovery-ec2/licenses/jackson-annotations-2.15.2.jar.sha1 deleted file mode 100644 index f63416ddb8ceb..0000000000000 --- a/plugins/discovery-ec2/licenses/jackson-annotations-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4724a65ac8e8d156a24898d50fd5dbd3642870b8 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/jackson-annotations-2.16.0.jar.sha1 b/plugins/discovery-ec2/licenses/jackson-annotations-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..79ed9e0c63fc8 --- /dev/null +++ b/plugins/discovery-ec2/licenses/jackson-annotations-2.16.0.jar.sha1 @@ -0,0 +1 @@ +dc30995f7428c0a405eba9b8c619b20d2b3b9905 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/jackson-databind-2.15.2.jar.sha1 b/plugins/discovery-ec2/licenses/jackson-databind-2.15.2.jar.sha1 deleted file mode 100644 index f16d80af8dce6..0000000000000 --- a/plugins/discovery-ec2/licenses/jackson-databind-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9353b021f10c307c00328f52090de2bdb4b6ff9c \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/jackson-databind-2.16.0.jar.sha1 b/plugins/discovery-ec2/licenses/jackson-databind-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..da00d281934b1 --- /dev/null +++ b/plugins/discovery-ec2/licenses/jackson-databind-2.16.0.jar.sha1 @@ -0,0 +1 @@ +3a6b7f8ff7b30d518bbd65678e9c30cd881f19a7 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-annotations-2.15.2.jar.sha1 b/plugins/repository-azure/licenses/jackson-annotations-2.15.2.jar.sha1 deleted file mode 100644 index f63416ddb8ceb..0000000000000 --- a/plugins/repository-azure/licenses/jackson-annotations-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4724a65ac8e8d156a24898d50fd5dbd3642870b8 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-annotations-2.16.0.jar.sha1 b/plugins/repository-azure/licenses/jackson-annotations-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..79ed9e0c63fc8 --- /dev/null +++ b/plugins/repository-azure/licenses/jackson-annotations-2.16.0.jar.sha1 @@ -0,0 +1 @@ +dc30995f7428c0a405eba9b8c619b20d2b3b9905 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-databind-2.15.2.jar.sha1 b/plugins/repository-azure/licenses/jackson-databind-2.15.2.jar.sha1 deleted file mode 100644 index f16d80af8dce6..0000000000000 --- a/plugins/repository-azure/licenses/jackson-databind-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9353b021f10c307c00328f52090de2bdb4b6ff9c \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-databind-2.16.0.jar.sha1 b/plugins/repository-azure/licenses/jackson-databind-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..da00d281934b1 --- /dev/null +++ b/plugins/repository-azure/licenses/jackson-databind-2.16.0.jar.sha1 @@ -0,0 +1 @@ +3a6b7f8ff7b30d518bbd65678e9c30cd881f19a7 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-dataformat-xml-2.15.2.jar.sha1 b/plugins/repository-azure/licenses/jackson-dataformat-xml-2.15.2.jar.sha1 deleted file mode 100644 index 18c388b84f333..0000000000000 --- a/plugins/repository-azure/licenses/jackson-dataformat-xml-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -e7e9038dee5c1adb1ebd07d3669e0e1182ac5b60 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-dataformat-xml-2.16.0.jar.sha1 b/plugins/repository-azure/licenses/jackson-dataformat-xml-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..f0d165ff7cf82 --- /dev/null +++ b/plugins/repository-azure/licenses/jackson-dataformat-xml-2.16.0.jar.sha1 @@ -0,0 +1 @@ +f3cdb002e0f2f30ad9c5fd053d78b1a485511ab1 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-datatype-jsr310-2.15.2.jar.sha1 b/plugins/repository-azure/licenses/jackson-datatype-jsr310-2.15.2.jar.sha1 deleted file mode 100644 index 6aa4f9b99c274..0000000000000 --- a/plugins/repository-azure/licenses/jackson-datatype-jsr310-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -30d16ec2aef6d8094c5e2dce1d95034ca8b6cb42 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-datatype-jsr310-2.16.0.jar.sha1 b/plugins/repository-azure/licenses/jackson-datatype-jsr310-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..40379694f5ea5 --- /dev/null +++ b/plugins/repository-azure/licenses/jackson-datatype-jsr310-2.16.0.jar.sha1 @@ -0,0 +1 @@ +77e3a27823f795d928b897d8444744ddb044a5c3 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-module-jaxb-annotations-2.15.2.jar.sha1 b/plugins/repository-azure/licenses/jackson-module-jaxb-annotations-2.15.2.jar.sha1 deleted file mode 100644 index 80da08928f855..0000000000000 --- a/plugins/repository-azure/licenses/jackson-module-jaxb-annotations-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -6a22fd1c7b0f9788e81eea32c11dc8c1ba421f18 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-module-jaxb-annotations-2.16.0.jar.sha1 b/plugins/repository-azure/licenses/jackson-module-jaxb-annotations-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..820d14b3df8e4 --- /dev/null +++ b/plugins/repository-azure/licenses/jackson-module-jaxb-annotations-2.16.0.jar.sha1 @@ -0,0 +1 @@ +684daae9ea45087c670b4f6511edcfdb19c3a695 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/jackson-annotations-2.15.2.jar.sha1 b/plugins/repository-s3/licenses/jackson-annotations-2.15.2.jar.sha1 deleted file mode 100644 index f63416ddb8ceb..0000000000000 --- a/plugins/repository-s3/licenses/jackson-annotations-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -4724a65ac8e8d156a24898d50fd5dbd3642870b8 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/jackson-annotations-2.16.0.jar.sha1 b/plugins/repository-s3/licenses/jackson-annotations-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..79ed9e0c63fc8 --- /dev/null +++ b/plugins/repository-s3/licenses/jackson-annotations-2.16.0.jar.sha1 @@ -0,0 +1 @@ +dc30995f7428c0a405eba9b8c619b20d2b3b9905 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/jackson-databind-2.15.2.jar.sha1 b/plugins/repository-s3/licenses/jackson-databind-2.15.2.jar.sha1 deleted file mode 100644 index f16d80af8dce6..0000000000000 --- a/plugins/repository-s3/licenses/jackson-databind-2.15.2.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -9353b021f10c307c00328f52090de2bdb4b6ff9c \ No newline at end of file diff --git a/plugins/repository-s3/licenses/jackson-databind-2.16.0.jar.sha1 b/plugins/repository-s3/licenses/jackson-databind-2.16.0.jar.sha1 new file mode 100644 index 0000000000000..da00d281934b1 --- /dev/null +++ b/plugins/repository-s3/licenses/jackson-databind-2.16.0.jar.sha1 @@ -0,0 +1 @@ +3a6b7f8ff7b30d518bbd65678e9c30cd881f19a7 \ No newline at end of file diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java index f07f2b308e801..156dc344d1ae2 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java @@ -61,20 +61,22 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { // Create Index and ingest data String indexName = "test-index-11"; - Settings basicSettings = Settings.builder().put("number_of_shards", 3).put("number_of_replicas", 0).build(); + Settings basicSettings = Settings.builder() + .put("number_of_shards", 2) + .put("number_of_replicas", 0) + .put("index.routing.allocation.total_shards_per_node", 1) + .build(); createIndex(indexName, basicSettings); - indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well")); - indexRandom(true, client.prepareIndex(indexName).setId("1").setSource("field2", "another fox did the same.")); + + indexRandom(false, client.prepareIndex(indexName).setId("1").setSource("field1", "the fox jumps in the well")); + indexRandom(false, client.prepareIndex(indexName).setId("2").setSource("field2", "another fox did the same.")); ensureGreen(); refresh(); // Make the search calls; adding the searchType and PreFilterShardSize to make the query path predictable across all the runs. - client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("fox")).get(); - client.prepareSearch().setSearchType("query_then_fetch").setPreFilterShardSize(3).setQuery(queryStringQuery("jumps")).get(); - - ensureGreen(); - refresh(); + client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("fox")).get(); + client.prepareSearch().setSearchType("dfs_query_then_fetch").setPreFilterShardSize(2).setQuery(queryStringQuery("jumps")).get(); // Sleep for about 3s to wait for traces are published, delay is (the delay is 1s). Thread.sleep(3000); @@ -88,8 +90,10 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { ) ); + // See please https://github.com/opensearch-project/OpenSearch/issues/10291 till local transport is not instrumented, + // capturing only the inter-nodes transport actions. InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE; - validators.validate(exporter.getFinishedSpanItems(), 6); + validators.validate(exporter.getFinishedSpanItems(), 4); } private static void updateTelemetrySetting(Client client, boolean value) { diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java index f8fe885ee450c..0fb05a08c27bb 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java @@ -10,8 +10,8 @@ import org.opensearch.core.common.Strings; +import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.BiConsumer; @@ -51,7 +51,7 @@ private static OTelPropagatedSpan getPropagatedSpan(Context context) { } @Override - public Optional extractFromHeaders(Map> headers) { + public Optional extractFromHeaders(Map> headers) { Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER); return Optional.ofNullable(getPropagatedSpan(context)); } @@ -87,9 +87,9 @@ public String get(Map headers, String key) { } }; - private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { + private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { @Override - public Iterable keys(Map> headers) { + public Iterable keys(Map> headers) { if (headers != null) { return headers.keySet(); } else { @@ -98,7 +98,7 @@ public Iterable keys(Map> headers) { } @Override - public String get(Map> headers, String key) { + public String get(Map> headers, String key) { if (headers != null && headers.containsKey(key)) { return Strings.collectionToCommaDelimitedString(headers.get(key)); } diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java index 16a3ec9493d5d..d865a329104c1 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java @@ -11,8 +11,8 @@ import org.opensearch.test.OpenSearchTestCase; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import io.opentelemetry.api.OpenTelemetry; @@ -57,7 +57,7 @@ public void testExtractTracerContextFromHeader() { } public void testExtractTracerContextFromHttpHeader() { - Map> requestHeaders = new HashMap<>(); + Map> requestHeaders = new HashMap<>(); requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00")); OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); diff --git a/release-notes/opensearch.release-notes-2.11.1.md b/release-notes/opensearch.release-notes-2.11.1.md new file mode 100644 index 0000000000000..06613558de177 --- /dev/null +++ b/release-notes/opensearch.release-notes-2.11.1.md @@ -0,0 +1,10 @@ +## 2023-11-20 Version 2.11.1 Release Notes + +## [2.11.1] + +### Changed +- Use iterative approach to evaluate Regex.simpleMatch ([#11060](https://github.com/opensearch-project/OpenSearch/pull/11060)) + +### Fixed +- [BUG] Disable sort optimization for HALF_FLOAT ([#10999](https://github.com/opensearch-project/OpenSearch/pull/10999)) +- Adding version condition while adding geoshape doc values to the index, to ensure backward compatibility.([#11095](https://github.com/opensearch-project/OpenSearch/pull/11095)) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/240_date_nanos.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/240_date_nanos.yml index 1ddba45c97c72..fb5c3268a3c82 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/240_date_nanos.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/240_date_nanos.yml @@ -164,3 +164,99 @@ setup: - match: { aggregations.date.buckets.1.key: 1540857600000 } - match: { aggregations.date.buckets.1.key_as_string: "2018-10-30T00:00:00.000Z" } - match: { aggregations.date.buckets.1.doc_count: 2 } + + +--- +"date with nested sort now": + - skip: + version: " - 2.11.99" + reason: fixed in 2.12.0 + + # This tests cover scenario where nested sort have now() in date field type. + # For this test, we have date field as nested field and we trigger asc/desc sort + # on that nested field. `filter` clause is needed when we sort any nested field, + # like in this case, "gte": "now/h" says sort nested field date_field only where + # document is having value greater than current time now(). + # Nested field sort query doesn't sort documents if it is not qualified through + # `filter` clause. + # Only adding tests for `gte` as `lte` would be same behaviour + + - do: + indices.create: + index: test + body: + mappings: + properties: + nested_field: + type: nested + properties: + date_field: + type: date + format: date_optional_time + - do: + bulk: + refresh: true + index: test + body: | + {"index":{}} + {"nested_field": [{"date_field": "3023-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "3024-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "3025-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "3026-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "3027-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "2022-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "2023-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "2021-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "2020-10-26T12:00:00+09:00"}]} + {"index":{}} + {"nested_field": [{"date_field": "2019-10-26T12:00:00+09:00"}]} + + # gte: now/h with the desc sort + - do: + search: + index: test + body: + size: 5 + sort: [{ nested_field.date_field: { mode: max, order: desc, nested: { path: nested_field, filter: { bool: { filter : [{ range : { nested_field.date_field: { gte: now/h, time_zone: +09:00} } }] } } } } } ] + - match: {hits.total.value: 10 } + - length: {hits.hits: 5 } + - match: { hits.hits.0._index: test } + - match: { hits.hits.0._source.nested_field.0.date_field: "3027-10-26T12:00:00+09:00" } + - match: { hits.hits.0.sort: [33381428400000] } + - match: { hits.hits.1._source.nested_field.0.date_field: "3026-10-26T12:00:00+09:00" } + - match: { hits.hits.1.sort: [ 33349892400000 ] } + - match: { hits.hits.2._source.nested_field.0.date_field: "3025-10-26T12:00:00+09:00" } + - match: { hits.hits.2.sort: [ 33318356400000 ] } + - match: { hits.hits.3._source.nested_field.0.date_field: "3024-10-26T12:00:00+09:00" } + - match: { hits.hits.3.sort: [ 33286820400000 ] } + - match: { hits.hits.4._source.nested_field.0.date_field: "3023-10-26T12:00:00+09:00" } + - match: { hits.hits.4.sort: [ 33255198000000 ] } + + # gte: now/h with the asc sort + - do: + search: + index: test + body: + size: 5 + sort: [ { nested_field.date_field: { mode: max, order: asc, nested: { path: nested_field, filter: { bool: { filter: [ { range: { nested_field.date_field: { gte: now/h, time_zone: +09:00 } } } ] } } } } } ] + - match: { hits.total.value: 10 } + - length: { hits.hits: 5 } + - match: { hits.hits.0._index: test } + - match: { hits.hits.0._source.nested_field.0.date_field: "3023-10-26T12:00:00+09:00" } + - match: { hits.hits.0.sort: [ 33255198000000 ] } + - match: { hits.hits.1._source.nested_field.0.date_field: "3024-10-26T12:00:00+09:00" } + - match: { hits.hits.1.sort: [ 33286820400000 ] } + - match: { hits.hits.2._source.nested_field.0.date_field: "3025-10-26T12:00:00+09:00" } + - match: { hits.hits.2.sort: [ 33318356400000 ] } + - match: { hits.hits.3._source.nested_field.0.date_field: "3026-10-26T12:00:00+09:00" } + - match: { hits.hits.3.sort: [ 33349892400000 ] } + - match: { hits.hits.4._source.nested_field.0.date_field: "3027-10-26T12:00:00+09:00" } + - match: { hits.hits.4.sort: [ 33381428400000 ] } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml index aceae6cee53ff..ebecb63dedbaf 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/search/90_search_after.yml @@ -219,7 +219,7 @@ - match: {hits.hits.0._source.timestamp: "2019-10-21 08:30:04.828733" } - match: {hits.hits.0.sort: [1571646604828733000] } - # search_after with the sort + # search_after with the asc sort - do: search: index: test diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index b8f8abb6c2c23..257aca2b67990 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -69,9 +69,11 @@ import java.nio.channels.CancelledKeyException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -362,7 +364,7 @@ protected void serverAcceptedChannel(HttpChannel httpChannel) { * @param httpChannel that received the http request */ public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) { - final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), httpRequest.getHeaders()); + final Span span = tracer.startSpan(SpanBuilder.from(httpRequest), extractHeaders(httpRequest.getHeaders())); try (final SpanScope httpRequestSpanScope = tracer.withSpanInScope(span)) { HttpChannel traceableHttpChannel = TraceableHttpChannel.create(httpChannel, span, tracer); handleIncomingRequest(httpRequest, traceableHttpChannel, httpRequest.getInboundException()); @@ -483,4 +485,9 @@ private static ActionListener earlyResponseListener(HttpRequest request, H return NO_OP; } } + + @SuppressWarnings("unchecked") + private static > Map> extractHeaders(Map headers) { + return (Map>) headers; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 5c1c2a0e410c8..b042f3cf41d61 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -507,14 +507,15 @@ private boolean canMatch(LeafReaderContext ctx) throws IOException { } private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException { - if (searchContext.request() != null && searchContext.request().source() != null) { + if (searchContext.searchAfter() != null && searchContext.request() != null && searchContext.request().source() != null) { // Only applied on primary sort field and primary search_after. FieldSortBuilder primarySortField = FieldSortBuilder.getPrimaryFieldSortOrNull(searchContext.request().source()); if (primarySortField != null) { MinAndMax minMax = FieldSortBuilder.getMinMaxOrNullForSegment( this.searchContext.getQueryShardContext(), ctx, - primarySortField + primarySortField, + searchContext.sort() ); return SearchService.canMatchSearchAfter( searchContext.searchAfter(), diff --git a/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java b/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java index 97e1d444d4a0a..5cecda1346b90 100644 --- a/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java +++ b/server/src/main/java/org/opensearch/search/sort/FieldSortBuilder.java @@ -611,7 +611,8 @@ public static FieldSortBuilder getPrimaryFieldSortOrNull(SearchSourceBuilder sou * and configurations return null. */ public static MinAndMax getMinMaxOrNull(QueryShardContext context, FieldSortBuilder sortBuilder) throws IOException { - return getMinMaxOrNullInternal(context.getIndexReader(), context, sortBuilder); + final SortAndFormats sort = SortBuilder.buildSort(Collections.singletonList(sortBuilder), context).get(); + return getMinMaxOrNullInternal(context.getIndexReader(), context, sortBuilder, sort); } /** @@ -619,14 +620,21 @@ public static MinAndMax getMinMaxOrNull(QueryShardContext context, FieldSortB * The value can be extracted on non-nested indexed mapped fields of type keyword, numeric or date, other fields * and configurations return null. */ - public static MinAndMax getMinMaxOrNullForSegment(QueryShardContext context, LeafReaderContext ctx, FieldSortBuilder sortBuilder) - throws IOException { - return getMinMaxOrNullInternal(ctx.reader(), context, sortBuilder); + public static MinAndMax getMinMaxOrNullForSegment( + QueryShardContext context, + LeafReaderContext ctx, + FieldSortBuilder sortBuilder, + SortAndFormats sort + ) throws IOException { + return getMinMaxOrNullInternal(ctx.reader(), context, sortBuilder, sort); } - private static MinAndMax getMinMaxOrNullInternal(IndexReader reader, QueryShardContext context, FieldSortBuilder sortBuilder) - throws IOException { - SortAndFormats sort = SortBuilder.buildSort(Collections.singletonList(sortBuilder), context).get(); + private static MinAndMax getMinMaxOrNullInternal( + IndexReader reader, + QueryShardContext context, + FieldSortBuilder sortBuilder, + SortAndFormats sort + ) throws IOException { SortField sortField = sort.sort.getSort()[0]; if (sortField.getField() == null) { return null; diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index 208df90f65d74..863f56d9fbe94 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -15,7 +15,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Optional; /** * Core's ThreadContext based TracerContextStorage implementation @@ -79,17 +78,7 @@ public Map headers(Map source) { } Span getCurrentSpan(String key) { - Optional optionalSpanFromContext = spanFromThreadContext(key); - return optionalSpanFromContext.orElse(spanFromHeader()); - } - - private Optional spanFromThreadContext(String key) { SpanReference currentSpanRef = threadContext.getTransient(key); - return (currentSpanRef == null) ? Optional.empty() : Optional.ofNullable(currentSpanRef.getSpan()); - } - - private Span spanFromHeader() { - Optional span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders()); - return span.orElse(null); + return (currentSpanRef == null) ? null : currentSpanRef.getSpan(); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java index 1cb73e0247c3a..b0cecb0ee485d 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java @@ -62,6 +62,13 @@ public void close() { } } + protected TracerContextStorage createTracerContextStorage( + TracingTelemetry tracingTelemetry, + ThreadContext threadContext + ) { + return new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry); + } + private Tracer tracer(Optional telemetry, ThreadContext threadContext) { return telemetry.map(Telemetry::getTracingTelemetry) .map(tracingTelemetry -> createDefaultTracer(tracingTelemetry, threadContext)) @@ -70,10 +77,7 @@ private Tracer tracer(Optional telemetry, ThreadContext threadContext } private Tracer createDefaultTracer(TracingTelemetry tracingTelemetry, ThreadContext threadContext) { - TracerContextStorage tracerContextStorage = new ThreadContextBasedTracerContextStorage( - threadContext, - tracingTelemetry - ); + TracerContextStorage tracerContextStorage = createTracerContextStorage(tracingTelemetry, threadContext); return new DefaultTracer(tracingTelemetry, tracerContextStorage); } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java index 631fb8242d78e..dfe456a0a6784 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java @@ -13,7 +13,7 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.IOException; -import java.util.List; +import java.util.Collection; import java.util.Map; /** @@ -75,7 +75,7 @@ Tracer getDelegateTracer() { } @Override - public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { + public Span startSpan(SpanCreationContext spanCreationContext, Map> headers) { return defaultTracer.startSpan(spanCreationContext, headers); } } diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index c14a53e799319..a8315c3cae4e0 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -57,6 +57,10 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; /** * Handler for inbound data @@ -188,11 +192,16 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st } } + private Map> extractHeaders(Map headers) { + return headers.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> Collections.singleton(e.getValue()))); + } + private void handleRequest(TcpChannel channel, Header header, InboundMessage message) throws IOException { final String action = header.getActionName(); final long requestId = header.getRequestId(); final Version version = header.getVersion(); - Span span = tracer.startSpan(SpanBuilder.from(action, channel)); + final Map> headers = extractHeaders(header.getHeaders().v1()); + Span span = tracer.startSpan(SpanBuilder.from(action, channel), headers); try (SpanScope spanScope = tracer.withSpanInScope(span)) { if (header.isHandshake()) { messageListener.onRequestReceived(requestId, action); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index de88c3619abe8..5aeed72f306db 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -867,59 +867,18 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); - try (SpanScope spanScope = tracer.withSpanInScope(span)) { - TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); - try { - logger.debug("Action: " + action); - final TransportResponseHandler delegate; - if (request.getParentTask().isSet()) { - // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. - final Releasable unregisterChildNode = taskManager.registerChildNode( - request.getParentTask().getId(), - connection.getNode() - ); - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleResponse(response); - } - - @Override - public void handleException(TransportException exp) { - unregisterChildNode.close(); - traceableTransportResponseHandler.handleException(exp); - } - - @Override - public String executor() { - return traceableTransportResponseHandler.executor(); - } - - @Override - public T read(StreamInput in) throws IOException { - return traceableTransportResponseHandler.read(in); - } - - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + handler.toString(); - } - }; - } else { - delegate = traceableTransportResponseHandler; - } - asyncSender.sendRequest(connection, action, request, options, delegate); - } catch (final Exception ex) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException) { - te = (TransportException) ex; - } else { - te = new TransportException("failure to send", ex); - } - traceableTransportResponseHandler.handleException(te); + if (connection == localNodeConnection) { + // See please https://github.com/opensearch-project/OpenSearch/issues/10291 + sendRequestAsync(connection, action, request, options, handler); + } else { + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create( + handler, + span, + tracer + ); + sendRequestAsync(connection, action, request, options, traceableTransportResponseHandler); } } } @@ -1690,4 +1649,61 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder) } } } + + private void sendRequestAsync( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final TransportRequestOptions options, + final TransportResponseHandler handler + ) { + try { + logger.debug("Action: " + action); + final TransportResponseHandler delegate; + if (request.getParentTask().isSet()) { + // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. + final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); + delegate = new TransportResponseHandler() { + @Override + public void handleResponse(T response) { + unregisterChildNode.close(); + handler.handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + unregisterChildNode.close(); + handler.handleException(exp); + } + + @Override + public String executor() { + return handler.executor(); + } + + @Override + public T read(StreamInput in) throws IOException { + return handler.read(in); + } + + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); + } + }; + } else { + delegate = handler; + } + asyncSender.sendRequest(connection, action, request, options, delegate); + } catch (final Exception ex) { + // the caller might not handle this so we invoke the handler + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + handler.handleException(te); + } + } } diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java new file mode 100644 index 0000000000000..3a98a67b53920 --- /dev/null +++ b/server/src/test/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorageTests.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.concurrent.ThreadContext.StoredContext; +import org.opensearch.telemetry.Telemetry; +import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.metrics.MetricsTelemetry; +import org.opensearch.telemetry.tracing.noop.NoopTracer; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.telemetry.tracing.MockTracingTelemetry; +import org.junit.After; +import org.junit.Before; + +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING; +import static org.opensearch.telemetry.TelemetrySettings.TRACER_FEATURE_ENABLED_SETTING; +import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; + +public class ThreadContextBasedTracerContextStorageTests extends OpenSearchTestCase { + private Tracer tracer; + private ThreadContext threadContext; + private TracerContextStorage threadContextStorage; + private ExecutorService executorService; + + @SuppressWarnings("resource") + @Before + public void setUp() throws Exception { + super.setUp(); + + final Settings settings = Settings.builder() + .put(TRACER_ENABLED_SETTING.getKey(), true) + .put(TRACER_SAMPLER_PROBABILITY.getKey(), 1d) + .put(TRACER_FEATURE_ENABLED_SETTING.getKey(), true) + .build(); + + final TelemetrySettings telemetrySettings = new TelemetrySettings( + settings, + new ClusterSettings(Settings.EMPTY, Set.of(TRACER_ENABLED_SETTING, TRACER_SAMPLER_PROBABILITY)) + ); + + final TracingTelemetry tracingTelemetry = new MockTracingTelemetry(); + + threadContext = new ThreadContext(Settings.EMPTY); + threadContextStorage = new ThreadContextBasedTracerContextStorage(threadContext, tracingTelemetry); + + tracer = new TracerFactory(telemetrySettings, Optional.of(new Telemetry() { + @Override + public MetricsTelemetry getMetricsTelemetry() { + return null; + } + + @Override + public TracingTelemetry getTracingTelemetry() { + return tracingTelemetry; + } + }), threadContext) { + @Override + protected TracerContextStorage createTracerContextStorage( + TracingTelemetry tracingTelemetry, + ThreadContext threadContext + ) { + return threadContextStorage; + } + }.getTracer(); + + executorService = Executors.newSingleThreadExecutor(); + assertThat(tracer, not(instanceOf(NoopTracer.class))); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + executorService.shutdown(); + tracer.close(); + } + + public void testStartingSpanDoesNotChangeThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testSpanInScopeChangesThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testStashingPropagatesThreadContext() { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testPreservingContextThreadContext() throws InterruptedException, ExecutionException, TimeoutException { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + final Runnable r = new Runnable() { + @Override + public void run() { + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(span)); + } + }; + + executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } + + public void testPreservingContextAndStashingThreadContext() throws InterruptedException, ExecutionException, TimeoutException { + final Span span = tracer.startSpan(SpanCreationContext.internal().name("test")); + + try (SpanScope scope = tracer.withSpanInScope(span)) { + final Runnable r = new Runnable() { + @Override + public void run() { + final Span local = tracer.startSpan(SpanCreationContext.internal().name("test-local")); + try (SpanScope localScope = tracer.withSpanInScope(local)) { + try (StoredContext ignored = threadContext.stashContext()) { + assertThat( + threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), + is(not(nullValue())) + ); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(local)); + } + } + } + }; + + executorService.submit(threadContext.preserveContext(r)).get(1, TimeUnit.SECONDS); + } + + assertThat(threadContext.getTransient(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(not(nullValue()))); + assertThat(threadContextStorage.get(ThreadContextBasedTracerContextStorage.CURRENT_SPAN), is(nullValue())); + } +} diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java index 6d0cd6d0b1290..4c58352531ca8 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java @@ -14,7 +14,7 @@ import org.opensearch.telemetry.tracing.TracingContextPropagator; import org.opensearch.telemetry.tracing.attributes.Attributes; -import java.util.List; +import java.util.Collection; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -52,7 +52,7 @@ public Optional extract(Map props) { } @Override - public Optional extractFromHeaders(Map> headers) { + public Optional extractFromHeaders(Map> headers) { if (headers != null) { Map convertedHeader = headers.entrySet() .stream() diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java index 8d8c18fb9ef6b..9b5d84954908b 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/TelemetryValidators.java @@ -49,7 +49,7 @@ private String printProblematicSpansMap(Map> spanMap) StringBuilder sb = new StringBuilder(); for (var entry : spanMap.entrySet()) { sb.append("SpanData validation failed for validator " + entry.getKey()); - sb.append("/n"); + sb.append("\n"); for (MockSpanData span : entry.getValue()) { sb.append(span.toString()); } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java index 5fe268a8f0581..045d3a85e21e7 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/validators/NumberOfTraceIDsEqualToRequests.java @@ -13,9 +13,9 @@ import org.opensearch.test.telemetry.tracing.TracingValidator; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Objects; -import java.util.Set; import java.util.stream.Collectors; /** @@ -41,13 +41,10 @@ public NumberOfTraceIDsEqualToRequests(Attributes attributes) { */ @Override public List validate(List spans, int requests) { - Set totalTraceIDs = spans.stream() - .filter(span -> isMatchingSpan(span)) - .map(MockSpanData::getTraceID) - .collect(Collectors.toSet()); + final Collection totalTraceIDs = spans.stream().filter(span -> isMatchingSpan(span)).collect(Collectors.toList()); List problematicSpans = new ArrayList<>(); - if (totalTraceIDs.size() != requests) { - problematicSpans.addAll(spans); + if (totalTraceIDs.stream().map(MockSpanData::getTraceID).distinct().count() != requests) { + problematicSpans.addAll(totalTraceIDs); } return problematicSpans; }