Skip to content

Commit

Permalink
Merge branch 'master' into hotfix/bump-dependency-0905
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujinsong authored Oct 14, 2024
2 parents ee9eb00 + d430fe2 commit f6092b8
Show file tree
Hide file tree
Showing 104 changed files with 2,402 additions and 957 deletions.
8 changes: 4 additions & 4 deletions amoro-ams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@
<artifactId>url-connection-client</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
import org.apache.amoro.server.dashboard.utils.AmsUtil;
import org.apache.amoro.server.dashboard.utils.CommonUtil;
Expand Down Expand Up @@ -242,6 +243,7 @@ private void initHttpService() {
config.addStaticFiles(dashboardServer.configStaticFiles());
config.sessionHandler(SessionHandler::new);
config.enableCorsForAllOrigins();
config.jsonMapper(JavalinJsonMapper.createDefaultJsonMapper());
config.showJavalinBanner = false;
});
httpServer.routes(
Expand Down Expand Up @@ -437,6 +439,8 @@ private void initServiceConfig(Map<String, Object> envConfig) throws Exception {

private Map<String, Object> initEnvConfig() {
LOG.info("initializing system env configuration...");
Map<String, String> envs = System.getenv();
envs.forEach((k, v) -> LOG.info("export {}={}", k, v));
String prefix = AmoroManagementConf.SYSTEM_CONFIG.toUpperCase();
return ConfigHelpers.convertConfigurationKeys(prefix, System.getenv());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.exception.ForbiddenException;
import org.apache.amoro.server.exception.IllegalTaskStateException;
import org.apache.amoro.server.exception.ObjectNotExistsException;
import org.apache.amoro.server.exception.PluginRetryAuthException;
import org.apache.amoro.server.exception.TaskNotFoundException;
Expand Down Expand Up @@ -147,8 +148,8 @@ private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
.forEach(groupName -> LOG.warn("Unloaded task runtime in group {}", groupName));
}

private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistency) {
if (needPersistency) {
private void registerOptimizer(OptimizerInstance optimizer, boolean needPersistent) {
if (needPersistent) {
doAs(OptimizerMapper.class, mapper -> mapper.insertOptimizer(optimizer));
}

Expand All @@ -163,13 +164,15 @@ private void unregisterOptimizer(String token) {
doAs(OptimizerMapper.class, mapper -> mapper.deleteOptimizer(token));
OptimizingQueue optimizingQueue = optimizingQueueByToken.remove(token);
OptimizerInstance optimizer = authOptimizers.remove(token);
optimizingQueue.removeOptimizer(optimizer);
if (optimizingQueue != null) {
optimizingQueue.removeOptimizer(optimizer);
}
}

@Override
public void ping() {}

public List<TaskRuntime> listTasks(String optimizerGroup) {
public List<TaskRuntime<?>> listTasks(String optimizerGroup) {
return getQueueByGroup(optimizerGroup).collectTasks();
}

Expand Down Expand Up @@ -201,7 +204,7 @@ private OptimizingTask extractOptimizingTask(
OptimizerThread optimizerThread = getAuthenticatedOptimizer(authToken).getThread(threadId);
task.schedule(optimizerThread);
LOG.info("OptimizerThread {} polled task {}", optimizerThread, task.getTaskId());
return task.getOptimizingTask();
return task.extractProtocolTask();
} catch (Throwable throwable) {
LOG.error("Schedule task {} failed, put it to retry queue", task.getTaskId(), throwable);
queue.retryTask(task);
Expand Down Expand Up @@ -559,16 +562,23 @@ public void run() {
}
}

private void retryTask(TaskRuntime task, OptimizingQueue queue) {
private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
LOG.info(
"Task {} is suspending, since it's optimizer is expired, put it to retry queue, optimizer {}",
task.getTaskId(),
task.getResourceDesc());
// optimizing task of suspending optimizer would not be counted for retrying
queue.retryTask(task);
try {
queue.retryTask(task);
} catch (IllegalTaskStateException e) {
LOG.error(
"Retry task {} failed due to {}, will check it in next round",
task.getTaskId(),
e.getMessage());
}
}

private Predicate<TaskRuntime> buildSuspendingPredication(Set<String> activeTokens) {
private Predicate<TaskRuntime<?>> buildSuspendingPredication(Set<String> activeTokens) {
return task ->
StringUtils.isNotBlank(task.getToken())
&& !activeTokens.contains(task.getToken())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.server.dashboard.controller.HealthCheckController;
import org.apache.amoro.server.dashboard.controller.LoginController;
import org.apache.amoro.server.dashboard.controller.OptimizerController;
import org.apache.amoro.server.dashboard.controller.OptimizerGroupController;
import org.apache.amoro.server.dashboard.controller.OverviewController;
import org.apache.amoro.server.dashboard.controller.PlatformFileInfoController;
import org.apache.amoro.server.dashboard.controller.SettingController;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class DashboardServer {
private final CatalogController catalogController;
private final HealthCheckController healthCheckController;
private final LoginController loginController;
private final OptimizerGroupController optimizerGroupController;
private final OptimizerController optimizerController;
private final PlatformFileInfoController platformFileInfoController;
private final SettingController settingController;
Expand All @@ -98,7 +100,8 @@ public DashboardServer(
this.catalogController = new CatalogController(tableService, platformFileManager);
this.healthCheckController = new HealthCheckController();
this.loginController = new LoginController(serviceConfig);
this.optimizerController = new OptimizerController(tableService, optimizerManager);
this.optimizerGroupController = new OptimizerGroupController(tableService, optimizerManager);
this.optimizerController = new OptimizerController(optimizerManager);
this.platformFileInfoController = new PlatformFileInfoController(platformFileManager);
this.settingController = new SettingController(serviceConfig, optimizerManager);
ServerTableDescriptor tableDescriptor = new ServerTableDescriptor(tableService, serviceConfig);
Expand Down Expand Up @@ -221,6 +224,9 @@ private EndpointGroup apiGroup() {
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes",
tableController::getOptimizingProcesses);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-types",
tableController::getOptimizingTypes);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/tasks",
tableController::getOptimizingProcessTasks);
Expand Down Expand Up @@ -272,28 +278,32 @@ private EndpointGroup apiGroup() {
path(
"/optimize",
() -> {
get("/actions", optimizerGroupController::getActions);
get(
"/optimizerGroups/{optimizerGroup}/tables",
optimizerController::getOptimizerTables);
get("/optimizerGroups/{optimizerGroup}/optimizers", optimizerController::getOptimizers);
get("/optimizerGroups", optimizerController::getOptimizerGroups);
optimizerGroupController::getOptimizerTables);
get(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerGroupController::getOptimizers);
get("/optimizerGroups", optimizerGroupController::getOptimizerGroups);
get(
"/optimizerGroups/{optimizerGroup}/info",
optimizerController::getOptimizerGroupInfo);
delete(
"/optimizerGroups/{optimizerGroup}/optimizers/{jobId}",
optimizerController::releaseOptimizer);
optimizerGroupController::getOptimizerGroupInfo);
post(
"/optimizerGroups/{optimizerGroup}/optimizers",
optimizerController::scaleOutOptimizer);
get("/resourceGroups", optimizerController::getResourceGroup);
post("/resourceGroups", optimizerController::createResourceGroup);
put("/resourceGroups", optimizerController::updateResourceGroup);
delete("/resourceGroups/{resourceGroupName}", optimizerController::deleteResourceGroup);
optimizerGroupController::scaleOutOptimizer);
post("/optimizers", optimizerController::createOptimizer);
delete("/optimizers/{jobId}", optimizerController::releaseOptimizer);
get("/resourceGroups", optimizerGroupController::getResourceGroup);
post("/resourceGroups", optimizerGroupController::createResourceGroup);
put("/resourceGroups", optimizerGroupController::updateResourceGroup);
delete(
"/resourceGroups/{resourceGroupName}",
optimizerGroupController::deleteResourceGroup);
get(
"/resourceGroups/{resourceGroupName}/delete/check",
optimizerController::deleteCheckResourceGroup);
get("/containers/get", optimizerController::getContainers);
optimizerGroupController::deleteCheckResourceGroup);
get("/containers/get", optimizerGroupController::getContainers);
});

// console apis
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.dashboard;

import io.javalin.plugin.json.JsonMapper;
import org.apache.amoro.TableFormat;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.jetbrains.annotations.NotNull;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Json mapper to adapt shaded jackson. */
public class JavalinJsonMapper implements JsonMapper {

private final ObjectMapper objectMapper;

public static JavalinJsonMapper createDefaultJsonMapper() {
ObjectMapper om = new ObjectMapper();
SimpleModule module = new SimpleModule();
module.addSerializer(TableFormat.class, new TableFormat.JsonSerializer());
module.addDeserializer(TableFormat.class, new TableFormat.JsonDeserializer());
om.registerModule(module);
return new JavalinJsonMapper(om);
}

public JavalinJsonMapper(ObjectMapper shadedMapper) {
this.objectMapper = shadedMapper;
}

@NotNull
@Override
public String toJsonString(@NotNull Object obj) {
if (obj instanceof String) {
return (String) obj;
}
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@NotNull
@Override
public InputStream toJsonStream(@NotNull Object obj) {
if (obj instanceof String) {
String result = (String) obj;
return new ByteArrayInputStream(result.getBytes());
} else {
byte[] string = new byte[0];
try {
string = objectMapper.writeValueAsBytes(obj);
return new ByteArrayInputStream(string);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

@NotNull
@Override
public <T> T fromJsonString(@NotNull String json, @NotNull Class<T> targetClass) {
try {
return objectMapper.readValue(json, targetClass);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@NotNull
@Override
public <T> T fromJsonStream(@NotNull InputStream json, @NotNull Class<T> targetClass) {
try {
return objectMapper.readValue(json, targetClass);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.HasTableOperations;
Expand Down Expand Up @@ -501,7 +503,7 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {

@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, int limit, int offset) {
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
List<OptimizingProcessMeta> processMetaList =
getAs(
Expand All @@ -511,6 +513,16 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName()));

processMetaList =
processMetaList.stream()
.filter(
p ->
StringUtils.isBlank(type)
|| type.equalsIgnoreCase(p.getOptimizingType().getStatus().displayValue()))
.filter(p -> status == null || status.name().equalsIgnoreCase(p.getStatus().name()))
.collect(Collectors.toList());

int total = processMetaList.size();
processMetaList =
processMetaList.stream().skip(offset).limit(limit).collect(Collectors.toList());
Expand All @@ -532,6 +544,15 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
total);
}

@Override
public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
Map<String, String> types = Maps.newHashMap();
for (OptimizingType type : OptimizingType.values()) {
types.put(type.name(), type.getStatus().displayValue());
}
return types;
}

@Override
public List<OptimizingTaskInfo> getOptimizingTaskInfos(
AmoroTable<?> amoroTable, String processId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.table.TableService;
Expand Down Expand Up @@ -124,10 +125,11 @@ public List<ConsumerInfo> getTableConsumersInfos(TableIdentifier tableIdentifier
}

public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
TableIdentifier tableIdentifier, int limit, int offset) {
TableIdentifier tableIdentifier, String type, ProcessStatus status, int limit, int offset) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
return formatTableDescriptor.getOptimizingProcessesInfo(amoroTable, limit, offset);
return formatTableDescriptor.getOptimizingProcessesInfo(
amoroTable, type, status, limit, offset);
}

public List<OptimizingTaskInfo> getOptimizingProcessTaskInfos(
Expand All @@ -137,6 +139,12 @@ public List<OptimizingTaskInfo> getOptimizingProcessTaskInfos(
return formatTableDescriptor.getOptimizingTaskInfos(amoroTable, processId);
}

public Map<String, String> getTableOptimizingTypes(TableIdentifier tableIdentifier) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
return formatTableDescriptor.getTableOptimizingTypes(amoroTable);
}

private AmoroTable<?> loadTable(TableIdentifier identifier) {
ServerCatalog catalog = tableService.getServerCatalog(identifier.getCatalog());
return catalog.loadTable(identifier.getDatabase(), identifier.getTableName());
Expand Down
Loading

0 comments on commit f6092b8

Please sign in to comment.