Skip to content

Commit

Permalink
CAMEL-14882: camel-main - Allow to configure thread pool profiles
Browse files Browse the repository at this point in the history
  • Loading branch information
davsclaus committed Apr 15, 2020
1 parent 2dafbed commit 37a537c
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -30,6 +31,7 @@
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

Expand All @@ -42,6 +44,7 @@
import org.apache.camel.RoutesBuilder;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.ThreadPoolProfileBuilder;
import org.apache.camel.model.FaultToleranceConfigurationDefinition;
import org.apache.camel.model.HystrixConfigurationDefinition;
import org.apache.camel.model.Model;
Expand All @@ -50,10 +53,12 @@
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.CamelBeanPostProcessor;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.PropertiesComponent;
import org.apache.camel.spi.PropertyConfigurer;
import org.apache.camel.spi.RestConfiguration;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.support.service.BaseService;
Expand All @@ -64,6 +69,7 @@
import org.apache.camel.util.OrderedProperties;
import org.apache.camel.util.PropertiesHelper;
import org.apache.camel.util.StringHelper;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -81,6 +87,8 @@ public abstract class BaseMainSupport extends BaseService {

private static final String SENSITIVE_KEYS = "passphrase|password|secretkey|accesstoken|clientsecret|authorizationtoken|sasljaasconfig";

private static final String VALID_THREAD_POOL_KEYS = "id|poolsize|maxpoolsize|keepalivetime|timeunit|maxqueuesize|allowcorethreadtimeout|rejectedpolicy";

protected final AtomicBoolean completed = new AtomicBoolean(false);

protected volatile CamelContext camelContext;
Expand Down Expand Up @@ -721,6 +729,7 @@ protected void doConfigureCamelContextFromMainConfiguration(CamelContext camelCo
Map<String, Object> resilience4jProperties = new LinkedHashMap<>();
Map<String, Object> faultToleranceProperties = new LinkedHashMap<>();
Map<String, Object> restProperties = new LinkedHashMap<>();
Map<String, Object> threadPoolProperties = new LinkedHashMap<>();
Map<String, Object> beansProperties = new LinkedHashMap<>();
for (String key : prop.stringPropertyNames()) {
if (key.startsWith("camel.context.")) {
Expand Down Expand Up @@ -753,6 +762,12 @@ protected void doConfigureCamelContextFromMainConfiguration(CamelContext camelCo
String option = key.substring(11);
validateOptionAndValue(key, option, value);
restProperties.put(optionKey(option), value);
} else if (key.startsWith("camel.threadpool")) {
// grab the value
String value = prop.getProperty(key);
String option = key.substring(16);
validateOptionAndValue(key, option, value);
threadPoolProperties.put(optionKey(option), value);
} else if (key.startsWith("camel.beans.")) {
// grab the value
String value = prop.getProperty(key);
Expand Down Expand Up @@ -817,6 +832,10 @@ protected void doConfigureCamelContextFromMainConfiguration(CamelContext camelCo
setPropertiesOnTarget(camelContext, rest, restProperties, "camel.rest.",
mainConfigurationProperties.isAutoConfigurationFailFast(), true, autoConfiguredProperties);
}
if (!threadPoolProperties.isEmpty()) {
LOG.debug("Auto-configuring Thread Pool from loaded properties: {}", threadPoolProperties.size());
setThreadPoolProfileProperties(camelContext, threadPoolProperties, mainConfigurationProperties.isAutoConfigurationFailFast(), autoConfiguredProperties);
}

// log which options was not set
if (!beansProperties.isEmpty()) {
Expand Down Expand Up @@ -856,6 +875,88 @@ protected void doConfigureCamelContextFromMainConfiguration(CamelContext camelCo
LOG.warn("Property not auto-configured: camel.rest.{}={} on bean: {}", k, v, rest);
});
}
if (!threadPoolProperties.isEmpty()) {
threadPoolProperties.forEach((k, v) -> {
LOG.warn("Property not auto-configured: camel.threadpool{}={} on bean: ThreadPoolProfileBuilder", k, v);
});
}
}

private void setThreadPoolProfileProperties(CamelContext camelContext, Map<String, Object> threadPoolProperties,
boolean failIfNotSet, Map<String, String> autoConfiguredProperties) {

Map<String, Map<String, String>> profiles = new LinkedHashMap<>();
// the id of the profile is in the key [xx]
threadPoolProperties.forEach((k, v) -> {
String id = StringHelper.between(k, "[", "].");
if (id == null) {
throw new IllegalArgumentException("Invalid syntax for key: camel.threadpool" + k + " should be: camel.threadpool[id]");
}
String key = StringHelper.after(k, "].");
String value = v.toString();
if (key == null) {
throw new PropertyBindingException("ThreadPoolProfileBuilder", k, value);
}
Map<String, String> map = profiles.computeIfAbsent(id, o -> new HashMap<>());
map.put(optionKey(key), value);

if (failIfNotSet && !VALID_THREAD_POOL_KEYS.contains(key)) {
throw new PropertyBindingException("ThreadPoolProfileBuilder", key, value);
}

autoConfiguredProperties.put(k, value);
});

// now build profiles from those options
for (String id : profiles.keySet()) {
Map<String, String> map = profiles.get(id);
// camel-main will lower-case keys
String overrideId = map.remove("id");
String poolSize = map.remove("poolsize");
String maxPoolSize = map.remove("maxpoolsize");
String keepAliveTime = map.remove("keepalivetime");
String timeUnit = map.remove("timeunit");
String maxQueueSize = map.remove("maxqueuesize");
String allowCoreThreadTimeOut = map.remove("allowcorethreadtimeout");
String rejectedPolicy = map.remove("rejectedpolicy");

if (overrideId != null) {
id = CamelContextHelper.parseText(camelContext, overrideId);
}
ThreadPoolProfileBuilder builder = new ThreadPoolProfileBuilder(id);
if ("default".equals(id)) {
builder.defaultProfile(true);
}
if (poolSize != null) {
builder.poolSize(CamelContextHelper.parseInteger(camelContext, poolSize));
}
if (maxPoolSize != null) {
builder.maxPoolSize(CamelContextHelper.parseInteger(camelContext, maxPoolSize));
}
if (keepAliveTime != null && timeUnit != null) {
String text = CamelContextHelper.parseText(camelContext, timeUnit);
builder.keepAliveTime(CamelContextHelper.parseLong(camelContext, keepAliveTime), camelContext.getTypeConverter().convertTo(TimeUnit.class, text));
}
if (keepAliveTime != null && timeUnit == null) {
builder.keepAliveTime(CamelContextHelper.parseLong(camelContext, keepAliveTime));
}
if (maxQueueSize != null) {
builder.maxQueueSize(CamelContextHelper.parseInteger(camelContext, maxQueueSize));
}
if (allowCoreThreadTimeOut != null) {
builder.allowCoreThreadTimeOut(CamelContextHelper.parseBoolean(camelContext, allowCoreThreadTimeOut));
}
if (rejectedPolicy != null) {
String text = CamelContextHelper.parseText(camelContext, rejectedPolicy);
builder.rejectedPolicy(camelContext.getTypeConverter().convertTo(ThreadPoolRejectedPolicy.class, text));
}
ExecutorServiceManager esm = camelContext.adapt(ExtendedCamelContext.class).getExecutorServiceManager();
if ("default".equals(id)) {
esm.setDefaultThreadPoolProfile(builder.build());
} else {
esm.registerThreadPoolProfile(builder.build());
}
}
}

private void bindBeansToRegistry(CamelContext camelContext, Map<String, Object> properties,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.main;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.ThreadPoolProfile;
import org.junit.Assert;
import org.junit.Test;

public class MainThreadPoolTest extends Assert {

@Test
public void testDefaultThreadPool() throws Exception {
Main main = new Main();
main.addRoutesBuilder(new MyRouteBuilder());
main.addProperty("camel.threadpool[default].pool-size", "5");
main.addProperty("camel.threadpool[default].max-pool-size", "10");
main.addProperty("camel.threadpool[default].max-queue-size", "20");
main.addProperty("camel.threadpool[default].rejectedPolicy", "DiscardOldest");
main.start();

CamelContext camelContext = main.getCamelContext();
assertNotNull(camelContext);

ThreadPoolProfile tp = camelContext.getExecutorServiceManager().getDefaultThreadPoolProfile();
assertEquals("default", tp.getId());
assertEquals(Boolean.TRUE, tp.isDefaultProfile());
assertEquals("5", tp.getPoolSize().toString());
assertEquals("10", tp.getMaxPoolSize().toString());
assertEquals("DiscardOldest", tp.getRejectedPolicy().toString());

main.stop();
}

@Test
public void testCustomThreadPool() throws Exception {
Main main = new Main();
main.addRoutesBuilder(new MyRouteBuilder());
main.addProperty("camel.threadpool[myPool].id", "myPool");
main.addProperty("camel.threadpool[myPool].pool-size", "1");
main.addProperty("camel.threadpool[myPool].max-pool-size", "2");
main.addProperty("camel.threadpool[myPool].rejectedPolicy", "DiscardOldest");
main.addProperty("camel.threadpool[myBigPool].id", "myBigPool");
main.addProperty("camel.threadpool[myBigPool].pool-size", "10");
main.addProperty("camel.threadpool[myBigPool].max-pool-size", "200");
main.addProperty("camel.threadpool[myBigPool].rejectedPolicy", "CallerRuns");
main.start();

CamelContext camelContext = main.getCamelContext();
assertNotNull(camelContext);

ThreadPoolProfile tp = camelContext.getExecutorServiceManager().getThreadPoolProfile("myPool");
assertEquals("myPool", tp.getId());
assertEquals(Boolean.FALSE, tp.isDefaultProfile());
assertEquals("1", tp.getPoolSize().toString());
assertEquals("2", tp.getMaxPoolSize().toString());
assertEquals("DiscardOldest", tp.getRejectedPolicy().toString());

tp = camelContext.getExecutorServiceManager().getThreadPoolProfile("myBigPool");
assertEquals("myBigPool", tp.getId());
assertEquals(Boolean.FALSE, tp.isDefaultProfile());
assertEquals("10", tp.getPoolSize().toString());
assertEquals("200", tp.getMaxPoolSize().toString());
assertEquals("CallerRuns", tp.getRejectedPolicy().toString());

main.stop();
}

public static class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:start").to("seda:foo");
}
}

}

0 comments on commit 37a537c

Please sign in to comment.