java.lang.IllegalStateException: The underlying HTTP client completed without emitting a response. #1

Open
kumar-csice opened this issue Nov 6, 2024 · 0 comments

kumar-csice commented Nov 6, 2024

We see this exception only after migrating from spring-webflux 5.3.22 to spring-webflux 6.1.11. Our service class calls triggerAsyncJobs(), which should run the job only after a 10-second delay; the calling service method must not wait for the result.

The code below works without any issue on spring-webflux 5.3.22: the service layer calls triggerAsyncJobs(), does not wait for its result, and immediately returns the response to the controller; 10 seconds later triggerTelemetryJob() executes successfully. With spring-webflux 6.1.11 it does not work: the controller still gets its response immediately, but after 10 seconds we get the exception shown in the stack trace below. It looks like the DefaultWebClient class has changed considerably between these versions.

Note: There is no issue with triggerTelemetryJob() itself or with the upstream service; we verified this by calling triggerTelemetryJob() directly from the service layer.

This is our code:

@Override
public void triggerAsyncJobs(DeviceDTO device, String token) {

	Mono.delay(Duration.ofSeconds(10)).flatMap(ignore -> {
		LoggingUtils.info(log, "Triggering post onboarding Async jobs for device id : {}", device.getDeviceId());
		return triggerTelemetryJob(device, token)
				.doOnError(ex -> LoggingUtils.error(log, "Exception occurred while running Async jobs", ex));
	}).subscribe();
}

public Mono<Boolean> triggerTelemetryJob(DeviceDTO device, String token) {

	Mono<List<DeviceJobDefinitionDTO>> deviceJobDefinitions = deviceCommunicationServiceHelper
			.fetchDeviceJobDefinitionsFromDMS(device.getDeviceDefinition().getDeviceDefinitionId(),
					device.getDeviceDefinition().getProductType(), JobType.TELEMETRY.getValue(), token);
	Mono<List<JobExecutionDTO>> jobExecutionsMono = deviceJobDefinitions.flatMap(definitions -> {
		if (ObjectUtils.isEmpty(definitions)) {
			throw new BffGraphqlException(HttpStatus.NOT_FOUND,
					DCSErrorInfoConstant.ERR_DCS_NO_JOB_DEFINITION_FOUND, null);
		}
		JobExecutionDTO createJobExecutionDTO = deviceCommunicationServiceHelper.createJobExecutionDTO(
				definitions.get(0), JobType.TELEMETRY, device.getDeviceId().toString(), true);
		return deviceCommunicationServiceHelper.saveJobExecutions(Arrays.asList(createJobExecutionDTO), token);
	});
	Mono<List<String>> triggerJobResponse = jobExecutionsMono.flatMapMany(Flux::fromIterable)
			.map(JobExecutionDTO::getJobExecutionId)
			.flatMap(jobExecId -> deviceCommunicationServiceHelper.triggerJobExecution(jobExecId, token))
			.collectList();
	return triggerJobResponse.map(triggerResponse -> {
		
		return DCSErrorInfoConstant.INFO_DCS_JOB_TRIGGERED_SUCCESSFULLY;
	}).thenReturn(true);
}
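
To make the expected behavior concrete, here is a minimal JDK-only sketch of the same fire-and-forget pattern: the caller gets control back immediately, the job runs after a delay, and failures go to an error callback. It is an illustration only. The class name DelayedTriggerSketch, the method triggerLater, and its parameters are hypothetical and not part of our service, and it deliberately uses CompletableFuture.delayedExecutor (Java 9+) instead of Mono.delay, so it does not reproduce the WebClient exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration only; this is not the Reactor code from this issue.
public class DelayedTriggerSketch {

    /**
     * Schedules the job to start after delayMillis and returns immediately.
     * A failure is handed to onError instead of being thrown at the caller.
     */
    public static CompletableFuture<Void> triggerLater(long delayMillis,
                                                       Supplier<Boolean> job,
                                                       Consumer<Throwable> onError) {
        // delayedExecutor submits the task to the default async pool once the delay has elapsed.
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(job, delayed)
                .thenAccept(ok -> System.out.println("job finished, result=" + ok))
                .exceptionally(ex -> {
                    // ex is usually a CompletionException wrapping the original cause.
                    onError.accept(ex);
                    return null;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> handle = triggerLater(200, () -> true,
                ex -> System.err.println("async job failed: " + ex));
        // The caller reaches this line without waiting for the delayed job.
        System.out.println("job already done when caller returns? " + handle.isDone());
        handle.join(); // only so the demo JVM stays alive until the job completes
    }
}
```

In the real service the returned handle would simply be ignored, just as triggerAsyncJobs() ignores the result of subscribe().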

Stack trace
java.lang.IllegalStateException: The underlying HTTP client completed without emitting a response.
at org.springframework.web.reactive.function.client.DefaultWebClient.lambda$static$0(DefaultWebClient.java:78) ~[spring-webflux-6.1.11.jar:6.1.11]
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:189) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:189) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete(FluxFilter.java:166) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2573) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.request(FluxMap.java:295) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:649) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2367) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2241) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:633) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onSubscribe(FluxFilter.java:85) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onSubscribe(FluxMap.java:194) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.Mono.subscribe(Mono.java:4568) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.6.8.jar:3.6.8]
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.6.8.jar:3.6.8]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
