Skip to content

Commit

Permalink
Merge branch 'master' into ugi_opt
Browse files Browse the repository at this point in the history
  • Loading branch information
wsjz authored Jul 15, 2024
2 parents 93b506f + 420c849 commit 49496fb
Show file tree
Hide file tree
Showing 15 changed files with 1,259 additions and 125 deletions.
35 changes: 28 additions & 7 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,9 +642,13 @@ Status Compaction::do_inverted_index_compaction() {
// format: rowsetId_segmentId
std::vector<std::unique_ptr<InvertedIndexFileWriter>> inverted_index_file_writers(
dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {

// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(i))};
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))};
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
ctx.fs(), index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
Expand All @@ -654,16 +658,31 @@ Status Compaction::do_inverted_index_compaction() {
if (st.ok()) {
auto index_not_need_to_compact =
DORIS_TRY(inverted_index_file_reader->get_all_directories());
// V1: each index is a separate file
// V2: all indexes are in a single file
if (_cur_tablet_schema->get_inverted_index_storage_format() !=
doris::InvertedIndexStorageFormatPB::V1) {
int64_t fsize = 0;
st = ctx.fs()->file_size(
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix), &fsize);
if (!st.ok()) {
LOG(ERROR) << "file size error in index compaction, error:" << st.msg();
return st;
}
compacted_idx_file_size[seg_id] = fsize;
}
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), i,
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
RETURN_IF_ERROR(inverted_index_file_writer->initialize(index_not_need_to_compact));
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
} else if (st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), i,
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
// no index file
compacted_idx_file_size[seg_id] = 0;
} else {
LOG(ERROR) << "inverted_index_file_reader init failed in index compaction, error:"
<< st;
Expand Down Expand Up @@ -744,11 +763,13 @@ Status Compaction::do_inverted_index_compaction() {
}

uint64_t inverted_index_file_size = 0;
for (auto& inverted_index_file_writer : inverted_index_file_writers) {
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get();
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
inverted_index_file_size += inverted_index_file_writer->get_index_file_size();
inverted_index_file_size -= compacted_idx_file_size[seg_id];
}
}
// check index compaction status. If status is not ok, we should return error and end this compaction round.
Expand Down
115 changes: 71 additions & 44 deletions be/src/pipeline/exec/multi_cast_data_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,63 +23,97 @@

namespace doris::pipeline {

MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size)
: _used_count(used_count), _mem_size(mem_size) {
MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_finish_copy,
size_t mem_size)
: _used_count(used_count), _un_finish_copy(un_finish_copy), _mem_size(mem_size) {
_block = vectorized::Block::create_unique(block->get_columns_with_type_and_name());
block->clear();
}

Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
if (pos_to_pull != _multi_cast_blocks.end()) {
if (pos_to_pull->_used_count == 1) {
DCHECK(pos_to_pull == _multi_cast_blocks.begin());
pos_to_pull->_block->swap(*block);

_cumulative_mem_size -= pos_to_pull->_mem_size;
pos_to_pull++;
_multi_cast_blocks.pop_front();
} else {
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
pos_to_pull->_used_count--;
pos_to_pull++;
int* un_finish_copy = nullptr;
int use_count = 0;
{
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
const auto end = _multi_cast_blocks.end();
DCHECK(pos_to_pull != end);

*block = *pos_to_pull->_block;

_cumulative_mem_size -= pos_to_pull->_mem_size;

pos_to_pull->_used_count--;
use_count = pos_to_pull->_used_count;
un_finish_copy = &pos_to_pull->_un_finish_copy;

pos_to_pull++;

if (pos_to_pull == end) {
_block_reading(sender_idx);
}

*eos = _eos and pos_to_pull == end;
}
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
if (pos_to_pull == _multi_cast_blocks.end()) {
_block_reading(sender_idx);

if (use_count == 0) {
// will clear _multi_cast_blocks
_wait_copy_block(block, *un_finish_copy);
} else {
_copy_block(block, *un_finish_copy);
}

return Status::OK();
}

// Replaces every column of `block` with the result of calling
// clone_resized(rows) on that column, then records under _mutex that one
// pending copy has finished.
//
// @param block           block whose columns are re-created in place.
// @param un_finish_copy  counter of copies still outstanding; decremented
//                        exactly once by this call. When it reaches zero a
//                        single waiter on _cv is notified.
void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& un_finish_copy) {
    // The column cloning is done before _mutex is acquired.
    const auto row_count = block->rows();
    for (int col = 0; col < block->columns(); ++col) {
        auto& entry = block->get_by_position(col);
        entry.column = entry.column->clone_resized(row_count);
    }

    bool all_copies_done = false;
    {
        std::lock_guard guard(_mutex);
        all_copies_done = (--un_finish_copy == 0);
    }
    // The lock is already released here, so the notified thread does not
    // wake up only to block on _mutex again.
    if (all_copies_done) {
        _cv.notify_one();
    }
}

// Waits on _cv until `un_finish_copy` is zero, then removes the front element
// of _multi_cast_blocks while still holding _mutex.
//
// @param block           not used by this function.
// @param un_finish_copy  counter of copies still outstanding; only read here.
void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_finish_copy) {
    std::unique_lock guard(_mutex);
    // Re-check the counter after every wake-up so spurious wake-ups are harmless.
    while (un_finish_copy != 0) {
        _cv.wait(guard);
    }
    _multi_cast_blocks.pop_front();
}

Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);

