Merge pull request #3387 from specklesystems/alessandro/fileimport-service-ioc-1-server-api

Fileimport IoC 1 ServerAPI refactor multiregion
alemagio authored Oct 24, 2024
2 parents 60795e9 + 5f7d16a commit 30fc2a5
Showing 3 changed files with 29 additions and 22 deletions.
44 changes: 25 additions & 19 deletions packages/fileimport-service/ifc/api.js
@@ -5,17 +5,21 @@ const bcrypt = require('bcrypt')
const { chunk } = require('lodash')
const { logger: parentLogger } = require('../observability/logging')

const knex = require('../knex')
const Observability = require('@speckle/shared/dist/commonjs/observability/index.js')
const Streams = () => knex('streams')
const Branches = () => knex('branches')
const Objects = () => knex('objects')
const Closures = () => knex('object_children_closure')
const ApiTokens = () => knex('api_tokens')
const TokenScopes = () => knex('token_scopes')

const tables = (db) => ({
objects: db('objects'),
closures: db('object_children_closure'),
branches: db('branches'),
streams: db('streams'),
apiTokens: db('api_tokens'),
tokenScopes: db('token_scopes')
})

module.exports = class ServerAPI {
constructor({ streamId, logger }) {
constructor({ db, streamId, logger }) {
this.tables = tables(db)
this.db = db
this.streamId = streamId
this.isSending = false
this.buffer = []
@@ -68,10 +72,10 @@ module.exports = class ServerAPI {
totalChildrenCountByDepth
)

await Objects().insert(insertionObject).onConflict().ignore()
await this.tables.objects.insert(insertionObject).onConflict().ignore()

if (closures.length > 0) {
await Closures().insert(closures).onConflict().ignore()
await this.tables.closures.insert(closures).onConflict().ignore()
}

return insertionObject.id
@@ -123,7 +127,7 @@ module.exports = class ServerAPI {
const batches = chunk(objsToInsert, objectsBatchSize)
for (const [index, batch] of batches.entries()) {
this.prepInsertionObjectBatch(batch)
await Objects().insert(batch).onConflict().ignore()
await this.tables.objects.insert(batch).onConflict().ignore()
this.logger.info(
{
currentBatchCount: batch.length,
Expand All @@ -141,7 +145,7 @@ module.exports = class ServerAPI {

for (const [index, batch] of batches.entries()) {
this.prepInsertionClosureBatch(batch)
await Closures().insert(batch).onConflict().ignore()
await this.tables.closures.insert(batch).onConflict().ignore()
this.logger.info(
{
currentBatchCount: batch.length,
@@ -196,10 +200,10 @@ module.exports = class ServerAPI {
}

async getBranchByNameAndStreamId({ streamId, name }) {
const query = Branches()
const query = this.tables.branches
.select('*')
.where({ streamId })
.andWhere(knex.raw('LOWER(name) = ?', [name]))
.andWhere(this.db.raw('LOWER(name) = ?', [name]))
.first()
return await query
}
@@ -212,10 +216,12 @@ module.exports = class ServerAPI {
branch.name = name.toLowerCase()
branch.description = description

await Branches().returning('id').insert(branch)
await this.tables.branches.returning('id').insert(branch)

// update stream updated at
await Streams().where({ id: streamId }).update({ updatedAt: knex.fn.now() })
await this.tables.streams
.where({ id: streamId })
.update({ updatedAt: this.db.fn.now() })

return branch.id
}
@@ -244,14 +250,14 @@ module.exports = class ServerAPI {
}
const tokenScopes = scopes.map((scope) => ({ tokenId, scopeName: scope }))

await ApiTokens().insert(token)
await TokenScopes().insert(tokenScopes)
await this.tables.apiTokens.insert(token)
await this.tables.tokenScopes.insert(tokenScopes)

return { id: tokenId, token: tokenId + tokenString }
}

async revokeTokenById(tokenId) {
const delCount = await ApiTokens()
const delCount = await this.tables.apiTokens
.where({ id: tokenId.slice(0, 10) })
.del()

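The core of the refactor is visible in the api.js hunks above: the module-level knex require and the per-table closures (Streams(), Objects(), and so on) are replaced by a tables(db) factory, and the connection is injected through the constructor. A minimal sketch of what that enables, assuming the snippet sits next to ifc/api.js; the buildRegionDb helper and the environment variable are illustrative assumptions, not part of this commit:

const knexFactory = require('knex')
const ServerAPI = require('./api.js')

// Hypothetical helper (not in this commit): build a knex client for a regional
// database. The env var name is made up for illustration.
const buildRegionDb = (connectionString) =>
  knexFactory({ client: 'pg', connection: connectionString })

const regionDb = buildRegionDb(process.env.EU_POSTGRES_URL)

// Any knex instance can now be passed in; previously the class was hard-wired
// to the module-level connection from ../knex.
const serverApi = new ServerAPI({
  db: regionDb,
  streamId: 'some-stream-id',
  logger: console // stand-in for the service's pino-style child logger
})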
3 changes: 2 additions & 1 deletion packages/fileimport-service/ifc/index.js
@@ -4,6 +4,7 @@ const Parser = require('./parser_v2')
const ServerAPI = require('./api.js')
const Observability = require('@speckle/shared/dist/commonjs/observability/index.js')
const { logger: parentLogger } = require('../observability/logging')
const knex = require('../knex')

async function parseAndCreateCommit({
data,
@@ -20,7 +21,7 @@ async function parseAndCreateCommit({
'ifc'
)
}
const serverApi = new ServerAPI({ streamId, logger })
const serverApi = new ServerAPI({ db: knex, streamId, logger })
const myParser = new Parser({ serverApi, fileId, logger })

const start = performance.now()
4 changes: 2 additions & 2 deletions packages/fileimport-service/src/daemon.js
@@ -34,7 +34,7 @@ if (providedTimeLimit) TIME_LIMIT = providedTimeLimit * 60 * 1000
async function startTask() {
const { rows } = await knex.raw(`
UPDATE file_uploads
SET
SET
"convertedStatus" = 1,
"convertedLastUpdate" = NOW()
FROM (
@@ -85,7 +85,7 @@ async function doTask(task) {
})
fs.mkdirSync(TMP_INPUT_DIR, { recursive: true })

serverApi = new ServerAPI({ streamId: info.streamId, logger: taskLogger })
serverApi = new ServerAPI({ db: knex, streamId: info.streamId, logger: taskLogger })

branchMetadata = {
branchName: info.branchName,
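Both call sites (ifc/index.js and src/daemon.js) keep passing the default knex connection, so behaviour is unchanged for now; the injection point is what later multiregion work can build on. A hedged sketch of how a caller could resolve the connection per stream; getDbForStream is hypothetical, and the require paths assume the snippet lives in src/ next to daemon.js:

const knex = require('../knex')
const ServerAPI = require('../ifc/api.js')

// Hypothetical multiregion lookup, not part of this commit. For now it just
// returns the default connection, so behaviour matches the current call sites.
async function getDbForStream(streamId) {
  // e.g. look up the stream's region and pick the matching knex instance
  return knex
}

async function buildServerApi({ streamId, logger }) {
  const db = await getDbForStream(streamId)
  return new ServerAPI({ db, streamId, logger })
}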
