Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Dubbo-1876] Enhancements for the new async way of Dubbo #1957

Merged
merged 47 commits into from
Jul 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
55d77b7
Async support start
chickenlj May 25, 2018
6aec4d2
A basically worked version for provider async
chickenlj May 28, 2018
7b8e8e9
asynchronously send response
chickenlj May 28, 2018
94c21b5
should send Response for sync invoke
chickenlj May 29, 2018
7d46be7
demo
chickenlj May 29, 2018
6ee2b60
Support method with Future return type.
chickenlj May 29, 2018
0c35d8f
Merge branch 'develop-async-response' into develop-demo-test
chickenlj May 29, 2018
a1305f6
Add async interface annotation and processor
chickenlj May 30, 2018
c06ff03
demo
chickenlj May 30, 2018
4a891d8
polish async interface processor
chickenlj May 30, 2018
fcd30ec
fix concurrent write back problem when use provider async.
chickenlj May 30, 2018
fe0fe6b
revert demo changes
chickenlj May 30, 2018
77c52c2
config async processor as spi.
chickenlj May 30, 2018
f2d0476
add missed pom files
chickenlj May 30, 2018
f1f0cf0
make DubboAsync work at source compile phase.
chickenlj May 30, 2018
3e9ffb4
change annotation value to array
chickenlj May 30, 2018
ff22444
transfer async-processor module to eco-system
chickenlj May 30, 2018
0380d1b
Fix NPE in UT
chickenlj May 31, 2018
b08454c
add async module to pom for package and bom dependency.
chickenlj May 31, 2018
a82db94
Delete dubbo-async module, it's not necessary.
chickenlj Jun 1, 2018
a79d7f4
handle exception throw by provider
chickenlj Jun 1, 2018
39d9a9c
remove jdk7 support
chickenlj Jun 1, 2018
452638c
support method with original CompletableFuture return type
chickenlj Jun 13, 2018
f7c19fc
Merge branch 'master' into local/chickenlj/develop-async-response
chickenlj Jun 13, 2018
274d594
Merge branch 'master' into local/chickenlj/develop-async-response
chickenlj Jun 13, 2018
d65835f
support method with original future return type.
chickenlj Jun 14, 2018
9a96df2
Merge remote-tracking branch 'chickenlj/develop-async-response' into …
chickenlj Jun 14, 2018
0a81d5c
demo test
chickenlj Jun 14, 2018
a9ca101
optimization
chickenlj Jun 15, 2018
2b3b2cf
test biz exception
chickenlj Jun 15, 2018
fe30380
Merge branch 'master' into develop-async-response
chickenlj Jun 15, 2018
bae63ea
Manually resolve conflicts after merge master(repackage)
chickenlj Jun 15, 2018
321bf46
support context switch & optimization
chickenlj Jun 17, 2018
2a6a0b4
fix several bugs: context restore, consumer callback
chickenlj Jun 17, 2018
c0d79d6
consumerFilter: clear attachments instead of remove context
chickenlj Jun 17, 2018
14ed6a3
change async API in RpcContext to CompletableFuture.
chickenlj Jun 18, 2018
063a80e
Delete demo codes
chickenlj Jun 18, 2018
63db2d8
fix compilation problem
chickenlj Jun 19, 2018
71a2206
Fix RpcUtils, get the right method return type.
chickenlj Jun 19, 2018
84b8b48
unified to AsyncRpcResult on consumer side.
chickenlj Jun 19, 2018
f21fc21
fix UT error
chickenlj Jun 19, 2018
4c5d60f
Merge branch 'master' into develop-async-response
chickenlj Jun 19, 2018
2671a04
resolve compile problem after merge master
chickenlj Jun 19, 2018
4977d6c
Fix bug when use startAsync().
chickenlj Jun 30, 2018
16ea830
Merge branch 'master' into develop-async-response
chickenlj Jul 4, 2018
75b6f55
Fix async UT
chickenlj Jul 4, 2018
0ff5134
Merge branch 'master' into develop-async-response
chickenlj Jul 10, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;

/**
* Constants
*/
Expand Down Expand Up @@ -281,7 +280,8 @@ public class Constants {

public static final String ASYNC_KEY = "async";

public static final String FUTURE_KEY = "async_future";
public static final String FUTURE_GENERATED_KEY = "future_generated";
public static final String FUTURE_RETURNTYPE_KEY = "future_returntype";

public static final String ASYNC_SUFFIX = "Async";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@
Class<?> value();

}

