Skip to content

Commit

Permalink
(core) Remove expired attachments every hour and on shutdown
Browse files Browse the repository at this point in the history
Summary:
Call ActiveDoc.removeUnusedAttachments every hour using setInterval, and in ActiveDoc.shutdown (which also clears said interval).

Unrelated: small fix to my webhooks code which was creating a redis client on shutdown just to quit it.

Test Plan:
Tweaked DocApi test to remove expired attachments by force-reloading the doc, so that it removes them during shutdown. Extracted a new testing endpoint /verifyFiles to support this test (previously running that code only happened with `/removeUnused?verifyfiles=1`).

Tested the setInterval part manually.

Reviewers: paulfitz, dsagal

Reviewed By: paulfitz

Subscribers: dsagal

Differential Revision: https://phab.getgrist.com/D3387
  • Loading branch information
alexmojaki committed Apr 22, 2022
1 parent 890c550 commit a701b4b
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 61 deletions.
19 changes: 19 additions & 0 deletions app/server/lib/ActiveDoc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ const ACTIVEDOC_TIMEOUT = (process.env.NODE_ENV === 'production') ? 30 : 5;
// We'll wait this long between re-measuring sandbox memory.
const MEMORY_MEASUREMENT_INTERVAL_MS = 60 * 1000;

// Cleanup expired attachments every hour (also happens when shutting down)
const REMOVE_UNUSED_ATTACHMENTS_INTERVAL_MS = 60 * 60 * 1000;

// A hook for dependency injection.
export const Deps = {ACTIVEDOC_TIMEOUT};

