Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flex merge feeds #573

Merged
merged 16 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,9 @@
- AWS S3 SDK - putting/getting objects into/out of S3.
-->
<dependency>
<groupId>com.github.conveyal</groupId>
<groupId>com.github.ibi-group</groupId>
<artifactId>gtfs-lib</artifactId>
<version>e83292580a44880e1efcf6a35d24c2d46044c6f1</version>
<version>6da9e9999b</version>
<!-- Exclusions added in order to silence SLF4J warnings about multiple bindings:
http://www.slf4j.org/codes.html#multiple_bindings
-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@
import com.conveyal.datatools.manager.persistence.Persistence;
import com.conveyal.datatools.manager.utils.ErrorUtils;
import com.conveyal.gtfs.loader.Feed;
import com.conveyal.gtfs.loader.JdbcGtfsExporter;
import com.conveyal.gtfs.loader.Table;
import com.conveyal.gtfs.model.Location;
import com.conveyal.gtfs.model.LocationShape;
import com.conveyal.gtfs.model.StopTime;
import com.conveyal.gtfs.util.GeoJsonUtil;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
Expand All @@ -38,13 +42,17 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import static com.conveyal.datatools.manager.jobs.feedmerge.MergeFeedsType.SERVICE_PERIOD;
import static com.conveyal.datatools.manager.jobs.feedmerge.MergeFeedsType.REGIONAL;
import static com.conveyal.datatools.manager.jobs.feedmerge.MergeStrategy.CHECK_STOP_TIMES;
import static com.conveyal.datatools.manager.models.FeedRetrievalMethod.REGIONAL_MERGE;
import static com.conveyal.datatools.manager.utils.MergeFeedUtils.*;
import static com.conveyal.datatools.manager.utils.MergeFeedUtils.getMergedVersion;
import static com.conveyal.datatools.manager.utils.MergeFeedUtils.stopTimesMatchSimplified;
import static com.conveyal.datatools.manager.utils.StringUtils.getCleanName;
import static com.conveyal.gtfs.loader.Table.LOCATION_GEO_JSON_FILE_NAME;

/**
* This job handles merging two or more feed versions according to logic specific to the specified merge type.
Expand Down Expand Up @@ -204,6 +212,8 @@ public void jobLogic() {
}
}

mergeLocations(out);

// Loop over GTFS tables and merge each feed one table at a time.
for (int i = 0; i < numberOfTables; i++) {
Table table = tablesToMerge.get(i);
Expand All @@ -225,7 +235,7 @@ public void jobLogic() {
status.fail(message, e);
} finally {
try {
feedMergeContext.close();
if (feedMergeContext != null) feedMergeContext.close();
} catch (IOException e) {
logAndReportToBugsnag(e, "Error closing FeedMergeContext object");
}
Expand All @@ -245,6 +255,39 @@ public void jobLogic() {
}
}

/**
 * Merge the locations.geojson files of the feeds being merged. These files are not compatible with the CSV merging
 * strategy. Instead, each locations.geojson file is flattened into locations and location shapes. Every location id
 * is then prefixed with the feed's id scope to keep locations unique across feeds, and the combined result is
 * converted back into GeoJSON and written to the zip file.
 *
 * Locations must be processed prior to the CSV tables so that the scoped location ids are available for foreign
 * reference checks.
 *
 * @param out zip output stream of the merged feed; the merged locations file is only written when at least one
 *            location was found.
 * @throws IOException propagated from the GeoJSON read/write helpers called here.
 */
void mergeLocations(ZipOutputStream out) throws IOException {
    // Insertion-ordered sets keep the de-duplication of a set while preserving the order in which locations and
    // shape points were read. A plain HashSet would hand them to the writer in arbitrary hash order, which is not
    // safe for shape points because the vertex order of a polygon ring is significant.
    Set<Location> mergedLocations = new LinkedHashSet<>();
    Set<LocationShape> mergedLocationShapes = new LinkedHashSet<>();
    for (FeedToMerge feed : feedMergeContext.feedsToMerge) {
        ZipEntry locationGeoJsonFile = feed.zipFile.getEntry(LOCATION_GEO_JSON_FILE_NAME);
        if (locationGeoJsonFile == null) {
            // This feed has no locations file, so there is nothing to merge from it.
            continue;
        }
        // Scope is the cleaned feed source name followed by the feed version number.
        String idScope = getCleanName(feed.version.parentFeedSource().name) + feed.version.version;
        List<Location> locations = GeoJsonUtil.getLocationsFromGeoJson(feed.zipFile, locationGeoJsonFile, null);
        for (Location location : locations) {
            location.location_id = String.join(":", idScope, location.location_id);
            mergedLocations.add(location);
            // Record the scoped id so later table merges can resolve references to locations.
            feedMergeContext.locationIds.add(location.location_id);
        }
        List<LocationShape> locationShapes =
            GeoJsonUtil.getLocationShapesFromGeoJson(feed.zipFile, locationGeoJsonFile, null);
        for (LocationShape locationShape : locationShapes) {
            // Shapes carry the same scoped id as the location they belong to.
            locationShape.location_id = String.join(":", idScope, locationShape.location_id);
            mergedLocationShapes.add(locationShape);
        }
    }
    if (!mergedLocations.isEmpty()) {
        JdbcGtfsExporter.writeLocationsToFile(
            out,
            new ArrayList<>(mergedLocations),
            new ArrayList<>(mergedLocationShapes)
        );
    }
}

/**
* Obtains trip ids whose entries in the stop_times table differ between the active and future feed.
*/
Expand Down Expand Up @@ -281,6 +324,10 @@ private boolean shouldSkipTable(String tableName) {
LOG.warn("Skipping editor-only table {}.", tableName);
return true;
}
if (tableName.equals(Table.LOCATIONS.name) || tableName.equals(Table.LOCATION_SHAPES.name)) {
LOG.warn("{} detected. Skipping traditional merge in favour of bespoke merge.", LOCATION_GEO_JSON_FILE_NAME);
return true;
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.time.LocalDate;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

Expand All @@ -23,6 +24,8 @@ public class FeedMergeContext implements Closeable {
public final boolean tripIdsMatch;
public final LocalDate futureFirstCalendarStartDate;
public final Set<String> sharedTripIds;
public Set<String> locationIds = new HashSet<>();


public FeedMergeContext(Set<FeedVersion> feedVersions, Auth0UserProfile owner) throws IOException {
feedsToMerge = MergeFeedUtils.collectAndSortFeeds(feedVersions, owner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public class MergeFeedsResult implements Serializable {
*/
public Set<String> calendarDatesServiceIds = new HashSet<>();

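/**
 * Track the scoped ids of the stops and stop areas written during the merge, so that foreign references which may
 * point at more than one table can be resolved to the correct table.
 */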
public Set<String> stopIds = new HashSet<>();
public Set<String> stopAreaIds = new HashSet<>();

/**
* Track the set of route IDs to end up in the merged feed in order to determine which route_attributes
* records should be retained in the merged result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public boolean iterateOverRows() throws IOException {

public void startNewRow() throws IOException {
keyValue = csvReader.get(keyFieldIndex);
setForeignReferenceKeyValues();
// Get the spec fields to export
List<Field> specFields = table.specFields();
// Filter the spec fields on the set of fields found in all feeds to be merged.
Expand All @@ -238,8 +239,24 @@ public void startNewRow() throws IOException {
}

/**
* Determine which reference table to use. If there is only one reference use this. If there are multiple references
* determine the context and then the correct reference table to use.
* Build a list of table key id values to be used in foreign key field look-ups.
*/
private void setForeignReferenceKeyValues() {
    // Record the scoped key of every stops and stop_areas row; rows of any other table are ignored.
    final String currentTableName = table.name;
    if (currentTableName.equals("stops")) {
        mergeFeedsResult.stopIds.add(getIdWithScope(keyValue));
    } else if (currentTableName.equals("stop_areas")) {
        mergeFeedsResult.stopAreaIds.add(getIdWithScope(keyValue));
    }
}

/**
* Determine which reference table to use. If there is only one reference use this. If there are multiple
* references, determine the context and then the correct reference table to use.
*/
private Table getReferenceTable(FieldContext fieldContext, Field field) {
if (field.referenceTables.size() == 1) {
Expand All @@ -255,8 +272,17 @@ private Table getReferenceTable(FieldContext fieldContext, Field field) {
getTableScopedValue(Table.CALENDAR_DATES, fieldContext.getValue())
);
case STOP_TIMES_STOP_ID_KEY:
return ReferenceTableDiscovery.getStopTimeStopIdReferenceTable(
fieldContext.getValueToWrite(),
mergeFeedsResult,
feedMergeContext.locationIds
);
case STOP_AREA_STOP_ID_KEY:
// Include other cases as multiple references are added e.g. flex!.
return ReferenceTableDiscovery.getStopAreaAreaIdReferenceTable(
fieldContext.getValueToWrite(),
mergeFeedsResult,
feedMergeContext.locationIds
);
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.conveyal.gtfs.loader.Field;
import com.conveyal.gtfs.loader.Table;

import java.util.Set;
import java.util.stream.Collectors;

import static com.conveyal.datatools.manager.jobs.feedmerge.MergeLineContext.SERVICE_ID;
Expand All @@ -11,6 +12,9 @@ public class ReferenceTableDiscovery {

public static final String REF_TABLE_SEPARATOR = "#~#";

/**
* Tables that have two or more foreign references.
*/
public enum ReferenceTableKey {

TRIP_SERVICE_ID_KEY(
Expand Down Expand Up @@ -106,4 +110,38 @@ public static Table getTripServiceIdReferenceTable(
}
return null;
}

/**
 * Work out which table a stop area reference value points to. The value is looked up in the known stop ids first
 * and in the known location ids second, so a stop wins if the value is present in both.
 *
 * @param fieldValue the reference value to look up.
 * @param mergeFeedsResult merge result holding the stop ids collected so far.
 * @param locationIds the location ids to check against.
 * @return {@link Table#STOPS} or {@link Table#LOCATIONS} depending on where the value was found, or null if it
 *         was found in neither.
 */
public static Table getStopAreaAreaIdReferenceTable(
    String fieldValue,
    MergeFeedsResult mergeFeedsResult,
    Set<String> locationIds
) {
    boolean isKnownStop = mergeFeedsResult.stopIds.contains(fieldValue);
    if (isKnownStop) {
        return Table.STOPS;
    }
    return locationIds.contains(fieldValue) ? Table.LOCATIONS : null;
}

/**
 * Work out which table a stop time's stop id points to. The value is looked up in the known stop ids, then the
 * known location ids and finally the known stop area ids; the first match decides the table.
 *
 * @param fieldValue the stop id value to look up.
 * @param mergeFeedsResult merge result holding the stop ids and stop area ids collected so far.
 * @param locationIds the location ids to check against.
 * @return {@link Table#STOPS}, {@link Table#LOCATIONS} or {@link Table#STOP_AREAS} depending on where the value
 *         was found, or null if it was found in none of them.
 */
public static Table getStopTimeStopIdReferenceTable(
    String fieldValue,
    MergeFeedsResult mergeFeedsResult,
    Set<String> locationIds
) {
    if (mergeFeedsResult.stopIds.contains(fieldValue)) {
        return Table.STOPS;
    }
    if (locationIds.contains(fieldValue)) {
        return Table.LOCATIONS;
    }
    if (mergeFeedsResult.stopAreaIds.contains(fieldValue)) {
        return Table.STOP_AREAS;
    }
    // The value does not match any tracked stop, location or stop area.
    return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -422,15 +422,14 @@ public void validateMobility(MonitorableJob.Status status) {
// Wait for the file to be entirely copied into the directory.
// 5 seconds + ~1 second per 10mb
Thread.sleep(5000 + (this.fileSize / 10000));
File gtfsZip = this.retrieveGtfsFile();
// Namespace based folders avoid clash for validation being run on multiple versions of a feed.
// TODO: do we know that there will always be a namespace?
String validatorOutputDirectory = "/tmp/datatools_gtfs/" + this.namespace + "/";

status.update("MobilityData Analysis...", 20);
// Set up MobilityData validator.
ValidationRunnerConfig.Builder builder = ValidationRunnerConfig.builder();
builder.setGtfsSource(gtfsZip.toURI());
builder.setGtfsSource(this.retrieveGtfsFile().toURI());
builder.setOutputDirectory(Path.of(validatorOutputDirectory));
ValidationRunnerConfig mbValidatorConfig = builder.build();

Expand All @@ -442,8 +441,10 @@ public void validateMobility(MonitorableJob.Status status) {
status.update("MobilityData Analysis...", 80);
// Read generated report and save to Mongo.
String json;
try (FileReader fr = new FileReader(validatorOutputDirectory + "report.json")) {
BufferedReader in = new BufferedReader(fr);
try (
FileReader fr = new FileReader(validatorOutputDirectory + "report.json");
BufferedReader in = new BufferedReader(fr)
) {
json = in.lines().collect(Collectors.joining(System.lineSeparator()));
}

Expand Down
Loading
Loading