Skip to content

Commit

Permalink
rerun maven in monitor sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
mssfang committed Dec 3, 2024
1 parent e605e30 commit 88ec9d7
Show file tree
Hide file tree
Showing 15 changed files with 171 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder
uploadLogsErrorConsumer = options.getLogsUploadErrorConsumer();
}
if (uploadLogsErrorConsumer != null) {
uploadLogsErrorConsumer.accept(
new LogsUploadError(responseHolder.getException(), responseHolder.getRequest().getLogs()));
uploadLogsErrorConsumer
.accept(new LogsUploadError(responseHolder.getException(), responseHolder.getRequest().getLogs()));
return;
}
// emit the responseHolder without the original logs only if there's an error and there's no
Expand All @@ -304,10 +304,10 @@ private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder

private Mono<UploadLogsResponseHolder> uploadToService(String ruleId, String streamName, Context context,
LogsIngestionRequest request) {
RequestOptions requestOptions = new RequestOptions().addHeader(HttpHeaderName.CONTENT_ENCODING, GZIP)
.setContext(context);
return service.uploadWithResponse(ruleId, streamName, BinaryData.fromBytes(request.getRequestBody()),
requestOptions)
RequestOptions requestOptions
= new RequestOptions().addHeader(HttpHeaderName.CONTENT_ENCODING, GZIP).setContext(context);
return service
.uploadWithResponse(ruleId, streamName, BinaryData.fromBytes(request.getRequestBody()), requestOptions)
.map(response -> new UploadLogsResponseHolder(null, null))
.onErrorResume(HttpResponseException.class,
ex -> Mono.fromSupplier(() -> new UploadLogsResponseHolder(request, ex)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,21 @@ public void upload(String ruleId, String streamName, Iterable<Object> logs, Logs
Objects.requireNonNull(streamName, "'streamName' cannot be null.");
Objects.requireNonNull(logs, "'logs' cannot be null.");

Consumer<LogsUploadError> uploadLogsErrorConsumer = options == null
? null
: options.getLogsUploadErrorConsumer();
Consumer<LogsUploadError> uploadLogsErrorConsumer
= options == null ? null : options.getLogsUploadErrorConsumer();

RequestOptions requestOptions = new RequestOptions();
requestOptions.addHeader(HttpHeaderName.CONTENT_ENCODING, GZIP);
requestOptions.setContext(context);

Stream<UploadLogsResponseHolder> responses = new Batcher(options, logs).toStream()
.map(r -> uploadToService(ruleId, streamName, requestOptions, r));
Stream<UploadLogsResponseHolder> responses
= new Batcher(options, logs).toStream().map(r -> uploadToService(ruleId, streamName, requestOptions, r));

responses = submit(responses, getConcurrency(options)).filter(response -> response.getException() != null);

if (uploadLogsErrorConsumer != null) {
responses.forEach(response -> uploadLogsErrorConsumer.accept(
new LogsUploadError(response.getException(), response.getRequest().getLogs())));
responses.forEach(response -> uploadLogsErrorConsumer
.accept(new LogsUploadError(response.getException(), response.getRequest().getLogs())));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public LogsIngestionClientBuilder endpoint(String endpoint) {
this.endpoint = endpoint;
return this;
} catch (MalformedURLException exception) {
throw LOGGER.logExceptionAsError(
new IllegalArgumentException("'endpoint' must be a valid URL.", exception));
throw LOGGER
.logExceptionAsError(new IllegalArgumentException("'endpoint' must be a valid URL.", exception));
}
}

Expand Down Expand Up @@ -204,8 +204,8 @@ public LogsIngestionClientBuilder clientOptions(ClientOptions clientOptions) {
* @return the updated {@link LogsIngestionClientBuilder}.
*/
public LogsIngestionClientBuilder serviceVersion(LogsIngestionServiceVersion serviceVersion) {
innerLogBuilder.serviceVersion(
IngestionUsingDataCollectionRulesServiceVersion.valueOf(serviceVersion.getVersion()));
innerLogBuilder
.serviceVersion(IngestionUsingDataCollectionRulesServiceVersion.valueOf(serviceVersion.getVersion()));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public LogsIngestionRequest next() {
*/
public Stream<LogsIngestionRequest> toStream() {
if (concurrency == 1) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(this, Spliterator.NONNULL | Spliterator.ORDERED), false);
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(this, Spliterator.NONNULL | Spliterator.ORDERED), false);
}

return StreamSupport.stream(new ConcurrencyLimitingSpliterator<>(this, concurrency), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private HttpPipeline createHttpPipeline() {
if (headers.getSize() > 0) {
policies.add(new AddHeadersPolicy(headers));
}
this.pipelinePolicies.stream().filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_CALL)
this.pipelinePolicies.stream()
.filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_CALL)
.forEach(p -> policies.add(p));
HttpPolicyProviders.addBeforeRetryPolicies(policies);
policies.add(ClientBuilderUtil.validateAndGetRetryPolicy(retryPolicy, retryOptions, new RetryPolicy()));
Expand All @@ -298,12 +299,15 @@ private HttpPipeline createHttpPipeline() {
policies.add(new BearerTokenAuthenticationPolicy(tokenCredential,
audience == null ? DEFAULT_SCOPES : new String[] { audience.toString() }));
}
this.pipelinePolicies.stream().filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_RETRY)
this.pipelinePolicies.stream()
.filter(p -> p.getPipelinePosition() == HttpPipelinePosition.PER_RETRY)
.forEach(p -> policies.add(p));
HttpPolicyProviders.addAfterRetryPolicies(policies);
policies.add(new HttpLoggingPolicy(localHttpLogOptions));
HttpPipeline httpPipeline = new HttpPipelineBuilder().policies(policies.toArray(new HttpPipelinePolicy[0]))
.httpClient(httpClient).clientOptions(localClientOptions).build();
.httpClient(httpClient)
.clientOptions(localClientOptions)
.build();
return httpPipeline;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static int getConcurrency(LogsUploadOptions options) {
return options.getMaxConcurrency();
}

return 1;
return 1;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

module com.azure.monitor.ingestion {
requires transitive com.azure.core;

exports com.azure.monitor.ingestion;
exports com.azure.monitor.ingestion.models;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,9 @@ public class LogData {
@JsonProperty(value = "Time")
private OffsetDateTime time;


@JsonProperty(value = "ExtendedColumn")
private String extendedColumn;


@JsonProperty(value = "AdditionalContext")
private String additionalContext;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public void testUploadLogs() {
DataValidationPolicy dataValidationPolicy = new DataValidationPolicy(logs);

LogsIngestionAsyncClient client = clientBuilder.addPolicy(dataValidationPolicy).buildAsyncClient();
StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs))
.verifyComplete();
StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)).verifyComplete();
}

@Test
Expand All @@ -43,13 +42,10 @@ public void testUploadLogsInBatches() {
AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(logsCountPolicy)
.addPolicy(new BatchCountPolicy(count))
.buildAsyncClient();
LogsIngestionAsyncClient client
= clientBuilder.addPolicy(logsCountPolicy).addPolicy(new BatchCountPolicy(count)).buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs))
.verifyComplete();
StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)).verifyComplete();