Expand Down Expand Up @@ -179,6 +182,12 @@ export class ActiveDoc extends EventEmitter {
private _recoveryMode: boolean = false;
private _shuttingDown: boolean = false;

// Cleanup expired attachments every hour (also happens when shutting down)
private _removeUnusedAttachmentsInterval = setInterval(
() => this.removeUnusedAttachments(true),
REMOVE_UNUSED_ATTACHMENTS_INTERVAL_MS,
);

constructor(docManager: DocManager, docName: string, private _options?: ICreateActiveDocOptions) {
super();
if (_options?.safeMode) { this._recoveryMode = true; }
Expand Down Expand Up @@ -389,6 +398,16 @@ export class ActiveDoc extends EventEmitter {
// Clear the MapWithTTL to remove all timers from the event loop.
this._fetchCache.clear();

clearInterval(this._removeUnusedAttachmentsInterval);
try {
// Remove expired attachments, i.e. attachments that were soft deleted a while ago.
// This needs to happen periodically, and doing it here means we can guarantee that it happens even if
// the doc is only ever opened briefly, without having to slow down startup.
await this.removeUnusedAttachments(true);
} catch (e) {
this._log.error(docSession, "Failed to remove expired attachments", e);
}

try {
await this._docManager.storageManager.closeDocument(this.docName);
} catch (err) {
Expand Down
16 changes: 12 additions & 4 deletions app/server/lib/DocApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,21 @@ export class DocWorkerApi {
const verifyFiles = isAffirmative(req.query.verifyfiles);
await activeDoc.removeUnusedAttachments(expiredOnly);
if (verifyFiles) {
assert.deepStrictEqual(
await activeDoc.docStorage.all(`SELECT DISTINCT fileIdent AS ident FROM _grist_Attachments ORDER BY ident`),
await activeDoc.docStorage.all(`SELECT ident FROM _gristsys_Files ORDER BY ident`),
);
await verifyAttachmentFiles(activeDoc);
}
res.json(null);
}));
this._app.post('/api/docs/:docId/attachments/verifyFiles', isOwner, withDoc(async (activeDoc, req, res) => {
await verifyAttachmentFiles(activeDoc);
res.json(null);
}));

async function verifyAttachmentFiles(activeDoc: ActiveDoc) {
assert.deepStrictEqual(
await activeDoc.docStorage.all(`SELECT DISTINCT fileIdent AS ident FROM _grist_Attachments ORDER BY ident`),
await activeDoc.docStorage.all(`SELECT ident FROM _gristsys_Files ORDER BY ident`),
);
}

// Adds records given in a column oriented format,
// returns an array of row IDs
Expand Down
50 changes: 24 additions & 26 deletions app/server/lib/DocStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import * as log from 'app/server/lib/log';
import * as assert from 'assert';
import * as bluebird from 'bluebird';
import * as fse from 'fs-extra';
import {RunResult} from 'sqlite3';
import * as _ from 'underscore';
import * as util from 'util';
import * as uuidv4 from "uuid/v4";
Expand Down Expand Up @@ -1037,7 +1038,7 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
* @param {String} rowId - Row ID.
* @returns {Promise} - A promise for the SQL execution.
*/
public _process_RemoveRecord(tableId: string, rowId: string): Promise<void> {
public _process_RemoveRecord(tableId: string, rowId: string): Promise<RunResult> {
const sql = "DELETE FROM " + quoteIdent(tableId) + " WHERE id=?";
debuglog("RemoveRecord SQL: " + sql, [rowId]);
return this.run(sql, [rowId]);
Expand All @@ -1060,8 +1061,8 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
* @param {Array[Integer]} rowIds - Array of row IDs to be deleted.
* @returns {Promise} - Promise for SQL execution.
*/
public _process_BulkRemoveRecord(tableId: string, rowIds: number[]): Promise<void> {
if (rowIds.length === 0) { return Promise.resolve(); }// If we have nothing to remove, done.
public async _process_BulkRemoveRecord(tableId: string, rowIds: number[]): Promise<void> {
if (rowIds.length === 0) { return; }// If we have nothing to remove, done.

const chunkSize = 10;
const preSql = "DELETE FROM " + quoteIdent(tableId) + " WHERE id IN (";
Expand All @@ -1071,12 +1072,10 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
const numChunks = Math.floor(rowIds.length / chunkSize);
const numLeftovers = rowIds.length % chunkSize;

let chunkPromise;

if (numChunks > 0) {
debuglog("DocStorage.BulkRemoveRecord: splitting " + rowIds.length +
" deletes into chunks of size " + chunkSize);
chunkPromise = this.prepare(preSql + chunkParams + postSql)
await this.prepare(preSql + chunkParams + postSql)
.then(function(stmt) {
return bluebird.Promise.each(_.range(0, numChunks * chunkSize, chunkSize), function(index: number) {
debuglog("DocStorage.BulkRemoveRecord: chunk delete " + index + "-" + (index + chunkSize - 1));
Expand All @@ -1086,18 +1085,14 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
return bluebird.Promise.fromCallback((cb: any) => stmt.finalize(cb));
});
});
} else {
chunkPromise = Promise.resolve();
}

return chunkPromise.then(() => {
if (numLeftovers > 0) {
debuglog("DocStorage.BulkRemoveRecord: leftover delete " + (numChunks * chunkSize) + "-" + (rowIds.length - 1));
const leftoverParams = _.range(numLeftovers).map(q).join(',');
return this.run(preSql + leftoverParams + postSql,
rowIds.slice(numChunks * chunkSize, rowIds.length));
}
});
if (numLeftovers > 0) {
debuglog("DocStorage.BulkRemoveRecord: leftover delete " + (numChunks * chunkSize) + "-" + (rowIds.length - 1));
const leftoverParams = _.range(numLeftovers).map(q).join(',');
await this.run(preSql + leftoverParams + postSql,
rowIds.slice(numChunks * chunkSize, rowIds.length));
}
}

/**
Expand Down Expand Up @@ -1333,7 +1328,7 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
* Delete attachments from _gristsys_Files that have no matching metadata row in _grist_Attachments.
*/
public async removeUnusedAttachments() {
await this.run(`
const result = await this._getDB().run(`
DELETE FROM _gristsys_Files
WHERE ident IN (
SELECT ident
Expand All @@ -1343,13 +1338,16 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
WHERE fileIdent IS NULL
)
`);
if (result.changes > 0) {
await this._markAsChanged(Promise.resolve());
}
}

public all(sql: string, ...args: any[]): Promise<ResultRow[]> {
return this._getDB().all(sql, ...args);
}

public run(sql: string, ...args: any[]): Promise<void> {
public run(sql: string, ...args: any[]): Promise<RunResult> {
return this._markAsChanged(this._getDB().run(sql, ...args));
}

Expand Down Expand Up @@ -1393,17 +1391,17 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
return typeof row !== 'undefined';
}

public setPluginDataItem(pluginId: string, key: string, value: string): Promise<void> {
return this.run('INSERT OR REPLACE into _gristsys_PluginData (pluginId, key, value) values (?, ?, ?)',
public async setPluginDataItem(pluginId: string, key: string, value: string): Promise<void> {
await this.run('INSERT OR REPLACE into _gristsys_PluginData (pluginId, key, value) values (?, ?, ?)',
pluginId, key, value);
}

public removePluginDataItem(pluginId: string, key: string): Promise<void> {
return this.run('DELETE from _gristsys_PluginData where pluginId = ? and key = ?', pluginId, key);
public async removePluginDataItem(pluginId: string, key: string): Promise<void> {
await this.run('DELETE from _gristsys_PluginData where pluginId = ? and key = ?', pluginId, key);
}

public clearPluginDataItem(pluginId: string): Promise<void> {
return this.run('DELETE from _gristsys_PluginData where pluginId = ?', pluginId);
public async clearPluginDataItem(pluginId: string): Promise<void> {
await this.run('DELETE from _gristsys_PluginData where pluginId = ?', pluginId);
}

/**
Expand Down Expand Up @@ -1486,9 +1484,9 @@ export class DocStorage implements ISQLiteDB, OnDemandStorage {
/**
* Internal helper for applying Bulk Update or Add Record sql
*/
private _applyMaybeBulkUpdateOrAddSql(sql: string, sqlParams: any[]): Promise<void> {
private async _applyMaybeBulkUpdateOrAddSql(sql: string, sqlParams: any[]): Promise<void> {
if (sqlParams.length === 1) {
return this.run(sql, sqlParams[0]);
await this.run(sql, sqlParams[0]);
} else {
return this.prepare(sql)
.then(function(stmt) {
Expand Down
8 changes: 5 additions & 3 deletions app/server/lib/ExternalStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export interface ExternalStorage {
head(key: string, snapshotId?: string): Promise<ObjSnapshotWithMetadata|null>;

// Upload content from file to the given key. Returns a snapshotId if store supports that.
upload(key: string, fname: string, metadata?: ObjMetadata): Promise<string|null>;
upload(key: string, fname: string, metadata?: ObjMetadata): Promise<string|null|typeof Unchanged>;

// Download content from key to given file. Can download a specific version of the key
// if store supports that (should throw a fatal exception if not).
Expand Down Expand Up @@ -162,11 +162,11 @@ export class ChecksummedExternalStorage implements ExternalStorage {
const snapshotId = await this._options.latestVersion.load(key);
log.info("ext %s upload: %s unchanged, not sending (checksum %s, version %s)", this.label, key,
checksum, snapshotId);
return snapshotId;
return Unchanged;
}
const snapshotId = await this._ext.upload(key, fname, metadata);
log.info("ext %s upload: %s checksum %s version %s", this.label, this._ext.url(key), checksum, snapshotId);
if (snapshotId) { await this._options.latestVersion.save(key, snapshotId); }
if (typeof snapshotId === "string") { await this._options.latestVersion.save(key, snapshotId); }
await this._options.localHash.save(key, checksum);
await this._options.sharedHash.save(key, checksum);
return snapshotId;
Expand Down Expand Up @@ -364,3 +364,5 @@ export interface PropStorage {
save(key: string, val: string): Promise<void>;
load(key: string): Promise<string|null>;
}

export const Unchanged = Symbol('Unchanged');
6 changes: 5 additions & 1 deletion app/server/lib/HostedStorageManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {HomeDBManager} from 'app/gen-server/lib/HomeDBManager';
import {checksumFile} from 'app/server/lib/checksumFile';
import {DocSnapshotInventory, DocSnapshotPruner} from 'app/server/lib/DocSnapshots';
import {IDocWorkerMap} from 'app/server/lib/DocWorkerMap';
import {ChecksummedExternalStorage, DELETED_TOKEN, ExternalStorage} from 'app/server/lib/ExternalStorage';
import {ChecksummedExternalStorage, DELETED_TOKEN, ExternalStorage, Unchanged} from 'app/server/lib/ExternalStorage';
import {HostedMetadataManager} from 'app/server/lib/HostedMetadataManager';
import {ICreate} from 'app/server/lib/ICreate';
import {IDocStorageManager} from 'app/server/lib/IDocStorageManager';
Expand Down Expand Up @@ -707,6 +707,10 @@ export class HostedStorageManager implements IDocStorageManager {
};
const prevSnapshotId = this._latestVersions.get(docId) || null;
const newSnapshotId = await this._ext.upload(docId, tmpPath, metadata);
if (newSnapshotId === Unchanged) {
// Nothing uploaded because nothing changed
return;
}
if (!newSnapshotId) {
// This is unexpected.
throw new Error('No snapshotId allocated after upload');
Expand Down
47 changes: 29 additions & 18 deletions app/server/lib/SQLiteDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import * as sqlite3 from '@gristlabs/sqlite3';
import * as assert from 'assert';
import {each} from 'bluebird';
import * as fse from 'fs-extra';
import {RunResult} from 'sqlite3';
import fromPairs = require('lodash/fromPairs');
import isEqual = require('lodash/isEqual');
import noop = require('lodash/noop');
Expand Down Expand Up @@ -132,7 +133,7 @@ export interface MigrationHooks {
*/
export interface ISQLiteDB {
exec(sql: string): Promise<void>;
run(sql: string, ...params: any[]): Promise<void>;
run(sql: string, ...params: any[]): Promise<RunResult>;
get(sql: string, ...params: any[]): Promise<ResultRow|undefined>;
all(sql: string, ...params: any[]): Promise<ResultRow[]>;
prepare(sql: string, ...params: any[]): Promise<sqlite3.Statement>;
Expand Down Expand Up @@ -288,8 +289,17 @@ export class SQLiteDB implements ISQLiteDB {
return fromCallback(cb => this._db.exec(sql, cb));
}

public run(sql: string, ...params: any[]): Promise<void> {
return fromCallback(cb => this._db.run(sql, ...params, cb));
public run(sql: string, ...params: any[]): Promise<RunResult> {
return new Promise((resolve, reject) => {
function callback(this: RunResult, err: Error | null) {
if (err) {
reject(err);
} else {
resolve(this);
}
}
this._db.run(sql, ...params, callback);
});
}

public get(sql: string, ...params: any[]): Promise<ResultRow|undefined> {
Expand Down Expand Up @@ -339,11 +349,19 @@ export class SQLiteDB implements ISQLiteDB {
* to db.run, e.g. [sqlString, [params...]].
*/
public runEach(...statements: Array<string | [string, any[]]>): Promise<void> {
return each(statements, (stmt: any) => {
return (Array.isArray(stmt) ? this.run(stmt[0], ...stmt[1]) :
this.exec(stmt))
.catch(err => { log.warn(`SQLiteDB: Failed to run ${stmt}`); throw err; });
});
return each(statements,
async (stmt: any) => {
try {
return await (Array.isArray(stmt) ?
this.run(stmt[0], ...stmt[1]) :
this.exec(stmt)
);
} catch (err) {
log.warn(`SQLiteDB: Failed to run ${stmt}`);
throw err;
}
}
);
}

public close(): Promise<void> {
Expand All @@ -356,16 +374,9 @@ export class SQLiteDB implements ISQLiteDB {
* is sqlite's rowid for the last insert made on this database connection. This method
* is only useful if the sql is actually an INSERT operation, but we don't check this.
*/
public runAndGetId(sql: string, ...params: any[]): Promise<number> {
return new Promise<number>((resolve, reject) => {
this._db.run(sql, ...params, function(this: any, err: any) {
if (err) {
reject(err);
} else {
resolve(this.lastID);
}
});
});
public async runAndGetId(sql: string, ...params: any[]): Promise<number> {
const result = await this.run(sql, ...params);
return result.lastID;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion app/server/lib/Triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class DocTriggers {
public shutdown() {
this._shuttingDown = true;
if (!this._sending) {
this._redisClient?.quitAsync();
this._redisClientField?.quitAsync();
}
}

Expand Down
12 changes: 6 additions & 6 deletions sandbox/grist/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,12 @@ def migration17(tdset):
actions.ModifyColumn(tables_map[c.parentId].tableId, c.colId, {'type': 'Attachments'})
for c in affected_cols
)
# Update the types in the metadata tables
doc_actions.append(actions.BulkUpdateRecord(
'_grist_Tables_column',
[c.id for c in affected_cols],
{'type': ['Attachments' for c in affected_cols]}
))
# Update the values to lists
for c in affected_cols:
if c.isFormula:
Expand All @@ -710,12 +716,6 @@ def migration17(tdset):
actions.BulkUpdateRecord(table_id, table.row_ids,
{c.colId: [conv(val) for val in table.columns[c.colId]]})
)
# Update the types in the metadata tables
doc_actions.append(actions.BulkUpdateRecord(
'_grist_Tables_column',
[c.id for c in affected_cols],
{'type': ['Attachments' for c in affected_cols]}
))

return tdset.apply_doc_actions(doc_actions)

Expand Down
7 changes: 5 additions & 2 deletions test/server/lib/DocApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1707,17 +1707,20 @@ function testDocApi() {
assert.equal(resp.status, 200);
await checkAttachmentIds([1, 2, 3]);

// Remove the expired attachment (1)
// Remove the expired attachment (1) by force-reloading, so it removes it during shutdown.
// It has a duplicate (3) that hasn't expired and thus isn't removed,
// although they share the same fileIdent and row in _gristsys_Files.
// So for now only the metadata is removed.
resp = await axios.post(`${docUrl}/attachments/removeUnused?verifyfiles=1&expiredonly=1`, null, chimpy);
resp = await axios.post(`${docUrl}/force-reload`, null, chimpy);
assert.equal(resp.status, 200);
await checkAttachmentIds([2, 3]);
resp = await axios.post(`${docUrl}/attachments/verifyFiles`, null, chimpy);
assert.equal(resp.status, 200);

// Remove the not expired attachments (2 and 3).
// We didn't set a timeDeleted for 3, but it gets set automatically by updateUsedAttachments.
resp = await axios.post(`${docUrl}/attachments/removeUnused?verifyfiles=1`, null, chimpy);
assert.equal(resp.status, 200);
await checkAttachmentIds([]);
});

Expand Down

0 comments on commit a701b4b

Please sign in to comment.