Wrap reading+writing workspace state operations with a mutex. (#646)
jlrobins authored Nov 20, 2024
1 parent 1255e3b · commit 820395a
Showing 4 changed files with 147 additions and 96 deletions.
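
The change wraps the extension's read-modify-write workspace state operations in a per-key mutex from the async-mutex package. Below is a condensed, self-contained sketch of that pattern; the in-memory store, the helper names, and the key name are illustrative stand-ins for VS Code's Memento-backed workspace state, not code from this commit.

```typescript
import { Mutex } from "async-mutex";

// Illustrative async key/value store standing in for VS Code's Memento-backed workspace state.
const backing = new Map<string, string[]>();
const getState = async (key: string): Promise<string[] | undefined> => backing.get(key);
const setState = async (key: string, value: string[]): Promise<void> => {
  await new Promise((resolve) => setTimeout(resolve, 5)); // simulate the asynchronous write
  backing.set(key, value);
};

// One mutex per storage key, mirroring the per-key Map the commit adds to ResourceManager.
const mutexes = new Map<string, Mutex>();
function mutexFor(key: string): Mutex {
  let mutex = mutexes.get(key);
  if (!mutex) {
    mutex = new Mutex();
    mutexes.set(key, mutex);
  }
  return mutex;
}

// Read-modify-write under the key's mutex, so concurrent callers are applied one at a time.
async function appendToKey(key: string, value: string): Promise<void> {
  await mutexFor(key).runExclusive(async () => {
    const existing = (await getState(key)) ?? []; // read
    existing.push(value); // modify
    await setState(key, existing); // write
  });
}

async function main() {
  await Promise.all([appendToKey("topics", "a"), appendToKey("topics", "b")]);
  console.log(backing.get("topics")); // ["a", "b"]: neither update is lost
}
main();
```

Each ResourceManager mutator in the diff follows the same shape: look up the mutex for the storage key, then perform the get, merge, and set inside runExclusive.
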
6 changes: 3 additions & 3 deletions CHANGELOG.md
@@ -6,9 +6,9 @@ All notable changes to this extension will be documented in this file.

### Fixed

-- Correct schema upload success message when the uploaded schema was normalized to a preexisting
-  version which wasn't the most recent existing version for the subject,
-  [issue #642](https://github.com/confluentinc/vscode/issues/642).
+- Correct schema upload success message when the uploaded schema was normalized to a preexisting version which wasn't the most recent existing version for the subject, [issue #642](https://github.com/confluentinc/vscode/issues/642).
+- Fix possible race conditions in workspace state cache management when needing to read and mutate existing workspace state keys, [issue #534](https://github.com/confluentinc/vscode/issues/534).


## 0.21.2

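The race condition behind the second changelog entry (issue #534) is a classic lost update: two asynchronous operations read the same stored value, each mutates its own copy, and the later write overwrites the earlier one. A minimal reproduction against an illustrative async store (not extension code):

```typescript
// Unguarded read-modify-write against an async store: the later write clobbers the earlier one.
const backing = new Map<string, string[]>();
const read = async (key: string): Promise<string[]> => backing.get(key) ?? [];
const write = async (key: string, value: string[]): Promise<void> => {
  await new Promise((resolve) => setTimeout(resolve, 5)); // asynchronous write, like Memento.update
  backing.set(key, value);
};

async function unsafeAppend(key: string, value: string): Promise<void> {
  const current = await read(key); // both callers can read the same (empty) snapshot
  current.push(value);
  await write(key, current);
}

async function main() {
  await Promise.all([unsafeAppend("clusters", "a"), unsafeAppend("clusters", "b")]);
  console.log(backing.get("clusters")); // logs ["a"] or ["b"], never both: one update was lost
}
main();
```
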
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions package.json
@@ -921,6 +921,7 @@
"@sentry/rollup-plugin": "^2.21.1",
"@vscode/codicons": "^0.0.36",
"@vscode/webview-ui-toolkit": "^1.4.0",
"async-mutex": "^0.5.0",
"d3": "^7.9.0",
"dataclass": "^3.0.0-beta.1",
"gql.tada": "^1.8.3",
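The only new runtime dependency is async-mutex, whose Mutex runs queued async callbacks one at a time via runExclusive, releasing the lock when the callback settles, even if it throws. A minimal usage sketch; the simulated work is illustrative:

```typescript
import { Mutex } from "async-mutex";

const mutex = new Mutex();

async function criticalSection(label: string): Promise<string> {
  // Only one callback runs at a time; others queue in acquisition order.
  return await mutex.runExclusive(async () => {
    await new Promise((resolve) => setTimeout(resolve, 10)); // simulated async work
    return `${label} done`;
  });
}

async function main() {
  const results = await Promise.all([
    criticalSection("a"),
    criticalSection("b"),
    criticalSection("c"),
  ]);
  console.log(results); // ["a done", "b done", "c done"]
}
main();
```
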
227 changes: 134 additions & 93 deletions src/storage/resourceManager.ts
@@ -1,3 +1,4 @@
import { Mutex } from "async-mutex";
import { Uri } from "vscode";
import { StorageManager, getStorageManager } from ".";
import { Status } from "../clients/sidecar";
@@ -45,7 +46,16 @@ export type AllUriMetadata = Map<string, UriMetadata>;
*/
export class ResourceManager {
static instance: ResourceManager | null = null;
private constructor(private storage: StorageManager) {}

/** Mutexes for each workspace storage key to prevent conflicting concurrent writes */
private mutexes: Map<WorkspaceStorageKeys, Mutex> = new Map();

private constructor(private storage: StorageManager) {
// Initialize mutexes for each workspace storage key
for (const key of Object.values(WorkspaceStorageKeys)) {
this.mutexes.set(key, new Mutex());
}
}

static getInstance(): ResourceManager {
if (!ResourceManager.instance) {
@@ -71,7 +81,21 @@ export class ResourceManager {
]);
}

// TODO(shoup): Add method for deleting all local resources once connection tracking is implemented.
/**
* Run an async callback that both reads and later mutates workspace storage, while holding exclusive access to a workspace storage key.
*
* This strategy prevents concurrent writes to the same workspace storage key, which can corrupt data when multiple
* asynchronous operations call methods that both read and write that key, most notably mutating actions
* on keys that hold arrays or maps.
*/
private async runWithMutex<T>(key: WorkspaceStorageKeys, callback: () => Promise<T>): Promise<T> {
const mutex = this.mutexes.get(key);
if (!mutex) {
throw new Error(`No mutex found for key: ${key}`);
}

return await mutex.runExclusive(callback);
}

// ENVIRONMENTS

@@ -122,26 +146,26 @@ export class ResourceManager {
* @param clusters The array of {@link CCloudKafkaCluster}s to store
*/
async setCCloudKafkaClusters(clusters: CCloudKafkaCluster[]): Promise<void> {
// get any existing map of <environmentId, CCloudKafkaCluster[]>
const existingEnvClusters: CCloudKafkaClustersByEnv =
(await this.getCCloudKafkaClusters()) ?? new Map();
// create a map of <environmentId, CCloudKafkaCluster[]> for the new clusters
const newClustersByEnv: CCloudKafkaClustersByEnv = new Map();
clusters.forEach((cluster) => {
if (!newClustersByEnv.has(cluster.environmentId)) {
newClustersByEnv.set(cluster.environmentId, []);
const storageKey = WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS;
await this.runWithMutex(storageKey, async () => {
// get any existing map of <environmentId, CCloudKafkaCluster[]>
const existingEnvClusters: CCloudKafkaClustersByEnv =
(await this.getCCloudKafkaClusters()) ?? new Map();
// create a map of <environmentId, CCloudKafkaCluster[]> for the new clusters
const newClustersByEnv: CCloudKafkaClustersByEnv = new Map();
clusters.forEach((cluster) => {
if (!newClustersByEnv.has(cluster.environmentId)) {
newClustersByEnv.set(cluster.environmentId, []);
}
newClustersByEnv.get(cluster.environmentId)?.push(cluster);
});
// merge the new clusters into the existing map
for (const [envId, newClusters] of newClustersByEnv) {
// replace any existing clusters for the environment with the new clusters
existingEnvClusters.set(envId, newClusters);
}
newClustersByEnv.get(cluster.environmentId)?.push(cluster);
await this.storage.setWorkspaceState(storageKey, existingEnvClusters);
});
// merge the new clusters into the existing map
for (const [envId, newClusters] of newClustersByEnv) {
// replace any existing clusters for the environment with the new clusters
existingEnvClusters.set(envId, newClusters);
}
await this.storage.setWorkspaceState(
WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS,
existingEnvClusters,
);
}

/**
@@ -197,12 +221,15 @@ export class ResourceManager {
* if not provided, all <environmentId, {@link CCloudKafkaCluster}> pairs will be deleted
*/
async deleteCCloudKafkaClusters(environment?: string): Promise<void> {
if (!environment) {
return await this.storage.deleteWorkspaceState(WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS);
}
const clusters = await this.getCCloudKafkaClusters();
clusters.delete(environment);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS, clusters);
const storageKey = WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS;
await this.runWithMutex(storageKey, async () => {
if (!environment) {
return await this.storage.deleteWorkspaceState(storageKey);
}
const clusters = await this.getCCloudKafkaClusters();
clusters.delete(environment);
await this.storage.setWorkspaceState(storageKey, clusters);
});
}

/**
@@ -320,15 +347,15 @@ export class ResourceManager {
* if not provided, all <environmentId, {@link CCloudSchemaRegistry}> pairs will be deleted
*/
async deleteCCloudSchemaRegistries(environment?: string): Promise<void> {
if (!environment) {
return await this.storage.deleteWorkspaceState(WorkspaceStorageKeys.CCLOUD_SCHEMA_REGISTRIES);
}
const schemaRegistriesByEnv = await this.getCCloudSchemaRegistries();
schemaRegistriesByEnv.delete(environment);
await this.storage.setWorkspaceState(
WorkspaceStorageKeys.CCLOUD_SCHEMA_REGISTRIES,
schemaRegistriesByEnv,
);
const storageKey = WorkspaceStorageKeys.CCLOUD_SCHEMA_REGISTRIES;
await this.runWithMutex(storageKey, async () => {
if (!environment) {
return await this.storage.deleteWorkspaceState(storageKey);
}
const schemaRegistriesByEnv = await this.getCCloudSchemaRegistries();
schemaRegistriesByEnv.delete(environment);
await this.storage.setWorkspaceState(storageKey, schemaRegistriesByEnv);
});
}

// TOPICS
@@ -348,16 +375,18 @@

const key = this.topicKeyForCluster(cluster);

// Fetch the proper map from storage, or create a new one if none exists.
const topicsByCluster =
(await this.storage.getWorkspaceState<TopicsByKafkaCluster>(key)) ||
new Map<string, KafkaTopic[]>();
await this.runWithMutex(key, async () => {
// Fetch the proper map from storage, or create a new one if none exists.
const topicsByCluster =
(await this.storage.getWorkspaceState<TopicsByKafkaCluster>(key)) ||
new Map<string, KafkaTopic[]>();

// Set the new topics for the cluster
topicsByCluster.set(cluster.id, topics);
// Set the new topics for the cluster
topicsByCluster.set(cluster.id, topics);

// Now save the updated cluster topics into the proper key'd storage.
await this.storage.setWorkspaceState(key, topicsByCluster);
// Now save the updated cluster topics into the proper key'd storage.
await this.storage.setWorkspaceState(key, topicsByCluster);
});
}

/**
@@ -433,16 +462,17 @@
throw new Error("Schema registry ID mismatch in schemas");
}

const existingSchemasBySchemaRegistry: CCloudSchemaBySchemaRegistry = await this.getSchemaMap();
const workspaceKey = WorkspaceStorageKeys.CCLOUD_SCHEMAS;
await this.runWithMutex(workspaceKey, async () => {
const existingSchemasBySchemaRegistry: CCloudSchemaBySchemaRegistry =
await this.getSchemaMap();

// wholly reassign the list of schemas for this Schema Registry.
existingSchemasBySchemaRegistry.set(schemaRegistryId, schemas);
// wholly reassign the list of schemas for this Schema Registry.
existingSchemasBySchemaRegistry.set(schemaRegistryId, schemas);

// And repersist.
await this.storage.setWorkspaceState(
WorkspaceStorageKeys.CCLOUD_SCHEMAS,
existingSchemasBySchemaRegistry,
);
// And repersist.
await this.storage.setWorkspaceState(workspaceKey, existingSchemasBySchemaRegistry);
});
}

/**
@@ -517,13 +547,16 @@
* See {@link mergeURIMetadata} for when needing to further annotate a possibly preexisting URI.
*/
async setURIMetadata(uri: Uri, metadata: UriMetadata): Promise<void> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();
const storageKey = WorkspaceStorageKeys.URI_METADATA;
await this.runWithMutex(storageKey, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(storageKey)) ??
new Map<string, UriMetadata>();

allMetadata.set(uri.toString(), metadata);
allMetadata.set(uri.toString(), metadata);

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(storageKey, allMetadata);
});
}

/** Merge new values into any preexisting extension URI metadata. Use when needing to further
Expand All @@ -532,24 +565,28 @@ export class ResourceManager {
* @returns The new metadata for the URI after the merge.
*/
async mergeURIMetadata(uri: Uri, metadata: UriMetadata): Promise<UriMetadata> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();
return await this.runWithMutex(WorkspaceStorageKeys.URI_METADATA, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

const existingMetadata = allMetadata.get(uri.toString()) ?? new Map<UriMetadataKeys, string>();
const existingMetadata =
allMetadata.get(uri.toString()) ?? new Map<UriMetadataKeys, string>();

for (const [key, value] of metadata) {
existingMetadata.set(key, value);
}
for (const [key, value] of metadata) {
existingMetadata.set(key, value);
}

allMetadata.set(uri.toString(), existingMetadata);
allMetadata.set(uri.toString(), existingMetadata);

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);

return existingMetadata;
return existingMetadata;
});
}

/** Merge a single new URI metadata value into preexisting metadata. See {@link mergeURIMetadata}
/**
* Merge a single new URI metadata value into preexisting metadata. See {@link mergeURIMetadata}
*
* @returns The new complete set of metadata for the URI after the merge.
*/
@@ -582,42 +619,46 @@
uri: Uri,
...keys: UriMetadataKeys[]
): Promise<UriMetadata | undefined> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

let existingMetadata = allMetadata.get(uri.toString());
if (existingMetadata === undefined) {
return undefined;
}
return await this.runWithMutex(WorkspaceStorageKeys.URI_METADATA, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

let existingMetadata = allMetadata.get(uri.toString());
if (existingMetadata === undefined) {
return undefined;
}

for (const key of keys) {
existingMetadata.delete(key);
}
for (const key of keys) {
existingMetadata.delete(key);
}

if (existingMetadata.size === 0) {
// all gone, remove the whole entry. Future calls
// to getUriMetadata() will return undefined.
allMetadata.delete(uri.toString());
existingMetadata = undefined;
} else {
allMetadata.set(uri.toString(), existingMetadata);
}
if (existingMetadata.size === 0) {
// all gone, remove the whole entry. Future calls
// to getUriMetadata() will return undefined.
allMetadata.delete(uri.toString());
existingMetadata = undefined;
} else {
allMetadata.set(uri.toString(), existingMetadata);
}

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);

return existingMetadata;
return existingMetadata;
});
}

/** Forget all extension metadata for a URI. Useful if knowing that the URI was just destroyed. */
async deleteURIMetadata(uri: Uri): Promise<void> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();
await this.runWithMutex(WorkspaceStorageKeys.URI_METADATA, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

allMetadata.delete(uri.toString());
allMetadata.delete(uri.toString());

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
});
}
}

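A design point worth noting in the diff above: ResourceManager keeps a separate Mutex per WorkspaceStorageKeys value rather than one global lock, so operations on unrelated keys never wait on each other, while two operations on the same key always serialize. A small sketch of that behavior, with illustrative key names standing in for the real enum values:

```typescript
import { Mutex } from "async-mutex";

// Per-key locking: one mutex per storage key (key names here are illustrative).
const keys = ["ccloudKafkaClusters", "uriMetadata"];
const mutexes = new Map<string, Mutex>();
for (const key of keys) {
  mutexes.set(key, new Mutex());
}

async function withKey(key: string, label: string, events: string[]): Promise<void> {
  const mutex = mutexes.get(key);
  if (!mutex) {
    throw new Error(`No mutex found for key: ${key}`);
  }
  await mutex.runExclusive(async () => {
    events.push(`${label} start`);
    await new Promise((resolve) => setTimeout(resolve, 10)); // stand-in for a read-modify-write
    events.push(`${label} end`);
  });
}

async function main() {
  const events: string[] = [];
  await Promise.all([
    withKey("ccloudKafkaClusters", "clusters-1", events),
    withKey("ccloudKafkaClusters", "clusters-2", events), // same key: waits for clusters-1
    withKey("uriMetadata", "metadata-1", events), // different key: overlaps with clusters-1
  ]);
  // Same-key work never interleaves: "clusters-1 end" is recorded before "clusters-2 start".
  console.log(events);
}
main();
```
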
