From 22b8bcf89f07e6940d15110214efa39f16ed0c3b Mon Sep 17 00:00:00 2001 From: markuczy <129275100+markuczy@users.noreply.github.com> Date: Mon, 3 Jun 2024 14:20:40 +0200 Subject: [PATCH] feat: topic message id for message ordering (#265) * feat: topics now use message ids * feat: topic next message tests * fix: lint * refactor: tests and topic adjusted --- libs/accelerator/src/lib/topic/message.ts | 10 ++ .../src/lib/topic/topic-message.ts | 16 +-- libs/accelerator/src/lib/topic/topic.spec.ts | 128 ++++++++++++++++++ libs/accelerator/src/lib/topic/topic.ts | 26 +++- 4 files changed, 165 insertions(+), 15 deletions(-) diff --git a/libs/accelerator/src/lib/topic/message.ts b/libs/accelerator/src/lib/topic/message.ts index 6e1756e4..63e488d1 100644 --- a/libs/accelerator/src/lib/topic/message.ts +++ b/libs/accelerator/src/lib/topic/message.ts @@ -1,7 +1,17 @@ +declare global { + interface Window { + onecxMessageId: number + } +} + +window['onecxMessageId'] = 0 + export class Message { timestamp: number + id: number // id can be undefined while used via old implementation constructor(public type: string) { this.timestamp = window.performance.now() + this.id = window['onecxMessageId']++ } } diff --git a/libs/accelerator/src/lib/topic/topic-message.ts b/libs/accelerator/src/lib/topic/topic-message.ts index 33496277..1cb87734 100644 --- a/libs/accelerator/src/lib/topic/topic-message.ts +++ b/libs/accelerator/src/lib/topic/topic-message.ts @@ -1,12 +1,8 @@ -import { Message } from "./message"; -import { TopicMessageType } from "./topic-message-type"; +import { Message } from './message' +import { TopicMessageType } from './topic-message-type' export class TopicMessage extends Message { - constructor( - type: TopicMessageType, - public name: string, - public version: number - ) { - super(type); - } - } \ No newline at end of file + constructor(type: TopicMessageType, public name: string, public version: number) { + super(type) + } +} diff --git a/libs/accelerator/src/lib/topic/topic.spec.ts b/libs/accelerator/src/lib/topic/topic.spec.ts index 4c89a887..b6c006ea 100644 --- a/libs/accelerator/src/lib/topic/topic.spec.ts +++ b/libs/accelerator/src/lib/topic/topic.spec.ts @@ -8,6 +8,8 @@ import { map } from 'rxjs' import { Topic } from './topic' +import { TopicMessageType } from './topic-message-type' +import { TopicDataMessage } from './topic-data-message' describe('Topic', () => { const origAddEventListener = window.addEventListener @@ -155,4 +157,130 @@ describe('Topic', () => { done() }) }) + + describe('integration with older versions of library', () => { + let previousMessage: TopicDataMessage + let incomingMessage: MessageEvent> + + beforeEach(() => { + previousMessage = { + type: TopicMessageType.TopicNext, + name: testTopic1.name, + version: testTopic1.version, + data: '', + timestamp: 0, + id: 0, + } + incomingMessage = { + data: { + type: TopicMessageType.TopicNext, + name: testTopic1.name, + version: testTopic1.version, + data: '', + timestamp: 0, + id: 0, + }, + } as any + + // initialize topic + testTopic1.publish('initMsg') + }) + + it('should have value if incoming id is greater than previous id', () => { + previousMessage.data = 'msg1' + previousMessage.id = 0 + incomingMessage.data.data = 'msg2' + incomingMessage.data.id = 1 + ;(testTopic1).data.next(previousMessage) + ;(testTopic1).onMessage(incomingMessage) + + expect(values1).toEqual(['initMsg', 'msg1', 'msg2']) + }) + + it('should have value if incoming timestamp is greater than previous timestamp with no ids provided', () => { + previousMessage.data = 'msg1' + ;(previousMessage).id = undefined + previousMessage.timestamp = 1 + incomingMessage.data.data = 'msg2' + ;(incomingMessage.data).id = undefined + incomingMessage.data.timestamp = 3 + ;(testTopic1).data.next(previousMessage) + ;(testTopic1).onMessage(incomingMessage) + + expect(values1).toEqual(['initMsg', 'msg1', 'msg2']) + }) + + it('should have value if incoming timestamp is greater than previous timestamp when current message has id', () => { + previousMessage.data = 'msg1' + previousMessage.id = 1 + previousMessage.timestamp = 1 + incomingMessage.data.data = 'msg2' + ;(incomingMessage.data).id = undefined + incomingMessage.data.timestamp = 3 + ;(testTopic1).data.next(previousMessage) + ;(testTopic1).onMessage(incomingMessage) + + expect(values1).toEqual(['initMsg', 'msg1', 'msg2']) + }) + + it('should have value if incoming timestamp is greater than previous timestamp when incoming message has id', () => { + previousMessage.data = 'msg1' + ;(previousMessage).id = undefined + previousMessage.timestamp = 1 + incomingMessage.data.data = 'msg2' + incomingMessage.data.id = 1 + incomingMessage.data.timestamp = 3 + ;(testTopic1).data.next(previousMessage) + ;(testTopic1).onMessage(incomingMessage) + + expect(values1).toEqual(['initMsg', 'msg1', 'msg2']) + }) + + it('should have no value if incoming timestamp is equal to the previous timestamp with no ids provided', () => { + previousMessage.data = 'msg1' + ;(previousMessage).id = undefined + previousMessage.timestamp = 3 + incomingMessage.data.data = 'msg2' + ;(incomingMessage.data).id = undefined + incomingMessage.data.timestamp = 3 + ;(testTopic1).data.next(previousMessage) + ;(testTopic1).onMessage(incomingMessage) + + expect(values1).toEqual(['initMsg', 'msg1']) + }) + + it('should have no value if incoming timestamp is equal to the previous timestamp when current message has id', () => { + jest.spyOn(console, 'warn') + previousMessage.data = 'msg1' + previousMessage.id = 1 + previousMessage.timestamp = 3 + incomingMessage.data.data = 'msg2' + ;(incomingMessage.data).id = undefined + incomingMessage.data.timestamp = 3 + ;(testTopic1).data.next(previousMessage) + ;(testTopic1).onMessage(incomingMessage) + + expect(values1).toEqual(['initMsg', 'msg1']) + expect(console.warn).toHaveBeenLastCalledWith( + 'Message was dropped because of equal timestamps, because there was an old style message in the system. Please upgrade all libraries to the latest version.' + ) + }) + + it('should have no value if incoming timestamp is equal to previous timestamp when incoming message has id', () => { + jest.spyOn(console, 'warn') + previousMessage.data = 'msg1' + ;(previousMessage).id = undefined + previousMessage.timestamp = 3 + incomingMessage.data.data = 'msg2' + incomingMessage.data.id = 1 + incomingMessage.data.timestamp = 3 + ;(testTopic1).data.next(previousMessage) + ;(testTopic1).onMessage(incomingMessage) + + expect(values1).toEqual(['initMsg', 'msg1']) + expect(console.warn).toHaveBeenLastCalledWith( + 'Message was dropped because of equal timestamps, because there was an old style message in the system. Please upgrade all libraries to the latest version.' + ) + }) + }) }) diff --git a/libs/accelerator/src/lib/topic/topic.ts b/libs/accelerator/src/lib/topic/topic.ts index d958a421..e5aa5822 100644 --- a/libs/accelerator/src/lib/topic/topic.ts +++ b/libs/accelerator/src/lib/topic/topic.ts @@ -23,7 +23,7 @@ export class Topic extends TopicPublisher implements Subscribable { constructor(name: string, version: number, sendGetMessage = true) { super(name, version) - + this.isInitializedPromise = new Promise((resolve) => { this.resolveInitPromise = resolve }) @@ -34,7 +34,7 @@ export class Topic extends TopicPublisher implements Subscribable { window.postMessage(message, '*') } } - + get isInitialized(): Promise { return this.isInitializedPromise } @@ -145,10 +145,17 @@ export class Topic extends TopicPublisher implements Subscribable { private onMessage(m: MessageEvent): any { switch (m.data.type) { case TopicMessageType.TopicNext: + if (m.data.name !== this.name || m.data.version !== this.version) { + break + } + if ( - m.data.name === this.name && - m.data.version === this.version && - (!this.data.value || (this.isInit && (m.data).timestamp > this.data.value.timestamp)) + !this.data.value || + (this.isInit && + (m.data).id !== undefined && + this.data.value.id !== undefined && + (m.data).id > this.data.value.id) || + (this.isInit && (m.data).timestamp > this.data.value.timestamp) ) { this.isInit = true this.data.next(>m.data) @@ -158,6 +165,15 @@ export class Topic extends TopicPublisher implements Subscribable { publishPromiseResolver() delete this.publishPromiseResolver[m.data.timestamp] } + } else if ( + this.data.value && + this.isInit && + (m.data).timestamp === this.data.value.timestamp && + ((m.data).id || this.data.value.id) + ) { + console.warn( + 'Message was dropped because of equal timestamps, because there was an old style message in the system. Please upgrade all libraries to the latest version.' + ) } break case TopicMessageType.TopicGet: