Skip to content

Commit

Permalink
fix(MySQL): export crash with large databases
Browse files Browse the repository at this point in the history
  • Loading branch information
toriphes committed Nov 4, 2021
1 parent 409ed54 commit 8cf738b
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 65 deletions.
127 changes: 70 additions & 57 deletions src/main/libs/exporters/sql/MysqlExporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,19 @@ ${footer}
return `DROP TABLE IF EXISTS \`${tableName}\`;`;
}

async getTableInsert (tableName) {
async * getTableInsert (tableName) {
let rowCount = 0;
let sqlStr = '';
const pageSize = 1000;

const countResults = await this._client.raw(
`SELECT COUNT(1) as count FROM \`${this.schemaName}\`.\`${tableName}\``
);
if (countResults.rows.length === 1) rowCount = countResults.rows[0].count;

if (rowCount > 0) {
const totalPages = Math.ceil(rowCount / pageSize);
let queryLength = 0;
let rowsWritten = 0;
let rowIndex = 0;
const { sqlInsertDivider, sqlInsertAfter } = this._options;
const columns = await this._client.getTableColumns({
table: tableName,
Expand All @@ -76,69 +75,70 @@ ${footer}
sqlStr += `LOCK TABLES \`${tableName}\` WRITE;\n`;
sqlStr += `/*!40000 ALTER TABLE \`${tableName}\` DISABLE KEYS */;`;
sqlStr += '\n\n';
yield sqlStr;

for (let pageNumber = 0; pageNumber < totalPages; pageNumber++) {
const tableResult = await this._client.raw(
`SELECT ${columnNames.join(', ')} FROM \`${
this.schemaName
}\`.\`${tableName}\`
LIMIT ${pageSize} OFFSET ${pageSize * pageNumber}`
);

sqlStr += insertStmt;

for (const rowIndex in tableResult.rows) {
const row = tableResult.rows[rowIndex];
let sqlInsertString = '';

if (
(sqlInsertDivider === 'bytes' &&
queryLength >= sqlInsertAfter * 1024) ||
(sqlInsertDivider === 'rows' && rowsWritten === sqlInsertAfter)
) {
sqlInsertString += `;\n${insertStmt}\n\t(`;

queryLength = 0;
rowsWritten = 0;
}
else if (parseInt(rowIndex) === 0) sqlInsertString += '\n\t(';
else sqlInsertString += ',\n\t(';

for (const i in columns) {
const column = columns[i];
const val = row[column.name];

if (val === null) sqlInsertString += 'NULL';
else if (BIT.includes(column.type)) {
sqlInsertString += `b'${hexToBinary(
Buffer.from(val).toString('hex')
)}'`;
}
else if (BLOB.includes(column.type))
sqlInsertString += `X'${val.toString('hex').toUpperCase()}'`;
else if (val === '') sqlInsertString += '\'\'';
else {
sqlInsertString +=
typeof val === 'string' ? this.escapeAndQuote(val) : val;
}

if (parseInt(i) !== columns.length - 1) sqlInsertString += ', ';
}
yield insertStmt;

const stream = await this._queryStream(
`SELECT ${columnNames.join(', ')} FROM \`${this.schemaName}\`.\`${tableName}\``
);

for await (const row of stream) {
if (this.isCancelled) {
stream.destroy();
yield null;
return;
}

sqlInsertString += ')';
sqlStr += sqlInsertString;
let sqlInsertString = '';

queryLength += sqlInsertString.length;
rowsWritten++;
if (
(sqlInsertDivider === 'bytes' && queryLength >= sqlInsertAfter * 1024) ||
(sqlInsertDivider === 'rows' && rowsWritten === sqlInsertAfter)
) {
sqlInsertString += `;\n${insertStmt}\n\t(`;
queryLength = 0;
rowsWritten = 0;
}
sqlStr += ';\n\n';
else if (parseInt(rowIndex) === 0) sqlInsertString += '\n\t(';
else sqlInsertString += ',\n\t(';

for (const i in columns) {
const column = columns[i];
const val = row[column.name];

if (val === null) sqlInsertString += 'NULL';
else if (BIT.includes(column.type))
sqlInsertString += `b'${hexToBinary(Buffer.from(val).toString('hex'))}'`;

else if (BLOB.includes(column.type))
sqlInsertString += `X'${val.toString('hex').toUpperCase()}'`;

else if (val === '') sqlInsertString += '\'\'';
else {
sqlInsertString += typeof val === 'string'
? this.escapeAndQuote(val)
: val;
}

if (parseInt(i) !== columns.length - 1)
sqlInsertString += ', ';
}

sqlInsertString += ')';

queryLength += sqlInsertString.length;
rowsWritten++;
rowIndex++;
yield sqlInsertString;
}

sqlStr = ';\n\n';
sqlStr += `/*!40000 ALTER TABLE \`${tableName}\` ENABLE KEYS */;\n`;
sqlStr += 'UNLOCK TABLES;';
}

return sqlStr;
yield sqlStr;
}
}

async getViews () {
Expand Down Expand Up @@ -301,6 +301,19 @@ ${footer}
return sqlString;
}

async _queryStream (sql) {
console.log(sql);
const isPool = typeof this._client._connection.getConnection === 'function';
const connection = isPool ? await this._client._connection.getConnection() : this._client._connection;
const stream = connection.connection.query(sql).stream();
const dispose = () => connection.destroy();

stream.on('end', dispose);
stream.on('error', dispose);
stream.on('close', dispose);
return stream;
}

getEscapedDefiner (definer) {
return definer
.split('@')
Expand Down
15 changes: 7 additions & 8 deletions src/main/libs/exporters/sql/SqlExporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class SqlExporter extends BaseExporter {

exportState.currentItemIndex++;
exportState.currentItem = item.table;
exportState.op = 'PROCESSING';
exportState.op = 'FETCH';

this.emitUpdate(exportState);

Expand All @@ -78,22 +78,21 @@ export class SqlExporter extends BaseExporter {
}

if (item.includeContent) {
exportState.op = 'FETCH';
this.emitUpdate(exportState);
const tableInsertSyntax = await this.getTableInsert(item.table);

exportState.op = 'WRITE';
this.emitUpdate(exportState);
this.writeString(tableInsertSyntax);
for await (const sqlStr of this.getTableInsert(item.table)) {
if (this.isCancelled) return;
this.writeString(sqlStr);
}

this.writeString('\n\n');
}

this.writeString('\n\n');
}

for (const item of extraItems) {
const processingMethod = `get${item.charAt(0).toUpperCase() +
item.slice(1)}`;
const processingMethod = `get${item.charAt(0).toUpperCase() + item.slice(1)}`;
exportState.currentItemIndex++;
exportState.currentItem = item;
exportState.op = 'PROCESSING';
Expand Down

0 comments on commit 8cf738b

Please sign in to comment.