auto block_mem_size = block->allocated_bytes();
std::lock_guard l(_mutex);
int need_process_count = _cast_sender_count - _closed_sender_count;
if (need_process_count == 0) {
return Status::EndOfFile("All data streamer is EOF");
}
// TODO: if the [queue back block rows + block->rows()] < batch_size, better
// do merge block. but need check the need_process_count and used_count whether
// equal
_multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
const auto block_mem_size = block->allocated_bytes();
_cumulative_mem_size += block_mem_size;
COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));

auto end = _multi_cast_blocks.end();
end--;
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
_set_ready_for_read(i);
{
std::lock_guard l(_mutex);
_multi_cast_blocks.emplace_back(block, _cast_sender_count, _cast_sender_count - 1,
block_mem_size);
// last elem
auto end = std::prev(_multi_cast_blocks.end());
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
_set_ready_for_read(i);
}
}
_eos = eos;
}

if (_eos) {
for (auto* read_dep : _dependencies) {
read_dep->set_always_ready();
}
}
_eos = eos;
return Status::OK();
}

Expand All @@ -92,13 +126,6 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
dep->set_ready();
}

void MultiCastDataStreamer::_set_ready_for_read() {
for (auto* dep : _dependencies) {
DCHECK(dep);
dep->set_ready();
}
}

void MultiCastDataStreamer::_block_reading(int sender_idx) {
if (_dependencies.empty()) {
return;
Expand Down
16 changes: 7 additions & 9 deletions be/src/pipeline/exec/multi_cast_data_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ namespace doris::pipeline {

class Dependency;
struct MultiCastBlock {
MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
MultiCastBlock(vectorized::Block* block, int used_count, int need_copy, size_t mem_size);

std::unique_ptr<vectorized::Block> _block;
int _used_count;
int _un_finish_copy;
size_t _mem_size;
};

Expand Down Expand Up @@ -58,30 +59,27 @@ class MultiCastDataStreamer {

RuntimeProfile* profile() { return _profile; }

void set_eos() {
std::lock_guard l(_mutex);
_eos = true;
_set_ready_for_read();
}

// Stores `dep` in the _dependencies slot for `sender_idx`, then calls
// _block_reading(sender_idx) for that same sender.
void set_dep_by_sender_idx(int sender_idx, Dependency* dep) {
_dependencies[sender_idx] = dep;
_block_reading(sender_idx);
}

private:
void _set_ready_for_read(int sender_idx);
void _set_ready_for_read();
void _block_reading(int sender_idx);

void _copy_block(vectorized::Block* block, int& un_finish_copy);

void _wait_copy_block(vectorized::Block* block, int& un_finish_copy);

const RowDescriptor& _row_desc;
RuntimeProfile* _profile = nullptr;
std::list<MultiCastBlock> _multi_cast_blocks;
std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
std::condition_variable _cv;
std::mutex _mutex;
bool _eos = false;
int _cast_sender_count = 0;
int _closed_sender_count = 0;
int64_t _cumulative_mem_size = 0;

RuntimeProfile::Counter* _process_rows = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions docker/thirdparties/custom_settings.env
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
# eg: CONTAINER_UID="doris-jack-"
# NOTICE: change this uid will modify the file in docker-compose.
CONTAINER_UID="doris--"
s3Endpoint="cos.ap-hongkong.myqcloud.com"
s3BucketName="doris-build-hk-1308700295"
s3Endpoint="oss-cn-hongkong.aliyuncs.com"
s3BucketName="qa-build-hk"
Original file line number Diff line number Diff line change
Expand Up @@ -2447,7 +2447,6 @@ public int hashCode() {
int result = super.hashCode();
result = 31 * result + Objects.hashCode(opcode);
result = 31 * result + Objects.hashCode(fnName);
result = 31 * result + Objects.hashCode(fnParams);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static void modifyTblReplicaCount(Database database, String tblName) {
return;
}
while (true) {
int backendNum = Env.getCurrentSystemInfo().getBackendNumFromDiffHosts(true);
int backendNum = Env.getCurrentSystemInfo().getStorageBackendNumFromDiffHosts(true);
if (FeConstants.runningUnitTest) {
backendNum = Env.getCurrentSystemInfo().getAllBackendIds().size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ public void replayInitCatalog(InitCatalogLog log) {
// Should not return null.
// Because replayInitCatalog can only be called when `use_meta_cache` is false.
// And if `use_meta_cache` is false, getDbForReplay() will not return null
if (!db.isPresent()) {
LOG.warn("met invalid db id {} in replayInitCatalog, catalog: {}, ignore it to skip bug.",
log.getRefreshDbIds().get(i), name);
continue;
}
Preconditions.checkNotNull(db.get());
tmpDbNameToId.put(db.get().getFullName(), db.get().getId());
tmpIdToDb.put(db.get().getId(), db.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.doris.nereids.trees.expressions.IsNull;
import org.apache.doris.nereids.trees.expressions.LessThan;
import org.apache.doris.nereids.trees.expressions.LessThanEqual;
import org.apache.doris.nereids.trees.expressions.Like;
import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
import org.apache.doris.nereids.trees.expressions.Or;
Expand Down Expand Up @@ -403,11 +402,6 @@ public Expression visitOr(Or or, ExpressionRewriteContext context) {
}
}

@Override
public Expression visitLike(Like like, ExpressionRewriteContext context) {
return like;
}

@Override
public Expression visitCast(Cast cast, ExpressionRewriteContext context) {
cast = rewriteChildren(cast, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,9 @@ public static boolean couldColocateJoin(DistributionSpecHash leftHashSpec, Distr

SlotReference leftSlot = (SlotReference) leftChild;
SlotReference rightSlot = (SlotReference) rightChild;
Integer leftIndex = null;
Integer rightIndex = null;
if (leftSlot.getTable().isPresent() && leftSlot.getTable().get().getId() == leftHashSpec.getTableId()) {
leftIndex = leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
rightIndex = rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
} else {
Integer leftIndex = leftHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
Integer rightIndex = rightHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
if (leftIndex == null) {
leftIndex = rightHashSpec.getExprIdToEquivalenceSet().get(leftSlot.getExprId());
rightIndex = leftHashSpec.getExprIdToEquivalenceSet().get(rightSlot.getExprId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,20 @@ public void forward(long catalogId, long dbId) throws Exception {
boolean isReturnToPool = false;
try {
TInitExternalCtlMetaResult result = client.initExternalCtlMeta(request);
Env.getCurrentEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs);
if (!result.getStatus().equalsIgnoreCase(STATUS_OK)) {
throw new UserException(result.getStatus());
} else {
// DO NOT wait on journal replayed, this may cause deadlock.
// 1. hold table read lock
// 2. wait on journal replayed
// 3. previous journal (eg, txn journal) replayed need to hold table write lock
// 4. deadlock
// However, not waiting for the journal to be replayed may cause some requests on non-master FEs to fail for some time.
// There is no good solution for this.
// In a future version this whole process will be refactored, so we temporarily remove this waiting.
// Env.getCurrentEnv().getJournalObservable().waitOn(result.maxJournalId, timeoutMs);
isReturnToPool = true;
}
isReturnToPool = true;
} catch (Exception e) {
LOG.warn("Failed to finish forward init operation, please try again. ", e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,11 +420,11 @@ public List<Backend> getCnBackends() {
}

// return the number of backends that are from different hosts
public int getBackendNumFromDiffHosts(boolean aliveOnly) {
public int getStorageBackendNumFromDiffHosts(boolean aliveOnly) {
Set<String> hosts = Sets.newHashSet();
ImmutableMap<Long, Backend> idToBackend = idToBackendRef;
for (Backend backend : idToBackend.values()) {
if (aliveOnly && !backend.isAlive()) {
if ((aliveOnly && !backend.isAlive()) || backend.isComputeNode()) {
continue;
}
hosts.add(backend.getHost());
Expand Down
Loading

0 comments on commit 49496fb

Please sign in to comment.