Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: multithreaded bulk import #1077

Open
wants to merge 44 commits into
base: feat/bulk-import-base
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
0ea0211
feat: Add BulkImport APIs and cron
anku255 Mar 20, 2024
f86211a
chore: update pull request template
anku255 Mar 20, 2024
05dc4a1
fix: Use the correct tenant config to create the proxy storage
anku255 Mar 20, 2024
5b4658a
fix: PR changes
anku255 Mar 21, 2024
d721cd5
fix: PR changes
anku255 Mar 29, 2024
769cbb6
fix: PR changes
anku255 Apr 2, 2024
f443a8c
fix: PR changes
anku255 Apr 2, 2024
73b75a8
fix: PR changes
anku255 Apr 4, 2024
6d59616
fix: PR changes
anku255 Apr 8, 2024
b6c63f4
fix: Update version
anku255 Apr 9, 2024
173e7fc
fix: PR changes
anku255 Apr 10, 2024
4f6ab13
fix: PR changes
anku255 Apr 18, 2024
7f8ce62
Merge branch 'feat/bulk-import-base' into feat/bulk-import-1
anku255 Apr 25, 2024
fcdfb54
fix: Rename DeleteBulkImportUser API path
anku255 Apr 29, 2024
ddfaa1c
fix: disable bulk import for in-memory db
anku255 Apr 29, 2024
c7ae1e4
fix: a bug with createTotpDevices
anku255 Apr 29, 2024
d24bebf
fix: PR changes
anku255 May 8, 2024
09c1b85
feat: Add an api to import user in sync
anku255 May 23, 2024
119ca1d
feat: Add an api to get count of bulk import users
anku255 May 23, 2024
79a1e67
fix: PR changes
anku255 May 27, 2024
77cb57c
fix: Add error codes and plainTextPassword import
anku255 May 29, 2024
78979f4
fix: PR changes
anku255 Jun 18, 2024
0d70735
feat: multithreaded bulk import
tamassoltesz Sep 20, 2024
b78541e
fix: changelog update
tamassoltesz Sep 20, 2024
0c0dea8
fix: add new test
tamassoltesz Sep 25, 2024
67c42e9
fix: fixing unreliable mutithreaded bulk import with mysql
tamassoltesz Sep 27, 2024
72706f9
chore: merging master to feature branch
tamassoltesz Oct 1, 2024
807f617
fix: review fixes
tamassoltesz Oct 1, 2024
f7cc349
fix: fixing failing tests
tamassoltesz Oct 2, 2024
1ec5dde
feat: bulkimport flow tests
tamassoltesz Oct 3, 2024
2c14844
feat: bulk import cron starter api
tamassoltesz Oct 12, 2024
d0d13f0
fix: tweaking params for faster import
tamassoltesz Oct 17, 2024
1d076b5
fix: tests
tamassoltesz Oct 17, 2024
8070d45
checkpoint
tamassoltesz Nov 5, 2024
d2ec5cb
fix: remove vacuuming
tamassoltesz Nov 5, 2024
8ff6eea
chore: merging master to feature branch
tamassoltesz Nov 5, 2024
16eda70
fix: minor tweaks
tamassoltesz Nov 7, 2024
7aaf422
feat: bulk inserting the bulk migration data
tamassoltesz Nov 15, 2024
d33eb5e
fix: fast as a lightning
tamassoltesz Nov 19, 2024
a41f951
fix: restoring lost method
tamassoltesz Nov 19, 2024
96f6e0a
fix: reworked error handling to comform previous approach with messages
tamassoltesz Nov 22, 2024
21e91fc
fix: fixing tests
tamassoltesz Nov 26, 2024
4fe694a
fix: fixing failing tests, changing version
tamassoltesz Nov 27, 2024
535d00b
chore: update changelog
tamassoltesz Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ highlighting the necessary changes)
- If no such branch exists, then create one from the latest released branch.
- [ ] If added a foreign key constraint on `app_id_to_user_id` table, make sure to delete from this table when deleting
the user as well if `deleteUserIdMappingToo` is false.
- [ ] If added a new recipe, then make sure to update the bulk import API to include the new recipe.

## Remaining TODOs for this PR

Expand Down
40 changes: 40 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,46 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [9.4.0]

### Added
- Adds property `bulk_migration_parallelism` for fine-tuning the worker threads number
- Adds APIs to bulk import users
- GET `/bulk-import/users`
- POST `/bulk-import/users`
- GET `/bulk-import/users/count`
- POST `/bulk-import/users/remove`
- POST `/bulk-import/users/import`
- POST `/bulk-import/backgroundjob`
- GET `/bulk-import/backgroundjob`
- Adds `ProcessBulkImportUsers` cron job to process bulk import users
- Adds multithreaded worker support for the `ProcessBulkImportUsers` cron job for faster bulk imports
- Adds support for lazy importing users

