Skip to content

Commit

Permalink
[Rest Api Compatibility] Deprecate the use of synced flush (elastic#7…
Browse files Browse the repository at this point in the history
…5372)

synced flush is going to be replaced by flush. This commit allows to synced_flush api only in v7 compatibility mode.
Worth noting - sync_id is gone and won't be available in v7 responses from indices.stats

relates removal pr elastic#50882
relates elastic#51816
  • Loading branch information
pgomulka authored Jul 28, 2021
1 parent 71546b3 commit c96139d
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,6 @@ public void testRecovery() throws Exception {
flushRequest.addParameter("force", "true");
flushRequest.addParameter("wait_if_ongoing", "true");
assertOK(client().performRequest(flushRequest));
if (randomBoolean()) {
syncedFlush(index);
}

if (shouldHaveTranslog) {
// Update a few documents so we are sure to have a translog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.MediaType;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.rest.ESRestTestCase;
Expand Down Expand Up @@ -309,9 +312,58 @@ public void testSyncedFlushTransition() throws Exception {
try (RestClient newNodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
Request request = new Request("POST", index + "/_flush/synced");
List<String> warningMsg = List.of("Synced flush was removed and a normal flush was performed instead. " +
"This transition will be removed in a future version.");
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warnings -> warnings.equals(warningMsg) == false));
final String v7MediaType = XContentType.VND_JSON.toParsedMediaType()
.responseContentTypeHeader(Map.of(MediaType.COMPATIBLE_WITH_PARAMETER_NAME,
String.valueOf(RestApiVersion.minimumSupported().major)));
List<String> warningMsg = List.of("Synced flush is deprecated and will be removed in 8.0." +
" Use flush at /_flush or /{index}/_flush instead.");
request.setOptions(RequestOptions.DEFAULT.toBuilder()
.setWarningsHandler(warnings -> warnings.equals(warningMsg) == false)
.addHeader("Accept", v7MediaType));

assertBusy(() -> {
Map<String, Object> result = ObjectPath.createFromResponse(newNodeClient.performRequest(request)).evaluate("_shards");
assertThat(result.get("total"), equalTo(totalShards));
assertThat(result.get("successful"), equalTo(totalShards));
assertThat(result.get("failed"), equalTo(0));
});
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
}
}

public void testFlushTransition() throws Exception {
Nodes nodes = buildNodeAndVersions();
assumeFalse("no new node found", nodes.getNewNodes().isEmpty());
assumeFalse("no bwc node found", nodes.getBWCNodes().isEmpty());
// Allocate shards to new nodes then verify flush requests processed by old nodes/new nodes
String newNodes = nodes.getNewNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
int numShards = randomIntBetween(1, 10);
int numOfReplicas = randomIntBetween(0, nodes.getNewNodes().size() - 1);
int totalShards = numShards * (numOfReplicas + 1);
final String index = "test_flush";
createIndex(index, Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put("index.routing.allocation.include._name", newNodes).build());
ensureGreen(index);
indexDocs(index, randomIntBetween(0, 100), between(1, 100));
try (RestClient oldNodeClient = buildClient(restClientSettings(),
nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
Request request = new Request("POST", index + "/_flush");
assertBusy(() -> {
Map<String, Object> result = ObjectPath.createFromResponse(oldNodeClient.performRequest(request)).evaluate("_shards");
assertThat(result.get("total"), equalTo(totalShards));
assertThat(result.get("successful"), equalTo(totalShards));
assertThat(result.get("failed"), equalTo(0));
});
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
}
indexDocs(index, randomIntBetween(0, 100), between(1, 100));
try (RestClient newNodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
Request request = new Request("POST", index + "/_flush");
assertBusy(() -> {
Map<String, Object> result = ObjectPath.createFromResponse(newNodeClient.performRequest(request)).evaluate("_shards");
assertThat(result.get("total"), equalTo(totalShards));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
}
if (randomBoolean()) {
syncedFlush(index);
flush(index, randomBoolean());
}
}

Expand Down Expand Up @@ -309,7 +309,7 @@ public void testRecovery() throws Exception {
}
}
if (randomBoolean()) {
syncedFlush(index);
flush(index, randomBoolean());
}
ensureGreen(index);
}
Expand Down Expand Up @@ -584,7 +584,7 @@ public void testUpdateDoc() throws Exception {
assertThat(XContentMapValues.extractValue("_source.updated_field", doc), equalTo(updates.get(docId)));
}
if (randomBoolean()) {
syncedFlush(index);
flush(index, randomBoolean());
}
}