assertEquals(2, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
Expand All @@ -61,11 +57,10 @@ public void testUploadLogsInBatchesConcurrently() {

AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();
LogsIngestionAsyncClient client = clientBuilder
.addPolicy(new BatchCountPolicy(count))
.addPolicy(logsCountPolicy)
.buildAsyncClient();
StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, new LogsUploadOptions().setMaxConcurrency(3)))
LogsIngestionAsyncClient client
= clientBuilder.addPolicy(new BatchCountPolicy(count)).addPolicy(logsCountPolicy).buildAsyncClient();
StepVerifier
.create(client.upload(dataCollectionRuleId, streamName, logs, new LogsUploadOptions().setMaxConcurrency(3)))
.verifyComplete();
assertEquals(2, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
Expand All @@ -79,20 +74,17 @@ public void testUploadLogsPartialFailure() {
AtomicInteger count = new AtomicInteger();
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(logsCountPolicy)
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs))
.verifyErrorSatisfies(error -> {
assertTrue(error instanceof LogsUploadException);
if (error instanceof LogsUploadException) {
LogsUploadException ex = (LogsUploadException) error;
assertEquals(49460, ex.getFailedLogsCount());
assertEquals(5, ex.getLogsUploadErrors().size());
}
});
LogsIngestionAsyncClient client
= clientBuilder.addPolicy(logsCountPolicy).addPolicy(new PartialFailurePolicy(count)).buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs)).verifyErrorSatisfies(error -> {
assertTrue(error instanceof LogsUploadException);
if (error instanceof LogsUploadException) {
LogsUploadException ex = (LogsUploadException) error;
assertEquals(49460, ex.getFailedLogsCount());
assertEquals(5, ex.getLogsUploadErrors().size());
}
});
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
}

