Skip to content

Commit

Permalink
feat: (observability): trace Database.runTransactionAsync
Browse files Browse the repository at this point in the history
Extracted out of PR googleapis#2158, this change traces
Database.runTransactionAsync. However, testing isn't effective
because of bugs such as googleapis#2166.
  • Loading branch information
odeke-em committed Oct 21, 2024
1 parent 2a19ef1 commit ed8cb34
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 38 deletions.
134 changes: 134 additions & 0 deletions observability-test/benchmark.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

const lessComparator = (a, b) => {
if (a < b) return -1;
if (a > b) return 1;
return 0;
};

/*
* runBenchmarks runs each of the functions in runners ${nRuns} times
* each time collecting RAM usage and time spent and then produces
* a map of functionNames to the percentiles of RAM usage and time spent.
*/
export async function runBenchmarks(runners: Function[], done: Function) {
const nRuns = 1000;
const benchmarkValues = {};

let k = 0;
for (k = 0; k < runners.length; k++) {
const fn = runners[k];
const functionName = fn.name;
const timeSpentL: bigint[] = [];
const ramL: number[] = [];
let i = 0;
for (i = 0; i < nRuns; i++) {
const startTime: bigint = process.hrtime.bigint();
const startHeapUsedBytes: number = process.memoryUsage().heapUsed;
const _ = await fn();
timeSpentL.push(process.hrtime.bigint() - startTime);
ramL.push(process.memoryUsage().heapUsed - startHeapUsedBytes);
}

timeSpentL.sort(lessComparator);
ramL.sort(lessComparator);

benchmarkValues[functionName] = {
ram: percentiles(functionName, ramL, 'bytes'),
timeSpent: percentiles(functionName, timeSpentL, 'time'),
};
}

done(benchmarkValues);
}

function percentiles(method, sortedValues, kind) {
const n = sortedValues.length;
const p50 = sortedValues[Math.floor(n * 0.5)];
const p75 = sortedValues[Math.floor(n * 0.75)];
const p90 = sortedValues[Math.floor(n * 0.9)];
const p95 = sortedValues[Math.floor(n * 0.95)];
const p99 = sortedValues[Math.floor(n * 0.99)];

return {
p50: p50,
p75: p75,
p90: p90,
p95: p95,
p99: p99,
p50_s: humanize(p50, kind),
p75_s: humanize(p75, kind),
p90_s: humanize(p90, kind),
p95_s: humanize(p95, kind),
p99_s: humanize(p99, kind),
};
}

function humanize(values, kind) {
let converterFn = humanizeTime;
if (kind === 'bytes') {
converterFn = humanizeBytes;
}
return converterFn(values);
}

const secondUnits = ['ns', 'us', 'ms', 's'];
interface unitDivisor {
unit: string;
divisor: number;
}
const pastSecondUnits: unitDivisor[] = [
{unit: 'min', divisor: 60},
{unit: 'hr', divisor: 60},
{unit: 'day', divisor: 24},
{unit: 'week', divisor: 7},
{unit: 'month', divisor: 30},
];
function humanizeTime(ns) {
let value = Number(ns);
for (const unit of secondUnits) {
if (value < 1000) {
return `${value} ${unit}`;
}
value /= 1000;
}

let i = 0;
for (i = 0; i < pastSecondUnits.length; i++) {
const unitPlusValue = pastSecondUnits[i];
const unitName = unitPlusValue.unit;
const divisor = unitPlusValue.divisor;
if (value < divisor) {
return `${value} ${unitName}`;
}
value = value / divisor;
}
return `${value} ${pastSecondUnits[pastSecondUnits.length - 1][0]}`;
}

const bytesUnits = ['B', 'kB', 'MB', 'GB', 'TB', 'PB', 'ExB'];
function humanizeBytes(b) {
let value = b;
for (const unit of bytesUnits) {
if (value < 1024) {
return `${value.toFixed(3)} ${unit}`;
}
value = value / 1024;
}

return `${value.toFixed(3)} ${bytesUnits[bytesUnits.length - 1]}`;
}
75 changes: 75 additions & 0 deletions observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,81 @@ describe('Database', () => {
});
});

