Skip to content

Commit

Permalink
refactor(Fixed merge conflicts): Fixed conflicts with merging dev-flex into flex-merge-feeds
Browse files Browse the repository at this point in the history
  • Loading branch information
br648 committed Nov 17, 2023
2 parents 80fd154 + 78bc0db commit 2f93558
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 46 deletions.
3 changes: 1 addition & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,7 @@
<dependency>
<groupId>com.github.conveyal</groupId>
<artifactId>gtfs-lib</artifactId>
<!-- Latest dev build on jitpack.io -->
<version>8a9c3424747b05cb4c3b379e59e7762ef9f58e18</version>
<version>e83292580a44880e1efcf6a35d24c2d46044c6f1</version>
<!-- Exclusions added in order to silence SLF4J warnings about multiple bindings:
http://www.slf4j.org/codes.html#multiple_bindings
-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ private static Snapshot getSnapshotFromRequest(Request req) {
return Persistence.snapshots.getById(id);
}

/**
 * Reads the optional {@code publishProprietaryFiles} query parameter from the request.
 * A missing or unparseable value defaults to {@code false}.
 */
private static boolean getPublishProprietaryFiles(Request req) {
    String rawValue = req.queryParamOrDefault("publishProprietaryFiles", Boolean.FALSE.toString());
    return Boolean.parseBoolean(rawValue);
}

