support broadcast join (#675)
* implement join

* fix

* update tipb

* update tipb

* save work: refine dag interpreter, introduce dag interpreter query block

* fix bug

* comment useless code

* refine code

* fix bug

* refine code

* fix bug

* save work

* tiny fix

* save work

* support remote read

* update tipb

* refine code

* fix bug

* update client-c

* refine cop read

* update client-c to support cop reading from tiflash

* refine code

* support batch cop

* fix build error

* fix daily test fail

* some bug fix

* log dag execution time without encode to chunk

* fix bug

* fix bug

* log dag execution time without encode

* make encode multi processors

* fix

* parallel encode

* refine code of batch coprocessor

* refine code

* delete useless code

* update kvproto and client-c

* [flash 1002]refine coprocessor read (#530)

* refine coprocessor

* fix

* try fix ci

* support key ranges in batch coprocessor (#533)

* save work

* save work

* save work

* save work

* dt support key ranges in dag request

* fix bug

* fix bug

* fix bug

* add some comments

* fix bug

* address comments

* merge master branch (#556)

* fix daily test fail (#520)

* fix daily test fail

* fix

* Add fullstack test for engine DeltaTree (#524)

## Add fullstack test for engine DeltaTree.
* Refine `tests/docker/run.sh` and split `tests/docker/docker-compose.yaml` into `tests/docker/{gtest/mock-test/cluster/tiflash-dt/tiflash-tmt}.yaml`

`fullstack/ddl`, `fullstack-test/fault-inject` will be enabled in #526 

## Others
* Add column `tidb_table_id` in `system.tables`
* Add some debugging info

Signed-off-by: JaySon-Huang <[email protected]>

* Bugfix: schrodinger bank2 fail (#521)

* Flush committed data in Region after resolve locks
* Stop append into last packs after split.
* Remove last_cache in Delta to reduce code complexity.
* Add system table: dt_tables and dt_segments, for debug.

* [FLASH-1008] Support br restore & ingest sst (#529)

* [flash-1018]fix bug of datetime default value (#534)

* fix bug of datetime default value

* address comment

* Using SegmentSnapshotPtr instead of SegmentSnapshot (#532)

* [Flash-664] Enable DDL for engine DeltaTree (#526)

- [x] Enable unittest in gtest_dbms
- [x] Enable mock test in tests/delta-merge-test
- [x] Enable fullstack-test/ddl
- [x] Enable fullstack-test/inject (Imported in #443)
- [x] Refine exception while read / write to DeltaTree (FLASH-994)
  * Use `Exception::addMessage` to add more diagnostics to locate which table is wrong (commit: 716ae4a)
- [x] shutdown should cancel all background tasks (FLASH-995) (commit: 7470c2f)
- [x] Run schrodinger/sddl test

## Others 
* Add atomic-rename table test in `tests/fullstack-test/fault-inject/rename-table.test`, but it is not enabled yet. We will fix it later.
* The "dt" engine ONLY supports disable_bg_flush = true.
If background flush is enabled, reads will not trigger schema sync, which means we may not get the right result with an out-dated schema.
* Found that the 'zero' value of type year does not match TiKV's (FLASH-1023)

Signed-off-by: JaySon-Huang <[email protected]>

* [FLASH-1027] Fix: proxy override system signal listening (#541)

* remove signal listening from proxy.

* while terminating, stop all learner read.

* [FLASH-1026] Synchronization between drop table and remove region (#538)

* Set default storage engine to DT (#547)

* Do region flush in Region::handleWriteRaftCmd (#542)

* [FLASH-1028] Region merge should not remove data (#544)

* Remove region after region merge should not remove data

Signed-off-by: JaySon-Huang <[email protected]>

* Fix region init index

Signed-off-by: JaySon-Huang <[email protected]>

* Add region_merge.test for DT

Signed-off-by: JaySon-Huang <[email protected]>

* Fix different behavior between DT and TMT

Signed-off-by: JaySon-Huang <[email protected]>

* Fix mock remove region

Signed-off-by: JaySon-Huang <[email protected]>

Co-authored-by: pingcap-github-bot <[email protected]>

* clean useless code

Co-authored-by: JaySon <[email protected]>
Co-authored-by: Flowyi <[email protected]>
Co-authored-by: Tong Zhigao <[email protected]>
Co-authored-by: Han Fei <[email protected]>
Co-authored-by: pingcap-github-bot <[email protected]>

* fix type mismatch bug in broadcast join

* broadcast join support join keys with different data type (#580)

* fix type mismatch bug in broadcast join

* refine code

* refine code

* some improvement for broadcast join (#600)

* some improvement for broadcast join

* format code

* refine code

* address comment

* fix bug

* address comments

* fix bug

* fix bug

* make TiFlash backward compatible to old tipb (#653)

* 1. re-enable exec info in dag response, 2. support old style dag request

* basic support for execute summary

* refine support of executor time for join plan

* format code

* address comments

* fix bug

* refine code

* update header

* Fix execute details regression after merge master (#678)

* refine code

* fix bug

* fix bug

* format code

* update kvproto

* fmt code

* update client-c

* update tipb

Co-authored-by: xufei <[email protected]>
Co-authored-by: xufei <[email protected]>
Co-authored-by: JaySon <[email protected]>
Co-authored-by: Flowyi <[email protected]>
Co-authored-by: Tong Zhigao <[email protected]>
Co-authored-by: pingcap-github-bot <[email protected]>
7 people authored Jun 22, 2020
1 parent 85bf28a commit 6cdffb3
Showing 31 changed files with 2,362 additions and 1,176 deletions.
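The headline change is broadcast join: the small build side is shipped to every TiFlash node, each node builds a hash table from it, and then probes with its local partition of the large side. The following is a minimal illustrative sketch of that idea only — `Row` and `broadcastHashJoin` are hypothetical names, not TiFlash's actual classes.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using Row = std::pair<int, std::string>; // (join key, payload)

std::vector<std::pair<std::string, std::string>> broadcastHashJoin(
    const std::vector<Row> & broadcast_side, const std::vector<Row> & local_partition)
{
    // Build phase: hash the broadcast (small) side by join key.
    std::unordered_multimap<int, std::string> hash_table;
    for (const auto & r : broadcast_side)
        hash_table.emplace(r.first, r.second);

    // Probe phase: stream the local partition of the large side against the table.
    std::vector<std::pair<std::string, std::string>> joined;
    for (const auto & r : local_partition)
    {
        auto range = hash_table.equal_range(r.first);
        for (auto it = range.first; it != range.second; ++it)
            joined.emplace_back(it->second, r.second);
    }
    return joined;
}
```

Because the build side is replicated rather than shuffled, no repartitioning of the large side is needed — which is why the commit also touches remote/coprocessor reading to fetch the build side from other stores.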
2 changes: 1 addition & 1 deletion contrib/tipb
11 changes: 4 additions & 7 deletions dbms/src/DataStreams/CoprocessorBlockInputStream.h
@@ -29,10 +29,9 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
     }
 
 public:
-    CoprocessorBlockInputStream(pingcap::kv::Cluster * cluster_, const pingcap::coprocessor::Request & req_, const DAGSchema & schema_,
-        int concurrency, pingcap::kv::StoreType store_type)
-        : req(req_),
-          resp_iter(pingcap::coprocessor::Client::send(cluster_, &req, concurrency, store_type)),
+    CoprocessorBlockInputStream(
+        pingcap::kv::Cluster * cluster_, std::vector<pingcap::coprocessor::copTask> tasks, const DAGSchema & schema_, int concurrency)
+        : resp_iter(std::move(tasks), cluster_, concurrency, &Logger::get("pingcap/coprocessor")),
           schema(schema_),
           sample_block(getSampleBlock()),
           log(&Logger::get("pingcap/coprocessor"))
@@ -52,8 +51,7 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
             if (!has_next)
                 return {};
         }
-
-        auto chunk = std::move(chunk_queue.front());
+        auto chunk = chunk_queue.front();
         chunk_queue.pop();
         switch (resp->encode_type())
         {
@@ -101,7 +99,6 @@ class CoprocessorBlockInputStream : public IProfilingBlockInputStream
         return true;
     }
 
-    pingcap::coprocessor::Request req;
     pingcap::coprocessor::ResponseIter resp_iter;
     DAGSchema schema;
 
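The constructor now takes a prepared list of coprocessor tasks instead of a single `Request`, so callers can split a key range into per-region tasks up front and hand them to the response iterator. A hedged sketch of that splitting step — `CopTask` and `splitIntoTasks` are illustrative stand-ins, not the client-c API:

```cpp
#include <cassert>
#include <vector>

struct CopTask
{
    int start; // inclusive range start
    int end;   // exclusive range end
};

// Split [start, end) at each region boundary that falls inside it,
// producing one coprocessor task per region slice.
std::vector<CopTask> splitIntoTasks(int start, int end, const std::vector<int> & region_bounds)
{
    std::vector<CopTask> tasks;
    int cur = start;
    for (int bound : region_bounds)
    {
        if (bound <= cur || bound >= end)
            continue; // boundary outside the requested range
        tasks.push_back({cur, bound});
        cur = bound;
    }
    tasks.push_back({cur, end}); // tail of the range
    return tasks;
}
```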
74 changes: 44 additions & 30 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
@@ -1,9 +1,10 @@
-#include <Interpreters/Set.h>
-#include <Interpreters/Join.h>
-#include <DataStreams/materializeBlock.h>
-#include <DataStreams/IBlockOutputStream.h>
 #include <DataStreams/CreatingSetsBlockInputStream.h>
+#include <DataStreams/IBlockOutputStream.h>
+#include <DataStreams/materializeBlock.h>
+#include <Interpreters/Join.h>
+#include <Interpreters/Set.h>
 #include <Storages/IStorage.h>
+
 #include <iomanip>
 
 
@@ -12,25 +13,38 @@ namespace DB
 
 namespace ErrorCodes
 {
-extern const int SET_SIZE_LIMIT_EXCEEDED;
+extern const int SET_SIZE_LIMIT_EXCEEDED;
 }
 
+CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
+    std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
+    const SizeLimits & network_transfer_limits)
+    : subqueries_for_sets_list(std::move(subqueries_for_sets_list_)), network_transfer_limits(network_transfer_limits)
+{
+    init(input);
+}
+
 CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
-    const BlockInputStreamPtr & input,
-    const SubqueriesForSets & subqueries_for_sets_,
-    const SizeLimits & network_transfer_limits)
-    : subqueries_for_sets(subqueries_for_sets_),
-      network_transfer_limits(network_transfer_limits)
+    const BlockInputStreamPtr & input, const SubqueriesForSets & subqueries_for_sets, const SizeLimits & network_transfer_limits)
+    : network_transfer_limits(network_transfer_limits)
 {
-    for (auto & elem : subqueries_for_sets)
+    subqueries_for_sets_list.push_back(subqueries_for_sets);
+    init(input);
+}
+
+void CreatingSetsBlockInputStream::init(const BlockInputStreamPtr & input)
+{
+    for (auto & subqueries_for_sets : subqueries_for_sets_list)
     {
-        if (elem.second.source)
+        for (auto & elem : subqueries_for_sets)
         {
-            children.push_back(elem.second.source);
+            if (elem.second.source)
+            {
+                children.push_back(elem.second.source);
 
-            if (elem.second.set)
-                elem.second.set->setHeader(elem.second.source->getHeader());
+                if (elem.second.set)
+                    elem.second.set->setHeader(elem.second.source->getHeader());
+            }
         }
     }
 }
 
@@ -51,10 +65,7 @@ Block CreatingSetsBlockInputStream::readImpl()
 }
 
 
-void CreatingSetsBlockInputStream::readPrefixImpl()
-{
-    createAll();
-}
+void CreatingSetsBlockInputStream::readPrefixImpl() { createAll(); }
 
 
 Block CreatingSetsBlockInputStream::getTotals()
@@ -72,27 +83,29 @@ void CreatingSetsBlockInputStream::createAll()
 {
     if (!created)
     {
-        for (auto & elem : subqueries_for_sets)
+        for (auto & subqueries_for_sets : subqueries_for_sets_list)
         {
-            if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
+            for (auto & elem : subqueries_for_sets)
             {
-                if (isCancelledOrThrowIfKilled())
-                    return;
+                if (elem.second.source) /// There could be prepared in advance Set/Join - no source is specified for them.
+                {
+                    if (isCancelledOrThrowIfKilled())
+                        return;
 
-                createOne(elem.second);
+                    createOne(elem.second);
+                }
             }
         }
 
         created = true;
     }
 }
 
 
 void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
 {
-    LOG_TRACE(log, (subquery.set ? "Creating set. " : "")
-        << (subquery.join ? "Creating join. " : "")
-        << (subquery.table ? "Filling temporary table. " : ""));
+    LOG_TRACE(log,
+        (subquery.set ? "Creating set. " : "") << (subquery.join ? "Creating join. " : "")
+                                               << (subquery.table ? "Filling temporary table. " : ""));
     Stopwatch watch;
 
     BlockOutputStreamPtr table_out;
@@ -137,7 +150,8 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
         rows_to_transfer += block.rows();
         bytes_to_transfer += block.bytes();
 
-        if (!network_transfer_limits.check(rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
+        if (!network_transfer_limits.check(
+                rows_to_transfer, bytes_to_transfer, "IN/JOIN external table", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
             done_with_table = true;
     }
 
@@ -188,4 +202,4 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
     }
 }
 
-}
+} // namespace DB
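The refactor above replaces the single `SubqueriesForSets` member with a list of them, one group per DAG query block, and `createAll()` now walks group by group so every query block's join/set build sides are materialized in order. A simplified stand-in for that nested loop (the `Subquery` type here is illustrative, not the real `SubqueriesForSets`):

```cpp
#include <cassert>
#include <string>
#include <vector>

struct Subquery
{
    std::string name;
    bool has_source; // prepared-in-advance Set/Join entries have no source
};
using SubqueryGroup = std::vector<Subquery>;

// Mirrors the new createAll(): outer loop over query-block groups (added by
// this PR), inner loop over each group's subqueries, skipping pre-built ones.
std::vector<std::string> createAllGroups(const std::vector<SubqueryGroup> & groups)
{
    std::vector<std::string> created;
    for (const auto & group : groups)
        for (const auto & sub : group)
            if (sub.has_source)
                created.push_back(sub.name);
    return created;
}
```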
21 changes: 14 additions & 7 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.h
@@ -1,11 +1,14 @@
 #pragma once
 
-#include <Poco/Logger.h>
 #include <DataStreams/IProfilingBlockInputStream.h>
-#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
+#include <Interpreters/ExpressionAnalyzer.h> /// SubqueriesForSets
+#include <Poco/Logger.h>
 
 
-namespace Poco { class Logger; }
+namespace Poco
+{
+class Logger;
+}

namespace DB
{
@@ -18,8 +21,10 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
 {
 public:
     CreatingSetsBlockInputStream(
-        const BlockInputStreamPtr & input,
-        const SubqueriesForSets & subqueries_for_sets_,
-        const SizeLimits & network_transfer_limits);
+        const BlockInputStreamPtr & input, const SubqueriesForSets & subqueries_for_sets_, const SizeLimits & network_transfer_limits);
+
+    CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
+        std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
+        const SizeLimits & network_transfer_limits);
 
     String getName() const override { return "CreatingSets"; }
@@ -34,7 +39,9 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
     void readPrefixImpl() override;
 
 private:
-    SubqueriesForSets subqueries_for_sets;
+    void init(const BlockInputStreamPtr & input);
+
+    std::vector<SubqueriesForSets> subqueries_for_sets_list;
     bool created = false;
 
     SizeLimits network_transfer_limits;
@@ -49,4 +56,4 @@ class CreatingSetsBlockInputStream : public IProfilingBlockInputStream
     void createOne(SubqueryForSet & subquery);
 };
 
-}
+} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/IProfilingBlockInputStream.cpp
@@ -103,25 +103,29 @@ Block IProfilingBlockInputStream::read(FilterPtr & res_filter, bool return_filte
 
 void IProfilingBlockInputStream::readPrefix()
 {
+    auto start_time = info.total_stopwatch.elapsed();
     readPrefixImpl();
 
     forEachChild([&] (IBlockInputStream & child)
     {
         child.readPrefix();
         return false;
     });
+    info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
 }
 
 
 void IProfilingBlockInputStream::readSuffix()
 {
+    auto start_time = info.total_stopwatch.elapsed();
     forEachChild([&] (IBlockInputStream & child)
     {
         child.readSuffix();
         return false;
     });
 
     readSuffixImpl();
+    info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
 }
 
 
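This change makes `readPrefix()`/`readSuffix()` charge their own elapsed time to the stream's execution-time counter, so DAG execute summaries also account for setup and teardown cost, not just `read()`. A minimal sketch of the pattern, with a simplified counter standing in for `BlockStreamProfileInfo`:

```cpp
#include <cassert>
#include <chrono>

struct ProfileInfo
{
    long long execution_time_ns = 0;
    void updateExecutionTime(long long delta) { execution_time_ns += delta; }
};

// Measure the wall time a phase takes (e.g. readPrefixImpl() plus the
// children's readPrefix()) and accumulate it into the profile counter.
template <typename F>
void timedCall(ProfileInfo & info, F && body)
{
    auto start = std::chrono::steady_clock::now();
    body();
    auto elapsed = std::chrono::steady_clock::now() - start;
    info.updateExecutionTime(
        std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count());
}
```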
11 changes: 11 additions & 0 deletions dbms/src/DataStreams/NativeBlockInputStream.cpp
@@ -22,6 +22,10 @@ namespace ErrorCodes
     extern const int NOT_IMPLEMENTED;
 }
 
+NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, std::vector<String> && output_names_)
+    : istr(istr_), server_revision(server_revision_), output_names(std::move(output_names_))
+{
+}
 
 NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
     : istr(istr_), server_revision(server_revision_)
@@ -109,6 +113,11 @@ Block NativeBlockInputStream::readImpl()
         rows = index_block_it->num_rows;
     }
 
+    if (output_names.size() > 0 && output_names.size() != columns)
+        throw Exception("NativeBlockInputStream with explicit output names, but the block column size "
+            "is not equal to the size of output names", ErrorCodes::LOGICAL_ERROR);
+    bool explicit_output_name = output_names.size() > 0;
+
     for (size_t i = 0; i < columns; ++i)
     {
         if (use_index)
@@ -121,6 +130,8 @@ Block NativeBlockInputStream::readImpl()
 
         /// Name
         readBinary(column.name, istr);
+        if (explicit_output_name)
+            column.name = output_names[i];
 
         /// Type
         String type_name;
4 changes: 4 additions & 0 deletions dbms/src/DataStreams/NativeBlockInputStream.h
@@ -60,6 +60,8 @@ struct IndexForNativeFormat
 class NativeBlockInputStream : public IProfilingBlockInputStream
 {
 public:
+    /// provide output column names explicitly
+    NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_, std::vector<String> && output_names_);
     /// If a non-zero server_revision is specified, additional block information may be expected and read.
    NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);
 
@@ -96,6 +98,8 @@ class NativeBlockInputStream : public IProfilingBlockInputStream
 
     PODArray<double> avg_value_size_hints;
 
+    std::vector<String> output_names;
+
     void updateAvgValueSizeHints(const Block & block);
 };
 
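The new `output_names` member lets a caller override the column names read from the wire, positionally, with a size mismatch treated as a logical error — this is what fixes the name/type-mismatch issues in broadcast join's remote reads. A hedged sketch of that behavior on plain strings (simplified types stand in for `Block`):

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Apply explicit output names over the names deserialized from the stream.
// An empty output_names vector means "keep the wire names" (the old behavior).
std::vector<std::string> applyOutputNames(
    std::vector<std::string> wire_names, const std::vector<std::string> & output_names)
{
    if (!output_names.empty() && output_names.size() != wire_names.size())
        throw std::logic_error("block column count does not match output names");
    for (size_t i = 0; i < output_names.size(); ++i)
        wire_names[i] = output_names[i]; // positional override
    return wire_names;
}
```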
7 changes: 5 additions & 2 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
@@ -69,11 +69,14 @@ std::unique_ptr<ChunkCodecStream> CHBlockChunkCodec::newCodecStream(const std::v
     return std::make_unique<CHBlockChunkCodecStream>(field_types);
 }
 
-Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema &)
+Block CHBlockChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schema)
 {
     const String & row_data = chunk.rows_data();
     ReadBufferFromString read_buffer(row_data);
-    NativeBlockInputStream block_in(read_buffer, 0);
+    std::vector<String> output_names;
+    for (const auto & c : schema)
+        output_names.push_back(c.first);
+    NativeBlockInputStream block_in(read_buffer, 0, std::move(output_names));
     return block_in.read();
 }
 
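`decode()` now consults the DAG schema: the ordered name/type pairs supply the output column names for the decoded block, so downstream executors see the names they expect regardless of what was serialized by the remote side. A small sketch of that extraction step, with `ColumnInfo` as an illustrative stand-in for the real tipb column type info:

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

struct ColumnInfo { std::string type_name; };
using DAGSchema = std::vector<std::pair<std::string, ColumnInfo>>;

// Same extraction as in decode(): the schema's first components become the
// explicit output names handed to the block input stream.
std::vector<std::string> schemaOutputNames(const DAGSchema & schema)
{
    std::vector<std::string> names;
    names.reserve(schema.size());
    for (const auto & c : schema)
        names.push_back(c.first);
    return names;
}
```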