describe('runTransactionAsync', () => {
const SESSION = new FakeSession();
const TRANSACTION = new FakeTransaction(
{} as google.spanner.v1.TransactionOptions.ReadWrite
);

let pool: FakeSessionPool;

beforeEach(() => {
pool = database.pool_;

(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
callback => {
callback(null, SESSION, TRANSACTION);
}
);
});

it('with error getting session', async () => {
const fakeErr = new Error('getting a session');

(pool.getSession as sinon.SinonStub).callsFake(callback =>
callback(fakeErr)
);

await database.runTransactionAsync(async transaction => {
await transaction.run('SELECT 1');
});

traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Database.runTransactionAsync',
'CloudSpanner.Transaction.run',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

// Ensure that the span actually produced an error that was recorded.
const firstSpan = spans[0];
assert.strictEqual(
SpanStatusCode.ERROR,
firstSpan.status.code,
'Expected an ERROR span status'
);
assert.strictEqual(
'getting a session',
firstSpan.status.message,
'Mismatched span status message'
);

// We don't expect events.
const expectedEventNames = [];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
});
});

describe('runStream', () => {
const QUERY = {
sql: 'SELECT * FROM table',
Expand Down
46 changes: 46 additions & 0 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,52 @@ describe('EndToEnd', () => {
});
});

it('runTransactionAsync', async () => {
const withAllSpansHaveDBName = generateWithAllSpansHaveDBName(
database.formattedName_
);
await database.runTransactionAsync(async transaction => {
const [rows] = await transaction!.run('SELECT 1');
});

traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
withAllSpansHaveDBName(spans);

const actualEventNames: string[] = [];
const actualSpanNames: string[] = [];
spans.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Snapshot.runStream',
'CloudSpanner.Snapshot.run',
'CloudSpanner.Database.runTransactionAsync',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

const expectedEventNames = [
'Transaction Creation Done',
'Acquiring session',
'Cache hit: has usable session',
'Acquired session',
'Using Session',
];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);
});

it('writeAtLeastOnce', done => {
const withAllSpansHaveDBName = generateWithAllSpansHaveDBName(
database.formattedName_
Expand Down
84 changes: 46 additions & 38 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3363,46 +3363,54 @@ class Database extends common.GrpcServiceObject {

let sessionId = '';
const getSession = this.pool_.getSession.bind(this.pool_);
const span = getActiveOrNoopSpan();
// Loop to retry 'Session not found' errors.
// (and yes, we like while (true) more than for (;;) here)
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const [session, transaction] = await promisify(getSession)();
transaction.requestOptions = Object.assign(
transaction.requestOptions || {},
options.requestOptions
);
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
sessionId = session?.id;
span.addEvent('Using Session', {'session.id': sessionId});
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
runFn,
options
);

try {
return await runner.run();
} finally {
this.pool_.release(session);
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
span.addEvent('No session available', {
'session.id': sessionId,
});
throw e;
return startTrace(
'Database.runTransactionAsync',
this._traceConfig,
async span => {
// Loop to retry 'Session not found' errors.
// (and yes, we like while (true) more than for (;;) here)
// eslint-disable-next-line no-constant-condition
while (true) {
try {
const [session, transaction] = await promisify(getSession)();
transaction.requestOptions = Object.assign(
transaction.requestOptions || {},
options.requestOptions
);
if (options.optimisticLock) {
transaction.useOptimisticLock();
}
if (options.excludeTxnFromChangeStreams) {
transaction.excludeTxnFromChangeStreams();
}
sessionId = session?.id;
span.addEvent('Using Session', {'session.id': sessionId});
const runner = new AsyncTransactionRunner<T>(
session,
transaction,
runFn,
options
);

try {
const result = await runner.run();
span.end();
return result;
} finally {
this.pool_.release(session);
}
} catch (e) {
if (!isSessionNotFoundError(e as ServiceError)) {
span.addEvent('No session available', {
'session.id': sessionId,
});
span.end();
throw e;
}
}
}
}
}
);
}

/**
Expand Down

0 comments on commit ed8cb34

Please sign in to comment.