Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix IndexAuditTrail rolling upgrade on rollover edge 2 (#38286) #38381

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import static org.elasticsearch.xpack.security.audit.AuditUtil.indices;
import static org.elasticsearch.xpack.security.audit.AuditUtil.restRequestContent;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.resolve;
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.resolveNext;
import static org.elasticsearch.xpack.security.support.SecurityIndexManager.SECURITY_VERSION_STRING;

/**
Expand Down Expand Up @@ -308,6 +309,17 @@ private String getIndexName() {
return index;
}

private String getNextIndexName() {
final Message first = peek();
final String index;
if (first == null) {
index = resolveNext(IndexAuditTrailField.INDEX_NAME_PREFIX, DateTime.now(DateTimeZone.UTC), rollover);
} else {
index = resolveNext(IndexAuditTrailField.INDEX_NAME_PREFIX, first.timestamp, rollover);
}
return index;
}

private boolean hasStaleMessage() {
final Message first = peek();
if (first == null) {
Expand Down Expand Up @@ -337,7 +349,7 @@ public void onResponse(ClusterStateResponse clusterStateResponse) {
updateCurrentIndexMappingsIfNecessary(clusterStateResponse.getState());
} else if (TemplateUtils.checkTemplateExistsAndVersionMatches(INDEX_TEMPLATE_NAME,
SECURITY_VERSION_STRING, clusterStateResponse.getState(), logger,
Version.CURRENT::onOrAfter) == false) {
Version.CURRENT::onOrBefore) == false) {
putTemplate(customAuditIndexSettings(settings, logger),
e -> {
logger.error("failed to put audit trail template", e);
Expand Down Expand Up @@ -377,6 +389,7 @@ public void onFailure(Exception e) {

// pkg private for tests
void updateCurrentIndexMappingsIfNecessary(ClusterState state) {
final String nextIndex = getNextIndexName();
final String index = getIndexName();

AliasOrIndex aliasOrIndex = state.getMetaData().getAliasAndIndexLookup().get(index);
Expand All @@ -391,48 +404,60 @@ void updateCurrentIndexMappingsIfNecessary(ClusterState state) {
MappingMetaData docMapping = indexMetaData.mapping("doc");
if (docMapping == null) {
if (indexToRemoteCluster || state.nodes().isLocalNodeElectedMaster() || hasStaleMessage()) {
putAuditIndexMappingsAndStart(index);
putAuditIndexMappingsAndStart(index, nextIndex);
} else {
logger.trace("audit index [{}] is missing mapping for type [{}]", index, DOC_TYPE);
logger.debug("audit index [{}] is missing mapping for type [{}]", index, DOC_TYPE);
transitionStartingToInitialized();
}
} else {
@SuppressWarnings("unchecked")
Map<String, Object> meta = (Map<String, Object>) docMapping.sourceAsMap().get("_meta");
if (meta == null) {
logger.info("Missing _meta field in mapping [{}] of index [{}]", docMapping.type(), index);
throw new IllegalStateException("Cannot read security-version string in index " + index);
}

final String versionString = (String) meta.get(SECURITY_VERSION_STRING);
if (versionString != null && Version.fromString(versionString).onOrAfter(Version.CURRENT)) {
innerStart();
} else {
logger.warn("Missing _meta field in mapping [{}] of index [{}]", docMapping.type(), index);
if (indexToRemoteCluster || state.nodes().isLocalNodeElectedMaster() || hasStaleMessage()) {
putAuditIndexMappingsAndStart(index);
} else if (versionString == null) {
logger.trace("audit index [{}] mapping is missing meta field [{}]", index, SECURITY_VERSION_STRING);
transitionStartingToInitialized();
putAuditIndexMappingsAndStart(index, nextIndex);
} else {
logger.trace("audit index [{}] has the incorrect version [{}]", index, versionString);
logger.debug("audit index [{}] is missing _meta for type [{}]", index, DOC_TYPE);
transitionStartingToInitialized();
}
} else {
final String versionString = (String) meta.get(SECURITY_VERSION_STRING);
if (versionString != null && Version.fromString(versionString).onOrAfter(Version.CURRENT)) {
innerStart();
} else {
if (indexToRemoteCluster || state.nodes().isLocalNodeElectedMaster() || hasStaleMessage()) {
putAuditIndexMappingsAndStart(index, nextIndex);
} else if (versionString == null) {
logger.debug("audit index [{}] mapping is missing meta field [{}]", index, SECURITY_VERSION_STRING);
transitionStartingToInitialized();
} else {
logger.debug("audit index [{}] has the incorrect version [{}]", index, versionString);
transitionStartingToInitialized();
}
}
}
}
} else {
innerStart();
}
}

private void putAuditIndexMappingsAndStart(String index) {
putAuditIndexMappings(index, getPutIndexTemplateRequest(Settings.EMPTY).mappings().get(DOC_TYPE),
ActionListener.wrap(ignore -> {
logger.trace("updated mappings on audit index [{}]", index);
private void putAuditIndexMappingsAndStart(String index, String nextIndex) {
final String docMapping = getPutIndexTemplateRequest(Settings.EMPTY).mappings().get(DOC_TYPE);
putAuditIndexMappings(index, docMapping, ActionListener.wrap(ignore -> {
logger.debug("updated mappings on audit index [{}]", index);
putAuditIndexMappings(nextIndex, docMapping, ActionListener.wrap(ignoreToo -> {
logger.debug("updated mappings on next audit index [{}]", nextIndex);
innerStart();
}, e2 -> {
// best effort only
logger.debug("Failed to update mappings on next audit index [{}]", nextIndex);
innerStart();
}, e -> {
logger.error(new ParameterizedMessage("failed to update mappings on audit index [{}]", index), e);
transitionStartingToInitialized(); // reset to initialized so we can retry
}));
}, e -> {
logger.error(new ParameterizedMessage("failed to update mappings on audit index [{}]", index), e);
transitionStartingToInitialized(); // reset to initialized so we can retry
}));
}

private void transitionStartingToInitialized() {
Expand All @@ -451,7 +476,7 @@ void innerStart() {
assert false : message;
logger.error(message);
} else {
logger.trace("successful state transition from starting to started, current value: [{}]", state.get());
logger.debug("successful state transition from starting to started, current value: [{}]", state.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,31 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.function.Function;

public class IndexNameResolver {

public enum Rollover {
HOURLY ("-yyyy.MM.dd.HH"),
DAILY ("-yyyy.MM.dd"),
WEEKLY ("-yyyy.w"),
MONTHLY ("-yyyy.MM");
HOURLY ("-yyyy.MM.dd.HH", d -> d.plusHours(1)),
DAILY ("-yyyy.MM.dd", d -> d.plusDays(1)),
WEEKLY ("-yyyy.w", d -> d.plusWeeks(1)),
MONTHLY ("-yyyy.MM", d -> d.plusMonths(1));

private final DateTimeFormatter formatter;
private final Function<DateTime, DateTime> next;

Rollover(String format) {
Rollover(String format, Function<DateTime, DateTime> next) {
this.formatter = DateTimeFormat.forPattern(format);
this.next = next;
}

DateTimeFormatter formatter() {
return formatter;
}

Function<DateTime, DateTime> getNext() {
return next;
}
}

private IndexNameResolver() {}
Expand All @@ -34,6 +42,10 @@ public static String resolve(DateTime timestamp, Rollover rollover) {
return rollover.formatter().print(timestamp);
}

public static String resolveNext(String indexNamePrefix, DateTime timestamp, Rollover rollover) {
return resolve(indexNamePrefix, rollover.getNext().apply(timestamp), rollover);
}

public static String resolve(String indexNamePrefix, DateTime timestamp, Rollover rollover) {
return indexNamePrefix + resolve(timestamp, rollover);
}
Expand Down
2 changes: 2 additions & 0 deletions x-pack/qa/rolling-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ subprojects {
setting 'xpack.security.audit.outputs', 'index'
setting 'xpack.ssl.keystore.path', 'testnode.jks'
setting 'xpack.ssl.keystore.password', 'testnode'
setting 'logger.org.elasticsearch.xpack.security.audit.index', 'DEBUG'
if (version.onOrAfter('6.0.0') == false) {
// this is needed since in 5.6 we don't bootstrap the token service if there is no explicit initial password
keystoreSetting 'xpack.security.authc.token.passphrase', 'xpack_token_passphrase'
Expand Down Expand Up @@ -211,6 +212,7 @@ subprojects {
setting 'xpack.security.transport.ssl.enabled', 'true'
setting 'xpack.ssl.keystore.path', 'testnode.jks'
keystoreSetting 'xpack.ssl.keystore.secure_password', 'testnode'
setting 'logger.org.elasticsearch.xpack.security.audit.index', 'DEBUG'
if (version.onOrAfter('6.0.0') == false) {
// this is needed since in 5.6 we don't bootstrap the token service if there is no explicit initial password
keystoreSetting 'xpack.security.authc.token.passphrase', 'xpack_token_passphrase'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.hasSize;

Expand Down Expand Up @@ -62,12 +63,11 @@ public void findMinVersionInCluster() throws IOException {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33867")
public void testAuditLogs() throws Exception {
assertBusy(() -> {
assertAuditDocsExist();
assertNumUniqueNodeNameBuckets(expectedNumUniqueNodeNameBuckets());
});
}, 30, TimeUnit.SECONDS);
}

private int expectedNumUniqueNodeNameBuckets() throws IOException {
Expand Down