diff --git a/packages/fileimport-service/ifc/api.js b/packages/fileimport-service/ifc/api.js
index e902823612..f75c13faac 100644
--- a/packages/fileimport-service/ifc/api.js
+++ b/packages/fileimport-service/ifc/api.js
@@ -5,17 +5,21 @@ const bcrypt = require('bcrypt')
 const { chunk } = require('lodash')
 
 const { logger: parentLogger } = require('../observability/logging')
-const knex = require('../knex')
 const Observability = require('@speckle/shared/dist/commonjs/observability/index.js')
-const Streams = () => knex('streams')
-const Branches = () => knex('branches')
-const Objects = () => knex('objects')
-const Closures = () => knex('object_children_closure')
-const ApiTokens = () => knex('api_tokens')
-const TokenScopes = () => knex('token_scopes')
+
+const tables = (db) => ({
+  objects: db('objects'),
+  closures: db('object_children_closure'),
+  branches: db('branches'),
+  streams: db('streams'),
+  apiTokens: db('api_tokens'),
+  tokenScopes: db('token_scopes')
+})
 
 module.exports = class ServerAPI {
-  constructor({ streamId, logger }) {
+  constructor({ db, streamId, logger }) {
+    this.tables = tables(db)
+    this.db = db
     this.streamId = streamId
     this.isSending = false
     this.buffer = []
@@ -68,10 +72,10 @@ module.exports = class ServerAPI {
       totalChildrenCountByDepth
     )
 
-    await Objects().insert(insertionObject).onConflict().ignore()
+    await this.tables.objects.insert(insertionObject).onConflict().ignore()
 
     if (closures.length > 0) {
-      await Closures().insert(closures).onConflict().ignore()
+      await this.tables.closures.insert(closures).onConflict().ignore()
     }
 
     return insertionObject.id
@@ -123,7 +127,7 @@ module.exports = class ServerAPI {
     const batches = chunk(objsToInsert, objectsBatchSize)
     for (const [index, batch] of batches.entries()) {
       this.prepInsertionObjectBatch(batch)
-      await Objects().insert(batch).onConflict().ignore()
+      await this.tables.objects.insert(batch).onConflict().ignore()
       this.logger.info(
         {
           currentBatchCount: batch.length,
@@ -141,7 +145,7 @@ module.exports = class ServerAPI {
 
     for (const [index, batch] of batches.entries()) {
       this.prepInsertionClosureBatch(batch)
-      await Closures().insert(batch).onConflict().ignore()
+      await this.tables.closures.insert(batch).onConflict().ignore()
       this.logger.info(
         {
           currentBatchCount: batch.length,
@@ -196,10 +200,10 @@ module.exports = class ServerAPI {
   }
 
   async getBranchByNameAndStreamId({ streamId, name }) {
-    const query = Branches()
+    const query = this.tables.branches
       .select('*')
       .where({ streamId })
-      .andWhere(knex.raw('LOWER(name) = ?', [name]))
+      .andWhere(this.db.raw('LOWER(name) = ?', [name]))
       .first()
     return await query
   }
@@ -212,10 +216,12 @@ module.exports = class ServerAPI {
     branch.name = name.toLowerCase()
     branch.description = description
 
-    await Branches().returning('id').insert(branch)
+    await this.tables.branches.returning('id').insert(branch)
 
     // update stream updated at
-    await Streams().where({ id: streamId }).update({ updatedAt: knex.fn.now() })
+    await this.tables.streams
+      .where({ id: streamId })
+      .update({ updatedAt: this.db.fn.now() })
 
     return branch.id
   }
@@ -244,14 +250,14 @@ module.exports = class ServerAPI {
     }
     const tokenScopes = scopes.map((scope) => ({ tokenId, scopeName: scope }))
 
-    await ApiTokens().insert(token)
-    await TokenScopes().insert(tokenScopes)
+    await this.tables.apiTokens.insert(token)
+    await this.tables.tokenScopes.insert(tokenScopes)
 
     return { id: tokenId, token: tokenId + tokenString }
   }
 
   async revokeTokenById(tokenId) {
-    const delCount = await ApiTokens()
+    const delCount = await this.tables.apiTokens
       .where({ id: tokenId.slice(0, 10) })
       .del()
 
diff --git a/packages/fileimport-service/ifc/index.js b/packages/fileimport-service/ifc/index.js
index 81ec4c1a57..c8f763c7f7 100644
--- a/packages/fileimport-service/ifc/index.js
+++ b/packages/fileimport-service/ifc/index.js
@@ -4,6 +4,7 @@ const Parser = require('./parser_v2')
 const ServerAPI = require('./api.js')
 const Observability = require('@speckle/shared/dist/commonjs/observability/index.js')
 const { logger: parentLogger } = require('../observability/logging')
+const knex = require('../knex')
 
 async function parseAndCreateCommit({
   data,
@@ -20,7 +21,7 @@ async function parseAndCreateCommit({
       'ifc'
     )
   }
-  const serverApi = new ServerAPI({ streamId, logger })
+  const serverApi = new ServerAPI({ db: knex, streamId, logger })
   const myParser = new Parser({ serverApi, fileId, logger })
 
   const start = performance.now()
diff --git a/packages/fileimport-service/src/daemon.js b/packages/fileimport-service/src/daemon.js
index 54d8ea9c64..e5f4d1cce5 100644
--- a/packages/fileimport-service/src/daemon.js
+++ b/packages/fileimport-service/src/daemon.js
@@ -34,7 +34,7 @@ if (providedTimeLimit) TIME_LIMIT = providedTimeLimit * 60 * 1000
 async function startTask() {
   const { rows } = await knex.raw(`
     UPDATE file_uploads
-    SET 
+    SET
      "convertedStatus" = 1,
      "convertedLastUpdate" = NOW()
    FROM (
@@ -85,7 +85,7 @@ async function doTask(task) {
   })
   fs.mkdirSync(TMP_INPUT_DIR, { recursive: true })
 
-  serverApi = new ServerAPI({ streamId: info.streamId, logger: taskLogger })
+  serverApi = new ServerAPI({ db: knex, streamId: info.streamId, logger: taskLogger })
 
   branchMetadata = {
     branchName: info.branchName,
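
Note (not part of the diff): the refactor replaces ServerAPI's hard dependency on the singleton ../knex module with a `db` handle injected through the constructor, and routes every table access through the `tables(db)` map so the set of tables the importer touches is declared in one place. Since `tables(db)` is evaluated once per instance, each import task gets its own builders by constructing its own ServerAPI, which doTask already does. A minimal sketch of what the injection enables, assuming a hypothetical test database; `testDb`, its connection string, and the stream id are illustrative and not from this PR:

// Sketch only: exercising ServerAPI against an arbitrary knex instance.
// Any knex handle satisfies the injected `db` dependency; the connection
// details below are assumptions for illustration.
const knex = require('knex')
const ServerAPI = require('./api.js')
const { logger } = require('../observability/logging')

const testDb = knex({
  client: 'pg',
  connection: process.env.TEST_POSTGRES_URL // hypothetical env var
})

const serverApi = new ServerAPI({
  db: testDb,
  streamId: 'hypothetical-stream-id',
  logger
})

// All reads and writes now go through testDb, e.g.:
// await serverApi.getBranchByNameAndStreamId({ streamId, name: 'main' })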