diff --git a/LICENSE b/LICENSE index d0549103c1..a5d943cea3 100644 --- a/LICENSE +++ b/LICENSE @@ -220,6 +220,7 @@ The text of each license is the standard Apache 2.0 license. java/fury-core/src/main/java/org/apache/fury/reflect/TypeParameter.java java/fury-core/src/main/java/org/apache/fury/reflect/Types.java java/fury-core/src/main/java/org/apache/fury/reflect/TypeRef.java + java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java * spark (https://github.com/apache/spark) Files: diff --git a/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java b/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java index b06775b66b..a19efd4a9b 100644 --- a/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java +++ b/java/fury-core/src/main/java/org/apache/fury/builder/JITContext.java @@ -19,15 +19,12 @@ package org.apache.fury.builder; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import org.apache.fury.Fury; @@ -85,16 +82,13 @@ public T registerSerializerJITCallback( if (fury.getConfig().isAsyncCompilationEnabled() && !isAsyncVisitingFury()) { // TODO(chaokunyang) stash callbacks and submit jit task if the serialization speed // is really needed. - ListeningExecutorService compilationService = CodeGenerator.getCompilationService(); - ListenableFuture future; + ExecutorService compilationService = CodeGenerator.getCompilationService(); hasJITResult.put(callback.id(), new ArrayList<>()); numRunningTask++; - future = compilationService.submit(jitAction); - Futures.addCallback( - future, - new FutureCallback() { - @Override - public void onSuccess(T result) { + compilationService.execute( + () -> { + try { + T result = jitAction.call(); try { lock(); callback.onSuccess(result); @@ -108,10 +102,7 @@ public void onSuccess(T result) { } unlock(); } - } - - @Override - public void onFailure(Throwable t) { + } catch (Throwable t) { try { lock(); callback.onFailure(t); @@ -124,8 +115,7 @@ public void onFailure(Throwable t) { unlock(); } } - }, - compilationService); + }); return interpreterModeAction.call(); } else { return jitAction.call(); diff --git a/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java b/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java index 2d03b5d774..776df27d18 100644 --- a/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java +++ b/java/fury-core/src/main/java/org/apache/fury/codegen/CodeGenerator.java @@ -19,10 +19,6 @@ package org.apache.fury.codegen; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -30,7 +26,9 @@ import java.util.Map; import java.util.StringJoiner; import java.util.WeakHashMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -47,6 +45,8 @@ import org.apache.fury.util.GraalvmSupport; import org.apache.fury.util.Preconditions; import org.apache.fury.util.StringUtils; +import org.apache.fury.util.concurrency.DirectExecutorService; +import org.apache.fury.util.concurrency.FuryJitCompilerThreadFactory; /** * Code generator will take a list of {@link CompileUnit} and compile it into a list of classes. @@ -80,7 +80,7 @@ public class CodeGenerator { private static final String FALLBACK_PACKAGE = Generated.class.getPackage().getName(); public static final boolean ENABLE_FURY_GENERATED_CLASS_UNIQUE_ID; private static int maxPoolSize = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); - private static ListeningExecutorService compilationExecutorService; + private static ExecutorService compilationExecutorService; static { boolean useUniqueId = StringUtils.isBlank(CodeGenerator.getCodeDir()); @@ -205,34 +205,35 @@ private ClassLoader defineClasses(Map classes) { return resultClassLoader; } - public ListenableFuture[]> asyncCompile(CompileUnit... compileUnits) { - return getCompilationService() - .submit( - () -> { - ClassLoader loader = compile(compileUnits); - return Arrays.stream(compileUnits) - .map( - compileUnit -> { - try { - return (Class) loader.loadClass(compileUnit.getQualifiedClassName()); - } catch (ClassNotFoundException e) { - throw new IllegalStateException( - "Impossible because we just compiled class", e); - } - }) - .toArray(Class[]::new); - }); + public CompletableFuture[]> asyncCompile(CompileUnit... compileUnits) { + ExecutorService executorService = getCompilationService(); + return CompletableFuture.supplyAsync( + () -> { + ClassLoader loader = compile(compileUnits); + return Arrays.stream(compileUnits) + .map( + compileUnit -> { + try { + return (Class) loader.loadClass(compileUnit.getQualifiedClassName()); + } catch (ClassNotFoundException e) { + throw new IllegalStateException( + "Impossible because we just compiled class", e); + } + }) + .toArray(Class[]::new); + }, + executorService); } public static void seMaxCompilationThreadPoolSize(int maxCompilationThreadPoolSize) { maxPoolSize = maxCompilationThreadPoolSize; } - public static synchronized ListeningExecutorService getCompilationService() { + public static synchronized ExecutorService getCompilationService() { if (compilationExecutorService == null) { if (GraalvmSupport.isGraalBuildtime()) { // GraalVM build time can't reachable thread. - return compilationExecutorService = MoreExecutors.newDirectExecutorService(); + return compilationExecutorService = new DirectExecutorService(); } ThreadPoolExecutor executor = new ThreadPoolExecutor( @@ -241,13 +242,13 @@ public static synchronized ListeningExecutorService getCompilationService() { 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder().setNameFormat("fury-jit-compiler-%d").build(), + new FuryJitCompilerThreadFactory(), (r, e) -> LOG.warn("Task {} rejected from {}", r.toString(), e)); // Normally task won't be rejected by executor, since we used an unbound queue. // But when we shut down executor for debug, it'll be rejected by executor, // in such cases we just ignore the reject exception by log it. executor.allowCoreThreadTimeOut(true); - compilationExecutorService = MoreExecutors.listeningDecorator(executor); + compilationExecutorService = executor; } return compilationExecutorService; } diff --git a/java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java new file mode 100644 index 0000000000..2aa904fe40 --- /dev/null +++ b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fury.util.concurrency; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +// Mostly derived from Guava 32.1.2 +// com.google.common.util.concurrent.MoreExecutors.DirectExecutorService +// https://github.com/google/guava/blob/9f6a3840/guava/src/com/google/common/util/concurrent/MoreExecutors.java +public class DirectExecutorService extends AbstractExecutorService { + private final Object lock = new Object(); + private int runningTasks = 0; + private boolean shutdown = false; + + @Override + public void execute(Runnable command) { + synchronized (lock) { + if (shutdown) { + throw new RejectedExecutionException("Executor already shutdown"); + } + runningTasks++; + } + try { + command.run(); + } finally { + synchronized (lock) { + int numRunning = --runningTasks; + if (numRunning == 0) { + lock.notifyAll(); + } + } + } + } + + @Override + public boolean isShutdown() { + synchronized (lock) { + return shutdown; + } + } + + @Override + public void shutdown() { + synchronized (lock) { + shutdown = true; + if (runningTasks == 0) { + lock.notifyAll(); + } + } + } + + @Override + public List shutdownNow() { + shutdown(); + return Collections.emptyList(); + } + + @Override + public boolean isTerminated() { + synchronized (lock) { + return shutdown && runningTasks == 0; + } + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + synchronized (lock) { + while (true) { + if (shutdown && runningTasks == 0) { + return true; + } else if (nanos <= 0) { + return false; + } else { + long now = System.nanoTime(); + TimeUnit.NANOSECONDS.timedWait(lock, nanos); + nanos -= System.nanoTime() - now; // subtract the actual time we waited + } + } + } + } +} diff --git a/java/fury-core/src/main/java/org/apache/fury/util/concurrency/FuryJitCompilerThreadFactory.java b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/FuryJitCompilerThreadFactory.java new file mode 100644 index 0000000000..1ec883c06a --- /dev/null +++ b/java/fury-core/src/main/java/org/apache/fury/util/concurrency/FuryJitCompilerThreadFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fury.util.concurrency; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class FuryJitCompilerThreadFactory implements ThreadFactory { + private final ThreadFactory backingThreadFactory = Executors.defaultThreadFactory(); + private final AtomicInteger threadNumber = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable task) { + Thread thread = backingThreadFactory.newThread(task); + thread.setName("fury-jit-compiler-" + threadNumber.incrementAndGet()); + return thread; + } +} diff --git a/java/fury-core/src/main/resources/META-INF/LICENSE b/java/fury-core/src/main/resources/META-INF/LICENSE index 83adf1aac1..187a82391b 100644 --- a/java/fury-core/src/main/resources/META-INF/LICENSE +++ b/java/fury-core/src/main/resources/META-INF/LICENSE @@ -220,6 +220,7 @@ The text of each license is the standard Apache 2.0 license. java/fury-core/src/main/java/org/apache/fury/reflect/TypeParameter.java java/fury-core/src/main/java/org/apache/fury/reflect/Types.java java/fury-core/src/main/java/org/apache/fury/reflect/TypeRef.java + java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java * spark (https://github.com/apache/spark) Files: diff --git a/licenserc.toml b/licenserc.toml index 6ae262dfd4..46cd3e5890 100644 --- a/licenserc.toml +++ b/licenserc.toml @@ -48,6 +48,7 @@ excludes = [ "java/fury-core/src/main/java/org/apache/fury/reflect/Types.java", "java/fury-core/src/main/java/org/apache/fury/type/Generics.java", "java/fury-core/src/main/java/org/apache/fury/util/MurmurHash3.java", + "java/fury-core/src/main/java/org/apache/fury/util/concurrency/DirectExecutorService.java", "java/fury-core/src/main/java/org/apache/fury/memory/Platform.java", "java/fury-core/src/main/java/org/apache/fury/util/Preconditions.java", "java/fury-core/src/test/java/org/apache/fury/type/GenericsTest.java",