diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar new file mode 100644 index 0000000..6480942 Binary files /dev/null and b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar differ diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar.md5 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar.md5 new file mode 100644 index 0000000..90e3242 --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar.md5 @@ -0,0 +1 @@ +2e409bc6c8a8db39c27753710d29143b \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar.sha1 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar.sha1 new file mode 100644 index 0000000..1c705b6 --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-javadoc.jar.sha1 @@ -0,0 +1 @@ +8d8f9988501de6c55cbcfeaaca903b7eaa5ad6e2 \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar new file mode 100644 index 0000000..d77ca51 Binary files /dev/null and b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar differ diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar.md5 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar.md5 new file mode 100644 index 0000000..804d40a --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar.md5 @@ -0,0 +1 @@ +0d1b37f9977393a6d003731c11be9183 \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar.sha1 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar.sha1 new file mode 100644 index 0000000..39e04e8 --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2-sources.jar.sha1 @@ -0,0 +1 @@ +6cc7b92f336a7b6c6e9e52a94f5bbaf8f28cd945 \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar new file mode 100644 index 0000000..daae5e5 Binary files /dev/null and b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar differ diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar.md5 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar.md5 new file mode 100644 index 0000000..5d4e4fd --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar.md5 @@ -0,0 +1 @@ +199b5bbde92e70dd0cbd2fb3d081237d \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar.sha1 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar.sha1 new file mode 100644 index 0000000..77b7b36 --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.jar.sha1 @@ -0,0 +1 @@ +82bed758ad8da18faddad63b724b7f28ed2c45a6 \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom new file mode 100644 index 0000000..e9e02b3 --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom @@ -0,0 +1,154 @@ + + + + 4.0.0 + + co.featureflags + ffc-java-server-sdk + 1.1.2 + + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + + DIAN SUN + dian.sun0720@gmail.com + featureflags.co + https://www.feature-flags.co/ + + + + + main + scm:git:git@github.com:feature-flags-co/ffc-java-sdk.git + scm:git:git@github.com:feature-flags-co/ffc-java-sdk.git + https://github.com/feature-flags-co/ffc-java-sdk/tree/main + + + + + github-ffc-java-sdk-commons-repo + The Maven Repository on Github + https://feature-flags-co.github.io/ffc-java-sdk-commons/maven-repo + + + + + 1.8 + 1.8 + UTF-8 + UTF-8 + 3.12.0 + 1.15 + 31.0.1-jre + 4.9.3 + 2.8.9 + 1.7.35 + 2.12.6 + + + + + org.apache.commons + commons-lang3 + ${commons-lang-version} + + + + commons-codec + commons-codec + ${common-codec-version} + + + + com.google.guava + guava + ${guava-version} + + + + com.squareup.okhttp3 + okhttp + ${okhttp-version} + + + + com.google.code.gson + gson + ${gson-version} + + + + org.slf4j + slf4j-api + ${slf4j-version} + + + + org.slf4j + slf4j-simple + ${slf4j-version} + provided + + + + co.featureflags + ffc-java-sdk-commons + 1.1.1 + + + + com.alibaba + transmittable-thread-local + ${ttl-version} + + + + + + + local-repo-release + GitHub Release + file://${project.basedir}/maven-repo + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.1.0 + + + attach-javadocs + + jar + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.1.0 + + + attach-sources + + jar-no-fork + + + + + + + \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom.md5 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom.md5 new file mode 100644 index 0000000..1d2b200 --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom.md5 @@ -0,0 +1 @@ +a54056355016cd83e1863dc507574dd5 \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom.sha1 b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom.sha1 new file mode 100644 index 0000000..520bf18 --- /dev/null +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/1.1.2/ffc-java-server-sdk-1.1.2.pom.sha1 @@ -0,0 +1 @@ +f717d42a46e7e5d14c96c89e00d8e08012bde86b \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml b/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml index f1caf7c..0725f8c 100644 --- a/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml @@ -3,12 +3,13 @@ co.featureflags ffc-java-server-sdk - 1.1.1 + 1.1.2 1.0 1.1 1.1.1 + 1.1.2 - 20220504022225 + 20220907052401 diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.md5 b/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.md5 index c5dcade..79b750b 100644 --- a/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.md5 +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.md5 @@ -1 +1 @@ -86a279583645b3a09a3147a05c1b4034 \ No newline at end of file +ebe31dcc3d0612046e81250923e93e60 \ No newline at end of file diff --git a/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.sha1 b/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.sha1 index 6496805..e2cbeff 100644 --- a/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.sha1 +++ b/maven-repo/co/featureflags/ffc-java-server-sdk/maven-metadata.xml.sha1 @@ -1 +1 @@ -10ec4c786b8ef9be3ab200f71aa5d2726e0d1a51 \ No newline at end of file +35664eb24d09000d73cb7c2cd53bcce44fd8aace \ No newline at end of file diff --git a/pom.xml b/pom.xml index 58b68d0..e9e02b3 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ co.featureflags ffc-java-server-sdk - 1.1.1 + 1.1.2 diff --git a/src/main/java/co/featureflags/server/Evaluator.java b/src/main/java/co/featureflags/server/Evaluator.java index f89b2d0..5097e15 100644 --- a/src/main/java/co/featureflags/server/Evaluator.java +++ b/src/main/java/co/featureflags/server/Evaluator.java @@ -14,6 +14,8 @@ abstract class Evaluator { protected static final Logger logger = Loggers.EVALUATION; protected static final Integer NO_EVAL_RES = -1; + protected static final String DEFAULT_JSON_VALUE = "DJV"; + protected static final String REASON_USER_NOT_SPECIFIED = "user not specified"; protected static final String REASON_FLAG_OFF = "flag off"; protected static final String REASON_PREREQUISITE_FAILED = "prerequisite failed"; diff --git a/src/main/java/co/featureflags/server/FFCClientImp.java b/src/main/java/co/featureflags/server/FFCClientImp.java index ed12138..1bc5f9f 100644 --- a/src/main/java/co/featureflags/server/FFCClientImp.java +++ b/src/main/java/co/featureflags/server/FFCClientImp.java @@ -1,6 +1,7 @@ package co.featureflags.server; import co.featureflags.commons.json.JsonHelper; +import co.featureflags.commons.json.JsonParseException; import co.featureflags.commons.model.AllFlagStates; import co.featureflags.commons.model.EvalDetail; import co.featureflags.commons.model.FFCUser; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import static co.featureflags.server.Evaluator.DEFAULT_JSON_VALUE; import static co.featureflags.server.Evaluator.FLAG_KEY_UNKNOWN; import static co.featureflags.server.Evaluator.FLAG_NAME_UNKNOWN; import static co.featureflags.server.Evaluator.FLAG_VALUE_UNKNOWN; @@ -317,6 +319,53 @@ public FlagState longVariationDetail(String featureFlagKey, Long defaultVa return longVariationDetail(featureFlagKey, FFCUserContextHolder.getCurrentUser(), defaultValue); } + @Override + public T jsonVariation(String featureFlagKey, FFCUser user, Class clazz, T defaultValue) { + String json = variation(featureFlagKey, user, DEFAULT_JSON_VALUE); + if (DEFAULT_JSON_VALUE.equals(json)) return defaultValue; + try { + return JsonHelper.deserialize(json, clazz); + } catch (JsonParseException ex) { + logger.error("FFC JAVA SDK: unexpected error in evaluation", ex); + return defaultValue; + } + + } + + @Override + public T jsonVariation(String featureFlagKey, Class clazz, T defaultValue) { + String json = variation(featureFlagKey, DEFAULT_JSON_VALUE); + if (DEFAULT_JSON_VALUE.equals(json)) return defaultValue; + try { + return JsonHelper.deserialize(json, clazz); + } catch (JsonParseException ex) { + logger.error("FFC JAVA SDK: unexpected error in evaluation", ex); + return defaultValue; + } + } + + @Override + public FlagState jsonVariationDetail(String featureFlagKey, FFCUser user, Class clazz, T defaultValue) { + T value; + Evaluator.EvalResult res = evaluateInternal(featureFlagKey, user, DEFAULT_JSON_VALUE, false); + if (DEFAULT_JSON_VALUE.equals(res.getValue())) { + value = defaultValue; + } else { + try { + value = JsonHelper.deserialize(res.getValue(), clazz); + } catch (JsonParseException ex) { + logger.error("FFC JAVA SDK: unexpected error in evaluation", ex); + value = defaultValue; + } + } + return EvalDetail.of(value, res.getIndex(), res.getReason(), featureFlagKey, featureFlagKey).toFlagState(); + } + + @Override + public FlagState jsonVariationDetail(String featureFlagKey, Class clazz, T defaultValue) { + return jsonVariationDetail(featureFlagKey, FFCUserContextHolder.getCurrentUser(), clazz, defaultValue); + } + Evaluator.EvalResult evaluateInternal(String featureFlagKey, FFCUser user, Object defaultValue, boolean checkType) { try { if (!isInitialized()) { @@ -416,21 +465,13 @@ public AllFlagStates getAllLatestFlagsVariations(FFCUser user) { try { if (!isInitialized()) { Loggers.EVALUATION.warn("FFC JAVA SDK: Evaluation is called before Java SDK client is initialized for feature flag"); - ed = EvalDetail.of(FLAG_VALUE_UNKNOWN, - NO_EVAL_RES, - REASON_CLIENT_NOT_READY, - FLAG_KEY_UNKNOWN, - FLAG_NAME_UNKNOWN); + ed = EvalDetail.of(FLAG_VALUE_UNKNOWN, NO_EVAL_RES, REASON_CLIENT_NOT_READY, FLAG_KEY_UNKNOWN, FLAG_NAME_UNKNOWN); builder.put(ed, InsightTypes.NullEvent.INSTANCE); success = false; errorString = REASON_CLIENT_NOT_READY; } else if (user == null || StringUtils.isBlank(user.getKey())) { Loggers.EVALUATION.warn("FFC JAVA SDK: null user or feature flag"); - ed = EvalDetail.of(FLAG_VALUE_UNKNOWN, - NO_EVAL_RES, - REASON_USER_NOT_SPECIFIED, - FLAG_KEY_UNKNOWN, - FLAG_NAME_UNKNOWN); + ed = EvalDetail.of(FLAG_VALUE_UNKNOWN, NO_EVAL_RES, REASON_USER_NOT_SPECIFIED, FLAG_KEY_UNKNOWN, FLAG_NAME_UNKNOWN); builder.put(ed, InsightTypes.NullEvent.INSTANCE); success = false; errorString = REASON_USER_NOT_SPECIFIED; @@ -440,21 +481,13 @@ public AllFlagStates getAllLatestFlagsVariations(FFCUser user) { InsightTypes.Event event = InsightTypes.FlagEvent.of(user); DataModel.FeatureFlag flag = (DataModel.FeatureFlag) item.item(); Evaluator.EvalResult res = evaluator.evaluate(flag, user, event); - ed = EvalDetail.of(res.getValue(), - res.getIndex(), - res.getReason(), - res.getKeyName(), - res.getName()); + ed = EvalDetail.of(res.getValue(), res.getIndex(), res.getReason(), res.getKeyName(), res.getName()); builder.put(ed, event); } } } catch (Exception ex) { logger.error("FFC JAVA SDK: unexpected error in evaluation", ex); - ed = EvalDetail.of(FLAG_VALUE_UNKNOWN, - NO_EVAL_RES, - REASON_ERROR, - FLAG_KEY_UNKNOWN, - FLAG_NAME_UNKNOWN); + ed = EvalDetail.of(FLAG_VALUE_UNKNOWN, NO_EVAL_RES, REASON_ERROR, FLAG_KEY_UNKNOWN, FLAG_NAME_UNKNOWN); builder.put(ed, InsightTypes.NullEvent.INSTANCE); success = false; errorString = REASON_ERROR; @@ -495,8 +528,7 @@ public void trackMetric(FFCUser user, String eventName, double metricValue) { Loggers.CLIENT.warn("FFC JAVA SDK: event/user/metric invalid"); return; } - InsightTypes.Event event = InsightTypes.MetricEvent.of(user) - .add(InsightTypes.Metric.of(eventName, metricValue)); + InsightTypes.Event event = InsightTypes.MetricEvent.of(user).add(InsightTypes.Metric.of(eventName, metricValue)); insightProcessor.send(event); } diff --git a/src/main/java/co/featureflags/server/InsightProcessorBuilder.java b/src/main/java/co/featureflags/server/InsightProcessorBuilder.java index d7603ea..2063ba1 100644 --- a/src/main/java/co/featureflags/server/InsightProcessorBuilder.java +++ b/src/main/java/co/featureflags/server/InsightProcessorBuilder.java @@ -33,7 +33,7 @@ public abstract class InsightProcessorBuilder implements InsightEventSenderFacto protected final static int DEFAULT_CAPACITY = 10000; protected final static int DEFAULT_RETRY_DELAY = 100; protected final static int DEFAULT_RETRY_TIMES = 1; - protected final static Duration DEFAULT_FLUSH_INTERVAL = Duration.ofSeconds(3); + protected final static Duration DEFAULT_FLUSH_INTERVAL = Duration.ofSeconds(1); protected String eventUri; diff --git a/src/main/java/co/featureflags/server/InsightTypes.java b/src/main/java/co/featureflags/server/InsightTypes.java index 24e5444..de19c03 100644 --- a/src/main/java/co/featureflags/server/InsightTypes.java +++ b/src/main/java/co/featureflags/server/InsightTypes.java @@ -261,7 +261,7 @@ public void completed() { } } - public void waitForComplete(Duration timeout) { + public void waitForComplete() { if (waitLock == null) { return; } diff --git a/src/main/java/co/featureflags/server/Insights.java b/src/main/java/co/featureflags/server/Insights.java index 00807f7..b9850ac 100644 --- a/src/main/java/co/featureflags/server/Insights.java +++ b/src/main/java/co/featureflags/server/Insights.java @@ -2,16 +2,16 @@ import co.featureflags.commons.json.JsonHelper; import co.featureflags.server.exterior.InsightProcessor; +import com.google.common.collect.Iterables; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -34,7 +34,7 @@ public InsightProcessorImpl(InsightTypes.InsightConfig config) { this.inbox = new ArrayBlockingQueue<>(config.capacity); new EventDispatcher(config, inbox); this.flushScheduledExecutor = new ScheduledThreadPoolExecutor(1, Utils.createThreadFactory("insight-periodic-flush-worker-%d", true)); - flushScheduledExecutor.scheduleAtFixedRate(this::flush, config.flushInterval, config.flushInterval, TimeUnit.MILLISECONDS); + flushScheduledExecutor.scheduleAtFixedRate(this::flush, config.getFlushInterval(), config.getFlushInterval(), TimeUnit.MILLISECONDS); Loggers.EVENTS.debug("insight processor is ready"); } @@ -45,7 +45,7 @@ public void send(InsightTypes.Event event) { putEventAsync(InsightTypes.InsightMessageType.FLAGS, event); } else if (event instanceof InsightTypes.MetricEvent) { putEventAsync(InsightTypes.InsightMessageType.METRICS, event); - } else{ + } else { Loggers.EVENTS.debug("ignore event type: {}", event.getClass().getName()); } } @@ -79,7 +79,7 @@ private void putEventAndWaitTermination(InsightTypes.InsightMessageType type, In InsightTypes.InsightMessage msg = new InsightTypes.InsightMessage(type, event, true); if (putMsgToInbox(msg)) { Loggers.EVENTS.debug("put {} WaitTermination message to inbox", type); - msg.waitForComplete(Duration.ZERO); + msg.waitForComplete(); } } @@ -106,34 +106,94 @@ private boolean putMsgToInbox(InsightTypes.InsightMessage msg) { } - private final static class FlushPaypladRunner implements Runnable { + private final static class FlushPayload { + private final InsightTypes.Event[] events; + + public FlushPayload(InsightTypes.Event[] events) { + this.events = events; + } + + public InsightTypes.Event[] getEvents() { + return events; + } + } + + private final static class EventBuffer { + private final List incomingEvents = new ArrayList<>(); + + void add(InsightTypes.Event event) { + incomingEvents.add(event); + } + + FlushPayload getPayload() { + return new FlushPayload(incomingEvents.toArray(new InsightTypes.Event[0])); + } + + void clear() { + incomingEvents.clear(); + } + + boolean isEmpty() { + return incomingEvents.isEmpty(); + } + + } + + private final static class FlushPayloadRunner implements Runnable { + + private final static int MAX_EVENT_SIZE_PER_REQUEST = 100; private final InsightTypes.InsightConfig config; - private final Semaphore permits; + + private final BlockingQueue payloadQueue; private final AtomicInteger busyFlushPaypladThreadNum; - private final InsightTypes.Event[] payload; + private final AtomicBoolean running; + + private final Thread thread; - public FlushPaypladRunner(InsightTypes.InsightConfig config, Semaphore permits, AtomicInteger busyFlushPaypladThreadNum, InsightTypes.Event[] payload) { + public FlushPayloadRunner(InsightTypes.InsightConfig config, BlockingQueue payloadQueue, AtomicInteger busyFlushPaypladThreadNum) { this.config = config; - this.permits = permits; + this.payloadQueue = payloadQueue; this.busyFlushPaypladThreadNum = busyFlushPaypladThreadNum; - this.payload = payload; + this.running = new AtomicBoolean(true); + ThreadFactory threadFactory = Utils.createThreadFactory("flush-payload-worker-%d", true); + this.thread = threadFactory.newThread(this); + this.thread.start(); } @Override public void run() { - try { - String json = JsonHelper.serialize(payload); - config.getSender().sendEvent(config.getEventUrl(), json); - } catch (Exception unexpected) { - Loggers.EVENTS.error("FFC JAVA SDK: unexpected error in sending payload: {}", unexpected.getMessage()); - } - permits.release(); - synchronized (busyFlushPaypladThreadNum) { - busyFlushPaypladThreadNum.decrementAndGet(); - busyFlushPaypladThreadNum.notifyAll(); + while (running.get()) { + FlushPayload payload; + try { + payload = payloadQueue.take(); // blocked until a payload comes in + } catch (InterruptedException e) { + continue; + } + try { + // split the payload into small partitions and send them to featureflag.co + Iterables.partition(Arrays.asList(payload.getEvents()), MAX_EVENT_SIZE_PER_REQUEST) + .forEach(partition -> { + String json = JsonHelper.serialize(partition); + config.getSender().sendEvent(config.getEventUrl(), json); + Loggers.EVENTS.debug("paload size: {}", partition.size()); + }); + } catch (Exception unexpected) { + Loggers.EVENTS.error("FFC JAVA SDK: unexpected error in sending payload: {}", unexpected.getMessage()); + } + // busy payload worker - 1 + synchronized (busyFlushPaypladThreadNum) { + busyFlushPaypladThreadNum.decrementAndGet(); + busyFlushPaypladThreadNum.notifyAll(); + } } } + + public void stop() { + running.set(true); + thread.interrupt(); + Loggers.EVENTS.debug("flush payload worker is stopping..."); + } } private static final class EventDispatcher { @@ -145,26 +205,33 @@ private static final class EventDispatcher { private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicInteger busyFlushPayloadThreadNum = new AtomicInteger(0); private final InsightTypes.InsightConfig config; - private final ThreadPoolExecutor threadPoolExecutor; - private final List eventsBufferToNextFlush = new ArrayList<>(); + private final EventBuffer eventBuffer = new EventBuffer(); // permits to flush events - private final Semaphore permits = new Semaphore(MAX_FLUSH_WORKERS_NUMBER); + private final List flushWorkers; + + // This queue only holds one payload, that should be immediately picked up by any free flush worker. + // if we try to push another one to this queue and then is refused, + // it means all the flush workers are busy, this payload will be consumed until a flush worker becomes free again. + // Events in the refused payload should be kept in buffer and try to be pushed to this queue in the next flush + private final BlockingQueue payloadQueue = new ArrayBlockingQueue<>(1); public EventDispatcher(InsightTypes.InsightConfig config, BlockingQueue inbox) { this.config = config; this.inbox = inbox; - this.threadPoolExecutor = new ThreadPoolExecutor(MAX_FLUSH_WORKERS_NUMBER, - MAX_FLUSH_WORKERS_NUMBER, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(MAX_QUEUE_SIZE), - Utils.createThreadFactory("flush-payload-worker-%d", true), - new ThreadPoolExecutor.CallerRunsPolicy()); Thread mainThread = Utils.createThreadFactory("event-dispatcher", true).newThread(this::dispatchEvents); mainThread.start(); + this.flushWorkers = new ArrayList<>(); + for (int i = 0; i < MAX_FLUSH_WORKERS_NUMBER; i++) { + FlushPayloadRunner task = new FlushPayloadRunner(config, payloadQueue, busyFlushPayloadThreadNum); + flushWorkers.add(task); + } } + // blocks until a message is available and then: + // 1: transfer the events to event buffer + // 2: try to flush events to featureflag if a flush message arrives + // 3: wait for releasing resources if a shutdown arrives private void dispatchEvents() { List messages = new ArrayList<>(); Loggers.EVENTS.debug("event dispatcher is working..."); @@ -172,7 +239,7 @@ private void dispatchEvents() { try { messages.clear(); messages.add(inbox.take()); - inbox.drainTo(messages, BATCH_SIZE - 1); + inbox.drainTo(messages, BATCH_SIZE - 1); // this nonblocking call allows us to pick up more messages if available for (InsightTypes.InsightMessage message : messages) { try { switch (message.getType()) { @@ -209,7 +276,6 @@ private void waitUntilFlushPayLoadWorkerDown() { } } } - Loggers.EVENTS.debug("flush payload worker is down"); } private void putEventToNextBuffer(InsightTypes.Event event) { @@ -218,30 +284,37 @@ private void putEventToNextBuffer(InsightTypes.Event event) { } if (event.isSendEvent()) { Loggers.EVENTS.debug("put event to buffer"); - eventsBufferToNextFlush.add(event); + eventBuffer.add(event); } } private void triggerFlush() { - if (closed.get() || eventsBufferToNextFlush.isEmpty()) { + if (closed.get() || eventBuffer.isEmpty()) { return; } - InsightTypes.Event[] payload = eventsBufferToNextFlush.toArray(new InsightTypes.Event[0]); - if (permits.tryAcquire()) { - Loggers.EVENTS.debug("trigger flush"); - // busy payload worker + 1 - busyFlushPayloadThreadNum.incrementAndGet(); - // send events - threadPoolExecutor.execute(new FlushPaypladRunner(config, permits, busyFlushPayloadThreadNum, payload)); - // clear buffer for next flush - eventsBufferToNextFlush.clear(); + + //get all the current events from event buffer + FlushPayload payload = eventBuffer.getPayload(); + // busy payload worker + 1 + busyFlushPayloadThreadNum.incrementAndGet(); + if (payloadQueue.offer(payload)) { + // put events to the next available flush worker, so drop them from our buffer + eventBuffer.clear(); + } else { + Loggers.EVENTS.debug("Skipped flushing because all workers are busy"); + // All the workers are busy so we can't flush now; + // the buffer should keep the events for the next flush + // busy payload worker - 1 + synchronized (busyFlushPayloadThreadNum) { + busyFlushPayloadThreadNum.decrementAndGet(); + busyFlushPayloadThreadNum.notify(); + } } - // if no more space in the payload queue, the buffer will be merged in the next flush } private void shutdown() { - Loggers.EVENTS.debug("event dispatcher clean up thread and conn pool"); + Loggers.EVENTS.debug("event dispatcher clean up threads and conn pool"); try { // wait for all flush payload is well done waitUntilFlushPayLoadWorkerDown(); @@ -251,7 +324,9 @@ private void shutdown() { // } // shutdown resources if (closed.compareAndSet(false, true)) { - Utils.shutDownThreadPool("flush-payload-worker", threadPoolExecutor, AWAIT_TERMINATION); + for (FlushPayloadRunner task : flushWorkers) { + task.stop(); + } config.getSender().close(); } } catch (Exception unexpected) { diff --git a/src/main/java/co/featureflags/server/Streaming.java b/src/main/java/co/featureflags/server/Streaming.java index feec36b..94a3756 100644 --- a/src/main/java/co/featureflags/server/Streaming.java +++ b/src/main/java/co/featureflags/server/Streaming.java @@ -57,13 +57,13 @@ final class Streaming implements UpdateProcessor { private static final String INVALID_REQUEST_CLOSE_REASON = "invalid request"; private static final Integer GOING_AWAY_CLOSE = 1001; private static final String JUST_RECONN_REASON_REGISTERED = "reconn"; - private static final int MAX_QUEUE_SIZE = 20; + private static final int MAX_QUEUE_SIZE = 10; private static final Duration PING_INTERVAL = Duration.ofSeconds(10); private static final Duration AWAIT_TERMINATION = Duration.ofSeconds(2); private static final String DEFAULT_STREAMING_PATH = "/streaming"; private static final String AUTH_PARAMS = "?token=%s&type=server&version=2"; private static final Map NOT_RECONN_CLOSE_REASON = ImmutableMap.of(NORMAL_CLOSE, NORMAL_CLOSE_REASON, INVALID_REQUEST_CLOSE, INVALID_REQUEST_CLOSE_REASON); - private static final List> RECONNECT_EXCEPTIONS = ImmutableList.of(SocketTimeoutException.class, SocketException.class, EOFException.class); + private static final List> WEBSOCKET_EXCEPTION = ImmutableList.of(SocketTimeoutException.class, SocketException.class, EOFException.class); private static final Logger logger = Loggers.UPDATE_PROCESSOR; // final viariables @@ -300,7 +300,6 @@ public final void onClosed(@NotNull WebSocket webSocket, int code, @NotNull Stri @Override public final void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, @Nullable Response response) { - logger.error("FFC JAVA SDK: streaming webSocket Failure", t); isWSConnected.compareAndSet(true, false); boolean forceToUseMaxRetryDelay = false; boolean isReconn = false; @@ -311,28 +310,23 @@ public final void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, isReconn = tClass != JsonParseException.class; errorType = isReconn ? RUNTIME_ERROR : DATA_INVALID_ERROR; } else { - // restart a cause of network error - for (Class cls : RECONNECT_EXCEPTIONS) { - if (tClass == cls) { - isReconn = true; - errorType = NETWORK_ERROR; - // maybe kicked off by server side - if (tClass == EOFException.class) { - forceToUseMaxRetryDelay = true; - } - } - } - if (!isReconn && t instanceof IOException) { + isReconn = true; + if (WEBSOCKET_EXCEPTION.contains(tClass)) { errorType = WEBSOCKET_ERROR; - } else if (errorType == null) { + } else if (t instanceof IOException) { + errorType = NETWORK_ERROR; + forceToUseMaxRetryDelay = true; + } else { errorType = UNKNOWN_ERROR; } } Status.ErrorInfo errorInfo = Status.ErrorInfo.of(errorType, t.getMessage()); if (isReconn) { + logger.warn("FFC JAVA SDK: streaming webSocket will reconnect because of {}", t.getMessage()); updator.updateStatus(Status.StateType.INTERRUPTED, errorInfo); reconnect(forceToUseMaxRetryDelay); } else { + logger.error("FFC JAVA SDK: streaming webSocket Failure", t); updator.updateStatus(Status.StateType.OFF, errorInfo); // clean up thread and conn pool clearExecutor(); diff --git a/src/main/java/co/featureflags/server/Utils.java b/src/main/java/co/featureflags/server/Utils.java index 810a354..194629c 100644 --- a/src/main/java/co/featureflags/server/Utils.java +++ b/src/main/java/co/featureflags/server/Utils.java @@ -87,17 +87,22 @@ public static void buildProxyAndSocketFactoryFor(OkHttpClient.Builder builder, H } } - private static final Map ALPHABETS = - ImmutableMap.of("0", "Q", - "1", "B", - "2", "W", - "3", "S", - "4", "P", - "5", "H", - "6", "D", - "7", "X", - "8", "Z", - "9", "U"); + private static final Map ALPHABETS; + + static { + ImmutableMap.Builder builder = new ImmutableMap.Builder<>(); + ALPHABETS = builder.put("0", "Q") + .put("1", "B") + .put("2", "W") + .put("3", "S") + .put("4", "P") + .put("5", "H") + .put("6", "D") + .put("7", "X") + .put("8", "Z") + .put("9", "U") + .build(); + } private static String encodeNumber(long number, int length) { String str = "000000000000" + number; @@ -146,4 +151,8 @@ public static void shutDownThreadPool(String name, ThreadPoolExecutor pool, Dura Loggers.UTILS.debug("gracefully shut down thread pool of {}", name); } + public static int intLEFromBytes(byte[] bytes) { + return bytes[3] << 24 | (bytes[2] & 255) << 16 | (bytes[1] & 255) << 8 | bytes[0] & 255; + } + } diff --git a/src/main/java/co/featureflags/server/VariationSplittingAlgorithm.java b/src/main/java/co/featureflags/server/VariationSplittingAlgorithm.java index 8766cb6..22dc417 100644 --- a/src/main/java/co/featureflags/server/VariationSplittingAlgorithm.java +++ b/src/main/java/co/featureflags/server/VariationSplittingAlgorithm.java @@ -26,7 +26,7 @@ static double percentageOfKey(String key) { MessageDigest md5 = MessageDigest.getInstance("MD5"); md5.update(key.getBytes(StandardCharsets.US_ASCII)); byte[] digest = md5.digest(); - int magicNumber = Ints.fromByteArray(digest); + int magicNumber = Utils.intLEFromBytes(digest); return Math.abs((double) magicNumber / Integer.MIN_VALUE); } catch (Exception ex) { return 0D; diff --git a/src/main/java/co/featureflags/server/exterior/FFCClient.java b/src/main/java/co/featureflags/server/exterior/FFCClient.java index b7d801c..b48a443 100644 --- a/src/main/java/co/featureflags/server/exterior/FFCClient.java +++ b/src/main/java/co/featureflags/server/exterior/FFCClient.java @@ -79,7 +79,7 @@ public interface FFCClient extends Closeable { boolean isEnabled(String featureFlagKey, FFCUser user); /** - * alias of boolVariation for a given user + * alias of boolVariation for a current user *

