Skip to content

Commit

Permalink
Merge pull request #195 from A248/futures-and-thread-safety
Browse files Browse the repository at this point in the history
Fix some thread safety and exception-related problems
  • Loading branch information
lokka30 authored Aug 28, 2022
2 parents a105929 + 6690f82 commit 1605815
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public <T> FireCompletion<T> fire(@NotNull T event) {
List<Class<?>> friends = eventTypes.getFriendsOf(event.getClass());
ExecutorService async = EventExecutorTracker.INSTANCE.getExecutor(event.getClass());
FireCompletion<T> ret = new FireCompletion<>(event.getClass());
async.submit(() -> {
async.execute(() -> {
List<Completion> completions = new ArrayList<>();
EventCaller caller = events.get(event.getClass());
if (caller != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public Completion call(Object event) {
return Completion.completed();
}
Completion completion = new Completion();
EventExecutorTracker.INSTANCE.getExecutor(eventClass).submit(() -> {
EventExecutorTracker.INSTANCE.getExecutor(eventClass).execute(() -> {
List<Throwable> errors = call(event, new ArrayList<>(), 0);
if (!errors.isEmpty()) {
completion.completeExceptionally(errors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,32 @@

import com.google.common.reflect.TypeToken;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EventTypeTracker {

private Map<Class<?>, List<Class<?>>> friends = new HashMap<>();
private final Map<Class<?>, List<Class<?>>> friends = new ConcurrentHashMap<>();

public List<Class<?>> getFriendsOf(Class<?> eventType) {
if (friends.containsKey(eventType)) {
return Collections.unmodifiableList(friends.get(eventType));
}

List<Class<?>> types = getEventTypes(eventType);
friends.put(
eventType,
types.stream().filter(type -> type != eventType).collect(Collectors.toList())
);

return friends.get(eventType);
public List<Class<?>> getFriendsOf(Class<?> event) {
List<Class<?>> computedFriends = friends.computeIfAbsent((event), (eventType) -> {
return getEventTypes(eventType)
.filter(type -> type != eventType)
.collect(Collectors.toList());
});
return Collections.unmodifiableList(computedFriends);
}

private static List<Class<?>> getEventTypes(Class<?> eventType) {
private static <E> Stream<Class<? super E>> getEventTypes(Class<E> eventType) {
return TypeToken
.of(eventType)
.getTypes()
.rawTypes()
.stream()
.filter(type -> type != Object.class && type != Cancellable.class)
.collect(Collectors.toList());
.filter(type -> type != Object.class && type != Cancellable.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

package me.lokka30.treasury.api.economy;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -222,20 +225,12 @@ default void retrieveAllAccountsPlayerIsMemberOf(
subscription.succeed(identifiers);
return;
}
Set<String> ret = new HashSet<>();
Set<String> ret = Collections.synchronizedSet(new HashSet<>());
List<CompletableFuture<Void>> futures = new ArrayList<>(identifiers.size());
for (String identifier : identifiers) {
EconomySubscriber.<Account>asFuture(subscriber -> retrieveAccount(identifier,
subscriber
)).exceptionally(throwable -> {
if (throwable instanceof EconomyException) {
subscription.fail((EconomyException) throwable);
} else {
subscription.fail(new EconomyException(EconomyFailureReason.OTHER_FAILURE,
throwable
));
}
return null;
}).thenCompose(account -> {
futures.add(EconomySubscriber.<Account>asFuture(
subscriber -> retrieveAccount(identifier, subscriber)
).thenCompose(account -> {
if (account == null) {
return CompletableFuture.completedFuture(false);
}
Expand All @@ -250,10 +245,22 @@ default void retrieveAllAccountsPlayerIsMemberOf(
if (val) {
ret.add(identifier);
}
});

}));
}
subscription.succeed(ret);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
subscription.succeed(ret);
})
.exceptionally((throwable) -> {
if (throwable instanceof EconomyException) {
subscription.fail((EconomyException) throwable);
} else {
subscription.fail(new EconomyException(EconomyFailureReason.OTHER_FAILURE,
throwable
));
}
return null;
});
});
}

Expand Down Expand Up @@ -288,20 +295,12 @@ default void retrieveAllAccountsPlayerHasPermission(
subscription.succeed(identifiers);
return;
}
Set<String> ret = new HashSet<>();
Set<String> ret = Collections.synchronizedSet(new HashSet<>());
List<CompletableFuture<Void>> futures = new ArrayList<>(identifiers.size());
for (String identifier : identifiers) {
EconomySubscriber.<Account>asFuture(subscriber -> retrieveAccount(identifier,
subscriber
)).exceptionally(throwable -> {
if (throwable instanceof EconomyException) {
subscription.fail((EconomyException) throwable);
} else {
subscription.fail(new EconomyException(EconomyFailureReason.OTHER_FAILURE,
throwable
));
}
return null;
}).thenCompose(account -> {
futures.add(EconomySubscriber.<Account>asFuture(
subscriber -> retrieveAccount(identifier, subscriber)
).thenCompose(account -> {
if (account == null) {
return CompletableFuture.completedFuture(false);
}
Expand All @@ -327,9 +326,22 @@ public void fail(@NotNull final EconomyException exception) {
if (val) {
ret.add(identifier);
}
});
}));
}
subscription.succeed(ret);
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
subscription.succeed(ret);
})
.exceptionally((throwable) -> {
if (throwable instanceof EconomyException) {
subscription.fail((EconomyException) throwable);
} else {
subscription.fail(new EconomyException(EconomyFailureReason.OTHER_FAILURE,
throwable
));
}
return null;
});
});
}

Expand Down

0 comments on commit 1605815

Please sign in to comment.