Skip to content

Commit

Permalink
add HashJoinProbeBlockInputStream (#4246)
Browse files Browse the repository at this point in the history
close #4248
  • Loading branch information
SeaRise authored Mar 16, 2022
1 parent b18b1d0 commit 0f23a02
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 2 deletions.
69 changes: 69 additions & 0 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <DataStreams/HashJoinProbeBlockInputStream.h>
#include <Flash/Mpp/getMPPTaskLog.h>
#include <Interpreters/ExpressionActions.h>

namespace DB
{
HashJoinProbeBlockInputStream::HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & join_probe_actions_,
const LogWithPrefixPtr & log_)
: log(getMPPTaskLog(log_, name))
, join_probe_actions(join_probe_actions_)
{
children.push_back(input);

if (!join_probe_actions || join_probe_actions->getActions().size() != 1
|| join_probe_actions->getActions().back().type != ExpressionAction::Type::JOIN)
{
throw Exception("isn't valid join probe actions", ErrorCodes::LOGICAL_ERROR);
}
}

Block HashJoinProbeBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{
totals = child->getTotals();
join_probe_actions->executeOnTotals(totals);
}

return totals;
}

Block HashJoinProbeBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
join_probe_actions->execute(res);
return res;
}

Block HashJoinProbeBlockInputStream::readImpl()
{
Block res = children.back()->read();
if (!res)
return res;

join_probe_actions->execute(res);

// TODO split block if block.size() > settings.max_block_size
// https://github.com/pingcap/tiflash/issues/3436

return res;
}

} // namespace DB
55 changes: 55 additions & 0 deletions dbms/src/DataStreams/HashJoinProbeBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/IProfilingBlockInputStream.h>

namespace DB
{
class ExpressionActions;

/** Executes a certain expression over the block.
* Basically the same as ExpressionBlockInputStream,
* but requires that there must be a join probe action in the Expression.
*
* The join probe action is different from the general expression
* and needs to be executed after join hash map building.
* We should separate it from the ExpressionBlockInputStream.
*/
class HashJoinProbeBlockInputStream : public IProfilingBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
static constexpr auto name = "HashJoinProbe";

public:
HashJoinProbeBlockInputStream(
const BlockInputStreamPtr & input,
const ExpressionActionsPtr & join_probe_actions_,
const LogWithPrefixPtr & log_);

String getName() const override { return name; }
Block getTotals() override;
Block getHeader() const override;

protected:
Block readImpl() override;

private:
const LogWithPrefixPtr log;
ExpressionActionsPtr join_probe_actions;
};

} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
#include <DataStreams/HashJoinBuildBlockInputStream.h>
#include <DataStreams/HashJoinProbeBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DataStreams/PartialSortingBlockInputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/getLeastSupertype.h>
Expand Down Expand Up @@ -724,7 +724,7 @@ void DAGQueryBlockInterpreter::handleJoin(const tipb::Join & join, DAGPipeline &
}
}
for (auto & stream : pipeline.streams)
stream = std::make_shared<ExpressionBlockInputStream>(stream, chain.getLastActions(), taskLogger());
stream = std::make_shared<HashJoinProbeBlockInputStream>(stream, chain.getLastActions(), taskLogger());

/// add a project to remove all the useless column
NamesWithAliases project_cols;
Expand Down

0 comments on commit 0f23a02

Please sign in to comment.