Skip to content

Commit

Permalink
Throw 503 error if all workers are busy
Browse files Browse the repository at this point in the history
Implement #341

Change-Id: I7770744d50bb21047c0b66e0d255d49a6d33e672
  • Loading branch information
Linary committed Jan 29, 2019
1 parent 05669ed commit 748c0e0
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.api.filter;

import java.io.IOException;

import javax.inject.Singleton;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.PreMatching;
import javax.ws.rs.core.Context;
import javax.ws.rs.ext.Provider;

import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.core.WorkLoad;

@Provider
@Singleton
@PreMatching
public class LoadDetectFilter implements ContainerRequestFilter {

@Context
private javax.inject.Provider<HugeConfig> configProvider;
@Context
private javax.inject.Provider<WorkLoad> loadProvider;

@Override
public void filter(ContainerRequestContext context) throws IOException {
HugeConfig config = this.configProvider.get();
int maxWorkerThreads = config.get(ServerOptions.MAX_WORKER_THREADS);
WorkLoad load = this.loadProvider.get();
// There will be a thread doesn't work, dedicated to statistics
if (load.incrementAndGet() >= maxWorkerThreads) {
throw new ServiceUnavailableException(
"The server is too busy to process the request, " +
"please try again later");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.api.filter;

import java.io.IOException;

import javax.inject.Singleton;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.core.Context;
import javax.ws.rs.ext.Provider;

import com.baidu.hugegraph.core.WorkLoad;

@Provider
@Singleton
public class LoadReleaseFilter implements ContainerResponseFilter {

@Context
private javax.inject.Provider<WorkLoad> loadProvider;

@Override
public void filter(ContainerRequestContext requestContext,
ContainerResponseContext responseContext)
throws IOException {
WorkLoad load = this.loadProvider.get();
load.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ public static synchronized ServerOptions instance() {
"http://127.0.0.1:8080"
);

public static final ConfigOption<Integer> MAX_WORKER_THREADS =
new ConfigOption<>(
"restserver.max_worker_threads",
"The maximum worker threads of rest server.",
positiveInt(),
2 * Runtime.getRuntime().availableProcessors());

public static final ConfigOption<String> GREMLIN_SERVER_URL =
new ConfigOption<>(
"gremlinserver.url",
Expand Down
51 changes: 51 additions & 0 deletions hugegraph-api/src/main/java/com/baidu/hugegraph/core/WorkLoad.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.core;

import java.util.concurrent.atomic.AtomicInteger;

public final class WorkLoad {

private final AtomicInteger load;

public WorkLoad() {
this(0);
}

public WorkLoad(int load) {
this.load = new AtomicInteger(load);
}

public WorkLoad(AtomicInteger load) {
this.load = load;
}

public AtomicInteger get() {
return this.load;
}

public int incrementAndGet() {
return this.load.incrementAndGet();
}

public int decrementAndGet() {
return this.load.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.core.WorkLoad;
import com.baidu.hugegraph.util.E;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jersey2.InstrumentedResourceMethodApplicationListener;
Expand All @@ -56,6 +57,9 @@ public ApplicationConfig(HugeConfig conf) {
// Register GraphManager to context
register(new GraphManagerFactory(conf));

// Register WorkLoad to context
register(new WorkLoadFactory());

// Let @Metric annotations work
MetricRegistry registry = MetricManager.INSTANCE.getRegistry();
register(new InstrumentedResourceMethodApplicationListener(registry));
Expand Down Expand Up @@ -127,4 +131,29 @@ public void dispose(GraphManager manager) {
// pass
}
}

private class WorkLoadFactory extends AbstractBinder
implements Factory<WorkLoad> {

private final WorkLoad load;

public WorkLoadFactory() {
this.load = new WorkLoad();
}

@Override
public WorkLoad provide() {
return this.load;
}

@Override
public void dispose(WorkLoad workLoad) {
// pass
}

@Override
protected void configure() {
bindFactory(this).to(WorkLoad.class).in(RequestScoped.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,29 @@ public void start() throws IOException {

ResourceConfig rc = new ApplicationConfig(this.conf);

this.httpServer = GrizzlyHttpServerFactory.createHttpServer(uri, rc);
this.httpServer = this.configHttpServer(uri, rc);
this.httpServer.start();

this.calcMaxWriteThreads();
}

@SuppressWarnings("deprecation") // TODO: use shutdown instead
public void stop() {
private HttpServer configHttpServer(URI uri, ResourceConfig rc) {
HttpServer server = GrizzlyHttpServerFactory.createHttpServer(uri, rc,
false);
Collection<NetworkListener> listeners = server.getListeners();
E.checkState(listeners.size() > 0,
"Http Server should have some listeners, but now is none");
int maxWorkerThreads = this.conf.get(ServerOptions.MAX_WORKER_THREADS);
listeners.iterator().next().getTransport()
.getWorkerThreadPoolConfig()
.setCorePoolSize(maxWorkerThreads)
.setMaxPoolSize(maxWorkerThreads);
return server;
}

public void shutdownNow() {
E.checkNotNull(this.httpServer, "http server");
this.httpServer.stop();
this.httpServer.shutdownNow();
}

public static RestServer start(String conf) throws Exception {
Expand All @@ -90,8 +103,8 @@ private void calcMaxWriteThreads() {

int maxWriteRatio = this.conf.get(ServerOptions.MAX_WRITE_RATIO);
assert maxWriteRatio >= 0 && maxWriteRatio <= 100;
int maxThreadPoolSize = this.maxThreadPoolSize();
maxWriteThreads = maxThreadPoolSize * maxWriteRatio / 100;
int maxWorkerThreads = this.conf.get(ServerOptions.MAX_WORKER_THREADS);
maxWriteThreads = maxWorkerThreads * maxWriteRatio / 100;
E.checkState(maxWriteThreads >= 0,
"Invalid value of maximum batch writing threads '%s'",
maxWriteThreads);
Expand All @@ -102,23 +115,14 @@ private void calcMaxWriteThreads() {
"set to '%s' at least to ensure one thread." +
"If you want to disable batch write, " +
"please let max_write_ratio be 0", maxWriteRatio,
(int) Math.ceil(100.0 / maxThreadPoolSize));
(int) Math.ceil(100.0 / maxWorkerThreads));
}
LOG.info("The maximum batch writing threads is {} (total threads {})",
maxWriteThreads, maxThreadPoolSize);
maxWriteThreads, maxWorkerThreads);
this.conf.addProperty(ServerOptions.MAX_WRITE_THREADS.name(),
String.valueOf(maxWriteThreads));
}

private int maxThreadPoolSize() {
Collection<NetworkListener> listeners = this.httpServer.getListeners();
if (listeners.size() == 0) {
return -1;
}
return listeners.iterator().next().getTransport()
.getWorkerThreadPoolConfig().getMaxPoolSize();
}

public static void main(String[] args) throws Exception {
if (args.length != 1) {
LOG.error("RestServer need one config file, but given {}",
Expand Down

0 comments on commit 748c0e0

Please sign in to comment.