From 6096f0869f2293699d4ad8fe6499c1042416d65a Mon Sep 17 00:00:00 2001 From: ggke Date: Fri, 9 Oct 2020 18:54:08 +0800 Subject: [PATCH] KYLIN-4771 Clear the recordCachePool when the deadline has reached;add timeout for the recordCachePool offer method. --- .../stream/core/query/MultiThreadsResultCollector.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java index 0ca08e4b2b5..f0c91fd6ee2 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java @@ -89,6 +89,7 @@ public boolean hasNext() { if (System.currentTimeMillis() > deadline) { masterThread.interrupt(); // notify main thread cancelFlag.set(true); + recordCachePool.clear(); logger.warn("Beyond the deadline for {}.", queryId); throw new RuntimeException("Timeout when iterate search result"); } @@ -112,6 +113,7 @@ public Record next() { if (one == null) { masterThread.interrupt(); // notify main thread cancelFlag.set(true); + recordCachePool.clear(); logger.debug("Exceeded the deadline for {}.", queryId); throw new RuntimeException("Timeout when iterate search result"); } @@ -141,10 +143,15 @@ public ResultIterateWorker(IStreamingSearchResult result) { @Override public void run() { + long offserTimeout = 0L; try { result.startRead(); for (Record record : result) { - recordCachePool.put(record.copy()); + offserTimeout = deadline - System.currentTimeMillis(); + if (!recordCachePool.offer(record, offserTimeout, TimeUnit.MILLISECONDS)) { + logger.warn("Timeout when offer to recordCachePool, deadline: {}, offser Timeout: {}", deadline, offserTimeout); + break; + } } result.endRead(); } catch (InterruptedException inter) {