Expand All @@ -107,13 +99,10 @@ public void testUploadLogsPartialFailureWithErrorHandler() {
.setLogsUploadErrorConsumer(error -> failedLogsCount.addAndGet(error.getFailedLogs().size()));
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(logsCountPolicy)
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();
LogsIngestionAsyncClient client
= clientBuilder.addPolicy(logsCountPolicy).addPolicy(new PartialFailurePolicy(count)).buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions))
.verifyComplete();
StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions)).verifyComplete();
assertEquals(49460, failedLogsCount.get());
assertEquals(11, count.get());
assertEquals(logs.size(), logsCountPolicy.getTotalLogsCount());
Expand All @@ -125,17 +114,14 @@ public void testUploadLogsStopOnFirstError() {
// Live Only, as it times out in CI playback mode. TODO: Re-record and update test base to exclude any sanitizers as needed.
List<Object> logs = getObjects(100000);
AtomicInteger count = new AtomicInteger();
LogsUploadOptions logsUploadOptions = new LogsUploadOptions()
.setLogsUploadErrorConsumer(error -> {
// throw on first error
throw error.getResponseException();
});
LogsUploadOptions logsUploadOptions = new LogsUploadOptions().setLogsUploadErrorConsumer(error -> {
// throw on first error
throw error.getResponseException();
});
LogsCountPolicy logsCountPolicy = new LogsCountPolicy();

LogsIngestionAsyncClient client = clientBuilder
.addPolicy(logsCountPolicy)
.addPolicy(new PartialFailurePolicy(count))
.buildAsyncClient();
LogsIngestionAsyncClient client
= clientBuilder.addPolicy(logsCountPolicy).addPolicy(new PartialFailurePolicy(count)).buildAsyncClient();

StepVerifier.create(client.upload(dataCollectionRuleId, streamName, logs, logsUploadOptions))
.verifyErrorSatisfies(ex -> assertTrue(ex instanceof HttpResponseException));
Expand All @@ -148,20 +134,25 @@ public void testUploadLogsStopOnFirstError() {
public void testUploadLogsProtocolMethod() {
List<Object> logs = getObjects(10);
LogsIngestionAsyncClient client = clientBuilder.buildAsyncClient();
StepVerifier.create(client.uploadWithResponse(dataCollectionRuleId, streamName,
BinaryData.fromObject(logs), new RequestOptions()))
StepVerifier
.create(client.uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs),
new RequestOptions()))
.assertNext(response -> assertEquals(204, response.getStatusCode()))
.verifyComplete();
}

@Test
@RecordWithoutRequestBody
@EnabledIfEnvironmentVariable(named = "AZURE_TEST_MODE", matches = "LIVE", disabledReason = "Test proxy network connection is timing out for this test in playback mode.")
@EnabledIfEnvironmentVariable(
named = "AZURE_TEST_MODE",
matches = "LIVE",
disabledReason = "Test proxy network connection is timing out for this test in playback mode.")
public void testUploadLargeLogsProtocolMethod() {
List<Object> logs = getObjects(375000);
LogsIngestionAsyncClient client = clientBuilder.buildAsyncClient();
StepVerifier.create(client.uploadWithResponse(dataCollectionRuleId, streamName,
BinaryData.fromObject(logs), new RequestOptions()))
StepVerifier
.create(client.uploadWithResponse(dataCollectionRuleId, streamName, BinaryData.fromObject(logs),
new RequestOptions()))
.verifyErrorMatches(responseException -> (responseException instanceof HttpResponseException)
&& ((HttpResponseException) responseException).getResponse().getStatusCode() == 413);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,21 @@ public class LogsIngestionClientBuilderTest {
@Test
public void testBuilderWithoutEndpoint() {
IllegalStateException ex = Assertions.assertThrows(IllegalStateException.class,
() -> new LogsIngestionClientBuilder().buildClient());
() -> new LogsIngestionClientBuilder().buildClient());
Assertions.assertEquals("endpoint is required to build the client.", ex.getMessage());
}

@Test
public void testBuilderWithoutCredential() {
IllegalStateException ex = Assertions.assertThrows(IllegalStateException.class,
() -> new LogsIngestionClientBuilder()
.endpoint("https://example.com")
.buildClient());
() -> new LogsIngestionClientBuilder().endpoint("https://example.com").buildClient());
Assertions.assertEquals("credential is required to build the client.", ex.getMessage());
}

@Test
public void testBuilderWithInvalidEndpoint() {
IllegalArgumentException ex = Assertions.assertThrows(IllegalArgumentException.class,
() -> new LogsIngestionClientBuilder()
.endpoint("example.com")
.buildClient());
() -> new LogsIngestionClientBuilder().endpoint("example.com").buildClient());
Assertions.assertEquals("'endpoint' must be a valid URL.", ex.getMessage());
}

Expand Down
Loading

0 comments on commit 88ec9d7

Please sign in to comment.