From 88ec9d7bdcc813d0a742e698a4ecece784ea4614 Mon Sep 17 00:00:00 2001 From: Shawn Fang Date: Tue, 3 Dec 2024 15:04:15 -0800 Subject: [PATCH] rerun maven in monitor sdk --- .../ingestion/LogsIngestionAsyncClient.java | 12 +-- .../ingestion/LogsIngestionClient.java | 13 ++- .../ingestion/LogsIngestionClientBuilder.java | 8 +- .../ingestion/implementation/Batcher.java | 4 +- ...UsingDataCollectionRulesClientBuilder.java | 10 ++- .../ingestion/implementation/Utils.java | 2 +- .../src/main/java/module-info.java | 1 + .../com/azure/monitor/ingestion/LogData.java | 2 - .../LogsIngestionAsyncClientTest.java | 85 +++++++++---------- .../LogsIngestionClientBuilderTest.java | 10 +-- .../LogsIngestionClientConcurrencyTest.java | 26 +++--- .../ingestion/LogsIngestionClientTest.java | 52 +++++------- .../ingestion/LogsIngestionTestBase.java | 64 +++++++------- .../ConcurrencyLimitingSpliteratorTest.java | 71 +++++++--------- .../ingestion/implementation/UtilsTest.java | 10 +-- 15 files changed, 171 insertions(+), 199 deletions(-) diff --git a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionAsyncClient.java b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionAsyncClient.java index 86a688fda3b7e..e8258c04fac5e 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionAsyncClient.java +++ b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionAsyncClient.java @@ -291,8 +291,8 @@ private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder uploadLogsErrorConsumer = options.getLogsUploadErrorConsumer(); } if (uploadLogsErrorConsumer != null) { - uploadLogsErrorConsumer.accept( - new LogsUploadError(responseHolder.getException(), responseHolder.getRequest().getLogs())); + uploadLogsErrorConsumer + .accept(new LogsUploadError(responseHolder.getException(), responseHolder.getRequest().getLogs())); return; } // emit the responseHolder without the original logs only if there's an error and there's no @@ -304,10 +304,10 @@ private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder private Mono uploadToService(String ruleId, String streamName, Context context, LogsIngestionRequest request) { - RequestOptions requestOptions = new RequestOptions().addHeader(HttpHeaderName.CONTENT_ENCODING, GZIP) - .setContext(context); - return service.uploadWithResponse(ruleId, streamName, BinaryData.fromBytes(request.getRequestBody()), - requestOptions) + RequestOptions requestOptions + = new RequestOptions().addHeader(HttpHeaderName.CONTENT_ENCODING, GZIP).setContext(context); + return service + .uploadWithResponse(ruleId, streamName, BinaryData.fromBytes(request.getRequestBody()), requestOptions) .map(response -> new UploadLogsResponseHolder(null, null)) .onErrorResume(HttpResponseException.class, ex -> Mono.fromSupplier(() -> new UploadLogsResponseHolder(request, ex))); diff --git a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClient.java b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClient.java index 605c8dfcfae58..25f56ec715845 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClient.java +++ b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClient.java @@ -216,22 +216,21 @@ public void upload(String ruleId, String streamName, Iterable logs, Logs Objects.requireNonNull(streamName, "'streamName' cannot be null."); Objects.requireNonNull(logs, "'logs' cannot be null."); - Consumer uploadLogsErrorConsumer = options == null - ? null - : options.getLogsUploadErrorConsumer(); + Consumer uploadLogsErrorConsumer + = options == null ? null : options.getLogsUploadErrorConsumer(); RequestOptions requestOptions = new RequestOptions(); requestOptions.addHeader(HttpHeaderName.CONTENT_ENCODING, GZIP); requestOptions.setContext(context); - Stream responses = new Batcher(options, logs).toStream() - .map(r -> uploadToService(ruleId, streamName, requestOptions, r)); + Stream responses + = new Batcher(options, logs).toStream().map(r -> uploadToService(ruleId, streamName, requestOptions, r)); responses = submit(responses, getConcurrency(options)).filter(response -> response.getException() != null); if (uploadLogsErrorConsumer != null) { - responses.forEach(response -> uploadLogsErrorConsumer.accept( - new LogsUploadError(response.getException(), response.getRequest().getLogs()))); + responses.forEach(response -> uploadLogsErrorConsumer + .accept(new LogsUploadError(response.getException(), response.getRequest().getLogs()))); return; } diff --git a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClientBuilder.java b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClientBuilder.java index 30ba226874566..33f1ac6e39901 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClientBuilder.java +++ b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/LogsIngestionClientBuilder.java @@ -90,8 +90,8 @@ public LogsIngestionClientBuilder endpoint(String endpoint) { this.endpoint = endpoint; return this; } catch (MalformedURLException exception) { - throw LOGGER.logExceptionAsError( - new IllegalArgumentException("'endpoint' must be a valid URL.", exception)); + throw LOGGER + .logExceptionAsError(new IllegalArgumentException("'endpoint' must be a valid URL.", exception)); } } @@ -204,8 +204,8 @@ public LogsIngestionClientBuilder clientOptions(ClientOptions clientOptions) { * @return the updated {@link LogsIngestionClientBuilder}. */ public LogsIngestionClientBuilder serviceVersion(LogsIngestionServiceVersion serviceVersion) { - innerLogBuilder.serviceVersion( - IngestionUsingDataCollectionRulesServiceVersion.valueOf(serviceVersion.getVersion())); + innerLogBuilder + .serviceVersion(IngestionUsingDataCollectionRulesServiceVersion.valueOf(serviceVersion.getVersion())); return this; } diff --git a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Batcher.java b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Batcher.java index 39f5786372f8f..281a78354d462 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Batcher.java +++ b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Batcher.java @@ -86,8 +86,8 @@ public LogsIngestionRequest next() { */ public Stream toStream() { if (concurrency == 1) { - return StreamSupport.stream( - Spliterators.spliteratorUnknownSize(this, Spliterator.NONNULL | Spliterator.ORDERED), false); + return StreamSupport + .stream(Spliterators.spliteratorUnknownSize(this, Spliterator.NONNULL | Spliterator.ORDERED), false); } return StreamSupport.stream(new ConcurrencyLimitingSpliterator<>(this, concurrency), true); diff --git a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/IngestionUsingDataCollectionRulesClientBuilder.java b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/IngestionUsingDataCollectionRulesClientBuilder.java index 138b8d887a678..bb7b259456785 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/IngestionUsingDataCollectionRulesClientBuilder.java +++ b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/IngestionUsingDataCollectionRulesClientBuilder.java @@ -289,7 +289,8 @@ private HttpPipeline createHttpPipeline() { if (headers.getSize() > 0) { policies.add(new AddHeadersPolicy(headers)); } - this.pipelinePolicies.stream().filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_CALL) + this.pipelinePolicies.stream() + .filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_CALL) .forEach(p -> policies.add(p)); HttpPolicyProviders.addBeforeRetryPolicies(policies); policies.add(ClientBuilderUtil.validateAndGetRetryPolicy(retryPolicy, retryOptions, new RetryPolicy())); @@ -298,12 +299,15 @@ private HttpPipeline createHttpPipeline() { policies.add(new BearerTokenAuthenticationPolicy(tokenCredential, audience == null ? DEFAULT_SCOPES : new String[] { audience.toString() })); } - this.pipelinePolicies.stream().filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_RETRY) + this.pipelinePolicies.stream() + .filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_RETRY) .forEach(p -> policies.add(p)); HttpPolicyProviders.addAfterRetryPolicies(policies); policies.add(new HttpLoggingPolicy(localHttpLogOptions)); HttpPipeline httpPipeline = new HttpPipelineBuilder().policies(policies.toArray(new HttpPipelinePolicy[0])) - .httpClient(httpClient).clientOptions(localClientOptions).build(); + .httpClient(httpClient) + .clientOptions(localClientOptions) + .build(); return httpPipeline; } diff --git a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Utils.java b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Utils.java index def73f52078e4..54f11b7985baa 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Utils.java +++ b/sdk/monitor/azure-monitor-ingestion/src/main/java/com/azure/monitor/ingestion/implementation/Utils.java @@ -48,7 +48,7 @@ public static int getConcurrency(LogsUploadOptions options) { return options.getMaxConcurrency(); } - return 1; + return 1; } /** diff --git a/sdk/monitor/azure-monitor-ingestion/src/main/java/module-info.java b/sdk/monitor/azure-monitor-ingestion/src/main/java/module-info.java index 6ddf35235ed6a..b32a3e25d837b 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/main/java/module-info.java +++ b/sdk/monitor/azure-monitor-ingestion/src/main/java/module-info.java @@ -4,6 +4,7 @@ module com.azure.monitor.ingestion { requires transitive com.azure.core; + exports com.azure.monitor.ingestion; exports com.azure.monitor.ingestion.models; diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogData.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogData.java index e1b0b1a9c7129..5afdd12f1e2ce 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogData.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogData.java @@ -12,11 +12,9 @@ public class LogData { @JsonProperty(value = "Time") private OffsetDateTime time; - @JsonProperty(value = "ExtendedColumn") private String extendedColumn; - @JsonProperty(value = "AdditionalContext") private String additionalContext; diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionAsyncClientTest.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionAsyncClientTest.java index 490d4df57994b..465f5f3749f71 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionAsyncClientTest.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionAsyncClientTest.java @@ -32,8 +32,7 @@ public void testUploadLogs() { DataValidationPolicy dataValidationPolicy = new DataValidationPolicy(logs); LogsIngestionAsyncClient client = clientBuilder.addPolicy(dataValidationPolicy).buildAsyncClient(); - StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)) - .verifyComplete(); + StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)).verifyComplete(); } @Test @@ -43,13 +42,10 @@ public void testUploadLogsInBatches() { AtomicInteger count = new AtomicInteger(); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionAsyncClient client = clientBuilder - .addPolicy(logsCountPolicy) - .addPolicy(new BatchCountPolicy(count)) - .buildAsyncClient(); + LogsIngestionAsyncClient client + = clientBuilder.addPolicy(logsCountPolicy).addPolicy(new BatchCountPolicy(count)).buildAsyncClient(); - StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)) - .verifyComplete(); + StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)).verifyComplete(); assertEquals(2, count.get()); assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount()); @@ -61,11 +57,10 @@ public void testUploadLogsInBatchesConcurrently() { AtomicInteger count = new AtomicInteger(); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionAsyncClient client = clientBuilder - .addPolicy(new BatchCountPolicy(count)) - .addPolicy(logsCountPolicy) - .buildAsyncClient(); - StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, new LogsUploadOptions().setMaxConcurrency(3))) + LogsIngestionAsyncClient client + = clientBuilder.addPolicy(new BatchCountPolicy(count)).addPolicy(logsCountPolicy).buildAsyncClient(); + StepVerifier + .create(client.upload(dataCollectionRuleId, streamName, logs, new LogsUploadOptions().setMaxConcurrency(3))) .verifyComplete(); assertEquals(2, count.get()); assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount()); @@ -79,20 +74,17 @@ public void testUploadLogsPartialFailure() { AtomicInteger count = new AtomicInteger(); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionAsyncClient client = clientBuilder - .addPolicy(logsCountPolicy) - .addPolicy(new PartialFailurePolicy(count)) - .buildAsyncClient(); - - StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)) - .verifyErrorSatisfies(error -> { - assertTrue(error instanceof LogsUploadException); - if (error instanceof LogsUploadException) { - LogsUploadException ex = (LogsUploadException) error; - assertEquals(49460, ex.getFailedLogsCount()); - assertEquals(5, ex.getLogsUploadErrors().size()); - } - }); + LogsIngestionAsyncClient client + = clientBuilder.addPolicy(logsCountPolicy).addPolicy(new PartialFailurePolicy(count)).buildAsyncClient(); + + StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)).verifyErrorSatisfies(error -> { + assertTrue(error instanceof LogsUploadException); + if (error instanceof LogsUploadException) { + LogsUploadException ex = (LogsUploadException) error; + assertEquals(49460, ex.getFailedLogsCount()); + assertEquals(5, ex.getLogsUploadErrors().size()); + } + }); assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount()); } @@ -107,13 +99,10 @@ public void testUploadLogsPartialFailureWithErrorHandler() { .setLogsUploadErrorConsumer(error -> failedLogsCount.addAndGet(error.getFailedLogs().size())); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionAsyncClient client = clientBuilder - .addPolicy(logsCountPolicy) - .addPolicy(new PartialFailurePolicy(count)) - .buildAsyncClient(); + LogsIngestionAsyncClient client + = clientBuilder.addPolicy(logsCountPolicy).addPolicy(new PartialFailurePolicy(count)).buildAsyncClient(); - StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions)) - .verifyComplete(); + StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions)).verifyComplete(); assertEquals(49460, failedLogsCount.get()); assertEquals(11, count.get()); assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount()); @@ -125,17 +114,14 @@ public void testUploadLogsStopOnFirstError() { // Live Only, as it times out in CI playback mode. TODO: Re-record and update test base to exclude any sanitizers as needed. List logs = getObjects(100000); AtomicInteger count = new AtomicInteger(); - LogsUploadOptions logsUploadOptions = new LogsUploadOptions() - .setLogsUploadErrorConsumer(error -> { - // throw on first error - throw error.getResponseException(); - }); + LogsUploadOptions logsUploadOptions = new LogsUploadOptions().setLogsUploadErrorConsumer(error -> { + // throw on first error + throw error.getResponseException(); + }); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionAsyncClient client = clientBuilder - .addPolicy(logsCountPolicy) - .addPolicy(new PartialFailurePolicy(count)) - .buildAsyncClient(); + LogsIngestionAsyncClient client + = clientBuilder.addPolicy(logsCountPolicy).addPolicy(new PartialFailurePolicy(count)).buildAsyncClient(); StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions)) .verifyErrorSatisfies(ex -> assertTrue(ex instanceof HttpResponseException)); @@ -148,20 +134,25 @@ public void testUploadLogsStopOnFirstError() { public void testUploadLogsProtocolMethod() { List logs = getObjects(10); LogsIngestionAsyncClient client = clientBuilder.buildAsyncClient(); - StepVerifier.create(client.uploadWithResponse(dataCollectionRuleId, streamName, - BinaryData.fromObject(logs), new RequestOptions())) + StepVerifier + .create(client.uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs), + new RequestOptions())) .assertNext(response -> assertEquals(204, response.getStatusCode())) .verifyComplete(); } @Test @RecordWithoutRequestBody - @EnabledIfEnvironmentVariable(named = "AZURE_TEST_MODE", matches = "LIVE", disabledReason = "Test proxy network connection is timing out for this test in playback mode.") + @EnabledIfEnvironmentVariable( + named = "AZURE_TEST_MODE", + matches = "LIVE", + disabledReason = "Test proxy network connection is timing out for this test in playback mode.") public void testUploadLargeLogsProtocolMethod() { List logs = getObjects(375000); LogsIngestionAsyncClient client = clientBuilder.buildAsyncClient(); - StepVerifier.create(client.uploadWithResponse(dataCollectionRuleId, streamName, - BinaryData.fromObject(logs), new RequestOptions())) + StepVerifier + .create(client.uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs), + new RequestOptions())) .verifyErrorMatches(responseException -> (responseException instanceof HttpResponseException) && ((HttpResponseException) responseException).getResponse().getStatusCode() == 413); } diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientBuilderTest.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientBuilderTest.java index 8355f8bc1642d..fbefc46c635a2 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientBuilderTest.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientBuilderTest.java @@ -14,25 +14,21 @@ public class LogsIngestionClientBuilderTest { @Test public void testBuilderWithoutEndpoint() { IllegalStateException ex = Assertions.assertThrows(IllegalStateException.class, - () -> new LogsIngestionClientBuilder().buildClient()); + () -> new LogsIngestionClientBuilder().buildClient()); Assertions.assertEquals("endpoint is required to build the client.", ex.getMessage()); } @Test public void testBuilderWithoutCredential() { IllegalStateException ex = Assertions.assertThrows(IllegalStateException.class, - () -> new LogsIngestionClientBuilder() - .endpoint("https://example.com") - .buildClient()); + () -> new LogsIngestionClientBuilder().endpoint("https://example.com").buildClient()); Assertions.assertEquals("credential is required to build the client.", ex.getMessage()); } @Test public void testBuilderWithInvalidEndpoint() { IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class, - () -> new LogsIngestionClientBuilder() - .endpoint("example.com") - .buildClient()); + () -> new LogsIngestionClientBuilder().endpoint("example.com").buildClient()); Assertions.assertEquals("'endpoint' must be a valid URL.", ex.getMessage()); } diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientConcurrencyTest.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientConcurrencyTest.java index 806b910fc33ff..6cf1722091e91 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientConcurrencyTest.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientConcurrencyTest.java @@ -41,6 +41,7 @@ public class LogsIngestionClientConcurrencyTest { private static final int LOGS_IN_BATCH = 9800; // approx private LogsIngestionClientBuilder clientBuilder; + @BeforeEach void beforeEach() { clientBuilder = new LogsIngestionClientBuilder() @@ -59,11 +60,11 @@ public void testUploadLogsInBatchesConcurrent() { clientBuilder.httpClient(http); LogsUploadOptions uploadOptions = new LogsUploadOptions().setMaxConcurrency(concurrency); - SyncAsyncExtension.execute( - () -> clientBuilder.buildClient().upload(RULE_ID, STREAM, logs, uploadOptions), + SyncAsyncExtension.execute(() -> clientBuilder.buildClient().upload(RULE_ID, STREAM, logs, uploadOptions), () -> clientBuilder.buildAsyncClient().upload(RULE_ID, STREAM, logs, uploadOptions)); assertEquals(batchCount, http.getCallsCount()); - assertTrue(http.getMaxConcurrentCalls() <= concurrency + 1, String.format("http.getMaxConcurrentCalls() = %s", http.getMaxConcurrentCalls())); + assertTrue(http.getMaxConcurrentCalls() <= concurrency + 1, + String.format("http.getMaxConcurrentCalls() = %s", http.getMaxConcurrentCalls())); } @Test @@ -76,12 +77,10 @@ public void testUploadLogsPartialFailureConcurrent() { clientBuilder.httpClient(http); LogsUploadOptions uploadOptions = new LogsUploadOptions().setMaxConcurrency(concurrency); - LogsIngestionClient client = clientBuilder - .httpClient(http) - .buildClient(); + LogsIngestionClient client = clientBuilder.httpClient(http).buildClient(); - LogsUploadException uploadLogsException = assertThrows(LogsUploadException.class, - () -> client.upload(RULE_ID, STREAM, logs, uploadOptions)); + LogsUploadException uploadLogsException + = assertThrows(LogsUploadException.class, () -> client.upload(RULE_ID, STREAM, logs, uploadOptions)); asserError(uploadLogsException); assertEquals(batchCount, http.getCallsCount()); @@ -96,10 +95,8 @@ public void testUploadLogsPartialFailureConcurrentAsync() { TestHttpClient http = new TestHttpClient(true); LogsUploadOptions uploadOptions = new LogsUploadOptions().setMaxConcurrency(concurrency); - StepVerifier.create(clientBuilder - .httpClient(http) - .buildAsyncClient() - .upload(RULE_ID, STREAM, logs, uploadOptions)) + StepVerifier + .create(clientBuilder.httpClient(http).buildAsyncClient().upload(RULE_ID, STREAM, logs, uploadOptions)) .consumeErrorWith(ex -> { assertTrue(ex instanceof LogsUploadException); asserError((LogsUploadException) ex); @@ -118,6 +115,7 @@ public class TestHttpClient extends NoOpHttpClient { private final AtomicInteger maxConcurrency; private final AtomicBoolean failSecondRequest; private final AtomicInteger counter; + public TestHttpClient(boolean failSecondRequest) { this.maxConcurrency = new AtomicInteger(); this.failSecondRequest = new AtomicBoolean(failSecondRequest); @@ -125,8 +123,7 @@ public TestHttpClient(boolean failSecondRequest) { } public Mono send(HttpRequest request) { - return Mono.delay(Duration.ofMillis(1)) - .map(l -> process(request)); + return Mono.delay(Duration.ofMillis(1)).map(l -> process(request)); } public HttpResponse sendSync(HttpRequest request, Context context) { @@ -136,6 +133,7 @@ public HttpResponse sendSync(HttpRequest request, Context context) { public int getCallsCount() { return counter.get(); } + private HttpResponse process(HttpRequest request) { int c = concurrentCalls.incrementAndGet(); if (c > maxConcurrency.get()) { diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientTest.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientTest.java index 367a166c47997..fdc41119be78b 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientTest.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionClientTest.java @@ -41,10 +41,8 @@ public void testUploadLogsInBatches() { AtomicInteger count = new AtomicInteger(); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionClient client = clientBuilder - .addPolicy(new BatchCountPolicy(count)) - .addPolicy(logsCountPolicy) - .buildClient(); + LogsIngestionClient client + = clientBuilder.addPolicy(new BatchCountPolicy(count)).addPolicy(logsCountPolicy).buildClient(); client.upload(dataCollectionRuleId, streamName, logs); assertEquals(2, count.get()); assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount()); @@ -56,10 +54,8 @@ public void testUploadLogsInBatchesConcurrently() { AtomicInteger count = new AtomicInteger(); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionClient client = clientBuilder - .addPolicy(new BatchCountPolicy(count)) - .addPolicy(logsCountPolicy) - .buildClient(); + LogsIngestionClient client + = clientBuilder.addPolicy(new BatchCountPolicy(count)).addPolicy(logsCountPolicy).buildClient(); client.upload(dataCollectionRuleId, streamName, logs, new LogsUploadOptions().setMaxConcurrency(3)); assertEquals(2, count.get()); assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount()); @@ -73,10 +69,8 @@ public void testUploadLogsPartialFailure() { AtomicInteger count = new AtomicInteger(); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionClient client = clientBuilder - .addPolicy(new PartialFailurePolicy(count)) - .addPolicy(logsCountPolicy) - .buildClient(); + LogsIngestionClient client + = clientBuilder.addPolicy(new PartialFailurePolicy(count)).addPolicy(logsCountPolicy).buildClient(); LogsUploadException uploadLogsException = assertThrows(LogsUploadException.class, () -> { client.upload(dataCollectionRuleId, streamName, logs); @@ -98,10 +92,8 @@ public void testUploadLogsPartialFailureWithErrorHandler() { .setLogsUploadErrorConsumer(error -> failedLogsCount.addAndGet(error.getFailedLogs().size())); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionClient client = clientBuilder - .addPolicy(new PartialFailurePolicy(count)) - .addPolicy(logsCountPolicy) - .buildClient(); + LogsIngestionClient client + = clientBuilder.addPolicy(new PartialFailurePolicy(count)).addPolicy(logsCountPolicy).buildClient(); client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions); assertEquals(11, count.get()); @@ -116,20 +108,17 @@ public void testUploadLogsStopOnFirstError() { // Live Only, as it times out in CI playback mode. TODO: Re-record and update test base to exclude any sanitizers as needed. List logs = getObjects(100000); AtomicInteger count = new AtomicInteger(); - LogsUploadOptions logsUploadOptions = new LogsUploadOptions() - .setLogsUploadErrorConsumer(error -> { - // throw on first error - throw error.getResponseException(); - }); + LogsUploadOptions logsUploadOptions = new LogsUploadOptions().setLogsUploadErrorConsumer(error -> { + // throw on first error + throw error.getResponseException(); + }); LogsCountPolicy logsCountPolicy = new LogsCountPolicy(); - LogsIngestionClient client = clientBuilder - .addPolicy(new PartialFailurePolicy(count)) - .addPolicy(logsCountPolicy) - .buildClient(); + LogsIngestionClient client + = clientBuilder.addPolicy(new PartialFailurePolicy(count)).addPolicy(logsCountPolicy).buildClient(); - assertThrows(HttpResponseException.class, () -> client.upload(dataCollectionRuleId, streamName, logs, - logsUploadOptions)); + assertThrows(HttpResponseException.class, + () -> client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions)); assertEquals(2, count.get()); // only a subset of logs should be sent @@ -147,13 +136,16 @@ public void testUploadLogsProtocolMethod() { @Test @RecordWithoutRequestBody - @EnabledIfEnvironmentVariable(named = "AZURE_TEST_MODE", matches = "LIVE", disabledReason = "Test proxy network connection is timing out for this test in playback mode.") + @EnabledIfEnvironmentVariable( + named = "AZURE_TEST_MODE", + matches = "LIVE", + disabledReason = "Test proxy network connection is timing out for this test in playback mode.") public void testUploadLargeLogsProtocolMethod() { List logs = getObjects(375000); LogsIngestionClient client = clientBuilder.buildClient(); - HttpResponseException responseException = assertThrows(HttpResponseException.class, - () -> client.uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs), new RequestOptions())); + HttpResponseException responseException = assertThrows(HttpResponseException.class, () -> client + .uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs), new RequestOptions())); assertEquals(413, responseException.getResponse().getStatusCode()); } } diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionTestBase.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionTestBase.java index 41f894498cfb8..e758397fabac7 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionTestBase.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/LogsIngestionTestBase.java @@ -57,34 +57,34 @@ public abstract class LogsIngestionTestBase extends TestProxyTestBase { @Override public void beforeTest() { - dataCollectionEndpoint = Configuration.getGlobalConfiguration().get("AZURE_MONITOR_DCE", "https://dce.monitor.azure.com"); - dataCollectionRuleId = Configuration.getGlobalConfiguration().get("AZURE_MONITOR_DCR_ID", "dcr-01584ffffeac4f7abbd4fbc24aa64130"); + dataCollectionEndpoint + = Configuration.getGlobalConfiguration().get("AZURE_MONITOR_DCE", "https://dce.monitor.azure.com"); + dataCollectionRuleId = Configuration.getGlobalConfiguration() + .get("AZURE_MONITOR_DCR_ID", "dcr-01584ffffeac4f7abbd4fbc24aa64130"); streamName = "Custom-MyTableRawData"; - LogsIngestionClientBuilder clientBuilder = new LogsIngestionClientBuilder() - .credential(getTestTokenCredential(interceptorManager)) - .retryPolicy(new RetryPolicy(new RetryStrategy() { - @Override - public int getMaxRetries() { - return 0; - } - - @Override - public Duration calculateRetryDelay(int i) { - return null; - } - })); + LogsIngestionClientBuilder clientBuilder + = new LogsIngestionClientBuilder().credential(getTestTokenCredential(interceptorManager)) + .retryPolicy(new RetryPolicy(new RetryStrategy() { + @Override + public int getMaxRetries() { + return 0; + } + + @Override + public Duration calculateRetryDelay(int i) { + return null; + } + })); if (getTestMode() == TestMode.PLAYBACK) { interceptorManager.addMatchers(Arrays.asList(new BodilessMatcher())); - clientBuilder - .httpClient(interceptorManager.getPlaybackClient()); + clientBuilder.httpClient(interceptorManager.getPlaybackClient()); } else if (getTestMode() == TestMode.RECORD) { - clientBuilder - .addPolicy(interceptorManager.getRecordPolicy()); + clientBuilder.addPolicy(interceptorManager.getRecordPolicy()); } - this.clientBuilder = clientBuilder - .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS)) - .endpoint(dataCollectionEndpoint); + this.clientBuilder + = clientBuilder.httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS)) + .endpoint(dataCollectionEndpoint); } public class BatchCountPolicy implements HttpPipelinePolicy { @@ -135,8 +135,7 @@ public HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNex private void process(HttpPipelineCallContext context) { counter.incrementAndGet(); if (changeDcrId.get()) { - String url = context.getHttpRequest().getUrl().toString() - .replace(dataCollectionRuleId, "dcr-id"); + String url = context.getHttpRequest().getUrl().toString().replace(dataCollectionRuleId, "dcr-id"); context.getHttpRequest().setUrl(url); changeDcrId.set(false); } else { @@ -154,8 +153,7 @@ public static List getObjects(int logsCount) { List logs = new ArrayList<>(); for (int i = 0; i < logsCount; i++) { - LogData logData = new LogData() - .setTime(OffsetDateTime.parse("2022-01-01T00:00:00+07:00")) + LogData logData = new LogData().setTime(OffsetDateTime.parse("2022-01-01T00:00:00+07:00")) .setExtendedColumn("test" + i) .setAdditionalContext("additional logs context"); logs.add(logData); @@ -168,12 +166,14 @@ public static class LogsCountPolicy implements HttpPipelinePolicy { private AtomicLong totalLogsCount = new AtomicLong(); @Override - public Mono process(HttpPipelineCallContext httpPipelineCallContext, HttpPipelineNextPolicy httpPipelineNextPolicy) { + public Mono process(HttpPipelineCallContext httpPipelineCallContext, + HttpPipelineNextPolicy httpPipelineNextPolicy) { BinaryData bodyAsBinaryData = httpPipelineCallContext.getHttpRequest().getBodyAsBinaryData(); byte[] requestBytes = unzipRequestBody(bodyAsBinaryData); List logs = JsonSerializerProviders.createInstance(true) - .deserializeFromBytes(requestBytes, new TypeReference>() { }); + .deserializeFromBytes(requestBytes, new TypeReference>() { + }); totalLogsCount.addAndGet(logs.size()); return httpPipelineNextPolicy.process(); } @@ -193,7 +193,8 @@ public DataValidationPolicy(List inputData) { } @Override - public Mono process(HttpPipelineCallContext httpPipelineCallContext, HttpPipelineNextPolicy httpPipelineNextPolicy) { + public Mono process(HttpPipelineCallContext httpPipelineCallContext, + HttpPipelineNextPolicy httpPipelineNextPolicy) { BinaryData bodyAsBinaryData = httpPipelineCallContext.getHttpRequest().getBodyAsBinaryData(); String actualJson = new String(unzipRequestBody(bodyAsBinaryData)); assertEquals(expectedJson, actualJson); @@ -222,13 +223,12 @@ private static byte[] unzipRequestBody(BinaryData bodyAsBinaryData) { public static TokenCredential getTestTokenCredential(InterceptorManager interceptorManager) { if (interceptorManager.isLiveMode()) { Configuration config = Configuration.getGlobalConfiguration(); - String serviceConnectionId = config.get("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID"); + String serviceConnectionId = config.get("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID"); String clientId = config.get("AZURESUBSCRIPTION_CLIENT_ID"); String tenantId = config.get("AZURESUBSCRIPTION_TENANT_ID"); String systemAccessToken = config.get("SYSTEM_ACCESSTOKEN"); - return new AzurePipelinesCredentialBuilder() - .systemAccessToken(systemAccessToken) + return new AzurePipelinesCredentialBuilder().systemAccessToken(systemAccessToken) .clientId(clientId) .tenantId(tenantId) .serviceConnectionId(serviceConnectionId) diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/ConcurrencyLimitingSpliteratorTest.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/ConcurrencyLimitingSpliteratorTest.java index 16ef6b4904be8..ea478eed406e0 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/ConcurrencyLimitingSpliteratorTest.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/ConcurrencyLimitingSpliteratorTest.java @@ -36,35 +36,33 @@ public class ConcurrencyLimitingSpliteratorTest { @Test public void invalidParams() { assertThrows(NullPointerException.class, () -> new ConcurrencyLimitingSpliterator(null, 1)); - assertThrows(IllegalArgumentException.class, () -> new ConcurrencyLimitingSpliterator<>(Arrays.asList(1, 2, 3).iterator(), 0)); + assertThrows(IllegalArgumentException.class, + () -> new ConcurrencyLimitingSpliterator<>(Arrays.asList(1, 2, 3).iterator(), 0)); } @ParameterizedTest - @ValueSource(ints = {1, 2, 4, 5, 7, 11, 15}) + @ValueSource(ints = { 1, 2, 4, 5, 7, 11, 15 }) public void concurrentCalls(int concurrency) throws ExecutionException, InterruptedException { assumeTrue(Runtime.getRuntime().availableProcessors() > concurrency); List list = IntStream.range(0, 11).boxed().collect(Collectors.toList()); - ConcurrencyLimitingSpliterator spliterator = new ConcurrencyLimitingSpliterator<>(list.iterator(), concurrency); + ConcurrencyLimitingSpliterator spliterator + = new ConcurrencyLimitingSpliterator<>(list.iterator(), concurrency); Stream stream = StreamSupport.stream(spliterator, true); int effectiveConcurrency = Math.min(list.size(), concurrency); CountDownLatch latch = new CountDownLatch(effectiveConcurrency); - List processed = TEST_THREAD_POOL - .submit(() -> - stream.map(r -> { - latch.countDown(); - try { - Thread.sleep(10); - assertTrue(latch.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - fail("countdown await interrupted"); - } - return r; - }) - .collect(Collectors.toList()) - ).get(); + List processed = TEST_THREAD_POOL.submit(() -> stream.map(r -> { + latch.countDown(); + try { + Thread.sleep(10); + assertTrue(latch.await(TEST_TIMEOUT_SEC, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + fail("countdown await interrupted"); + } + return r; + }).collect(Collectors.toList())).get(); assertArrayEquals(list.toArray(), processed.stream().sorted().toArray()); } @@ -73,32 +71,29 @@ public void concurrentCalls(int concurrency) throws ExecutionException, Interrup public void concurrencyHigherThanItemsCount() throws ExecutionException, InterruptedException { int concurrency = 100; List list = IntStream.range(0, 7).boxed().collect(Collectors.toList()); - ConcurrencyLimitingSpliterator spliterator = new ConcurrencyLimitingSpliterator<>(list.iterator(), concurrency); + ConcurrencyLimitingSpliterator spliterator + = new ConcurrencyLimitingSpliterator<>(list.iterator(), concurrency); Stream stream = StreamSupport.stream(spliterator, true); AtomicInteger parallel = new AtomicInteger(0); AtomicInteger maxParallel = new AtomicInteger(0); - List processed = TEST_THREAD_POOL - .submit(() -> - stream.map(r -> { - int cur = parallel.incrementAndGet(); - int curMax = maxParallel.get(); - while (cur > curMax && !maxParallel.compareAndSet(curMax, cur)) { - curMax = maxParallel.get(); - } - - try { - Thread.sleep(50); - } catch (InterruptedException e) { - fail("timeout"); - } - - parallel.decrementAndGet(); - return r; - }) - .collect(Collectors.toList()) - ).get(); + List processed = TEST_THREAD_POOL.submit(() -> stream.map(r -> { + int cur = parallel.incrementAndGet(); + int curMax = maxParallel.get(); + while (cur > curMax && !maxParallel.compareAndSet(curMax, cur)) { + curMax = maxParallel.get(); + } + + try { + Thread.sleep(50); + } catch (InterruptedException e) { + fail("timeout"); + } + + parallel.decrementAndGet(); + return r; + }).collect(Collectors.toList())).get(); assertTrue(maxParallel.get() <= list.size()); assertArrayEquals(list.toArray(), processed.stream().sorted().toArray()); diff --git a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/UtilsTest.java b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/UtilsTest.java index afc99a8bb5abf..41e1b80a36daf 100644 --- a/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/UtilsTest.java +++ b/sdk/monitor/azure-monitor-ingestion/src/test/java/com/azure/monitor/ingestion/implementation/UtilsTest.java @@ -26,10 +26,7 @@ public void shutdownHookTerminatesPool() throws InterruptedException, ExecutionE ExecutorService threadPool = Executors.newFixedThreadPool(1); Thread hook = Utils.registerShutdownHook(threadPool, timeoutSec); - Stream stream = IntStream.of(100, 4000) - .boxed() - .parallel() - .map(this::task); + Stream stream = IntStream.of(100, 4000).boxed().parallel().map(this::task); Future> tasks = threadPool.submit(() -> stream.collect(Collectors.toList())); @@ -37,9 +34,10 @@ public void shutdownHookTerminatesPool() throws InterruptedException, ExecutionE assertTrue(threadPool.isShutdown()); assertTrue(threadPool.isTerminated()); - assertArrayEquals(new Integer[] {100, -1}, tasks.get().toArray()); + assertArrayEquals(new Integer[] { 100, -1 }, tasks.get().toArray()); - assertThrows(RejectedExecutionException.class, () -> threadPool.submit(() -> stream.collect(Collectors.toList()))); + assertThrows(RejectedExecutionException.class, + () -> threadPool.submit(() -> stream.collect(Collectors.toList()))); } @Test