Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

After dispose a r2dbc flux , which execute a select sql for many rows, the underlying query still exist and fetch data from db #247

Open
gongjinglong opened this issue Jan 3, 2023 · 0 comments

Comments

@gongjinglong
Copy link

  1. The scenario
    I use r2dbc to read many rows, and need to stop reading when some condition happened. I want to use Disposable::dispose() to achieve this purpose, but i encountered a problem. Below are the example code:
	private static final void testDispose_DatabaseClient(ConfigurableApplicationContext context) throws InterruptedException {
		DatabaseClient databaseClient = context.getBean(DatabaseClient.class);

		AtomicLong readCount = new AtomicLong();

		String sql = "select * from `t_trade_ext_for_update` WHERE id <=200*10000";
		Disposable disposable = databaseClient
				.sql(sql)
				.map(r -> {
					int id = r.get("id", Long.class).intValue();
					TradeExt tradeExt = new TradeExt(id);
					return tradeExt;
				})
				.all()
				.log((String) null, Level.INFO, false, Stream.of(SignalType.values()).filter(t -> t != SignalType.ON_NEXT).toArray(SignalType[]::new))
				.subscribe(t->{
					readCount.incrementAndGet();
					if (t.getId() % 10000 == 0) {
						logger.info("item: {}", t.getId());
					}
				});

		// dispose the r2dbc stream in another thread if some condition happened
		Thread disposeThread = new Thread(new Runnable() {
			@SneakyThrows
			@Override
			public void run() {
				// a mock condition on which need to stop the flux
				while (readCount.get() <= 10*10000) {
					Thread.sleep(500);
				}
				logger.info("################### call dispose ###########");
				disposable.dispose();
				logger.info("################### disposed ###########");
			}
		}, "dispose-thread");
		disposeThread.start();

		// sleep some time to avoid jvm exit
		Thread.sleep(600 * 1000);
	}

  1. The problem
    After disposable.dispose() executed, I run show processlist on the db, and I found the sql is still running, how can i completely stop it ?
    I also debug the driver my code, i found that, the flux is running with the below stack:
	...
	at reactor.core.publisher.Operators.onDiscard(Operators.java:438)
	at dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:72)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:668)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.onNext(FluxWindowPredicate.java:788)
	at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onNext(FluxWindowPredicate.java:266)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:70)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
	at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
	at dev.miku.r2dbc.mysql.client.ReactorNettyClient$ResponseSink.next(ReactorNettyClient.java:340)
	at dev.miku.r2dbc.mysql.client.ReactorNettyClient.lambda$new$0(ReactorNettyClient.java:103)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.handleDecoded(MessageDuplexCodec.java:187)
	at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.channelRead(MessageDuplexCodec.java:95)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

I doubt the problem is related to DiscardOnCancelSubscriber, but i‘m not clear about that.

  1. Furthermore
  • I tried calling the connection's close() within doCancel and doFatally, but can not solve the problem.
  • I tried using r2dbc-mariadb 1.0.3 as alternative driver, a similar phenomenon occurred.

So is this a bug of our driver, or perhaps it's more likely that I'm using it wrong.
thanks!

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant