Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
justjanne committed Oct 11, 2022
1 parent fb0c350 commit 917adb1
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 137 deletions.
238 changes: 190 additions & 48 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5354,6 +5354,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
return timelineSet.getTimelineForEvent(eventId);
}

if (this.supportsExperimentalThreads()
&& Thread.hasServerSideSupport === FeatureSupport.Stable
&& timelineSet.thread) {
return this.getThreadTimeline(timelineSet, eventId);
}

const path = utils.encodeUri(
"/rooms/$roomId/context/$eventId", {
$roomId: timelineSet.room.roomId,
Expand Down Expand Up @@ -5388,38 +5394,6 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
...res.events_before.map(mapper),
];

if (this.supportsExperimentalThreads()) {
if (!timelineSet.canContain(event)) {
return undefined;
}

// Where the event is a thread reply (not a root) and running in MSC-enabled mode the Thread timeline only
// functions contiguously, so we have to jump through some hoops to get our target event in it.
// XXX: workaround for https://github.com/vector-im/element-meta/issues/150
if (Thread.hasServerSideSupport && timelineSet.thread) {
const thread = timelineSet.thread;
const opts: IRelationsRequestOpts = {
dir: Direction.Backward,
limit: 50,
};

await thread.fetchInitialEvents();
let nextBatch: string | null | undefined = thread.liveTimeline.getPaginationToken(Direction.Backward);

// Fetch events until we find the one we were asked for, or we run out of pages
while (!thread.findEventById(eventId)) {
if (nextBatch) {
opts.from = nextBatch;
}

({ nextBatch } = await thread.fetchEvents(opts));
if (!nextBatch) break;
}

return thread.liveTimeline;
}
}

// Here we handle non-thread timelines only, but still process any thread events to populate thread summaries.
let timeline = timelineSet.getTimelineForEvent(events[0].getId());
if (timeline) {
Expand All @@ -5444,6 +5418,118 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
?? timeline;
}

public async getThreadTimeline(timelineSet: EventTimelineSet, eventId: string): Promise<EventTimeline | undefined> {
if (!Thread.hasServerSideSupport) {
throw new Error("could not get thread timeline: no serverside support");
}

if (!this.supportsExperimentalThreads()) {
throw new Error("could not get thread timeline: no client support");
}

const path = utils.encodeUri(
"/rooms/$roomId/context/$eventId", {
$roomId: timelineSet.room.roomId,
$eventId: eventId,
},
);

const params: Record<string, string | string[]> = {
limit: "0",
};
if (this.clientOpts.lazyLoadMembers) {
params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER);
}

// TODO: we should implement a backoff (as per scrollback()) to deal more nicely with HTTP errors.
const res = await this.http.authedRequest<IContextResponse>(undefined, Method.Get, path, params);
const mapper = this.getEventMapper();
const event = mapper(res.event);

if (!timelineSet.canContain(event)) {
return undefined;
}

if (Thread.hasServerSideSupport === FeatureSupport.Stable) {
if (!timelineSet.thread) {
throw new Error("could not get thread timeline: not a thread timeline");
}

const thread = timelineSet.thread;
const resOlder = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir: Direction.Backward, from: res.start },
);
const resNewer = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir: Direction.Forward, from: res.end },
);
const events = [
// Order events from most recent to oldest (reverse-chronological).
// We start with the last event, since that's the point at which we have known state.
// events_after is already backwards; events_before is forwards.
...resNewer.chunk.reverse().map(mapper),
event,
...resOlder.chunk.map(mapper),
];
await timelineSet.thread?.fetchEditsWhereNeeded(...events);

// Here we handle non-thread timelines only, but still process any thread events to populate thread summaries.
let timeline = timelineSet.getTimelineForEvent(event.getId());
if (timeline) {
timeline.getState(EventTimeline.BACKWARDS).setUnknownStateEvents(res.state.map(mapper));
} else {
timeline = timelineSet.addTimeline();
timeline.initialiseState(res.state.map(mapper));
}

timelineSet.addEventsToTimeline(events, true, timeline, resNewer.next_batch);
if (!resOlder.next_batch) {
timelineSet.addEventsToTimeline([mapper(resOlder.original_event)], true, timeline, null);
}
timeline.setPaginationToken(resOlder.next_batch ?? null, Direction.Backward);
timeline.setPaginationToken(resNewer.next_batch ?? null, Direction.Forward);
this.processBeaconEvents(timelineSet.room, events);

// There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring
// timeline) - so check the room's index again. On the other hand, there's no guarantee the event ended up
// anywhere, if it was later redacted, so we just return the timeline we first thought of.
return timelineSet.getTimelineForEvent(eventId)
?? timeline;
} else {
// Where the event is a thread reply (not a root) and running in MSC-enabled mode the Thread timeline only
// functions contiguously, so we have to jump through some hoops to get our target event in it.
// XXX: workaround for https://github.com/vector-im/element-meta/issues/150

const thread = timelineSet.thread;
const opts: IRelationsRequestOpts = {
dir: Direction.Backward,
limit: 50,
};

await thread.fetchInitialEvents();
let nextBatch: string | null | undefined = thread.liveTimeline.getPaginationToken(Direction.Backward);

// Fetch events until we find the one we were asked for, or we run out of pages
while (!thread.findEventById(eventId)) {
if (nextBatch) {
opts.from = nextBatch;
}

({ nextBatch } = await thread.fetchEvents(opts));
if (!nextBatch) break;
}

return thread.liveTimeline;
}
}

/**
* Get an EventTimeline for the latest events in the room. This will just
* call `/messages` to get the latest message in the room, then use
Expand All @@ -5465,28 +5551,44 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
throw new Error("getLatestTimeline only supports room timelines");
}

let res: IMessagesResponse;
const roomId = timelineSet.room.roomId;
let event;
if (timelineSet.isThreadTimeline) {
res = await this.createThreadListMessagesRequest(
roomId,
const res = await this.createThreadListMessagesRequest(
timelineSet.room.roomId,
null,
1,
Direction.Backward,
timelineSet.getFilter(),
);
} else {
res = await this.createMessagesRequest(
roomId,
event = res.chunk?.[0];
} else if (timelineSet.thread && Thread.hasServerSideSupport) {
const res = await this.fetchRelations(
timelineSet.room.roomId,
timelineSet.thread.id,
THREAD_RELATION_TYPE.name,
null,
1,
Direction.Backward,
timelineSet.getFilter(),
{ dir: Direction.Backward, limit: 1 },
);
event = res.chunk?.[0];
} else {
const messagesPath = utils.encodeUri(
"/rooms/$roomId/messages", {
$roomId: timelineSet.room.roomId,
},
);

const params: Record<string, string | string[]> = {
dir: 'b',
};
if (this.clientOpts.lazyLoadMembers) {
params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER);
}

const res = await this.http.authedRequest<IMessagesResponse>(undefined, Method.Get, messagesPath, params);
event = res.chunk?.[0];
}
const event = res.chunk?.[0];
if (!event) {
throw new Error("No message returned from /messages when trying to construct getLatestTimeline");
throw new Error("No message returned when trying to construct getLatestTimeline");
}

return this.getEventTimeline(timelineSet, event.event_id);
Expand Down Expand Up @@ -5624,7 +5726,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
public paginateEventTimeline(eventTimeline: EventTimeline, opts: IPaginateOpts): Promise<boolean> {
const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet);
const room = this.getRoom(eventTimeline.getRoomId());
const isThreadTimeline = eventTimeline.getTimelineSet().isThreadTimeline;
const isThreadListTimeline = eventTimeline.getTimelineSet().isThreadTimeline;
const isThreadTimeline = (eventTimeline.getTimelineSet().thread);

// TODO: we should implement a backoff (as per scrollback()) to deal more
// nicely with HTTP errors.
Expand Down Expand Up @@ -5695,7 +5798,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else if (isThreadTimeline) {
} else if (isThreadListTimeline) {
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}
Expand Down Expand Up @@ -5731,6 +5834,43 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else if (isThreadTimeline) {
const room = this.getRoom(eventTimeline.getRoomId());
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}

promise = this.fetchRelations(
eventTimeline.getRoomId(),
eventTimeline.getTimelineSet().thread?.id,
THREAD_RELATION_TYPE.name,
null,
{ dir, limit: opts.limit, from: token },
).then((res) => {
const mapper = this.getEventMapper();
const matrixEvents = res.chunk.map(mapper);
eventTimeline.getTimelineSet().thread?.fetchEditsWhereNeeded(...matrixEvents);

const newToken = res.next_batch;

const timelineSet = eventTimeline.getTimelineSet();
timelineSet.addEventsToTimeline(matrixEvents, backwards, eventTimeline, newToken ?? null);
if (!newToken && backwards) {
timelineSet.addEventsToTimeline([mapper(res.original_event)], true, eventTimeline, null);
}
this.processBeaconEvents(timelineSet.room, matrixEvents);

// if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure
// we can recover from gappy syncs.
if (backwards && !newToken) {
eventTimeline.setPaginationToken(null, dir);
}
return Boolean(newToken);
}).finally(() => {
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else {
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
Expand All @@ -5752,10 +5892,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const matrixEvents = res.chunk.map(this.getEventMapper());

const timelineSet = eventTimeline.getTimelineSet();
const [timelineEvents, threadedEvents] = room.partitionThreadedEvents(matrixEvents);
const [timelineEvents] = room.partitionThreadedEvents(matrixEvents);
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processBeaconEvents(room, timelineEvents);
this.processThreadEvents(room, threadedEvents, backwards);
this.processThreadRoots(room,
timelineEvents.filter(it => it.isRelation(THREAD_RELATION_TYPE.name)),
false);

const atEnd = res.end === undefined || res.end === res.start;

Expand Down
6 changes: 6 additions & 0 deletions src/models/event-timeline-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,12 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
}
}

if (this.thread) {
for (const event of events) {
EventTimeline.setEventMetadata(event, this.room.currentState, false);
}
}

const direction = toStartOfTimeline ? EventTimeline.BACKWARDS :
EventTimeline.FORWARDS;
const inverseDirection = toStartOfTimeline ? EventTimeline.FORWARDS :
Expand Down
36 changes: 30 additions & 6 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
/**
* @experimental
*/
private threads = new Map<string, Thread>();
public threads = new Map<string, Thread>();
public lastThread: Thread;

