Skip to content

Commit

Permalink
fix bug that SharedQueryBlockInputStream may loss block randomly (#2759
Browse files Browse the repository at this point in the history
…) (#2763)
ti-chi-bot authored Aug 25, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent ca0ee5a commit 2e3b82e
Showing 1 changed file with 9 additions and 16 deletions.
25 changes: 9 additions & 16 deletions dbms/src/DataStreams/SharedQueryBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#pragma once

#include <thread>

#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadFactory.h>
#include <common/logger_useful.h>
#include <Common/typeid_cast.h>

#include <DataStreams/IProfilingBlockInputStream.h>
#include <common/logger_useful.h>

#include <thread>

namespace DB
{
@@ -38,15 +37,9 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
}
}

String getName() const override
{
return "SharedQuery";
}
String getName() const override { return "SharedQuery"; }

Block getHeader() const override
{
return children.back()->getHeader();
}
Block getHeader() const override { return children.back()->getHeader(); }

void readPrefix() override
{
@@ -103,7 +96,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
in->readPrefix();
while (!isCancelled())
{
Block block;
Block block = in->read();
do
{
if (isCancelled() || read_suffixed)
@@ -112,7 +105,7 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
queue.tryEmplace(0);
break;
}
} while (!queue.tryPush(block = in->read(), try_action_millisecionds));
} while (!queue.tryPush(block, try_action_millisecionds));

if (!block)
break;
@@ -142,11 +135,11 @@ class SharedQueryBlockInputStream : public IProfilingBlockInputStream
bool read_suffixed = false;

std::thread thread;
std::mutex mutex;
std::mutex mutex;

std::string exception_msg;

Logger * log;
BlockInputStreamPtr in;
};
}
} // namespace DB

0 comments on commit 2e3b82e

Please sign in to comment.