* note that this method should be called in the context that support to capture automatically the current user * @@ -158,6 +158,29 @@ public interface FFCClient extends Closeable { */ long longVariation(String featureFlagKey, Long defaultValue); + /** + * Calculates the json value of a feature flag for a given user. + * + * @param featureFlagKey the unique key for the feature flag + * @param user the end user requesting the flag + * @param clazz json deserialization class + * @param defaultValue the default value of the flag + * @param json object type + * @return the variation for the given user, or {@code defaultValue} if the flag is disabled, current user doesn't exist + */ + T jsonVariation(String featureFlagKey, FFCUser user, Class clazz, T defaultValue); + + /** + * Calculates the json value of a feature flag for the current user. + * + * @param featureFlagKey the unique key for the feature flag + * @param clazz json deserialization class + * @param defaultValue the default value of the flag + * @param json object type + * @return the variation for the current user, or {@code defaultValue} if the flag is disabled, current user doesn't exist + */ + T jsonVariation(String featureFlagKey, Class clazz, T defaultValue); + /** * Returns true if the specified feature flag currently exists. * @@ -267,7 +290,7 @@ public interface FFCClient extends Closeable { FlagState doubleVariationDetail(String featureFlagKey, FFCUser user, Double defaultValue); /** - * Calculates the double value of a feature flag for a given user, and returns an object that describes the + * Calculates the double value of a feature flag for a current user, and returns an object that describes the * way the value was determined. *

* note that this method should be called in the context that support to capture automatically the current user @@ -293,7 +316,7 @@ public interface FFCClient extends Closeable { FlagState intVariationDetail(String featureFlagKey, FFCUser user, Integer defaultValue); /** - * Calculates the int value of a feature flag for a given user, and returns an object that describes the + * Calculates the int value of a feature flag for a current user, and returns an object that describes the * way the value was determined. *

* Note that If the variation has a numeric value, but not a int value, it is rounded toward zero(DOWN mode) @@ -315,13 +338,13 @@ public interface FFCClient extends Closeable { * * @param featureFlagKey the unique key for the feature flag * @param user the end user requesting the flag - * @param defaultValue the unique key for the feature flag + * @param defaultValue the default value of the flag * @return an {@link FlagState} object */ FlagState longVariationDetail(String featureFlagKey, FFCUser user, Long defaultValue); /** - * Calculates the long of a feature flag for a given user, and returns an object that describes the + * Calculates the long of a feature flag for the current user, and returns an object that describes the * way the value was determined. *

* Note that If the variation has a numeric value, but not a long value, it is rounded toward zero(DOWN mode) @@ -329,11 +352,36 @@ public interface FFCClient extends Closeable { * note that this method should be called in the context that support to capture automatically the current user * * @param featureFlagKey the unique key for the feature flag - * @param defaultValue the unique key for the feature flag + * @param defaultValue the default value of the flag * @return an {@link FlagState} object */ FlagState longVariationDetail(String featureFlagKey, Long defaultValue); + /** + * Calculates the json value of a feature flag for a given user, and returns an object that describes the + * way the value was determined. + * + * @param featureFlagKey the unique key for the feature flag + * @param user the end user requesting the flag + * @param clazz json deserialization class + * @param defaultValue the default value of the flag + * @param json object type + * @return an {@link FlagState} object + */ + FlagState jsonVariationDetail(String featureFlagKey, FFCUser user, Class clazz, T defaultValue); + + /** + * Calculates the json value of a feature flag for a current user, and returns an object that describes the + * way the value was determined. + * + * @param featureFlagKey the unique key for the feature flag + * @param clazz json deserialization class + * @param defaultValue the default value of the flag + * @param json object type + * @return an {@link FlagState} object + */ + FlagState jsonVariationDetail(String featureFlagKey, Class clazz, T defaultValue); + /** * Flushes all pending events. */ diff --git a/src/main/java/co/featureflags/server/exterior/HttpConfigurationBuilder.java b/src/main/java/co/featureflags/server/exterior/HttpConfigurationBuilder.java index be9de3b..e471b0d 100644 --- a/src/main/java/co/featureflags/server/exterior/HttpConfigurationBuilder.java +++ b/src/main/java/co/featureflags/server/exterior/HttpConfigurationBuilder.java @@ -27,8 +27,8 @@ * @see co.featureflags.server.Factory */ public abstract class HttpConfigurationBuilder implements HttpConfigFactory { - protected final Duration DEFAULT_CONN_TIME = Duration.ofSeconds(10); - protected final Duration DEFAULT_SOCK_TIME = Duration.ofSeconds(15); + protected final Duration DEFAULT_CONN_TIME = Duration.ofSeconds(5); + protected final Duration DEFAULT_SOCK_TIME = Duration.ofSeconds(10); protected Duration connectTime; protected Duration socketTime; protected Proxy proxy;