Skip to content

Commit

Permalink
Fix blocking operation warnings from FileService (line#2478)
Browse files Browse the repository at this point in the history
Motivation:

`EventLoopCheckingFuture` logs a warning about any blocking operations
that occur in an event loop, and we do blocking operations in
`FileService` and `CachingHttpFile`.

Modifications:

- Read files via `blockingTaskExecutor` or a user-specified `Executor`.

Result:

- Fixes line#2473
  • Loading branch information
trustin authored Feb 10, 2020
1 parent c72e119 commit 2ed97c0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.linecorp.armeria.server.file;

import static com.google.common.base.MoreObjects.firstNonNull;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
Expand All @@ -25,7 +27,6 @@
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;
import com.google.common.util.concurrent.MoreExecutors;
import com.spotify.futures.CompletableFutures;

import com.linecorp.armeria.common.HttpResponse;
Expand Down Expand Up @@ -65,7 +66,7 @@ public ResponseHeaders readHeaders() throws IOException {
@Override
public HttpResponse read(Executor fileReadExecutor, ByteBufAllocator alloc) {
try {
final HttpFile file = getFile();
final HttpFile file = getFile(fileReadExecutor);
return file != null ? file.read(fileReadExecutor, alloc) : null;
} catch (Exception e) {
return HttpResponse.ofFailure(e);
Expand All @@ -75,7 +76,7 @@ public HttpResponse read(Executor fileReadExecutor, ByteBufAllocator alloc) {
@Override
public CompletableFuture<AggregatedHttpFile> aggregate(Executor fileReadExecutor) {
try {
final HttpFile file = getFile();
final HttpFile file = getFile(fileReadExecutor);
return file != null ? file.aggregate(fileReadExecutor)
: CompletableFuture.completedFuture(HttpFile.nonExistent());
} catch (Exception e) {
Expand All @@ -87,7 +88,7 @@ public CompletableFuture<AggregatedHttpFile> aggregate(Executor fileReadExecutor
public CompletableFuture<AggregatedHttpFile> aggregateWithPooledObjects(Executor fileReadExecutor,
ByteBufAllocator alloc) {
try {
final HttpFile file = getFile();
final HttpFile file = getFile(fileReadExecutor);
return file != null ? file.aggregateWithPooledObjects(fileReadExecutor, alloc)
: CompletableFuture.completedFuture(HttpFile.nonExistent());
} catch (Exception e) {
Expand All @@ -98,13 +99,13 @@ public CompletableFuture<AggregatedHttpFile> aggregateWithPooledObjects(Executor
@Override
public HttpService asService() {
return (ctx, req) -> {
final HttpFile file = MoreObjects.firstNonNull(getFile(), HttpFile.nonExistent());
final HttpFile file = firstNonNull(getFile(ctx.blockingTaskExecutor()), HttpFile.nonExistent());
return file.asService().serve(ctx, req);
};
}

@Nullable
private HttpFile getFile() throws IOException {
private HttpFile getFile(Executor fileReadExecutor) throws IOException {
final HttpFileAttributes uncachedAttrs = file.readAttributes();
if (uncachedAttrs == null) {
// Non-existent file. Invalidate the cache just in case it existed before.
Expand All @@ -121,7 +122,7 @@ private HttpFile getFile() throws IOException {
final AggregatedHttpFile cachedFile = this.cachedFile;
if (cachedFile == null) {
// Cache miss. Add a new entry to the cache.
return cache();
return cache(fileReadExecutor);
}

final HttpFileAttributes cachedAttrs = cachedFile.readAttributes();
Expand All @@ -133,21 +134,20 @@ private HttpFile getFile() throws IOException {

// Cache hit, but the cached file is out of date. Replace the old entry from the cache.
this.cachedFile = null;
return cache();
return cache(fileReadExecutor);
}

private HttpFile cache() {
// TODO(trustin): We assume here that the file being read is small enough that it will not block
// an event loop for a long time. Revisit if the assumption turns out to be false.
AggregatedHttpFile cachedFile = null;
try {
this.cachedFile = cachedFile = file.aggregate(MoreExecutors.directExecutor()).get();
} catch (Exception e) {
this.cachedFile = null;
logger.warn("Failed to cache a file: {}", file, Exceptions.peel(e));
}

return MoreObjects.firstNonNull(cachedFile, file);
private HttpFile cache(Executor fileReadExecutor) {
final AggregatedHttpFile maybeAggregated =
file.aggregate(fileReadExecutor).thenApply(aggregated -> {
cachedFile = aggregated;
return aggregated;
}).exceptionally(cause -> {
logger.warn("Failed to cache a file: {}", file, Exceptions.peel(cause));
return null;
}).getNow(null);

return firstNonNull(maybeAggregated, file);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linecorp.armeria.server.file;

import static com.google.common.base.MoreObjects.firstNonNull;
import static java.util.Objects.requireNonNull;

import java.io.File;
Expand All @@ -24,6 +25,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;

import javax.annotation.Nullable;

Expand All @@ -34,7 +36,6 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.google.common.base.Splitter;
import com.google.common.util.concurrent.MoreExecutors;

import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.HttpData;
Expand Down Expand Up @@ -318,18 +319,17 @@ private HttpFile findFile(ServiceRequestContext ctx, String path,
private HttpFile cache(ServiceRequestContext ctx, PathAndEncoding pathAndEncoding, HttpFile file) {
assert cache != null;

// TODO(trustin): We assume here that the file being read is small enough that it will not block
// an event loop for a long time. Revisit if the assumption turns out to be false.
final AggregatedHttpFile cachedFile = cache.get(pathAndEncoding, key -> {
try {
return file.aggregateWithPooledObjects(MoreExecutors.directExecutor(), ctx.alloc()).get();
} catch (Exception e) {
logger.warn("{} Failed to cache a file: {}", ctx, file, Exceptions.peel(e));
return null;
}
});

return cachedFile != null ? cachedFile : file;
final Executor executor = ctx.blockingTaskExecutor();
final AggregatedHttpFile maybeAggregated =
file.aggregateWithPooledObjects(executor, ctx.alloc()).thenApply(aggregated -> {
cache.put(pathAndEncoding, aggregated);
return aggregated;
}).exceptionally(cause -> {
logger.warn("{} Failed to cache a file: {}", ctx, file, Exceptions.peel(cause));
return null;
}).getNow(null);

return firstNonNull(maybeAggregated, file);
}

/**
Expand Down

0 comments on commit 2ed97c0

Please sign in to comment.