Expand Down
5 changes: 4 additions & 1 deletion rest-api-spec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ tasks.named("yamlRestCompatTest").configure {
OS.current() != OS.WINDOWS
}
systemProperty 'tests.rest.blacklist', ([
'indices.flush/10_basic/Index synced flush rest test',
'search.aggregation/200_top_hits_metric/top_hits aggregation with sequence numbers',
'search/310_match_bool_prefix/multi_match multiple fields with cutoff_frequency throws exception', //cutoff_frequency
'search/340_type_query/type query', // type_query - probably should behave like match_all
Expand Down Expand Up @@ -227,6 +226,10 @@ tasks.named("transformV7RestTests").configure({ task ->
task.replaceValueTextByKeyValue("catch",
'/Please set node identifiers correctly. One and only one of \\[node_name\\], \\[node_names\\] and \\[node_ids\\] has to be set/',
'/You must set \\[node_names\\] or \\[node_ids\\] but not both/')

// sync_id is no longer available in SegmentInfos.userData // "indices.flush/10_basic/Index synced flush rest test"
task.replaceIsTrue("indices.testing.shards.0.0.commit.user_data.sync_id", "indices.testing.shards.0.0.commit.user_data")

})

tasks.register('enforceYamlTestConvention').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
Expand All @@ -32,15 +31,23 @@

public class RestSyncedFlushAction extends BaseRestHandler {

private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestSyncedFlushAction.class);

private static final String DEPRECATION_MESSAGE =
"Synced flush is deprecated and will be removed in 8.0. Use flush at /_flush or /{index}/_flush instead.";
@Override
public List<Route> routes() {
return List.of(
new Route(GET, "/_flush/synced"),
new Route(POST, "/_flush/synced"),
new Route(GET, "/{index}/_flush/synced"),
new Route(POST, "/{index}/_flush/synced"));
Route.builder(GET, "/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build(),
Route.builder(POST, "/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build(),
Route.builder(GET, "/{index}/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build(),
Route.builder(POST, "/{index}/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build());
}

@Override
Expand All @@ -50,8 +57,6 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
DEPRECATION_LOGGER.deprecate(DeprecationCategory.API, "synced_flush",
"Synced flush was removed and a normal flush was performed instead. This transition will be removed in a future version.");
final FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
return channel -> client.admin().indices().flush(flushRequest, new SimulateSyncedFlushResponseListener(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1651,48 +1651,6 @@ protected static Version minimumNodeVersion() throws IOException {
return minVersion;
}

protected void syncedFlush(String indexName) throws Exception {
final List<String> deprecationMessages = List.of(
"Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead.");
final List<String> fixedDeprecationMessages = List.of(
"Synced flush is deprecated and will be removed in 8.0. Use flush at /_flush or /{index}/_flush instead.");
final List<String> transitionMessages = List.of(
"Synced flush was removed and a normal flush was performed instead. This transition will be removed in a future version.");
final WarningsHandler warningsHandler;
if (minimumNodeVersion().onOrAfter(Version.V_8_0_0)) {
warningsHandler = warnings -> warnings.equals(transitionMessages) == false;
} else if (minimumNodeVersion().onOrAfter(Version.V_7_6_0)) {
warningsHandler = warnings -> warnings.equals(deprecationMessages) == false && warnings.equals(transitionMessages) == false &&
warnings.equals(fixedDeprecationMessages) == false;
} else if (nodeVersions.stream().anyMatch(n -> n.onOrAfter(Version.V_8_0_0))) {
warningsHandler = warnings -> warnings.isEmpty() == false && warnings.equals(transitionMessages) == false;
} else {
warningsHandler = warnings -> warnings.isEmpty() == false;
}
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
assertBusy(() -> {
try {
final Request request = new Request("POST", indexName + "/_flush/synced");
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler));
Response resp = client().performRequest(request);
if (nodeVersions.stream().allMatch(v -> v.before(Version.V_8_0_0))) {
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
assertThat(result.get("failed"), equalTo(0));
}
} catch (ResponseException ex) {
if (ex.getResponse().getStatusLine().getStatusCode() == RestStatus.CONFLICT.getStatus()
&& ex.getResponse().getWarnings().equals(transitionMessages)) {
logger.info("a normal flush was performed instead");
} else {
throw new AssertionError(ex); // cause assert busy to retry
}
}
});
// ensure the global checkpoint is synced; otherwise we might trim the commit with syncId
ensureGlobalCheckpointSynced(indexName);
}

@SuppressWarnings("unchecked")
private void ensureGlobalCheckpointSynced(String index) throws Exception {
assertBusy(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,10 @@ private void assertUserExecutes(String user, String action, String index, boolea
if (userIsAllowed) {
assertAccessIsAllowed(user, "POST", "/" + index + "/_refresh");
assertAccessIsAllowed(user, "POST", "/" + index + "/_flush");
assertAccessIsAllowed(user, "POST", "/" + index + "/_flush/synced");
assertAccessIsAllowed(user, "POST", "/" + index + "/_forcemerge");
} else {
assertAccessIsDenied(user, "POST", "/" + index + "/_refresh");
assertAccessIsDenied(user, "POST", "/" + index + "/_flush");
assertAccessIsDenied(user, "POST", "/" + index + "/_flush/synced");
assertAccessIsDenied(user, "POST", "/" + index + "/_forcemerge");
}
break;
Expand Down

0 comments on commit c96139d

Please sign in to comment.