diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/ComputerDisAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/ComputerDisAPI.java
new file mode 100644
index 0000000000..0d0b2a77f9
--- /dev/null
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/ComputerDisAPI.java
@@ -0,0 +1,238 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.api.job;
+
+import com.baidu.hugegraph.HugeGraph;
+import com.baidu.hugegraph.api.API;
+import com.baidu.hugegraph.api.filter.StatusFilter.Status;
+import com.baidu.hugegraph.backend.id.IdGenerator;
+import com.baidu.hugegraph.backend.page.PageInfo;
+import com.baidu.hugegraph.core.GraphManager;
+import com.baidu.hugegraph.define.Checkable;
+import com.baidu.hugegraph.job.ComputerDisJob;
+import com.baidu.hugegraph.job.JobBuilder;
+import com.baidu.hugegraph.k8s.K8sDriverProxy;
+import com.baidu.hugegraph.server.RestServer;
+import com.baidu.hugegraph.task.HugeTask;
+import com.baidu.hugegraph.task.TaskScheduler;
+import com.baidu.hugegraph.task.TaskStatus;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+import com.baidu.hugegraph.util.Log;
+import com.codahale.metrics.annotation.Timed;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableMap;
+import org.apache.groovy.util.Maps;
+import org.slf4j.Logger;
+
+import javax.inject.Singleton;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static com.baidu.hugegraph.backend.query.Query.NO_LIMIT;
+
+@Path("graphs/{graph}/jobs/computerdis")
+@Singleton
+public class ComputerDisAPI extends API {
+
+    private static final Logger LOG = Log.logger(RestServer.class);
+
+    @POST
+    @Timed
+    @Status(Status.CREATED)
+    @Consumes(APPLICATION_JSON)
+    @Produces(APPLICATION_JSON_WITH_CHARSET)
+    public Map<String, Object> post(@Context GraphManager manager,
+                                    @PathParam("graph") String graph,
+                                    JsonTask jsonTask) {
+        LOG.debug("Schedule computer-dis job: {}", jsonTask);
+        E.checkArgument(K8sDriverProxy.isK8sApiEnabled(),
+                        "The k8s api is not enabled.");
+        checkCreatingBody(jsonTask);
+
+        // An empty username means generating the token from current context
+        String token = manager.authManager().createToken("");
+        Map<String, Object> input = ImmutableMap.of(
+                                    "graph", graph,
+                                    "algorithm", jsonTask.algorithm,
+                                    "params", jsonTask.params,
+                                    "worker", jsonTask.worker,
+                                    "token", token);
+        HugeGraph g = graph(manager, graph);
+        JobBuilder<Object> builder = JobBuilder.of(g);
+        builder.name("computer-dis:" + jsonTask.algorithm)
+               .input(JsonUtil.toJson(input))
+               .job(new ComputerDisJob());
+        HugeTask<Object> task = builder.schedule();
+        return ImmutableMap.of("task_id", task.id());
+    }
+
+    @DELETE
+    @Timed
+    @Path("/{id}")
+    @Status(Status.OK)
+    @Consumes(APPLICATION_JSON)
+    @Produces(APPLICATION_JSON_WITH_CHARSET)
+    public Map<String, Object> delete(@Context GraphManager manager,
+                                      @PathParam("graph") String graph,
+                                      @PathParam("id") String id) {
+        LOG.debug("Graph [{}] delete computer job: {}", graph, id);
+        E.checkArgument(K8sDriverProxy.isK8sApiEnabled(),
+                        "The k8s api is not enabled.");
+        E.checkArgument(id != null && !id.isEmpty(),
+                        "The task id can't be empty");
+
+        TaskScheduler scheduler = graph(manager, graph).taskScheduler();
+        HugeTask<Object> task = scheduler.task(IdGenerator.of(id));
+        E.checkArgument(ComputerDisJob.COMPUTER_DIS.equals(task.type()),
+                        "The task is not a computer-dis task.");
+
+        scheduler.delete(IdGenerator.of(id));
+        return ImmutableMap.of("task_id", id, "message", "success");
+    }
+
+    @PUT
+    @Timed
+    @Path("/{id}")
+    @Status(Status.ACCEPTED)
+    @Consumes(APPLICATION_JSON)
+    @Produces(APPLICATION_JSON_WITH_CHARSET)
+    public Map<String, Object> cancel(@Context GraphManager manager,
+                                      @PathParam("graph") String graph,
+                                      @PathParam("id") String id) {
+        LOG.debug("Graph [{}] cancel computer job: {}", graph, id);
+        E.checkArgument(K8sDriverProxy.isK8sApiEnabled(),
+                        "The k8s api is not enabled.");
+        E.checkArgument(id != null && !id.isEmpty(),
+                        "The task id can't be empty");
+
+        TaskScheduler scheduler = graph(manager, graph).taskScheduler();
+        HugeTask<Object> task = scheduler.task(IdGenerator.of(id));
+        E.checkArgument(ComputerDisJob.COMPUTER_DIS.equals(task.type()),
+                        "The task is not a computer-dis task.");
+
+        if (!task.completed() && !task.cancelling()) {
+            scheduler.cancel(task);
+            if (task.cancelling()) {
+                return task.asMap();
+            }
+        }
+
+        assert task.completed() || task.cancelling();
+        return ImmutableMap.of("task_id", id);
+    }
+
+    @GET
+    @Timed
+    @Path("/{id}")
+    @Produces(APPLICATION_JSON_WITH_CHARSET)
+    public Map<String, Object> get(@Context GraphManager manager,
+                                   @PathParam("graph") String graph,
+                                   @PathParam("id") long id) {
+        LOG.debug("Graph [{}] get task info", graph);
+        E.checkArgument(K8sDriverProxy.isK8sApiEnabled(),
+                        "The k8s api is not enabled.");
+        TaskScheduler scheduler = graph(manager, graph).taskScheduler();
+        HugeTask<Object> task = scheduler.task(IdGenerator.of(id));
+        E.checkArgument(ComputerDisJob.COMPUTER_DIS.equals(task.type()),
+                        "The task is not a computer-dis task.");
+        return task.asMap();
+    }
+
+    @GET
+    @Timed
+    @Produces(APPLICATION_JSON_WITH_CHARSET)
+    public Map<String, Object> list(@Context GraphManager manager,
+                                    @PathParam("graph") String graph,
+                                    @QueryParam("limit")
+                                    @DefaultValue("100") long limit,
+                                    @QueryParam("page") String page) {
+        LOG.debug("Graph [{}] get task list", graph);
+        E.checkArgument(K8sDriverProxy.isK8sApiEnabled(),
+                        "The k8s api is not enabled.");
+
+        TaskScheduler scheduler = graph(manager, graph).taskScheduler();
+        Iterator<HugeTask<Object>> iter = scheduler.tasks(null,
+                                                          NO_LIMIT, page);
+        List<Object> tasks = new ArrayList<>();
+        while (iter.hasNext()) {
+            HugeTask<Object> task = iter.next();
+            if (ComputerDisJob.COMPUTER_DIS.equals(task.type())) {
+                tasks.add(task.asMap(false));
+            }
+        }
+        if (limit != NO_LIMIT && tasks.size() > limit) {
+            tasks = tasks.subList(0, (int) limit);
+        }
+
+        if (page == null) {
+            return Maps.of("tasks", tasks);
+        } else {
+            return Maps.of("tasks", tasks, "page", PageInfo.pageInfo(iter));
+        }
+    }
+
+    private static class JsonTask implements Checkable {
+
+        @JsonProperty("algorithm")
+        public String algorithm;
+        @JsonProperty("worker")
+        public int worker;
+        @JsonProperty("params")
+        public Map<String, Object> params;
+
+        @Override
+        public void checkCreate(boolean isBatch) {
+            E.checkArgument(this.algorithm != null &&
+                            K8sDriverProxy.isValidAlgorithm(this.algorithm),
+                            "The algorithm doesn't exist.");
+            E.checkArgument(this.worker >= 1 && this.worker <= 100,
+                            "The worker should be in [1, 100].");
+        }
+
+        @Override
+        public void checkUpdate() {}
+    }
+
+    private static TaskStatus parseStatus(String status) {
+        try {
+            return TaskStatus.valueOf(status);
+        } catch (Exception e) {
+            throw new IllegalArgumentException(String.format(
+                      "Status value must be in %s, but got '%s'",
+                      Arrays.asList(TaskStatus.values()), status));
+        }
+    }
+}
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java
index 6ca8c4ea81..82b991a450 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/auth/HugeGraphAuthProxy.java
@@ -39,6 +39,7 @@
 import javax.ws.rs.ForbiddenException;
 import javax.ws.rs.NotAuthorizedException;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyTranslator;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
