Skip to content

Commit

Permalink
[improvement](statistics) Optimize drop stats operation (#30144)
Browse files Browse the repository at this point in the history
Previously, the drop stats operation called isMaster() (columns * followers) times and issued the same number of RPCs to drop remote column stats. This PR reduces the number of RPC calls and uses a more efficient way to identify the master node instead of calling isMaster().
  • Loading branch information
Jibing-Li authored Jan 22, 2024
1 parent f97dbd1 commit 25da056
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public class DropStatsStmt extends DdlStmt {
private Set<String> columnNames;
// Flag to drop external table row count in table_statistics.
private boolean dropTableRowCount;
private boolean isAllColumns;

private long catalogId;
private long dbId;
private long tblId;

public DropStatsStmt(boolean dropExpired) {
Expand Down Expand Up @@ -100,10 +103,13 @@ public void analyze(Analyzer analyzer) throws UserException {
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrAnalysisException(tblName);
tblId = table.getId();
dbId = db.getId();
catalogId = catalog.getId();
// check permission
checkAnalyzePriv(db.getFullName(), table.getName());
// check columnNames
if (columnNames != null) {
isAllColumns = false;
for (String cName : columnNames) {
if (table.getColumn(cName) == null) {
ErrorReport.reportAnalysisException(
Expand All @@ -115,6 +121,7 @@ public void analyze(Analyzer analyzer) throws UserException {
}
}
} else {
isAllColumns = true;
columnNames = table.getColumns().stream().map(Column::getName).collect(Collectors.toSet());
}
}
Expand All @@ -123,10 +130,22 @@ public long getTblId() {
return tblId;
}

/**
 * Returns the id of the database that owns the target table.
 * The value is recorded by {@code analyze()} from the resolved database.
 */
public long getDbId() {
    return this.dbId;
}

/**
 * Returns the id of the catalog that owns the target table.
 * The value is recorded by {@code analyze()} from the resolved catalog.
 * The doubled "Id" in the method name is kept because callers use this name.
 */
public long getCatalogIdId() {
    return this.catalogId;
}

/**
 * Returns the names of the columns whose stats should be dropped.
 * When the statement names no columns, {@code analyze()} fills this set
 * with every column of the table.
 */
public Set<String> getColumnNames() {
    return this.columnNames;
}

/**
 * Tells whether the statement targets every column of the table.
 * {@code analyze()} sets this to true when no column list was given
 * and to false when an explicit column list was given.
 */
public boolean isAllColumns() {
    return this.isAllColumns;
}

/**
 * Returns the flag that requests dropping the external table row count
 * kept in table_statistics.
 */
public boolean dropTableRowCount() {
    return this.dropTableRowCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.InvalidateStatsTarget;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
Expand Down Expand Up @@ -3041,8 +3044,13 @@ public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws

/**
 * RPC handler through which another frontend asks this node to drop stats it holds locally.
 * Always answers with an OK status, including when no table stats are found for the target.
 *
 * NOTE(review): this span appears to carry both the pre-change statements (StatisticsCacheKey)
 * and the post-change statements (InvalidateStatsTarget) of a diff — confirm which are live.
 */
@Override
public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request) throws TException {
// Per-column form: the key is parsed as a single StatisticsCacheKey and that one entry is evicted.
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
Env.getCurrentEnv().getStatisticsCache().invalidate(k.tableId, k.idxId, k.colName);
// Per-table form: the key is parsed as an InvalidateStatsTarget (catalog/db/table ids plus a column set).
InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key, InvalidateStatsTarget.class);
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
TableStatsMeta tableStats = analysisManager.findTableStatsStatus(target.tableId);
// No stats meta is known for this table on this node, so there is nothing to invalidate.
if (tableStats == null) {
return new TStatus(TStatusCode.OK);
}
// Delegate the actual invalidation of the target's columns to the AnalysisManager.
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId, target.columns, tableStats);
return new TStatus(TStatusCode.OK);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -646,23 +649,16 @@ public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
}

Set<String> cols = dropStatsStmt.getColumnNames();
long catalogId = dropStatsStmt.getCatalogIdId();
long dbId = dropStatsStmt.getDbId();
long tblId = dropStatsStmt.getTblId();
TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
}
if (cols == null) {
tableStats.reset();
} else {
dropStatsStmt.getColumnNames().forEach(tableStats::removeColumn);
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
for (String col : cols) {
statisticsCache.syncInvalidate(tblId, -1L, col);
}
tableStats.updatedTime = 0;
}
tableStats.userInjected = false;
logCreateTableStats(tableStats);
invalidateLocalStats(catalogId, dbId, tblId, cols, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tblId, cols, dropStatsStmt.isAllColumns());
StatisticsRepository.dropStatistics(tblId, cols);
}

Expand All @@ -671,15 +667,55 @@ public void dropStats(TableIf table) throws DdlException {
if (tableStats == null) {
return;
}
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
Set<String> cols = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
invalidateLocalStats(catalogId, dbId, tableId, cols, tableStats);
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, cols, true);
StatisticsRepository.dropStatistics(table.getId(), cols);
}

public void invalidateLocalStats(long catalogId, long dbId, long tableId,
Set<String> columns, TableStatsMeta tableStats) {
if (tableStats == null) {
return;
}
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
for (String col : cols) {
tableStats.removeColumn(col);
statisticsCache.syncInvalidate(table.getId(), -1L, col);
if (columns == null) {
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tableId);
columns = table.getBaseSchema().stream().map(Column::getName).collect(Collectors.toSet());
}
for (String column : columns) {
tableStats.removeColumn(column);
statisticsCache.invalidate(tableId, -1, column);
}
tableStats.updatedTime = 0;
logCreateTableStats(tableStats);
StatisticsRepository.dropStatistics(table.getId(), cols);
tableStats.userInjected = false;
}

