Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support broadcast join #675

Merged
merged 69 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
0025c0f
implement join
hanfei1991 Feb 26, 2020
3e1abb2
fix
hanfei1991 Feb 27, 2020
77e6693
update tipb
hanfei1991 Mar 4, 2020
4095a44
update tipb
hanfei1991 Mar 4, 2020
92c0786
save work: refine dag interpreter, introduce dag interpreter query block
windtalker Mar 2, 2020
fd92e18
fix bug
windtalker Mar 2, 2020
4bcbc39
comment useless code
windtalker Mar 2, 2020
38eb05c
refine code
windtalker Mar 4, 2020
80c0821
fix bug
windtalker Mar 4, 2020
1679467
refine code
windtalker Mar 4, 2020
3137f0a
fix bug
windtalker Mar 5, 2020
b346aeb
save work
windtalker Mar 5, 2020
602412b
tiny fix
hanfei1991 Mar 6, 2020
c2f80ab
save work
windtalker Mar 7, 2020
4ceedc9
Merge branch 'hanfei/join' of https://github.com/pingcap/tics into ha…
windtalker Mar 9, 2020
944515c
support remote read
windtalker Mar 9, 2020
e8395ef
update tipb
windtalker Mar 9, 2020
2299220
refine code
windtalker Mar 10, 2020
3bf762f
fix bug
windtalker Mar 10, 2020
50644ec
update client-c
hanfei1991 Mar 10, 2020
b022400
refine cop read
windtalker Mar 11, 2020
116a508
update client-c to support cop reading from tiflash
windtalker Mar 11, 2020
d101639
refine code
windtalker Mar 12, 2020
9047e4e
support batch cop
hanfei1991 Mar 10, 2020
abbff57
fix build error
windtalker Mar 12, 2020
f8da925
merge master
windtalker Mar 12, 2020
3ce43c3
fix daily test fail
windtalker Mar 12, 2020
7edcb4f
some bug fix
windtalker Mar 12, 2020
72083dc
log dag execution time without encode to chunk
windtalker Mar 14, 2020
1dbd6b8
fix bug
windtalker Mar 14, 2020
28114f9
fix bug
windtalker Mar 14, 2020
8106ac6
log dag execution time without encode
windtalker Mar 14, 2020
cb294a7
make encode multi processors
hanfei1991 Mar 16, 2020
63d7a9f
fix
hanfei1991 Mar 16, 2020
c8b66d1
parallel encode
hanfei1991 Mar 17, 2020
b33da95
refine code of batch coprocessor
windtalker Mar 17, 2020
347ec5a
refine code
windtalker Mar 17, 2020
e9830a9
delete useless code
windtalker Mar 17, 2020
2c85b1d
update kvproto and client-c
windtalker Mar 17, 2020
f499cd7
[flash 1002]refine coprocessor read (#530)
hanfei1991 Mar 19, 2020
68b3815
support key ranges in batch coprocessor (#533)
windtalker Mar 23, 2020
9b1690c
merge master branch (#556)
windtalker Mar 26, 2020
34a4a50
fix type mismatch bug in broadcast join
windtalker Mar 27, 2020
5bc02ab
merge master
windtalker Mar 27, 2020
87d118b
broadcast join support join keys with different data type (#580)
windtalker Mar 27, 2020
3a7f042
merge master
windtalker Mar 30, 2020
01d9b6f
Merge branch 'master' of https://github.com/pingcap/tics into broadca…
windtalker Mar 30, 2020
060e37a
Merge branch 'master' of https://github.com/pingcap/tics into broadca…
windtalker Apr 2, 2020
f075756
some improvement for broadcast join (#600)
windtalker Apr 7, 2020
f8c6f8c
Merge branch 'master' into broadcast_join
hanfei1991 Apr 8, 2020
545653c
fix bug
hanfei1991 Apr 8, 2020
8a07c8c
fix bug
hanfei1991 Apr 9, 2020
1d13d59
make TiFlash backward compatible to old tipb (#653)
windtalker Apr 23, 2020
da84e57
Merge branch 'master' into broadcast_join
hanfei1991 Apr 28, 2020
bcb3d7b
fix bug
hanfei1991 Apr 28, 2020
54a9825
refine code
hanfei1991 Apr 28, 2020
21ce2b0
update header
hanfei1991 Apr 28, 2020
043870d
Fix execute details regression after merge master (#678)
windtalker May 5, 2020
06f38d2
Merge branch 'master' into broadcast_join
hanfei1991 May 5, 2020
ce6dda9
Merge branch 'master' into broadcast_join
hanfei1991 May 13, 2020
970b53f
Merge branch 'master' into broadcast_join
hanfei1991 May 15, 2020
142bcc1
Merge branch 'master' into broadcast_join
hanfei1991 May 21, 2020
05ed931
update kvproto
hanfei1991 May 21, 2020
e14ef71
fmt code
windtalker May 21, 2020
fe280f0
Merge branch 'broadcast_join' of https://github.com/pingcap/tics into…
windtalker May 21, 2020
0c0eca9
merge master
windtalker Jun 17, 2020
5844bb4
update client-c
windtalker Jun 17, 2020
4a10242
update tipb
hanfei1991 Jun 18, 2020
06c01bd
Merge branch 'master' into broadcast_join
hanfei1991 Jun 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions dbms/src/DataStreams/CoprocessorBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
}

public:
CoprocessorBlockInputStream(pingcap::kv::Cluster * cluster_, const pingcap::coprocessor::Request & req_, const DAGSchema & schema_,
int concurrency, pingcap::kv::StoreType store_type)
: req(req_),
resp_iter(pingcap::coprocessor::Client::send(cluster_, &req, concurrency, store_type)),
CoprocessorBlockInputStream(
pingcap::kv::Cluster * cluster_, std::vector<pingcap::coprocessor::copTask> tasks, const DAGSchema & schema_, int concurrency)
: resp_iter(std::move(tasks), cluster_, concurrency, &Logger::get("pingcap/coprocessor")),
schema(schema_),
sample_block(getSampleBlock()),
log(&Logger::get("pingcap/coprocessor"))
Expand All @@ -52,8 +51,7 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
if (!has_next)
return {};
}

auto chunk = std::move(chunk_queue.front());
auto chunk = chunk_queue.front();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why modify this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because std move a return value makes no sense

chunk_queue.pop();
switch (resp->encode_type())
{
Expand Down Expand Up @@ -101,7 +99,6 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
return true;
}

pingcap::coprocessor::Request req;
pingcap::coprocessor::ResponseIter resp_iter;
DAGSchema schema;

Expand Down
74 changes: 44 additions & 30 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#include <Interpreters/Set.h>
#include <Interpreters/Join.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Interpreters/Join.h>
#include <Interpreters/Set.h>
#include <Storages/IStorage.h>

#include <iomanip>


Expand All @@ -12,25 +13,38 @@ namespace DB

namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
extern const int SET_SIZE_LIMIT_EXCEEDED;
}

CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits)
: subqueries_for_sets_list(std::move(subqueries_for_sets_list_)), network_transfer_limits(network_transfer_limits)
{
init(input);
}

CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
const SubqueriesForSets & subqueries_for_sets_,
const SizeLimits & network_transfer_limits)
: subqueries_for_sets(subqueries_for_sets_),
network_transfer_limits(network_transfer_limits)
const BlockInputStreamPtr & input, const SubqueriesForSets & subqueries_for_sets, const SizeLimits & network_transfer_limits)
: network_transfer_limits(network_transfer_limits)
{
for (auto & elem : subqueries_for_sets)
subqueries_for_sets_list.push_back(subqueries_for_sets);
init(input);
}

void CreatingSetsBlockInputStream::init(const BlockInputStreamPtr & input)
{
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
if (elem.second.source)
for (auto & elem : subqueries_for_sets)
{
children.push_back(elem.second.source);
if (elem.second.source)
{
children.push_back(elem.second.source);

if (elem.second.set)
elem.second.set->setHeader(elem.second.source->getHeader());
if (elem.second.set)
elem.second.set->setHeader(elem.second.source->getHeader());
}
}
}

Expand All @@ -51,10 +65,7 @@ Block CreatingSetsBlockInputStream::readImpl()
}


