Skip to content

Commit

Permalink
Provide a way to invoke Thrift service impls from `BlockingTaskExecut…
Browse files Browse the repository at this point in the history
…or` (#5619)

Related issue: #4917

Motivation:

There's currently no way to make Thrift services run from
the `BlockingTaskExecutor`.

Modifications:

- Added `useBlockingTaskExecutor` property
- Added `ThriftCallServiceBuilder` to build `ThriftCallService` fluently
- `ThriftCallService` now calls the service implementation from
  the `BlockingTaskExecutor` if configured so.

Result:

- Closes #4917
- User can now specify whether to invoke the service implementation
  from the `BlockingTaskExecutor` for Thrift services.

---------

Co-authored-by: Trustin Lee <[email protected]>
  • Loading branch information
ChangguHan and trustin authored Jun 20, 2024
1 parent 071a342 commit 52114ed
Show file tree
Hide file tree
Showing 5 changed files with 430 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -84,6 +85,7 @@ public final class THttpServiceBuilder {
// -1 means to use the default request length of the Server.
private int maxRequestStringLength = -1;
private int maxRequestContainerLength = -1;
private boolean useBlockingTaskExecutor;

THttpServiceBuilder() {}

Expand Down Expand Up @@ -190,6 +192,17 @@ public THttpServiceBuilder maxRequestContainerLength(int maxRequestContainerLeng
return this;
}

/**
* Sets whether the service executes service methods using the blocking executor. By default, service
* methods are executed directly on the event loop for implementing fully asynchronous services. If your
* service uses blocking logic, you should either execute such logic in a separate thread using something
* like {@link Executors#newCachedThreadPool()} or enable this setting.
*/
public THttpServiceBuilder useBlockingTaskExecutor(boolean useBlockingTaskExecutor) {
this.useBlockingTaskExecutor = useBlockingTaskExecutor;
return this;
}

/**
* Sets the {@link BiFunction} that returns an {@link RpcResponse} using the given {@link Throwable}
* and {@link ServiceRequestContext}.
Expand Down Expand Up @@ -225,10 +238,11 @@ private RpcService decorate(RpcService service) {
* Builds a new instance of {@link THttpService}.
*/
public THttpService build() {
@SuppressWarnings("UnstableApiUsage")
final Map<String, List<Object>> implementations = Multimaps.asMap(implementationsBuilder.build());

final ThriftCallService tcs = ThriftCallService.of(implementations);
final ThriftCallService tcs = new ThriftCallServiceBuilder()
.addServices(implementations)
.useBlockingTaskExecutor(useBlockingTaskExecutor)
.build();
return build0(tcs);
}

Expand All @@ -244,7 +258,9 @@ private THttpService build0(RpcService tcs) {
builder.add(defaultSerializationFormat);
builder.addAll(otherSerializationFormats);

return new THttpService(decorate(tcs), defaultSerializationFormat, builder.build(),
maxRequestStringLength, maxRequestContainerLength, exceptionHandler);
return new THttpService(
decorate(tcs), defaultSerializationFormat, builder.build(),
maxRequestStringLength, maxRequestContainerLength, exceptionHandler
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.linecorp.armeria.server.thrift;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import java.util.List;
Expand All @@ -31,14 +31,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.common.CompletableRpcResponse;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.RpcRequest;
import com.linecorp.armeria.common.RpcResponse;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.internal.common.thrift.ThriftFunction;
import com.linecorp.armeria.server.RpcService;
import com.linecorp.armeria.server.ServiceRequestContext;
Expand Down Expand Up @@ -70,7 +68,7 @@ public void onError(Exception e) {
*/
public static ThriftCallService of(Object implementation) {
requireNonNull(implementation, "implementation");
return new ThriftCallService(ImmutableMap.of("", ImmutableList.of(implementation)));
return builder().addService(implementation).build();
}

/**
Expand All @@ -82,19 +80,27 @@ public static ThriftCallService of(Object implementation) {
*/
public static ThriftCallService of(Map<String, ? extends Iterable<?>> implementations) {
requireNonNull(implementations, "implementations");
return new ThriftCallService(implementations);
checkArgument(!implementations.isEmpty(), "implementations is empty");

return builder().addServices(implementations).build();
}

/**
* Creates a new instance of {@link ThriftCallServiceBuilder} which can build
* an instance of {@link ThriftCallService} fluently.
*/
@UnstableApi
public static ThriftCallServiceBuilder builder() {
return new ThriftCallServiceBuilder();
}

private final Map<String, ThriftServiceEntry> entries;

private ThriftCallService(Map<String, ? extends Iterable<?>> implementations) {
requireNonNull(implementations, "implementations");
if (implementations.isEmpty()) {
throw new IllegalArgumentException("empty implementations");
}
private final boolean useBlockingTaskExecutor;

entries = implementations.entrySet().stream().collect(
toImmutableMap(Map.Entry::getKey, ThriftServiceEntry::new));
ThriftCallService(Map<String, ThriftServiceEntry> entries, boolean useBlockingTaskExecutor) {
this.entries = entries;
this.useBlockingTaskExecutor = useBlockingTaskExecutor;
}

/**
Expand Down Expand Up @@ -140,14 +146,24 @@ public RpcResponse serve(ServiceRequestContext ctx, RpcRequest call) throws Exce
TApplicationException.UNKNOWN_METHOD, "unknown method: " + call.method()));
}

private static void invoke(
private void invoke(
ServiceRequestContext ctx,
Object impl, ThriftFunction func, List<Object> args, CompletableRpcResponse reply) {

try {
final TBase<?, ?> tArgs = func.newArgs(args);
if (func.isAsync()) {
invokeAsynchronously(impl, func, tArgs, reply);
if (useBlockingTaskExecutor) {
ctx.blockingTaskExecutor().execute(() -> {
try {
invokeAsynchronously(impl, func, tArgs, reply);
} catch (Throwable t) {
reply.completeExceptionally(t);
}
});
} else {
invokeAsynchronously(impl, func, tArgs, reply);
}
} else {
invokeSynchronously(ctx, impl, func, tArgs, reply);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.thrift;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.Objects.requireNonNull;

import java.util.Map;
import java.util.concurrent.Executors;

import com.google.common.collect.ImmutableListMultimap;

import com.linecorp.armeria.common.annotation.UnstableApi;

/**
* A fluent builder to build an instance of {@link ThriftCallService}.
*
* <h2>Example</h2>
* <pre>{@code
* ThriftCallService service = ThriftCallService
* .builder()
* .addService(defaultServiceImpl) // Adds a service
* .addService("foo", fooServiceImpl) // Adds a service with a key
* .addService("foobar", fooServiceImpl) // Adds multiple services to the same key
* .addService("foobar", barServiceImpl)
* // Adds multiple services at once
* .addServices("foobarOnce", fooServiceImpl, barServiceImpl)
* // Adds multiple services by list
* .addServices("foobarList", ImmutableList.of(fooServiceImpl, barServiceImpl))
* // Adds multiple services by map
* .addServices(ImmutableMap.of("fooMap", fooServiceImpl, "barMap", barServiceImpl))
* // Adds multiple services by map
* .addServices(ImmutableMap.of("fooIterableMap",
* ImmutableList.of(fooServiceImpl, barServiceImpl)))
* .build();
* }</pre>
*
* @see ThriftCallService
*/
@UnstableApi
public final class ThriftCallServiceBuilder {
private final ImmutableListMultimap.Builder<String, Object> servicesBuilder =
ImmutableListMultimap.builder();

private boolean useBlockingTaskExecutor;

ThriftCallServiceBuilder() {}

/**
* Adds a service for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addService(Object service) {
requireNonNull(service, "service");
servicesBuilder.put("", service);
return this;
}

/**
* Adds a service with a key for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addService(String key, Object service) {
requireNonNull(key, "key");
requireNonNull(service, "service");
servicesBuilder.put(key, service);
return this;
}

/**
* Adds a service for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addServices(Object... services) {
requireNonNull(services, "services");
checkArgument(services.length != 0, "service should not be empty");
servicesBuilder.putAll("", services);
return this;
}

/**
* Adds a service with a key for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addServices(String key, Object... services) {
requireNonNull(key, "key");
requireNonNull(services, "service");
checkArgument(services.length != 0, "service should not be empty");
servicesBuilder.putAll(key, services);
return this;
}

/**
* Adds services with key by iterable for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addServices(String key, Iterable<?> services) {
requireNonNull(key, "key");
requireNonNull(services, "services");
checkArgument(services.iterator().hasNext(), "service should not be empty");
servicesBuilder.putAll(key, services);
return this;
}

/**
* Adds multiple services by map for {@link ThriftServiceEntry}.
*/
public ThriftCallServiceBuilder addServices(Map<String, ?> services) {
requireNonNull(services, "services");
checkArgument(!services.isEmpty(), "service should not be empty");

services.forEach((k, v) -> {
if (v instanceof Iterable<?>) {
servicesBuilder.putAll(k, (Iterable<?>) v);
} else {
servicesBuilder.put(k, v);
}
});
return this;
}

/**
* Sets whether the service executes service methods using the blocking executor. By default, service
* methods are executed directly on the event loop for implementing fully asynchronous services. If your
* service uses blocking logic, you should either execute such logic in a separate thread using something
* like {@link Executors#newCachedThreadPool()} or enable this setting.
*/
public ThriftCallServiceBuilder useBlockingTaskExecutor(boolean useBlockingTaskExecutor) {
this.useBlockingTaskExecutor = useBlockingTaskExecutor;
return this;
}

/**
* Builds a new instance of {@link ThriftCallService}.
*/
public ThriftCallService build() {
return new ThriftCallService(
servicesBuilder.build().asMap().entrySet().stream().collect(
toImmutableMap(Map.Entry::getKey, ThriftServiceEntry::new)),
useBlockingTaskExecutor
);
}
}
Loading

0 comments on commit 52114ed

Please sign in to comment.