Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve security-crypto threadpool overflow handling #111369

Merged
merged 7 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/111369.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111369
summary: Improve security-crypto threadpool overflow handling
area: Authentication
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -1315,7 +1316,18 @@ void validateApiKeyCredentials(
AuthenticationResult.unsuccessful("invalid credentials for API key [" + credentials.getId() + "]", null)
);
}
}, listener::onFailure));
}, exception -> {
// Crypto threadpool queue is full, invalidate this cache entry and make sure nothing is going to wait on it
logger.warn(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that this might cause log spam, but I don't want to pull it all the way back to debug because I think it's good to be able to easily tell when this is happening.

Strings.format(
"rejecting possibly valid API key authentication because the [%s] threadpool is full",
SECURITY_CRYPTO_THREAD_POOL_NAME
)
);
apiKeyAuthCache.invalidate(credentials.getId(), listenableCacheEntry);
listenableCacheEntry.onFailure(exception);
listener.onFailure(exception);
}));
}
} else {
verifyKeyAgainstHash(apiKeyDoc.hash, credentials, ActionListener.wrap(verified -> {
Expand Down Expand Up @@ -1453,15 +1465,19 @@ void computeHashForApiKey(SecureString apiKey, ActionListener<char[]> listener)

// Protected instance method so this can be mocked
protected void verifyKeyAgainstHash(String apiKeyHash, ApiKeyCredentials credentials, ActionListener<Boolean> listener) {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> {
Hasher hasher = Hasher.resolveFromHash(apiKeyHash.toCharArray());
final char[] apiKeyHashChars = apiKeyHash.toCharArray();
try {
return hasher.verify(credentials.getKey(), apiKeyHashChars);
} finally {
Arrays.fill(apiKeyHashChars, (char) 0);
}
}));
try {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(ActionRunnable.supply(listener, () -> {
Hasher hasher = Hasher.resolveFromHash(apiKeyHash.toCharArray());
final char[] apiKeyHashChars = apiKeyHash.toCharArray();
try {
return hasher.verify(credentials.getKey(), apiKeyHashChars);
} finally {
Arrays.fill(apiKeyHashChars, (char) 0);
}
}));
} catch (RejectedExecutionException e) {
listener.onFailure(e);
gwbrown marked this conversation as resolved.
Show resolved Hide resolved
}
}

private static Instant getApiKeyExpiration(Instant now, @Nullable TimeValue expiration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -230,6 +232,9 @@ public class ApiKeyServiceTests extends ESTestCase {
"search": [ {"names": ["logs"]} ],
"replication": [ {"names": ["archive"]} ]
}""");

private static final int TEST_THREADPOOL_QUEUE_SIZE = 1000;

private ThreadPool threadPool;
private Client client;
private SecurityIndexManager securityIndex;
Expand All @@ -245,7 +250,7 @@ public void createThreadPool() {
Settings.EMPTY,
SECURITY_CRYPTO_THREAD_POOL_NAME,
1,
1000,
TEST_THREADPOOL_QUEUE_SIZE,
"xpack.security.crypto.thread_pool",
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
)
Expand All @@ -268,6 +273,97 @@ public void setupMocks() {
doAnswer(invocation -> Instant.now()).when(clock).instant();
}

public void testFloodThreadpool() throws Exception {
// We're going to be blocking the security-crypto threadpool so we need a new one for the client
ThreadPool clientThreadpool = new TestThreadPool(
this.getTestName(),
new FixedExecutorBuilder(
Settings.EMPTY,
this.getTestName(),
1,
100,
"no_settings_used",
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
)
);
try {
when(client.threadPool()).thenReturn(clientThreadpool);

// setup copied from testAuthenticateWithApiKey
final Settings settings = Settings.builder().put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true).build();
final ApiKeyService service = createApiKeyService(settings);

final String id = randomAlphaOfLength(12);
final String key = randomAlphaOfLength(16);

final User user, authUser;
if (randomBoolean()) {
user = new User("hulk", new String[] { "superuser" }, "Bruce Banner", "[email protected]", Map.of(), true);
authUser = new User("authenticated_user", "other");
} else {
user = new User("hulk", new String[] { "superuser" }, "Bruce Banner", "[email protected]", Map.of(), true);
authUser = null;
}
final ApiKey.Type type = randomFrom(ApiKey.Type.values());
final Map<String, Object> metadata = mockKeyDocument(id, key, user, authUser, false, Duration.ofSeconds(3600), null, type);

// Block the security crypto threadpool
CyclicBarrier barrier = new CyclicBarrier(2);
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(() -> {
try {
barrier.await();
gwbrown marked this conversation as resolved.
Show resolved Hide resolved
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
});
// Now fill it up while the one thread is blocked
for (int i = 0; i < TEST_THREADPOOL_QUEUE_SIZE; i++) {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(() -> {});
}

// Check that it's full
for (var stat : threadPool.stats().stats()) {
if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) {
assertThat(stat.queue(), equalTo(TEST_THREADPOOL_QUEUE_SIZE));
assertThat(stat.rejected(), equalTo(0L));
}
}

// now try to auth with an API key
final AuthenticationResult<User> auth = tryAuthenticate(service, id, key, type);
assertThat(auth.getStatus(), is(AuthenticationResult.Status.TERMINATE));

// Make sure one was rejected and the queue is still full
for (var stat : threadPool.stats().stats()) {
if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) {
assertThat(stat.queue(), equalTo(TEST_THREADPOOL_QUEUE_SIZE));
assertThat(stat.rejected(), equalTo(1L));
}
}
ListenableFuture<CachedApiKeyHashResult> cachedValue = service.getApiKeyAuthCache().get(id);
assertThat("since the request was rejected, there should be no cache entry for this key", cachedValue, nullValue());

// unblock the threadpool
barrier.await();

// wait for the threadpool queue to drain
assertBusy(() -> {
for (var stat : threadPool.stats().stats()) {
if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) {
assertThat(stat.rejected(), equalTo(1L));
assertThat(stat.queue(), equalTo(0));
}
}
});
gwbrown marked this conversation as resolved.
Show resolved Hide resolved

// try to authenticate again with the same key - if this hangs, check the future caching
final AuthenticationResult<User> shouldSucceed = tryAuthenticate(service, id, key, type);
assertThat(shouldSucceed.getStatus(), is(AuthenticationResult.Status.SUCCESS));
} finally {
terminate(clientThreadpool);
}
}

public void testCreateApiKeyUsesBulkIndexAction() throws Exception {
final Settings settings = Settings.builder().put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true).build();
final ApiKeyService service = createApiKeyService(settings);
Expand Down