Skip to content

Commit

Permalink
Merge pull request #4 from feywind/otel-4-links
Browse files Browse the repository at this point in the history
feat: update to OTel PR for latest design
  • Loading branch information
feywind authored Jan 31, 2024
2 parents d4cb0e8 + f3b4070 commit be5fa9c
Show file tree
Hide file tree
Showing 15 changed files with 402 additions and 184 deletions.
20 changes: 9 additions & 11 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,14 @@ export class LeaseManager extends EventEmitter {
}
}
/**
* Removes ALL messages from inventory.
* Removes ALL messages from inventory, and returns the ones removed.
* @private
*/
clear(): void {
clear(): Message[] {
const wasFull = this.isFull();

this._pending = [];
this._messages.forEach(m => {
m.endParentSpan();
});
const remaining = Array.from(this._messages);
this._messages.clear();
this.bytes = 0;

Expand All @@ -141,6 +139,8 @@ export class LeaseManager extends EventEmitter {
}

this._cancelExtension();

return remaining;
}
/**
* Indicates if we're at or over capacity.
Expand All @@ -162,9 +162,6 @@ export class LeaseManager extends EventEmitter {
* @private
*/
remove(message: Message): void {
// The subscriber span ends when it leaves leasing.
message.endParentSpan();

if (!this._messages.has(message)) {
return;
}
Expand Down Expand Up @@ -269,7 +266,8 @@ export class LeaseManager extends EventEmitter {
const lifespan = (Date.now() - message.received) / (60 * 1000);

if (lifespan < this._options.maxExtensionMinutes!) {
message.subSpans.modAckStart(Duration.from({seconds: deadline}), false);
const deadlineDuration = Duration.from({seconds: deadline});
message.subSpans.modAckStart(deadlineDuration, false);

if (this._subscriber.isExactlyOnceDelivery) {
message
Expand All @@ -281,11 +279,11 @@ export class LeaseManager extends EventEmitter {
this.remove(message);
})
.finally(() => {
message.subSpans.modAckStop();
message.subSpans.modAckEnd();
});
} else {
message.modAck(deadline);
message.subSpans.modAckStop();
message.subSpans.modAckStart(deadlineDuration, false);
}
} else {
this.remove(message);
Expand Down
40 changes: 30 additions & 10 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,19 @@ import {
import {Duration} from './temporal';
import {addToBucket} from './util';
import {DebugMessage} from './debug';
import * as tracing from './telemetry-tracing';

export interface ReducedMessage {
ackId: string;
tracingSpan?: tracing.Span;
}

/**
* @private
*/
export interface QueuedMessage {
ackId: string;
deadline?: number;
message: ReducedMessage;
deadline?: number; // seconds
responsePromise?: defer.DeferredPromise<void>;
retryCount: number;
}
Expand Down Expand Up @@ -176,10 +182,10 @@ export abstract class MessageQueue {
* Adds a message to the queue.
*
* @param {Message} message The message to add.
* @param {number} [deadline] The deadline.
* @param {number} [deadline] The deadline in seconds.
* @private
*/
add({ackId}: Message, deadline?: number): Promise<void> {
add(message: Message, deadline?: number): Promise<void> {
if (this._closed) {
if (this._subscriber.isExactlyOnceDelivery) {
throw new AckError(AckResponses.Invalid, 'Subscriber closed');
Expand All @@ -192,7 +198,10 @@ export abstract class MessageQueue {

const responsePromise = defer<void>();
this._requests.push({
ackId,
message: {
ackId: message.ackId,
tracingSpan: message.parentSpan,
},
deadline,
responsePromise,
retryCount: 0,
Expand Down Expand Up @@ -379,9 +388,9 @@ export abstract class MessageQueue {
const codes: AckErrorCodes = processAckErrorInfo(rpcError);

for (const m of batch) {
if (codes.has(m.ackId)) {
if (codes.has(m.message.ackId)) {
// This ack has an ErrorInfo entry, so use that to route it.
const code = codes.get(m.ackId)!;
const code = codes.get(m.message.ackId)!;
if (code.transient) {
// Transient errors get retried.
toRetry.push(m);
Expand All @@ -407,7 +416,7 @@ export abstract class MessageQueue {
// stream message if an unknown error happens during ack.
const others = toError.get(AckResponses.Other);
if (others?.length) {
const otherIds = others.map(e => e.ackId);
const otherIds = others.map(e => e.message.ackId);
const debugMsg = new BatchError(rpcError, otherIds, operation);
this._subscriber.emit('debug', debugMsg);
}
Expand Down Expand Up @@ -468,15 +477,20 @@ export class AckQueue extends MessageQueue {
* @return {Promise}
*/
protected async _sendBatch(batch: QueuedMessages): Promise<QueuedMessages> {
const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan(
batch.map(b => b.message.tracingSpan),
this._subscriber.name
);
const client = await this._subscriber.getClient();
const ackIds = batch.map(({ackId}) => ackId);
const ackIds = batch.map(({message}) => message.ackId);
const reqOpts = {subscription: this._subscriber.name, ackIds};

try {
await client.acknowledge(reqOpts, this.getCallOptions());

// It's okay if these pass through since they're successful anyway.
this.handleAckSuccesses(batch);
responseSpan?.end();
return [];
} catch (e) {
// If exactly-once delivery isn't enabled, don't do error processing. We'll
Expand All @@ -500,6 +514,7 @@ export class AckQueue extends MessageQueue {
batch.forEach(m => {
m.responsePromise?.reject(exc);
});
responseSpan?.end();
return [];
}
}
Expand All @@ -524,6 +539,10 @@ export class ModAckQueue extends MessageQueue {
* @return {Promise}
*/
protected async _sendBatch(batch: QueuedMessages): Promise<QueuedMessages> {
const responseSpan = tracing.PubsubSpans.createReceiveResponseRpcSpan(
batch.map(b => b.message.tracingSpan),
this._subscriber.name
);
const client = await this._subscriber.getClient();
const subscription = this._subscriber.name;
const modAckTable: {[index: string]: QueuedMessages} = batch.reduce(
Expand All @@ -541,7 +560,7 @@ export class ModAckQueue extends MessageQueue {
const callOptions = this.getCallOptions();
const modAckRequests = Object.keys(modAckTable).map(async deadline => {
const messages = modAckTable[deadline];
const ackIds = messages.map(m => m.ackId);
const ackIds = messages.map(m => m.message.ackId);
const ackDeadlineSeconds = Number(deadline);
const reqOpts = {subscription, ackIds, ackDeadlineSeconds};

Expand Down Expand Up @@ -575,6 +594,7 @@ export class ModAckQueue extends MessageQueue {

// This catches the sub-failures and bubbles up anything we need to bubble.
const allNewBatches: QueuedMessages[] = await Promise.all(modAckRequests);
responseSpan?.end();
return allNewBatches.reduce((p: QueuedMessage[], c: QueuedMessage[]) => [
...(p ?? []),
...c,
Expand Down
27 changes: 24 additions & 3 deletions src/publisher/message-batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@

import {BATCH_LIMITS, PubsubMessage, PublishCallback} from './';
import {calculateMessageSize} from './pubsub-message';
import * as tracing from '../telemetry-tracing';

export interface BatchPublishOptions {
maxBytes?: number;
maxMessages?: number;
maxMilliseconds?: number;
}

export interface BatchResults {
messages: PubsubMessage[];
callbacks: PublishCallback[];
}

/**
* @typedef BatchPublishOptions
* @property {number} [maxBytes=1 * 1024 * 1024] The maximum number of bytes to
Expand All @@ -40,13 +46,15 @@ export interface BatchPublishOptions {
* @param {BatchPublishOptions} options The batching options.
*/
export class MessageBatch {
options: BatchPublishOptions;
messages: PubsubMessage[];
callbacks: PublishCallback[];
created: number;
bytes: number;
constructor(options: BatchPublishOptions) {
this.options = options;

constructor(
public options: BatchPublishOptions,
public topicName: string
) {
this.messages = [];
this.callbacks = [];
this.created = Date.now();
Expand All @@ -72,7 +80,18 @@ export class MessageBatch {
this.messages.push(message);
this.callbacks.push(callback);
this.bytes += calculateMessageSize(message);

tracing.PubsubSpans.createPublishSchedulerSpan(message);
}

end(): BatchResults {
this.messages.forEach(m => m.publishSchedulerSpan?.end());
return {
messages: this.messages,
callbacks: this.callbacks,
};
}

/**
* Indicates if a given message can fit in the batch.
*
Expand All @@ -86,6 +105,7 @@ export class MessageBatch {
this.bytes + calculateMessageSize(message) <= maxBytes!
);
}

/**
* Checks to see if this batch is at the maximum allowed payload size.
* When publishing ordered messages, it is ok to exceed the user configured
Expand All @@ -97,6 +117,7 @@ export class MessageBatch {
const {maxMessages, maxBytes} = BATCH_LIMITS;
return this.messages.length >= maxMessages! || this.bytes >= maxBytes!;
}

/**
* Indicates if the batch is at capacity.
*
Expand Down
26 changes: 10 additions & 16 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ export abstract class MessageQueue extends EventEmitter {
});
}

messages.forEach(m => {
const span = tracing.PubsubSpans.createPublishRpcSpan(m, messages.length);
if (span) {
m.rpcSpan = span;
}
});
const rpcSpan = tracing.PubsubSpans.createPublishRpcSpan(
spanMessages,
topic.name
);

const requestCallback = topic.request<google.pubsub.v1.IPublishResponse>;
const request = promisify(requestCallback.bind(topic));
Expand All @@ -144,7 +142,7 @@ export abstract class MessageQueue extends EventEmitter {
messages.forEach(m => {
// We're finished with both the RPC and the whole publish operation,
// so close out all of the related spans.
m.rpcSpan?.end();
rpcSpan?.end();
m.parentSpan?.end();
});
}
Expand All @@ -163,7 +161,7 @@ export class Queue extends MessageQueue {
batch: MessageBatch;
constructor(publisher: Publisher) {
super(publisher);
this.batch = new MessageBatch(this.batchOptions);
this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name);
}

// This needs to update our existing message batch.
Expand All @@ -186,8 +184,6 @@ export class Queue extends MessageQueue {
this.publish().catch(() => {});
}

message.batchingSpan = tracing.PubsubSpans.createPublishBatchSpan(message);

this.batch.add(message, callback);

if (this.batch.isFull()) {
Expand Down Expand Up @@ -230,17 +226,15 @@ export class Queue extends MessageQueue {
* @emits Queue#drain when all messages are sent.
*/
async _publishInternal(fullyDrain: boolean): Promise<void> {
const {messages, callbacks} = this.batch;
const {messages, callbacks} = this.batch.end();

this.batch = new MessageBatch(this.batchOptions);
this.batch = new MessageBatch(this.batchOptions, this.publisher.topic.name);

if (this.pending) {
clearTimeout(this.pending);
delete this.pending;
}

messages.forEach(m => m.batchingSpan?.end());

await this._publish(messages, callbacks);
if (this.batch.messages.length) {
// We only do the indefinite go-arounds when we're trying to do a
Expand Down Expand Up @@ -358,7 +352,7 @@ export class OrderedQueue extends MessageQueue {
* @returns {MessageBatch}
*/
createBatch(): MessageBatch {
return new MessageBatch(this.batchOptions);
return new MessageBatch(this.batchOptions, this.publisher.topic.name);
}
/**
* In the event of a publish failure, we need to cache the error in question
Expand Down Expand Up @@ -401,7 +395,7 @@ export class OrderedQueue extends MessageQueue {
delete this.pending;
}

const {messages, callbacks} = this.batches.pop()!;
const {messages, callbacks} = this.batches.pop()!.end();

try {
await this._publish(messages, callbacks);
Expand Down
8 changes: 4 additions & 4 deletions src/publisher/pubsub-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,18 @@ export interface PubsubMessage
// don't get to control what these objects are. They come from grpc.

/**
* If tracing is enabled, track the batch span.
* If tracing is enabled, track the message span.
*
* @private
*/
batchingSpan?: tracing.Span;
messageSpan?: tracing.Span;

/**
* If tracing is enabled, track the RPC send time span.
* If tracing is enabled, track the batching (publish scheduling) period.
*
* @private
*/
rpcSpan?: tracing.Span;
publishSchedulerSpan?: tracing.Span;
}

/**
Expand Down
Loading

0 comments on commit be5fa9c

Please sign in to comment.