Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved threads reliability with/without server side support #2132

Merged
merged 11 commits into from
Jan 26, 2022
34 changes: 18 additions & 16 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ export class MatrixClient extends EventEmitter {
// TODO: This should expire: https://github.com/matrix-org/matrix-js-sdk/issues/1020
protected serverVersionsPromise: Promise<IServerVersions>;

protected cachedCapabilities: {
public cachedCapabilities: {
capabilities: ICapabilities;
expiration: number;
};
Expand Down Expand Up @@ -5055,7 +5055,7 @@ export class MatrixClient extends EventEmitter {
limit,
Direction.Backward,
);
}).then((res: IMessagesResponse) => {
}).then(async (res: IMessagesResponse) => {
const matrixEvents = res.chunk.map(this.getEventMapper());
if (res.state) {
const stateEvents = res.state.map(this.getEventMapper());
Expand All @@ -5065,7 +5065,7 @@ export class MatrixClient extends EventEmitter {
const [timelineEvents, threadedEvents] = this.partitionThreadedEvents(matrixEvents);

room.addEventsToTimeline(timelineEvents, true, room.getLiveTimeline());
this.processThreadEvents(room, threadedEvents);
await this.processThreadEvents(room, threadedEvents);

room.oldState.paginationToken = res.end;
if (res.chunk.length === 0) {
Expand Down Expand Up @@ -5143,7 +5143,7 @@ export class MatrixClient extends EventEmitter {

// TODO: we should implement a backoff (as per scrollback()) to deal more
// nicely with HTTP errors.
const promise = this.http.authedRequest<any>(undefined, Method.Get, path, params).then((res) => { // TODO types
const promise = this.http.authedRequest<any>(undefined, Method.Get, path, params).then(async (res) => { // TODO types
if (!res.event) {
throw new Error("'event' not in '/context' result - homeserver too old?");
}
Expand Down Expand Up @@ -5176,7 +5176,7 @@ export class MatrixClient extends EventEmitter {
const [timelineEvents, threadedEvents] = this.partitionThreadedEvents(matrixEvents);

timelineSet.addEventsToTimeline(timelineEvents, true, timeline, res.start);
this.processThreadEvents(timelineSet.room, threadedEvents);
await this.processThreadEvents(timelineSet.room, threadedEvents);

// there is no guarantee that the event ended up in "timeline" (we
// might have switched to a neighbouring timeline) - so check the
Expand Down Expand Up @@ -5291,7 +5291,7 @@ export class MatrixClient extends EventEmitter {

promise = this.http.authedRequest<any>( // TODO types
undefined, Method.Get, path, params, undefined,
).then((res) => {
).then(async (res) => {
const token = res.next_token;
const matrixEvents = [];

Expand All @@ -5309,7 +5309,7 @@ export class MatrixClient extends EventEmitter {

const timelineSet = eventTimeline.getTimelineSet();
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processThreadEvents(timelineSet.room, threadedEvents);
await this.processThreadEvents(timelineSet.room, threadedEvents);

// if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure
Expand All @@ -5334,7 +5334,7 @@ export class MatrixClient extends EventEmitter {
opts.limit,
dir,
eventTimeline.getFilter(),
).then((res) => {
).then(async (res) => {
if (res.state) {
const roomState = eventTimeline.getState(dir);
const stateEvents = res.state.map(this.getEventMapper());
Expand All @@ -5347,7 +5347,7 @@ export class MatrixClient extends EventEmitter {

eventTimeline.getTimelineSet()
.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processThreadEvents(room, threadedEvents);
await this.processThreadEvents(room, threadedEvents);

// if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure
Expand Down Expand Up @@ -9067,7 +9067,10 @@ export class MatrixClient extends EventEmitter {
const parentEvent = room?.findEventById(parentEventId) || events.find((mxEv: MatrixEvent) => {
return mxEv.getId() === parentEventId;
});
shouldLiveInThreadTimeline = parentEvent?.isThreadRelation;
if (parentEvent?.isThreadRelation) {
shouldLiveInThreadTimeline = true;
event.setThreadId(parentEvent.threadRootId);
}

// Copy all the reactions and annotations to the root event
// to the thread timeline. They will end up living in both
Expand All @@ -9094,12 +9097,11 @@ export class MatrixClient extends EventEmitter {
/**
* @experimental
*/
public processThreadEvents(room: Room, threadedEvents: MatrixEvent[]): void {
threadedEvents
.sort((a, b) => a.getTs() - b.getTs())
.forEach(event => {
room.addThreadedEvent(event);
});
public async processThreadEvents(room: Room, threadedEvents: MatrixEvent[]): Promise<void> {
threadedEvents.sort((a, b) => a.getTs() - b.getTs());
for (const event of threadedEvents) {
await room.addThreadedEvent(event);
}
}

/**
Expand Down
49 changes: 34 additions & 15 deletions src/models/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ export interface IUnsigned {
redacted_because?: IEvent;
transaction_id?: string;
invite_room_state?: StrippedState[];
"m.relations"?: Record<RelationType | string, any>; // No common pattern for aggregated relations
}

export interface IThreadBundledRelationship {
latest_event: IEvent;
count: number;
current_user_participated?: boolean;
}

export interface IEvent {
Expand All @@ -112,7 +119,7 @@ export interface IEvent {
age?: number;
}

interface IAggregatedRelation {
export interface IAggregatedRelation {
origin_server_ts: number;
event_id?: string;
sender?: string;
Expand Down Expand Up @@ -262,6 +269,7 @@ export class MatrixEvent extends EventEmitter {
* A reference to the thread this event belongs to
*/
private thread: Thread = null;
private threadId: string;

/* Set an approximate timestamp for the event relative the local clock.
* This will inherently be approximate because it doesn't take into account
Expand Down Expand Up @@ -499,28 +507,34 @@ export class MatrixEvent extends EventEmitter {
* @experimental
* Get the event ID of the thread head
*/
public get threadRootId(): string {
public get threadRootId(): string | undefined {
const relatesTo = this.getWireContent()?.["m.relates_to"];
if (relatesTo?.rel_type === RelationType.Thread) {
return relatesTo.event_id;
} else {
return this.threadId
|| this.getThread()?.id;
}
}

/**
* @experimental
*/
public get isThreadRelation(): boolean {
return !!this.threadRootId;
return !!this.threadRootId && this.threadId !== this.getId();
}

/**
* @experimental
*/
public get isThreadRoot(): boolean {
// TODO, change the inner working of this getter for it to use the
// bundled relationship return on the event, view MSC3440
const thread = this.getThread();
return thread?.id === this.getId();
const threadDetails = this
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);

// Bundled relationships only returned when the sync response is limited
// hence us having to check both bundled relation and inspect the thread
// model
return !!threadDetails || (this.getThread()?.id === this.getId());
}

public get parentEventId(): string {
Expand Down Expand Up @@ -1000,6 +1014,10 @@ export class MatrixEvent extends EventEmitter {
return this.event.unsigned || {};
}

public setUnsigned(unsigned: IUnsigned): void {
this.event.unsigned = unsigned;
}

public unmarkLocallyRedacted(): boolean {
const value = this._localRedactionEvent;
this._localRedactionEvent = null;
Expand Down Expand Up @@ -1340,11 +1358,8 @@ export class MatrixEvent extends EventEmitter {
return this.status;
}

public getServerAggregatedRelation(relType: RelationType): IAggregatedRelation {
const relations = this.getUnsigned()["m.relations"];
if (relations) {
return relations[relType];
}
public getServerAggregatedRelation<T>(relType: RelationType): T | undefined {
return this.getUnsigned()["m.relations"]?.[relType];
}

/**
Expand All @@ -1353,7 +1368,7 @@ export class MatrixEvent extends EventEmitter {
* @return {string?}
*/
public replacingEventId(): string | undefined {
const replaceRelation = this.getServerAggregatedRelation(RelationType.Replace);
const replaceRelation = this.getServerAggregatedRelation<IAggregatedRelation>(RelationType.Replace);
if (replaceRelation) {
return replaceRelation.event_id;
} else if (this._replacingEvent) {
Expand All @@ -1378,7 +1393,7 @@ export class MatrixEvent extends EventEmitter {
* @return {Date?}
*/
public replacingEventDate(): Date | undefined {
const replaceRelation = this.getServerAggregatedRelation(RelationType.Replace);
const replaceRelation = this.getServerAggregatedRelation<IAggregatedRelation>(RelationType.Replace);
if (replaceRelation) {
const ts = replaceRelation.origin_server_ts;
if (Number.isFinite(ts)) {
Expand Down Expand Up @@ -1544,9 +1559,13 @@ export class MatrixEvent extends EventEmitter {
/**
* @experimental
*/
public getThread(): Thread {
public getThread(): Thread | undefined {
return this.thread;
}

public setThreadId(threadId: string): void {
this.threadId = threadId;
}
}

/* REDACT_KEEP_KEYS gives the keys we keep when an event is redacted
Expand Down
4 changes: 2 additions & 2 deletions src/models/relations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

import { EventEmitter } from 'events';

import { EventStatus, MatrixEvent } from './event';
import { EventStatus, MatrixEvent, IAggregatedRelation } from './event';
import { Room } from './room';
import { logger } from '../logger';
import { RelationType } from "../@types/event";
Expand Down Expand Up @@ -319,7 +319,7 @@ export class Relations extends EventEmitter {

// the all-knowning server tells us that the event at some point had
// this timestamp for its replacement, so any following replacement should definitely not be less
const replaceRelation = this.targetEvent.getServerAggregatedRelation(RelationType.Replace);
const replaceRelation = this.targetEvent.getServerAggregatedRelation<IAggregatedRelation>(RelationType.Replace);
const minTs = replaceRelation && replaceRelation.origin_server_ts;

const lastReplacement = this.getRelations().reduce((last, event) => {
Expand Down
7 changes: 4 additions & 3 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1371,9 +1371,11 @@ export class Room extends EventEmitter {
let rootEvent = this.findEventById(event.threadRootId);
// If the rootEvent does not exist in the current sync, then look for
// it over the network
const eventData = await this.client.fetchRoomEvent(this.roomId, event.threadRootId);
if (!rootEvent) {
const eventData = await this.client.fetchRoomEvent(this.roomId, event.threadRootId);
rootEvent = new MatrixEvent(eventData);
} else {
rootEvent.setUnsigned(eventData.unsigned);
}
events.unshift(rootEvent);
thread = this.createThread(events);
Expand Down Expand Up @@ -1563,8 +1565,7 @@ export class Room extends EventEmitter {
}
} else {
if (thread) {
thread.timelineSet.addEventToTimeline(event,
thread.timelineSet.getLiveTimeline(), false);
thread.addEvent(event, false);
} else {
for (let i = 0; i < this.timelineSets.length; i++) {
const timelineSet = this.timelineSets[i];
Expand Down
61 changes: 51 additions & 10 deletions src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ limitations under the License.

import { MatrixClient } from "../matrix";
import { ReEmitter } from "../ReEmitter";
import { MatrixEvent } from "./event";
import { RelationType } from "../@types/event";
import { MatrixEvent, IThreadBundledRelationship } from "./event";
import { EventTimeline } from "./event-timeline";
import { EventTimelineSet } from './event-timeline-set';
import { Room } from './room';
Expand Down Expand Up @@ -47,6 +48,9 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {

private reEmitter: ReEmitter;

private lastEvent: MatrixEvent;
private replyCount = 0;

constructor(
events: MatrixEvent[] = [],
public readonly room: Room,
Expand Down Expand Up @@ -76,6 +80,11 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
room.on("Room.timeline", this.onEcho);
}

public get hasServerSideSupport(): boolean {
return this.client.cachedCapabilities
?.capabilities?.[RelationType.Thread]?.enabled;
}

onEcho = (event: MatrixEvent) => {
if (this.timelineSet.eventIdToTimeline(event.getId())) {
this.emit(ThreadEvent.Update, this);
Expand All @@ -89,7 +98,7 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
* @param event The event to add
*/
public async addEvent(event: MatrixEvent, toStartOfTimeline = false): Promise<void> {
if (this.timelineSet.findEventById(event.getId()) || event.status !== null) {
if (this.timelineSet.findEventById(event.getId())) {
return;
}

Expand Down Expand Up @@ -121,11 +130,46 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
}

await this.client.decryptEventIfNeeded(event, {});
this.emit(ThreadEvent.Update, this);

if (event.isThreadRelation) {
this.emit(ThreadEvent.NewReply, this, event);
const isThreadReply = event.getRelation()?.rel_type === RelationType.Thread;
// If no thread support exists we want to count all thread relation
// added as a reply. We can't rely on the bundled relationships count
if (!this.hasServerSideSupport && isThreadReply) {
this.replyCount++;
}

if (!this.lastEvent || (isThreadReply && event.getTs() > this.lastEvent.getTs())) {
this.lastEvent = event;
if (this.lastEvent.getId() !== this.root) {
// This counting only works when server side support is enabled
// as we started the counting from the value returned in the
// bundled relationship
if (this.hasServerSideSupport) {
this.replyCount++;
}
this.emit(ThreadEvent.NewReply, this, event);
}
}

if (event.getId() === this.root) {
const bundledRelationship = event
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);

if (this.hasServerSideSupport && bundledRelationship) {
this.replyCount = bundledRelationship.count;
this._currentUserParticipated = bundledRelationship.current_user_participated;

const lastReply = this.findEventById(bundledRelationship.latest_event.event_id);
if (lastReply) {
this.lastEvent = lastReply;
} else {
const event = new MatrixEvent(bundledRelationship.latest_event);
this.lastEvent = event;
}
}
}

this.emit(ThreadEvent.Update, this);
}

/**
Expand Down Expand Up @@ -171,17 +215,14 @@ export class Thread extends TypedEventEmitter<ThreadEvent> {
* exclude annotations from that number
*/
public get length(): number {
return this.events
.filter(event => event.isThreadRelation)
.length;
return this.replyCount;
}

/**
* A getter for the last event added to the thread
*/
public get replyToEvent(): MatrixEvent {
const events = this.events;
return events[events.length -1];
return this.lastEvent;
}

public get events(): MatrixEvent[] {
Expand Down
Loading