Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Remove transcoder service logic in bulkGet apis. #710

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions src/main/java/net/spy/memcached/MemcachedClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import net.spy.memcached.internal.GetFuture;
import net.spy.memcached.internal.OperationFuture;
import net.spy.memcached.internal.SingleElementInfiniteIterator;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.internal.result.GetResultImpl;
import net.spy.memcached.internal.result.GetsResultImpl;
import net.spy.memcached.ops.CASOperationStatus;
import net.spy.memcached.ops.CancelledOperationStatus;
import net.spy.memcached.ops.ConcatenationType;
Expand Down Expand Up @@ -1079,7 +1079,7 @@ public Object get(String key) {
*/
public <T> BulkFuture<Map<String, T>> asyncGetBulk(Collection<String> keys,
Iterator<Transcoder<T>> tc_iter) {
final Map<String, Future<T>> rvMap = new ConcurrentHashMap<String, Future<T>>();
final Map<String, GetResult<T>> rvMap = new ConcurrentHashMap<String, GetResult<T>>();

// This map does not need to be a ConcurrentHashMap
// because it is fully populated when it is used and
Expand Down Expand Up @@ -1112,8 +1112,9 @@ public void receivedStatus(OperationStatus status) {

public void gotData(String k, int flags, byte[] data) {
Transcoder<T> tc = tc_map.get(k);
rvMap.put(k, tcService.decode(tc,
new CachedData(flags, data, tc.getMaxSize())));
GetResult<T> result
= new GetResultImpl<T>(new CachedData(flags, data, tc.getMaxSize()), tc);
rvMap.put(k, result);
}

public void complete() {
Expand Down Expand Up @@ -1213,8 +1214,8 @@ public BulkFuture<Map<String, Object>> asyncGetBulk(String... keys) {
*/
public <T> BulkFuture<Map<String, CASValue<T>>> asyncGetsBulk(Collection<String> keys,
Iterator<Transcoder<T>> tc_iter) {
final Map<String, Future<CASValue<T>>> m
= new ConcurrentHashMap<String, Future<CASValue<T>>>();
final Map<String, GetResult<CASValue<T>>> rvMap
= new ConcurrentHashMap<String, GetResult<CASValue<T>>>();

// This map does not need to be a ConcurrentHashMap
// because it is fully populated when it is used and
Expand Down Expand Up @@ -1248,9 +1249,9 @@ public void receivedStatus(OperationStatus status) {

public void gotData(String k, int flags, long cas, byte[] data) {
Transcoder<T> tc = tc_map.get(k);

m.put(k, tcService.decode(tc, cas,
new CachedData(flags, data, tc.getMaxSize())));
GetResult<CASValue<T>> result
= new GetsResultImpl<T>(cas, new CachedData(flags, data, tc.getMaxSize()), tc);
rvMap.put(k, result);
brido4125 marked this conversation as resolved.
Show resolved Hide resolved
}

public void complete() {
Expand All @@ -1276,7 +1277,7 @@ public void complete() {
ops.add(op);
}
}
return new BulkGetFuture<CASValue<T>>(m, ops, latch, operationTimeout);
return new BulkGetFuture<CASValue<T>>(rvMap, ops, latch, operationTimeout);
}

/**
Expand Down
11 changes: 5 additions & 6 deletions src/main/java/net/spy/memcached/internal/BulkGetFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.OperationTimeoutException;
import net.spy.memcached.compat.log.LoggerFactory;
import net.spy.memcached.internal.result.GetResult;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;

Expand All @@ -40,13 +40,13 @@
* @param <T> types of objects returned from the GET
*/
public class BulkGetFuture<T> implements BulkFuture<Map<String, T>> {
private final Map<String, Future<T>> rvMap;
private final Map<String, GetResult<T>> rvMap;
private final Collection<Operation> ops;
private final CountDownLatch latch;
private final long timeout;
private boolean isTimeout = false;

public BulkGetFuture(Map<String, Future<T>> rvMap, Collection<Operation> ops,
public BulkGetFuture(Map<String, GetResult<T>> rvMap, Collection<Operation> ops,
CountDownLatch latch, Long timeout) {
super();
this.rvMap = rvMap;
Expand Down Expand Up @@ -164,10 +164,9 @@ private Map<String, T> internalGet(long to, TimeUnit unit,
}

Map<String, T> resultMap = new HashMap<String, T>();
for (Map.Entry<String, Future<T>> me : rvMap.entrySet()) {
for (Map.Entry<String, GetResult<T>> me : rvMap.entrySet()) {
Copy link
Collaborator

@oliviarla oliviarla Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jhpark816 @uhm0311 @brido4125
TranscodeService의 필요성에 대해 예전에 정리해둔 내용입니다.

  • key-value 에 저장된 데이터는 1MB까지도 저장되어 용량이 커 decode가 오래걸린다.
  • compression 되어있는 경우에는 압축 해제까지 필요하다.

asyncBulkGet() API를 사용할 때 for문을 돌면서 decode작업을 순차적으로 진행하게 되는데, 위와 같이 용량이 크거나 압축 해제하는 작업이 발생하면 각각 스레드에 맡기는것보다 오래걸릴 수 있을 것 같은데 문제가 발생하진 않을까요?

Copy link
Collaborator Author

@brido4125 brido4125 Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla
WAS 환경이 spring 컨테이너라 가정하겠습니다.

기존 tcService의 thread들이 수행 -> tomcat의 쓰레드풀의 스레드가 수행으로
변경되었는데, 디코딩 작업 시간이 어떤 근거에서 증가할 수 있는지 설명 좀 부탁드리겠습니다.

제가 잘못 이해한 부분이 있네요 코멘트 수정해서 올려드릴게요

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla

asyncGets가 아닌 asyncBulkGets 또는 asyncBulkGet 에서의 상황을 말씀하신거죠?
맞다면 코멘트 수정 부탁드려요

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

아 맞습니다. gets를 헷갈렸네요. asyncBulkGet 이 맞습니다.
기존 tcService를 쓰면 여러 데이터를 순차적으로 decode하는 것이 아니라, 스레드풀의 스레드에 태스크를 맡기고 비동기적으로 여러 데이터를 decode할 수 있는 것이 맞나요?
만약 그렇다면 tomcat 스레드풀의 스레드가 지금과 같이 for문을 돌며 여러 데이터를 decode하게 되면 동기와 비슷한 형태가 되지 않을까 싶어서요.

Copy link
Collaborator Author

@brido4125 brido4125 Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

기존 tcService를 쓰면 여러 데이터를 순차적으로 decode하는 것이 아니라, 스레드풀의 스레드에 태스크를 맡기고 비동기적으로 여러 데이터를 decode할 수 있는 것이 맞나요?

스레드풀을 이용하니까 동시에 여러 decode 작업이 가능합니다.

제 의견은 다음과 같습니다.

bulkGet과 같은 요청이 만약 100개가 넘어가게 온다면,
현재 쓰레드풀의 수(10)으로는 한번에 감당하지 못하고 대기가 발생하게 됩니다.
이 상황에서 asyngGet 요청이 발생하여 큐에서 디코딩 작업이 대기하면
해당 연산 자체가 타임아웃이 발생할 확률이 증가합니다.

TcService를 사용하지 않으면 말씀하것처럼 bulk 연산의 효율은 떨어지지만,
최소한 bulk 요청을 진행 중일 때, 들어온 asyncGet 요청이
타임아웃이 발생할 확률은 훨씬 줄일 수 있다고 볼 수 있습니다.

즉, 자주 사용되지 않는 연산이 늦어지는것과 자주 사용되는 연산의 타임아웃이 발생하는 것 중
선택해야한다면 전자를 선택해야하지 않나 생각합니다.

그러면 bulk 연산을 위해 tcService의 쓰레드풀의 수를 늘리는게 어떠냐? 라는 반문의 경우
자주 사용되지 않는 스레드를 항시 유지하는 리소스 또한 크기 때문에 추천되지 않는다고 생각합니다.
아무래도 asyncGet 연산이 asyncBulkGet 보다는 많이 사용될테니까요

Copy link
Collaborator

@oliviarla oliviarla Jan 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

나중에 사용자가 직접 Executor를 만들어 인자로 입력하는 API 제공하는 등 더 나은 방법을 논의하면 좋을 것 같습니다.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oliviarla @brido4125 @uhm0311
OK. 이에 대해 별도로 offline 논의하시죠.

String key = me.getKey();
Future<T> future = me.getValue();
T value = future.get();
T value = me.getValue().getDecodedValue();

// put the key into the result map.
resultMap.put(key, value);
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/net/spy/memcached/ProtocolBaseCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ public void testAsyncGetBulkWithTranscoderIterator() throws Exception {
tcs.add(0, decodeTranscoder);
try {
client.asyncGetBulk(keys, tcs.listIterator()).get();
fail("Expected ExecutionException caused by key mismatch");
} catch (java.util.concurrent.ExecutionException e) {
fail("Expected AssertionError caused by key mismatch");
} catch (AssertionError e) {
// pass
}
}
Expand Down Expand Up @@ -552,8 +552,8 @@ public void testAsyncGetsBulkWithTranscoderIterator() throws Exception {
tcs.add(0, decodeTranscoder);
try {
client.asyncGetsBulk(keys, tcs.listIterator()).get();
fail("Expected ExecutionException caused by key mismatch");
} catch (java.util.concurrent.ExecutionException e) {
fail("Expected AssertionError caused by key mismatch");
} catch (AssertionError e) {
// pass
}
}
Expand Down
Loading