[Configuration](transactional-hive) Add skip_checking_acid_version_file session var to skip checking acid version file in some hive envs.
kaka11chen committed Oct 21, 2024
1 parent 8c34574 commit 8c06dab
Showing 5 changed files with 44 additions and 28 deletions.
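For reference, a minimal client-side sketch of how the new session variable is meant to be used. The assumptions here are not part of the commit: a Doris FE reachable over the MySQL protocol at 127.0.0.1:9030, a passwordless root user, and a Hive catalog named hive exposing the orc_full_acid test table seeded below; adjust names and endpoint to your environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SkipAcidVersionCheckDemo {
    public static void main(String[] args) throws Exception {
        // Doris speaks the MySQL protocol, so a plain MySQL JDBC driver works.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:9030/", "root", "");
             Statement stmt = conn.createStatement()) {
            // Session-scoped: only queries on this connection skip the
            // _orc_acid_version probe.
            stmt.execute("SET skip_checking_acid_version_file = true");
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT * FROM hive.`default`.orc_full_acid ORDER BY id")) {
                while (rs.next()) {
                    System.out.println(rs.getInt("id") + "\t" + rs.getString("value"));
                }
            }
        }
    }
}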
@@ -1,4 +1,3 @@
--- Currently docker is hive 2.x version. Hive 2.x versioned full-acid tables need to run major compaction.
 SET hive.support.concurrency=true;
 SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
@@ -25,8 +24,6 @@ insert into orc_full_acid values
 
 update orc_full_acid set value = 'CC' where id = 3;
 
-alter table orc_full_acid compact 'major';
-
 create table orc_full_acid_par (id INT, value STRING)
 PARTITIONED BY (part_col INT)
 CLUSTERED BY (id) INTO 3 BUCKETS
@@ -44,7 +41,3 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values
 (6, 'F');
 
 update orc_full_acid_par set value = 'BB' where id = 2;
-
-alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
-alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';
-
@@ -50,6 +50,7 @@
 import org.apache.doris.planner.ColumnBound;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
+import org.apache.doris.qe.ConnectContext;
 
 import com.github.benmanes.caffeine.cache.CacheLoader;
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -744,7 +745,7 @@ public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() {
     }
 
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
-            boolean isFullAcid, long tableId, String bindBrokerName) {
+            boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
         try {
@@ -778,25 +779,27 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                 if (baseOrDeltaPath == null) {
                     return Collections.emptyList();
                 }
-                String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
-                RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
-                        new FileSystemCache.FileSystemCacheKey(
-                                LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                        bindBrokerName),
-                                catalog.getCatalogProperty().getProperties(),
-                                bindBrokerName, jobConf));
-                Status status = fs.exists(acidVersionPath);
-                if (status != Status.OK) {
-                    if (status.getErrCode() == ErrCode.NOT_FOUND) {
-                        acidVersion = 0;
-                    } else {
-                        throw new Exception(String.format("Failed to check remote path {} exists.",
-                                acidVersionPath));
+                if (!skipCheckingAcidVersionFile) {
+                    String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
+                    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
+                            new FileSystemCache.FileSystemCacheKey(
+                                    LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
+                                            bindBrokerName),
+                                    catalog.getCatalogProperty().getProperties(),
+                                    bindBrokerName, jobConf));
+                    Status status = fs.exists(acidVersionPath);
+                    if (status != Status.OK) {
+                        if (status.getErrCode() == ErrCode.NOT_FOUND) {
+                            acidVersion = 0;
+                        } else {
+                            throw new Exception(String.format("Failed to check remote path {} exists.",
+                                    acidVersionPath));
+                        }
                     }
+                    if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) {
+                        throw new Exception(
+                                "Hive 2.x versioned full-acid tables need to run major compaction.");
+                    }
                 }
-                if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) {
-                    throw new Exception(
-                            "Hive 2.x versioned full-acid tables need to run major compaction.");
-                }
             }
 
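Condensed, the gating this hunk introduces looks roughly like the sketch below. This is not the real method: versionFileExists and hasDeltaDirectories are hypothetical stand-ins for the RemoteFileSystem.exists() probe and !directory.getCurrentDirectories().isEmpty(), and the starting acidVersion value of 2 is an assumption (the hunk only shows it being downgraded to 0 when the _orc_acid_version marker is missing).

public final class AcidVersionGate {

    static int resolveAcidVersion(boolean skipCheckingAcidVersionFile,
                                  boolean versionFileExists,
                                  boolean hasDeltaDirectories) throws Exception {
        int acidVersion = 2; // assumed default for a Hive 3.x ACID layout
        if (skipCheckingAcidVersionFile) {
            return acidVersion; // session variable set: trust the layout as-is
        }
        if (!versionFileExists) {
            acidVersion = 0; // NOT_FOUND: treat the table as a Hive 2.x layout
        }
        if (acidVersion == 0 && hasDeltaDirectories) {
            // Hive 2.x delta files are unreadable here until a major
            // compaction folds them into a base directory.
            throw new Exception("Hive 2.x versioned full-acid tables need to run major compaction.");
        }
        return acidVersion;
    }

    public static void main(String[] args) throws Exception {
        // With the check skipped, a missing marker no longer fails the scan.
        System.out.println(resolveAcidVersion(true, false, true));  // prints 2
        System.out.println(resolveAcidVersion(false, true, true));  // prints 2
    }
}

The trade-off: skipping the probe saves one remote exists() call per base or delta directory, at the cost of not failing fast on an uncompacted Hive 2.x layout.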
@@ -90,6 +90,8 @@ public class HiveScanNode extends FileQueryScanNode {
     private final Semaphore splittersOnFlight = new Semaphore(NUM_SPLITTERS_ON_FLIGHT);
     private final AtomicInteger numSplitsPerPartition = new AtomicInteger(NUM_SPLITS_PER_PARTITION);
 
+    private boolean skipCheckingAcidVersionFile = false;
+
     /**
      * * External file scan node for Query Hive table
      * needCheckColumnPriv: Some of ExternalFileScanNode do not need to check column priv
@@ -117,6 +119,7 @@ protected void doInitialize() throws UserException {
             this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
                     ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable());
             Env.getCurrentHiveTransactionMgr().register(hiveTransaction);
+            skipCheckingAcidVersionFile = ConnectContext.get().getSessionVariable().skipCheckingAcidVersionFile;
         }
     }
 
@@ -343,7 +346,7 @@ private List<FileCacheValue> getFileSplitByTransaction(HiveMetaStoreCache cache,
         ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
                 ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
         return cache.getFilesByTransaction(partitions, validWriteIds,
-                hiveTransaction.isFullAcid(), hmsTable.getId(), bindBrokerName);
+                hiveTransaction.isFullAcid(), skipCheckingAcidVersionFile, hmsTable.getId(), bindBrokerName);
     }
 
     @Override
@@ -670,6 +670,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_COOLDOWN_REPLICA_AFFINITY =
             "enable_cooldown_replica_affinity";
+    public static final String SKIP_CHECKING_ACID_VERSION_FILE = "skip_checking_acid_version_file";
+
     /**
      * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
      */
@@ -2200,6 +2202,12 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
     @VariableMgr.VarAttr(name = ENABLE_COOLDOWN_REPLICA_AFFINITY, needForward = true)
     public boolean enableCooldownReplicaAffinity = true;
 
+    @VariableMgr.VarAttr(name = SKIP_CHECKING_ACID_VERSION_FILE, description = {
+            "跳过检查 transactional hive 版本文件 '_orc_acid_version'。",
+            "Skip checking the transactional Hive version file '_orc_acid_version'."
+    })
+    public boolean skipCheckingAcidVersionFile = false;
+
     public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
         this.enableESParallelScroll = enableESParallelScroll;
     }
@@ -16,7 +16,10 @@
 // under the License.
 
 suite("test_transactional_hive", "p0,external,hive,external_docker,external_docker_hive") {
+    String skip_checking_acid_version_file = "false"
+
     def q01 = {
+        sql """set skip_checking_acid_version_file=${skip_checking_acid_version_file}"""
         qt_q01 """
         select * from orc_full_acid order by id;
         """
@@ -32,6 +35,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_docker_hive") {
     }
 
     def q01_par = {
+        sql """set skip_checking_acid_version_file=${skip_checking_acid_version_file}"""
         qt_q01 """
         select * from orc_full_acid_par order by id;
         """
@@ -54,7 +58,7 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_docker_hive") {
         return;
     }
 
-    for (String hivePrefix : ["hive2", "hive3"]) {
+    for (String hivePrefix : ["hive3"]) {
         try {
             String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
             String catalog_name = "test_transactional_${hivePrefix}"
@@ -67,6 +71,11 @@ suite("test_transactional_hive", "p0,external,hive,external_docker,external_docker_hive") {
             );"""
             sql """use `${catalog_name}`.`default`"""
 
+            skip_checking_acid_version_file = "false"
+            q01()
+            q01_par()
+
+            skip_checking_acid_version_file = "true"
             q01()
             q01_par()
 