void CreatingSetsBlockInputStream::readPrefixImpl()
{
createAll();
}
void CreatingSetsBlockInputStream::readPrefixImpl() { createAll(); }


Block CreatingSetsBlockInputStream::getTotals()
Expand All @@ -72,27 +83,29 @@ void CreatingSetsBlockInputStream::createAll()
{
if (!created)
{
for (auto & elem : subqueries_for_sets)
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
for (auto & elem : subqueries_for_sets)
{
if (isCancelledOrThrowIfKilled())
return;
if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
{
if (isCancelledOrThrowIfKilled())
return;

createOne(elem.second);
createOne(elem.second);
}
}
}

created = true;
}
}


void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
LOG_TRACE(log, (subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "")
<< (subquery.table ? "Filling temporary table. " : ""));
LOG_TRACE(log,
(subquery.set ? "Creating set. " : "") << (subquery.join ? "Creating join. " : "")
<< (subquery.table ? "Filling temporary table. " : ""));
Stopwatch watch;

BlockOutputStreamPtr table_out;
Expand Down Expand Up @@ -137,7 +150,8 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
rows_to_transfer += block.rows();
bytes_to_transfer += block.bytes();

if (!network_transfer_limits.check(rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
if (!network_transfer_limits.check(
rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
done_with_table = true;
}

Expand Down Expand Up @@ -188,4 +202,4 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
}
}

}
} // namespace DB
21 changes: 14 additions & 7 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#pragma once

