From cbf7248370811326c4fa5f66d0b74d201e571964 Mon Sep 17 00:00:00 2001 From: somebody1234 Date: Tue, 19 Dec 2023 15:41:14 +1000 Subject: [PATCH] Fixes for Language Server sync server (#8514) - Closes #8398 # Important Notes - The original error caused by a failing `text/openFile` (`openTextFile`) is still present, but (seemingly?) harder to repro now --- app/gui2/shared/__tests__/yjsModel.test.ts | 54 ++ app/gui2/shared/languageServer.ts | 52 +- app/gui2/shared/languageServerTypes.ts | 4 +- app/gui2/shared/retry.ts | 118 +++++ app/gui2/shared/yjsModel.ts | 105 +--- app/gui2/src/components/GraphEditor/upload.ts | 49 +- app/gui2/src/stores/visualization/index.ts | 21 +- app/gui2/src/util/net.ts | 10 +- app/gui2/ydoc-server/languageServerSession.ts | 481 +++++++++++------- app/gui2/ydoc-server/ydoc.ts | 64 +-- 10 files changed, 586 insertions(+), 372 deletions(-) create mode 100644 app/gui2/shared/__tests__/yjsModel.test.ts create mode 100644 app/gui2/shared/retry.ts diff --git a/app/gui2/shared/__tests__/yjsModel.test.ts b/app/gui2/shared/__tests__/yjsModel.test.ts new file mode 100644 index 000000000000..844bf83cea03 --- /dev/null +++ b/app/gui2/shared/__tests__/yjsModel.test.ts @@ -0,0 +1,54 @@ +import { rangeEncloses, rangeIntersects, type ContentRange } from 'shared/yjsModel' +import { expect, test } from 'vitest' + +type RangeTest = { a: ContentRange; b: ContentRange } + +const equalRanges: RangeTest[] = [ + { a: [0, 0], b: [0, 0] }, + { a: [0, 1], b: [0, 1] }, + { a: [-5, 5], b: [-5, 5] }, +] + +const totalOverlap: RangeTest[] = [ + { a: [0, 1], b: [0, 0] }, + { a: [0, 2], b: [2, 2] }, + { a: [-1, 1], b: [1, 1] }, + { a: [0, 2], b: [0, 1] }, + { a: [-10, 10], b: [-3, 7] }, + { a: [0, 5], b: [1, 2] }, + { a: [3, 5], b: [3, 4] }, +] + +const reverseTotalOverlap: RangeTest[] = totalOverlap.map(({ a, b }) => ({ a: b, b: a })) + +const noOverlap: RangeTest[] = [ + { a: [0, 1], b: [2, 3] }, + { a: [0, 1], b: [-1, -1] }, + { a: [5, 6], b: [2, 3] }, + { a: [0, 2], b: [-2, -1] }, + { a: [-5, -3], b: [9, 10] }, + { a: [-3, 2], b: [3, 4] }, +] + +const partialOverlap: RangeTest[] = [ + { a: [0, 3], b: [-1, 1] }, + { a: [0, 1], b: [-1, 0] }, + { a: [0, 0], b: [-1, 0] }, + { a: [0, 2], b: [1, 4] }, + { a: [-8, 0], b: [0, 10] }, +] + +test.each([...equalRanges, ...totalOverlap])('Range $a should enclose $b', ({ a, b }) => + expect(rangeEncloses(a, b)).toBe(true), +) +test.each([...noOverlap, ...partialOverlap, ...reverseTotalOverlap])( + 'Range $a should not enclose $b', + ({ a, b }) => expect(rangeEncloses(a, b)).toBe(false), +) +test.each([...equalRanges, ...totalOverlap, ...reverseTotalOverlap, ...partialOverlap])( + 'Range $a should intersect $b', + ({ a, b }) => expect(rangeIntersects(a, b)).toBe(true), +) +test.each([...noOverlap])('Range $a should not intersect $b', ({ a, b }) => + expect(rangeIntersects(a, b)).toBe(false), +) diff --git a/app/gui2/shared/languageServer.ts b/app/gui2/shared/languageServer.ts index 866f5a198e73..d32a8f60d6b1 100644 --- a/app/gui2/shared/languageServer.ts +++ b/app/gui2/shared/languageServer.ts @@ -132,14 +132,14 @@ export class LanguageServer extends ObservableV2 { console.dir(params) } return await this.client.request({ method, params }, RPC_TIMEOUT_MS) - } catch (e) { - const remoteError = RemoteRpcErrorSchema.safeParse(e) + } catch (error) { + const remoteError = RemoteRpcErrorSchema.safeParse(error) if (remoteError.success) { throw new LsRpcError(new RemoteRpcError(remoteError.data), method, params) - } else if (e instanceof Error) { - throw new LsRpcError(e, method, params) + } else if (error instanceof Error) { + throw new LsRpcError(error, method, params) } - throw e + throw error } finally { if (DEBUG_LOG_RPC) { console.log(`LS [${uuid}] ${method} took ${performance.now() - now}ms`) @@ -402,27 +402,29 @@ export class LanguageServer extends ObservableV2 { retry: (cb: () => Promise) => Promise = (f) => f(), ) { let running = true - ;(async () => { - this.on('file/event', callback) - walkFs(this, { rootId, segments }, (type, path) => { - if ( - !running || - type !== 'File' || - path.segments.length < segments.length || - segments.some((seg, i) => seg !== path.segments[i]) - ) - return - callback({ - path: { rootId: path.rootId, segments: path.segments.slice(segments.length) }, - kind: 'Added', + const self = this + return { + promise: (async () => { + self.on('file/event', callback) + await retry(async () => running && self.acquireReceivesTreeUpdates({ rootId, segments })) + await walkFs(self, { rootId, segments }, (type, path) => { + if ( + !running || + type !== 'File' || + path.segments.length < segments.length || + segments.some((segment, i) => segment !== path.segments[i]) + ) + return + callback({ + path: { rootId: path.rootId, segments: path.segments.slice(segments.length) }, + kind: 'Added', + }) }) - }) - await retry(() => this.acquireReceivesTreeUpdates({ rootId, segments })) - if (!running) return - })() - return () => { - running = false - this.off('file/event', callback) + })(), + unsubscribe() { + running = false + self.off('file/event', callback) + }, } } diff --git a/app/gui2/shared/languageServerTypes.ts b/app/gui2/shared/languageServerTypes.ts index 4e98cb76ec74..8d21a9affaff 100644 --- a/app/gui2/shared/languageServerTypes.ts +++ b/app/gui2/shared/languageServerTypes.ts @@ -386,9 +386,7 @@ export namespace response { contentRoots: ContentRoot[] } - export interface FileContents { - contents: TextFileContents - } + export interface FileContents extends TextFileContents {} export interface FileExists { exists: boolean diff --git a/app/gui2/shared/retry.ts b/app/gui2/shared/retry.ts new file mode 100644 index 000000000000..6e98d487d6f0 --- /dev/null +++ b/app/gui2/shared/retry.ts @@ -0,0 +1,118 @@ +import { wait } from 'lib0/promise' + +export interface BackoffOptions { + maxRetries?: number + retryDelay?: number + retryDelayMultiplier?: number + retryDelayMax?: number + /** Called when the promise throws an error, and the next retry is about to be attempted. + * When this function returns `false`, the backoff is immediately aborted. When this function + * is not provided, the backoff will always continue until the maximum number of retries + * is reached. * */ + onBeforeRetry?: ( + error: E, + retryCount: number, + maxRetries: number, + delay: number, + ) => boolean | void + /** Called right before returning. */ + onSuccess?: (retryCount: number) => void + /** Called after the final retry, right before throwing an error. + * Note that `onBeforeRetry` is *not* called on the final retry, as there is nothing after the + * final retry. */ + onFailure?: (error: E, retryCount: number) => void +} + +const defaultBackoffOptions: Required> = { + maxRetries: 3, + retryDelay: 1000, + retryDelayMultiplier: 2, + retryDelayMax: 10000, + onBeforeRetry: () => {}, + onSuccess: () => {}, + onFailure: () => {}, +} + +/** Retry a failing promise function with exponential backoff. */ +export async function exponentialBackoff( + f: () => Promise, + backoffOptions?: BackoffOptions, +): Promise { + const { + maxRetries, + retryDelay, + retryDelayMultiplier, + retryDelayMax, + onBeforeRetry, + onSuccess, + onFailure, + } = { + ...defaultBackoffOptions, + ...backoffOptions, + } + for ( + let retries = 0, delay = retryDelay; + ; + retries += 1, delay = Math.min(retryDelayMax, delay * retryDelayMultiplier) + ) { + try { + const result = await f() + onSuccess(retries) + return result + } catch (error) { + if (retries >= maxRetries) { + onFailure(error as E, retries) + throw error + } + if (onBeforeRetry(error as E, retries, maxRetries, delay) === false) throw error + await wait(delay) + } + } +} + +export function defaultOnBeforeRetry( + description: string, +): NonNullable['onBeforeRetry']> { + return (error, retryCount, maxRetries, delay) => { + console.error( + 'Could not ' + + description + + ` (${retryCount}/${maxRetries} retries), retrying after ${delay}ms...`, + ) + console.error(error) + } +} + +export function defaultOnFailure( + description: string, +): NonNullable['onFailure']> { + return (error, retryCount) => { + console.error( + 'Could not ' + description + ` (${retryCount}/${retryCount} retries), throwing error.`, + ) + console.error(error) + } +} + +export function defaultOnSuccess( + description: string, +): NonNullable['onSuccess']> { + return (retryCount) => { + if (retryCount === 0) return + console.info( + 'Successfully ' + + description + + ` after ${retryCount} ${retryCount === 1 ? 'failure' : 'failures'}.`, + ) + } +} + +/** @param successDescription Should be in past tense, without an initial capital letter. + * @param errorDescription Should be in present tense, without an initial capital letter. */ +export function printingCallbacks(successDescription: string, errorDescription: string) { + return { + onBeforeRetry: defaultOnBeforeRetry(errorDescription), + onSuccess: defaultOnSuccess(successDescription), + onFailure: defaultOnFailure(errorDescription), + } satisfies BackoffOptions +} diff --git a/app/gui2/shared/yjsModel.ts b/app/gui2/shared/yjsModel.ts index 10f2191eb3b2..6d3ce08ff0a3 100644 --- a/app/gui2/shared/yjsModel.ts +++ b/app/gui2/shared/yjsModel.ts @@ -215,7 +215,8 @@ export class DistributedModule { } } -export type RelativeRange = [Y.RelativePosition, Y.RelativePosition] +export type SourceRange = readonly [start: number, end: number] +export type RelativeRange = [start: Y.RelativePosition, end: Y.RelativePosition] /** * Accessor for the ID map stored in shared yjs map as relative ranges. Synchronizes the ranges @@ -260,45 +261,39 @@ export class IdMap { return new IdMap(map, text) } - public static keyForRange(range: readonly [number, number]): string { + public static keyForRange(range: SourceRange): string { return `${range[0].toString(16)}:${range[1].toString(16)}` } - public static rangeForKey(key: string): [number, number] { + public static rangeForKey(key: string): SourceRange { return key.split(':').map((x) => parseInt(x, 16)) as [number, number] } - private modelToIndices(rangeBuffer: Uint8Array): [number, number] | null { - const [relStart, relEnd] = decodeRange(rangeBuffer) - const start = Y.createAbsolutePositionFromRelativePosition(relStart, this.doc) - const end = Y.createAbsolutePositionFromRelativePosition(relEnd, this.doc) - if (start == null || end == null) return null + private modelToIndices(rangeBuffer: Uint8Array): SourceRange | null { + const [relativeStart, relativeEnd] = decodeRange(rangeBuffer) + const start = Y.createAbsolutePositionFromRelativePosition(relativeStart, this.doc) + const end = Y.createAbsolutePositionFromRelativePosition(relativeEnd, this.doc) + if (!start || !end) return null return [start.index, end.index] } - insertKnownId(range: [number, number], id: ExprId) { - if (this.finished) { - throw new Error('IdMap already finished') - } - + insertKnownId(range: SourceRange, id: ExprId) { + if (this.finished) throw new Error('IdMap already finished') const key = IdMap.keyForRange(range) this.rangeToExpr.set(key, id) this.accessed.add(id) } - getIfExist(range: readonly [number, number]): ExprId | undefined { + getIfExist(range: SourceRange): ExprId | undefined { const key = IdMap.keyForRange(range) return this.rangeToExpr.get(key) } - getOrInsertUniqueId(range: readonly [number, number]): ExprId { - if (this.finished) { - throw new Error('IdMap already finished') - } - + getOrInsertUniqueId(range: SourceRange): ExprId { + if (this.finished) throw new Error('IdMap already finished') const key = IdMap.keyForRange(range) const val = this.rangeToExpr.get(key) - if (val !== undefined) { + if (val) { this.accessed.add(val) return val } else { @@ -313,8 +308,8 @@ export class IdMap { return this.accessed } - toRawRanges(): Record { - const ranges: Record = {} + toRawRanges(): Record { + const ranges: Record = {} for (const [key, expr] of this.rangeToExpr.entries()) { ranges[expr] = IdMap.rangeForKey(key) } @@ -328,14 +323,9 @@ export class IdMap { * Can be called at most once. After calling this method, the ID map is no longer usable. */ finishAndSynchronize(): typeof this.yMap { - if (this.finished) { - throw new Error('IdMap already finished') - } + if (this.finished) throw new Error('IdMap already finished') this.finished = true - - const doc = this.doc - - doc.transact(() => { + this.doc.transact(() => { this.yMap.forEach((_, expr) => { // Expressions that were accessed and present in the map are guaranteed to match. There is // no mechanism for modifying them, so we don't need to check for equality. We only need to @@ -385,7 +375,7 @@ export function isUuid(x: unknown): x is Uuid { } /** A range represented as start and end indices. */ -export type ContentRange = [number, number] +export type ContentRange = [start: number, end: number] export function rangeEquals(a: ContentRange, b: ContentRange): boolean { return a[0] == b[0] && a[1] == b[1] @@ -403,58 +393,3 @@ export function rangeIntersects(a: ContentRange, b: ContentRange): boolean { export function rangeIsBefore(a: ContentRange, b: ContentRange): boolean { return a[1] <= b[0] } - -if (import.meta.vitest) { - const { test, expect } = import.meta.vitest - type RangeTest = { a: ContentRange; b: ContentRange } - - const equalRanges: RangeTest[] = [ - { a: [0, 0], b: [0, 0] }, - { a: [0, 1], b: [0, 1] }, - { a: [-5, 5], b: [-5, 5] }, - ] - - const totalOverlap: RangeTest[] = [ - { a: [0, 1], b: [0, 0] }, - { a: [0, 2], b: [2, 2] }, - { a: [-1, 1], b: [1, 1] }, - { a: [0, 2], b: [0, 1] }, - { a: [-10, 10], b: [-3, 7] }, - { a: [0, 5], b: [1, 2] }, - { a: [3, 5], b: [3, 4] }, - ] - - const reverseTotalOverlap: RangeTest[] = totalOverlap.map(({ a, b }) => ({ a: b, b: a })) - - const noOverlap: RangeTest[] = [ - { a: [0, 1], b: [2, 3] }, - { a: [0, 1], b: [-1, -1] }, - { a: [5, 6], b: [2, 3] }, - { a: [0, 2], b: [-2, -1] }, - { a: [-5, -3], b: [9, 10] }, - { a: [-3, 2], b: [3, 4] }, - ] - - const partialOverlap: RangeTest[] = [ - { a: [0, 3], b: [-1, 1] }, - { a: [0, 1], b: [-1, 0] }, - { a: [0, 0], b: [-1, 0] }, - { a: [0, 2], b: [1, 4] }, - { a: [-8, 0], b: [0, 10] }, - ] - - test.each([...equalRanges, ...totalOverlap])('Range $a should enclose $b', ({ a, b }) => - expect(rangeEncloses(a, b)).toBe(true), - ) - test.each([...noOverlap, ...partialOverlap, ...reverseTotalOverlap])( - 'Range $a should not enclose $b', - ({ a, b }) => expect(rangeEncloses(a, b)).toBe(false), - ) - test.each([...equalRanges, ...totalOverlap, ...reverseTotalOverlap, ...partialOverlap])( - 'Range $a should intersect $b', - ({ a, b }) => expect(rangeIntersects(a, b)).toBe(true), - ) - test.each([...noOverlap])('Range $a should not intersect $b', ({ a, b }) => - expect(rangeIntersects(a, b)).toBe(false), - ) -} diff --git a/app/gui2/src/components/GraphEditor/upload.ts b/app/gui2/src/components/GraphEditor/upload.ts index 86bf41551f9a..6ba49758c3e3 100644 --- a/app/gui2/src/components/GraphEditor/upload.ts +++ b/app/gui2/src/components/GraphEditor/upload.ts @@ -66,7 +66,7 @@ export class Uploader { ): Promise { const roots = await contentRoots const projectRootId = roots.find((root) => root.type == 'Project') - if (!projectRootId) throw new Error('Unable to find project root, uploading not possible.') + if (!projectRootId) throw new Error('Could not find project root, uploading not possible.') const instance = new Uploader( await rpc, await binary, @@ -149,34 +149,39 @@ export class Uploader { private async ensureDataDirExists() { const exists = await this.dataDirExists() - if (!exists) { - await this.rpc.createFile({ - type: 'Directory', - name: DATA_DIR_NAME, - path: { rootId: this.projectRootId, segments: [] }, - }) - } + if (exists) return + await this.rpc.createFile({ + type: 'Directory', + name: DATA_DIR_NAME, + path: { rootId: this.projectRootId, segments: [] }, + }) } private async dataDirExists(): Promise { try { const info = await this.rpc.fileInfo(this.dataDirPath()) return info.attributes.kind.type == 'Directory' - } catch (err: any) { - if (err.cause && err.cause instanceof RemoteRpcError) { - if ([ErrorCode.FILE_NOT_FOUND, ErrorCode.CONTENT_ROOT_NOT_FOUND].includes(err.cause.code)) { + } catch (error) { + if ( + typeof error === 'object' && + error && + 'cause' in error && + error.cause instanceof RemoteRpcError + ) { + if ( + error.cause.code === ErrorCode.FILE_NOT_FOUND || + error.cause.code === ErrorCode.CONTENT_ROOT_NOT_FOUND + ) return false - } } - throw err + throw error } } private async pickUniqueName(suggestedName: string): Promise { const files = await this.rpc.listFiles(this.dataDirPath()) const existingNames = new Set(files.paths.map((path) => path.name)) - const [stem, maybeExtension] = splitFilename(suggestedName) - const extension = maybeExtension ?? '' + const { stem, extension = '' } = splitFilename(suggestedName) let candidate = suggestedName let num = 1 while (existingNames.has(candidate)) { @@ -190,14 +195,12 @@ export class Uploader { /** * Split filename into stem and (optional) extension. */ -function splitFilename(filename: string): [string, string | null] { - const dotIndex = filename.lastIndexOf('.') - +function splitFilename(fileName: string): { stem: string; extension?: string } { + const dotIndex = fileName.lastIndexOf('.') if (dotIndex !== -1 && dotIndex !== 0) { - const stem = filename.substring(0, dotIndex) - const extension = filename.substring(dotIndex + 1) - return [stem, extension] + const stem = fileName.substring(0, dotIndex) + const extension = fileName.substring(dotIndex + 1) + return { stem, extension } } - - return [filename, null] + return { stem: fileName } } diff --git a/app/gui2/src/stores/visualization/index.ts b/app/gui2/src/stores/visualization/index.ts index 845870a3c9ab..d802a6afa9ba 100644 --- a/app/gui2/src/stores/visualization/index.ts +++ b/app/gui2/src/stores/visualization/index.ts @@ -26,6 +26,7 @@ import { isUrlString } from '@/util/data/urlString' import { isIconName } from '@/util/iconName' import { rpcWithRetries } from '@/util/net' import { defineStore } from 'pinia' +import { ErrorCode, LsRpcError, RemoteRpcError } from 'shared/languageServer' import type { Event as LSEvent, VisualizationConfiguration } from 'shared/languageServerTypes' import type { ExprId, VisualizationIdentifier } from 'shared/yjsModel' import { computed, reactive } from 'vue' @@ -208,12 +209,28 @@ export const useVisualizationStore = defineStore('visualization', () => { } } - Promise.all([proj.lsRpcConnection, projectRoot]).then(([ls, projectRoot]) => { + Promise.all([proj.lsRpcConnection, projectRoot]).then(async ([ls, projectRoot]) => { if (!projectRoot) { console.error('Could not load custom visualizations: Project directory not found.') return } - ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent, rpcWithRetries) + try { + await ls.watchFiles(projectRoot, [customVisualizationsDirectory], onFileEvent, rpcWithRetries) + .promise + } catch (error) { + if ( + error instanceof LsRpcError && + error.cause instanceof RemoteRpcError && + error.cause.code === ErrorCode.FILE_NOT_FOUND + ) { + console.info( + "'visualizations/' folder not found in project directory. " + + "If you have custom visualizations, please put them under 'visualizations/'.", + ) + } else { + throw error + } + } }) function* types(type: Opt) { diff --git a/app/gui2/src/util/net.ts b/app/gui2/src/util/net.ts index 63389d159d18..2a76437cb67c 100644 --- a/app/gui2/src/util/net.ts +++ b/app/gui2/src/util/net.ts @@ -40,12 +40,12 @@ export async function exponentialBackoff( backoffOptions?: BackoffOptions, ): Promise> { const options = { ...defaultBackoffOptions, ...backoffOptions } - for (let retries = 0; ; retries += 1) { + for ( + let retries = 0, delay = options.retryDelay; + ; + retries += 1, delay = Math.min(options.retryDelayMax, delay * options.retryDelayMultiplier) + ) { const result = await f() - const delay = Math.min( - options.retryDelayMax, - options.retryDelay * options.retryDelayMultiplier ** retries, - ) if ( result.ok || retries >= options.maxRetries || diff --git a/app/gui2/ydoc-server/languageServerSession.ts b/app/gui2/ydoc-server/languageServerSession.ts index d50e8ba89a26..79679ceed68e 100644 --- a/app/gui2/ydoc-server/languageServerSession.ts +++ b/app/gui2/ydoc-server/languageServerSession.ts @@ -5,12 +5,14 @@ import { ObservableV2 } from 'lib0/observable' import * as random from 'lib0/random' import * as Y from 'yjs' import { LanguageServer, computeTextChecksum } from '../shared/languageServer' -import { Checksum, Path } from '../shared/languageServerTypes' +import { Checksum, FileEdit, Path, response } from '../shared/languageServerTypes' +import { exponentialBackoff, printingCallbacks } from '../shared/retry' import { DistributedProject, ExprId, IdMap, ModuleDoc, + SourceRange, type NodeMetadata, type Uuid, } from '../shared/yjsModel' @@ -23,15 +25,21 @@ import { import * as fileFormat from './fileFormat' import { WSSharedDoc } from './ydoc' -const sessions = new Map() +const SOURCE_DIR = 'src' +const EXTENSION = '.enso' const DEBUG_LOG_SYNC = false -type Events = { - error: (error: Error) => void +function createOpenRPCClient(url: string) { + const transport = new WebSocketTransport(url) + const requestManager = new RequestManager([transport]) + transport.connection.on('error', (error) => + console.error('Language Server transport error:', error), + ) + return new Client(requestManager) } -export class LanguageServerSession extends ObservableV2 { +export class LanguageServerSession { clientId: Uuid indexDoc: WSSharedDoc docs: Map @@ -39,22 +47,17 @@ export class LanguageServerSession extends ObservableV2 { url: string client: Client ls: LanguageServer + connection: response.InitProtocolConnection | undefined model: DistributedProject projectRootId: Uuid | null authoritativeModules: Map constructor(url: string) { - super() this.clientId = random.uuidv4() as Uuid this.docs = new Map() this.retainCount = 0 this.url = url - const transport = new WebSocketTransport(url) - const requestManager = new RequestManager([transport]) - this.client = new Client(requestManager) console.log('new session with', url) - transport.connection.on('error', (error) => this.emit('error', [error])) - this.ls = new LanguageServer(this.client) this.indexDoc = new WSSharedDoc() this.docs.set('index', this.indexDoc) this.model = new DistributedProject(this.indexDoc.doc) @@ -64,36 +67,91 @@ export class LanguageServerSession extends ObservableV2 { this.indexDoc.doc.on('subdocs', (subdocs: { loaded: Set }) => { for (const doc of subdocs.loaded) { const name = this.model.findModuleByDocId(doc.guid) - if (name == null) continue + if (!name) continue const persistence = this.authoritativeModules.get(name) - if (persistence == null) continue + if (!persistence) continue } }) + const { client, ls } = this.setupClient() + this.client = client + this.ls = ls + } + static sessions = new Map() + static get(url: string): LanguageServerSession { + const session = map.setIfUndefined( + LanguageServerSession.sessions, + url, + () => new LanguageServerSession(url), + ) + session.retain() + return session + } + + private restartClient() { + this.client.close() + this.ls.destroy() + this.connection = undefined + this.setupClient() + } + + private setupClient() { + this.client = createOpenRPCClient(this.url) + this.ls = new LanguageServer(this.client) this.ls.on('file/event', async (event) => { if (DEBUG_LOG_SYNC) { console.log('file/event', event) } - switch (event.kind) { - case 'Added': - if (isSourceFile(event.path)) { - const fileInfo = await this.ls.fileInfo(event.path) - if (fileInfo.attributes.kind.type == 'File') { - this.getModuleModel(event.path).open() + const path = event.path.segments.join('/') + try { + switch (event.kind) { + case 'Added': { + if (isSourceFile(event.path)) { + const fileInfo = await this.ls.fileInfo(event.path) + if (fileInfo.attributes.kind.type == 'File') { + await exponentialBackoff( + () => this.getModuleModel(event.path).open(), + printingCallbacks(`opened new file '${path}'`, `open new file '${path}'`), + ) + } } + break } - break - case 'Modified': - this.getModuleModelIfExists(event.path)?.reload() - break + case 'Modified': { + await exponentialBackoff( + async () => this.tryGetExistingModuleModel(event.path)?.reload(), + printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`), + ) + break + } + } + } catch { + this.restartClient() } }) - this.ls.on('text/fileModifiedOnDisk', async (event) => { - this.getModuleModelIfExists(event.path)?.reload() + const path = event.path.segments.join('/') + try { + await exponentialBackoff( + async () => this.tryGetExistingModuleModel(event.path)?.reload(), + printingCallbacks(`reloaded file '${path}'`, `reload file '${path}'`), + ) + } catch { + this.restartClient() + } }) - - this.readInitialState() + exponentialBackoff( + () => this.readInitialState(), + printingCallbacks('read initial state', 'read initial state'), + ).catch((error) => { + console.error('Could not read initial state.') + console.error(error) + exponentialBackoff( + async () => this.restartClient(), + printingCallbacks('restarted RPC client', 'restart RPC client'), + ) + }) + return { client: this.client, ls: this.ls } } private assertProjectRoot(): asserts this is { projectRootId: Uuid } { @@ -101,39 +159,38 @@ export class LanguageServerSession extends ObservableV2 { } private async readInitialState() { + let moduleOpenPromises: Promise[] = [] try { - const { contentRoots } = await this.ls.initProtocolConnection(this.clientId) - const projectRoot = contentRoots.find((root) => root.type === 'Project') ?? null - if (projectRoot == null) throw new Error('Missing project root') + const connection = this.connection ?? (await this.ls.initProtocolConnection(this.clientId)) + this.connection = connection + const projectRoot = connection.contentRoots.find((root) => root.type === 'Project') + if (!projectRoot) throw new Error('Missing project root') this.projectRootId = projectRoot.id await this.ls.acquireReceivesTreeUpdates({ rootId: this.projectRootId, segments: [] }) - const srcFiles = await this.scanSrcFiles() - await Promise.all( - this.indexDoc.doc.transact(() => { - return srcFiles.map((file) => - this.getModuleModel(pushPathSegment(file.path, file.name)).open(), - ) - }, this), + const files = await this.scanSourceFiles() + moduleOpenPromises = this.indexDoc.doc.transact( + () => + files.map((file) => this.getModuleModel(pushPathSegment(file.path, file.name)).open()), + this, ) + await Promise.all(moduleOpenPromises) } catch (error) { - console.error('LS Initialization failed:', error) - if (error instanceof Error) { - this.emit('error', [error]) - } - return + console.error('LS initialization failed.') + throw error } console.log('LS connection initialized.') } - async scanSrcFiles() { + async scanSourceFiles() { this.assertProjectRoot() - const srcModules = await this.ls.listFiles({ rootId: this.projectRootId, segments: ['src'] }) - return srcModules.paths.filter((file) => file.type === 'File' && file.name.endsWith('.enso')) + const sourceDir: Path = { rootId: this.projectRootId, segments: [SOURCE_DIR] } + const srcModules = await this.ls.listFiles(sourceDir) + return srcModules.paths.filter((file) => file.type === 'File' && file.name.endsWith(EXTENSION)) } - getModuleModelIfExists(path: Path): ModulePersistence | null { + tryGetExistingModuleModel(path: Path): ModulePersistence | undefined { const name = pathToModuleName(path) - return this.authoritativeModules.get(name) ?? null + return this.authoritativeModules.get(name) } getModuleModel(path: Path): ModulePersistence { @@ -147,37 +204,26 @@ export class LanguageServerSession extends ObservableV2 { const index = this.model.findModuleByDocId(wsDoc.doc.guid) this.docs.delete(wsDoc.doc.guid) this.authoritativeModules.delete(name) - if (index != null) { - this.model.deleteModule(index) - } + if (index != null) this.model.deleteModule(index) }) return mod }) } - static get(url: string): LanguageServerSession { - const session = map.setIfUndefined(sessions, url, () => new LanguageServerSession(url)) - session.retain() - return session - } - retain() { - this.retainCount++ + this.retainCount += 1 } - release(): Promise { - this.retainCount-- - if (this.retainCount === 0) { - const closing = Promise.all( - Array.from(this.authoritativeModules.values(), (mod) => mod.dispose()), - ).then(() => {}) - this.authoritativeModules.clear() - this.model.doc.destroy() - this.ls.dispose() - sessions.delete(this.url) - return closing - } - return Promise.resolve() + async release(): Promise { + this.retainCount -= 1 + if (this.retainCount !== 0) return + const modules = this.authoritativeModules.values() + const moduleDisposePromises = Array.from(modules, (mod) => mod.dispose()) + this.authoritativeModules.clear() + this.model.doc.destroy() + this.ls.dispose() + LanguageServerSession.sessions.delete(this.url) + await Promise.all(moduleDisposePromises) } getYDoc(guid: string): WSSharedDoc | undefined { @@ -185,23 +231,19 @@ export class LanguageServerSession extends ObservableV2 { } } -const isSourceFile = (path: Path): boolean => { - return path.segments[0] === 'src' && path.segments[path.segments.length - 1].endsWith('.enso') +function isSourceFile(path: Path): boolean { + return ( + path.segments[0] === SOURCE_DIR && path.segments[path.segments.length - 1].endsWith(EXTENSION) + ) } -const pathToModuleName = (path: Path): string => { - if (path.segments[0] === 'src') { - return path.segments.slice(1).join('/') - } else { - return '//' + path.segments.join('/') - } +function pathToModuleName(path: Path): string { + if (path.segments[0] === SOURCE_DIR) return path.segments.slice(1).join('/') + else return '//' + path.segments.join('/') } -const pushPathSegment = (path: Path, segment: string): Path => { - return { - rootId: path.rootId, - segments: [...path.segments, segment], - } +function pushPathSegment(path: Path, segment: string): Path { + return { rootId: path.rootId, segments: [...path.segments, segment] } } enum LsSyncState { @@ -225,14 +267,15 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { ls: LanguageServer path: Path doc: ModuleDoc = new ModuleDoc(new Y.Doc()) - state: LsSyncState = LsSyncState.Closed - lastAction = Promise.resolve() + readonly state: LsSyncState = LsSyncState.Closed + readonly lastAction = Promise.resolve() updateToApply: Uint8Array | null = null syncedContent: string | null = null syncedVersion: Checksum | null = null syncedMeta: fileFormat.Metadata = fileFormat.tryParseMetadataOrFallback(null) queuedAction: LsAction | null = null cleanup = () => {} + constructor(ls: LanguageServer, path: Path, sharedDoc: Y.Doc) { super() this.ls = ls @@ -240,9 +283,7 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { const onRemoteUpdate = this.queueRemoteUpdate.bind(this) const onLocalUpdate = (update: Uint8Array, origin: unknown) => { - if (origin === 'file') { - Y.applyUpdate(sharedDoc, update, this) - } + if (origin === 'file') Y.applyUpdate(sharedDoc, update, this) } const onFileModified = this.handleFileModified.bind(this) const onFileRemoved = this.handleFileRemoved.bind(this) @@ -258,6 +299,44 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { } } + private inState(...states: LsSyncState[]): boolean { + return states.includes(this.state) + } + + private setState(state: LsSyncState) { + if (this.state !== LsSyncState.Disposed) { + if (DEBUG_LOG_SYNC) { + console.debug('State change:', LsSyncState[this.state], '->', LsSyncState[state]) + } + // This is SAFE. `this.state` is only `readonly` to ensure that this is the only place + // where it is mutated. + // @ts-expect-error + this.state = state + if (state === LsSyncState.Synchronized) this.trySyncRemoveUpdates() + } else { + throw new Error('LsSync disposed') + } + } + + private setLastAction(lastAction: Promise) { + // This is SAFE. `this.lastAction` is only `readonly` to ensure that this is the only place + // where it is mutated. + // @ts-expect-error + this.lastAction = lastAction.then( + () => {}, + () => {}, + ) + return lastAction + } + + /** Set the current state to the given state while the callback is running. + * Set the current state back to {@link LsSyncState.Synchronized} when the callback finishes. */ + private async withState(state: LsSyncState, callback: () => void | Promise) { + this.setState(state) + await callback() + this.setState(LsSyncState.Synchronized) + } + async open() { this.queuedAction = LsAction.Open switch (this.state) { @@ -265,29 +344,36 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { case LsSyncState.WritingFile: case LsSyncState.Synchronized: case LsSyncState.WriteError: - case LsSyncState.Reloading: + case LsSyncState.Reloading: { return - case LsSyncState.Closing: + } + case LsSyncState.Closing: { await this.lastAction - if (this.queuedAction === LsAction.Open) { - await this.open() - } + if (this.queuedAction === LsAction.Open) await this.open() return - case LsSyncState.Opening: + } + case LsSyncState.Opening: { await this.lastAction return - case LsSyncState.Closed: - { - this.changeState(LsSyncState.Opening) - const opening = this.ls.openTextFile(this.path) - this.lastAction = opening.then() - const result = await opening + } + case LsSyncState.Closed: { + await this.withState(LsSyncState.Opening, async () => { + const promise = this.ls.openTextFile(this.path) + this.setLastAction(promise.catch(() => this.setState(LsSyncState.Closed))) + const result = await promise + if (!result.writeCapability) { + console.error('Could not acquire write capability for module:', this.path) + throw new Error( + `Could not acquire write capability for module '${this.path.segments.join('/')}'`, + ) + } this.syncFileContents(result.content, result.currentVersion) - this.changeState(LsSyncState.Synchronized) - } + }) return + } default: { - const _: never = this.state + this.state satisfies never + return } } } @@ -308,7 +394,6 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { } else { this.updateToApply = update } - this.trySyncRemoveUpdates() } @@ -342,7 +427,7 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { metadataKeys: Y.YMapEvent['keys'] | null, ) { if (this.syncedContent == null || this.syncedVersion == null) return - if (contentDelta == null && idMapKeys == null && metadataKeys == null) return + if (!contentDelta && !idMapKeys && !metadataKeys) return const { edits, newContent, newMetadata } = applyDocumentUpdates( this.doc, @@ -356,17 +441,17 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { const newVersion = computeTextChecksum(newContent) if (DEBUG_LOG_SYNC) { - console.log(' === changes === ') - console.log('number of edits:', edits.length) - console.log('metadata:', metadataKeys) - console.log('content:', contentDelta) - console.log('idMap:', idMapKeys) + console.debug(' === changes === ') + console.debug('number of edits:', edits.length) + console.debug('metadata:', metadataKeys) + console.debug('content:', contentDelta) + console.debug('idMap:', idMapKeys) if (edits.length > 0) { - console.log('version:', this.syncedVersion, '->', newVersion) - console.log('Content diff:') - console.log(prettyPrintDiff(this.syncedContent, newContent)) + console.debug('version:', this.syncedVersion, '->', newVersion) + console.debug('Content diff:') + console.debug(prettyPrintDiff(this.syncedContent, newContent)) } - console.log(' =============== ') + console.debug(' =============== ') } if (edits.length === 0) { if (newVersion !== this.syncedVersion) { @@ -375,36 +460,30 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { return } - this.changeState(LsSyncState.WritingFile) + this.setState(LsSyncState.WritingFile) const execute = contentDelta != null || idMapKeys != null - const apply = this.ls.applyEdit( - { - path: this.path, - edits, - oldVersion: this.syncedVersion, - newVersion, - }, - execute, - ) - return (this.lastAction = apply.then( + const edit: FileEdit = { path: this.path, edits, oldVersion: this.syncedVersion, newVersion } + const apply = this.ls.applyEdit(edit, execute) + const promise = apply.then( () => { this.syncedContent = newContent this.syncedVersion = newVersion this.syncedMeta = newMetadata - this.changeState(LsSyncState.Synchronized) + this.setState(LsSyncState.Synchronized) }, - (e) => { - console.error('Failed to apply edit:', e) - - // Try to recover by reloading the file. Drop the attempted updates, since applying them - // have failed. - this.changeState(LsSyncState.WriteError) + (error) => { + console.error('Could not apply edit:', error) + // Try to recover by reloading the file. + // Drop the attempted updates, since applying them have failed. + this.setState(LsSyncState.WriteError) this.syncedContent = null this.syncedVersion = null return this.reload() }, - )) + ) + this.setLastAction(promise) + return promise } private syncFileContents(content: string, version: Checksum) { @@ -416,12 +495,14 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { const idMap = new IdMap(this.doc.idMap, this.doc.contents) for (const [{ index, size }, id] of idMapMeta) { - const range = [index.value, index.value + size.value] - if (typeof range[0] !== 'number' || typeof range[1] !== 'number') { + const start = index.value + const end = index.value + size.value + const range: SourceRange = [start, end] + if (typeof start !== 'number' || typeof end !== 'number') { console.error(`Invalid range for id ${id}:`, range) continue } - idMap.insertKnownId([index.value, index.value + size.value], id as ExprId) + idMap.insertKnownId(range, id as ExprId) } const keysToDelete = new Set(this.doc.metadata.keys()) @@ -435,13 +516,10 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { keysToDelete.delete(id) this.doc.metadata.set(id, formattedMeta) } - for (const id of keysToDelete) { - this.doc.metadata.delete(id) - } + for (const id of keysToDelete) this.doc.metadata.delete(id) this.syncedContent = content this.syncedVersion = version this.syncedMeta = metadata - const codeDiff = simpleDiffString(this.doc.contents.toString(), code) this.doc.contents.delete(codeDiff.index, codeDiff.remove) this.doc.contents.insert(codeDiff.index, codeDiff.insert) @@ -453,29 +531,35 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { this.queuedAction = LsAction.Close switch (this.state) { case LsSyncState.Disposed: - case LsSyncState.Closed: + case LsSyncState.Closed: { return - case LsSyncState.Closing: + } + case LsSyncState.Closing: { await this.lastAction return + } case LsSyncState.Opening: case LsSyncState.WritingFile: - case LsSyncState.Reloading: + case LsSyncState.Reloading: { await this.lastAction if (this.queuedAction === LsAction.Close) { await this.close() } return + } case LsSyncState.WriteError: case LsSyncState.Synchronized: { - this.changeState(LsSyncState.Closing) - const closing = (this.lastAction = this.ls.closeTextFile(this.path)) - await closing - this.changeState(LsSyncState.Closed) + this.setState(LsSyncState.Closing) + const promise = this.ls.closeTextFile(this.path) + const state = this.state + this.setLastAction(promise.catch(() => this.setState(state))) + await promise + this.setState(LsSyncState.Closed) return } default: { - const _: never = this.state + this.state satisfies never + return } } } @@ -486,60 +570,83 @@ class ModulePersistence extends ObservableV2<{ removed: () => void }> { case LsSyncState.Opening: case LsSyncState.Disposed: case LsSyncState.Closed: - case LsSyncState.Closing: + case LsSyncState.Closing: { return - case LsSyncState.Reloading: + } + case LsSyncState.Reloading: { await this.lastAction return - case LsSyncState.WritingFile: + } + case LsSyncState.WritingFile: { await this.lastAction - if (this.queuedAction === LsAction.Reload) { - await this.reload() - } + if (this.queuedAction === LsAction.Reload) await this.reload() return - case LsSyncState.Synchronized: + } + case LsSyncState.Synchronized: { + this.withState(LsSyncState.Reloading, async () => { + const promise = Promise.all([ + this.ls.readFile(this.path), + this.ls.fileChecksum(this.path), + ]) + this.setLastAction(promise) + const [contents, checksum] = await promise + this.syncFileContents(contents.contents, checksum.checksum) + }) + return + } case LsSyncState.WriteError: { - this.changeState(LsSyncState.Reloading) - const reloading = this.ls.closeTextFile(this.path).then(() => { - return this.ls.openTextFile(this.path) + this.withState(LsSyncState.Reloading, async () => { + const path = this.path.segments.join('/') + const reloading = this.ls + .closeTextFile(this.path) + .catch((error) => { + console.error('Could not close file after write error:') + console.error(error) + }) + .then( + () => + exponentialBackoff( + async () => { + const result = await this.ls.openTextFile(this.path) + if (!result.writeCapability) { + const message = `Could not acquire write capability for module '${this.path.segments.join( + '/', + )}'` + console.error(message) + throw new Error(message) + } + return result + }, + printingCallbacks( + `opened file '${path}' for writing`, + `open file '${path}' for writing`, + ), + ), + (error) => { + console.error('Could not reopen file after write error:') + console.error(error) + // This error is unrecoverable. + throw error + }, + ) + this.setLastAction(reloading) + const result = await reloading + this.syncFileContents(result.content, result.currentVersion) }) - this.lastAction = reloading.then() - const result = await reloading - this.syncFileContents(result.content, result.currentVersion) - this.changeState(LsSyncState.Synchronized) return } default: { - const _: never = this.state - } - } - } - - private inState(...states: LsSyncState[]): boolean { - return states.includes(this.state) - } - - private changeState(state: LsSyncState) { - if (this.state !== LsSyncState.Disposed) { - if (DEBUG_LOG_SYNC) { - console.log('State change:', LsSyncState[this.state], '->', LsSyncState[state]) - } - this.state = state - if (state === LsSyncState.Synchronized) { - this.trySyncRemoveUpdates() + this.state satisfies never + return } - } else { - throw new Error('LsSync disposed') } } - dispose(): Promise { + async dispose(): Promise { this.cleanup() const alreadyClosed = this.inState(LsSyncState.Closing, LsSyncState.Closed) - this.changeState(LsSyncState.Disposed) - if (!alreadyClosed) { - return this.ls.closeTextFile(this.path).then() - } - return Promise.resolve() + this.setState(LsSyncState.Disposed) + if (alreadyClosed) return Promise.resolve() + return this.ls.closeTextFile(this.path) } } diff --git a/app/gui2/ydoc-server/ydoc.ts b/app/gui2/ydoc-server/ydoc.ts index e7d5aa6c2205..9898576dbc31 100644 --- a/app/gui2/ydoc-server/ydoc.ts +++ b/app/gui2/ydoc-server/ydoc.ts @@ -32,8 +32,8 @@ type ConnectionId = YjsConnection | string export class WSSharedDoc { doc: Y.Doc /** - * Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn - * is closed. + * Maps from connection id to set of controlled user ids. + * Delete all user ids from awareness when this conn is closed. */ conns: Map> awareness: Awareness @@ -53,18 +53,15 @@ export class WSSharedDoc { if (conn !== null) { const connControlledIDs = this.conns.get(conn) if (connControlledIDs !== undefined) { - added.forEach((clientID) => { - connControlledIDs.add(clientID) - }) - removed.forEach((clientID) => { - connControlledIDs.delete(clientID) - }) + for (const clientID of added) connControlledIDs.add(clientID) + for (const clientID of removed) connControlledIDs.delete(clientID) } } // broadcast awareness update const encoder = encoding.createEncoder() encoding.writeVarUint(encoder, messageAwareness) - encoding.writeVarUint8Array(encoder, encodeAwarenessUpdate(this.awareness, changedClients)) + const update = encodeAwarenessUpdate(this.awareness, changedClients) + encoding.writeVarUint8Array(encoder, update) this.broadcast(encoding.toUint8Array(encoder)) }, ) @@ -72,11 +69,9 @@ export class WSSharedDoc { } broadcast(message: Uint8Array) { - this.conns.forEach((_, conn) => { - if (typeof conn !== 'string') { - conn.send(message) - } - }) + for (const [conn] of this.conns) { + if (typeof conn !== 'string') conn.send(message) + } } updateHandler(update: Uint8Array, _origin: any) { @@ -88,8 +83,8 @@ export class WSSharedDoc { } /** - * Handle servicing incoming websocket connection listening for given document updates. - * @param ws The newly connected websocket requesting Yjs document synchronization + * Handle servicing incoming WebSocket connection listening for given document updates. + * @param ws The newly connected WebSocket requesting Yjs document synchronization * @param lsUrl Address of the language server to connect to. Each unique language server address * will be assigned its own `DistributedProject` instance with a unique namespace of Yjs documents. * @param docName The name of the document to synchronize. When the document name is `index`, the @@ -99,20 +94,16 @@ export function setupGatewayClient(ws: WebSocket, lsUrl: string, docName: string const lsSession = LanguageServerSession.get(lsUrl) const wsDoc = lsSession.getYDoc(docName) if (wsDoc == null) { - console.log(`Document ${docName} not found in language server session ${lsUrl}`) + console.error(`Document '${docName}' not found in language server session '${lsUrl}'.`) ws.close() return } const connection = new YjsConnection(ws, wsDoc) - - const doClose = () => connection.close() - lsSession.once('error', doClose) connection.once('close', async () => { - lsSession.off('error', doClose) try { await lsSession.release() - } catch (err) { - console.error('Session release failed.\n', err) + } catch (error) { + console.error('Session release failed.\n', error) } }) } @@ -129,11 +120,7 @@ class YjsConnection extends ObservableV2<{ close(): void }> { ws.binaryType = 'arraybuffer' ws.on('message', (message: ArrayBuffer) => this.messageListener(new Uint8Array(message))) ws.on('close', () => this.close()) - - if (!isLoaded) { - wsDoc.doc.load() - } - + if (!isLoaded) wsDoc.doc.load() this.initPing() this.sendSyncMessage() } @@ -143,24 +130,21 @@ class YjsConnection extends ObservableV2<{ close(): void }> { let pongReceived = true const pingInterval = setInterval(() => { if (!pongReceived) { - if (this.wsDoc.conns.has(this)) { - this.close() - } + if (this.wsDoc.conns.has(this)) this.close() clearInterval(pingInterval) } else if (this.wsDoc.conns.has(this)) { pongReceived = false try { this.ws.ping() - } catch (e) { + } catch (error) { + console.error('Error sending ping:', error) this.close() clearInterval(pingInterval) } } }, pingTimeout) this.ws.on('close', () => clearInterval(pingInterval)) - this.ws.on('pong', () => { - pongReceived = true - }) + this.ws.on('pong', () => (pongReceived = true)) } sendSyncMessage() { @@ -185,11 +169,7 @@ class YjsConnection extends ObservableV2<{ close(): void }> { this.close() } try { - this.ws.send(message, (error) => { - if (error != null) { - this.close() - } - }) + this.ws.send(message, (error) => error && this.close()) } catch (e) { this.close() } @@ -201,10 +181,9 @@ class YjsConnection extends ObservableV2<{ close(): void }> { const decoder = decoding.createDecoder(message) const messageType = decoding.readVarUint(decoder) switch (messageType) { - case messageSync: + case messageSync: { encoding.writeVarUint(encoder, messageSync) readSyncMessage(decoder, encoder, this.wsDoc.doc, this) - // If the `encoder` only contains the type of reply message and no // message, there is no need to send the message. When `encoder` only // contains the type of reply, its length is 1. @@ -212,6 +191,7 @@ class YjsConnection extends ObservableV2<{ close(): void }> { this.send(encoding.toUint8Array(encoder)) } break + } case messageAwareness: { const update = decoding.readVarUint8Array(decoder) applyAwarenessUpdate(this.wsDoc.awareness, update, this)