@@ -1517,6 +1518,18 @@ public void logoutUser(String token) {
             this.authManager.logoutUser(token);
         }
 
+        @Override
+        public String createToken(String username) {
+            if (StringUtils.isEmpty(username)) {
+                Context context = getContext();
+                E.checkState(context != null,
+                             "Missing authentication context " +
+                             "when creating token");
+                username = context.user().username();
+            }
+            return this.authManager.createToken(username);
+        }
+
         private void switchAuthManager(AuthManager authManager) {
             this.authManager = authManager;
             HugeGraphAuthProxy.this.hugegraph.switchAuthManager(authManager);
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java
index 2570592637..8e622b6180 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java
@@ -274,4 +274,72 @@ public static synchronized ServerOptions instance() {
             null,
             "hugegraph"
     );
+
+    public static final ConfigOption<String> K8S_API_ENABLE =
+            new ConfigOption<>(
+                    "k8s.api",
+                    "Whether to enable the k8s api " +
+                    "for the computer service.",
+                    null,
+                    "false"
+            );
+
+    public static final ConfigOption<String> K8S_KUBE_CONFIG =
+            new ConfigOption<>(
+                    "k8s.kubeconfig",
+                    "The k8s kube config file " +
+                    "when the computer service is enabled.",
+                    null,
+                    ""
+            );
+
+    public static final ConfigOption<String> K8S_HUGEGRAPH_URL =
+            new ConfigOption<>(
+                    "k8s.hugegraph_url",
+                    "The hugegraph url for k8s work " +
+                    "when the computer service is enabled.",
+                    null,
+                    ""
+            );
+
+    public static final ConfigOption<String> K8S_NAMESPACE =
+            new ConfigOption<>(
+                    "k8s.namespace",
+                    "The k8s namespace " +
+                    "when the computer service is enabled.",
+                    null,
+                    ""
+            );
+
+    public static final ConfigOption<String> K8S_ENABLE_INTERNAL_ALGORITHM =
+            new ConfigOption<>(
+                    "k8s.enable_internal_algorithm",
+                    "Whether to enable the k8s internal algorithm.",
+                    null,
+                    "false"
+            );
+
+    public static final ConfigOption<String> K8S_INTERNAL_ALGORITHM_IMAGE_URL =
+            new ConfigOption<>(
+                    "k8s.internal_algorithm_image_url",
+                    "The image url of the k8s internal algorithm.",
+                    null,
+                    ""
+            );
+
+    public static final ConfigOption<String> K8S_INTERNAL_ALGORITHM =
+            new ConfigOption<>(
+                    "k8s.internal_algorithm",
+                    "The name list of the k8s internal algorithm.",
+                    null,
+                    "[]"
+            );
+
+    public static final ConfigListOption<String> K8S_ALGORITHMS =
+            new ConfigListOption<>(
+                    "k8s.algorithms",
+                    "The 'name:params-class' mapping of k8s algorithms.",
+                    null,
+                    "[]"
+            );
 }
diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java
index 4ef4dc4cb1..09dc3cd081 100644
--- a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java
+++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java
@@ -22,11 +22,14 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 import javax.ws.rs.core.UriBuilder;
 
+import com.baidu.hugegraph.k8s.K8sDriverProxy;
+import org.apache.commons.lang3.StringUtils;
 import org.glassfish.grizzly.CompletionHandler;
 import org.glassfish.grizzly.GrizzlyFuture;
 import org.glassfish.grizzly.http.server.HttpServer;
@@ -60,6 +63,27 @@ public RestServer(HugeConfig conf, EventHub hub) {
     public void start() throws IOException {
         String url = this.conf.get(ServerOptions.REST_SERVER_URL);
         URI uri = UriBuilder.fromUri(url).build();
+        String k8sApiEnable = this.conf.get(ServerOptions.K8S_API_ENABLE);
+        if (!StringUtils.isEmpty(k8sApiEnable) &&
+            k8sApiEnable.equals("true")) {
+            String namespace = this.conf.get(ServerOptions.K8S_NAMESPACE);
+            String kubeConfigPath = this.conf.get(
+                                    ServerOptions.K8S_KUBE_CONFIG);
+            String hugegraphUrl = this.conf.get(
+                                  ServerOptions.K8S_HUGEGRAPH_URL);
+            String enableInternalAlgorithm = this.conf.get(
+                   ServerOptions.K8S_ENABLE_INTERNAL_ALGORITHM);
+            String internalAlgorithmImageUrl = this.conf.get(
+                   ServerOptions.K8S_INTERNAL_ALGORITHM_IMAGE_URL);
+            String internalAlgorithm = this.conf.get(
+                   ServerOptions.K8S_INTERNAL_ALGORITHM);
+            Map<String, String> algorithms = this.conf.getMap(
+                                             ServerOptions.K8S_ALGORITHMS);
+            K8sDriverProxy.setConfig(namespace, kubeConfigPath,
+                                     hugegraphUrl, enableInternalAlgorithm,
+                                     internalAlgorithmImageUrl,
+                                     internalAlgorithm, algorithms);
+        }
 
         ResourceConfig rc = new ApplicationConfig(this.conf, this.eventHub);
diff --git a/hugegraph-cassandra/pom.xml b/hugegraph-cassandra/pom.xml
index d4b5a99249..d7680c3f4b 100644
--- a/hugegraph-cassandra/pom.xml
+++ b/hugegraph-cassandra/pom.xml
@@ -50,6 +50,10 @@
                 <exclusion>
                     <groupId>org.eclipse.jdt.core.compiler</groupId>
                     <artifactId>ecj</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.yaml</groupId>
+                    <artifactId>snakeyaml</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
diff --git a/hugegraph-core/pom.xml b/hugegraph-core/pom.xml
index 40fe316a54..c1bdaa874e 100644
--- a/hugegraph-core/pom.xml
+++ b/hugegraph-core/pom.xml
@@ -26,6 +26,12 @@
         <dependency>
             <groupId>org.apache.tinkerpop</groupId>
            <artifactId>gremlin-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.yaml</groupId>
+                    <artifactId>snakeyaml</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.tinkerpop</groupId>
@@ -197,6 +203,16 @@
             <artifactId>jetcd-core</artifactId>
             <version>0.5.2</version>
         </dependency>
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+            <version>${fabric8.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.baidu.hugegraph</groupId>
+            <artifactId>computer-k8s</artifactId>
+            <version>0.1.0</version>
+        </dependency>
     </dependencies>
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/AuthManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/AuthManager.java
index 376478c93a..d19c672549 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/AuthManager.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/AuthManager.java
@@ -86,6 +86,7 @@ public String loginUser(String username, String password, long expire)
                            throws AuthenticationException;
 
     public void logoutUser(String token);
+    public String createToken(String username);
 
     public UserWithRole validateUser(String username, String password);
     public UserWithRole validateUser(String token);
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/StandardAuthManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/StandardAuthManager.java
index ea12c3eafc..e3c9e9eb18 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/StandardAuthManager.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/auth/StandardAuthManager.java
@@ -679,6 +679,22 @@ public void logoutUser(String token) {
         this.tokenCache.invalidate(IdGenerator.of(token));
     }
 
+    @Override
+    public String createToken(String username) {
+        HugeUser user = this.findUser(username);
+        if (user == null) {
+            return null;
+        }
+
+        Map<String, ?> payload = ImmutableMap.of(AuthConstant.TOKEN_USER_NAME,
+                                                 username,
+                                                 AuthConstant.TOKEN_USER_ID,
+                                                 user.id.asString());
+        String token = this.tokenGenerator.create(payload, this.tokenExpire);
+        this.tokenCache.update(IdGenerator.of(token), username);
+        return token;
+    }
+
     @Override
     public UserWithRole validateUser(String username, String password) {
         HugeUser user = this.matchUser(username, password);
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/ComputerDisJob.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/ComputerDisJob.java
new file mode 100644
index 0000000000..36c3c56ef8
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/ComputerDisJob.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job;
+
+import com.baidu.hugegraph.HugeException;
+import com.baidu.hugegraph.computer.driver.JobStatus;
+import com.baidu.hugegraph.job.computer.Computer;
+import com.baidu.hugegraph.job.computer.ComputerPool;
+import com.baidu.hugegraph.k8s.K8sDriverProxy;
+import com.baidu.hugegraph.util.E;
+import com.baidu.hugegraph.util.JsonUtil;
+import com.baidu.hugegraph.util.Log;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ComputerDisJob extends SysJob<Object> {
+
+    private static final Logger LOG = Log.logger(ComputerDisJob.class);
+
+    public static final String COMPUTER_DIS = "computer-dis";
+    public static final String INNER_STATUS = "inner.status";
+    public static final String INNER_JOB_ID = "inner.job.id";
+
+    public static boolean check(String name, Map<String, Object> parameters) {
+        Computer computer = ComputerPool.instance().find(name);
+        if (computer == null) {
+            return false;
+        }
+        computer.checkParameters(parameters);
+        return true;
+    }
+
+    @Override
+    public String type() {
+        return COMPUTER_DIS;
+    }
+
+    @Override
+    protected void cancelled() {
+        super.cancelled();
+        String input = this.task().input();
+        E.checkArgumentNotNull(input, "The input can't be null");
+        @SuppressWarnings("unchecked")
+        Map<String, Object> map = JsonUtil.fromJson(input, Map.class);
+        String algorithm = map.get("algorithm").toString();
+        String graph = map.get("graph").toString();
+        String token = map.get("token").toString();
+        int worker = Integer.valueOf(map.get("worker").toString());
+        Object value = map.get("params");
+        E.checkArgument(value instanceof Map,
+                        "Invalid computer parameters '%s'",
+                        value);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> params = (Map<String, Object>) value;
+        Map<String, String> k8sParams = new HashMap<>();
+        for (Map.Entry<String, Object> item : params.entrySet()) {
+            k8sParams.put(item.getKey(), item.getValue().toString());
+        }
+
+        k8sParams.put("hugegraph.name", graph);
+        k8sParams.put("hugegraph.token", token);
+        k8sParams.put("k8s.worker_instances", String.valueOf(worker));
+        if (map.containsKey(INNER_JOB_ID)) {
+            String jobId = (String) map.get(INNER_JOB_ID);
+            K8sDriverProxy k8sDriverProxy =
+                           new K8sDriverProxy(String.valueOf(worker * 2),
+                                              algorithm);
+            k8sDriverProxy.getKubernetesDriver().cancelJob(jobId, k8sParams);
+            k8sDriverProxy.close();
+        }
+    }
+
+    @Override
+    public Object execute() throws Exception {
+        String input = this.task().input();
+        E.checkArgumentNotNull(input, "The input can't be null");
+        @SuppressWarnings("unchecked")
+        Map<String, Object> map = JsonUtil.fromJson(input, Map.class);
+        String status = map.containsKey(INNER_STATUS) ?
+                        map.get(INNER_STATUS).toString() : null;
+        String jobId = map.containsKey(INNER_JOB_ID) ?
+                       map.get(INNER_JOB_ID).toString() : null;
+        Object value = map.get("params");
+        E.checkArgument(value instanceof Map,
+                        "Invalid computer parameters '%s'",
+                        value);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> params = (Map<String, Object>) value;
+        String algorithm = map.get("algorithm").toString();
+        String graph = map.get("graph").toString();
+        String token = map.get("token").toString();
+        int worker = Integer.valueOf(map.get("worker").toString());
+
+        Map<String, String> k8sParams = new HashMap<>();
+        for (Map.Entry<String, Object> item : params.entrySet()) {
+            k8sParams.put(item.getKey(), item.getValue().toString());
+        }
+        k8sParams.put("hugegraph.name", graph);
+        k8sParams.put("hugegraph.token", token);
+        k8sParams.put("k8s.worker_instances", String.valueOf(worker));
+        if (status == null) {
+            // TODO: DO TASK
+            K8sDriverProxy k8sDriverProxy =
+                           new K8sDriverProxy(String.valueOf(worker * 2),
+                                              algorithm);
+            if (jobId == null) {
+                jobId = k8sDriverProxy.getKubernetesDriver()
+                                      .submitJob(algorithm, k8sParams);
+                map = JsonUtil.fromJson(this.task().input(), Map.class);
+                map.put(INNER_JOB_ID, jobId);
+                this.task().input(JsonUtil.toJson(map));
+            }
+
+            k8sDriverProxy.getKubernetesDriver()
+                          .waitJob(jobId, k8sParams, observer -> {
+                JobStatus jobStatus = observer.jobStatus();
+                Map<String, Object> innerMap = JsonUtil.fromJson(
+                                               this.task().input(), Map.class);
+                innerMap.put(INNER_STATUS, jobStatus);
+                this.task().input(JsonUtil.toJson(innerMap));
+            });
+            k8sDriverProxy.close();
+        }
+
+        map = JsonUtil.fromJson(this.task().input(), Map.class);
+        status = map.containsKey(INNER_STATUS) ?
+                 map.get(INNER_STATUS).toString() : null;
+        if (status != null && status.equals("FAILED")) {
+            throw new Exception("The computer-dis job failed.");
+        }
+        return status;
+    }
+}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/k8s/K8sDriverProxy.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/k8s/K8sDriverProxy.java
new file mode 100644
index 0000000000..54a238a406
--- /dev/null
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/k8s/K8sDriverProxy.java
@@ -0,0 +1,125 @@
+package com.baidu.hugegraph.k8s;
+
+import com.baidu.hugegraph.computer.k8s.driver.KubernetesDriver;
+import com.baidu.hugegraph.config.HugeConfig;
+import com.baidu.hugegraph.config.OptionSpace;
+import com.baidu.hugegraph.util.Log;
+import org.apache.commons.configuration.MapConfiguration;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class K8sDriverProxy {
+
+    private static final Logger LOG = Log.logger(K8sDriverProxy.class);
+
+    private static final String USER_DIR = System.getProperty("user.dir");
+
+    private static boolean K8S_API_ENABLED = false;
+
+    private static String NAMESPACE = "";
+    private static String KUBE_CONFIG_PATH = "";
+    private static String HUGEGRAPH_URL = "";
+    private static String ENABLE_INTERNAL_ALGORITHM = "";
+    private static String INTERNAL_ALGORITHM_IMAGE_URL = "";
+    private static Map<String, String> ALGORITHM_PARAMS = null;
+    private static String INTERNAL_ALGORITHM = "[]";
+
+    protected HugeConfig config;
+    protected KubernetesDriver driver;
+
+    static {
+        OptionSpace.register("computer-driver",
+                             "com.baidu.hugegraph.computer.driver.config" +
+                             ".ComputerOptions");
+        OptionSpace.register("computer-k8s-driver",
+                             "com.baidu.hugegraph.computer.k8s.config" +
+                             ".KubeDriverOptions");
+        OptionSpace.register("computer-k8s-spec",
+                             "com.baidu.hugegraph.computer.k8s.config" +
+                             ".KubeSpecOptions");
+    }
+
+    public static void setConfig(String namespace, String kubeConfigPath,
+                                 String hugegraphUrl,
+                                 String enableInternalAlgorithm,
+                                 String internalAlgorithmImageUrl,
+                                 String internalAlgorithm,
+                                 Map<String, String> algorithms)
+                                 throws IOException {
+        File kubeConfigFile = new File(USER_DIR + "/" + kubeConfigPath);
+        if (!kubeConfigFile.exists() || StringUtils.isEmpty(hugegraphUrl)) {
+            throw new IOException("[K8s API] Invalid k8s config");
+        }
+
+        K8S_API_ENABLED = true;
+        NAMESPACE = namespace;
+        KUBE_CONFIG_PATH = USER_DIR + "/" + kubeConfigPath;
+        HUGEGRAPH_URL = hugegraphUrl;
+        ENABLE_INTERNAL_ALGORITHM = enableInternalAlgorithm;
+        INTERNAL_ALGORITHM_IMAGE_URL = internalAlgorithmImageUrl;
+        ALGORITHM_PARAMS = algorithms;
+        INTERNAL_ALGORITHM = internalAlgorithm;
+    }
+
+    public static boolean isK8sApiEnabled() {
+        return K8S_API_ENABLED;
+    }
+
+    public static boolean isValidAlgorithm(String algorithm) {
+        return ALGORITHM_PARAMS.containsKey(algorithm);
+    }
+
+    public K8sDriverProxy(String partitionsCount, String algorithm) {
+        try {
+            if (!K8sDriverProxy.K8S_API_ENABLED) {
+                throw new UnsupportedOperationException(
+                          "The k8s api is not enabled.");
+            }
+            String paramsClass = ALGORITHM_PARAMS.get(algorithm);
+            this.initConfig(partitionsCount, INTERNAL_ALGORITHM, paramsClass);
+            this.initKubernetesDriver();
+        } catch (Throwable throwable) {
+            LOG.error("Failed to start K8sDriverProxy", throwable);
+        }
+    }
+
+    protected void initConfig(String partitionsCount,
+                              String internalAlgorithm,
+                              String paramsClass) {
+        HashMap<String, String> options = new HashMap<>();
+
+        // Options from the rest-server configuration
+        options.put("k8s.namespace", K8sDriverProxy.NAMESPACE);
+        options.put("k8s.kube_config", K8sDriverProxy.KUBE_CONFIG_PATH);
+        options.put("hugegraph.url", K8sDriverProxy.HUGEGRAPH_URL);
+        options.put("k8s.enable_internal_algorithm",
+                    K8sDriverProxy.ENABLE_INTERNAL_ALGORITHM);
+        options.put("k8s.internal_algorithm_image_url",
+                    K8sDriverProxy.INTERNAL_ALGORITHM_IMAGE_URL);
+
+        // Options from the rest api params
+        // NOTE: partitionsCount >= worker_instances
+        options.put("job.partitions_count", partitionsCount);
+        options.put("k8s.internal_algorithm", internalAlgorithm);
+        options.put("algorithm.params_class", paramsClass);
+        MapConfiguration mapConfig = new MapConfiguration(options);
+        this.config = new HugeConfig(mapConfig);
+    }
+
+    protected void initKubernetesDriver() {
+        this.driver = new KubernetesDriver(this.config);
+    }
+
+    public KubernetesDriver getKubernetesDriver() {
+        return this.driver;
+    }
+
+    public void close() {
+        this.driver.close();
+    }
+}
diff --git a/hugegraph-dist/src/assembly/static/conf/rest-server.properties b/hugegraph-dist/src/assembly/static/conf/rest-server.properties
index 83ab4bc4d2..c8f2504006 100644
--- a/hugegraph-dist/src/assembly/static/conf/rest-server.properties
+++ b/hugegraph-dist/src/assembly/static/conf/rest-server.properties
@@ -5,7 +5,8 @@ restserver.url=http://127.0.0.1:8080
 #restserver.request_timeout=30
 
 cluster=hg
-etcds=[http://127.0.0.1:2379]
+meta.endpoints=[http://127.0.0.1:2379]
+graph.load_from_local_config=true
 graphs=./conf/graphs
 
 # The maximum thread ratio for batch writing, only take effect if the batch.max_write_threads is 0
@@ -39,6 +40,30 @@ rpc.remote_url=127.0.0.1:8090
 #rpc.client_retries=3
 #rpc.client_load_balancer=consistentHash
 
+# k8s api, disabled by default
+k8s.api=false
+k8s.namespace=hugegraph-computer-system
+k8s.kubeconfig=conf/kube.kubeconfig
+k8s.hugegraph_url=127.0.0.1:8080
+k8s.enable_internal_algorithm=true
+k8s.internal_algorithm_image_url=hugegraph/hugegraph-computer-based-algorithm:beta1
+k8s.internal_algorithm=[page-rank, degree-centrality, wcc, triangle-count, rings, rings-with-filter, betweenness-centrality, closeness-centrality, lpa, links, kcore, louvain, clustering-coefficient]
+k8s.algorithms=[ \
+  page-rank:com.baidu.hugegraph.computer.algorithm.centrality.pagerank.PageRankParams, \
+  degree-centrality:com.baidu.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityParams, \
+  wcc:com.baidu.hugegraph.computer.algorithm.community.wcc.WccParams, \
+  triangle-count:com.baidu.hugegraph.computer.algorithm.community.trianglecount.TriangleCountParams, \
+  rings:com.baidu.hugegraph.computer.algorithm.path.rings.RingsDetectionParams, \
+  rings-with-filter:com.baidu.hugegraph.computer.algorithm.path.rings.filter.RingsDetectionWithFilterParams, \
+  betweenness-centrality:com.baidu.hugegraph.computer.algorithm.centrality.betweenness.BetweennessCentralityParams, \
+  closeness-centrality:com.baidu.hugegraph.computer.algorithm.centrality.closeness.ClosenessCentralityParams, \
+  lpa:com.baidu.hugegraph.computer.algorithm.community.lpa.LpaParams, \
+  links:com.baidu.hugegraph.computer.algorithm.path.links.LinksParams, \
+  kcore:com.baidu.hugegraph.computer.algorithm.community.kcore.KCoreParams, \
+  louvain:com.baidu.hugegraph.computer.algorithm.community.louvain.LouvainParams, \
+  clustering-coefficient:com.baidu.hugegraph.computer.algorithm.community.cc.ClusteringCoefficientParams \
+]
+
 # lightweight load balancing (beta)
 server.id=server-1
 server.role=master
diff --git a/hugegraph-dist/src/assembly/travis/run-api-test.sh b/hugegraph-dist/src/assembly/travis/run-api-test.sh
index 1e36154106..3629797f94 100755
--- a/hugegraph-dist/src/assembly/travis/run-api-test.sh
+++ b/hugegraph-dist/src/assembly/travis/run-api-test.sh
@@ -16,7 +16,7 @@ mvn package -DskipTests
 # config rest-server
 sed -i 's/#auth.authenticator=/auth.authenticator=com.baidu.hugegraph.auth.StandardAuthenticator/' $REST_SERVER_CONF
 sed -i 's/#auth.admin_token=/auth.admin_token=admin/' $REST_SERVER_CONF
-echo "graph.load_from_local_config=true" >> $REST_SERVER_CONF
+# echo "graph.load_from_local_config=true" >> $REST_SERVER_CONF
 
 # config hugegraph.properties
 sed -i 's/gremlin.graph=.*/gremlin.graph=com.baidu.hugegraph.auth.HugeFactoryAuthProxy/' $CONF
diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/K8sDriverProxyCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/K8sDriverProxyCoreTest.java
new file mode 100644
index 0000000000..0d14d3d32a
--- /dev/null
+++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/K8sDriverProxyCoreTest.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.core;
+
+import com.baidu.hugegraph.HugeGraph;
+import com.baidu.hugegraph.computer.driver.DefaultJobState;
+import com.baidu.hugegraph.computer.driver.JobObserver;
+import com.baidu.hugegraph.computer.driver.JobStatus;
+import com.baidu.hugegraph.k8s.K8sDriverProxy;
+import com.baidu.hugegraph.task.HugeTask;
+import com.baidu.hugegraph.task.TaskScheduler;
+import com.baidu.hugegraph.util.ExecutorUtil;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+
+public class K8sDriverProxyCoreTest extends BaseCoreTest {
+
+    private static String NAMESPACE = "hugegraph-computer-system";
+    private static String KUBE_CONFIG = "conf/kube.kubeconfig";
+    private static String HUGEGRAPH_URL = "127.0.0.1:8080";
+    private static String ENABLE_INTERNAL_ALGORITHM = "true";
+    private static String INTERNAL_ALGORITHM_IMAGE_URL = "hugegraph/" +
+            "hugegraph-computer-based-algorithm:beta1";
+
+    private static String PARAMS_CLASS = "com.baidu.hugegraph.computer." +
+                                         "algorithm.rank.pagerank." +
+                                         "PageRankParams";
+
+    private static final Map<String, String> ALGORITHM_PARAMS =
+            new HashMap<String, String>() {{
+        put("page-rank", "com.baidu.hugegraph.computer.algorithm.centrality" +
+                         ".pagerank.PageRankParams");
+        put("degree-centrality", "com.baidu.hugegraph.computer.algorithm" +
+                                 ".centrality.degree.DegreeCentralityParams");
+        put("wcc", "com.baidu.hugegraph.computer.algorithm.community.wcc" +
+                   ".WccParams");
+        put("triangle-count", "com.baidu.hugegraph.computer.algorithm" +
+                              ".community.trianglecount.TriangleCountParams");
+        put("rings", "com.baidu.hugegraph.computer.algorithm.path.rings" +
+                     ".RingsDetectionParams");
+        put("rings-with-filter", "com.baidu.hugegraph.computer.algorithm" +
+                                 ".path.rings.filter.RingsDetectionWithFilterParams");
+        put("betweenness-centrality", "com.baidu.hugegraph.computer" +
+                                      ".algorithm.centrality.betweenness.BetweennessCentralityParams");
+        put("closeness-centrality", "com.baidu.hugegraph.computer.algorithm" +
+                                    ".centrality.closeness.ClosenessCentralityParams");
+        put("lpa", "com.baidu.hugegraph.computer.algorithm.community.lpa" +
+                   ".LpaParams");
+        put("links", "com.baidu.hugegraph.computer.algorithm.path.links" +
+                     ".LinksParams");
+        put("kcore", "com.baidu.hugegraph.computer.algorithm.community" +
+                     ".kcore.KCoreParams");
+        put("louvain", "com.baidu.hugegraph.computer.algorithm.community" +
+                       ".louvain.LouvainParams");
+        put("clustering-coefficient", "com.baidu.hugegraph.computer" +
+                                      ".algorithm.community.cc.ClusteringCoefficientParams");
+    }};
+
+    private static String INTERNAL_ALGORITHM = "[page-rank, " +
+            "degree-centrality, wcc, triangle-count, rings, " +
+            "rings-with-filter, betweenness-centrality, " +
+            "closeness-centrality, lpa, links, kcore, " +
+            "louvain, clustering-coefficient]";
+
+    private static String COMPUTER = "page-rank";
+
+    private static ExecutorService POOL;
+
+    @BeforeClass
+    public static void init() {
+        POOL = ExecutorUtil.newFixedThreadPool(1, "k8s-driver-test-pool");
+    }
+
+    @Before
+    @Override
+    public void setup() {
+        super.setup();
+
+        HugeGraph graph = graph();
+        TaskScheduler scheduler = graph.taskScheduler();
+        Iterator<HugeTask<Object>> iter = scheduler.tasks(null, -1, null);
+        while (iter.hasNext()) {
+            scheduler.delete(iter.next().id());
+        }
+
+        try {
+            K8sDriverProxy.setConfig(NAMESPACE,
+                                     KUBE_CONFIG,
+                                     HUGEGRAPH_URL,
+                                     ENABLE_INTERNAL_ALGORITHM,
+                                     INTERNAL_ALGORITHM_IMAGE_URL,
+                                     INTERNAL_ALGORITHM,
+                                     ALGORITHM_PARAMS);
+        } catch (IOException e) {
+            // ignore
+        }
+    }
+
+    @Test
+    public void testK8sTask() throws TimeoutException {
+        Map<String, String> params = new HashMap<>();
+        params.put("k8s.worker_instances", "2");
+        K8sDriverProxy k8sDriverProxy = new K8sDriverProxy("2", COMPUTER);
+        String jobId = k8sDriverProxy.getKubernetesDriver()
+                                     .submitJob(COMPUTER, params);
+
+        JobObserver jobObserver = Mockito.mock(JobObserver.class);
+        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+            k8sDriverProxy.getKubernetesDriver()
+                          .waitJob(jobId, params, jobObserver);
+        }, POOL);
+
+        DefaultJobState jobState = new DefaultJobState();
+        jobState.jobStatus(JobStatus.INITIALIZING);
+        Mockito.verify(jobObserver, Mockito.timeout(15000L).atLeast(1))
+               .onJobStateChanged(Mockito.eq(jobState));
+
+        DefaultJobState jobState2 = new DefaultJobState();
+        jobState2.jobStatus(JobStatus.SUCCEEDED);
+        Mockito.verify(jobObserver, Mockito.timeout(15000L).atLeast(1))
+               .onJobStateChanged(Mockito.eq(jobState2));
+
+        future.getNow(null);
+        k8sDriverProxy.close();
+    }
+}
diff --git a/pom.xml b/pom.xml
index bbddbf6d0a..745002eec2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,6 +106,7 @@
         3.1.0
         3.21.0-GA
         bash
+        <fabric8.version>5.6.0</fabric8.version>
         hugegraph-core