#include <Poco/Logger.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
#include <Poco/Logger.h>


namespace Poco { class Logger; }
namespace Poco
{
class Logger;
}

namespace DB
{
Expand All @@ -18,8 +21,10 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
{
public:
CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
const SubqueriesForSets & subqueries_for_sets_,
const BlockInputStreamPtr & input, const SubqueriesForSets & subqueries_for_sets_, const SizeLimits & network_transfer_limits);

CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits);

String getName() const override { return "CreatingSets"; }
Expand All @@ -34,7 +39,9 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
void readPrefixImpl() override;

private:
SubqueriesForSets subqueries_for_sets;
void init(const BlockInputStreamPtr & input);

std::vector<SubqueriesForSets> subqueries_for_sets_list;
bool created = false;

SizeLimits network_transfer_limits;
Expand All @@ -49,4 +56,4 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
void createOne(SubqueryForSet & subquery);
};

}
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/IProfilingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,25 +103,29 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte

void IProfilingBlockInputStream::readPrefix()
{
auto start_time = info.total_stopwatch.elapsed();
readPrefixImpl();

forEachChild([&] (IBlockInputStream & child)
{
child.readPrefix();
return false;
});
info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
}


void IProfilingBlockInputStream::readSuffix()
{
auto start_time = info.total_stopwatch.elapsed();
forEachChild([&] (IBlockInputStream & child)
{
child.readSuffix();
return false;
});

readSuffixImpl();
info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
}


Expand Down
11 changes: 11 additions & 0 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}

NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, std::vector<String> && output_names_)
: istr(istr_), server_revision(server_revision_), output_names(std::move(output_names_))
{
}

NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
: istr(istr_), server_revision(server_revision_)
Expand Down Expand Up @@ -109,6 +113,11 @@ Block NativeBlockInputStream::readImpl()
rows = index_block_it->num_rows;
}

if (output_names.size() > 0 && output_names.size() != columns)
throw Exception("NativeBlockInputStream with explicity output name, but the block column size "
"is not equal to the size of output names", ErrorCodes::LOGICAL_ERROR);
bool explicit_output_name = output_names.size() > 0;

for (size_t i = 0; i < columns; ++i)
{
if (use_index)
Expand All @@ -121,6 +130,8 @@ Block NativeBlockInputStream::readImpl()

/// Name
readBinary(column.name, istr);
if (explicit_output_name)
column.name = output_names[i];

/// Type
String type_name;
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/NativeBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ struct IndexForNativeFormat
class NativeBlockInputStream : public IProfilingBlockInputStream
{
public:
/// provide output column names explicitly
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, std::vector<String> && output_names_);
/// If a non-zero server_revision is specified, additional block information may be expected and read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);

Expand Down Expand Up @@ -96,6 +98,8 @@ class NativeBlockInputStream : public IProfilingBlockInputStream

PODArray<double> avg_value_size_hints;

std::vector<String> output_names;

void updateAvgValueSizeHints(const Block & block);
};

Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ std::unique_ptr<ChunkCodecStream> CHBlockChunkCodec::newCodecStream(const std::v
return std::make_unique<CHBlockChunkCodecStream>(field_types);
}

Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema &)
Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schema)
{
const String & row_data = chunk.rows_data();
ReadBufferFromString read_buffer(row_data);
NativeBlockInputStream block_in(read_buffer, 0);
std::vector<String> output_names;
for (const auto & c : schema)
output_names.push_back(c.first);
NativeBlockInputStream block_in(read_buffer, 0, std::move(output_names));
return block_in.read();
}

Expand Down
Loading