Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Faster import 2 #171

Merged
merged 8 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
"scripts": {
"prepare": "husky",
"test": "node --experimental-vm-modules node_modules/jest/bin/jest.js",
"build": "tsup"
"build": "tsup",
"build-watch": "tsup --watch"
},
"exports": {
".": {
Expand Down
116 changes: 82 additions & 34 deletions src/lib/import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,14 @@ const createTables = (db: Database.Database) => {
db.prepare(
`CREATE TABLE ${model.filenameBase} (${columns.join(', ')});`,
).run();
}
};

const createIndexes = (db: Database.Database) => {
for (const model of Object.values(models) as Model[]) {
if (!model.schema) {
return;
}
for (const column of model.schema.filter((column) => column.index)) {
db.prepare(
`CREATE INDEX idx_${model.filenameBase}_${column.name} ON ${model.filenameBase} (${column.name});`,
Expand Down Expand Up @@ -604,45 +611,57 @@ const formatLine = (
}

// Convert to midnight timestamp and add timestamp columns as integer seconds from midnight
const timeColumnNames = [
'start_time',
'end_time',
'arrival_time',
'departure_time',
'prior_notice_last_time',
'prior_notice_start_time',
'start_pickup_drop_off_window',
];

for (const timeColumnName of timeColumnNames) {
if (formattedLine[timeColumnName]) {
const timestampColumnName = timeColumnName.endsWith('time')
? `${timeColumnName}stamp`
: `${timeColumnName}_timestamp`;
formattedLine[timestampColumnName] = calculateSecondsFromMidnight(
formattedLine[timeColumnName],
);

for (const [timeColumnName, timestampColumnName] of timeColumnNamesCouples) {
const value = formattedLine[timeColumnName];
if (value) {
const [seconds, date] = cachedCalculateDates(value);
formattedLine[timestampColumnName] = seconds;

// Ensure leading zeros for time columns
formattedLine[timeColumnName] = padLeadingZeros(
formattedLine[timeColumnName],
);
formattedLine[timeColumnName] = date;
}
}

return formattedLine;
};

interface Dictionary<T> {
  [key: string]: T;
}
type Tuple = [seconds: number | null, date: string | null];

// The cache is keyed by raw time strings taken from the imported lines, so it
// must not inherit from Object.prototype: with a plain `{}` a value such as
// "constructor" or "__proto__" would resolve to an inherited member and be
// returned as if it were a cached tuple. A null-prototype object has no
// inherited keys. Entries are never evicted.
const cache: Dictionary<Tuple> = Object.create(null);

/**
 * Memoized conversion of a raw time value.
 *
 * @param value - The raw time string as read from the line being formatted.
 * @returns A tuple of `calculateSecondsFromMidnight(value)` and
 *   `padLeadingZeros(value)`. The same tuple instance is returned for repeated
 *   values, so callers must not mutate it.
 */
const cachedCalculateDates = (value: string): Tuple => {
  const cached = cache[value];
  if (cached !== undefined) return cached;

  const computed: Tuple = [
    calculateSecondsFromMidnight(value),
    padLeadingZeros(value),
  ];
  cache[value] = computed;
  return computed;
};

/** Names of the time columns that get a companion timestamp column. */
const timeColumnNames = [
  'start_time',
  'end_time',
  'arrival_time',
  'departure_time',
  'prior_notice_last_time',
  'prior_notice_start_time',
  'start_pickup_drop_off_window',
];

/**
 * Pairs of `[timeColumnName, timestampColumnName]`, computed once at module
 * load instead of once per formatted line.
 *
 * A name ending in `time` gets `stamp` appended (`start_time` becomes
 * `start_timestamp`); any other name gets `_timestamp` appended.
 * Typed as a list of 2-tuples so destructuring yields two `string`s.
 */
const timeColumnNamesCouples: [string, string][] = timeColumnNames.map(
  (name): [string, string] => [
    name,
    name.endsWith('time') ? `${name}stamp` : `${name}_timestamp`,
  ],
);

const importLines = (
db: Database.Database,
task: ITask,
lines: { [x: string]: any; geojson?: string }[],
model: Model,
totalLineCount: number,
) => {
const db = openDb({
sqlitePath: task.sqlitePath,
});

if (lines.length === 0) {
return;
}
Expand Down Expand Up @@ -702,7 +721,7 @@ const importLines = (
);
};

const importFiles = (task: ITask) =>
const importFiles = (db: Database.Database, task: ITask) =>
mapSeries(
Object.values(models),
(model: Model) =>
Expand Down Expand Up @@ -757,17 +776,44 @@ const importFiles = (task: ITask) =>
...task.csvOptions,
});

const columns = model.schema.filter((column) => column.name !== 'id');

const placeholder = columns.map(({ name }) => '@' + name).join(', ');
const prepareStatement = `INSERT ${task.ignoreDuplicates ? 'OR IGNORE' : ''} INTO ${
model.filenameBase
} (${columns
.map((column) => column.name)
.join(', ')}) VALUES (${placeholder})`;

const insert = db.prepare(prepareStatement);

const insertMany = db.transaction((lines) => {
for (const line of lines) {
if (task.prefix === undefined) {
insert.run(line);
} else {
const prefixedLine = Object.fromEntries(
Object.entries(line).map(([columnName, value], index) => [
columnName,
columns[index].prefix === true
? `${task.prefix}${value}`
: value,
]),
);
insert.run(prefixedLine);
}
}
});

let lines: { [x: string]: any; geojson?: string }[] = [];

parser.on('readable', () => {
let record;

while ((record = parser.read())) {
try {
totalLineCount += 1;
lines.push(formatLine(record, model, totalLineCount));
// If we have a bunch of lines ready to insert, then do it
if (lines.length >= maxInsertVariables / model.schema.length) {
importLines(task, lines, model, totalLineCount);
}
} catch (error) {
reject(error);
}
Expand All @@ -776,8 +822,7 @@ const importFiles = (task: ITask) =>

parser.on('end', () => {
try {
// Insert all remaining lines
importLines(task, lines, model, totalLineCount);
insertMany(lines);
} catch (error) {
reject(error);
}
Expand All @@ -798,7 +843,7 @@ const importFiles = (task: ITask) =>
);
}
const line = formatLine({ geojson: data }, model, totalLineCount);
importLines(task, [line], model, totalLineCount);
importLines(db, task, [line], model, totalLineCount);
resolve();
})
.catch(reject);
Expand Down Expand Up @@ -861,7 +906,7 @@ export async function importGtfs(initialConfig: Config) {
}

await readFiles(task);
await importFiles(task);
await importFiles(db, task);
await updateRealtimeData(task);

await rm(tempPath, { recursive: true });
Expand All @@ -874,6 +919,9 @@ export async function importGtfs(initialConfig: Config) {
}
});

log(`Will now create DB indexes`);
createIndexes(db);

log(
`Completed GTFS import for ${pluralize('agency', agencyCount, true)}\n`,
);
Expand Down
7 changes: 6 additions & 1 deletion src/test/get-stops.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import config from './test-config.ts';
import { openDb, closeDb, importGtfs, getStops } from '../index.ts';
import { sortBy } from 'lodash-es';
import exp from 'constants';

beforeAll(async () => {
Expand Down Expand Up @@ -316,6 +317,10 @@ describe('getStops():', () => {
];

expect(results).toHaveLength(3);
expect(results).toEqual(expectedResult);

// Results aren't sorted by distance, so the DB insert statement can influence the result order
expect(sortBy(results, 'stop_id')).toEqual(
sortBy(expectedResult, 'stop_id'),
);
});
});
Loading