Commit
Merge branch 'master' into improve-ps-write
lidezhu authored Mar 24, 2023
2 parents a598c3d + 828da8a commit 0709a1a
Showing 37 changed files with 1,036 additions and 514 deletions.
6 changes: 5 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
@@ -322,7 +322,11 @@ namespace DB
     F(type_complete_multi_part_upload, {{"type", "complete_multi_part_upload"}}, ExpBuckets{0.001, 2, 20}), \
     F(type_list_objects, {{"type", "list_objects"}}, ExpBuckets{0.001, 2, 20}), \
     F(type_delete_object, {{"type", "delete_object"}}, ExpBuckets{0.001, 2, 20}), \
-    F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}))
+    F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20})) \
+    M(tiflash_storage_checkpoint_seconds, "PageStorage checkpoint elapsed time", Histogram, \
+      F(type_dump_checkpoint_snapshot, {{"type", "dump_checkpoint_snapshot"}}, ExpBuckets{0.001, 2, 20}), \
+      F(type_dump_checkpoint_data, {{"type", "dump_checkpoint_data"}}, ExpBuckets{0.001, 2, 20}), \
+      F(type_upload_checkpoint, {{"type", "upload_checkpoint"}}, ExpBuckets{0.001, 2, 20}))

 // clang-format on

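The new histogram reuses the ExpBuckets{0.001, 2, 20} layout of the S3 request metrics above it: 20 exponential buckets starting at 1 ms and doubling each step, so the top bucket boundary is about 524 s. A minimal sketch of timing one checkpoint phase against it, assuming TiFlash's existing Stopwatch helper and GET_METRIC macro (dumpSnapshot() is a hypothetical stand-in for the measured work):

    Stopwatch watch;
    dumpSnapshot(); // hypothetical: the checkpoint phase being timed
    GET_METRIC(tiflash_storage_checkpoint_seconds, type_dump_checkpoint_snapshot)
        .Observe(watch.elapsedSeconds());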
1 change: 1 addition & 0 deletions dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -67,6 +67,7 @@ Block AggregatingBlockInputStream::readImpl()
             aggregator.spill(*data_variants);
         }
         aggregator.finishSpill();
+        LOG_INFO(log, "Begin restore data from disk for aggregation.");
         BlockInputStreams input_streams = aggregator.restoreSpilledData();
         impl = std::make_unique<MergingAggregatedMemoryEfficientBlockInputStream>(input_streams, params, final, 1, 1, log->identifier());
     }
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
@@ -150,7 +150,7 @@ Block MergeSortingBlockInputStream::readImpl()
     {
         /// If spill happens

-        LOG_INFO(log, "Begin external merge sort.");
+        LOG_INFO(log, "Begin restore data from disk for merge sort.");

         /// Create sorted streams to merge.
         spiller->finishSpill();
25 changes: 6 additions & 19 deletions dbms/src/DataStreams/NonJoinedBlockInputStream.cpp
@@ -132,22 +132,9 @@ Block NonJoinedBlockInputStream::readImpl()
     /// just return empty block for extra non joined block input stream read
     if (unlikely(index >= parent.getBuildConcurrency()))
         return Block();
-    if (!parent.isEnableSpill())
-    {
-        if (parent.blocks.empty())
-            return Block();
-    }
-    else
-    {
-        if (std::all_of(
-                std::begin(parent.partitions),
-                std::end(parent.partitions),
-                [](const std::unique_ptr<Join::JoinPartition> & partition) { return partition->build_partition.blocks.empty(); }))
-        {
-            return Block();
-        }
-    }
-
+    if (!parent.has_build_data_in_memory)
+        /// no build data in memory, the non joined result must be empty
+        return Block();

     /// todo read data based on JoinPartition
     if (add_not_mapped_rows)
@@ -258,7 +245,7 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,
 {
     current_segment = index;

-    while (parent.partitions[current_segment]->spill)
+    while (parent.partitions[current_segment]->isSpill())
     {
         current_segment += step;
         if (current_segment >= map.getSegmentSize())
@@ -282,10 +269,10 @@ size_t NonJoinedBlockInputStream::fillColumns(const Map & map,

     for (; *it != end || current_segment + step < map.getSegmentSize();)
     {
-        if (*it == end || parent.partitions[current_segment]->spill)
+        if (*it == end || parent.partitions[current_segment]->isSpill())
         {
             current_segment += step;
-            while (parent.partitions[current_segment]->spill)
+            while (parent.partitions[current_segment]->isSpill())
             {
                 current_segment += step;
                 if (current_segment >= map.getSegmentSize())
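The readImpl() change above replaces a per-partition std::all_of scan with a flag that Join maintains during the build phase, and the segment loops switch from reading a raw spill member to an accessor. A minimal sketch of the shape this assumes (illustrative declarations, not the actual TiFlash ones):

    #include <memory>
    #include <vector>

    struct JoinPartition
    {
        bool isSpill() const { return spill; }

    private:
        bool spill = false; // a spilled partition keeps its build blocks on disk, not in memory
    };

    struct Join
    {
        bool has_build_data_in_memory = false; // set once while building, so readers need no scan
        std::vector<std::unique_ptr<JoinPartition>> partitions;
    };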
17 changes: 16 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -23,8 +23,9 @@
 #include <DataStreams/UnionBlockInputStream.h>
 #include <Flash/Coprocessor/DAGContext.h>
 #include <Flash/Coprocessor/InterpreterUtils.h>
+#include <Flash/Pipeline/Exec/PipelineExecBuilder.h>
 #include <Interpreters/Context.h>
-
+#include <Operators/ExpressionTransformOp.h>

 namespace DB
 {
@@ -96,6 +97,20 @@ void executeExpression(
     }
 }

+void executeExpression(
+    PipelineExecutorStatus & exec_status,
+    PipelineExecGroupBuilder & group_builder,
+    const ExpressionActionsPtr & expr_actions,
+    const LoggerPtr & log)
+{
+    if (expr_actions && !expr_actions->getActions().empty())
+    {
+        group_builder.transform([&](auto & builder) {
+            builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), expr_actions));
+        });
+    }
+}
+
 void orderStreams(
     DAGPipeline & pipeline,
     size_t max_streams,
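The new overload folds the "skip empty expression actions" check into one place, so pipeline-model call sites shrink to a single line each, as the PhysicalAggregation* changes below do:

    // before-agg projection; the transform op is appended only when expr_actions is non-trivial
    executeExpression(exec_status, group_builder, before_agg_actions, log);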
9 changes: 9 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterUtils.h
@@ -25,6 +25,9 @@ namespace DB
 {
 class Context;

+class PipelineExecutorStatus;
+struct PipelineExecGroupBuilder;
+
 void restoreConcurrency(
     DAGPipeline & pipeline,
     size_t concurrency,
@@ -53,6 +56,12 @@ void executeExpression(
     const LoggerPtr & log,
     const String & extra_info = "");

+void executeExpression(
+    PipelineExecutorStatus & exec_status,
+    PipelineExecGroupBuilder & group_builder,
+    const ExpressionActionsPtr & expr_actions,
+    const LoggerPtr & log);
+
 void orderStreams(
     DAGPipeline & pipeline,
     size_t max_streams,
13 changes: 4 additions & 9 deletions dbms/src/Flash/Pipeline/Pipeline.cpp
@@ -237,23 +237,18 @@ bool Pipeline::isSupported(const tipb::DAGRequest & dag_request)
         switch (executor.tp())
         {
         case tipb::ExecType::TypeTableScan:
-            if (executor.tbl_scan().keep_order())
-            {
-                is_supported = false;
-                return false;
-            }
+            // TODO support keep order.
+            is_supported = !executor.tbl_scan().keep_order();
+            return is_supported;
         case tipb::ExecType::TypeProjection:
         case tipb::ExecType::TypeSelection:
         case tipb::ExecType::TypeLimit:
         case tipb::ExecType::TypeTopN:
         case tipb::ExecType::TypeExchangeSender:
         case tipb::ExecType::TypeExchangeReceiver:
         case tipb::ExecType::TypeExpand:
-            return true;
         case tipb::ExecType::TypeAggregation:
-            // TODO support fine grained shuffle.
-            if (!FineGrainedShuffle(&executor).enable())
-                return true;
+            return true;
         default:
             is_supported = false;
             return false;
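The switch runs inside a visitor over the request's executors (the surrounding traversal sits outside this hunk); returning false both records the verdict in is_supported and ends the walk early. With the aggregation carve-out removed, fine grained shuffle aggregation is now left to the pipeline model. A sketch of the assumed call site, not taken from this diff:

    // Hypothetical fallback logic: use the pipeline model only when the whole DAG qualifies.
    if (Pipeline::isSupported(dag_request))
        runWithPipelineModel(dag_request); // hypothetical entry point
    else
        runWithBlockInputStreams(dag_request); // hypothetical entry point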
99 changes: 73 additions & 26 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
@@ -28,6 +28,7 @@
 #include <Flash/Planner/Plans/PhysicalAggregationBuild.h>
 #include <Flash/Planner/Plans/PhysicalAggregationConvergent.h>
 #include <Interpreters/Context.h>
+#include <Operators/LocalAggregateTransform.h>

 namespace DB
 {
@@ -161,36 +162,82 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
     executeExpression(pipeline, expr_after_agg, log, "expr after aggregation");
 }

-void PhysicalAggregation::buildPipeline(PipelineBuilder & builder)
+void PhysicalAggregation::buildPipelineExecGroup(
+    PipelineExecutorStatus & exec_status,
+    PipelineExecGroupBuilder & group_builder,
+    Context & context,
+    size_t /*concurrency*/)
 {
-    auto aggregate_context = std::make_shared<AggregateContext>(
-        log->identifier());
-    // TODO support fine grained shuffle.
-    assert(!fine_grained_shuffle.enable());
-    auto agg_build = std::make_shared<PhysicalAggregationBuild>(
-        executor_id,
-        schema,
-        log->identifier(),
-        child,
-        before_agg_actions,
+    // For non fine grained shuffle, PhysicalAggregation will be broken into AggregateBuild and AggregateConvergent.
+    // So only fine grained shuffle is considered here.
+    assert(fine_grained_shuffle.enable());
+
+    executeExpression(exec_status, group_builder, before_agg_actions, log);
+
+    Block before_agg_header = group_builder.getCurrentHeader();
+    size_t concurrency = group_builder.concurrency;
+    AggregationInterpreterHelper::fillArgColumnNumbers(aggregate_descriptions, before_agg_header);
+    SpillConfig spill_config(
+        context.getTemporaryPath(),
+        fmt::format("{}_aggregation", log->identifier()),
+        context.getSettingsRef().max_cached_data_bytes_in_spiller,
+        context.getSettingsRef().max_spilled_rows_per_file,
+        context.getSettingsRef().max_spilled_bytes_per_file,
+        context.getFileProvider());
+    auto params = AggregationInterpreterHelper::buildParams(
+        context,
+        before_agg_header,
+        concurrency,
+        concurrency,
         aggregation_keys,
         aggregation_collators,
-        is_final_agg,
         aggregate_descriptions,
-        aggregate_context);
-    // Break the pipeline for agg_build.
-    auto agg_build_builder = builder.breakPipeline(agg_build);
-    // agg_build pipeline.
-    child->buildPipeline(agg_build_builder);
-    agg_build_builder.build();
-    // agg_convergent pipeline.
-    auto agg_convergent = std::make_shared<PhysicalAggregationConvergent>(
-        executor_id,
-        schema,
-        log->identifier(),
-        aggregate_context,
-        expr_after_agg);
-    builder.addPlanNode(agg_convergent);
+        is_final_agg,
+        spill_config);
+    group_builder.transform([&](auto & builder) {
+        builder.appendTransformOp(std::make_unique<LocalAggregateTransform>(exec_status, log->identifier(), params));
+    });
+
+    executeExpression(exec_status, group_builder, expr_after_agg, log);
+}
+
+void PhysicalAggregation::buildPipeline(PipelineBuilder & builder)
+{
+    auto aggregate_context = std::make_shared<AggregateContext>(log->identifier());
+    if (fine_grained_shuffle.enable())
+    {
+        // For fine grained shuffle, Aggregate wouldn't be broken.
+        child->buildPipeline(builder);
+        builder.addPlanNode(shared_from_this());
+    }
+    else
+    {
+        // For non fine grained shuffle, Aggregate would be broken into AggregateBuild and AggregateConvergent.
+        auto agg_build = std::make_shared<PhysicalAggregationBuild>(
+            executor_id,
+            schema,
+            log->identifier(),
+            child,
+            before_agg_actions,
+            aggregation_keys,
+            aggregation_collators,
+            is_final_agg,
+            aggregate_descriptions,
+            aggregate_context);
+        // Break the pipeline for agg_build.
+        auto agg_build_builder = builder.breakPipeline(agg_build);
+        // agg_build pipeline.
+        child->buildPipeline(agg_build_builder);
+        agg_build_builder.build();
+        // agg_convergent pipeline.
+        auto agg_convergent = std::make_shared<PhysicalAggregationConvergent>(
+            executor_id,
+            schema,
+            log->identifier(),
+            aggregate_context,
+            expr_after_agg);
+        builder.addPlanNode(agg_convergent);
+    }
 }

 void PhysicalAggregation::finalize(const Names & parent_require)
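Roughly, the two plan shapes this file now produces (a sketch using the operator names from this commit, not TiFlash output):

    fine grained shuffle (one unbroken pipeline; each stream aggregates its own hash partition):
        child -> [before-agg expr] -> LocalAggregateTransform -> [after-agg expr] -> ...

    non fine grained shuffle (broken at the aggregate; all streams share one AggregateContext):
        pipeline 1: child -> [before-agg expr] -> AggregateBuildSinkOp
        pipeline 2: AggregateConvergentSourceOp -> [after-agg expr] -> ...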
6 changes: 6 additions & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.h
@@ -55,6 +55,12 @@ class PhysicalAggregation : public PhysicalUnary

     void buildPipeline(PipelineBuilder & builder) override;

+    void buildPipelineExecGroup(
+        PipelineExecutorStatus & exec_status,
+        PipelineExecGroupBuilder & group_builder,
+        Context & context,
+        size_t /*concurrency*/) override;
+
     void finalize(const Names & parent_require) override;

     const Block & getSampleBlock() const override;
28 changes: 13 additions & 15 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationBuild.cpp
@@ -13,10 +13,11 @@
 // limitations under the License.

 #include <Flash/Coprocessor/AggregationInterpreterHelper.h>
+#include <Flash/Coprocessor/InterpreterUtils.h>
+#include <Flash/Executor/PipelineExecutorStatus.h>
 #include <Flash/Planner/Plans/PhysicalAggregationBuild.h>
 #include <Interpreters/Context.h>
-#include <Operators/AggregateSinkOp.h>
-#include <Operators/ExpressionTransformOp.h>
+#include <Operators/AggregateBuildSinkOp.h>

 namespace DB
 {
@@ -26,17 +27,11 @@ void PhysicalAggregationBuild::buildPipelineExecGroup(
     Context & context,
     size_t /*concurrency*/)
 {
-    if (!before_agg_actions->getActions().empty())
-    {
-        group_builder.transform([&](auto & builder) {
-            builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), before_agg_actions));
-        });
-    }
+    // For fine grained shuffle, PhysicalAggregation will not be broken into AggregateBuild and AggregateConvergent.
+    // So only non fine grained shuffle is considered here.
+    assert(!fine_grained_shuffle.enable());

-    size_t build_index = 0;
-    group_builder.transform([&](auto & builder) {
-        builder.setSinkOp(std::make_unique<AggregateSinkOp>(exec_status, build_index++, aggregate_context, log->identifier()));
-    });
+    executeExpression(exec_status, group_builder, before_agg_actions, log);

     Block before_agg_header = group_builder.getCurrentHeader();
     size_t concurrency = group_builder.concurrency;
@@ -48,18 +43,21 @@ void PhysicalAggregationBuild::buildPipelineExecGroup(
         context.getSettingsRef().max_spilled_rows_per_file,
         context.getSettingsRef().max_spilled_bytes_per_file,
         context.getFileProvider());
-
     auto params = AggregationInterpreterHelper::buildParams(
         context,
         before_agg_header,
         concurrency,
-        concurrency,
+        1,
         aggregation_keys,
         aggregation_collators,
         aggregate_descriptions,
         is_final_agg,
         spill_config);
+    aggregate_context->initBuild(params, concurrency, /*hook=*/[&]() { return exec_status.isCancelled(); });

-    aggregate_context->initBuild(params, concurrency);
+    size_t build_index = 0;
+    group_builder.transform([&](auto & builder) {
+        builder.setSinkOp(std::make_unique<AggregateBuildSinkOp>(exec_status, build_index++, aggregate_context, log->identifier()));
+    });
 }
 } // namespace DB
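Two details worth noting above: buildParams now receives 1 as its second stream-count argument, and initBuild gains a cancellation hook so a cancelled query stops the build promptly instead of finishing the hash table. A minimal sketch of the hook pattern under assumed names (not the actual TiFlash API):

    #include <functional>

    using CancellationHook = std::function<bool()>;

    bool hasMoreBlocks();   // hypothetical input source
    void consumeOneBlock(); // hypothetical unit of build work

    void buildLoop(const CancellationHook & is_cancelled)
    {
        // Poll the hook between blocks, mirroring /*hook=*/[&]() { return exec_status.isCancelled(); }.
        while (hasMoreBlocks())
        {
            if (is_cancelled())
                return; // abandon the build once the query is cancelled
            consumeOneBlock();
        }
    }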
13 changes: 6 additions & 7 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.cpp
@@ -11,9 +11,9 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include <Flash/Coprocessor/InterpreterUtils.h>
 #include <Flash/Planner/Plans/PhysicalAggregationConvergent.h>
 #include <Operators/AggregateConvergentSourceOp.h>
-#include <Operators/ExpressionTransformOp.h>
 #include <Operators/NullSourceOp.h>

 namespace DB
@@ -25,6 +25,10 @@ void PhysicalAggregationConvergent::buildPipelineExecGroup(
     Context & /*context*/,
     size_t /*concurrency*/)
 {
+    // For fine grained shuffle, PhysicalAggregation will not be broken into AggregateBuild and AggregateConvergent.
+    // So only non fine grained shuffle is considered here.
+    assert(!fine_grained_shuffle.enable());
+
     aggregate_context->initConvergent();

     if (unlikely(aggregate_context->useNullSource()))
@@ -50,11 +54,6 @@ void PhysicalAggregationConvergent::buildPipelineExecGroup(
         });
     }

-    if (!expr_after_agg->getActions().empty())
-    {
-        group_builder.transform([&](auto & builder) {
-            builder.appendTransformOp(std::make_unique<ExpressionTransformOp>(exec_status, log->identifier(), expr_after_agg));
-        });
-    }
+    executeExpression(exec_status, group_builder, expr_after_agg, log);
 }
 } // namespace DB
dbms/src/Flash/Planner/Plans/PhysicalAggregationConvergent.h
@@ -40,7 +40,7 @@ class PhysicalAggregationConvergent : public PhysicalLeaf
         PipelineExecutorStatus & exec_status,
         PipelineExecGroupBuilder & group_builder,
         Context & /*context*/,
-        size_t concurrency) override;
+        size_t /*concurrency*/) override;

 private:
     DISABLE_USELESS_FUNCTION_FOR_BREAKER