/**
Expand Down Expand Up @@ -1790,13 +1790,19 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {

private onThreadNewReply(thread: Thread): void {
const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS);
for (const timelineSet of this.threadsTimelineSets) {
timelineSet.removeEvent(thread.id);
timelineSet.addLiveEvent(thread.rootEvent, {
if (thread.length) {
this.threadsTimelineSets?.[0]?.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState,
});
if (thread.hasCurrentUserParticipated) {
this.threadsTimelineSets?.[1]?.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState,
});
}
}
}

Expand Down Expand Up @@ -1924,7 +1930,6 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
}

const thread = new Thread(threadId, rootEvent, {
initialEvents: events,
room: this,
client: this.client,
});
Expand All @@ -1946,7 +1951,11 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
this.threadsTimelineSets.forEach(timelineSet => {
if (thread.rootEvent) {
if (Thread.hasServerSideSupport) {
timelineSet.addLiveEvent(thread.rootEvent);
timelineSet.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState: this.currentState,
});
} else {
timelineSet.addEventToTimeline(
thread.rootEvent,
Expand All @@ -1960,6 +1969,21 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {

this.emit(ThreadEvent.New, thread, toStartOfTimeline);

if (thread.length) {
this.threadsTimelineSets?.[0]?.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState: this.currentState,
});
if (thread.hasCurrentUserParticipated) {
this.threadsTimelineSets?.[1]?.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState: this.currentState,
});
}
}

return thread;
}

Expand Down
Loading

0 comments on commit 917adb1

Please sign in to comment.