From 3f253487f07362f8b23fec381dad1343cc761a41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Mon, 30 Oct 2017 10:45:50 +0100 Subject: [PATCH 1/6] [Docs] Remove first person "I" from getting started (#27155) Avoid first person style and consistently switch to an impersonal style in the getting started docs. --- docs/reference/getting-started.asciidoc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/reference/getting-started.asciidoc b/docs/reference/getting-started.asciidoc index 46989857235ac..f72d08397a150 100755 --- a/docs/reference/getting-started.asciidoc +++ b/docs/reference/getting-started.asciidoc @@ -13,7 +13,7 @@ Here are a few sample use-cases that Elasticsearch could be used for: * You run a price alerting platform which allows price-savvy customers to specify a rule like "I am interested in buying a specific electronic gadget and I want to be notified if the price of gadget falls below $X from any vendor within the next month". In this case you can scrape vendor prices, push them into Elasticsearch and use its reverse-search (Percolator) capability to match price movements against customer queries and eventually push the alerts out to the customer once matches are found. * You have analytics/business-intelligence needs and want to quickly investigate, analyze, visualize, and ask ad-hoc questions on a lot of data (think millions or billions of records). In this case, you can use Elasticsearch to store your data and then use Kibana (part of the Elasticsearch/Logstash/Kibana stack) to build custom dashboards that can visualize aspects of your data that are important to you. Additionally, you can use the Elasticsearch aggregations functionality to perform complex business intelligence queries against your data. -For the rest of this tutorial, I will guide you through the process of getting Elasticsearch up and running, taking a peek inside it, and performing basic operations like indexing, searching, and modifying your data. At the end of this tutorial, you should have a good idea of what Elasticsearch is, how it works, and hopefully be inspired to see how you can use it to either build sophisticated search applications or to mine intelligence from your data. +For the rest of this tutorial, you will be guided through the process of getting Elasticsearch up and running, taking a peek inside it, and performing basic operations like indexing, searching, and modifying your data. At the end of this tutorial, you should have a good idea of what Elasticsearch is, how it works, and hopefully be inspired to see how you can use it to either build sophisticated search applications or to mine intelligence from your data. -- == Basic Concepts @@ -660,7 +660,7 @@ Now that we've gotten a glimpse of the basics, let's try to work on a more reali -------------------------------------------------- // NOTCONSOLE -For the curious, I generated this data from http://www.json-generator.com/[`www.json-generator.com/`] so please ignore the actual values and semantics of the data as these are all randomly generated. +For the curious, this data was generated using http://www.json-generator.com/[`www.json-generator.com/`], so please ignore the actual values and semantics of the data as these are all randomly generated. [float] === Loading the Sample Dataset @@ -1284,4 +1284,4 @@ There are many other aggregations capabilities that we won't go into detail here == Conclusion -Elasticsearch is both a simple and complex product.
We've so far learned the basics of what it is, how to look inside of it, and how to work with it using some of the REST APIs. I hope that this tutorial has given you a better understanding of what Elasticsearch is and more importantly, inspired you to further experiment with the rest of its great features! +Elasticsearch is both a simple and complex product. We've so far learned the basics of what it is, how to look inside of it, and how to work with it using some of the REST APIs. Hopefully this tutorial has given you a better understanding of what Elasticsearch is and more importantly, inspired you to further experiment with the rest of its great features! From 06f4f7ee71e56b8833cbff2fe1e52c23fbb4c981 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Mon, 30 Oct 2017 11:29:40 +0100 Subject: [PATCH 2/6] [Docs] Clarify `span_not` query behavior for non-overlapping matches (#27150) Closes #27134 --- docs/reference/query-dsl/span-not-query.asciidoc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/reference/query-dsl/span-not-query.asciidoc b/docs/reference/query-dsl/span-not-query.asciidoc index 5a648bd4b0e90..1632ee03b2fb8 100644 --- a/docs/reference/query-dsl/span-not-query.asciidoc +++ b/docs/reference/query-dsl/span-not-query.asciidoc @@ -1,7 +1,9 @@ [[query-dsl-span-not-query]] === Span Not Query -Removes matches which overlap with another span query. The span not +Removes matches which overlap with another span query or which are +within x tokens before (controlled by the parameter `pre`) or y tokens +after (controlled by the parameter `post`) another span query. The span not query maps to Lucene `SpanNotQuery`. Here is an example: [source,js] -------------------------------------------------- @@ -39,7 +41,7 @@ In the above example all documents with the term hoya are filtered except the on Other top level options: [horizontal] -`pre`:: If set the amount of tokens before the include span can't have overlap with the exclude span. -`post`:: If set the amount of tokens after the include span can't have overlap with the exclude span. +`pre`:: If set, the amount of tokens before the include span that can't have overlap with the exclude span. Defaults to 0. +`post`:: If set, the amount of tokens after the include span that can't have overlap with the exclude span. Defaults to 0. `dist`:: If set the amount of tokens from within the include span can't have overlap with the exclude span. Equivalent of setting both `pre` and `post`.
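For readers following the `span_not` change above, here is a minimal sketch of how the new `pre` and `post` parameters combine with the include and exclude spans, written against the Java query builders (the field name `field1` and the terms mirror the documentation's running example; the parameter values are arbitrary):

[source,java]
--------------------------------------------------
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.SpanNotQueryBuilder;

// Match spans of "hoya" in field1, unless "la" occurs within one
// token before (pre) or one token after (post) the matched span.
// Both parameters default to 0, i.e. only direct overlap is excluded.
SpanNotQueryBuilder query = QueryBuilders
        .spanNotQuery(
                QueryBuilders.spanTermQuery("field1", "hoya"),
                QueryBuilders.spanTermQuery("field1", "la"))
        .pre(1)
        .post(1);
--------------------------------------------------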
From 287052b47da5c90abb96b3a05a1c4534b5ce433d Mon Sep 17 00:00:00 2001 From: Clarkie Date: Mon, 30 Oct 2017 10:53:57 +0000 Subject: [PATCH 3/6] =?UTF-8?q?[Docs]=C2=A0Fix=20indentation=20of=20exampl?= =?UTF-8?q?es=20(#27168)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../query-dsl/common-terms-query.asciidoc | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/docs/reference/query-dsl/common-terms-query.asciidoc b/docs/reference/query-dsl/common-terms-query.asciidoc index a0c58597f7a5a..41034f357ce4c 100644 --- a/docs/reference/query-dsl/common-terms-query.asciidoc +++ b/docs/reference/query-dsl/common-terms-query.asciidoc @@ -76,7 +76,7 @@ GET /_search "common": { "body": { "query": "this is bonsai cool", - "cutoff_frequency": 0.001 + "cutoff_frequency": 0.001 } } } @@ -100,8 +100,8 @@ GET /_search "common": { "body": { "query": "nelly the elephant as a cartoon", - "cutoff_frequency": 0.001, - "low_freq_operator": "and" + "cutoff_frequency": 0.001, + "low_freq_operator": "and" } } } @@ -200,11 +200,11 @@ GET /_search "common": { "body": { "query": "nelly the elephant not as a cartoon", - "cutoff_frequency": 0.001, - "minimum_should_match": { - "low_freq" : 2, - "high_freq" : 3 - } + "cutoff_frequency": 0.001, + "minimum_should_match": { + "low_freq" : 2, + "high_freq" : 3 + } } } } @@ -261,11 +261,11 @@ GET /_search "common": { "body": { "query": "how not to be", - "cutoff_frequency": 0.001, - "minimum_should_match": { - "low_freq" : 2, - "high_freq" : 3 - } + "cutoff_frequency": 0.001, + "minimum_should_match": { + "low_freq" : 2, + "high_freq" : 3 + } } } } From 3904a189e3058811c55ae4474d91587c6b2fca69 Mon Sep 17 00:00:00 2001 From: Clinton Gormley Date: Mon, 30 Oct 2017 14:26:48 +0100 Subject: [PATCH 4/6] Added release notes for 6.0.0-rc2 --- docs/reference/release-notes.asciidoc | 2 + .../release-notes/6.0.0-rc2.asciidoc | 118 ++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 docs/reference/release-notes/6.0.0-rc2.asciidoc diff --git a/docs/reference/release-notes.asciidoc b/docs/reference/release-notes.asciidoc index ee6f3794d471d..4b8b3b470952c 100644 --- a/docs/reference/release-notes.asciidoc +++ b/docs/reference/release-notes.asciidoc @@ -5,6 +5,7 @@ -- This section summarizes the changes in each release. +* <<release-notes-6.0.0-rc2>> * <<release-notes-6.0.0-rc1>> * <<release-notes-6.0.0-beta2>> * <<release-notes-6.0.0-beta1>> @@ -13,6 +14,7 @@ This section summarizes the changes in each release. * <<release-notes-6.0.0-alpha1>> -- +include::release-notes/6.0.0-rc2.asciidoc[] include::release-notes/6.0.0-rc1.asciidoc[] include::release-notes/6.0.0-beta2.asciidoc[] include::release-notes/6.0.0-beta1.asciidoc[] diff --git a/docs/reference/release-notes/6.0.0-rc2.asciidoc b/docs/reference/release-notes/6.0.0-rc2.asciidoc new file mode 100644 index 0000000000000..e1e296b7436f5 --- /dev/null +++ b/docs/reference/release-notes/6.0.0-rc2.asciidoc @@ -0,0 +1,118 @@ +[[release-notes-6.0.0-rc2]] +== 6.0.0-rc2 Release Notes + +Also see <<breaking-changes-6.0>>.
+ +[[breaking-6.0.0-rc2]] +[float] +=== Breaking changes + +Inner Hits:: +* Return the _source of inner hit nested as is without wrapping it into its full path context {pull}26982[#26982] (issues: {issue}26102[#26102], {issue}26944[#26944]) + + + +[[enhancement-6.0.0-rc2]] +[float] +=== Enhancements + +Core:: +* Ignore .DS_Store files on macOS {pull}27108[#27108] (issue: {issue}23982[#23982]) + +Index Templates:: +* Fix error message for a put index template request without index_patterns {pull}27102[#27102] (issue: {issue}27100[#27100]) + +Mapping:: +* Don't detect source's XContentType in DocumentParser.parseDocument() {pull}26880[#26880] + +Network:: +* Add additional low-level logging handler {pull}26887[#26887] +* Unwrap causes when maybe dying {pull}26884[#26884] + +Plugins:: +* Adjust SHA-512 supported format on plugin install {pull}27093[#27093] + +REST:: +* Cat shards bytes {pull}26952[#26952] + + + +[[bug-6.0.0-rc2]] +[float] +=== Bug fixes + +Aggregations:: +* Create weights lazily in filter and filters aggregation {pull}26983[#26983] +* Fix IndexOutOfBoundsException in histograms for NaN doubles (#26787) {pull}26856[#26856] (issue: {issue}26787[#26787]) +* Scripted_metric _agg parameter disappears if params are provided {pull}19863[#19863] (issue: {issue}19768[#19768]) + +CAT API:: +* Fix NPE for /_cat/indices when no primary shard {pull}26953[#26953] (issue: {issue}26942[#26942]) + +Cache:: +* Reduce the default number of cached queries. {pull}26949[#26949] (issue: {issue}26938[#26938]) + +Core:: +* Timed runnable should delegate to abstract runnable {pull}27095[#27095] (issue: {issue}27069[#27069]) +* Stop invoking non-existent syscall {pull}27016[#27016] (issue: {issue}20179[#20179]) +* MetaData Builder doesn't properly prevent an alias with the same name as an index {pull}26804[#26804] + +Ingest:: +* date processor should not fail if timestamp is specified as json number {pull}26986[#26986] (issue: {issue}26967[#26967]) +* date_index_name processor should not fail if timestamp is specified as json number {pull}26910[#26910] (issue: {issue}26890[#26890]) + +Internal:: +* Upgrade Lucene to version 7.0.1 {pull}26926[#26926] + +Java High Level REST Client:: +* Make ShardSearchTarget optional when parsing ShardSearchFailure {pull}27078[#27078] (issue: {issue}27055[#27055]) + +Java REST Client:: +* rest-client-sniffer: configurable threadfactory {pull}26897[#26897] + +Mapping:: +* wrong link target for datatype murmur3 {pull}27143[#27143] + +Network:: +* Check for closed connection while opening {pull}26932[#26932] + +Packaging:: +* Fix handling of Windows paths containing parentheses {pull}26916[#26916] (issue: {issue}26454[#26454]) + +Percolator:: +* Also support query extraction for queries wrapped inside a ESToParentBlockJoinQuery {pull}26754[#26754] + +Plugin Analysis Phonetic:: +* Fix beidermorse phonetic token filter for unspecified `languageset` {pull}27112[#27112] (issue: {issue}26771[#26771]) + +Plugin Repository Azure:: +* Use Azure upload method instead of our own implementation {pull}26751[#26751] + +REST:: +* Fix inconsistencies in the rest api specs for cat.snapshots {pull}26996[#26996] (issues: {issue}25737[#25737], {issue}26923[#26923]) +* Fix inconsistencies in the rest api specs for *_script {pull}26971[#26971] (issue: {issue}26923[#26923]) +* exists template needs a template name {pull}25988[#25988] + +Reindex API:: +* Fix update_by_query's default size parameter {pull}26784[#26784] (issue: {issue}26761[#26761]) + +Search:: +* Avoid stack overflow on search 
phases {pull}27069[#27069] (issue: {issue}27042[#27042]) +* Fix search_after with geo distance sorting {pull}26891[#26891] +* Fix serialization errors when cross cluster search goes to a single shard {pull}26881[#26881] (issue: {issue}26833[#26833]) +* Early termination with index sorting should not set terminated_early in the response {pull}26597[#26597] (issue: {issue}26408[#26408]) +* Format doc values fields. {pull}22146[#22146] + +Sequence IDs:: +* Fire global checkpoint sync under system context {pull}26984[#26984] + +Settings:: +* Emit settings deprecation logging on empty update {pull}27017[#27017] (issue: {issue}26419[#26419]) +* Fix filtering for ListSetting {pull}26914[#26914] + +Stats:: +* Keep cumulative elapsed scroll time in microseconds {pull}27068[#27068] (issue: {issue}27046[#27046]) + +Suggesters:: +* Fix division by zero in phrase suggester that causes assertion to fail {pull}27149[#27149] + From 966bd7e854bc3a7bb9d13eb3b50c8fe6d3da8bbc Mon Sep 17 00:00:00 2001 From: Dimitrios Athanasiou Date: Mon, 30 Oct 2017 15:19:43 +0000 Subject: [PATCH 5/6] [Docs] Fix note in bucket_selector --- .../aggregations/pipeline/bucket-selector-aggregation.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc b/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc index 1dc44876c5361..cd0218e7c4353 100644 --- a/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/bucket-selector-aggregation.asciidoc @@ -6,7 +6,7 @@ in the parent multi-bucket aggregation. The specified metric must be numeric and If the script language is `expression` then a numeric return value is permitted. In this case 0.0 will be evaluated as `false` and all other values will evaluate to true. -Note: The bucket_selector aggregation, like all pipeline aggregations, executions after all other sibling aggregations. This means that +NOTE: The bucket_selector aggregation, like all pipeline aggregations, executes after all other sibling aggregations. This means that using the bucket_selector aggregation to filter the returned buckets in the response does not save on execution time running the aggregations. ==== Syntax From 39ef2c48a30e7476b9ce1fee0f52e90a2736fc69 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 30 Oct 2017 13:10:20 -0400 Subject: [PATCH 6/6] Refactor internal engine This commit is a minor refactoring of internal engine to move hooks for generating sequence numbers into the engine itself. As such, we refactor tests that relied on this hook to use the new hook, and remove the hook from the sequence number service itself.
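To make the shape of the new hook concrete, a rough sketch (names taken from the diff below; the surrounding test scaffolding and `engineConfig` are assumed to be in scope) of how a test can now customize sequence number generation by overriding the engine directly, rather than wrapping the sequence number service:

[source,java]
--------------------------------------------------
// Test-style override of the new engine-level hook; engineConfig is
// assumed to be built by the usual test harness.
final InternalEngine engine = new InternalEngine(engineConfig) {
    @Override
    protected long doGenerateSeqNoForOperation(final Engine.Operation operation) {
        // a test can record, stall, or replace the generated value here;
        // the default implementation asks the engine's sequence number
        // service for the next sequence number
        return super.doGenerateSeqNoForOperation(operation);
    }
};
--------------------------------------------------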
Relates #27082 --- .../elasticsearch/index/engine/Engine.java | 6 +- .../index/engine/InternalEngine.java | 147 ++++--- .../index/seqno/SequenceNumbersService.java | 2 +- .../index/engine/InternalEngineTests.java | 401 +++++++++--------- .../IndexLevelReplicationTests.java | 2 +- .../RecoveryDuringReplicationTests.java | 1 + .../index/engine/TranslogHandler.java | 145 +++++++ 7 files changed, 433 insertions(+), 271 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 1cf18cb4ee5c8..4299fa0cb6ea3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -572,7 +572,11 @@ public CommitStats commitStats() { return new CommitStats(getLastCommittedSegmentInfos()); } - /** get the sequence number service */ + /** + * The sequence number service for this engine. + * + * @return the sequence number service + */ public abstract SequenceNumbersService seqNoService(); /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2d0842ba32e80..39ddc02629b51 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -145,6 +145,12 @@ public class InternalEngine extends Engine { private final String historyUUID; public InternalEngine(EngineConfig engineConfig) { + this(engineConfig, InternalEngine::sequenceNumberService); + } + + InternalEngine( + final EngineConfig engineConfig, + final BiFunction seqNoServiceSupplier) { super(engineConfig); openMode = engineConfig.getOpenMode(); if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { @@ -152,11 +158,11 @@ public InternalEngine(EngineConfig engineConfig) { } this.uidField = engineConfig.getIndexSettings().isSingleType() ? 
IdFieldMapper.NAME : UidFieldMapper.NAME; final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( - engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), - engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() + engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), + engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() ); this.deletionPolicy = new CombinedDeletionPolicy( - new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); + new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode); store.incRef(); IndexWriter writer = null; Translog translog = null; @@ -184,20 +190,20 @@ public InternalEngine(EngineConfig engineConfig) { case CREATE_INDEX_AND_TRANSLOG: writer = createWriter(true); seqNoStats = new SeqNoStats( - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO); + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.UNASSIGNED_SEQ_NO); break; default: throw new IllegalArgumentException(openMode.toString()); } logger.trace("recovered [{}]", seqNoStats); - seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats); + seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; - translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); + translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint()); assert translog.getGeneration() != null; // we can only do this after we generated and committed a translog uuid. other wise the combined // retention policy, which listens to commits, gets all confused. 
@@ -243,12 +249,12 @@ public InternalEngine(EngineConfig engineConfig) { public void restoreLocalCheckpointFromTranslog() throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService().getLocalCheckpoint(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() > localCheckpoint) { - seqNoService().markSeqNoAsCompleted(operation.seqNo()); + seqNoService.markSeqNoAsCompleted(operation.seqNo()); } } } @@ -259,17 +265,17 @@ public void restoreLocalCheckpointFromTranslog() throws IOException { public int fillSeqNoGaps(long primaryTerm) throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { ensureOpen(); - final long localCheckpoint = seqNoService().getLocalCheckpoint(); - final long maxSeqNo = seqNoService().getMaxSeqNo(); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + final long maxSeqNo = seqNoService.getMaxSeqNo(); int numNoOpsAdded = 0; for ( long seqNo = localCheckpoint + 1; seqNo <= maxSeqNo; - seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { + seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) { innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps")); numNoOpsAdded++; - assert seqNo <= seqNoService().getLocalCheckpoint() - : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]"; + assert seqNo <= seqNoService.getLocalCheckpoint() + : "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]"; } return numNoOpsAdded; @@ -287,15 +293,13 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); } - private static SequenceNumbersService sequenceNumberService( - final ShardId shardId, - final String allocationId, - final IndexSettings indexSettings, + static SequenceNumbersService sequenceNumberService( + final EngineConfig engineConfig, final SeqNoStats seqNoStats) { return new SequenceNumbersService( - shardId, - allocationId, - indexSettings, + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), seqNoStats.getMaxSeqNo(), seqNoStats.getLocalCheckpoint(), seqNoStats.getGlobalCheckpoint()); @@ -634,8 +638,7 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" + " index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo; } else if (origin == Operation.Origin.PRIMARY) { - // sequence number should not be set when operation origin is primary - assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no.; seqNo: " + seqNo; + assert assertOriginPrimarySequenceNumber(seqNo); } else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) { // sequence number should be set when operation origin is not primary assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin; @@ 
-643,6 +646,13 @@ private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origi return true; } + protected boolean assertOriginPrimarySequenceNumber(final long seqNo) { + // sequence number should not be set when operation origin is primary + assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO + : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; + return true; + } + private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) || origin == Operation.Origin.PRIMARY) { @@ -652,6 +662,20 @@ private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin return true; } + private long generateSeqNoForOperation(final Operation operation) { + assert operation.origin() == Operation.Origin.PRIMARY; + return doGenerateSeqNoForOperation(operation); + } + + /** + * Generate the sequence number for the specified operation. + * + * @param operation the operation + * @return the sequence number + */ + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoService.generateSeqNo(); + } @Override public IndexResult index(Index index) throws IOException { @@ -721,7 +745,7 @@ public IndexResult index(Index index) throws IOException { indexResult.setTranslogLocation(location); } if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(indexResult.getSeqNo()); + seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo()); } indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); @@ -741,14 +765,12 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio final IndexingStrategy plan; if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) { // no need to deal with out of order delivery - we never saw this one - assert index.version() == 1L : - "can optimize on replicas but incoming version is [" + index.version() + "]"; + assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]"; plan = IndexingStrategy.optimizedAppendOnly(index.seqNo()); } else { // drop out of order operations assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() : - "resolving out of order delivery based on versioning but version type isn't fit for it. got [" - + index.versionType() + "]"; + "resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]"; // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in // a delete state and return false for the created flag in favor of code simplicity @@ -783,15 +805,14 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 } private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { - assert index.origin() == Operation.Origin.PRIMARY : - "planing as primary but origin isn't. got " + index.origin(); + assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. 
got " + index.origin(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { if (mayHaveBeenIndexedBefore(index)) { - plan = IndexingStrategy.overrideExistingAsIfNotThere(seqNoService().generateSeqNo(), 1L); + plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L); } else { - plan = IndexingStrategy.optimizedAppendOnly(seqNoService().generateSeqNo()); + plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index)); } } else { // resolves incoming version @@ -812,7 +833,7 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion); } else { plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted, - seqNoService().generateSeqNo(), + generateSeqNoForOperation(index), index.versionType().updateVersion(currentVersion, index.version()) ); } @@ -1021,7 +1042,7 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult.setTranslogLocation(location); } if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo()); + seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo()); } deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); @@ -1038,8 +1059,7 @@ public DeleteResult delete(Delete delete) throws IOException { } private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException { - assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " - + delete.origin(); + assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // drop out of order operations assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() : "resolving out of order delivery based on versioning but version type isn't fit for it. 
got [" @@ -1077,8 +1097,7 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0 } private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { - assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " - + delete.origin(); + assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete); assert incrementVersionLookup(); @@ -1096,9 +1115,10 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted); plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted); } else { - plan = DeletionStrategy.processNormally(currentlyDeleted, - seqNoService().generateSeqNo(), - delete.versionType().updateVersion(currentVersion, delete.version())); + plan = DeletionStrategy.processNormally( + currentlyDeleted, + generateSeqNoForOperation(delete), + delete.versionType().updateVersion(currentVersion, delete.version())); } return plan; } @@ -1199,7 +1219,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { return noOpResult; } finally { if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { - seqNoService().markSeqNoAsCompleted(seqNo); + seqNoService.markSeqNoAsCompleted(seqNo); } } } @@ -1932,13 +1952,13 @@ protected void doRun() throws Exception { * @throws IOException if an I/O exception occurs committing the specified writer */ protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { - final long localCheckpoint = seqNoService().getLocalCheckpoint(); - final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); - final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); - final String translogUUID = translogGeneration.translogUUID; - final String localCheckpointValue = Long.toString(localCheckpoint); + final long localCheckpoint = seqNoService.getLocalCheckpoint(); + final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); + final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); + final String translogUUID = translogGeneration.translogUUID; + final String localCheckpointValue = Long.toString(localCheckpoint); - final Iterable<Map.Entry<String, String>> commitIterable = () -> { + final Iterable<Map.Entry<String, String>> commitIterable = () -> { /* * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes * segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want * to have the maximum sequence number in the commit data until we can guarantee that all documents that are below it are flushed. Since the * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/ - final Map<String, String> commitData = new HashMap<>(6); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); - commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); - if (syncId != null) { - commitData.put(Engine.SYNC_COMMIT_ID, syncId); - } - commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); - commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); - commitData.put(HISTORY_UUID_KEY, historyUUID); - logger.trace("committing writer with commit data [{}]", commitData); - return commitData.entrySet().iterator(); - }; + final Map<String, String> commitData = new HashMap<>(6); + commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); + commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); + if (syncId != null) { + commitData.put(Engine.SYNC_COMMIT_ID, syncId); + } + commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); + commitData.put(HISTORY_UUID_KEY, historyUUID); + logger.trace("committing writer with commit data [{}]", commitData); + return commitData.entrySet().iterator(); + }; commitIndexWriter(writer, commitIterable); } @@ -2036,8 +2056,7 @@ public MergeStats getMergeStats() { return mergeScheduler.stats(); } - @Override - public SequenceNumbersService seqNoService() { + public final SequenceNumbersService seqNoService() { return seqNoService; } diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index 1c8911a0cd886..1c0b320558400 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -66,7 +66,7 @@ public SequenceNumbersService( * * @return the next assigned sequence number */ - public long generateSeqNo() { + public final long generateSeqNo() { return localCheckpointTracker.generateSeqNo(); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index d2b15c0a113d9..f6d2d99658098 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.filter.RegexFilter; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat; import org.apache.lucene.document.Field; @@ -95,46 +94,36 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; -import
org.elasticsearch.index.analysis.AnalyzerScope; -import org.elasticsearch.index.analysis.IndexAnalyzers; -import org.elasticsearch.index.analysis.NamedAnalyzer; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine.Searcher; import org.elasticsearch.index.fieldvisitor.FieldsVisitor; import org.elasticsearch.index.mapper.ContentPath; -import org.elasticsearch.index.mapper.DocumentMapper; -import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.Mapper.BuilderContext; -import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexSearcherWrapper; -import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; -import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryUtils; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; -import org.elasticsearch.indices.IndicesModule; -import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; @@ -171,9 +160,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.function.ToLongBiFunction; import java.util.stream.Collectors; import java.util.stream.LongStream; @@ -184,7 +173,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; -import static org.elasticsearch.index.mapper.SourceToParse.source; import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.equalTo; @@ -356,11 +344,19 @@ protected InternalEngine createEngine(Store store, Path translogPath) throws IOE return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null); } - protected InternalEngine createEngine(Store store, Path translogPath, - Function sequenceNumbersServiceSupplier) throws IOException { + protected InternalEngine createEngine(Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier) throws IOException { return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, 
sequenceNumbersServiceSupplier); } + protected InternalEngine createEngine(Store store, + Path translogPath, + BiFunction sequenceNumbersServiceSupplier, + ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null); + } + protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException { return createEngine(indexSettings, store, translogPath, mergePolicy, null); @@ -377,8 +373,19 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Function sequenceNumbersServiceSupplier) throws IOException { - return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null); + @Nullable BiFunction sequenceNumbersServiceSupplier) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null); + } + + protected InternalEngine createEngine( + IndexSettings indexSettings, + Store store, + Path translogPath, + MergePolicy mergePolicy, + @Nullable IndexWriterFactory indexWriterFactory, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation) throws IOException { + return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null); } protected InternalEngine createEngine( @@ -387,10 +394,11 @@ protected InternalEngine createEngine( Path translogPath, MergePolicy mergePolicy, @Nullable IndexWriterFactory indexWriterFactory, - @Nullable Function sequenceNumbersServiceSupplier, + @Nullable BiFunction sequenceNumbersServiceSupplier, + @Nullable ToLongBiFunction seqNoForOperation, @Nullable Sort indexSort) throws IOException { EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); - InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config); + InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); } @@ -404,21 +412,39 @@ public interface IndexWriterFactory { } public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory, - @Nullable final Function sequenceNumbersServiceSupplier, + @Nullable final BiFunction sequenceNumbersServiceSupplier, + @Nullable final ToLongBiFunction seqNoForOperation, final EngineConfig config) { - return new InternalEngine(config) { - @Override - IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { - return (indexWriterFactory != null) ? - indexWriterFactory.createWriter(directory, iwc) : - super.createWriter(directory, iwc); - } + if (sequenceNumbersServiceSupplier == null) { + return new InternalEngine(config) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? 
+ indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } + + @Override + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); + } + }; + } else { + return new InternalEngine(config, sequenceNumbersServiceSupplier) { + @Override + IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException { + return (indexWriterFactory != null) ? + indexWriterFactory.createWriter(directory, iwc) : + super.createWriter(directory, iwc); + } @Override - public SequenceNumbersService seqNoService() { - return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.apply(config) : super.seqNoService(); + protected long doGenerateSeqNoForOperation(final Operation operation) { + return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation); } }; + } + } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, @@ -672,8 +698,8 @@ public void testSegmentsWithMergeFlag() throws Exception { public void testSegmentsWithIndexSort() throws Exception { Sort indexSort = new Sort(new SortedSetSortField("_type", false)); try (Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - null, null, indexSort)) { + Engine engine = + createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, null, indexSort)) { List segments = engine.segments(true); assertThat(segments.isEmpty(), equalTo(true)); @@ -729,13 +755,28 @@ public void testCommitStats() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); try ( Store store = createStore(); - InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService( - config.getShardId(), - config.getAllocationId(), - config.getIndexSettings(), - maxSeqNo.get(), - localCheckpoint.get(), - globalCheckpoint.get()) + InternalEngine engine = createEngine(store, createTempDir(), (config, seqNoStats) -> new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { + @Override + public long getMaxSeqNo() { + return maxSeqNo.get(); + } + + @Override + public long getLocalCheckpoint() { + return localCheckpoint.get(); + } + + @Override + public long getGlobalCheckpoint() { + return globalCheckpoint.get(); + } + } )) { CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); @@ -902,20 +943,11 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException { Store store = createStore(); final AtomicInteger counter = new AtomicInteger(); try { - initialEngine = createEngine(store, createTempDir(), (config) -> - new SequenceNumbersService( - config.getShardId(), - config.getAllocationId(), - config.getIndexSettings(), - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { - @Override - public long generateSeqNo() { - return seqNos.get(counter.getAndIncrement()); - } - } - ); + initialEngine = createEngine( + store, + createTempDir(), + InternalEngine::sequenceNumberService, + (engine, operation) -> 
seqNos.get(counter.getAndIncrement())); for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); @@ -2711,7 +2743,7 @@ public void testTranslogReplay() throws IOException { assertVisibleCount(engine, numDocs, false); parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(numDocs, parser.appliedOperations.get()); + assertEquals(numDocs, parser.appliedOperations()); if (parser.mappingUpdate != null) { assertEquals(1, parser.getRecoveredTypes().size()); assertTrue(parser.getRecoveredTypes().containsKey("test")); @@ -2723,7 +2755,7 @@ public void testTranslogReplay() throws IOException { engine = createEngine(store, primaryTranslogDir); assertVisibleCount(engine, numDocs, false); parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(0, parser.appliedOperations.get()); + assertEquals(0, parser.appliedOperations()); final boolean flush = randomBoolean(); int randomId = randomIntBetween(numDocs + 1, numDocs + 10); @@ -2753,7 +2785,7 @@ public void testTranslogReplay() throws IOException { assertThat(topDocs.totalHits, equalTo(numDocs + 1L)); } parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner(); - assertEquals(flush ? 1 : 2, parser.appliedOperations.get()); + assertEquals(flush ? 1 : 2, parser.appliedOperations()); engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc))); if (randomBoolean()) { engine.refresh("test"); @@ -2767,97 +2799,6 @@ public void testTranslogReplay() throws IOException { } } - public static class TranslogHandler implements EngineConfig.TranslogRecoveryRunner { - - private final MapperService mapperService; - public Mapping mappingUpdate = null; - private final Map recoveredTypes = new HashMap<>(); - private final AtomicLong appliedOperations = new AtomicLong(); - - public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) { - NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer()); - IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap()); - SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); - MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); - mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry, - () -> null); - } - - private DocumentMapperForType docMapper(String type) { - RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type); - DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService); - return new DocumentMapperForType(b.build(mapperService), mappingUpdate); - } - - private void applyOperation(Engine engine, Engine.Operation operation) throws IOException { - switch (operation.operationType()) { - case INDEX: - Engine.Index engineIndex = (Engine.Index) operation; - Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate(); - if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) { - recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? 
update : mapping.merge(update, false)); - } - engine.index(engineIndex); - break; - case DELETE: - engine.delete((Engine.Delete) operation); - break; - case NO_OP: - engine.noOp((Engine.NoOp) operation); - break; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - } - - /** - * Returns the recovered types modifying the mapping during the recovery - */ - public Map<String, Mapping> getRecoveredTypes() { - return recoveredTypes; - } - - @Override - public int run(Engine engine, Translog.Snapshot snapshot) throws IOException { - int opsRecovered = 0; - Translog.Operation operation; - while ((operation = snapshot.next()) != null) { - applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY)); - opsRecovered++; - appliedOperations.incrementAndGet(); - } - return opsRecovered; - } - - private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) { - switch (operation.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) operation; - final String indexName = mapperService.index().getName(); - final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), - mapperService.getIndexSettings().getIndexVersionCreated(), - source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source())) - .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(), - index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, - index.getAutoGeneratedIdTimestamp(), true); - return engineIndex; - case DELETE: - final Translog.Delete delete = (Translog.Delete) operation; - final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), - delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), - origin, System.nanoTime()); - return engineDelete; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - final Engine.NoOp engineNoOp = - new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason()); - return engineNoOp; - default: - throw new IllegalStateException("No operation defined for [" + operation + "]"); - } - } - } - public void testRecoverFromForeignTranslog() throws IOException { final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { @@ -3786,47 +3727,38 @@ public void testSequenceIDs() throws Exception { } /** - * A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier - * and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the - * value of {@code expectedLocalCheckpoint} is set accordingly. + * A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the + * referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false), then the value of + * {@code expectedLocalCheckpoint} is set accordingly.
* * @param latchReference to latch the thread for the purpose of stalling * @param barrier to signal the thread has generated a new sequence number * @param stall whether or not the thread should stall * @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence * number - * @return a sequence number service + * @return a sequence number generator */ - private SequenceNumbersService getStallingSeqNoService( + private ToLongBiFunction getStallingSeqNoGenerator( final AtomicReference latchReference, final CyclicBarrier barrier, final AtomicBoolean stall, final AtomicLong expectedLocalCheckpoint) { - return new SequenceNumbersService( - shardId, - allocationId.getId(), - defaultSettings, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { - @Override - public long generateSeqNo() { - final long seqNo = super.generateSeqNo(); - final CountDownLatch latch = latchReference.get(); - if (stall.get()) { - try { - barrier.await(); - latch.await(); - } catch (BrokenBarrierException | InterruptedException e) { - throw new RuntimeException(e); - } - } else { - if (expectedLocalCheckpoint.get() + 1 == seqNo) { - expectedLocalCheckpoint.set(seqNo); - } + return (engine, operation) -> { + final long seqNo = engine.seqNoService().generateSeqNo(); + final CountDownLatch latch = latchReference.get(); + if (stall.get()) { + try { + barrier.await(); + latch.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { + if (expectedLocalCheckpoint.get() + 1 == seqNo) { + expectedLocalCheckpoint.set(seqNo); } - return seqNo; } + return seqNo; }; } @@ -3840,8 +3772,8 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro final AtomicBoolean stall = new AtomicBoolean(); final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final List threads = new ArrayList<>(); - final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); + initialEngine = + createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint)); final InternalEngine finalInitialEngine = initialEngine; for (int i = 0; i < docs; i++) { final String id = Integer.toString(i); @@ -4015,17 +3947,17 @@ public void testNoOps() throws IOException { final int localCheckpoint = randomIntBetween(0, maxSeqNo); final int globalCheckpoint = randomIntBetween(0, localCheckpoint); try { - final SequenceNumbersService seqNoService = - new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) { - @Override - public long generateSeqNo() { - throw new UnsupportedOperationException(); - } - }; - noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { + final BiFunction supplier = (engineConfig, ignored) -> new SequenceNumbersService( + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), + maxSeqNo, + localCheckpoint, + globalCheckpoint); + noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier) { @Override - public 
SequenceNumbersService seqNoService() { - return seqNoService; + protected long doGenerateSeqNoForOperation(Operation operation) { + throw new UnsupportedOperationException(); } }; noOpEngine.recoverFromTranslog(); @@ -4070,8 +4002,8 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti final AtomicBoolean stall = new AtomicBoolean(); final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final Map threads = new LinkedHashMap<>(); - final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint); - actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService); + actualEngine = + createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint)); final InternalEngine finalActualEngine = actualEngine; final Translog translog = finalActualEngine.getTranslog(); final long generation = finalActualEngine.getTranslog().currentFileGeneration(); @@ -4160,26 +4092,20 @@ public void testRestoreLocalCheckpointFromTranslog() throws IOException { InternalEngine actualEngine = null; try { final Set completedSeqNos = new HashSet<>(); - final SequenceNumbersService seqNoService = - new SequenceNumbersService( - shardId, - allocationId.getId(), - defaultSettings, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.NO_OPS_PERFORMED, - SequenceNumbers.UNASSIGNED_SEQ_NO) { + final BiFunction supplier = (engineConfig, seqNoStats) -> new SequenceNumbersService( + engineConfig.getShardId(), + engineConfig.getAllocationId(), + engineConfig.getIndexSettings(), + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()) { @Override public void markSeqNoAsCompleted(long seqNo) { super.markSeqNoAsCompleted(seqNo); completedSeqNos.add(seqNo); } }; - actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) { - @Override - public SequenceNumbersService seqNoService() { - return seqNoService; - } - }; + actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier); final int operations = randomIntBetween(0, 1024); final Set expectedCompletedSeqNos = new HashSet<>(); for (int i = 0; i < operations; i++) { @@ -4347,4 +4273,71 @@ public void testRefreshScopedSearcher() throws IOException { assertSameReader(getSearcher, searchSearcher); } } + + public void testSeqNoGenerator() throws IOException { + engine.close(); + final long seqNo = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE); + final BiFunction seqNoService = (config, seqNoStats) -> new SequenceNumbersService( + config.getShardId(), + config.getAllocationId(), + config.getIndexSettings(), + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.NO_OPS_PERFORMED, + SequenceNumbers.UNASSIGNED_SEQ_NO); + final AtomicLong seqNoGenerator = new AtomicLong(seqNo); + try (Engine e = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, seqNoService, (engine, operation) -> seqNoGenerator.getAndIncrement())) { + final String id = "id"; + final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE); + final String type = "type"; + final Field versionField = new NumericDocValuesField("_version", 0); + final SeqNoFieldMapper.SequenceIDFields 
@@ -4347,4 +4273,71 @@ public void testRefreshScopedSearcher() throws IOException {
             assertSameReader(getSearcher, searchSearcher);
         }
     }
+
+    public void testSeqNoGenerator() throws IOException {
+        engine.close();
+        final long seqNo = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE);
+        final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoService = (config, seqNoStats) -> new SequenceNumbersService(
+                config.getShardId(),
+                config.getAllocationId(),
+                config.getIndexSettings(),
+                SequenceNumbers.NO_OPS_PERFORMED,
+                SequenceNumbers.NO_OPS_PERFORMED,
+                SequenceNumbers.UNASSIGNED_SEQ_NO);
+        final AtomicLong seqNoGenerator = new AtomicLong(seqNo);
+        try (Engine e = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, seqNoService, (engine, operation) -> seqNoGenerator.getAndIncrement())) {
+            final String id = "id";
+            final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
+            final String type = "type";
+            final Field versionField = new NumericDocValuesField("_version", 0);
+            final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
+            final ParseContext.Document document = new ParseContext.Document();
+            document.add(uidField);
+            document.add(versionField);
+            document.add(seqID.seqNo);
+            document.add(seqID.seqNoDocValue);
+            document.add(seqID.primaryTerm);
+            final BytesReference source = new BytesArray(new byte[]{1});
+            final ParsedDocument parsedDocument = new ParsedDocument(
+                    versionField,
+                    seqID,
+                    id,
+                    type,
+                    "routing",
+                    Collections.singletonList(document),
+                    source,
+                    XContentType.JSON,
+                    null);
+
+            final Engine.Index index = new Engine.Index(
+                    new Term("_id", parsedDocument.id()),
+                    parsedDocument,
+                    SequenceNumbers.UNASSIGNED_SEQ_NO,
+                    (long) randomIntBetween(1, 8),
+                    Versions.MATCH_ANY,
+                    VersionType.INTERNAL,
+                    Engine.Operation.Origin.PRIMARY,
+                    System.currentTimeMillis(),
+                    System.currentTimeMillis(),
+                    randomBoolean());
+            final Engine.IndexResult indexResult = e.index(index);
+            assertThat(indexResult.getSeqNo(), equalTo(seqNo));
+            assertThat(seqNoGenerator.get(), equalTo(seqNo + 1));
+
+            final Engine.Delete delete = new Engine.Delete(
+                    type,
+                    id,
+                    new Term("_id", parsedDocument.id()),
+                    SequenceNumbers.UNASSIGNED_SEQ_NO,
+                    (long) randomIntBetween(1, 8),
+                    Versions.MATCH_ANY,
+                    VersionType.INTERNAL,
+                    Engine.Operation.Origin.PRIMARY,
+                    System.currentTimeMillis());
+            final Engine.DeleteResult deleteResult = e.delete(delete);
+            assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1));
+            assertThat(seqNoGenerator.get(), equalTo(seqNo + 2));
+        }
+    }
+
 }
diff --git a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 1447ea8ae50a9..cf4dab733f237 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -316,7 +316,7 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
                     assert documentFailureMessage != null;
                     throw new IOException(documentFailureMessage);
                 }
-            }, null, config);
+            }, null, null, config);
         }
     }
diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
index 96f6aa6d47acb..844d6b0aaf957 100644
--- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
+++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java
@@ -637,6 +637,7 @@ public long addDocument(final Iterable<? extends IndexableField> doc) throws IOE
                         }
                     },
                     null,
+                    null,
                     config);
         }
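
The extra `null` threaded through the two replication tests presumably corresponds to the new generator hook added to the engine factory's parameter list. The arithmetic that `testSeqNoGenerator` checks above is also worth spelling out: each primary-origin operation should draw exactly one value from the injected generator, so an index followed by a delete advances the counter by two. In miniature (run with `-ea` to enable assertions):

    import java.util.concurrent.atomic.AtomicLong;

    class SeqNoCountingSketch {
        public static void main(String[] args) {
            final long start = 7;
            final AtomicLong seqNoGenerator = new AtomicLong(start);
            // One draw per primary-origin operation:
            final long indexSeqNo = seqNoGenerator.getAndIncrement();  // the index op
            final long deleteSeqNo = seqNoGenerator.getAndIncrement(); // the delete op
            assert indexSeqNo == start;
            assert deleteSeqNo == start + 1;
            assert seqNoGenerator.get() == start + 2;
        }
    }
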
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
new file mode 100644
index 0000000000000..6834d124c499a
--- /dev/null
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.analysis.AnalyzerScope;
+import org.elasticsearch.index.analysis.IndexAnalyzers;
+import org.elasticsearch.index.analysis.NamedAnalyzer;
+import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.DocumentMapperForType;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.RootObjectMapper;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.similarity.SimilarityService;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.indices.IndicesModule;
+import org.elasticsearch.indices.mapper.MapperRegistry;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static org.elasticsearch.index.mapper.SourceToParse.source;
+
+public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {
+
+    private final MapperService mapperService;
+    public Mapping mappingUpdate = null;
+    private final Map<String, Mapping> recoveredTypes = new HashMap<>();
+
+    private final AtomicLong appliedOperations = new AtomicLong();
+
+    long appliedOperations() {
+        return appliedOperations.get();
+    }
+
+    public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
+        NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer());
+        IndexAnalyzers indexAnalyzers =
+                new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, emptyMap(), emptyMap());
+        SimilarityService similarityService = new SimilarityService(indexSettings, null, emptyMap());
+        MapperRegistry mapperRegistry = new IndicesModule(emptyList()).getMapperRegistry();
+        mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
+                () -> null);
+    }
+
+    private DocumentMapperForType docMapper(String type) {
+        RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type);
+        DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService);
+        return new DocumentMapperForType(b.build(mapperService), mappingUpdate);
+    }
+
+    private void applyOperation(Engine engine, Engine.Operation operation) throws IOException {
+        switch (operation.operationType()) {
+            case INDEX:
+                Engine.Index engineIndex = (Engine.Index) operation;
+                Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate();
+                if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
+                    recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ?
+                            update : mapping.merge(update, false));
+                }
+                engine.index(engineIndex);
+                break;
+            case DELETE:
+                engine.delete((Engine.Delete) operation);
+                break;
+            case NO_OP:
+                engine.noOp((Engine.NoOp) operation);
+                break;
+            default:
+                throw new IllegalStateException("No operation defined for [" + operation + "]");
+        }
+    }
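+
+    // Note: applyOperation replays a single translog operation against the
+    // engine; any dynamic mapping update carried by a replayed INDEX
+    // operation is folded into recoveredTypes, so callers can inspect which
+    // types were (re)mapped during recovery.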
+
+    /**
+     * Returns the recovered types whose mappings were modified during the recovery
+     */
+    public Map<String, Mapping> getRecoveredTypes() {
+        return recoveredTypes;
+    }
+
+    @Override
+    public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
+        int opsRecovered = 0;
+        Translog.Operation operation;
+        while ((operation = snapshot.next()) != null) {
+            applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY));
+            opsRecovered++;
+            appliedOperations.incrementAndGet();
+        }
+        return opsRecovered;
+    }
+
+    private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
+        switch (operation.opType()) {
+            case INDEX:
+                final Translog.Index index = (Translog.Index) operation;
+                final String indexName = mapperService.index().getName();
+                final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
+                        mapperService.getIndexSettings().getIndexVersionCreated(),
+                        source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
+                                .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
+                        index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
+                        index.getAutoGeneratedIdTimestamp(), true);
+                return engineIndex;
+            case DELETE:
+                final Translog.Delete delete = (Translog.Delete) operation;
+                final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
+                        delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
+                        origin, System.nanoTime());
+                return engineDelete;
+            case NO_OP:
+                final Translog.NoOp noOp = (Translog.NoOp) operation;
+                final Engine.NoOp engineNoOp =
+                        new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
+                return engineNoOp;
+            default:
+                throw new IllegalStateException("No operation defined for [" + operation + "]");
+        }
+    }
+
+}
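
For orientation, here is a rough sketch of how a test might drive the new `TranslogHandler`. The `engineConfigWithRecoveryRunner` helper below is hypothetical shorthand for the test-framework plumbing that places the handler on the `EngineConfig`; it is not part of this patch. Note that `appliedOperations()` is package-private, so code like this would live in `org.elasticsearch.index.engine`:

    // Hypothetical wiring, for illustration only.
    TranslogHandler handler = new TranslogHandler(xContentRegistry(), indexSettings);
    EngineConfig config = engineConfigWithRecoveryRunner(handler); // hypothetical helper
    try (InternalEngine engine = new InternalEngine(config)) {
        engine.recoverFromTranslog();                // invokes handler.run(engine, snapshot)
        long replayed = handler.appliedOperations(); // operations replayed from the translog
    }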