Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Brings bug fixes/changes from master into opendistro-1.0 (#17)
Browse files Browse the repository at this point in the history
* Adds equals, hashCode, toString overrides to IntervalSchedule and CronSchedule (#13)

* Use ROOT Locale for Strings to prevent inconsistencies in format (#14)

* update ScheduledJobParser to use the JobDocVersion (#15)

* update ScheduledJobParser to use the JobDocVersion

* Bump version to 1.0.0.1
  • Loading branch information
dbbaughe authored Jun 21, 2019
1 parent 4f493f2 commit 61df774
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 18 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ integTestCluster {

allprojects {
group = 'com.amazon.opendistroforelasticsearch'
version = "${opendistroVersion}.0"
version = "${opendistroVersion}.1"

apply from: "$rootDir/build-tools/repositories.gradle"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public ScheduledJobRunner getJobRunner() {

@Override
public ScheduledJobParser getJobParser() {
return (parser, id, version) -> {
return (parser, id, jobDocVersion) -> {
SampleJobParameter jobParameter = new SampleJobParameter();
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
import java.io.IOException;

public interface ScheduledJobParser {
ScheduledJobParameter parse(XContentParser xContentParser, String id, Long version) throws IOException;
ScheduledJobParameter parse(XContentParser xContentParser, String id, JobDocVersion jobDocVersion) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.cronutils.utils.VisibleForTesting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentBuilder;

Expand All @@ -30,6 +31,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Optional;

/**
Expand Down Expand Up @@ -138,4 +140,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.endObject();
return builder;
}

@Override
public String toString() {
return Strings.toString(this, false, true);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CronSchedule cronSchedule = (CronSchedule) o;
return timezone.equals(cronSchedule.timezone) &&
expression.equals(cronSchedule.expression);
}

@Override
public int hashCode() {
return Objects.hash(timezone, expression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule;

import com.cronutils.utils.VisibleForTesting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentBuilder;

Expand All @@ -27,6 +28,8 @@
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

/**
Expand Down Expand Up @@ -57,7 +60,9 @@ public class IntervalSchedule implements Schedule {

public IntervalSchedule(Instant startTime, int interval, ChronoUnit unit) {
if (!SUPPORTED_UNITS.contains(unit)) {
throw new IllegalArgumentException(String.format("Interval unit %s is not supported, expects %s", unit, SUPPORTED_UNITS));
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Interval unit %s is not supported, expects %s",
unit, SUPPORTED_UNITS));
}
this.startTime = startTime;
this.interval = interval;
Expand Down Expand Up @@ -139,4 +144,25 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
void setClock(Clock clock) {
this.clock = clock;
}

@Override
public String toString() {
return Strings.toString(this, false, true);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IntervalSchedule intervalSchedule = (IntervalSchedule) o;
return startTime.equals(intervalSchedule.startTime) &&
interval == intervalSchedule.interval &&
unit == intervalSchedule.unit &&
intervalInMillis == intervalSchedule.intervalInMillis;
}

@Override
public int hashCode() {
return Objects.hash(startTime, interval, unit, intervalInMillis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

/**
* Schedule XContent parser.
Expand All @@ -43,7 +44,9 @@ public static Schedule parse(XContentParser parser) throws IOException {
switch (cronField) {
case CronSchedule.EXPRESSION_FIELD: expression = parser.text(); break;
case CronSchedule.TIMEZONE_FIELD: timezone = ZoneId.of(parser.text()); break;
default: throw new IllegalArgumentException(String.format("Unknown cron field %s", cronField));
default:
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unknown cron field %s", cronField));
}
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser::getTokenLocation);
Expand All @@ -59,14 +62,18 @@ public static Schedule parse(XContentParser parser) throws IOException {
switch (intervalField) {
case IntervalSchedule.START_TIME_FIELD: startTime = Instant.ofEpochMilli(parser.longValue()); break;
case IntervalSchedule.PERIOD_FIELD: period = parser.intValue(); break;
case IntervalSchedule.UNIT_FIELD: unit = ChronoUnit.valueOf(parser.text().toUpperCase()); break;
default: throw new IllegalArgumentException(String.format("Unknown interval field %s", intervalField));
case IntervalSchedule.UNIT_FIELD: unit = ChronoUnit.valueOf(parser.text().toUpperCase(Locale.ROOT)); break;
default:
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unknown interval field %s", intervalField));
}
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser::getTokenLocation);
parser.nextToken();
return new IntervalSchedule(startTime, period, unit);
default: throw new IllegalArgumentException(String.format("Unknown schedule type %s", fieldName));
default:
throw new IllegalArgumentException(
String.format(Locale.ROOT, "Unknown schedule type %s", fieldName));
}
}
throw new IllegalArgumentException("Invalid schedule document object.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,15 @@ public void testToXContent() throws IOException {
Assert.assertEquals(expectedJsonStr,
XContentHelper.toXContent(schedule, XContentType.JSON, false).utf8ToString());
}

@Test
public void testCronScheduleEqualsAndHashCode() {
CronSchedule cronScheduleOne = new CronSchedule("* * * * *", ZoneId.of("PST8PDT"));
CronSchedule cronScheduleTwo = new CronSchedule("* * * * *", ZoneId.of("PST8PDT"));
CronSchedule cronScheduleThree = new CronSchedule("1 * * * *", ZoneId.of("PST8PDT"));

Assert.assertEquals(cronScheduleOne, cronScheduleTwo);
Assert.assertNotEquals(cronScheduleOne, cronScheduleThree);
Assert.assertEquals(cronScheduleOne.hashCode(), cronScheduleTwo.hashCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,16 @@ public void testToXContent() throws IOException {
Assert.assertEquals(xContentJsonStr, XContentHelper.toXContent(this.intervalSchedule, XContentType.JSON, false)
.utf8ToString());
}

@Test
public void testIntervalScheduleEqualsAndHashCode() {
Long epochMilli = Instant.now().toEpochMilli();
IntervalSchedule intervalScheduleOne = new IntervalSchedule(Instant.ofEpochMilli(epochMilli), 5, ChronoUnit.MINUTES);
IntervalSchedule intervalScheduleTwo = new IntervalSchedule(Instant.ofEpochMilli(epochMilli), 5, ChronoUnit.MINUTES);
IntervalSchedule intervalScheduleThree = new IntervalSchedule(Instant.ofEpochMilli(epochMilli), 4, ChronoUnit.MINUTES);

Assert.assertEquals(intervalScheduleOne, intervalScheduleTwo);
Assert.assertNotEquals(intervalScheduleOne, intervalScheduleThree);
Assert.assertEquals(intervalScheduleOne.hashCode(), intervalScheduleTwo.hashCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,18 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
}

@VisibleForTesting
void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion version) {
void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersion jobDocVersion) {
ConcurrentHashMap<String, JobDocVersion> jobVersionMap;
if (this.sweptJobs.containsKey(shardId)) {
jobVersionMap = this.sweptJobs.get(shardId);
} else {
jobVersionMap = new ConcurrentHashMap<>();
this.sweptJobs.put(shardId, jobVersionMap);
}
jobVersionMap.compute(docId, (id, currentVersion) -> {
if (version.compareTo(currentVersion) <= 0) {
log.debug("Skipping job {}, new version {} <= current version {}", docId, version, currentVersion);
return currentVersion;
jobVersionMap.compute(docId, (id, currentJobDocVersion) -> {
if (jobDocVersion.compareTo(currentJobDocVersion) <= 0) {
log.debug("Skipping job {}, new version {} <= current version {}", docId, jobDocVersion, currentJobDocVersion);
return currentJobDocVersion;
}

if (this.scheduler.getScheduledJobIds(shardId.getIndexName()).contains(docId)) {
Expand All @@ -236,20 +236,20 @@ void sweep(ShardId shardId, String docId, BytesReference jobSource, JobDocVersio
ScheduledJobProvider provider = this.indexToProviders.get(shardId.getIndexName());
XContentParser parser = XContentHelper.createParser(this.xContentRegistry, LoggingDeprecationHandler.INSTANCE,
jobSource, XContentType.JSON);
ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, version.getVersion());
ScheduledJobParameter jobParameter = provider.getJobParser().parse(parser, docId, jobDocVersion);
if (jobParameter == null) {
// allow parser to return null, which means this is not a scheduled job document.
return null;
}
ScheduledJobRunner jobRunner = this.indexToProviders.get(shardId.getIndexName()).getJobRunner();
if (jobParameter.isEnabled()) {
this.scheduler.schedule(shardId.getIndexName(), docId, jobParameter, jobRunner, version);
this.scheduler.schedule(shardId.getIndexName(), docId, jobParameter, jobRunner, jobDocVersion);
}
return version;
return jobDocVersion;
} catch (Exception e) {
log.warn("Unable to parse job, error message: {} , message source: {}", e.getMessage(),
Strings.cleanTruncate(jobSource.utf8ToString(), 1000));
return currentVersion;
return currentJobDocVersion;
}
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,8 @@ public void testSweep() throws IOException {

ScheduledJobParameter mockJobParameter = Mockito.mock(ScheduledJobParameter.class);
Mockito.when(mockJobParameter.isEnabled()).thenReturn(true);
Mockito.when(this.jobParser.parse(Mockito.any(), Mockito.anyString(), Mockito.anyLong())).thenReturn(mockJobParameter);
Mockito.when(this.jobParser.parse(Mockito.any(), Mockito.anyString(), Mockito.any(JobDocVersion.class)))
.thenReturn(mockJobParameter);

this.sweeper.sweep(shardId, "id", this.getTestJsonSource(), new JobDocVersion(1L, 1L, 2L));
Mockito.verify(this.scheduler).schedule(Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any(),
Expand Down

0 comments on commit 61df774

Please sign in to comment.