From 77031da8c6d80b019a2694cc75b797e7d9520683 Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Thu, 12 Oct 2023 15:17:39 +0900 Subject: [PATCH] CLEANUP: Refactored SMGetFuture. --- .../java/net/spy/memcached/ArcusClient.java | 439 ++---------------- .../spy/memcached/internal/SMGetFuture.java | 71 ++- .../spy/memcached/internal/SMGetResult.java | 85 ++++ .../memcached/internal/SMGetResultImpl.java | 134 ++++++ .../internal/SMGetResultOldImpl.java | 118 +++++ 5 files changed, 450 insertions(+), 397 deletions(-) create mode 100644 src/main/java/net/spy/memcached/internal/SMGetResult.java create mode 100644 src/main/java/net/spy/memcached/internal/SMGetResultImpl.java create mode 100644 src/main/java/net/spy/memcached/internal/SMGetResultOldImpl.java diff --git a/src/main/java/net/spy/memcached/ArcusClient.java b/src/main/java/net/spy/memcached/ArcusClient.java index b1d17be08..a942413b0 100644 --- a/src/main/java/net/spy/memcached/ArcusClient.java +++ b/src/main/java/net/spy/memcached/ArcusClient.java @@ -37,10 +37,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.jar.JarFile; @@ -120,13 +118,14 @@ import net.spy.memcached.internal.BTreeStoreAndGetFuture; import net.spy.memcached.internal.BroadcastFuture; import net.spy.memcached.internal.BulkOperationFuture; -import net.spy.memcached.internal.CheckedOperationTimeoutException; import net.spy.memcached.internal.CollectionFuture; import net.spy.memcached.internal.CollectionGetBulkFuture; import net.spy.memcached.internal.CollectionGetFuture; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.internal.PipedCollectionFuture; import net.spy.memcached.internal.SMGetFuture; +import net.spy.memcached.internal.SMGetResultImpl; +import net.spy.memcached.internal.SMGetResultOldImpl; import net.spy.memcached.ops.BTreeFindPositionOperation; import net.spy.memcached.ops.BTreeFindPositionWithGetOperation; import net.spy.memcached.ops.BTreeGetBulkOperation; @@ -144,7 +143,6 @@ import net.spy.memcached.ops.Mutator; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; -import net.spy.memcached.ops.OperationState; import net.spy.memcached.ops.OperationStatus; import net.spy.memcached.ops.StoreType; import net.spy.memcached.plugin.FrontCacheMemcachedClient; @@ -2069,31 +2067,6 @@ private Collection>> groupingKeys(List return resultList; } - /** - * Get the sublist of elements from the smget result. - * - * @param mergedResult smget result (list of elements) - * @param offset start index, negative offset indicates "start from the tail" - * @param count number of elements to get - * @return list of elements - */ - private List> getSubList(final List> mergedResult, int offset, int count) { - if (offset > 0) { - if ((offset + count) < mergedResult.size()) { - return mergedResult.subList(offset, (offset + count)); - } - if (offset < mergedResult.size()) { - return mergedResult.subList(offset, mergedResult.size()); - } - return Collections.emptyList(); - } else { - if (count < mergedResult.size()) { - return mergedResult.subList(0, count); - } - return mergedResult; - } - } - /** * Generic smget operation for b+tree items. Public smget methods call this method. * @@ -2109,30 +2082,15 @@ private SMGetFuture>> smget( final List> smGetList, final int offset, final int count, final boolean reverse, final Transcoder tc) { - final String END = "END"; final String TRIMMED = "TRIMMED"; - final String DUPLICATED = "DUPLICATED"; final String DUPLICATED_TRIMMED = "DUPLICATED_TRIMMED"; final CountDownLatch blatch = new CountDownLatch(smGetList.size()); final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - final List missedKeyList - = Collections.synchronizedList(new ArrayList()); - final Map missedKeys - = Collections.synchronizedMap(new HashMap()); - final List mergedTrimmedKeys - = Collections.synchronizedList(new ArrayList()); - final int totalResultElementCount = count + offset; - - final List> mergedResult = new ArrayList>(totalResultElementCount); - final List resultOperationStatus - = Collections.synchronizedList(new ArrayList(1)); - final List failedOperationStatus - = Collections.synchronizedList(new ArrayList(1)); + final SMGetResultOldImpl result = new SMGetResultOldImpl(offset, count, reverse); // if processedSMGetCount is 0, then all smget is done. final AtomicInteger processedSMGetCount = new AtomicInteger(smGetList.size()); - final AtomicBoolean mergedTrim = new AtomicBoolean(false); final AtomicBoolean stopCollect = new AtomicBoolean(false); for (BTreeSMGet smGet : smGetList) { @@ -2141,86 +2099,22 @@ private SMGetFuture>> smget( @Override public void receivedStatus(OperationStatus status) { - processedSMGetCount.decrementAndGet(); + final int processed = processedSMGetCount.decrementAndGet(); - if (!status.isSuccess()) { - getLogger().warn("SMGetFailed. status=%s", status); - if (!stopCollect.get()) { - stopCollect.set(true); - failedOperationStatus.add(status); - } - mergedResult.clear(); - return; - } + if (status.isSuccess()) { + boolean isTrimmed = (TRIMMED.equals(status.getMessage()) || + DUPLICATED_TRIMMED.equals(status.getMessage())); - boolean isTrimmed = (TRIMMED.equals(status.getMessage()) || - DUPLICATED_TRIMMED.equals(status.getMessage())) - ? true : false; - if (mergedResult.size() == 0) { - // merged result is empty, add all. - mergedResult.addAll(eachResult); - mergedTrim.set(isTrimmed); - } else { - boolean addAll = true; - int pos = 0; - for (SMGetElement result : eachResult) { - for (; pos < mergedResult.size(); pos++) { - if ((reverse) ? (0 < result.compareTo(mergedResult.get(pos))) - : (0 > result.compareTo(mergedResult.get(pos)))) { - break; - } - } - if (pos >= totalResultElementCount) { - addAll = false; - break; - } - if (pos >= mergedResult.size() && mergedTrim.get() && - result.compareBkeyTo(mergedResult.get(pos - 1)) != 0) { - addAll = false; - break; - } - mergedResult.add(pos, result); - if (mergedResult.size() > totalResultElementCount) { - mergedResult.remove(totalResultElementCount); - } - pos += 1; - } - if (isTrimmed && addAll) { - while (pos < mergedResult.size()) { - if (mergedResult.get(pos).compareBkeyTo(mergedResult.get(pos - 1)) == 0) { - pos += 1; - } else { - mergedResult.remove(pos); - } - } - mergedTrim.set(true); - } - if (mergedResult.size() >= totalResultElementCount) { - mergedTrim.set(false); + result.mergeSMGetElements(eachResult, isTrimmed); + if (processed == 0) { + result.makeResultOperationStatus(); } - } - - if (processedSMGetCount.get() == 0) { - boolean isDuplicated = false; - for (int i = 1; i < mergedResult.size(); i++) { - if (mergedResult.get(i).compareBkeyTo(mergedResult.get(i - 1)) == 0) { - isDuplicated = true; - break; - } - } - if (mergedTrim.get()) { - if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED_TRIMMED")); - } else { - resultOperationStatus.add(new OperationStatus(true, "TRIMMED")); - } - } else { - if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); - } else { - resultOperationStatus.add(new OperationStatus(true, "END")); - } + } else { + if (!stopCollect.getAndSet(true)) { + result.addFailedOperationStatus(status); } + result.clearMergedResult(); + getLogger().warn("SMGetFailed. status=%s", status); } } @@ -2246,78 +2140,15 @@ public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] d @Override public void gotMissedKey(byte[] data) { - missedKeyList.add(new String(data)); - OperationStatus cause = new OperationStatus(false, "UNDEFINED"); - missedKeys.put(new String(data), new CollectionOperationStatus(cause)); + OperationStatus status = new OperationStatus(false, "UNDEFINED"); + result.addMissedKey(new String(data), new CollectionOperationStatus(status)); } }); ops.add(op); addOp(smGet.getMemcachedNode(), op); } - return new SMGetFuture>>(ops, operationTimeout) { - @Override - public List> get(long duration, TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - - if (!blatch.await(duration, units)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - MemcachedConnection.opsTimedOut(timedoutOps); - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - if (smGetList.size() == 1) { - return mergedResult; - } - - return getSubList(mergedResult, offset, count); - } - - @Override - public List getMissedKeyList() { - return missedKeyList; - } - - @Override - public Map getMissedKeys() { - return missedKeys; - } - - @Override - public List getTrimmedKeys() { - return mergedTrimmedKeys; - } - - @Override - public CollectionOperationStatus getOperationStatus() { - if (!failedOperationStatus.isEmpty()) { - return new CollectionOperationStatus(failedOperationStatus.get(0)); - } - return new CollectionOperationStatus(resultOperationStatus.get(0)); - } - }; + return new SMGetFuture>>(ops, result, blatch, operationTimeout); } private SMGetFuture>> smget( @@ -2326,22 +2157,11 @@ private SMGetFuture>> smget( final CountDownLatch blatch = new CountDownLatch(smGetList.size()); final ConcurrentLinkedQueue ops = new ConcurrentLinkedQueue(); - final List missedKeyList - = Collections.synchronizedList(new ArrayList()); - final Map missedKeys - = Collections.synchronizedMap(new HashMap()); - - final List> mergedResult = new ArrayList>(count); - final List mergedTrimmedKeys - = Collections.synchronizedList(new ArrayList()); - - final List resultOperationStatus - = Collections.synchronizedList(new ArrayList(1)); - final List failedOperationStatus - = Collections.synchronizedList(new ArrayList(1)); + final SMGetResultImpl result = new SMGetResultImpl(count, reverse, smgetMode); // if processedSMGetCount is 0, then all smget is done. final AtomicInteger processedSMGetCount = new AtomicInteger(smGetList.size()); + final AtomicBoolean stopCollect = new AtomicBoolean(false); for (BTreeSMGet smGet : smGetList) { Operation op = opFact.bopsmget(smGet, new BTreeSortMergeGetOperation.Callback() { @@ -2350,121 +2170,17 @@ private SMGetFuture>> smget( @Override public void receivedStatus(OperationStatus status) { - processedSMGetCount.decrementAndGet(); + final int processed = processedSMGetCount.decrementAndGet(); - if (!status.isSuccess()) { - getLogger().warn("SMGetFailed. status=%s", status); - if (failedOperationStatus.isEmpty()) { - mergedResult.clear(); - mergedTrimmedKeys.clear(); + if (status.isSuccess()) { + result.mergeSMGetElements(eachResult, eachTrimmedResult); + if (processed == 0) { + result.makeResultOperationStatus(); } - failedOperationStatus.add(status); - return; - } - - if (mergedResult.isEmpty()) { - // merged result is empty, add all. - mergedResult.addAll(eachResult); } else { - // do sort merge - boolean duplicated; - int comp, pos = 0; - for (SMGetElement result : eachResult) { - duplicated = false; - for (; pos < mergedResult.size(); pos++) { - // compare b+tree key - comp = result.compareBkeyTo(mergedResult.get(pos)); - if ((reverse) ? (0 < comp) : (0 > comp)) { - break; - } - if (comp == 0) { // compare key string - int keyComp = result.compareKeyTo(mergedResult.get(pos)); - if ((reverse) ? (0 < keyComp) : (0 > keyComp)) { - if (smgetMode == SMGetMode.UNIQUE) { - mergedResult.remove(pos); // remove dup bkey - } - break; - } else { - if (smgetMode == SMGetMode.UNIQUE) { - duplicated = true; - break; - } - } - } - } - if (duplicated) { // UNIQUE - continue; - } - if (pos >= count) { - // At this point, following conditions are met. - // - mergedResult.size() == totalResultElementCount && - // - The current of eachResult is - // behind of the last of mergedResult. - // Then, all the next elements of eachResult are - // definitely behind of the last of mergedResult. - // So, stop the current sort-merge. - break; - } - mergedResult.add(pos, result); - if (mergedResult.size() > count) { - // Remove elements that exceed the requested count. - mergedResult.remove(count); - } - pos += 1; - } - } - - if (!eachTrimmedResult.isEmpty()) { - if (mergedTrimmedKeys.isEmpty()) { - mergedTrimmedKeys.addAll(eachTrimmedResult); - } else { - // do sort merge trimmed list - int pos = 0; - for (SMGetTrimKey result : eachTrimmedResult) { - for (; pos < mergedTrimmedKeys.size(); pos++) { - if ((reverse) ? (0 < result.compareTo(mergedTrimmedKeys.get(pos))) - : (0 > result.compareTo(mergedTrimmedKeys.get(pos)))) { - break; - } - } - mergedTrimmedKeys.add(pos, result); - pos += 1; - } - } - } - - if (processedSMGetCount.get() == 0) { - if (!mergedTrimmedKeys.isEmpty() && count <= mergedResult.size()) { - // remove trimed keys whose bkeys are behind of the last element. - SMGetElement lastElement = mergedResult.get(mergedResult.size() - 1); - SMGetTrimKey lastTrimKey = new SMGetTrimKey(lastElement.getKey(), - lastElement.getBkeyObject()); - for (int i = mergedTrimmedKeys.size() - 1; i >= 0; i--) { - SMGetTrimKey me = mergedTrimmedKeys.get(i); - if ((reverse) ? (0 >= me.compareTo(lastTrimKey)) - : (0 <= me.compareTo(lastTrimKey))) { - mergedTrimmedKeys.remove(i); - } else { - break; - } - } - } - if (smgetMode == SMGetMode.UNIQUE) { - resultOperationStatus.add(new OperationStatus(true, "END")); - } else { - boolean isDuplicated = false; - for (int i = 1; i < mergedResult.size(); i++) { - if (mergedResult.get(i).compareBkeyTo(mergedResult.get(i - 1)) == 0) { - isDuplicated = true; - break; - } - } - if (isDuplicated) { - resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); - } else { - resultOperationStatus.add(new OperationStatus(true, "END")); - } - } + stopCollect.set(true); + result.addFailedOperationStatus(status); + getLogger().warn("SMGetFailed. status=%s", status); } } @@ -2475,31 +2191,34 @@ public void complete() { @Override public void gotData(String key, int flags, Object subkey, byte[] eflag, byte[] data) { - if (failedOperationStatus.isEmpty()) { - if (subkey instanceof Long) { - eachResult.add(new SMGetElement(key, (Long) subkey, eflag, - tc.decode(new CachedData(flags, data, tc.getMaxSize())))); - } else { - eachResult.add(new SMGetElement(key, (byte[]) subkey, eflag, - tc.decode(new CachedData(flags, data, tc.getMaxSize())))); - } + if (stopCollect.get()) { + return; + } + + if (subkey instanceof Long) { + eachResult.add(new SMGetElement(key, (Long) subkey, eflag, + tc.decode(new CachedData(flags, data, tc.getMaxSize())))); + } else { + eachResult.add(new SMGetElement(key, (byte[]) subkey, eflag, + tc.decode(new CachedData(flags, data, tc.getMaxSize())))); } } @Override public void gotMissedKey(String key, OperationStatus cause) { - missedKeyList.add(key); - missedKeys.put(key, new CollectionOperationStatus(cause)); + result.addMissedKey(key, new CollectionOperationStatus(cause)); } @Override public void gotTrimmedKey(String key, Object subkey) { - if (failedOperationStatus.isEmpty()) { - if (subkey instanceof Long) { - eachTrimmedResult.add(new SMGetTrimKey(key, (Long) subkey)); - } else { - eachTrimmedResult.add(new SMGetTrimKey(key, (byte[]) subkey)); - } + if (stopCollect.get()) { + return; + } + + if (subkey instanceof Long) { + eachTrimmedResult.add(new SMGetTrimKey(key, (Long) subkey)); + } else { + eachTrimmedResult.add(new SMGetTrimKey(key, (byte[]) subkey)); } } }); @@ -2507,69 +2226,7 @@ public void gotTrimmedKey(String key, Object subkey) { addOp(smGet.getMemcachedNode(), op); } - return new SMGetFuture>>(ops, operationTimeout) { - @Override - public List> get(long duration, TimeUnit units) - throws InterruptedException, TimeoutException, ExecutionException { - - if (!blatch.await(duration, units)) { - Collection timedoutOps = new HashSet(); - for (Operation op : ops) { - if (op.getState() != OperationState.COMPLETE) { - timedoutOps.add(op); - } else { - MemcachedConnection.opSucceeded(op); - } - } - if (timedoutOps.size() > 0) { - MemcachedConnection.opsTimedOut(timedoutOps); - throw new CheckedOperationTimeoutException(duration, units, timedoutOps); - } - } else { - // continuous timeout counter will be reset - MemcachedConnection.opsSucceeded(ops); - } - - for (Operation op : ops) { - if (op != null && op.hasErrored()) { - throw new ExecutionException(op.getException()); - } - - if (op != null && op.isCancelled()) { - throw new ExecutionException(new RuntimeException(op.getCancelCause())); - } - } - - if (smGetList.size() == 1) { - return mergedResult; - } - - return getSubList(mergedResult, 0, count); - } - - @Override - public List getMissedKeyList() { - return missedKeyList; - } - - @Override - public Map getMissedKeys() { - return missedKeys; - } - - @Override - public List getTrimmedKeys() { - return mergedTrimmedKeys; - } - - @Override - public CollectionOperationStatus getOperationStatus() { - if (!failedOperationStatus.isEmpty()) { - return new CollectionOperationStatus(failedOperationStatus.get(0)); - } - return new CollectionOperationStatus(resultOperationStatus.get(0)); - } - }; + return new SMGetFuture>>(ops, result, blatch, operationTimeout); } @Override diff --git a/src/main/java/net/spy/memcached/internal/SMGetFuture.java b/src/main/java/net/spy/memcached/internal/SMGetFuture.java index 2dd890450..66d0ea46b 100644 --- a/src/main/java/net/spy/memcached/internal/SMGetFuture.java +++ b/src/main/java/net/spy/memcached/internal/SMGetFuture.java @@ -17,27 +17,78 @@ package net.spy.memcached.internal; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import net.spy.memcached.MemcachedConnection; import net.spy.memcached.OperationTimeoutException; import net.spy.memcached.collection.SMGetTrimKey; import net.spy.memcached.ops.CollectionOperationStatus; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; -public abstract class SMGetFuture implements Future { +public final class SMGetFuture> implements Future { private final Collection ops; + private final SMGetResult result; + private final CountDownLatch latch; private final long timeout; - public SMGetFuture(Collection ops, long timeout) { + public SMGetFuture(Collection ops, + SMGetResult result, + CountDownLatch latch, + long timeout) { + + this.latch = latch; this.ops = ops; this.timeout = timeout; + this.result = result; + } + + @Override + @SuppressWarnings("unchecked") + public T get(long duration, TimeUnit units) + throws InterruptedException, TimeoutException, ExecutionException { + + if (!latch.await(duration, units)) { + Collection timedoutOps = new HashSet(); + for (Operation op : ops) { + if (op.getState() != OperationState.COMPLETE) { + timedoutOps.add(op); + } else { + MemcachedConnection.opSucceeded(op); + } + } + if (!timedoutOps.isEmpty()) { + MemcachedConnection.opsTimedOut(timedoutOps); + throw new CheckedOperationTimeoutException(duration, units, timedoutOps); + } + } else { + // continuous timeout counter will be reset + MemcachedConnection.opsSucceeded(ops); + } + + for (Operation op : ops) { + if (op != null && op.hasErrored()) { + throw new ExecutionException(op.getException()); + } + + if (op != null && op.isCancelled()) { + throw new ExecutionException(new RuntimeException(op.getCancelCause())); + } + } + + if (ops.size() == 1) { + return (T) result.getMergedResult(); + } + + return (T) result.getSubList(); } @Override @@ -78,11 +129,19 @@ public boolean isDone() { return true; } - public abstract Map getMissedKeys(); + public List getMissedKeyList() { + return result.getMissedKeyList(); + } - public abstract List getMissedKeyList(); + public Map getMissedKeys() { + return result.getMissedKeyMap(); + } - public abstract List getTrimmedKeys(); + public List getTrimmedKeys() { + return result.getMergedTrimmedKeys(); + } - public abstract CollectionOperationStatus getOperationStatus(); + public CollectionOperationStatus getOperationStatus() { + return result.getOperationStatus(); + } } diff --git a/src/main/java/net/spy/memcached/internal/SMGetResult.java b/src/main/java/net/spy/memcached/internal/SMGetResult.java new file mode 100644 index 000000000..96cf03985 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/SMGetResult.java @@ -0,0 +1,85 @@ +package net.spy.memcached.internal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import net.spy.memcached.collection.SMGetElement; +import net.spy.memcached.collection.SMGetTrimKey; +import net.spy.memcached.ops.CollectionOperationStatus; +import net.spy.memcached.ops.OperationStatus; + +public abstract class SMGetResult { + protected final int count; + protected final int totalResultElementCount; + protected final boolean reverse; + + protected final List missedKeyList; + protected final Map missedKeyMap; + protected final List mergedTrimmedKeys; + + protected final List> mergedResult; + protected final List resultOperationStatus; + protected final List failedOperationStatus; + + protected List> mergedSubList = null; + + public SMGetResult(int count, int totalResultElementCount, boolean reverse) { + this.count = count; + this.totalResultElementCount = totalResultElementCount; + this.reverse = reverse; + + this.missedKeyList = Collections.synchronizedList(new ArrayList()); + this.missedKeyMap + = Collections.synchronizedMap(new HashMap()); + this.mergedTrimmedKeys = Collections.synchronizedList(new ArrayList()); + + this.mergedResult = new ArrayList>(totalResultElementCount); + this.resultOperationStatus = Collections.synchronizedList(new ArrayList(1)); + this.failedOperationStatus = Collections.synchronizedList(new ArrayList(1)); + } + + public List getMissedKeyList() { + return missedKeyList; + } + + public Map getMissedKeyMap() { + return missedKeyMap; + } + + public List getMergedTrimmedKeys() { + return mergedTrimmedKeys; + } + + public List> getMergedResult() { + return mergedResult; + } + + public void addMissedKey(String key, CollectionOperationStatus cstatus) { + missedKeyList.add(key); + missedKeyMap.put(key, cstatus); + } + + public List> getSubList() { + if (mergedSubList == null) { + if (count < mergedResult.size()) { + mergedSubList = mergedResult.subList(0, count); + } else { + mergedSubList = mergedResult; + } + } + + return mergedSubList; + } + + public CollectionOperationStatus getOperationStatus() { + if (!failedOperationStatus.isEmpty()) { + return new CollectionOperationStatus(failedOperationStatus.get(0)); + } + return new CollectionOperationStatus(resultOperationStatus.get(0)); + } + + public abstract void makeResultOperationStatus(); +} diff --git a/src/main/java/net/spy/memcached/internal/SMGetResultImpl.java b/src/main/java/net/spy/memcached/internal/SMGetResultImpl.java new file mode 100644 index 000000000..b0329701c --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/SMGetResultImpl.java @@ -0,0 +1,134 @@ +package net.spy.memcached.internal; + +import java.util.List; + +import net.spy.memcached.collection.SMGetElement; +import net.spy.memcached.collection.SMGetMode; +import net.spy.memcached.collection.SMGetTrimKey; +import net.spy.memcached.ops.OperationStatus; + +public class SMGetResultImpl extends SMGetResult { + private final SMGetMode smGetMode; + + public SMGetResultImpl(int count, boolean reverse, SMGetMode smGetMode) { + super(count, count, reverse); + + this.smGetMode = smGetMode; + } + + public void addFailedOperationStatus(OperationStatus status) { + failedOperationStatus.add(status); + mergedResult.clear(); + mergedTrimmedKeys.clear(); + } + + public void mergeSMGetElements(final List> eachResult, + final List eachTrimmedResult) { + + if (mergedResult.isEmpty()) { + mergedResult.addAll(eachResult); + } else { + // do sort merge + boolean duplicated; + int comp, pos = 0; + for (SMGetElement result : eachResult) { + duplicated = false; + for (; pos < mergedResult.size(); pos++) { + // compare b+tree key + comp = result.compareBkeyTo(mergedResult.get(pos)); + if ((reverse) ? (0 < comp) : (0 > comp)) { + break; + } + if (comp == 0) { // compare key string + int keyComp = result.compareKeyTo(mergedResult.get(pos)); + if ((reverse) ? (0 < keyComp) : (0 > keyComp)) { + if (smGetMode == SMGetMode.UNIQUE) { + mergedResult.remove(pos); // remove dup bkey + } + break; + } else { + if (smGetMode == SMGetMode.UNIQUE) { + duplicated = true; + break; + } + } + } + } + if (duplicated) { // UNIQUE + continue; + } + if (pos >= count) { + // At this point, following conditions are met. + // - mergedResult.size() == totalResultElementCount && + // - The current of eachResult is + // behind of the last of mergedResult. + // Then, all the next elements of eachResult are + // definitely behind of the last of mergedResult. + // So, stop the current sort-merge. + break; + } + mergedResult.add(pos, result); + if (mergedResult.size() > count) { + // Remove elements that exceed the requested count. + mergedResult.remove(count); + } + pos += 1; + } + } + + if (!eachTrimmedResult.isEmpty()) { + if (mergedTrimmedKeys.isEmpty()) { + mergedTrimmedKeys.addAll(eachTrimmedResult); + } else { + // do sort merge trimmed list + int pos = 0; + for (SMGetTrimKey result : eachTrimmedResult) { + for (; pos < mergedTrimmedKeys.size(); pos++) { + if ((reverse) ? (0 < result.compareTo(mergedTrimmedKeys.get(pos))) + : (0 > result.compareTo(mergedTrimmedKeys.get(pos)))) { + break; + } + } + mergedTrimmedKeys.add(pos, result); + pos += 1; + } + } + } + } + + @Override + public void makeResultOperationStatus() { + if (!mergedTrimmedKeys.isEmpty() && count <= mergedTrimmedKeys.size()) { + // remove trimed keys whose bkeys are behind of the last element. + SMGetElement lastElement = mergedResult.get(mergedResult.size() - 1); + SMGetTrimKey lastTrimKey = new SMGetTrimKey(lastElement.getKey(), + lastElement.getBkeyObject()); + for (int i = mergedTrimmedKeys.size() - 1; i >= 0; i--) { + SMGetTrimKey me = mergedTrimmedKeys.get(i); + if ((reverse) ? (0 >= me.compareTo(lastTrimKey)) + : (0 <= me.compareTo(lastTrimKey))) { + mergedTrimmedKeys.remove(i); + } else { + break; + } + } + } + + if (smGetMode == SMGetMode.UNIQUE) { + resultOperationStatus.add(new OperationStatus(true, "END")); + } else { + boolean isDuplicated = false; + for (int i = 1; i < mergedResult.size(); i++) { + if (mergedResult.get(i).compareBkeyTo(mergedResult.get(i - 1)) == 0) { + isDuplicated = true; + break; + } + } + if (isDuplicated) { + resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); + } else { + resultOperationStatus.add(new OperationStatus(true, "END")); + } + } + } +} diff --git a/src/main/java/net/spy/memcached/internal/SMGetResultOldImpl.java b/src/main/java/net/spy/memcached/internal/SMGetResultOldImpl.java new file mode 100644 index 000000000..c871d0f67 --- /dev/null +++ b/src/main/java/net/spy/memcached/internal/SMGetResultOldImpl.java @@ -0,0 +1,118 @@ +package net.spy.memcached.internal; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import net.spy.memcached.collection.SMGetElement; +import net.spy.memcached.ops.OperationStatus; + +public class SMGetResultOldImpl extends SMGetResult { + private final int offset; + private final AtomicBoolean mergedTrim = new AtomicBoolean(false); + + public SMGetResultOldImpl(int offset, int count, boolean reverse) { + super(count, offset + count, reverse); + + this.offset = offset; + } + + @Override + public List> getSubList() { + if (mergedSubList == null) { + if (offset > 0) { + if ((offset + count) < mergedResult.size()) { + mergedSubList = mergedResult.subList(offset, (offset + count)); + } else if (offset < mergedResult.size()) { + mergedSubList = mergedResult.subList(offset, mergedResult.size()); + } else { + mergedSubList = Collections.emptyList(); + } + } else { + mergedSubList = super.getSubList(); + } + } + + return mergedSubList; + } + + public void addFailedOperationStatus(OperationStatus status) { + failedOperationStatus.add(status); + } + + public void clearMergedResult() { + mergedResult.clear(); + } + + public void mergeSMGetElements(final List> eachResult, + final boolean isTrimmed) { + + if (mergedResult.isEmpty()) { + // merged result is empty, add all. + mergedResult.addAll(eachResult); + mergedTrim.set(isTrimmed); + } else { + boolean addAll = true; + int pos = 0; + for (SMGetElement result : eachResult) { + for (; pos < mergedResult.size(); pos++) { + if ((reverse) ? (0 < result.compareTo(mergedResult.get(pos))) + : (0 > result.compareTo(mergedResult.get(pos)))) { + break; + } + } + if (pos >= totalResultElementCount) { + addAll = false; + break; + } + if (pos >= mergedResult.size() && mergedTrim.get() && + result.compareBkeyTo(mergedResult.get(pos - 1)) != 0) { + addAll = false; + break; + } + mergedResult.add(pos, result); + if (mergedResult.size() > totalResultElementCount) { + mergedResult.remove(totalResultElementCount); + } + pos += 1; + } + if (isTrimmed && addAll) { + while (pos < mergedResult.size()) { + if (mergedResult.get(pos).compareBkeyTo(mergedResult.get(pos - 1)) == 0) { + pos += 1; + } else { + mergedResult.remove(pos); + } + } + mergedTrim.set(true); + } + if (mergedResult.size() >= totalResultElementCount) { + mergedTrim.set(false); + } + } + } + + @Override + public void makeResultOperationStatus() { + boolean isDuplicated = false; + for (int i = 1; i < mergedResult.size(); i++) { + if (mergedResult.get(i).compareBkeyTo(mergedResult.get(i - 1)) == 0) { + isDuplicated = true; + break; + } + } + if (mergedTrim.get()) { + if (isDuplicated) { + resultOperationStatus.add(new OperationStatus(true, "DUPLICATED_TRIMMED")); + } else { + resultOperationStatus.add(new OperationStatus(true, "TRIMMED")); + } + } else { + if (isDuplicated) { + resultOperationStatus.add(new OperationStatus(true, "DUPLICATED")); + } else { + resultOperationStatus.add(new OperationStatus(true, "END")); + } + } + } +}