From d90daaefec42d506dc6a9c2e0155236c676571e6 Mon Sep 17 00:00:00 2001 From: Jinsoo <46536764+jinsoor-amzn@users.noreply.github.com> Date: Thu, 20 Jun 2019 10:39:03 -0700 Subject: [PATCH] update ScheduledJobParser to use the JobDocVersion (#15) * update ScheduledJobParser to use the JobDocVersion --- .../sampleextension/SampleExtensionPlugin.java | 2 +- .../jobscheduler/spi/ScheduledJobParser.java | 2 +- .../jobscheduler/sweeper/JobSweeper.java | 18 +++++++++--------- .../jobscheduler/sweeper/JobSweeperTests.java | 3 ++- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java b/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java index fcfdeb0..814bb76 100644 --- a/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java +++ b/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java @@ -91,7 +91,7 @@ public ScheduledJobRunner getJobRunner() { @Override public ScheduledJobParser getJobParser() { - return (parser, id, version) -> { + return (parser, id, jobDocVersion) -> { SampleJobParameter jobParameter = new SampleJobParameter(); XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); diff --git a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java index 4ba50d8..92243e1 100644 --- a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java +++ b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java @@ -20,5 +20,5 @@ import java.io.IOException; public interface ScheduledJobParser { - ScheduledJobParameter parse(XContentParser xContentParser, String id, Long version) throws IOException; + ScheduledJobParameter parse(XContentParser xContentParser, String id, JobDocVersion jobDocVersion) throws IOException; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java b/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java index 534a055..1193518 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java @@ -214,7 +214,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul } @VisibleForTesting - void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion version) { + void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion jobDocVersion) { ConcurrentHashMap jobVersionMap; if (this.sweptJobs.containsKey(shardId)) { jobVersionMap = this.sweptJobs.get(shardId); @@ -222,10 +222,10 @@ void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersio jobVersionMap = new ConcurrentHashMap<>(); this.sweptJobs.put(shardId, jobVersionMap); } - jobVersionMap.compute(docId, (id, currentVersion) -> { - if (version.compareTo(currentVersion) <= 0) { - log.debug("Skipping job {}, new version {} <= current version {}", docId, version, currentVersion); - return currentVersion; + jobVersionMap.compute(docId, (id, currentJobDocVersion) -> { + if (jobDocVersion.compareTo(currentJobDocVersion) <= 0) { + log.debug("Skipping job {}, new version {} <= current version {}", docId, jobDocVersion, currentJobDocVersion); + return currentJobDocVersion; } if (this.scheduler.getScheduledJobIds(shardId.getIndexName()).contains(docId)) { @@ -236,20 +236,20 @@ void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersio ScheduledJobProvider provider = this.indexToProviders.get(shardId.getIndexName()); XContentParser parser = XContentHelper.createParser(this.xContentRegistry, LoggingDeprecationHandler.INSTANCE, jobSource, XContentType.JSON); - ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, version.getVersion()); + ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, jobDocVersion); if (jobParameter == null) { // allow parser to return null, which means this is not a scheduled job document. return null; } ScheduledJobRunner jobRunner = this.indexToProviders.get(shardId.getIndexName()).getJobRunner(); if (jobParameter.isEnabled()) { - this.scheduler.schedule(shardId.getIndexName(), docId, jobParameter, jobRunner, version); + this.scheduler.schedule(shardId.getIndexName(), docId, jobParameter, jobRunner, jobDocVersion); } - return version; + return jobDocVersion; } catch (Exception e) { log.warn("Unable to parse job, error message: {} , message source: {}", e.getMessage(), Strings.cleanTruncate(jobSource.utf8ToString(), 1000)); - return currentVersion; + return currentJobDocVersion; } } else { return null; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java index f9dd06b..df57ebe 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java @@ -261,7 +261,8 @@ public void testSweep() throws IOException { ScheduledJobParameter mockJobParameter = Mockito.mock(ScheduledJobParameter.class); Mockito.when(mockJobParameter.isEnabled()).thenReturn(true); - Mockito.when(this.jobParser.parse(Mockito.any(), Mockito.anyString(), Mockito.anyLong())).thenReturn(mockJobParameter); + Mockito.when(this.jobParser.parse(Mockito.any(), Mockito.anyString(), Mockito.any(JobDocVersion.class))) + .thenReturn(mockJobParameter); this.sweeper.sweep(shardId, "id", this.getTestJsonSource(), new JobDocVersion(1L, 1L, 2L)); Mockito.verify(this.scheduler).schedule(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(),