Skip to content

Commit

Permalink
Start resource watcher service early (elastic#55275)
Browse files Browse the repository at this point in the history
The ResourceWatcherService enables watching of files for modifications
and deletions. During startup various consumers register the files that
should be watched by this service. There is behavior that might be
unexpected in that the service may not start polling until later in the
startup process due to the use of lifecycle states to control when the
service actually starts the jobs to monitor resources. This change
removes this unexpected behavior so that upon construction the service
has already registered its tasks to poll resources for changes. In
making this modification, the service no longer extends
AbstractLifecycleComponent and instead implements the Closeable
interface so that the polling jobs can be terminated when the service
is no longer required.

Relates elastic#54867
Backport of elastic#54993
  • Loading branch information
jaymode authored Apr 16, 2020
1 parent f75e3b4 commit 2d9e3c7
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 251 deletions.
8 changes: 4 additions & 4 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ protected Node(final Environment initialEnvironment,

final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
DeprecationLogger.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
Expand All @@ -351,7 +353,7 @@ protected Node(final Environment initialEnvironment,
additionalSettings.addAll(builder.getRegisteredSettings());
}
client = new NodeClient(settings, threadPool);
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);

final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
Expand All @@ -367,7 +369,6 @@ protected Node(final Environment initialEnvironment,
final SettingsModule settingsModule =
new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));

Expand Down Expand Up @@ -716,7 +717,6 @@ public Node start() throws NodeValidationException {
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);

injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
Expand Down Expand Up @@ -830,7 +830,7 @@ private Node stop() {
}
logger.info("stopping ...");

injector.getInstance(ResourceWatcherService.class).stop();
injector.getInstance(ResourceWatcherService.class).close();
injector.getInstance(HttpServerTransport.class).stop();

