diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadDetectFilter.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadDetectFilter.java new file mode 100644 index 0000000000..b8457be62e --- /dev/null +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadDetectFilter.java @@ -0,0 +1,58 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.api.filter; + +import java.io.IOException; + +import javax.inject.Singleton; +import javax.ws.rs.ServiceUnavailableException; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerRequestFilter; +import javax.ws.rs.container.PreMatching; +import javax.ws.rs.core.Context; +import javax.ws.rs.ext.Provider; + +import com.baidu.hugegraph.config.HugeConfig; +import com.baidu.hugegraph.config.ServerOptions; +import com.baidu.hugegraph.core.WorkLoad; + +@Provider +@Singleton +@PreMatching +public class LoadDetectFilter implements ContainerRequestFilter { + + @Context + private javax.inject.Provider configProvider; + @Context + private javax.inject.Provider loadProvider; + + @Override + public void filter(ContainerRequestContext context) throws IOException { + HugeConfig config = this.configProvider.get(); + int maxWorkerThreads = config.get(ServerOptions.MAX_WORKER_THREADS); + WorkLoad load = this.loadProvider.get(); + // There will be a thread doesn't work, dedicated to statistics + if (load.incrementAndGet() >= maxWorkerThreads) { + throw new ServiceUnavailableException( + "The server is too busy to process the request, " + + "please try again later"); + } + } +} diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadReleaseFilter.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadReleaseFilter.java new file mode 100644 index 0000000000..c5b57941cf --- /dev/null +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/filter/LoadReleaseFilter.java @@ -0,0 +1,47 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.api.filter; + +import java.io.IOException; + +import javax.inject.Singleton; +import javax.ws.rs.container.ContainerRequestContext; +import javax.ws.rs.container.ContainerResponseContext; +import javax.ws.rs.container.ContainerResponseFilter; +import javax.ws.rs.core.Context; +import javax.ws.rs.ext.Provider; + +import com.baidu.hugegraph.core.WorkLoad; + +@Provider +@Singleton +public class LoadReleaseFilter implements ContainerResponseFilter { + + @Context + private javax.inject.Provider loadProvider; + + @Override + public void filter(ContainerRequestContext requestContext, + ContainerResponseContext responseContext) + throws IOException { + WorkLoad load = this.loadProvider.get(); + load.decrementAndGet(); + } +} diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java index 91d3053d1f..f7ac55b992 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/config/ServerOptions.java @@ -48,6 +48,13 @@ public static synchronized ServerOptions instance() { "http://127.0.0.1:8080" ); + public static final ConfigOption MAX_WORKER_THREADS = + new ConfigOption<>( + "restserver.max_worker_threads", + "The maximum worker threads of rest server.", + positiveInt(), + 2 * Runtime.getRuntime().availableProcessors()); + public static final ConfigOption GREMLIN_SERVER_URL = new ConfigOption<>( "gremlinserver.url", diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/core/WorkLoad.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/WorkLoad.java new file mode 100644 index 0000000000..563aaba223 --- /dev/null +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/core/WorkLoad.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017 HugeGraph Authors + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.baidu.hugegraph.core; + +import java.util.concurrent.atomic.AtomicInteger; + +public final class WorkLoad { + + private final AtomicInteger load; + + public WorkLoad() { + this(0); + } + + public WorkLoad(int load) { + this.load = new AtomicInteger(load); + } + + public WorkLoad(AtomicInteger load) { + this.load = load; + } + + public AtomicInteger get() { + return this.load; + } + + public int incrementAndGet() { + return this.load.incrementAndGet(); + } + + public int decrementAndGet() { + return this.load.decrementAndGet(); + } +} diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/ApplicationConfig.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/ApplicationConfig.java index 39ca722dcb..77510c413c 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/ApplicationConfig.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/ApplicationConfig.java @@ -34,6 +34,7 @@ import com.baidu.hugegraph.config.HugeConfig; import com.baidu.hugegraph.core.GraphManager; +import com.baidu.hugegraph.core.WorkLoad; import com.baidu.hugegraph.util.E; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.jersey2.InstrumentedResourceMethodApplicationListener; @@ -56,6 +57,9 @@ public ApplicationConfig(HugeConfig conf) { // Register GraphManager to context register(new GraphManagerFactory(conf)); + // Register WorkLoad to context + register(new WorkLoadFactory()); + // Let @Metric annotations work MetricRegistry registry = MetricManager.INSTANCE.getRegistry(); register(new InstrumentedResourceMethodApplicationListener(registry)); @@ -127,4 +131,29 @@ public void dispose(GraphManager manager) { // pass } } + + private class WorkLoadFactory extends AbstractBinder + implements Factory { + + private final WorkLoad load; + + public WorkLoadFactory() { + this.load = new WorkLoad(); + } + + @Override + public WorkLoad provide() { + return this.load; + } + + @Override + public void dispose(WorkLoad workLoad) { + // pass + } + + @Override + protected void configure() { + bindFactory(this).to(WorkLoad.class).in(RequestScoped.class); + } + } } diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java index 26c51b6cb7..4b3570bd60 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/server/RestServer.java @@ -56,16 +56,29 @@ public void start() throws IOException { ResourceConfig rc = new ApplicationConfig(this.conf); - this.httpServer = GrizzlyHttpServerFactory.createHttpServer(uri, rc); + this.httpServer = this.configHttpServer(uri, rc); this.httpServer.start(); this.calcMaxWriteThreads(); } - @SuppressWarnings("deprecation") // TODO: use shutdown instead - public void stop() { + private HttpServer configHttpServer(URI uri, ResourceConfig rc) { + HttpServer server = GrizzlyHttpServerFactory.createHttpServer(uri, rc, + false); + Collection listeners = server.getListeners(); + E.checkState(listeners.size() > 0, + "Http Server should have some listeners, but now is none"); + int maxWorkerThreads = this.conf.get(ServerOptions.MAX_WORKER_THREADS); + listeners.iterator().next().getTransport() + .getWorkerThreadPoolConfig() + .setCorePoolSize(maxWorkerThreads) + .setMaxPoolSize(maxWorkerThreads); + return server; + } + + public void shutdownNow() { E.checkNotNull(this.httpServer, "http server"); - this.httpServer.stop(); + this.httpServer.shutdownNow(); } public static RestServer start(String conf) throws Exception { @@ -90,8 +103,8 @@ private void calcMaxWriteThreads() { int maxWriteRatio = this.conf.get(ServerOptions.MAX_WRITE_RATIO); assert maxWriteRatio >= 0 && maxWriteRatio <= 100; - int maxThreadPoolSize = this.maxThreadPoolSize(); - maxWriteThreads = maxThreadPoolSize * maxWriteRatio / 100; + int maxWorkerThreads = this.conf.get(ServerOptions.MAX_WORKER_THREADS); + maxWriteThreads = maxWorkerThreads * maxWriteRatio / 100; E.checkState(maxWriteThreads >= 0, "Invalid value of maximum batch writing threads '%s'", maxWriteThreads); @@ -102,23 +115,14 @@ private void calcMaxWriteThreads() { "set to '%s' at least to ensure one thread." + "If you want to disable batch write, " + "please let max_write_ratio be 0", maxWriteRatio, - (int) Math.ceil(100.0 / maxThreadPoolSize)); + (int) Math.ceil(100.0 / maxWorkerThreads)); } LOG.info("The maximum batch writing threads is {} (total threads {})", - maxWriteThreads, maxThreadPoolSize); + maxWriteThreads, maxWorkerThreads); this.conf.addProperty(ServerOptions.MAX_WRITE_THREADS.name(), String.valueOf(maxWriteThreads)); } - private int maxThreadPoolSize() { - Collection listeners = this.httpServer.getListeners(); - if (listeners.size() == 0) { - return -1; - } - return listeners.iterator().next().getTransport() - .getWorkerThreadPoolConfig().getMaxPoolSize(); - } - public static void main(String[] args) throws Exception { if (args.length != 1) { LOG.error("RestServer need one config file, but given {}",