Skip to content

Commit

Permalink
feat: topic message id for message ordering (#265)
Browse files Browse the repository at this point in the history
* feat: topics now use message ids

* feat: topic next message tests

* fix: lint

* refactor: tests and topic adjusted
  • Loading branch information
markuczy authored Jun 3, 2024
1 parent 4e71709 commit 22b8bcf
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 15 deletions.
10 changes: 10 additions & 0 deletions libs/accelerator/src/lib/topic/message.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
declare global {
interface Window {
onecxMessageId: number
}
}

window['onecxMessageId'] = 0

export class Message {
timestamp: number
id: number // id can be undefined while used via old implementation

constructor(public type: string) {
this.timestamp = window.performance.now()
this.id = window['onecxMessageId']++
}
}
16 changes: 6 additions & 10 deletions libs/accelerator/src/lib/topic/topic-message.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import { Message } from "./message";
import { TopicMessageType } from "./topic-message-type";
import { Message } from './message'
import { TopicMessageType } from './topic-message-type'

export class TopicMessage extends Message {
constructor(
type: TopicMessageType,
public name: string,
public version: number
) {
super(type);
}
}
constructor(type: TopicMessageType, public name: string, public version: number) {
super(type)
}
}
128 changes: 128 additions & 0 deletions libs/accelerator/src/lib/topic/topic.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import { map } from 'rxjs'
import { Topic } from './topic'
import { TopicMessageType } from './topic-message-type'
import { TopicDataMessage } from './topic-data-message'

describe('Topic', () => {
const origAddEventListener = window.addEventListener
Expand Down Expand Up @@ -155,4 +157,130 @@ describe('Topic', () => {
done()
})
})