/**
 * Asks every other frontend to invalidate its local stats for the given table.
 *
 * <p>A single request describing the whole target (catalog, db, table and column set) is built
 * once and sent to each frontend except the current node. If any RPC fails, the table stats meta
 * is written to the edit log as a fallback so non-master frontends still receive it.
 *
 * @param catalogId    id of the catalog owning the table
 * @param dbId         id of the database owning the table
 * @param tableId      id of the table whose stats are dropped
 * @param columns      names of the columns to invalidate
 * @param isAllColumns true when every column of the table is targeted
 */
public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
        Set<String> columns, boolean isAllColumns) {
    InvalidateStatsTarget target = new InvalidateStatsTarget(catalogId, dbId, tableId, columns, isAllColumns);
    TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest();
    request.key = GsonUtils.GSON.toJson(target);
    StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
    SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
    boolean success = true;
    for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
        // Skip the current (master) node. Compare host with host: comparing the HostInfo object
        // itself with the frontend's host string never matches, so the node would RPC itself.
        if (selfNode.getHost().equals(frontend.getHost())) {
            continue;
        }
        // Issue the RPC before folding the result in, so that one failed frontend
        // does not short-circuit away the calls to the remaining frontends.
        boolean invalidated = statisticsCache.invalidateStats(frontend, request);
        success = success && invalidated;
    }
    if (!success) {
        // If any rpc failed, use edit log to sync table stats to non-master FEs.
        LOG.warn("Failed to invalidate all remote stats by rpc for table {}, use edit log.", tableId);
        TableStatsMeta tableStats = findTableStatsStatus(tableId);
        // The lookup can come back empty; there is nothing to log in that case.
        if (tableStats != null) {
            logCreateTableStats(tableStats);
        }
    }
}

public void handleKillAnalyzeStmt(KillAnalysisJobStmt killAnalysisJobStmt) throws DdlException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import com.google.gson.annotations.SerializedName;

import java.util.Set;

/**
 * Describes which stats should be invalidated: the catalog, database and table ids
 * plus the set of column names. Fields carry Gson {@code @SerializedName} annotations
 * so the object can be sent as JSON.
 */
public class InvalidateStatsTarget {

    @SerializedName("catalogId")
    public final long catalogId;

    @SerializedName("dbId")
    public final long dbId;

    @SerializedName("tableId")
    public final long tableId;

    /** Column names to invalidate; {@code null} when the target was built for all columns. */
    @SerializedName("columns")
    public final Set<String> columns;

    /**
     * @param catalogId    id of the catalog owning the table
     * @param dbId         id of the database owning the table
     * @param tableId      id of the table
     * @param columns      column names to invalidate; ignored when {@code isAllColumns} is true
     * @param isAllColumns when true, {@link #columns} is stored as {@code null} instead of the given set
     */
    public InvalidateStatsTarget(long catalogId, long dbId, long tableId, Set<String> columns, boolean isAllColumns) {
        this.catalogId = catalogId;
        this.dbId = dbId;
        this.tableId = tableId;
        // An all-columns target does not carry the column names.
        this.columns = isAllColumns ? null : columns;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,6 @@ public void invalidate(long tblId, long idxId, String colName) {
columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(tblId, idxId, colName));
}

/**
 * Evicts the column statistic identified by (tblId, idxId, colName) from the local cache,
 * then sends an invalidate request carrying the same key to each FOLLOWER frontend
 * that is not the master.
 */
public void syncInvalidate(long tblId, long idxId, String colName) {
    StatisticsCacheKey key = new StatisticsCacheKey(tblId, idxId, colName);
    columnStatisticsCache.synchronous().invalidate(key);

    TInvalidateFollowerStatsCacheRequest invalidateRequest = new TInvalidateFollowerStatsCacheRequest();
    invalidateRequest.key = GsonUtils.GSON.toJson(key);
    for (Frontend follower : Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER)) {
        if (!StatisticsUtil.isMaster(follower)) {
            invalidateStats(follower, invalidateRequest);
        }
    }
}

/**
 * Stores the given column statistic in the local cache under the key
 * (tblId, idxId, colName), wrapped in an {@code Optional}.
 */
public void updateColStatsCache(long tblId, long idxId, String colName, ColumnStatistic statistic) {
    StatisticsCacheKey cacheKey = new StatisticsCacheKey(tblId, idxId, colName);
    columnStatisticsCache.synchronous().put(cacheKey, Optional.of(statistic));
}
Expand Down Expand Up @@ -261,19 +248,21 @@ public void sendStats(Frontend frontend, TUpdateFollowerStatsCacheRequest update
}

@VisibleForTesting
public void invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) {
/**
 * Sends an invalidate-stats request to one frontend over its RPC port.
 *
 * @param frontend the frontend to contact
 * @param request  the invalidate request to send
 * @return true when the call completed without throwing, false when anything was thrown
 */
public boolean invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) {
    TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
    FrontendService.Client client = null;
    boolean delivered;
    try {
        client = ClientPool.frontendPool.borrowObject(address);
        client.invalidateStatsCache(request);
        delivered = true;
    } catch (Throwable t) {
        // Failures are reported through the return value instead of being propagated.
        LOG.warn("Failed to sync invalidate to follower: {}", address, t);
        delivered = false;
    } finally {
        // Hand the client back to the pool whenever one was actually borrowed.
        if (client != null) {
            ClientPool.frontendPool.returnObject(address, client);
        }
    }
    return delivered;
}

public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
Expand Down

0 comments on commit 25da056

Please sign in to comment.