Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap reading+writing workspace state operations with mutexes. #646

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ All notable changes to this extension will be documented in this file.

### Fixed

- Correct schema upload success message when the uploaded schema was normalized to a preexisting
version which wasn't the most recent existing version for the subject,
[issue #642](https://github.com/confluentinc/vscode/issues/642).
- Correct schema upload success message when the uploaded schema was normalized to a preexisting version which wasn't the most recent existing version for the subject, [issue #642](https://github.com/confluentinc/vscode/issues/642).
- Fix possible race conditions in workspace state cache management when needing to read and mutate existing workspace state keys, [issue #534](https://github.com/confluentinc/vscode/issues/534).


## 0.21.2

Expand Down
9 changes: 9 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,7 @@
"@sentry/rollup-plugin": "^2.21.1",
"@vscode/codicons": "^0.0.36",
"@vscode/webview-ui-toolkit": "^1.4.0",
"async-mutex": "^0.5.0",
"d3": "^7.9.0",
"dataclass": "^3.0.0-beta.1",
"gql.tada": "^1.8.3",
Expand Down
227 changes: 134 additions & 93 deletions src/storage/resourceManager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Mutex } from "async-mutex";
import { Uri } from "vscode";
import { StorageManager, getStorageManager } from ".";
import { Status } from "../clients/sidecar";
Expand Down Expand Up @@ -45,7 +46,16 @@ export type AllUriMetadata = Map<string, UriMetadata>;
*/
export class ResourceManager {
static instance: ResourceManager | null = null;
private constructor(private storage: StorageManager) {}

/** Mutexes for each workspace storage key to prevent conflicting concurrent writes */
private mutexes: Map<WorkspaceStorageKeys, Mutex> = new Map();

private constructor(private storage: StorageManager) {
// Initialize mutexes for each workspace storage key
for (const key of Object.values(WorkspaceStorageKeys)) {
this.mutexes.set(key, new Mutex());
}
}

static getInstance(): ResourceManager {
if (!ResourceManager.instance) {
Expand All @@ -71,7 +81,21 @@ export class ResourceManager {
]);
}

// TODO(shoup): Add method for deleting all local resources once connection tracking is implemented.
/**
* Run an async callback which will both read and later mutate workspace storage with exclusive access to a workspace storage key.
*
* This strategy prevents concurrent writes to the same workspace storage key, which can lead to data corruption, when multiple
* asynchronous operations are calling methods which both read and write to the same workspace storage key, namely mutating
* actions to keys that hold arrays or maps.
*/
private async runWithMutex<T>(key: WorkspaceStorageKeys, callback: () => Promise<T>): Promise<T> {
const mutex = this.mutexes.get(key);
if (!mutex) {
throw new Error(`No mutex found for key: ${key}`);
}

return await mutex.runExclusive(callback);
}

// ENVIRONMENTS

Expand Down Expand Up @@ -122,26 +146,26 @@ export class ResourceManager {
* @param clusters The array of {@link CCloudKafkaCluster}s to store
*/
async setCCloudKafkaClusters(clusters: CCloudKafkaCluster[]): Promise<void> {
// get any existing map of <environmentId, CCloudKafkaCluster[]>
const existingEnvClusters: CCloudKafkaClustersByEnv =
(await this.getCCloudKafkaClusters()) ?? new Map();
// create a map of <environmentId, CCloudKafkaCluster[]> for the new clusters
const newClustersByEnv: CCloudKafkaClustersByEnv = new Map();
clusters.forEach((cluster) => {
if (!newClustersByEnv.has(cluster.environmentId)) {
newClustersByEnv.set(cluster.environmentId, []);
const storageKey = WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS;
await this.runWithMutex(storageKey, async () => {
// get any existing map of <environmentId, CCloudKafkaCluster[]>
const existingEnvClusters: CCloudKafkaClustersByEnv =
(await this.getCCloudKafkaClusters()) ?? new Map();
// create a map of <environmentId, CCloudKafkaCluster[]> for the new clusters
const newClustersByEnv: CCloudKafkaClustersByEnv = new Map();
clusters.forEach((cluster) => {
if (!newClustersByEnv.has(cluster.environmentId)) {
newClustersByEnv.set(cluster.environmentId, []);
}
newClustersByEnv.get(cluster.environmentId)?.push(cluster);
});
// merge the new clusters into the existing map
for (const [envId, newClusters] of newClustersByEnv) {
// replace any existing clusters for the environment with the new clusters
existingEnvClusters.set(envId, newClusters);
}
newClustersByEnv.get(cluster.environmentId)?.push(cluster);
await this.storage.setWorkspaceState(storageKey, existingEnvClusters);
});
// merge the new clusters into the existing map
for (const [envId, newClusters] of newClustersByEnv) {
// replace any existing clusters for the environment with the new clusters
existingEnvClusters.set(envId, newClusters);
}
await this.storage.setWorkspaceState(
WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS,
existingEnvClusters,
);
}

/**
Expand Down Expand Up @@ -197,12 +221,15 @@ export class ResourceManager {
* if not provided, all <environmentId, {@link CCloudKafkaCluster}> pairs will be deleted
*/
async deleteCCloudKafkaClusters(environment?: string): Promise<void> {
if (!environment) {
return await this.storage.deleteWorkspaceState(WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS);
}
const clusters = await this.getCCloudKafkaClusters();
clusters.delete(environment);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS, clusters);
const storageKey = WorkspaceStorageKeys.CCLOUD_KAFKA_CLUSTERS;
await this.runWithMutex(storageKey, async () => {
if (!environment) {
return await this.storage.deleteWorkspaceState(storageKey);
}
const clusters = await this.getCCloudKafkaClusters();
clusters.delete(environment);
await this.storage.setWorkspaceState(storageKey, clusters);
});
}

/**
Expand Down Expand Up @@ -320,15 +347,15 @@ export class ResourceManager {
* if not provided, all <environmentId, {@link CCloudSchemaRegistry}> pairs will be deleted
*/
async deleteCCloudSchemaRegistries(environment?: string): Promise<void> {
if (!environment) {
return await this.storage.deleteWorkspaceState(WorkspaceStorageKeys.CCLOUD_SCHEMA_REGISTRIES);
}
const schemaRegistriesByEnv = await this.getCCloudSchemaRegistries();
schemaRegistriesByEnv.delete(environment);
await this.storage.setWorkspaceState(
WorkspaceStorageKeys.CCLOUD_SCHEMA_REGISTRIES,
schemaRegistriesByEnv,
);
const storageKey = WorkspaceStorageKeys.CCLOUD_SCHEMA_REGISTRIES;
await this.runWithMutex(storageKey, async () => {
if (!environment) {
return await this.storage.deleteWorkspaceState(storageKey);
}
const schemaRegistriesByEnv = await this.getCCloudSchemaRegistries();
schemaRegistriesByEnv.delete(environment);
await this.storage.setWorkspaceState(storageKey, schemaRegistriesByEnv);
});
}

// TOPICS
Expand All @@ -348,16 +375,18 @@ export class ResourceManager {

const key = this.topicKeyForCluster(cluster);

// Fetch the proper map from storage, or create a new one if none exists.
const topicsByCluster =
(await this.storage.getWorkspaceState<TopicsByKafkaCluster>(key)) ||
new Map<string, KafkaTopic[]>();
await this.runWithMutex(key, async () => {
// Fetch the proper map from storage, or create a new one if none exists.
const topicsByCluster =
(await this.storage.getWorkspaceState<TopicsByKafkaCluster>(key)) ||
new Map<string, KafkaTopic[]>();

// Set the new topics for the cluster
topicsByCluster.set(cluster.id, topics);
// Set the new topics for the cluster
topicsByCluster.set(cluster.id, topics);

// Now save the updated cluster topics into the proper key'd storage.
await this.storage.setWorkspaceState(key, topicsByCluster);
// Now save the updated cluster topics into the proper key'd storage.
await this.storage.setWorkspaceState(key, topicsByCluster);
});
}

/**
Expand Down Expand Up @@ -433,16 +462,17 @@ export class ResourceManager {
throw new Error("Schema registry ID mismatch in schemas");
}

const existingSchemasBySchemaRegistry: CCloudSchemaBySchemaRegistry = await this.getSchemaMap();
const workspaceKey = WorkspaceStorageKeys.CCLOUD_SCHEMAS;
await this.runWithMutex(workspaceKey, async () => {
const existingSchemasBySchemaRegistry: CCloudSchemaBySchemaRegistry =
await this.getSchemaMap();

// wholly reassign the list of schemas for this Schema Registry.
existingSchemasBySchemaRegistry.set(schemaRegistryId, schemas);
// wholly reassign the list of schemas for this Schema Registry.
existingSchemasBySchemaRegistry.set(schemaRegistryId, schemas);

// And repersist.
await this.storage.setWorkspaceState(
WorkspaceStorageKeys.CCLOUD_SCHEMAS,
existingSchemasBySchemaRegistry,
);
// And repersist.
await this.storage.setWorkspaceState(workspaceKey, existingSchemasBySchemaRegistry);
});
}

/**
Expand Down Expand Up @@ -517,13 +547,16 @@ export class ResourceManager {
* See {@link mergeURIMetadata} for when needing to further annotate a possibly preexisting URI.
*/
async setURIMetadata(uri: Uri, metadata: UriMetadata): Promise<void> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();
const storageKey = WorkspaceStorageKeys.URI_METADATA;
await this.runWithMutex(storageKey, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(storageKey)) ??
new Map<string, UriMetadata>();

allMetadata.set(uri.toString(), metadata);
allMetadata.set(uri.toString(), metadata);

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(storageKey, allMetadata);
});
}

/** Merge new values into any preexisting extension URI metadata. Use when needing to further
Expand All @@ -532,24 +565,28 @@ export class ResourceManager {
* @returns The new metadata for the URI after the merge.
*/
async mergeURIMetadata(uri: Uri, metadata: UriMetadata): Promise<UriMetadata> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();
return await this.runWithMutex(WorkspaceStorageKeys.URI_METADATA, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

const existingMetadata = allMetadata.get(uri.toString()) ?? new Map<UriMetadataKeys, string>();
const existingMetadata =
allMetadata.get(uri.toString()) ?? new Map<UriMetadataKeys, string>();

for (const [key, value] of metadata) {
existingMetadata.set(key, value);
}
for (const [key, value] of metadata) {
existingMetadata.set(key, value);
}

allMetadata.set(uri.toString(), existingMetadata);
allMetadata.set(uri.toString(), existingMetadata);

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);

return existingMetadata;
return existingMetadata;
});
}