### Migrations

```sql
"CREATE TABLE IF NOT EXISTS bulk_import_users (
id CHAR(36),
app_id VARCHAR(64) NOT NULL DEFAULT 'public',
primary_user_id VARCHAR(36),
raw_data TEXT NOT NULL,
status VARCHAR(128) DEFAULT 'NEW',
error_msg TEXT,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL,
CONSTRAINT bulk_import_users_pkey PRIMARY KEY(app_id, id),
CONSTRAINT bulk_import_users__app_id_fkey FOREIGN KEY(app_id) REFERENCES apps(app_id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS bulk_import_users_status_updated_at_index ON bulk_import_users (app_id, status, updated_at);

CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index1 ON bulk_import_users (app_id, status, created_at DESC,
id DESC);

CREATE INDEX IF NOT EXISTS bulk_import_users_pagination_index2 ON bulk_import_users (app_id, created_at DESC, id DESC);
```

## [9.3.0]

### Changes
Expand Down
3 changes: 1 addition & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ compileTestJava { options.encoding = "UTF-8" }
// }
//}

version = "9.3.0"

version = "9.4.0"

repositories {
mavenCentral()
Expand Down
4 changes: 4 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,7 @@ core_config_version: 0

# (Optional | Default: null) string value. The encryption key used for saving OAuth client secret on the database.
# oauth_client_secret_encryption_key:

# (DIFFERENT_ACROSS_TENANTS | OPTIONAL | Default: number of available processor cores) int value. If specified,
# the supertokens core will use the specified number of threads to complete the migration of users.
# bulk_migration_parallelism:
5 changes: 5 additions & 0 deletions devConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,8 @@ disable_telemetry: true

# (Optional | Default: null) string value. The encryption key used for saving OAuth client secret on the database.
# oauth_client_secret_encryption_key:

# (DIFFERENT_ACROSS_TENANTS | OPTIONAL | Default: number of available processor cores) int value. If specified,
# the supertokens core will use the specified number of threads to complete the migration of users.
# bulk_migration_parallelism:

6 changes: 6 additions & 0 deletions src/main/java/io/supertokens/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.supertokens.config.Config;
import io.supertokens.config.CoreConfig;
import io.supertokens.cronjobs.Cronjobs;
import io.supertokens.cronjobs.bulkimport.ProcessBulkImportUsers;
import io.supertokens.cronjobs.cleanupOAuthSessionsAndChallenges.CleanupOAuthSessionsAndChallenges;
import io.supertokens.cronjobs.deleteExpiredAccessTokenSigningKeys.DeleteExpiredAccessTokenSigningKeys;
import io.supertokens.cronjobs.deleteExpiredDashboardSessions.DeleteExpiredDashboardSessions;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class Main {

// this is a special variable that will be set to true by TestingProcessManager
public static boolean isTesting = false;
// this flag is used in ProcessBulkImportUsersCronJobTest to skip the user validation
public static boolean isTesting_skipBulkImportUserValidationInCronJob = false;

// this is a special variable that will be set to true by TestingProcessManager
public static boolean makeConsolePrintSilent = false;
Expand Down Expand Up @@ -257,6 +260,9 @@ private void init() throws IOException, StorageQueryException {
// starts DeleteExpiredAccessTokenSigningKeys cronjob if the access token signing keys can change
Cronjobs.addCronjob(this, DeleteExpiredAccessTokenSigningKeys.init(this, uniqueUserPoolIdsTenants));

// initializes ProcessBulkImportUsers cronjob to process bulk import users - start happens via API call @see BulkImportBackgroundJobManager
ProcessBulkImportUsers.init(this, uniqueUserPoolIdsTenants);

Cronjobs.addCronjob(this, CleanupOAuthSessionsAndChallenges.init(this, uniqueUserPoolIdsTenants));

// this is to ensure tenantInfos are in sync for the new cron job as well
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens;

import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.useridmapping.UserIdMapping;

public class StorageAndUserIdMappingForBulkImport extends StorageAndUserIdMapping {

public String userIdInQuestion;

public StorageAndUserIdMappingForBulkImport(Storage storage,
UserIdMapping userIdMapping, String userIdInQuestion) {
super(storage, userIdMapping);
this.userIdInQuestion = userIdInQuestion;
}
}
Loading
Loading