injector.getInstance(SnapshotsService.class).stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -30,6 +28,7 @@
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
Expand All @@ -42,7 +41,7 @@
* registered watcher periodically. The frequency of checks can be specified using {@code resource.reload.interval} setting, which
* defaults to {@code 60s}. The service can be disabled by setting {@code resource.reload.enabled} setting to {@code false}.
*/
public class ResourceWatcherService extends AbstractLifecycleComponent {
public class ResourceWatcherService implements Closeable {
private static final Logger logger = LogManager.getLogger(ResourceWatcherService.class);

public enum Frequency {
Expand Down Expand Up @@ -78,51 +77,42 @@ public enum Frequency {
Setting.timeSetting("resource.reload.interval.low", Frequency.LOW.interval, Property.NodeScope);

private final boolean enabled;
private final ThreadPool threadPool;

final ResourceMonitor lowMonitor;
final ResourceMonitor mediumMonitor;
final ResourceMonitor highMonitor;

private volatile Cancellable lowFuture;
private volatile Cancellable mediumFuture;
private volatile Cancellable highFuture;
private final Cancellable lowFuture;
private final Cancellable mediumFuture;
private final Cancellable highFuture;

@Inject
public ResourceWatcherService(Settings settings, ThreadPool threadPool) {
this.enabled = ENABLED.get(settings);
this.threadPool = threadPool;

TimeValue interval = RELOAD_INTERVAL_LOW.get(settings);
lowMonitor = new ResourceMonitor(interval, Frequency.LOW);
interval = RELOAD_INTERVAL_MEDIUM.get(settings);
mediumMonitor = new ResourceMonitor(interval, Frequency.MEDIUM);
interval = RELOAD_INTERVAL_HIGH.get(settings);
highMonitor = new ResourceMonitor(interval, Frequency.HIGH);
}

@Override
protected void doStart() {
if (!enabled) {
return;
if (enabled) {
lowFuture = threadPool.scheduleWithFixedDelay(lowMonitor, lowMonitor.interval, Names.SAME);
mediumFuture = threadPool.scheduleWithFixedDelay(mediumMonitor, mediumMonitor.interval, Names.SAME);
highFuture = threadPool.scheduleWithFixedDelay(highMonitor, highMonitor.interval, Names.SAME);
} else {
lowFuture = null;
mediumFuture = null;
highFuture = null;
}
lowFuture = threadPool.scheduleWithFixedDelay(lowMonitor, lowMonitor.interval, Names.SAME);
mediumFuture = threadPool.scheduleWithFixedDelay(mediumMonitor, mediumMonitor.interval, Names.SAME);
highFuture = threadPool.scheduleWithFixedDelay(highMonitor, highMonitor.interval, Names.SAME);
}

@Override
protected void doStop() {
if (!enabled) {
return;
public void close() {
if (enabled) {
lowFuture.cancel();
mediumFuture.cancel();
highFuture.cancel();
}
lowFuture.cancel();
mediumFuture.cancel();
highFuture.cancel();
}

@Override
protected void doClose() {
}

/**
Expand All @@ -149,10 +139,6 @@ public <W extends ResourceWatcher> WatcherHandle<W> add(W watcher, Frequency fre
}
}

public void notifyNow() {
notifyNow(Frequency.MEDIUM);
}

public void notifyNow(Frequency frequency) {
switch (frequency) {
case LOW:
Expand All @@ -169,7 +155,7 @@ public void notifyNow(Frequency frequency) {
}
}

class ResourceMonitor implements Runnable {
static class ResourceMonitor implements Runnable {

final TimeValue interval;
final Frequency frequency;
Expand All @@ -188,7 +174,7 @@ private <W extends ResourceWatcher> WatcherHandle<W> add(W watcher) {

@Override
public synchronized void run() {
for(ResourceWatcher watcher : watchers) {
for (ResourceWatcher watcher : watchers) {
try {
watcher.checkAndNotify();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ public void setup() throws Exception {
Settings settings = Settings.builder()
.put("resource.reload.interval.high", "10ms")
.build();
watcherService = new ResourceWatcherService(settings,
threadPool);
watcherService.start();
watcherService = new ResourceWatcherService(settings, threadPool);
licenseModePath = createTempFile();
onChangeCounter = new AtomicReference<>(new CountDownLatch(1));
operationModeFileWatcher = new OperationModeFileWatcher(watcherService, licenseModePath, logger,
Expand All @@ -47,8 +45,8 @@ public void setup() throws Exception {

@After
public void shutdown() throws InterruptedException {
watcherService.close();
terminate(threadPool);
watcherService.stop();
}

public void testInit() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ public void setup() {
threadPool = new TestThreadPool("reload tests");
resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool);
resourceWatcherService.start();
}

@After
public void cleanup() {
if (resourceWatcherService != null) {
resourceWatcherService.close();
}
if (threadPool != null) {
terminate(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ public void testStore_ConfiguredWithUnreadableFile() throws Exception {
Files.write(file, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);

RealmConfig config = getRealmConfig();
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService);
assertThat(store.usersCount(), is(0));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService);
assertThat(store.usersCount(), is(0));
}
}

public void testStore_AutoReload() throws Exception {
Expand All @@ -89,47 +90,46 @@ public void testStore_AutoReload() throws Exception {
Files.copy(users, file, StandardCopyOption.REPLACE_EXISTING);
final Hasher hasher = Hasher.resolve(settings.get("xpack.security.authc.password_hashing.algorithm"));
RealmConfig config = getRealmConfig();
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
final CountDownLatch latch = new CountDownLatch(1);

FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
assertThat(store.userExists(username), is(true));
AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));

watcherService.start();

try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
}

watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as users passwords are not changed.");
}

assertThat(store.userExists(username), is(true));
result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));

try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("foobar:").append(new String(hasher.hash(new SecureString("barfoo"))));
}

if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
final CountDownLatch latch = new CountDownLatch(1);

FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
assertThat(store.userExists(username), is(true));
AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));

try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
}

watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as users passwords are not changed.");
}

assertThat(store.userExists(username), is(true));
result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));

try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("foobar:").append(new String(hasher.hash(new SecureString("barfoo"))));
}

if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}

assertThat(store.userExists("foobar"), is(true));
result = store.verifyPassword("foobar", new SecureString("barfoo"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
}

assertThat(store.userExists("foobar"), is(true));
result = store.verifyPassword("foobar", new SecureString("barfoo"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
}

private RealmConfig getRealmConfig() {
Expand All @@ -145,27 +145,26 @@ public void testStore_AutoReload_WithParseFailures() throws Exception {
Files.copy(users, testUsers, StandardCopyOption.REPLACE_EXISTING);

RealmConfig config = getRealmConfig();
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
final CountDownLatch latch = new CountDownLatch(1);
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
final CountDownLatch latch = new CountDownLatch(1);

FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
final AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
final AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));

watcherService.start();
// now replacing the content of the users file with something that cannot be read
Files.write(testUsers, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);

// now replacing the content of the users file with something that cannot be read
Files.write(testUsers, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}

if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
assertThat(store.usersCount(), is(0));
}

assertThat(store.usersCount(), is(0));
}

public void testParseFile() throws Exception {
Expand Down
Loading

0 comments on commit 2d9e3c7

Please sign in to comment.