Skip to content

Commit

Permalink
Merge branch 'master' into hotfix/bump-dependency-0905
Browse files Browse the repository at this point in the history
  • Loading branch information
czy006 authored Oct 17, 2024
2 parents f6092b8 + bd2485a commit beb6191
Show file tree
Hide file tree
Showing 33 changed files with 173 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ public class AmoroManagementConf {
.defaultValue(60000L)
.withDescription("Interval for refreshing table metadata.");

public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
ConfigOptions.key("refresh-tables.max-pending-partition-count")
.intType()
.defaultValue(100)
.withDescription("Filters will not be used beyond that number of partitions");

public static final ConfigOption<Long> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class DashboardServer {
public static final Logger LOG = LoggerFactory.getLogger(DashboardServer.class);

private static final String AUTH_TYPE_BASIC = "basic";
private static final String X_REQUEST_SOURCE_HEADER = "X-Request-Source";
private static final String X_REQUEST_SOURCE_WEB = "Web";

private final CatalogController catalogController;
private final HealthCheckController healthCheckController;
Expand Down Expand Up @@ -189,14 +191,13 @@ public EndpointGroup endpoints() {

// for dashboard api
path(
"/ams/v1",
"/api/ams/v1",
() -> {
// login controller
get("/login/current", loginController::getCurrent);
post("/login", loginController::login);
post("/logout", loginController::logout);
});
path("ams/v1", apiGroup());

// for open api
path("/api/ams/v1", apiGroup());
Expand Down Expand Up @@ -356,7 +357,8 @@ private EndpointGroup apiGroup() {

public void preHandleRequest(Context ctx) {
String uriPath = ctx.path();
if (needApiKeyCheck(uriPath)) {
String requestSource = ctx.header(X_REQUEST_SOURCE_HEADER);
if (needApiKeyCheck(uriPath) && !X_REQUEST_SOURCE_WEB.equalsIgnoreCase(requestSource)) {
if (AUTH_TYPE_BASIC.equalsIgnoreCase(authType)) {
BasicAuthCredentials cred = ctx.basicAuthCredentials();
if (!(basicAuthUser.equals(cred.component1())
Expand Down Expand Up @@ -392,10 +394,10 @@ public void handleException(Exception e, Context ctx) {
}

private static final String[] urlWhiteList = {
"/ams/v1/versionInfo",
"/ams/v1/login",
"/ams/v1/health/status",
"/ams/v1/login/current",
"/api/ams/v1/versionInfo",
"/api/ams/v1/login",
"/api/ams/v1/health/status",
"/api/ams/v1/login/current",
"/",
"/overview",
"/introduce",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ public void deleteCatalog(Context ctx) {
/** Construct a url */
private String constructCatalogConfigFileUrl(String catalogName, String type, String key) {
return String.format(
"/ams/v1/catalogs/%s/config/%s/%s", catalogName, type, key.replaceAll("\\.", "-"));
"/api/ams/v1/catalogs/%s/config/%s/%s", catalogName, type, key.replaceAll("\\.", "-"));
}

/** Get the config file content uri("/catalogs/{catalogName}/config/{type}/{key} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void uploadFile(Context ctx) throws IOException {
Integer fid = platformFileInfoService.addFile(name, content);
Map<String, String> result = new HashMap<>();
result.put("id", String.valueOf(fid));
result.put("url", "/ams/v1/files/" + fid);
result.put("url", "/api/ams/v1/files/" + fid);
ctx.json(OkResponse.of(result));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,6 @@ public void getOptimizingTypes(Context ctx) {
String db = ctx.pathParam("db");
String table = ctx.pathParam("table");
TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
Preconditions.checkState(serverCatalog.tableExists(db, table), "no such table");

Map<String, String> values =
tableDescriptor.getTableOptimizingTypes(tableIdentifier.buildTableIdentifier());
Expand All @@ -361,10 +359,8 @@ public void getOptimizingProcessTasks(Context ctx) {

int offset = (page - 1) * pageSize;
int limit = pageSize;
ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
Preconditions.checkArgument(offset >= 0, "offset[%s] must >= 0", offset);
Preconditions.checkArgument(limit >= 0, "limit[%s] must >= 0", limit);
Preconditions.checkState(serverCatalog.tableExists(db, table), "no such table");

TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
List<OptimizingTaskInfo> optimizingTaskInfos =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,12 +666,13 @@ private void cancelTasks() {
}

private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
List<TaskRuntime<RewriteStageTask>> taskRuntimes =
getAs(
OptimizingMapper.class,
mapper ->
mapper.selectTaskRuntimes(tableRuntime.getTableIdentifier().getId(), processId));
try {
List<TaskRuntime<RewriteStageTask>> taskRuntimes =
getAs(
OptimizingMapper.class,
mapper ->
mapper.selectTaskRuntimes(
tableRuntime.getTableIdentifier().getId(), processId));
Map<Integer, RewriteFilesInput> inputs = TaskFilesPersistence.loadTaskInputs(processId);
taskRuntimes.forEach(
taskRuntime -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ public class OptimizingEvaluator {
protected final MixedTable mixedTable;
protected final TableRuntime tableRuntime;
protected final TableSnapshot currentSnapshot;
protected final int maxPendingPartitions;
protected boolean isInitialized = false;

protected Map<String, PartitionEvaluator> needOptimizingPlanMap = Maps.newHashMap();
protected Map<String, PartitionEvaluator> partitionPlanMap = Maps.newHashMap();

public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
public OptimizingEvaluator(
TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
this.tableRuntime = tableRuntime;
this.mixedTable = table;
this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
this.maxPendingPartitions = maxPendingPartitions;
}

public TableRuntime getTableRuntime() {
Expand Down Expand Up @@ -137,6 +140,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
needOptimizingPlanMap.putAll(
partitionPlanMap.entrySet().stream()
.filter(entry -> entry.getValue().isNecessary())
.limit(maxPendingPartitions)
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public OptimizingPlanner(
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
super(tableRuntime, table);
super(tableRuntime, table, Integer.MAX_VALUE);
this.partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
Expand Down Expand Up @@ -175,11 +175,11 @@ public List<RewriteStageTask> planTasks() {
tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost)));
}
if (!tasks.isEmpty()) {
if (evaluators.stream()
.anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.FULL)) {
if (actualPartitionPlans.stream()
.anyMatch(plan -> plan.getOptimizingType() == OptimizingType.FULL)) {
optimizingType = OptimizingType.FULL;
} else if (evaluators.stream()
.anyMatch(evaluator -> evaluator.getOptimizingType() == OptimizingType.MAJOR)) {
} else if (actualPartitionPlans.stream()
.anyMatch(plan -> plan.getOptimizingType() == OptimizingType.MAJOR)) {
optimizingType = OptimizingType.MAJOR;
} else {
optimizingType = OptimizingType.MINOR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void setup(TableManager tableManager, Configurations conf) {
new TableRuntimeRefreshExecutor(
tableManager,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends BaseTableExecutor {

// 1 minutes
private final long interval;
private final int maxPendingPartitions;

public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize, long interval) {
public TableRuntimeRefreshExecutor(
TableManager tableRuntimes, int poolSize, long interval, int maxPendingPartitions) {
super(tableRuntimes, poolSize);
this.interval = interval;
this.maxPendingPartitions = maxPendingPartitions;
}

@Override
Expand All @@ -48,7 +51,8 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {

private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable table) {
if (tableRuntime.isOptimizingEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) {
OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime, table);
OptimizingEvaluator evaluator =
new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
if (evaluator.isNecessary()) {
OptimizingEvaluator.PendingInput pendingInput = evaluator.getOptimizingPendingInput();
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected void reboot() throws InterruptedException {
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {

public TableRuntimeRefresher() {
super(tableService(), 1, Integer.MAX_VALUE);
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
}

void refreshPending() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void appendData(UnkeyedTable table, int id) {

void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testFragmentFiles() {
}

protected OptimizingEvaluator buildOptimizingEvaluator() {
return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
}

protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private void appendPosDelete(UnkeyedTable table) {

void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ public void dropDatabase(String database) {

@Override
public AmoroTable<?> loadTable(String database, String table) {
if (!databaseExists(database)) {
throw new NoSuchDatabaseException("Database: " + database + " does not exist.");
}

return formatCatalogAsOrder(
TableFormat.MIXED_HIVE,
TableFormat.MIXED_ICEBERG,
Expand Down
26 changes: 13 additions & 13 deletions amoro-web/mock/modules/catalogs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

export default [
{
url: '/mock/ams/v1/catalogs',
url: '/mock/api/ams/v1/catalogs',
method: 'get',
response: () => ({
"message": "success",
Expand Down Expand Up @@ -56,27 +56,27 @@ export default [
}),
},
{
url: '/mock/ams/v1/catalogs',
url: '/mock/api/ams/v1/catalogs',
method: 'post',
response: () => ({"message":"success","code":200,"result":""}),
},

{
url: '/mock/ams/v1/catalogs/test_catalog/databases',
url: '/mock/api/ams/v1/catalogs/test_catalog/databases',
method: 'get',
response: () => {
return { "message": "success", "code": 200, "result": ["db", "test", "acc"] }
},
},
{
url: '/mock/ams/v1/catalogs/test_catalog/databases/db/tables',
url: '/mock/api/ams/v1/catalogs/test_catalog/databases/db/tables',
method: 'get',
response: () => {
return { "message": "success", "code": 200, "result": [{ "name": "user", "type": "ICEBERG" },{ "name": "wf", "type": "ICEBERG" }, { "name": "xcvz", "type": "ICEBERG" }] };
},
},
{
url: '/mock/ams/v1/catalogs/:id',
url: '/mock/api/ams/v1/catalogs/:id',
method: 'get',
response: () => ({
"message": "success",
Expand All @@ -92,15 +92,15 @@ export default [
"storage.type": "Hadoop",
"hive.site": {
"fileName": "hive-site.xml",
"fileUrl": "/ams/v1/catalogs/test_catalog/config/storage-config/hive-site"
"fileUrl": "/api/ams/v1/catalogs/test_catalog/config/storage-config/hive-site"
},
"hadoop.core.site": {
"fileName": "core-site.xml",
"fileUrl": "/ams/v1/catalogs/test_catalog/config/storage-config/hadoop-core-site"
"fileUrl": "/api/ams/v1/catalogs/test_catalog/config/storage-config/hadoop-core-site"
},
"hadoop.hdfs.site": {
"fileName": "hdfs-site.xml",
"fileUrl": "/ams/v1/catalogs/test_catalog/config/storage-config/hadoop-hdfs-site"
"fileUrl": "/api/ams/v1/catalogs/test_catalog/config/storage-config/hadoop-hdfs-site"
}
},
"authConfig": {
Expand All @@ -116,22 +116,22 @@ export default [
}),
},
{
url: '/mock/ams/v1/catalogs/:id',
url: '/mock/api/ams/v1/catalogs/:id',
method: 'put',
response: () => ({ "message": "success", "code": 200, "result": null }),
},
{
url: '/mock/ams/v1/catalogs/:id',
url: '/mock/api/ams/v1/catalogs/:id',
method: 'delete',
response: () => ({ "message": "success", "code": 200, "result": true }),
},
{
url: '/mock/ams/v1/catalogs/:id/delete/check',
url: '/mock/api/ams/v1/catalogs/:id/delete/check',
method: 'get',
response: () => ({ "message": "success", "code": 200, "result": true }),
},
{
url: '/mock/ams/v1/catalogs/metastore/types',
url: '/mock/api/ams/v1/catalogs/metastore/types',
method: 'get',
response: () => ({
"message": "success",
Expand Down Expand Up @@ -161,7 +161,7 @@ export default [
}),
},
{
url: '/mock/ams/v1/tables/catalogs/:catalog/dbs/:dbId/tables/:tableName/optimizing-processes/:processesId/tasks',
url: '/mock/api/ams/v1/tables/catalogs/:catalog/dbs/:dbId/tables/:tableName/optimizing-processes/:processesId/tasks',
method: 'get',
response: () => ({
"message": "success",
Expand Down
10 changes: 5 additions & 5 deletions amoro-web/mock/modules/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

export default [
{
url: '/mock/ams/v1/login/current',
url: '/mock/api/ams/v1/login/current',
method: 'get',
response: () => ({
code: 200,
Expand All @@ -30,7 +30,7 @@ export default [
}),
},
{
url: '/mock/ams/v1/login',
url: '/mock/api/ams/v1/login',
method: 'post',
response: () => ({
code: 200,
Expand All @@ -39,7 +39,7 @@ export default [
}),
},
{
url: '/mock/ams/v1/versionInfo',
url: '/mock/api/ams/v1/versionInfo',
method: 'get',
response: () => ({
code: 200,
Expand All @@ -51,7 +51,7 @@ export default [
}),
},
{
url: '/mock/ams/v1/upgrade/properties',
url: '/mock/api/ams/v1/upgrade/properties',
method: 'get',
response: () => ({
code: 200,
Expand All @@ -63,7 +63,7 @@ export default [
})
},
{
url: '/mock/ams/v1/logout',
url: '/mock/api/ams/v1/logout',
method: 'post',
response: () => ({
code: 200,
Expand Down
Loading

0 comments on commit beb6191

Please sign in to comment.