You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
PLEASE READ: If you have a support contract with Google, please create an issue in the support console instead of filing on GitHub. This will ensure
a timely response.
We'd love to accept your patches and contributions to this project. There are just a few small
guidelines you need to follow before opening an issue or a PR:
Ensure the issue was not already reported.
Open a new issue if you are unable to find an existing issue addressing your problem. Make sure
to include a title and clear description, as much relevant information as possible, and a code
sample or an executable test case demonstrating the expected behavior that is not occurring.
Discuss the priority and potential solutions with the maintainers in the issue. The maintainers
would review the issue and add a label "Accepting Contributions" once the issue is ready for
accepting contributions.
Open a PR only if the issue is labeled with "Accepting Contributions", ensure the PR description
clearly describes the problem and solution. Note that an open PR without an issue labeled with "
Accepting Contributions" will not be accepted.
Describe the bug
I am using PubSubReactiveFactory route to stream messages. They sometimes work for few minutes to an hour, and after sometime, nothing gets picked up. If I restart the K8 pod running the code, it starts working again and the behavior repeats.
Sample Code
@Configuration
@Slf4j
public class PubSubConsumer {
@Autowired
private final PubSubReactiveFactory pubSubReactiveFactory;
private final MyConfig myConfig;
private final MySubscriberService mySubscriberService;
@Autowired
private final ObjectMapper objectMapper;
private Disposable disposable;
@Autowired
public PubSubConsumer(PubSubReactiveFactory pubSubReactiveFactory, MyConfig myConfig, mySubscriberService mySubscriberService, ObjectMapper objectMapper) {
log.info("PubSubConsumer constructor");
this.pubSubReactiveFactory = pubSubReactiveFactory;
this.myConfig = myConfig;
this.mySubscriberService = mySubscriberService;
this.objectMapper = objectMapper;
}
@EventListener(ApplicationReadyEvent.class)
public void start() {
log.info("Staring the My-Event-Listener");
this.disposable = this.pubSubReactiveFactory.poll(myConfig.getMySubscription(),1000)
.flatMap(this::myMessageHandler)
.subscribeOn(Schedulers.parallel())
.subscribe();
}
public Mono<Void> myMessageHandler(AcknowledgeablePubsubMessage message) {
log.debug("myMessageHandler is called.");
PubsubMessage m = message.getPubsubMessage();
try {
MyMessage myMessage = objectMapper
.readValue(m.getData().toByteArray(), MyMessage.class);
mySubscriberService.myMessageReceiver(myMessage);
} catch (Exception e) {
log.error("Could not finish the action. {}",e.getMessage());
} finally {
return Mono.fromFuture(message.ack().toCompletableFuture());
}
}
@EventListener(ContextClosedEvent.class)
public void stop() {
log.info("Stopping My-Event-Listener");
if(this.disposable!=null && !this.disposable.isDisposed()) {
this.disposable.dispose();
}
}
The text was updated successfully, but these errors were encountered:
Thanks for filing this issue @ksachdev1! Please consider submitting a support ticket to https://cloud.google.com/support/ to help with debugging this issue further. This will ensure your situation gets attention and may result in guidance on how to achieve your end goal.
PLEASE READ: If you have a support contract with Google, please create an issue in the
support console instead of filing on GitHub. This will ensure
a timely response.
We'd love to accept your patches and contributions to this project. There are just a few small
guidelines you need to follow before opening an issue or a PR:
to include a title and clear description, as much relevant information as possible, and a code
sample or an executable test case demonstrating the expected behavior that is not occurring.
would review the issue and add a label "Accepting Contributions" once the issue is ready for
accepting contributions.
clearly describes the problem and solution. Note that an open PR without an issue labeled with "
Accepting Contributions" will not be accepted.
See
also CONTRIBUTING.md
.
Describe the bug
I am using PubSubReactiveFactory route to stream messages. They sometimes work for few minutes to an hour, and after sometime, nothing gets picked up. If I restart the K8 pod running the code, it starts working again and the behavior repeats.
Sample Code
The text was updated successfully, but these errors were encountered: