From e1f3502d3946d9ac1e3cc7c06614ddeb3f8a2178 Mon Sep 17 00:00:00 2001 From: Andrea Tarocchi Date: Sat, 13 Apr 2019 00:02:46 +0200 Subject: [PATCH] fix #44 : Support StreamCaching configuration trough a ContextCustomizer. --- .../StreamCachingContextCustomizer.java | 124 ++++++++++++++++++ .../apache/camel/k/customizer/streamcaching | 18 +++ .../StreamCachingContextCustomizerTest.java | 60 +++++++++ 3 files changed, 202 insertions(+) create mode 100644 camel-k-runtime-core/src/main/java/org/apache/camel/k/cutomizer/StreamCachingContextCustomizer.java create mode 100644 camel-k-runtime-core/src/main/resources/META-INF/services/org/apache/camel/k/customizer/streamcaching create mode 100644 camel-k-runtime-core/src/test/java/org/apache/camel/k/support/StreamCachingContextCustomizerTest.java diff --git a/camel-k-runtime-core/src/main/java/org/apache/camel/k/cutomizer/StreamCachingContextCustomizer.java b/camel-k-runtime-core/src/main/java/org/apache/camel/k/cutomizer/StreamCachingContextCustomizer.java new file mode 100644 index 000000000..dcfa0998d --- /dev/null +++ b/camel-k-runtime-core/src/main/java/org/apache/camel/k/cutomizer/StreamCachingContextCustomizer.java @@ -0,0 +1,124 @@ +package org.apache.camel.k.cutomizer; + +import org.apache.camel.CamelContext; +import org.apache.camel.k.ContextCustomizer; +import org.apache.camel.k.Runtime; +import org.apache.camel.spi.StreamCachingStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StreamCachingContextCustomizer implements ContextCustomizer { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamCachingContextCustomizer.class); + + private boolean enabled; + private boolean anySpoolRules; + private int bufferSize; + private boolean removeSpoolDirectoryWhenStopping; + private String spoolChiper; + private String spoolDirectory; + private long spoolThreshold; + private String spoolUsedHeapMemoryLimit; + private int spoolUsedHeapMemoryThreshold; + + public int getBufferSize() { + return bufferSize; + } + + public void setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + } + + public String getSpoolChiper() { + return spoolChiper; + } + + public void setSpoolChiper(String spoolChiper) { + this.spoolChiper = spoolChiper; + } + + public String getSpoolDirectory() { + return spoolDirectory; + } + + public void setSpoolDirectory(String spoolDirectory) { + this.spoolDirectory = spoolDirectory; + } + + public long getSpoolThreshold() { + return spoolThreshold; + } + + public void setSpoolThreshold(long spoolThreshold) { + this.spoolThreshold = spoolThreshold; + } + + public String getSpoolUsedHeapMemoryLimit() { + return spoolUsedHeapMemoryLimit; + } + + public void setSpoolUsedHeapMemoryLimit(String spoolUsedHeapMemoryLimit) { + this.spoolUsedHeapMemoryLimit = spoolUsedHeapMemoryLimit; + } + + public int getSpoolUsedHeapMemoryThreshold() { + return spoolUsedHeapMemoryThreshold; + } + + public void setSpoolUsedHeapMemoryThreshold(int spoolUsedHeapMemoryThreshold) { + this.spoolUsedHeapMemoryThreshold = spoolUsedHeapMemoryThreshold; + } + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean isAnySpoolRules() { + return anySpoolRules; + } + + public void setAnySpoolRules(boolean anySpoolRules) { + this.anySpoolRules = anySpoolRules; + } + + public boolean isRemoveSpoolDirectoryWhenStopping() { + return removeSpoolDirectoryWhenStopping; + } + + public void setRemoveSpoolDirectoryWhenStopping(boolean removeSpoolDirectoryWhenStopping) { + this.removeSpoolDirectoryWhenStopping = removeSpoolDirectoryWhenStopping; + } + + @Override + public void apply(CamelContext camelContext, Runtime.Registry runtimeRegistry) { + camelContext.setStreamCaching(isEnabled()); + camelContext.getStreamCachingStrategy().setAnySpoolRules(isAnySpoolRules()); + camelContext.getStreamCachingStrategy().setBufferSize(getBufferSize()); + camelContext.getStreamCachingStrategy().setRemoveSpoolDirectoryWhenStopping(isRemoveSpoolDirectoryWhenStopping()); + camelContext.getStreamCachingStrategy().setSpoolChiper(getSpoolChiper()); + if (getSpoolDirectory() != null) { + camelContext.getStreamCachingStrategy().setSpoolDirectory(getSpoolDirectory()); + } + if (getSpoolThreshold() != 0) { + camelContext.getStreamCachingStrategy().setSpoolThreshold(getSpoolThreshold()); + } + if (getSpoolUsedHeapMemoryLimit() != null) { + StreamCachingStrategy.SpoolUsedHeapMemoryLimit limit; + if ("Committed".equalsIgnoreCase(getSpoolUsedHeapMemoryLimit())) { + limit = StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Committed; + } else if ("Max".equalsIgnoreCase(getSpoolUsedHeapMemoryLimit())) { + limit = StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Max; + } else { + throw new IllegalArgumentException("Invalid option " + getSpoolUsedHeapMemoryLimit() + " must either be Committed or Max"); + } + camelContext.getStreamCachingStrategy().setSpoolUsedHeapMemoryLimit(limit); + } + if (getSpoolUsedHeapMemoryThreshold() != 0) { + camelContext.getStreamCachingStrategy().setSpoolUsedHeapMemoryThreshold(getSpoolUsedHeapMemoryThreshold()); + } + LOGGER.info("Configured camel context through CamelContextCustomizer.class"); + } +} diff --git a/camel-k-runtime-core/src/main/resources/META-INF/services/org/apache/camel/k/customizer/streamcaching b/camel-k-runtime-core/src/main/resources/META-INF/services/org/apache/camel/k/customizer/streamcaching new file mode 100644 index 000000000..58efa10d9 --- /dev/null +++ b/camel-k-runtime-core/src/main/resources/META-INF/services/org/apache/camel/k/customizer/streamcaching @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.k.cutomizer.StreamCachingContextCustomizer diff --git a/camel-k-runtime-core/src/test/java/org/apache/camel/k/support/StreamCachingContextCustomizerTest.java b/camel-k-runtime-core/src/test/java/org/apache/camel/k/support/StreamCachingContextCustomizerTest.java new file mode 100644 index 000000000..c3aab7fc2 --- /dev/null +++ b/camel-k-runtime-core/src/test/java/org/apache/camel/k/support/StreamCachingContextCustomizerTest.java @@ -0,0 +1,60 @@ +package org.apache.camel.k.support; + +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.k.InMemoryRegistry; +import org.apache.camel.k.cutomizer.StreamCachingContextCustomizer; +import org.apache.camel.spi.StreamCachingStrategy; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class StreamCachingContextCustomizerTest { + + @Test + public void testClasspathHandler() { + StreamCachingContextCustomizer scccc = new StreamCachingContextCustomizer(); + + scccc.setAnySpoolRules(true); + scccc.setBufferSize(9); + scccc.setEnabled(true); + scccc.setRemoveSpoolDirectoryWhenStopping(true); + scccc.setSpoolChiper("sha"); + scccc.setSpoolDirectory("./xxx"); + scccc.setSpoolThreshold(9); + scccc.setSpoolUsedHeapMemoryLimit("Committed"); + scccc.setSpoolUsedHeapMemoryThreshold(9); + + CamelContext context = new DefaultCamelContext(); + scccc.apply(context, new InMemoryRegistry()); + + assertThat(context.getStreamCachingStrategy().isAnySpoolRules()).isTrue(); + assertThat(context.getStreamCachingStrategy().getBufferSize()).isEqualTo(9); + assertThat(context.isStreamCaching()).isTrue(); + assertThat(context.getStreamCachingStrategy().isRemoveSpoolDirectoryWhenStopping()).isTrue(); + assertThat(context.getStreamCachingStrategy().getSpoolChiper()).isEqualTo("sha"); + assertThat(context.getStreamCachingStrategy().getSpoolDirectory()).isNull(); + assertThat(context.getStreamCachingStrategy().getSpoolThreshold()).isEqualTo(9L); + assertThat(context.getStreamCachingStrategy().getSpoolUsedHeapMemoryLimit()).isEqualTo(StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Committed); + assertThat(context.getStreamCachingStrategy().getSpoolUsedHeapMemoryThreshold()).isEqualTo(9); + + scccc.setSpoolUsedHeapMemoryLimit("Max"); + + scccc.apply(context, new InMemoryRegistry()); + assertThat(context.getStreamCachingStrategy().getSpoolUsedHeapMemoryLimit()).isEqualTo(StreamCachingStrategy.SpoolUsedHeapMemoryLimit.Max); + } + + @Test + public void testUnsupportedStreamCachingSpoolUsedHeapMemoryLimit() { + StreamCachingContextCustomizer scccc = new StreamCachingContextCustomizer(); + + scccc.setSpoolUsedHeapMemoryLimit("Unsupported"); + + CamelContext context = new DefaultCamelContext(); + + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> scccc.apply(context, new InMemoryRegistry())); + + assertThat(exception.getMessage()).isEqualTo("Invalid option Unsupported must either be Committed or Max"); + } +}