/**
* HTTP endpoint that returns the list of snapshots for a given feed source.
*/
Expand All @@ -84,6 +90,7 @@ private static String createSnapshot (Request req, Response res) throws IOExcept
boolean publishNewVersion = Boolean.parseBoolean(
req.queryParamOrDefault("publishNewVersion", Boolean.FALSE.toString())
);
boolean publishProprietaryFiles = getPublishProprietaryFiles(req);
FeedSource feedSource = FeedVersionController.requestFeedSourceById(req, Actions.EDIT, "feedId");
// Take fields from request body for creating snapshot (i.e., feedId/feedSourceId, name, comment).
Snapshot snapshot = json.read(req.body());
Expand All @@ -99,7 +106,7 @@ private static String createSnapshot (Request req, Response res) throws IOExcept
new CreateSnapshotJob(userProfile, snapshot, bufferIsEmpty, !bufferIsEmpty, false);
// Add publish feed version job if specified by request.
if (publishNewVersion) {
createSnapshotJob.addNextJob(new CreateFeedVersionFromSnapshotJob(feedSource, snapshot, userProfile));
createSnapshotJob.addNextJob(new CreateFeedVersionFromSnapshotJob(feedSource, snapshot, userProfile, publishProprietaryFiles));
}
// Begin asynchronous execution.
JobUtils.heavyExecutor.execute(createSnapshotJob);
Expand Down Expand Up @@ -173,9 +180,10 @@ private static String restoreSnapshot (Request req, Response res) {
/**
 * HTTP endpoint that kicks off an asynchronous job exporting a snapshot's contents to a GTFS file.
 * Returns a formatted job-status message containing the export job's id.
 * Fix: the stale pre-merge two-arg ExportSnapshotToGTFSJob construction was left in place next to
 * the new four-arg call, producing a duplicate declaration of the same local variable — removed.
 */
private static String downloadSnapshotAsGTFS(Request req, Response res) {
    Auth0UserProfile userProfile = req.attribute("user");
    Snapshot snapshot = getSnapshotFromRequest(req);
    boolean publishProprietaryFiles = getPublishProprietaryFiles(req);
    // Create and kick off export job.
    // FIXME: what if a snapshot is already written to S3?
    ExportSnapshotToGTFSJob exportSnapshotToGTFSJob = new ExportSnapshotToGTFSJob(userProfile, snapshot, publishProprietaryFiles);
    JobUtils.heavyExecutor.execute(exportSnapshotToGTFSJob);
    return formatJobMessage(exportSnapshotToGTFSJob.jobId, "Exporting snapshot to GTFS.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,18 @@ public class ExportSnapshotToGTFSJob extends MonitorableJob {
private final Snapshot snapshot;
// Non-null only when the export is feeding a new FeedVersion (see the delegating constructor).
private final FeedVersion feedVersion;
private File tempFile;
// Whether proprietary (non-spec) GTFS tables should be written to the exported file.
private final boolean publishProprietaryFiles;

/**
 * Creates an export job for the given snapshot.
 * Fix: the stale pre-merge constructor signatures (the old three-arg and two-arg forms) were left
 * interleaved with the new ones, producing nested/duplicate constructor declarations — removed.
 *
 * @param owner                   user that initiated the job
 * @param snapshot                database snapshot to export
 * @param feedVersion             target feed version, or null when exporting standalone
 * @param publishProprietaryFiles whether to include proprietary (non-spec) tables in the export
 */
public ExportSnapshotToGTFSJob(Auth0UserProfile owner, Snapshot snapshot, FeedVersion feedVersion, boolean publishProprietaryFiles) {
    super(owner, "Exporting snapshot " + snapshot.name, JobType.EXPORT_SNAPSHOT_TO_GTFS);
    this.snapshot = snapshot;
    this.feedVersion = feedVersion;
    this.publishProprietaryFiles = publishProprietaryFiles;
    status.update("Starting database snapshot...", 10);
}

/** Convenience constructor for exports that are not tied to a feed version. */
public ExportSnapshotToGTFSJob(Auth0UserProfile owner, Snapshot snapshot, boolean publishProprietaryFiles) {
    this(owner, snapshot, null, publishProprietaryFiles);
}

@JsonProperty
Expand All @@ -57,7 +59,7 @@ public void jobLogic() {
status.fail("Error creating local file for snapshot.", e);
return;
}
JdbcGtfsExporter exporter = new JdbcGtfsExporter(snapshot.namespace, tempFile.getAbsolutePath(), DataManager.GTFS_DATA_SOURCE, true, true);
JdbcGtfsExporter exporter = new JdbcGtfsExporter(snapshot.namespace, tempFile.getAbsolutePath(), DataManager.GTFS_DATA_SOURCE, true, publishProprietaryFiles);
FeedLoadResult result = exporter.exportTables();
if (result.fatalException != null) {
status.fail(String.format("Error (%s) encountered while exporting database tables.", result.fatalException));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,16 @@ protected static FeedVersion cleanFeedVersionForNonAdmins(FeedVersion feedVersio
private static boolean createFeedVersionFromSnapshot (Request req, Response res) {

Auth0UserProfile userProfile = req.attribute("user");
Boolean publishProprietaryFiles = Boolean.parseBoolean(req.queryParams("publishProprietaryFiles"));
// TODO: Should the ability to create a feedVersion from snapshot be controlled by the 'edit-gtfs' privilege?
FeedSource feedSource = requestFeedSourceById(req, Actions.MANAGE);
Snapshot snapshot = Persistence.snapshots.getById(req.queryParams("snapshotId"));
if (snapshot == null) {
logMessageAndHalt(req, 400, "Must provide valid snapshot ID");
}
// Publishing the proprietary files will preserve the pattern names in the newly published feed version.
CreateFeedVersionFromSnapshotJob createFromSnapshotJob =
new CreateFeedVersionFromSnapshotJob(feedSource, snapshot, userProfile);
new CreateFeedVersionFromSnapshotJob(feedSource, snapshot, userProfile, publishProprietaryFiles);
JobUtils.heavyExecutor.execute(createFromSnapshotJob);

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ public class CreateFeedVersionFromSnapshotJob extends FeedSourceJob {

private final FeedVersion feedVersion;
private final Snapshot snapshot;
// Whether proprietary (non-spec) GTFS tables should be included when the snapshot is exported.
private final boolean publishProprietaryFiles;

/**
 * Creates a job that exports the snapshot and processes the result into a new feed version.
 * Fix: the stale pre-merge three-arg constructor signature was left interleaved with the new
 * four-arg one, producing a nested/duplicate declaration — removed.
 *
 * @param feedSource              feed source the new version belongs to
 * @param snapshot                snapshot to build the version from
 * @param owner                   user that initiated the job
 * @param publishProprietaryFiles whether to include proprietary (non-spec) tables in the export
 */
public CreateFeedVersionFromSnapshotJob(FeedSource feedSource, Snapshot snapshot, Auth0UserProfile owner, boolean publishProprietaryFiles) {
    super(owner, "Creating Feed Version from Snapshot for " + feedSource.name, JobType.CREATE_FEEDVERSION_FROM_SNAPSHOT);
    this.feedVersion = new FeedVersion(feedSource, snapshot);
    this.snapshot = snapshot;
    this.publishProprietaryFiles = publishProprietaryFiles;
}

@Override
Expand All @@ -35,7 +37,7 @@ public void jobLogic() {
// Add the jobs to handle this operation in order.
addNextJob(
// First export the snapshot to GTFS.
new ExportSnapshotToGTFSJob(owner, snapshot, feedVersion),
new ExportSnapshotToGTFSJob(owner, snapshot, feedVersion, publishProprietaryFiles),
// Then, process feed version once GTFS file written.
new ProcessSingleFeedJob(feedVersion, owner, true)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public void jobLogic() {
snapshot.feedTransformResult = dbTarget.feedTransformResult;
// If the user has selected to create a new version from the resulting snapshot, do so here.
if (rules.createNewVersion) {
addNextJob(new CreateFeedVersionFromSnapshotJob(feedSource, snapshot, owner));
// Publishing the proprietary files will preserve the pattern names in the newly created feed version.
addNextJob(new CreateFeedVersionFromSnapshotJob(feedSource, snapshot, owner, true));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private boolean checkCalendarDatesIds(FieldContext fieldContext) throws IOExcept
// the valid date range, i.e., before the future feed's first date.
if (!shouldSkipRecord && fieldContext.nameEquals(SERVICE_ID)) {
mergeFeedsResult.serviceIds.add(fieldContext.getValueToWrite());
mergeFeedsResult.calendarDatesServiceIds.add(fieldContext.getValueToWrite());
}

return !shouldSkipRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private boolean checkCalendarIds(Set<NewGTFSError> idErrors, FieldContext fieldC
// If service is going to be cloned, add to the output service ids.
if (!shouldSkipRecord && fieldContext.nameEquals(SERVICE_ID)) {
mergeFeedsResult.serviceIds.add(fieldContext.getValueToWrite());
mergeFeedsResult.calendarServiceIds.add(fieldContext.getValueToWrite());
}

return !shouldSkipRecord;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,21 @@ public class MergeFeedsResult implements Serializable {
/** Contains the set of IDs for records that were excluded in the merged feed */
public Set<String> skippedIds = new HashSet<>();
/**
 * Track the set of service IDs to end up in the merged feed in order to determine which calendar, calendar_dates and
 * trip records should be retained in the merged result.
 */
public Set<String> serviceIds = new HashSet<>();

/**
 * Track the set of service IDs obtained from calendar records, so a trip's service_id reference can be
 * traced back to the calendar table specifically (as opposed to calendar_dates).
 */
public Set<String> calendarServiceIds = new HashSet<>();

/**
 * Track the set of service IDs obtained from calendar_dates records, so a trip's service_id reference can be
 * traced back to the calendar_dates table specifically (as opposed to calendar).
 */
public Set<String> calendarDatesServiceIds = new HashSet<>();

/**
* Track the set of route IDs to end up in the merged feed in order to determine which route_attributes
* records should be retained in the merged result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,41 +237,67 @@ public void startNewRow() throws IOException {
.collect(Collectors.toList());
}

/**
 * Determine which reference table applies to a foreign-reference field. A field with a single
 * candidate reference table is unambiguous; otherwise the (table, field) pair is used to look up
 * the disambiguation strategy.
 */
private Table getReferenceTable(FieldContext fieldContext, Field field) {
    // Single candidate: nothing to disambiguate.
    if (field.referenceTables.size() == 1) {
        return field.referenceTables.iterator().next();
    }
    ReferenceTableDiscovery.ReferenceTableKey referenceKey =
        ReferenceTableDiscovery.getReferenceTableKey(field, table);
    if (referenceKey == ReferenceTableDiscovery.ReferenceTableKey.TRIP_SERVICE_ID_KEY) {
        // A trip's service_id may point at calendar or calendar_dates; resolve which one.
        return ReferenceTableDiscovery.getTripServiceIdReferenceTable(
            fieldContext.getValueToWrite(),
            mergeFeedsResult,
            getTableScopedValue(Table.CALENDAR, fieldContext.getValue()),
            getTableScopedValue(Table.CALENDAR_DATES, fieldContext.getValue())
        );
    }
    // STOP_TIMES_STOP_ID_KEY / STOP_AREA_STOP_ID_KEY (and future multi-reference cases, e.g. flex)
    // are not yet disambiguated.
    return null;
}

public boolean checkForeignReferences(FieldContext fieldContext) throws IOException {
Field field = fieldContext.getField();
if (field.isForeignReference()) {
for (Table referenceTable: field.referenceTables) {
String key = getTableScopedValue(referenceTable, fieldContext.getValue());
// Check if we're performing a service period merge, this ref field is a service_id, and it
// is not found in the list of service_ids (e.g., it was removed).
boolean isValidServiceId = mergeFeedsResult.serviceIds.contains(fieldContext.getValueToWrite());

// If the current foreign ref points to another record that has
// been skipped or is a ref to a non-existent service_id during a service period merge, skip
// this record and add its primary key to the list of skipped IDs (so that other references
// can be properly omitted).
if (serviceIdHasKeyOrShouldBeSkipped(fieldContext, key, isValidServiceId)) {
// If a calendar#service_id has been skipped (it's listed in skippedIds), but there were
// valid service_ids found in calendar_dates, do not skip that record for both the
// calendar_date and any related trips.
if (fieldContext.nameEquals(SERVICE_ID) && isValidServiceId) {
LOG.warn("Not skipping valid service_id {} for {} {}", fieldContext.getValueToWrite(), table.name, keyValue);
} else {
String skippedKey = getTableScopedValue(keyValue);
if (orderField != null) {
skippedKey = String.join(":", skippedKey, getCsvValue(orderField));
}
mergeFeedsResult.skippedIds.add(skippedKey);
return false;
Table refTable = getReferenceTable(fieldContext, field);
String key = (refTable != null)
? getTableScopedValue(refTable, fieldContext.getValue())
: "unknown";
// Check if we're performing a service period merge, this ref field is a service_id, and it
// is not found in the list of service_ids (e.g., it was removed).
boolean isValidServiceId = mergeFeedsResult.serviceIds.contains(fieldContext.getValueToWrite());

// If the current foreign ref points to another record that has
// been skipped or is a ref to a non-existent service_id during a service period merge, skip
// this record and add its primary key to the list of skipped IDs (so that other references
// can be properly omitted).
if (serviceIdHasKeyOrShouldBeSkipped(fieldContext, key, isValidServiceId)) {
// If a calendar#service_id has been skipped (it's listed in skippedIds), but there were
// valid service_ids found in calendar_dates, do not skip that record for both the
// calendar_date and any related trips.
if (fieldContext.nameEquals(SERVICE_ID) && isValidServiceId) {
LOG.warn("Not skipping valid service_id {} for {} {}", fieldContext.getValueToWrite(), table.name, keyValue);
} else {
String skippedKey = getTableScopedValue(keyValue);
if (orderField != null) {
skippedKey = String.join(":", skippedKey, getCsvValue(orderField));
}
mergeFeedsResult.skippedIds.add(skippedKey);
return false;
}
// If the field is a foreign reference, check to see whether the reference has been
// remapped due to a conflicting ID from another feed (e.g., calendar#service_id).
if (mergeFeedsResult.remappedIds.containsKey(key)) {
mergeFeedsResult.remappedReferences++;
// If the value has been remapped update the value to write.
fieldContext.setValueToWrite(mergeFeedsResult.remappedIds.get(key));
}
}
// If the field is a foreign reference, check to see whether the reference has been
// remapped due to a conflicting ID from another feed (e.g., calendar#service_id).
if (mergeFeedsResult.remappedIds.containsKey(key)) {
mergeFeedsResult.remappedReferences++;
// If the value has been remapped update the value to write.
fieldContext.setValueToWrite(mergeFeedsResult.remappedIds.get(key));
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.conveyal.datatools.manager.jobs.feedmerge;

import com.conveyal.gtfs.loader.Field;
import com.conveyal.gtfs.loader.Table;

import java.util.stream.Collectors;

import static com.conveyal.datatools.manager.jobs.feedmerge.MergeLineContext.SERVICE_ID;

/**
 * Static helpers for resolving which of several candidate reference tables a foreign-reference
 * field points at during a feed merge. Fix: utility class previously exposed an implicit public
 * constructor — a private constructor now prevents instantiation.
 */
public class ReferenceTableDiscovery {

    public static final String REF_TABLE_SEPARATOR = "#~#";

    /**
     * Known (table, field, reference tables) combinations that require disambiguation, keyed by
     * the string produced by {@link #createKey(Field, Table)}.
     */
    public enum ReferenceTableKey {

        TRIP_SERVICE_ID_KEY(
            String.join(
                REF_TABLE_SEPARATOR,
                Table.TRIPS.name,
                SERVICE_ID,
                Table.CALENDAR.name,
                Table.CALENDAR_DATES.name,
                Table.SCHEDULE_EXCEPTIONS.name
            )
        ),
        STOP_AREA_STOP_ID_KEY(
            String.join(
                REF_TABLE_SEPARATOR,
                Table.STOP_AREAS.name,
                "stop_id",
                Table.STOPS.name,
                Table.LOCATIONS.name
            )
        ),
        STOP_TIMES_STOP_ID_KEY(
            String.join(
                REF_TABLE_SEPARATOR,
                Table.STOP_TIMES.name,
                "stop_id",
                Table.STOPS.name,
                Table.LOCATIONS.name,
                Table.STOP_AREAS.name
            )
        );

        private final String value;

        ReferenceTableKey(String value) {
            this.value = value;
        }

        public String getValue() {
            return value;
        }

        /**
         * Resolve a key string back to its enum constant.
         *
         * @throws UnsupportedOperationException if no constant matches the key
         */
        public static ReferenceTableKey fromValue(String key) {
            for (ReferenceTableKey ref: ReferenceTableKey.values()) {
                if (ref.getValue().equals(key)) {
                    return ref;
                }
            }
            throw new UnsupportedOperationException(String.format("Unsupported reference table key: %s.", key));
        }
    }

    /** Utility class; not meant to be instantiated. */
    private ReferenceTableDiscovery() {
    }

    /**
     * Get reference table key by matching the provided values to predefined reference table keys.
     */
    public static ReferenceTableKey getReferenceTableKey(Field field, Table table) {
        return ReferenceTableKey.fromValue(createKey(field, table));
    }

    /**
     * Create a unique key for this table, field and reference tables, using
     * {@link #REF_TABLE_SEPARATOR} between each component.
     */
    public static String createKey(Field field, Table table) {
        return String.format(
            "%s%s%s%s%s",
            table.name,
            REF_TABLE_SEPARATOR,
            field.name,
            REF_TABLE_SEPARATOR,
            field.referenceTables.stream().map(r -> r.name).collect(Collectors.joining(REF_TABLE_SEPARATOR))
        );
    }

    /**
     * Define the reference table for a trip service id: calendar takes precedence over
     * calendar_dates; returns null when the service id is known to neither.
     */
    public static Table getTripServiceIdReferenceTable(
        String fieldValue,
        MergeFeedsResult mergeFeedsResult,
        String calendarKey,
        String calendarDatesKey
    ) {
        if (
            mergeFeedsResult.calendarServiceIds.contains(fieldValue) ||
            mergeFeedsResult.skippedIds.contains(calendarKey)
        ) {
            return Table.CALENDAR;
        } else if (
            mergeFeedsResult.calendarDatesServiceIds.contains(fieldValue) ||
            mergeFeedsResult.skippedIds.contains(calendarDatesKey)
        ) {
            return Table.CALENDAR_DATES;
        }
        return null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void canDeleteTrips() throws IOException {
);
// Fetch snapshot where modifications were made and create new version from it.
Snapshot snapshotWithModifications = feedSource.retrieveSnapshots().iterator().next();
CreateFeedVersionFromSnapshotJob newVersionJob = new CreateFeedVersionFromSnapshotJob(feedSource, snapshotWithModifications, user);
CreateFeedVersionFromSnapshotJob newVersionJob = new CreateFeedVersionFromSnapshotJob(feedSource, snapshotWithModifications, user, false);
newVersionJob.run();
// Grab the modified version and check that the trips count matches expectation.
FeedVersion newVersion = feedSource.retrieveLatest();
Expand Down
Loading

0 comments on commit 2f93558

Please sign in to comment.