15 changes: 15 additions & 0 deletions dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ public Map<String, String> getAttachments() {
return delegate.getAttachments();
}

@Override
public void addAttachments(Map<String, String> map) {
delegate.addAttachments(map);
}

@Override
public void setAttachments(Map<String, String> map) {
delegate.setAttachments(map);
}

@Override
public String getAttachment(String key) {
return delegate.getAttachment(key);
Expand All @@ -72,5 +82,10 @@ public String getAttachment(String key) {
public String getAttachment(String key, String defaultValue) {
return delegate.getAttachment(key, defaultValue);
}

@Override
public void setAttachment(String key, String value) {
delegate.setAttachment(key, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.dubbo.demo.consumer;

import org.apache.dubbo.demo.DemoService;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) thro
return null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;


/**
* ExchangeReceiver
*/
Expand Down Expand Up @@ -102,7 +103,7 @@ void handleRequest(ExchangeChannel channel, Request req) throws RemotingExceptio
channel.send(res);
return;
}
future.whenCompleteAsync((result, t) -> {
future.whenComplete((result, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc;

/**
*
*/
public abstract class AbstractPostProcessFilter implements PostProcessFilter {
@Override
public Result postProcessResult(Result result, Invoker<?> invoker, Invocation invocation) {
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> doPostProcess(r, invoker, invocation));
return asyncResult;
} else {
return doPostProcess(result, invoker, invocation);
}
}

protected abstract Result doPostProcess(Result result, Invoker<?> invoker, Invocation invocation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc;

import java.util.HashMap;
import java.util.Map;

/**
*
*/
public abstract class AbstractResult implements Result {
protected Map<String, String> attachments = new HashMap<String, String>();

protected Object result;

protected Throwable exception;

@Override
public Map<String, String> getAttachments() {
return attachments;
}

@Override
public void setAttachments(Map<String, String> map) {
this.attachments = map == null ? new HashMap<String, String>() : map;
}

@Override
public void addAttachments(Map<String, String> map) {
if (map == null) {
return;
}
if (this.attachments == null) {
this.attachments = new HashMap<String, String>();
}
this.attachments.putAll(map);
}

@Override
public String getAttachment(String key) {
return attachments.get(key);
}

@Override
public String getAttachment(String key, String defaultValue) {
String result = attachments.get(key);
if (result == null || result.length() == 0) {
result = defaultValue;
}
return result;
}

public void setAttachment(String key, String value) {
attachments.put(key, value);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.dubbo.rpc;

import java.util.concurrent.CompletableFuture;

/**
* AsyncContext works like {@see javax.servlet.AsyncContext} in the Servlet 3.0.
* An AsyncContext is stated by a call to {@link RpcContext#startAsync()}.
Expand All @@ -25,7 +27,7 @@
*/
public interface AsyncContext {

void addListener(Runnable run);
CompletableFuture getInternalFuture();

/**
* write value and complete the async context.
Expand All @@ -45,7 +47,9 @@ public interface AsyncContext {
boolean stop();

/**
* change the context state to stop
* change the context state to start
*/
void start();

void signalContextSwitch();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,30 @@ public class AsyncContextImpl implements AsyncContext {
private static final Logger logger = LoggerFactory.getLogger(AsyncContextImpl.class);

private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stoped = new AtomicBoolean(false);

private CompletableFuture<Object> future;

private RpcContext storedContext;
private RpcContext storedServerContext;

public AsyncContextImpl() {
}

public AsyncContextImpl(CompletableFuture<Object> future) {
this.future = future;
}

@Override
public void addListener(Runnable run) {

this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}

@Override
public void write(Object value) {
if (stop()) {
if (isAsyncStarted() && stop()) {
if (value instanceof Throwable) {
// TODO check exception type like ExceptionFilter do.
Throwable bizExe = (Throwable) value;
future.complete(new RpcResult(bizExe));
future.completeExceptionally(bizExe);
} else {
future.complete(new RpcResult(value));
future.complete(value);
}
} else {
throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");
Expand All @@ -63,11 +63,22 @@ public boolean isAsyncStarted() {

@Override
public boolean stop() {
return started.compareAndSet(true, false);
return stoped.compareAndSet(false, true);
}

@Override
public void start() {
this.started.set(true);
}

public void signalContextSwitch() {
RpcContext.restoreContext(storedContext);
RpcContext.restoreServerContext(storedServerContext);
// Restore any other contexts in here if necessary.
}

@Override
public CompletableFuture getInternalFuture() {
return future;
}
}
Loading