Skip to content

Commit

Permalink
[FIX](agg) fix vertical_compaction_reader for agg table with array/map
Browse files Browse the repository at this point in the history
  • Loading branch information
amorynan authored and weixingyu12 committed Apr 8, 2024
1 parent a47da67 commit 0198024
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 29 deletions.
3 changes: 3 additions & 0 deletions be/src/vec/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,9 @@ class IColumn : public COW<IColumn> {
/// Implies is_fixed_and_contiguous.
virtual bool is_numeric() const { return false; }

// Column is ColumnString/ColumnArray/ColumnMap or other variable length column at every row
virtual bool is_variable_length() const { return false; }

// Whether this column is a ColumnString (overridden there).
virtual bool is_column_string() const { return false; }

// Whether this column is a decimal column (overridden by decimal columns).
virtual bool is_column_decimal() const { return false; }
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class ColumnArray final : public COWHelper<IColumn, ColumnArray> {
// Type family name used when building the column's display name.
const char* get_family_name() const override { return "Array"; }
bool is_column_array() const override { return true; }
bool can_be_inside_nullable() const override { return true; }
// Array is variable-length: each row may hold a different number of elements.
bool is_variable_length() const override { return true; }
MutableColumnPtr clone_resized(size_t size) const override;
size_t size() const override;
void resize(size_t n) override;
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/columns/column_const.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class ColumnConst final : public COWHelper<IColumn, ColumnConst> {

ColumnPtr convert_to_full_column_if_const() const override { return convert_to_full_column(); }

// A Const column is variable-length iff the wrapped data column is.
bool is_variable_length() const override { return data->is_variable_length(); }

ColumnPtr remove_low_cardinality() const;

std::string get_name() const override { return "Const(" + data->get_name() + ")"; }
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/columns/column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ColumnMap final : public COWHelper<IColumn, ColumnMap> {
}

MutableColumnPtr clone_resized(size_t size) const override;
// Map is variable-length: each row may hold a different number of key/value pairs.
bool is_variable_length() const override { return true; }

bool can_be_inside_nullable() const override { return true; }

Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/columns/column_nullable.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ColumnNullable final : public COWHelper<IColumn, ColumnNullable> {
}

MutableColumnPtr get_shrinked_column() override;

// Nullable delegates to the nested column: variable-length iff the wrapped column is.
bool is_variable_length() const override { return nested_column->is_variable_length(); }
const char* get_family_name() const override { return "Nullable"; }
std::string get_name() const override { return "Nullable(" + nested_column->get_name() + ")"; }
MutableColumnPtr clone_resized(size_t size) const override;
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/columns/column_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ColumnString final : public COWHelper<IColumn, ColumnString> {

public:
void sanity_check() const;

// String is variable-length: each row stores a different number of chars.
bool is_variable_length() const override { return true; }
const char* get_family_name() const override { return "String"; }

// Number of rows equals the number of recorded offsets.
size_t size() const override { return offsets.size(); }
Expand Down Expand Up @@ -557,6 +557,8 @@ class ColumnString final : public COWHelper<IColumn, ColumnString> {
auto data = r.get_data_at(row);

if (!self_row) {
// self_row == 0 means we first call replace_column_data() with batch column data. so we
// should clean last batch column data.
chars.clear();
offsets[self_row] = data.size;
} else {
Expand Down
17 changes: 1 addition & 16 deletions be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,22 +182,7 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) {
_agg_places.push_back(place);

// calculate `_has_variable_length_tag` tag. like string, array, map
_stored_has_variable_length_tag[idx] =
_stored_data_columns[idx]->is_column_string() ||
(_stored_data_columns[idx]->is_nullable() &&
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
->get_nested_column_ptr()
->is_column_string()) ||
_stored_data_columns[idx]->is_column_array() ||
(_stored_data_columns[idx]->is_nullable() &&
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
->get_nested_column_ptr()
->is_column_array()) ||
_stored_data_columns[idx]->is_column_map() ||
(_stored_data_columns[idx]->is_nullable() &&
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
->get_nested_column_ptr()
->is_column_map());
_stored_has_variable_length_tag[idx] = _stored_data_columns[idx]->is_variable_length();
}
}

Expand Down
15 changes: 5 additions & 10 deletions be/src/vec/olap/vertical_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
_next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();

_stored_has_null_tag.resize(_stored_data_columns.size());
_stored_has_string_tag.resize(_stored_data_columns.size());
_stored_has_variable_length_tag.resize(_stored_data_columns.size());

auto& tablet_schema = *_tablet_schema;
for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
Expand All @@ -188,13 +188,8 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
});
_agg_places.push_back(place);

// calculate `has_string` tag.
_stored_has_string_tag[idx] =
_stored_data_columns[idx]->is_column_string() ||
(_stored_data_columns[idx]->is_nullable() &&
reinterpret_cast<ColumnNullable*>(_stored_data_columns[idx].get())
->get_nested_column_ptr()
->is_column_string());
// calculate `_has_variable_length_tag` tag. like string, array, map
_stored_has_variable_length_tag[idx] = _stored_data_columns[idx]->is_variable_length();
}
}

