diff --git a/build.gradle b/build.gradle index 547df6d..2cddd6c 100644 --- a/build.gradle +++ b/build.gradle @@ -71,7 +71,7 @@ integTestCluster { allprojects { group = 'com.amazon.opendistroforelasticsearch' - version = "${opendistroVersion}.0" + version = "${opendistroVersion}.1" apply from: "$rootDir/build-tools/repositories.gradle" diff --git a/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java b/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java index fcfdeb0..814bb76 100644 --- a/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java +++ b/sample-extension-plugin/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sampleextension/SampleExtensionPlugin.java @@ -91,7 +91,7 @@ public ScheduledJobRunner getJobRunner() { @Override public ScheduledJobParser getJobParser() { - return (parser, id, version) -> { + return (parser, id, jobDocVersion) -> { SampleJobParameter jobParameter = new SampleJobParameter(); XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); diff --git a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java index 4ba50d8..92243e1 100644 --- a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java +++ b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/ScheduledJobParser.java @@ -20,5 +20,5 @@ import java.io.IOException; public interface ScheduledJobParser { - ScheduledJobParameter parse(XContentParser xContentParser, String id, Long version) throws IOException; + ScheduledJobParameter parse(XContentParser xContentParser, String id, JobDocVersion jobDocVersion) throws IOException; } diff --git a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronSchedule.java b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronSchedule.java index ce5d175..3b43c02 100644 --- a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronSchedule.java +++ b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronSchedule.java @@ -20,6 +20,7 @@ import com.cronutils.model.time.ExecutionTime; import com.cronutils.parser.CronParser; import com.cronutils.utils.VisibleForTesting; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -30,6 +31,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.Objects; import java.util.Optional; /** @@ -138,4 +140,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .endObject(); return builder; } + + @Override + public String toString() { + return Strings.toString(this, false, true); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CronSchedule cronSchedule = (CronSchedule) o; + return timezone.equals(cronSchedule.timezone) && + expression.equals(cronSchedule.expression); + } + + @Override + public int hashCode() { + return Objects.hash(timezone, expression); + } } diff --git a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalSchedule.java b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalSchedule.java index 7cbc088..0dec003 100644 --- a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalSchedule.java +++ b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalSchedule.java @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule; import com.cronutils.utils.VisibleForTesting; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -27,6 +28,8 @@ import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.HashSet; +import java.util.Locale; +import java.util.Objects; import java.util.Set; /** @@ -57,7 +60,9 @@ public class IntervalSchedule implements Schedule { public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit) { if (!SUPPORTED_UNITS.contains(unit)) { - throw new IllegalArgumentException(String.format("Interval unit %s is not supported, expects %s", unit, SUPPORTED_UNITS)); + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Interval unit %s is not supported, expects %s", + unit, SUPPORTED_UNITS)); } this.startTime = startTime; this.interval = interval; @@ -139,4 +144,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws void setClock(Clock clock) { this.clock = clock; } + + @Override + public String toString() { + return Strings.toString(this, false, true); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + IntervalSchedule intervalSchedule = (IntervalSchedule) o; + return startTime.equals(intervalSchedule.startTime) && + interval == intervalSchedule.interval && + unit == intervalSchedule.unit && + intervalInMillis == intervalSchedule.intervalInMillis; + } + + @Override + public int hashCode() { + return Objects.hash(startTime, interval, unit, intervalInMillis); + } } diff --git a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/ScheduleParser.java b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/ScheduleParser.java index 460409e..19f170d 100644 --- a/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/ScheduleParser.java +++ b/spi/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/ScheduleParser.java @@ -22,6 +22,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.temporal.ChronoUnit; +import java.util.Locale; /** * Schedule XContent parser. @@ -43,7 +44,9 @@ public static Schedule parse(XContentParser parser) throws IOException { switch (cronField) { case CronSchedule.EXPRESSION_FIELD: expression = parser.text(); break; case CronSchedule.TIMEZONE_FIELD: timezone = ZoneId.of(parser.text()); break; - default: throw new IllegalArgumentException(String.format("Unknown cron field %s", cronField)); + default: + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Unknown cron field %s", cronField)); } } XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser::getTokenLocation); @@ -59,14 +62,18 @@ public static Schedule parse(XContentParser parser) throws IOException { switch (intervalField) { case IntervalSchedule.START_TIME_FIELD: startTime = Instant.ofEpochMilli(parser.longValue()); break; case IntervalSchedule.PERIOD_FIELD: period = parser.intValue(); break; - case IntervalSchedule.UNIT_FIELD: unit = ChronoUnit.valueOf(parser.text().toUpperCase()); break; - default: throw new IllegalArgumentException(String.format("Unknown interval field %s", intervalField)); + case IntervalSchedule.UNIT_FIELD: unit = ChronoUnit.valueOf(parser.text().toUpperCase(Locale.ROOT)); break; + default: + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Unknown interval field %s", intervalField)); } } XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser::getTokenLocation); parser.nextToken(); return new IntervalSchedule(startTime, period, unit); - default: throw new IllegalArgumentException(String.format("Unknown schedule type %s", fieldName)); + default: + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Unknown schedule type %s", fieldName)); } } throw new IllegalArgumentException("Invalid schedule document object."); diff --git a/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronScheduleTest.java b/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronScheduleTest.java index a62bce7..5a08e73 100644 --- a/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronScheduleTest.java +++ b/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/CronScheduleTest.java @@ -149,4 +149,15 @@ public void testToXContent() throws IOException { Assert.assertEquals(expectedJsonStr, XContentHelper.toXContent(schedule, XContentType.JSON, false).utf8ToString()); } + + @Test + public void testCronScheduleEqualsAndHashCode() { + CronSchedule cronScheduleOne = new CronSchedule("* * * * *", ZoneId.of("PST8PDT")); + CronSchedule cronScheduleTwo = new CronSchedule("* * * * *", ZoneId.of("PST8PDT")); + CronSchedule cronScheduleThree = new CronSchedule("1 * * * *", ZoneId.of("PST8PDT")); + + Assert.assertEquals(cronScheduleOne, cronScheduleTwo); + Assert.assertNotEquals(cronScheduleOne, cronScheduleThree); + Assert.assertEquals(cronScheduleOne.hashCode(), cronScheduleTwo.hashCode()); + } } diff --git a/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalScheduleTest.java b/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalScheduleTest.java index 0421b15..297ff18 100644 --- a/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalScheduleTest.java +++ b/spi/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/spi/schedule/IntervalScheduleTest.java @@ -139,4 +139,16 @@ public void testToXContent() throws IOException { Assert.assertEquals(xContentJsonStr, XContentHelper.toXContent(this.intervalSchedule, XContentType.JSON, false) .utf8ToString()); } + + @Test + public void testIntervalScheduleEqualsAndHashCode() { + Long epochMilli = Instant.now().toEpochMilli(); + IntervalSchedule intervalScheduleOne = new IntervalSchedule(Instant.ofEpochMilli(epochMilli), 5, ChronoUnit.MINUTES); + IntervalSchedule intervalScheduleTwo = new IntervalSchedule(Instant.ofEpochMilli(epochMilli), 5, ChronoUnit.MINUTES); + IntervalSchedule intervalScheduleThree = new IntervalSchedule(Instant.ofEpochMilli(epochMilli), 4, ChronoUnit.MINUTES); + + Assert.assertEquals(intervalScheduleOne, intervalScheduleTwo); + Assert.assertNotEquals(intervalScheduleOne, intervalScheduleThree); + Assert.assertEquals(intervalScheduleOne.hashCode(), intervalScheduleTwo.hashCode()); + } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java b/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java index 534a055..1193518 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeper.java @@ -214,7 +214,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul } @VisibleForTesting - void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion version) { + void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion jobDocVersion) { ConcurrentHashMap jobVersionMap; if (this.sweptJobs.containsKey(shardId)) { jobVersionMap = this.sweptJobs.get(shardId); @@ -222,10 +222,10 @@ void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersio jobVersionMap = new ConcurrentHashMap<>(); this.sweptJobs.put(shardId, jobVersionMap); } - jobVersionMap.compute(docId, (id, currentVersion) -> { - if (version.compareTo(currentVersion) <= 0) { - log.debug("Skipping job {}, new version {} <= current version {}", docId, version, currentVersion); - return currentVersion; + jobVersionMap.compute(docId, (id, currentJobDocVersion) -> { + if (jobDocVersion.compareTo(currentJobDocVersion) <= 0) { + log.debug("Skipping job {}, new version {} <= current version {}", docId, jobDocVersion, currentJobDocVersion); + return currentJobDocVersion; } if (this.scheduler.getScheduledJobIds(shardId.getIndexName()).contains(docId)) { @@ -236,20 +236,20 @@ void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersio ScheduledJobProvider provider = this.indexToProviders.get(shardId.getIndexName()); XContentParser parser = XContentHelper.createParser(this.xContentRegistry, LoggingDeprecationHandler.INSTANCE, jobSource, XContentType.JSON); - ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, version.getVersion()); + ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, jobDocVersion); if (jobParameter == null) { // allow parser to return null, which means this is not a scheduled job document. return null; } ScheduledJobRunner jobRunner = this.indexToProviders.get(shardId.getIndexName()).getJobRunner(); if (jobParameter.isEnabled()) { - this.scheduler.schedule(shardId.getIndexName(), docId, jobParameter, jobRunner, version); + this.scheduler.schedule(shardId.getIndexName(), docId, jobParameter, jobRunner, jobDocVersion); } - return version; + return jobDocVersion; } catch (Exception e) { log.warn("Unable to parse job, error message: {} , message source: {}", e.getMessage(), Strings.cleanTruncate(jobSource.utf8ToString(), 1000)); - return currentVersion; + return currentJobDocVersion; } } else { return null; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java index f9dd06b..df57ebe 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/jobscheduler/sweeper/JobSweeperTests.java @@ -261,7 +261,8 @@ public void testSweep() throws IOException { ScheduledJobParameter mockJobParameter = Mockito.mock(ScheduledJobParameter.class); Mockito.when(mockJobParameter.isEnabled()).thenReturn(true); - Mockito.when(this.jobParser.parse(Mockito.any(), Mockito.anyString(), Mockito.anyLong())).thenReturn(mockJobParameter); + Mockito.when(this.jobParser.parse(Mockito.any(), Mockito.anyString(), Mockito.any(JobDocVersion.class))) + .thenReturn(mockJobParameter); this.sweeper.sweep(shardId, "id", this.getTestJsonSource(), new JobDocVersion(1L, 1L, 2L)); Mockito.verify(this.scheduler).schedule(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(),