/** Merge a single new URI metadata value into preexisting metadata. See {@link mergeURIMetadata}
/**
* Merge a single new URI metadata value into preexisting metadata. See {@link mergeURIMetadata}
*
* @returns The new complete set of metadata for the URI after the merge.
*/
Expand Down Expand Up @@ -582,42 +619,46 @@ export class ResourceManager {
uri: Uri,
...keys: UriMetadataKeys[]
): Promise<UriMetadata | undefined> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

let existingMetadata = allMetadata.get(uri.toString());
if (existingMetadata === undefined) {
return undefined;
}
return await this.runWithMutex(WorkspaceStorageKeys.URI_METADATA, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

let existingMetadata = allMetadata.get(uri.toString());
if (existingMetadata === undefined) {
return undefined;
}

for (const key of keys) {
existingMetadata.delete(key);
}
for (const key of keys) {
existingMetadata.delete(key);
}

if (existingMetadata.size === 0) {
// all gone, remove the whole entry. Future calls
// to getUriMetadata() will return undefined.
allMetadata.delete(uri.toString());
existingMetadata = undefined;
} else {
allMetadata.set(uri.toString(), existingMetadata);
}
if (existingMetadata.size === 0) {
// all gone, remove the whole entry. Future calls
// to getUriMetadata() will return undefined.
allMetadata.delete(uri.toString());
existingMetadata = undefined;
} else {
allMetadata.set(uri.toString(), existingMetadata);
}

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);

return existingMetadata;
return existingMetadata;
});
}

/** Forget all extension metadata for a URI. Useful if knowing that the URI was just destroyed. */
async deleteURIMetadata(uri: Uri): Promise<void> {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();
await this.runWithMutex(WorkspaceStorageKeys.URI_METADATA, async () => {
const allMetadata =
(await this.storage.getWorkspaceState<AllUriMetadata>(WorkspaceStorageKeys.URI_METADATA)) ??
new Map<string, UriMetadata>();

allMetadata.delete(uri.toString());
allMetadata.delete(uri.toString());

await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
await this.storage.setWorkspaceState(WorkspaceStorageKeys.URI_METADATA, allMetadata);
});
}
}

Expand Down