refactor: use query stream for automatic data update script (#224)
* feat: add OVERRIDE_LH_PROJECT variable to override observed domains project

* feat: run bigquery in a stream to handle large results

* fix: can't increment a constant

* feat: use pipes to handle streams

* fix: add table schema

* fix: remove cache for queries

* refactor: rename project regex

Co-authored-by: Patrick Hulce <[email protected]>

* Update bin/automated-update.js

---------

Co-authored-by: Guillaume NICOLAS <[email protected]>
Co-authored-by: Patrick Hulce <[email protected]>
3 people authored Aug 7, 2024
1 parent 5174773 commit 82f1900
Showing 1 changed file with 97 additions and 46 deletions.
143 changes: 97 additions & 46 deletions bin/automated-update.js
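For context, the heart of this refactor is swapping the buffered `job.getQueryResults()` call, which holds every result row in memory, for `job.getQueryResultsStream()`, which yields rows as an object-mode stream that can be piped straight to its destinations. A minimal sketch of the two patterns, assuming only the `@google-cloud/bigquery` client (the `demo` function and its `query` argument are illustrative, not part of the script):

const {BigQuery} = require('@google-cloud/bigquery')

async function demo(query) {
  const [job] = await new BigQuery().createQueryJob({query, location: 'US'})

  // Buffered (previous approach): every row is loaded into memory at once.
  // const [rows] = await job.getQueryResults()

  // Streamed (new approach): rows arrive one at a time and can be piped onward.
  job
    .getQueryResultsStream()
    .on('data', row => process.stdout.write(JSON.stringify(row) + '\n'))
    .on('end', () => console.log('query finished'))
}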
@@ -1,14 +1,17 @@
const fs = require('fs')
const prompts = require('prompts')
const childProcess = require('child_process')
const {getEntity} = require('../lib/')

const util = require('util')
const {Transform, finished} = require('stream')
const {BigQuery} = require('@google-cloud/bigquery')

const {getEntity} = require('../lib/')

const HA_REQUESTS_TABLE_REGEX = /`httparchive\.requests\.\w+`/g
const HA_LH_TABLE_REGEX = /`httparchive\.lighthouse\.\w+`/g
const LH_3P_TABLE_REGEX = /`lighthouse-infrastructure\.third_party_web\.\w+`/g
const DATE_UNDERSCORE_REGEX = /\d{4}_\d{2}_\d{2}/g
const LH_PROJECT_REGEX = /lighthouse-infrastructure/g

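// [override value, regex] pairs used to rewrite table/project references in the SQL when the matching env var is set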
const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
? [
@@ -20,6 +23,7 @@ const TABLE_REPLACEMENTS = process.env.USE_SAMPLE_DATA
[process.env.OVERRIDE_HA_LH_TABLE, HA_LH_TABLE_REGEX],
[process.env.OVERRIDE_HA_REQUESTS_TABLE, HA_REQUESTS_TABLE_REGEX],
[process.env.OVERRIDE_LH_3P_TABLE, LH_3P_TABLE_REGEX],
[process.env.OVERRIDE_LH_PROJECT, LH_PROJECT_REGEX],
].filter(([override]) => override)

function getQueryForTable(filename, dateUnderscore) {
@@ -76,6 +80,40 @@ async function getTargetDatasetDate() {
return {dateStringUnderscore, dateStringHypens}
}

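// Runs the query as an uncached BigQuery job and returns its rows as a readable object stream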
const getQueryResultStream = async query => {
const [job] = await new BigQuery().createQueryJob({
query,
location: 'US',
useQueryCache: false,
})
return job.getQueryResultsStream()
}
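// Resolves once every given stream has finished or errored (stream.finished, promisified)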
const resolveOnFinished = streams => {
const toFinishedPromise = util.promisify(finished)
return Promise.all(streams.map(s => toFinishedPromise(s)))
}
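// Transform that serializes each row to JSON; when a row counter is passed, it also emits the
// opening '[' before the first row and ',' separators so the output forms a JSON array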
const getJSONStringTransformer = rowCounter => {
return new Transform({
objectMode: true,
transform(row, _, callback) {
const prefix = rowCounter === undefined ? '' : !rowCounter++ ? '[\n' : ',\n'
callback(null, prefix + JSON.stringify(row))
},
})
}
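// Transform that maps an observed domain row to its third-party-web entity (canonical domain and category)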
const EntityCanonicalDomainTransformer = new Transform({
objectMode: true,
transform(row, _, callback) {
const entity = getEntity(row.domain)
const thirdPartyWebRow = {
domain: row.domain,
canonicalDomain: entity && entity.domains[0],
category: (entity && entity.categories[0]) || 'unknown',
}
callback(null, thirdPartyWebRow)
},
})

async function main() {
const {dateStringUnderscore, dateStringHypens} = await getTargetDatasetDate()

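In main() below, the single results stream is piped into two pipelines at once: calling `.pipe()` twice on the same readable delivers every row to both destinations, and `resolveOnFinished` waits for both to flush. A self-contained sketch of that fan-out pattern, assuming nothing beyond Node's stream and util modules (the in-memory source and sinks are stand-ins for the BigQuery stream, the JSON file writer, and the table writer):

const util = require('util')
const {Readable, Writable, finished} = require('stream')

const toFinishedPromise = util.promisify(finished)

async function fanOutDemo() {
  // Stand-in for the BigQuery results stream.
  const source = Readable.from([{domain: 'a.example'}, {domain: 'b.example'}])

  const serialized = []
  const domains = []
  const jsonSink = new Writable({
    objectMode: true,
    write(row, _, callback) {
      serialized.push(JSON.stringify(row))
      callback()
    },
  })
  const tableSink = new Writable({
    objectMode: true,
    write(row, _, callback) {
      domains.push(row.domain)
      callback()
    },
  })

  // Each .pipe() on the same readable delivers every row to that sink.
  source.pipe(jsonSink)
  source.pipe(tableSink)

  // Wait for both pipelines to finish before moving on.
  await Promise.all([jsonSink, tableSink].map(s => toFinishedPromise(s)))
  console.log(serialized.length, 'rows serialized,', domains.length, 'domains collected')
}

fanOutDemo()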
@@ -99,44 +137,49 @@ async function main() {
await withExistenceCheck(observedDomainsFilename, {
checkExistenceFn: () => fs.existsSync(observedDomainsFilename),
actionFn: async () => {
const bqClient = new BigQuery()

const queryOptions = {
query: allObservedDomainsQuery,
location: 'US',
}

const [job] = await bqClient.createQueryJob(queryOptions)
console.log(`Job ${job.id} started.`)

// Wait for the query to finish
const [rows] = await job.getQueryResults()
console.log(`Start observed domains query`)

console.log('Wrote', rows.length, 'rows to', observedDomainsFilename)
fs.writeFileSync(observedDomainsFilename, JSON.stringify(rows))
const start = Date.now()

const rowsForNewTable = rows.map(row => {
const entity = getEntity(row.domain)
const resultsStream = await getQueryResultStream(allObservedDomainsQuery)

return {
domain: row.domain,
canonicalDomain: entity && entity.domains[0],
category: (entity && entity.categories[0]) || 'unknown',
}
})
// Observed domain json file pipe
let observedDomainsNbRows = 0
const observedDomainsFileWriterStream = fs.createWriteStream(observedDomainsFilename)
resultsStream
// stringify observed domain json (with json array prefix based on row index)
.pipe(getJSONStringTransformer(observedDomainsNbRows))
// write to observed-domains json file
.pipe(observedDomainsFileWriterStream)

const schema = [
{name: 'domain', type: 'STRING'},
{name: 'canonicalDomain', type: 'STRING'},
{name: 'category', type: 'STRING'},
]

console.log('Creating', dateStringUnderscore, 'table. This may take a while...')
await bqClient
// Observed domain entity mapping table pipe
const thirdPartyWebTableWriterStream = new BigQuery()
.dataset('third_party_web')
.table(dateStringUnderscore)
.insert(rowsForNewTable, {schema, location: 'US'})
console.log('Inserted', rowsForNewTable.length, 'rows')
.createWriteStream({
schema: [
{name: 'domain', type: 'STRING'},
{name: 'canonicalDomain', type: 'STRING'},
{name: 'category', type: 'STRING'},
],
})
resultsStream
// map observed domain to entity
.pipe(EntityCanonicalDomainTransformer)
// stringify json
.pipe(getJSONStringTransformer())
// write to third_party_web table
.pipe(thirdPartyWebTableWriterStream)

// Wait for both streams to finish
await resolveOnFinished([observedDomainsFileWriterStream, thirdPartyWebTableWriterStream])

// Close observed domains json array in file
fs.appendFileSync(observedDomainsFilename, '\n]')

console.log(
`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${observedDomainsNbRows} rows.`
)
},
deleteFn: async () => {
const bqClient = new BigQuery()
Expand All @@ -153,22 +196,30 @@ async function main() {
await withExistenceCheck(entityScriptingFilename, {
checkExistenceFn: () => fs.existsSync(entityScriptingFilename),
actionFn: async () => {
const bqClient = new BigQuery()
console.log(`Start entity scripting query`)

const start = Date.now()

const resultsStream = await getQueryResultStream(entityPerPageQuery)

const queryOptions = {
query: entityPerPageQuery,
location: 'US',
}
// Entity scripting json file pipe
let entityScriptingNbRows = 0
const entityScriptingFileWriterStream = fs.createWriteStream(entityScriptingFilename)
resultsStream
// stringify entity scripting json (with json array prefix based on row index)
.pipe(getJSONStringTransformer(entityScriptingNbRows))
// write to entity-scripting json file
.pipe(entityScriptingFileWriterStream)

console.log('Querying execution per entity...')
const [job] = await bqClient.createQueryJob(queryOptions)
console.log(`Job ${job.id} started.`)
// Wait for the stream to finish
await resolveOnFinished([entityScriptingFileWriterStream])

// Wait for the query to finish
const [rows] = await job.getQueryResults()
console.log(
`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${entityScriptingNbRows} rows.`
)

console.log('Wrote', rows.length, 'rows to', entityScriptingFilename)
fs.writeFileSync(entityScriptingFilename, JSON.stringify(rows, null, 2))
// Close entity scripting json array in file
fs.appendFileSync(entityScriptingFilename, ']')
},
deleteFn: () => {},
exitFn: () => {},