describe('integration with older versions of library', () => {
let previousMessage: TopicDataMessage<string>
let incomingMessage: MessageEvent<TopicDataMessage<string>>

beforeEach(() => {
previousMessage = {
type: TopicMessageType.TopicNext,
name: testTopic1.name,
version: testTopic1.version,
data: '',
timestamp: 0,
id: 0,
}
incomingMessage = {
data: {
type: TopicMessageType.TopicNext,
name: testTopic1.name,
version: testTopic1.version,
data: '',
timestamp: 0,
id: 0,
},
} as any

// initialize topic
testTopic1.publish('initMsg')
})

it('should have value if incoming id is greater than previous id', () => {
previousMessage.data = 'msg1'
previousMessage.id = 0
incomingMessage.data.data = 'msg2'
incomingMessage.data.id = 1
;(<any>testTopic1).data.next(previousMessage)
;(<any>testTopic1).onMessage(incomingMessage)

expect(values1).toEqual(['initMsg', 'msg1', 'msg2'])
})

it('should have value if incoming timestamp is greater than previous timestamp with no ids provided', () => {
previousMessage.data = 'msg1'
;(<any>previousMessage).id = undefined
previousMessage.timestamp = 1
incomingMessage.data.data = 'msg2'
;(<any>incomingMessage.data).id = undefined
incomingMessage.data.timestamp = 3
;(<any>testTopic1).data.next(previousMessage)
;(<any>testTopic1).onMessage(incomingMessage)

expect(values1).toEqual(['initMsg', 'msg1', 'msg2'])
})

it('should have value if incoming timestamp is greater than previous timestamp when current message has id', () => {
previousMessage.data = 'msg1'
previousMessage.id = 1
previousMessage.timestamp = 1
incomingMessage.data.data = 'msg2'
;(<any>incomingMessage.data).id = undefined
incomingMessage.data.timestamp = 3
;(<any>testTopic1).data.next(previousMessage)
;(<any>testTopic1).onMessage(incomingMessage)

expect(values1).toEqual(['initMsg', 'msg1', 'msg2'])
})

it('should have value if incoming timestamp is greater than previous timestamp when incoming message has id', () => {
previousMessage.data = 'msg1'
;(<any>previousMessage).id = undefined
previousMessage.timestamp = 1
incomingMessage.data.data = 'msg2'
incomingMessage.data.id = 1
incomingMessage.data.timestamp = 3
;(<any>testTopic1).data.next(previousMessage)
;(<any>testTopic1).onMessage(incomingMessage)

expect(values1).toEqual(['initMsg', 'msg1', 'msg2'])
})

it('should have no value if incoming timestamp is equal to the previous timestamp with no ids provided', () => {
previousMessage.data = 'msg1'
;(<any>previousMessage).id = undefined
previousMessage.timestamp = 3
incomingMessage.data.data = 'msg2'
;(<any>incomingMessage.data).id = undefined
incomingMessage.data.timestamp = 3
;(<any>testTopic1).data.next(previousMessage)
;(<any>testTopic1).onMessage(incomingMessage)

expect(values1).toEqual(['initMsg', 'msg1'])
})

it('should have no value if incoming timestamp is equal to the previous timestamp when current message has id', () => {
jest.spyOn(console, 'warn')
previousMessage.data = 'msg1'
previousMessage.id = 1
previousMessage.timestamp = 3
incomingMessage.data.data = 'msg2'
;(<any>incomingMessage.data).id = undefined
incomingMessage.data.timestamp = 3
;(<any>testTopic1).data.next(previousMessage)
;(<any>testTopic1).onMessage(incomingMessage)

expect(values1).toEqual(['initMsg', 'msg1'])
expect(console.warn).toHaveBeenLastCalledWith(
'Message was dropped because of equal timestamps, because there was an old style message in the system. Please upgrade all libraries to the latest version.'
)
})

it('should have no value if incoming timestamp is equal to previous timestamp when incoming message has id', () => {
jest.spyOn(console, 'warn')
previousMessage.data = 'msg1'
;(<any>previousMessage).id = undefined
previousMessage.timestamp = 3
incomingMessage.data.data = 'msg2'
incomingMessage.data.id = 1
incomingMessage.data.timestamp = 3
;(<any>testTopic1).data.next(previousMessage)
;(<any>testTopic1).onMessage(incomingMessage)

expect(values1).toEqual(['initMsg', 'msg1'])
expect(console.warn).toHaveBeenLastCalledWith(
'Message was dropped because of equal timestamps, because there was an old style message in the system. Please upgrade all libraries to the latest version.'
)
})
})
})
26 changes: 21 additions & 5 deletions libs/accelerator/src/lib/topic/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class Topic<T> extends TopicPublisher<T> implements Subscribable<T> {

constructor(name: string, version: number, sendGetMessage = true) {
super(name, version)

this.isInitializedPromise = new Promise<void>((resolve) => {
this.resolveInitPromise = resolve
})
Expand All @@ -34,7 +34,7 @@ export class Topic<T> extends TopicPublisher<T> implements Subscribable<T> {
window.postMessage(message, '*')
}
}

get isInitialized(): Promise<void> {
return this.isInitializedPromise
}
Expand Down Expand Up @@ -145,10 +145,17 @@ export class Topic<T> extends TopicPublisher<T> implements Subscribable<T> {
private onMessage(m: MessageEvent<TopicMessage>): any {
switch (m.data.type) {
case TopicMessageType.TopicNext:
if (m.data.name !== this.name || m.data.version !== this.version) {
break
}

if (
m.data.name === this.name &&
m.data.version === this.version &&
(!this.data.value || (this.isInit && (<TopicMessage>m.data).timestamp > this.data.value.timestamp))
!this.data.value ||
(this.isInit &&
(<TopicMessage>m.data).id !== undefined &&
this.data.value.id !== undefined &&
(<TopicMessage>m.data).id > this.data.value.id) ||
(this.isInit && (<TopicMessage>m.data).timestamp > this.data.value.timestamp)
) {
this.isInit = true
this.data.next(<TopicDataMessage<T>>m.data)
Expand All @@ -158,6 +165,15 @@ export class Topic<T> extends TopicPublisher<T> implements Subscribable<T> {
publishPromiseResolver()
delete this.publishPromiseResolver[m.data.timestamp]
}
} else if (
this.data.value &&
this.isInit &&
(<TopicMessage>m.data).timestamp === this.data.value.timestamp &&
((<TopicMessage>m.data).id || this.data.value.id)
) {
console.warn(
'Message was dropped because of equal timestamps, because there was an old style message in the system. Please upgrade all libraries to the latest version.'
)
}
break
case TopicMessageType.TopicGet:
Expand Down

0 comments on commit 22b8bcf

Please sign in to comment.