Expand Down Expand Up @@ -319,8 +314,8 @@ size_t VerticalBlockReader::_copy_agg_data() {
}
for (size_t idx = 0; idx < _return_columns.size(); ++idx) {
auto& dst_column = _stored_data_columns[idx];
if (_stored_has_string_tag[idx]) {
//string type should replace ordered
if (_stored_has_variable_length_tag[idx]) {
//variable length type should replace ordered
for (size_t i = 0; i < copy_size; i++) {
auto& ref = _stored_row_ref[i];
dst_column->replace_column_data(*ref.block->get_by_position(idx).column,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/olap/vertical_block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class VerticalBlockReader final : public TabletReader {
std::vector<IteratorRowRef> _stored_row_ref;

// Per-return-column flag: whether the stored column may contain nulls.
std::vector<bool> _stored_has_null_tag;
// Per-return-column flag: true for variable-length columns (string/array/map),
// whose rows must be replaced in order when copying aggregated data.
std::vector<bool> _stored_has_variable_length_tag;

phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> _temp_ref_map;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_default --
1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1, "c":2}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris", "2024-04-29"] {"c":2}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 \N \N
4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N

-- !select_default2 --
1 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["a", "b", "c"] {"b":1, "c":2}
2 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 ["amory", "doris", "2024-04-29"] {"c":2}
3 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 \N \N
4 2017-10-01 2017-10-01 2017-10-01T11:11:11.110 2017-10-01T11:11:11.110111 Beijing 10 1 [null, "sdf"] \N

Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

// Regression test for cumulative compaction of an AGGREGATE KEY table whose
// REPLACE value columns are variable-length complex types (ARRAY / MAP).
// Verifies that the vertical compaction reader preserves array/map values:
// query results must be identical before and after compaction, and the
// duplicated-key rowsets must actually be merged.
suite("test_compaction_agg_keys_with_array_map") {
    def tableName = "compaction_agg_keys_regression_test_complex"

    try {
        String backend_id;
        def backendId_to_backendIP = [:]
        def backendId_to_backendHttpPort = [:]
        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
        backend_id = backendId_to_backendIP.keySet()[0]

        // Read the BE config to learn whether auto compaction is disabled;
        // this decides how strictly the manual trigger result is asserted.
        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)
        def configList = parseJson(out.trim())
        assert configList instanceof List

        boolean disableAutoCompaction = true
        for (Object ele in (List) configList) {
            assert ele instanceof List<String>
            if (((List<String>) ele)[0] == "disable_auto_compaction") {
                disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
            }
        }

        sql """ DROP TABLE IF EXISTS ${tableName} """
        sql """
            CREATE TABLE IF NOT EXISTS ${tableName} (
                `user_id` LARGEINT NOT NULL COMMENT "用户id",
                `date` DATE NOT NULL COMMENT "数据灌入日期时间",
                `datev2` DATEV2 NOT NULL COMMENT "数据灌入日期时间",
                `datetimev2_1` DATETIMEV2(3) NOT NULL COMMENT "数据灌入日期时间",
                `datetimev2_2` DATETIMEV2(6) NOT NULL COMMENT "数据灌入日期时间",
                `city` VARCHAR(20) COMMENT "用户所在城市",
                `age` SMALLINT COMMENT "用户年龄",
                `sex` TINYINT COMMENT "用户性别",
                `array_col` ARRAY<STRING> REPLACE NULL COMMENT "array column",
                `map_col` MAP<STRING, INT> REPLACE NULL COMMENT "map column")
            AGGREGATE KEY(`user_id`, `date`, `datev2`, `datetimev2_1`, `datetimev2_2`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`)
            PROPERTIES ( "replication_num" = "1" );
        """

        // Each INSERT produces one rowset; duplicated keys (user_id 1/2/3)
        // exercise REPLACE aggregation on the array/map columns at compaction.
        sql """ INSERT INTO ${tableName} VALUES
            (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b'], map('a', 1));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (1, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['a', 'b', 'c'], map('b', 1, 'c', 2));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris', 'commiter'], map('b', 1));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (2, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['amory', 'doris', '2024-04-29'], map('c', 2));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map());
        """

        sql """ INSERT INTO ${tableName} VALUES
            (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, ['e', 'f', 'g', 'd'], map('a', 1, 'b', 2));
        """

        sql """ INSERT INTO ${tableName} VALUES
            (3, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, NULL, NULL);
        """

        sql """ INSERT INTO ${tableName} VALUES
            (4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, [NULL, 'sdf'], NULL);
        """

        // Snapshot of results before compaction.
        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id; """

        //TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,QueryHits,VersionCount,PathHash,MetaUrl,CompactionStatus
        def tablets = sql_return_maparray """ show tablets from ${tableName}; """

        // trigger compactions for all tablets in ${tableName}
        for (def tablet in tablets) {
            String tablet_id = tablet.TabletId
            backend_id = tablet.BackendId
            (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)
            def compactJson = parseJson(out.trim())
            if (compactJson.status.toLowerCase() == "fail") {
                // A failed manual trigger is only acceptable when compaction
                // already ran automatically (auto compaction enabled).
                assertEquals(disableAutoCompaction, false)
                logger.info("Compaction was done automatically!")
            }
            if (disableAutoCompaction) {
                assertEquals("success", compactJson.status.toLowerCase())
            }
        }

        // wait for all compactions done
        for (def tablet in tablets) {
            boolean running = true
            do {
                Thread.sleep(1000)
                String tablet_id = tablet.TabletId
                backend_id = tablet.BackendId
                (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
                logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
                assertEquals(code, 0)

                def compactionStatus = parseJson(out.trim())
                assertEquals("success", compactionStatus.status.toLowerCase())
                running = compactionStatus.run_status
            } while (running)
        }

        // Sum row counts over the remaining rowsets: the 8 inserted rowsets
        // must have been merged, so fewer than 8 rows remain on disk.
        int rowCount = 0
        for (def tablet in tablets) {
            (code, out, err) = curl("GET", tablet.CompactionStatus)
            logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)

            assertEquals(code, 0)
            def tabletJson = parseJson(out.trim())
            assert tabletJson.rowsets instanceof List
            for (String rowset in (List<String>) tabletJson.rowsets) {
                rowCount += Integer.parseInt(rowset.split(" ")[1])
            }
        }
        assert (rowCount < 8)
        // Results after compaction must match the pre-compaction snapshot.
        qt_select_default2 """ SELECT * FROM ${tableName} t ORDER BY user_id; """
    } finally {
        try_sql("DROP TABLE IF EXISTS ${tableName}")
    }
}

0 comments on commit 0198024

Please sign in to comment.