Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Result implements future. #3916

Merged
merged 7 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Result.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

package com.alibaba.dubbo.rpc;

import org.apache.dubbo.rpc.AppResponse;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

@Deprecated
Expand All @@ -37,23 +33,15 @@ default void setException(Throwable t) {

}

@Override
default org.apache.dubbo.rpc.Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
return this;
}

@Override
default <U> CompletableFuture<U> thenApply(Function<org.apache.dubbo.rpc.Result, ? extends U> fn) {
return null;
}
abstract class AbstractResult extends org.apache.dubbo.rpc.AbstractResult implements Result {

@Override
default org.apache.dubbo.rpc.Result get() throws InterruptedException, ExecutionException {
return this;
@Override
public org.apache.dubbo.rpc.Result thenApplyWithContext(Function<org.apache.dubbo.rpc.Result, org.apache.dubbo.rpc.Result> fn) {
return null;
}
}


class CompatibleResult implements Result {
class CompatibleResult extends AbstractResult {
private org.apache.dubbo.rpc.Result delegate;

public CompatibleResult(org.apache.dubbo.rpc.Result result) {
Expand All @@ -69,11 +57,21 @@ public Object getValue() {
return delegate.getValue();
}

@Override
public void setValue(Object value) {
delegate.setValue(value);
}

@Override
public Throwable getException() {
return delegate.getException();
}

@Override
public void setException(Throwable t) {
delegate.setException(t);
}

@Override
public boolean hasException() {
return delegate.hasException();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc;

import java.util.concurrent.CompletableFuture;

/**
*
*/
public abstract class AbstractResult extends CompletableFuture<Result> implements Result {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

/**
Expand All @@ -30,21 +28,19 @@
* <li>AppResponse only simply represents the business result</li>
* </ul>
*
* The relationship between them can be reflected in the definition of AsyncRpcResult:
* The relationship between them can be described as follow, an abstraction of the definition of AsyncRpcResult:
* <pre>
* {@code
* Public class AsyncRpcResult implements Result {
* private CompletableFuture <AppResponse> resultFuture;
* Public class AsyncRpcResult implements CompletionStage<AppResponse> {
* ......
* }
* }
* </pre>
*
* In theory, AppResponse does not need to implement the {@link Result} interface, this is done mainly for compatibility purpose.
* AsyncRpcResult is a future representing an unfinished RPC call, while AppResponse is the actual return type of this call.
* In theory, AppResponse does'n have to implement the {@link Result} interface, this is done mainly for compatibility purpose.
*
* @serial Do not change the class name and properties.
*/
public class AppResponse implements Result, Serializable {
public class AppResponse extends AbstractResult implements Serializable {

private static final long serialVersionUID = -6925924956850004727L;

Expand Down Expand Up @@ -139,17 +135,7 @@ public void setAttachment(String key, String value) {
}

@Override
public Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
}

@Override
public <U> CompletableFuture<U> thenApply(Function<Result, ? extends U> fn) {
throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
}

@Override
public Result get() throws InterruptedException, ExecutionException {
public Result thenApplyWithContext(Function<Result, Result> fn) {
throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

public class AsyncRpcResult implements Result {
public class AsyncRpcResult extends AbstractResult {
private static final Logger logger = LoggerFactory.getLogger(AsyncRpcResult.class);

/**
Expand All @@ -35,24 +34,35 @@ public class AsyncRpcResult implements Result {
private RpcContext storedContext;
private RpcContext storedServerContext;

private CompletableFuture<AppResponse> responseFuture;
private Invocation invocation;

public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
this.responseFuture = future;
public AsyncRpcResult(Invocation invocation) {
this.invocation = invocation;
this.storedContext = RpcContext.getContext();
this.storedServerContext = RpcContext.getServerContext();
}

public AsyncRpcResult(AsyncRpcResult asyncRpcResult) {
this.invocation = asyncRpcResult.getInvocation();
this.storedContext = asyncRpcResult.getStoredContext();
this.storedServerContext = asyncRpcResult.getStoredServerContext();
}

/**
* Notice the return type of {@link #getValue} is the actual type of the RPC method, not {@link AppResponse}
*
* @return
*/
@Override
public Object getValue() {
return getAppResponse().getValue();
}

@Override
public void setValue(Object value) {

AppResponse appResponse = new AppResponse();
appResponse.setValue(value);
this.complete(appResponse);
}

@Override
Expand All @@ -62,26 +72,20 @@ public Throwable getException() {

@Override
public void setException(Throwable t) {

AppResponse appResponse = new AppResponse();
appResponse.setException(t);
this.complete(appResponse);
}

@Override
public boolean hasException() {
return getAppResponse().hasException();
}

public CompletableFuture<AppResponse> getResponseFuture() {
return responseFuture;
}

public void setResponseFuture(CompletableFuture<AppResponse> responseFuture) {
this.responseFuture = responseFuture;
}

public Result getAppResponse() {
try {
if (responseFuture.isDone()) {
return responseFuture.get();
if (this.isDone()) {
return this.get();
}
} catch (Exception e) {
// This should never happen;
Expand All @@ -97,7 +101,7 @@ public Object recreate() throws Throwable {
AppResponse appResponse = new AppResponse();
CompletableFuture<Object> future = new CompletableFuture<>();
appResponse.setValue(future);
responseFuture.whenComplete((result, t) -> {
this.whenComplete((result, t) -> {
if (t != null) {
if (t instanceof CompletionException) {
t = t.getCause();
Expand All @@ -112,25 +116,27 @@ public Object recreate() throws Throwable {
}
});
return appResponse.recreate();
} else if (responseFuture.isDone()) {
return responseFuture.get().recreate();
} else if (this.isDone()) {
return this.get().recreate();
}
return (new AppResponse()).recreate();
}

public Result get() throws InterruptedException, ExecutionException {
return responseFuture.get();
public Result thenApplyWithContext(Function<Result, Result> fn) {
CompletableFuture<Result> future = this.thenApply(fn.compose(beforeContext).andThen(afterContext));
AsyncRpcResult nextAsyncRpcResult = new AsyncRpcResult(this);
nextAsyncRpcResult.subscribeTo(future);
return nextAsyncRpcResult;
}

@Override
public Result thenApplyWithContext(Function<AppResponse, AppResponse> fn) {
this.responseFuture = responseFuture.thenApply(fn.compose(beforeContext).andThen(afterContext));
return this;
}

@Override
public <U> CompletableFuture<U> thenApply(Function<Result,? extends U> fn) {
return this.responseFuture.thenApply(fn);
public void subscribeTo(CompletableFuture<?> future) {
future.whenComplete((obj, t) -> {
if (t != null) {
this.completeExceptionally(t);
} else {
this.complete((Result) obj);
}
});
}

@Override
Expand Down Expand Up @@ -163,21 +169,33 @@ public void setAttachment(String key, String value) {
getAppResponse().setAttachment(key, value);
}

public RpcContext getStoredContext() {
return storedContext;
}

public RpcContext getStoredServerContext() {
return storedServerContext;
}

public Invocation getInvocation() {
return invocation;
}

/**
* tmp context to use when the thread switch to Dubbo thread.
*/
private RpcContext tmpContext;
private RpcContext tmpServerContext;

private Function<AppResponse, AppResponse> beforeContext = (appResponse) -> {
private Function<Result, Result> beforeContext = (appResponse) -> {
tmpContext = RpcContext.getContext();
tmpServerContext = RpcContext.getServerContext();
RpcContext.restoreContext(storedContext);
RpcContext.restoreServerContext(storedServerContext);
return appResponse;
};

private Function<AppResponse, AppResponse> afterContext = (appResponse) -> {
private Function<Result, Result> afterContext = (appResponse) -> {
RpcContext.restoreContext(tmpContext);
RpcContext.restoreServerContext(tmpServerContext);
return appResponse;
Expand All @@ -186,8 +204,10 @@ public void setAttachment(String key, String value) {
/**
* Some utility methods used to quickly generate default AsyncRpcResult instance.
*/
public static AsyncRpcResult newDefaultAsyncResult(AppResponse result, Invocation invocation) {
return new AsyncRpcResult(CompletableFuture.completedFuture(result), invocation);
public static AsyncRpcResult newDefaultAsyncResult(AppResponse appResponse, Invocation invocation) {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
asyncRpcResult.complete(appResponse);
return asyncRpcResult;
}

public static AsyncRpcResult newDefaultAsyncResult(Invocation invocation) {
Expand All @@ -203,15 +223,15 @@ public static AsyncRpcResult newDefaultAsyncResult(Throwable t, Invocation invoc
}

public static AsyncRpcResult newDefaultAsyncResult(Object value, Throwable t, Invocation invocation) {
CompletableFuture<AppResponse> future = new CompletableFuture<>();
AppResponse result = new AppResponse();
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation);
AppResponse appResponse = new AppResponse();
if (t != null) {
result.setException(t);
appResponse.setException(t);
} else {
result.setValue(value);
appResponse.setValue(value);
}
future.complete(result);
return new AsyncRpcResult(future, invocation);
asyncRpcResult.complete(appResponse);
return asyncRpcResult;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,19 @@ public interface Filter {
*/
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;

/**
* Filter itself should only be response for passing invocation, all callbacks has been placed into {@link Listener}
*
* @param appResponse
* @param invoker
* @param invocation
* @return
*/
@Deprecated
default Result onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
return appResponse;
}

interface Listener {

void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
Expand Down
Loading