Skip to content

Commit

Permalink
Merge branch 'master' into 20240714_fix_arrowflight2
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Jul 16, 2024
2 parents 637b247 + 1b4108c commit 5de8d0b
Show file tree
Hide file tree
Showing 35 changed files with 1,325 additions and 1,163 deletions.
22 changes: 22 additions & 0 deletions be/src/olap/task/index_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "olap/task/index_builder.h"

#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_writer_context.h"
Expand Down Expand Up @@ -124,7 +125,28 @@ Status IndexBuilder::update_inverted_index_info() {
}
}
_dropped_inverted_indexes.push_back(*index_meta);
// ATTN: DO NOT REMOVE INDEX AFTER OUTPUT_ROWSET_WRITER CREATED.
// remove dropped index_meta from output rowset tablet schema
output_rs_tablet_schema->remove_index(index_meta->index_id());
}
DBUG_EXECUTE_IF("index_builder.update_inverted_index_info.drop_index", {
auto indexes_count = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"index_builder.update_inverted_index_info.drop_index", "indexes_count", 0);
if (indexes_count < 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"indexes count cannot be negative");
}
int32_t indexes_size = 0;
for (auto index : output_rs_tablet_schema->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
indexes_size++;
}
}
if (indexes_count != indexes_size) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"indexes count not equal to expected");
}
})
} else {
// base on input rowset's tablet_schema to build
// output rowset's tablet_schema which only add
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,11 @@ public boolean isFinalState() {
@SerializedName("pg")
protected RoutineLoadProgress progress;

@SerializedName("lrt")
protected long latestResumeTimestamp; // the latest resume time
@SerializedName("art")
protected long autoResumeCount;
// some other msg which need to show to user;
@SerializedName("om")
protected String otherMsg = "";
@SerializedName("pr")
protected ErrorReason pauseReason;
@SerializedName("cr")
protected ErrorReason cancelReason;

@SerializedName("cts")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public class OuterJoinAssocProject extends OneExplorationRuleFactory {
// Pair<bottomJoin, topJoin>
// newBottomJoin Type = topJoin Type, newTopJoin Type = bottomJoin Type
public static Set<Pair<JoinType, JoinType>> VALID_TYPE_PAIR_SET = ImmutableSet.of(
Pair.of(JoinType.LEFT_OUTER_JOIN, JoinType.INNER_JOIN),
Pair.of(JoinType.INNER_JOIN, JoinType.LEFT_OUTER_JOIN),
Pair.of(JoinType.LEFT_OUTER_JOIN, JoinType.LEFT_OUTER_JOIN));

Expand Down
8 changes: 8 additions & 0 deletions regression-test/framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -401,5 +401,13 @@ under the License.
<artifactId>netty-all</artifactId>
<version>4.1.104.Final</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.1</version>
<!--Regression tests need to include this jar-->
<scope>compile</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class CreateMVAction implements SuiteAction {
def tmp = doRun("SHOW ALTER TABLE MATERIALIZED VIEW ORDER BY CreateTime DESC LIMIT 1;")
sqlResult = tmp.result[0]
log.info("result: ${sqlResult}".toString())
if (tryTimes == 120 || sqlResult.contains("CANCELLED")) {
if (tryTimes == 600 || sqlResult.contains("CANCELLED")) {
throw new IllegalStateException("MV create check times over limit, result='${sqlResult}'");
}
Thread.sleep(1200)
Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/common/github-utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ file_changed_cloud_p0() {
[[ "${af}" == 'env.sh' ]] ||
[[ "${af}" == 'run-regression-test.sh' ]] ||
[[ "${af}" == 'cloud/src/'* ]] ||
[[ "${af}" == 'cloud/cmake/'* ]] ||
[[ "${af}" == 'cloud/test/'* ]]; then
echo "cloud-p0 related file changed, return need" && return 0
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ suite ("test_follower_consistent_auth","p0,auth") {

def get_follower_ip = {
def result = sql """show frontends;"""
logger.info("result:" + result)
for (int i = 0; i < result.size(); i++) {
if (result[i][7] == "FOLLOWER" && result[i][8] == "false") {
if (result[i][7] == "FOLLOWER" && result[i][8] == "false" && result[i][11] == "true") {
return result[i][1]
}
}
Expand All @@ -29,7 +30,10 @@ suite ("test_follower_consistent_auth","p0,auth") {
def switch_ip = get_follower_ip()
if (switch_ip != "null") {
logger.info("switch_ip: " + switch_ip)
def new_jdbc_url = context.config.jdbcUrl.replaceAll(/\/\/[0-9.]+:/, "//${switch_ip}:")

def tokens = context.config.jdbcUrl.split('/')
def url_tmp1 = tokens[0] + "//" + tokens[2] + "/" + "information_schema" + "?"
def new_jdbc_url = url_tmp1.replaceAll(/\/\/[0-9.]+:/, "//${switch_ip}:")
logger.info("new_jdbc_url: " + new_jdbc_url)

String user = 'test_follower_consistent_user'
Expand Down Expand Up @@ -109,8 +113,9 @@ suite ("test_follower_consistent_auth","p0,auth") {
sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO ${user}""";
}


connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
logger.info("url_tmp1:" + url_tmp1)
logger.info("new_jdbc_url:" + new_jdbc_url)
connect(user=user, password="${pwd}", url=url_tmp1) {
try {
sql "SHOW CATALOG RECYCLE BIN WHERE NAME = '${catalog_name}'"
} catch (Exception e) {
Expand All @@ -126,7 +131,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
assertTrue(e.getMessage().contains("Admin_priv"))
}
}
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
try {
sql "SHOW DATA"
} catch (Exception e) {
Expand All @@ -143,7 +148,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
}
}

connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
try {
sql "select username from ${dbName}.${tableName}"
} catch (Exception e) {
Expand All @@ -160,15 +165,15 @@ suite ("test_follower_consistent_auth","p0,auth") {
}
}
sql """grant select_priv(username) on ${dbName}.${tableName} to ${user}"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
sql "select username from ${dbName}.${tableName}"
}
connect(user=user, password="${pwd}", url=new_jdbc_url) {
sql "select username from ${dbName}.${tableName}"
}


connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
try {
sql "select username from ${dbName}.${view_name}"
} catch (Exception e) {
Expand All @@ -185,15 +190,15 @@ suite ("test_follower_consistent_auth","p0,auth") {
}
}
sql """grant select_priv(username) on ${dbName}.${view_name} to ${user}"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
sql "select username from ${dbName}.${view_name}"
}
connect(user=user, password="${pwd}", url=new_jdbc_url) {
sql "select username from ${dbName}.${view_name}"
}


connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
try {
sql "select username from ${dbName}.${mtmv_name}"
} catch (Exception e) {
Expand All @@ -210,7 +215,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
}
}
sql """grant select_priv(username) on ${dbName}.${mtmv_name} to ${user}"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
sql "select username from ${dbName}.${mtmv_name}"
}
connect(user=user, password="${pwd}", url=new_jdbc_url) {
Expand All @@ -223,15 +228,15 @@ suite ("test_follower_consistent_auth","p0,auth") {

// user
sql """grant select_priv on ${dbName}.${tableName} to ${user}"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
sql "select username from ${dbName}.${tableName}"
}
connect(user=user, password="${pwd}", url=new_jdbc_url) {
sql "select username from ${dbName}.${tableName}"
}

sql """revoke select_priv on ${dbName}.${tableName} from ${user}"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
try {
sql "select username from ${dbName}.${tableName}"
} catch (Exception e) {
Expand All @@ -252,7 +257,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
sql """grant select_priv on ${dbName}.${tableName} to ROLE '${role}'"""
sql """grant Load_priv on ${dbName}.${tableName} to ROLE '${role}'"""
sql """grant '${role}' to '${user}'"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
sql "select username from ${dbName}.${tableName}"
sql """insert into ${dbName}.`${tableName}` values (4, "444")"""
}
Expand All @@ -262,7 +267,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
}

sql """revoke '${role}' from '${user}'"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
try {
sql "select username from ${dbName}.${tableName}"
} catch (Exception e) {
Expand All @@ -281,7 +286,7 @@ suite ("test_follower_consistent_auth","p0,auth") {


// workload group
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
sql """set workload_group = '${wg}';"""
try {
sql "select username from ${dbName}.${tableName}"
Expand All @@ -300,7 +305,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
}
}
sql """GRANT USAGE_PRIV ON WORKLOAD GROUP '${wg}' TO '${user}';"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
sql """set workload_group = '${wg}';"""
sql """select username from ${dbName}.${tableName}"""
}
Expand All @@ -310,7 +315,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
}

// resource group
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
def res = sql """SHOW RESOURCES;"""
assertTrue(res == [])
}
Expand All @@ -319,7 +324,7 @@ suite ("test_follower_consistent_auth","p0,auth") {
assertTrue(res == [])
}
sql """GRANT USAGE_PRIV ON RESOURCE ${rg} TO ${user};"""
connect(user=user, password="${pwd}", url=context.config.jdbcUrl) {
connect(user=user, password="${pwd}", url=url_tmp1) {
def res = sql """SHOW RESOURCES;"""
assertTrue(res.size == 10)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_index_builder_drop_index_fault_injection", "nonConcurrent") {
def runTest = { indexTbName ->
sql """ insert into ${indexTbName} values(1, "json love anny", "json", "anny",1); """
sql "sync"

def show_result = sql_return_maparray "show index from ${indexTbName}"
logger.info("show index from " + indexTbName + " result: " + show_result)
assertEquals(show_result.size(), 4)
assertEquals(show_result[0].Key_name, "index_int")
assertEquals(show_result[1].Key_name, "index_str_k2")
assertEquals(show_result[2].Key_name, "index_str_k4")
assertEquals(show_result[3].Key_name, "index_k5")

try {
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._read_columns_by_index", [indexes_count: 3])
sql "DROP INDEX index_int ON ${indexTbName}"
show_result = sql_return_maparray "show index from ${indexTbName}"
logger.info("show index from " + indexTbName + " result: " + show_result)
assertEquals(show_result.size(), 3)
assertEquals(show_result[0].Key_name, "index_str_k2")
assertEquals(show_result[1].Key_name, "index_str_k4")
assertEquals(show_result[2].Key_name, "index_k5")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("index_builder.update_inverted_index_info.drop_index")
}

try {
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._read_columns_by_index", [indexes_count: 2])
sql "DROP INDEX index_str_k2 ON ${indexTbName}"
show_result = sql_return_maparray "show index from ${indexTbName}"
logger.info("show index from " + indexTbName + " result: " + show_result)
assertEquals(show_result.size(), 2)
assertEquals(show_result[0].Key_name, "index_str_k4")
assertEquals(show_result[1].Key_name, "index_k5")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("index_builder.update_inverted_index_info.drop_index")
}

try {
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._read_columns_by_index", [indexes_count: 1])
sql "DROP INDEX index_str_k4 ON ${indexTbName}"
show_result = sql_return_maparray "show index from ${indexTbName}"
logger.info("show index from " + indexTbName + " result: " + show_result)
assertEquals(show_result.size(), 1)
assertEquals(show_result[0].Key_name, "index_k5")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("index_builder.update_inverted_index_info.drop_index")
}

try {
GetDebugPoint().enableDebugPointForAllBEs("segment_iterator._read_columns_by_index", [indexes_count: 0])
sql "DROP INDEX index_k5 ON ${indexTbName}"
show_result = sql_return_maparray "show index from ${indexTbName}"
logger.info("show index from " + indexTbName + " result: " + show_result)
assertEquals(show_result.size(), 0)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("index_builder.update_inverted_index_info.drop_index")
}
}

def createTestTable = { version ->
def indexTbName = "test_index_builder_drop_index_fault_injection_${version}"

sql "DROP TABLE IF EXISTS ${indexTbName}"
sql """
CREATE TABLE ${indexTbName}
(
k1 int ,
k2 string,
k3 char(50),
k4 varchar(200),
k5 int,
index index_int (k1) using inverted,
index index_str_k2 (k2) using inverted properties("parser"="english","ignore_above"="257"),
index index_str_k4 (k4) using inverted,
index index_k5 (k5) using inverted
)
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES("replication_num" = "1","inverted_index_storage_format"="${version}");
"""

runTest(indexTbName)
}

createTestTable("v1")
createTestTable("v2")

}
Loading

0 comments on commit 5de8d0b

Please sign in to comment.