Skip to content

Commit

Permalink
fix: fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tamassoltesz committed Nov 26, 2024
1 parent 96f6e0a commit 21e91fc
Show file tree
Hide file tree
Showing 12 changed files with 551 additions and 272 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024, VRAI Labs and/or its affiliates. All rights reserved.
*
* This software is licensed under the Apache License, Version 2.0 (the
* "License") as published by the Apache Software Foundation.
*
* You may not use this file except in compliance with the License. You may
* obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package io.supertokens;

import io.supertokens.pluginInterface.Storage;
import io.supertokens.pluginInterface.useridmapping.UserIdMapping;

public class StorageAndUserIdMappingForBulkImport extends StorageAndUserIdMapping {

public String userIdInQuestion;

public StorageAndUserIdMappingForBulkImport(Storage storage,
UserIdMapping userIdMapping, String userIdInQuestion) {
super(storage, userIdMapping);
this.userIdInQuestion = userIdInQuestion;
}
}
3 changes: 1 addition & 2 deletions src/main/java/io/supertokens/authRecipe/AuthRecipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -979,8 +979,7 @@ public static List<CreatePrimaryUserBulkResult> createPrimaryUsers(Main main,
List<String> allDistinctEmails,
List<String> allDistinctPhones,
Map<String, String> thirdpartyUserIdsToThirdpartyIds)
throws StorageQueryException, AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException,
RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, UnknownUserIdException, TenantOrAppNotFoundException,
throws StorageQueryException, TenantOrAppNotFoundException,
FeatureNotEnabledException {
if (!Utils.isAccountLinkingEnabled(main, appIdentifier)) {
throw new FeatureNotEnabledException(
Expand Down
50 changes: 29 additions & 21 deletions src/main/java/io/supertokens/bulkimport/BulkImport.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,15 @@ public static void processUsersImportSteps(Main main, TransactionConnection conn
processUsersLoginMethods(main, appIdentifier, bulkImportProxyStorage, users);
try {
createPrimaryUsersAndLinkAccounts(main, appIdentifier, bulkImportProxyStorage, users);
} catch (AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException |
RecipeUserIdAlreadyLinkedWithPrimaryUserIdException | StorageQueryException | FeatureNotEnabledException |
TenantOrAppNotFoundException | UnknownUserIdException e) {
throw new RuntimeException(e);
createMultipleUserIdMapping(appIdentifier, users, allStoragesForApp);
verifyMultipleEmailForAllLoginMethods(appIdentifier, bulkImportProxyStorage, users);
createMultipleTotpDevices(main, appIdentifier, bulkImportProxyStorage, users);
createMultipleUserMetadata(appIdentifier, bulkImportProxyStorage, users);
createMultipleUserRoles(main, appIdentifier, bulkImportProxyStorage, users);
} catch ( StorageQueryException | FeatureNotEnabledException |
TenantOrAppNotFoundException e) {
throw new StorageTransactionLogicException(e);
}

createMultipleUserIdMapping(appIdentifier, users, allStoragesForApp);
verifyMultipleEmailForAllLoginMethods(appIdentifier, bulkImportProxyStorage, users);
createMultipleTotpDevices(main, appIdentifier, bulkImportProxyStorage, users);
createMultipleUserMetadata(appIdentifier, bulkImportProxyStorage, users);
createMultipleUserRoles(main, appIdentifier, bulkImportProxyStorage, users);
}

public static void processUsersLoginMethods(Main main, AppIdentifier appIdentifier, Storage storage,
Expand Down Expand Up @@ -456,9 +454,8 @@ private static void associateUserToTenants(Main main, AppIdentifier appIdentifie
private static void createPrimaryUsersAndLinkAccounts(Main main,
AppIdentifier appIdentifier, Storage storage,
List<BulkImportUser> users)
throws StorageTransactionLogicException, AccountInfoAlreadyAssociatedWithAnotherPrimaryUserIdException,
RecipeUserIdAlreadyLinkedWithPrimaryUserIdException, StorageQueryException, FeatureNotEnabledException,
TenantOrAppNotFoundException, UnknownUserIdException {
throws StorageTransactionLogicException, StorageQueryException, FeatureNotEnabledException,
TenantOrAppNotFoundException {
List<String> userIds =
users.stream()
.map(bulkImportUser -> getPrimaryLoginMethod(bulkImportUser).getSuperTokenOrExternalUserId())
Expand Down Expand Up @@ -591,6 +588,7 @@ public static void createMultipleUserIdMapping(AppIdentifier appIdentifier,
if(user.externalUserId != null) {
LoginMethod primaryLoginMethod = getPrimaryLoginMethod(user);
superTokensUserIdToExternalUserId.put(primaryLoginMethod.superTokensUserId, user.externalUserId);
primaryLoginMethod.externalUserId = user.externalUserId;
}
}
try {
Expand Down Expand Up @@ -645,7 +643,7 @@ public static void createMultipleUserMetadata(AppIdentifier appIdentifier, Stora

public static void createMultipleUserRoles(Main main, AppIdentifier appIdentifier, Storage storage,
List<BulkImportUser> users) throws StorageTransactionLogicException {
Map<TenantIdentifier, Map<String, String>> rolesToUserByTenant = new HashMap<>();
Map<TenantIdentifier, Map<String, List<String>>> rolesToUserByTenant = new HashMap<>();
for (BulkImportUser user : users) {

if (user.userRoles != null) {
Expand All @@ -658,24 +656,34 @@ public static void createMultipleUserRoles(Main main, AppIdentifier appIdentifie

rolesToUserByTenant.put(tenantIdentifier, new HashMap<>());
}
rolesToUserByTenant.get(tenantIdentifier).put(user.externalUserId, userRole.role);
if(!rolesToUserByTenant.get(tenantIdentifier).containsKey(user.externalUserId)){
rolesToUserByTenant.get(tenantIdentifier).put(user.externalUserId, new ArrayList<>());
}
rolesToUserByTenant.get(tenantIdentifier).get(user.externalUserId).add(userRole.role);
}
}
}
}
try {

UserRoles.addMultipleRolesToMultipleUsers(main, storage, rolesToUserByTenant);
UserRoles.addMultipleRolesToMultipleUsers(main, appIdentifier, storage, rolesToUserByTenant);
} catch (TenantOrAppNotFoundException e) {
throw new StorageTransactionLogicException(new Exception("E033: " + e.getMessage()));
} catch (StorageTransactionLogicException e) {
if(e.actualException instanceof UnknownRoleException){
throw new StorageTransactionLogicException(new Exception("E034: Role "
+ " does not exist! You need pre-create the role before assigning it to the user."));
if(e.actualException instanceof BulkImportBatchInsertException){
Map<String, Exception> errorsByPosition = ((BulkImportBatchInsertException) e.getCause()).exceptionByUserId;
for (String userid : errorsByPosition.keySet()) {
Exception exception = errorsByPosition.get(userid);
if (exception instanceof UnknownRoleException) {
String message = "E034: Role does not exist! You need to pre-create the role before " +
"assigning it to the user.";
errorsByPosition.put(userid, new Exception(message));
}
}
throw new StorageTransactionLogicException(new BulkImportBatchInsertException("roles errors translated", errorsByPosition));
} else {
throw new StorageTransactionLogicException(e);
}

}

}
Expand All @@ -686,7 +694,7 @@ public static void verifyMultipleEmailForAllLoginMethods(AppIdentifier appIdenti
Map<String, String> emailToUserId = new HashMap<>();
for (BulkImportUser user : users) {
for (LoginMethod lm : user.loginMethods) {
emailToUserId.put(lm.email, lm.getSuperTokenOrExternalUserId());
emailToUserId.put(lm.getSuperTokenOrExternalUserId(), lm.email);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -71,10 +74,8 @@ protected void doTaskPerApp(AppIdentifier app)
String[] allUserRoles = StorageUtils.getUserRolesStorage(bulkImportSQLStorage).getRoles(app);
BulkImportUserUtils bulkImportUserUtils = new BulkImportUserUtils(allUserRoles);

System.out.println(Thread.currentThread().getName() + " ProcessBulkImportUsers: " + " starting to load users " + this.batchSize);
List<BulkImportUser> users = bulkImportSQLStorage.getBulkImportUsersAndChangeStatusToProcessing(app,
this.batchSize);
System.out.println(Thread.currentThread().getName() + " ProcessBulkImportUsers: " + " loaded users");

if(users == null || users.isEmpty()) {
return;
Expand All @@ -91,7 +92,7 @@ protected void doTaskPerApp(AppIdentifier app)

try {
List<Future<?>> tasks = new ArrayList<>();
for (int i =0; i< NUMBER_OF_BATCHES; i++) {
for (int i =0; i< NUMBER_OF_BATCHES && i < loadedUsersChunks.size(); i++) {
tasks.add(executorService.submit(new ProcessBulkUsersImportWorker(main, app, loadedUsersChunks.get(i),
bulkImportSQLStorage, bulkImportUserUtils)));
}
Expand All @@ -101,14 +102,9 @@ protected void doTaskPerApp(AppIdentifier app)
Thread.sleep(1000);
}
Void result = (Void) task.get(); //to know if there were any errors while executing and for waiting in this thread for all the other threads to finish up
System.out.println("Result: " + result);
}


executorService.shutdownNow();
if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
System.out.println("Pool did not terminate");
}

} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@
import io.supertokens.storageLayer.StorageLayer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

public class ProcessBulkUsersImportWorker implements Runnable {

Expand Down Expand Up @@ -83,31 +80,40 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
BulkImportUser user = null;
try {
final Storage[] allStoragesForApp = getAllProxyStoragesForApp(main, appIdentifier);
boolean shouldRetryImmediately = false;
int userIndexPointer = 0;
List<BulkImportUser> validUsers = new ArrayList<>();
Map<String, Exception> validationErrorsBeforeActualProcessing = new HashMap<>();
while(userIndexPointer < users.size()) {
user = users.get(userIndexPointer);
if ((Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) ||
shouldRetryImmediately) {
if (Main.isTesting && Main.isTesting_skipBulkImportUserValidationInCronJob) {
// Skip validation when the flag is enabled during testing
// Skip validation if it's a retry run. This already passed validation. A revalidation triggers
// an invalid external user id already exists validation error - which is not true!
//TODO set invalid users status to failed
validUsers.add(user);
} else {
// Validate the user
validUsers.add(bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier, user.toJsonObject(), user.id));
try {
validUsers.add(bulkImportUserUtils.createBulkImportUserFromJSON(main, appIdentifier,
user.toJsonObject(), user.id));
} catch (InvalidBulkImportDataException exception) {
validationErrorsBeforeActualProcessing.put(user.id, new Exception(
String.valueOf(exception.errors)));
}
}
userIndexPointer+=1;
}

if(!validationErrorsBeforeActualProcessing.isEmpty()) {
throw new BulkImportBatchInsertException("Invalid input data", validationErrorsBeforeActualProcessing);
}
// Since all the tenants of a user must share the storage, we will just use the
// storage of the first tenantId of the first loginMethod
TenantIdentifier firstTenantIdentifier = new TenantIdentifier(appIdentifier.getConnectionUriDomain(),
appIdentifier.getAppId(), validUsers.get(0).loginMethods.get(0).tenantIds.get(0));

SQLStorage bulkImportProxyStorage = (SQLStorage) getBulkImportProxyStorage(firstTenantIdentifier);

shouldRetryImmediately = bulkImportProxyStorage.startTransaction(con -> {
bulkImportProxyStorage.startTransaction(con -> {
try {

BulkImport.processUsersImportSteps(main, con, appIdentifier, bulkImportProxyStorage, validUsers, allStoragesForApp);
Expand All @@ -132,12 +138,10 @@ private void processMultipleUsers(AppIdentifier appIdentifier, List<BulkImportUs
}
return false;
});

if(!shouldRetryImmediately){
userIndexPointer++;
}
} catch (StorageTransactionLogicException | InvalidBulkImportDataException | InvalidConfigException e) {
} catch (StorageTransactionLogicException | InvalidConfigException e) {
throw new RuntimeException(e);
} catch (BulkImportBatchInsertException insertException) {
handleProcessUserExceptions(app, users, insertException, baseTenantStorage);
} finally {
closeAllProxyStorages(); //closing it here to reuse the existing connection with all the users
}
Expand Down Expand Up @@ -169,15 +173,7 @@ private void handleProcessUserExceptions(AppIdentifier appIdentifier, List<BulkI
return;
}
if(exception.actualException instanceof BulkImportBatchInsertException){
Map<String, Exception> userIndexToError = ((BulkImportBatchInsertException) exception.actualException).exceptionByUserId;
for(String userid : userIndexToError.keySet()){
String id = usersBatch.stream()
.filter(bulkImportUser ->
bulkImportUser.loginMethods.stream()
.map(loginMethod -> loginMethod.superTokensUserId)
.anyMatch(s -> s.equals(userid))).findFirst().get().id;
bulkImportUserIdToErrorMessage.put(id, userIndexToError.get(userid).getMessage());
}
handleBulkImportException(usersBatch, (BulkImportBatchInsertException) exception.actualException, bulkImportUserIdToErrorMessage);
} else {
//fail the whole batch
errorMessage[0] = exception.actualException.getMessage();
Expand All @@ -190,6 +186,8 @@ private void handleProcessUserExceptions(AppIdentifier appIdentifier, List<BulkI
errorMessage[0] = ((InvalidBulkImportDataException) e).errors.toString();
} else if (e instanceof InvalidConfigException) {
errorMessage[0] = e.getMessage();
} else if (e instanceof BulkImportBatchInsertException) {
handleBulkImportException(usersBatch, (BulkImportBatchInsertException)e, bulkImportUserIdToErrorMessage);
}

try {
Expand All @@ -203,6 +201,31 @@ private void handleProcessUserExceptions(AppIdentifier appIdentifier, List<BulkI
}
}

private static void handleBulkImportException(List<BulkImportUser> usersBatch, BulkImportBatchInsertException exception,
Map<String, String> bulkImportUserIdToErrorMessage) {
Map<String, Exception> userIndexToError = exception.exceptionByUserId;
for(String userid : userIndexToError.keySet()){
Optional<BulkImportUser> userWithId = usersBatch.stream()
.filter(bulkImportUser -> bulkImportUser.id.equals(userid) || bulkImportUser.externalUserId.equals(userid)).findFirst();
String id = null;
if(userWithId.isPresent()){
id = userWithId.get().id;
}

if(id == null) {
userWithId = usersBatch.stream()
.filter(bulkImportUser ->
bulkImportUser.loginMethods.stream()
.map(loginMethod -> loginMethod.superTokensUserId)
.anyMatch(s -> s.equals(userid))).findFirst();
if(userWithId.isPresent()){
id = userWithId.get().id;
}
}
bulkImportUserIdToErrorMessage.put(id, userIndexToError.get(userid).getMessage());
}
}

private synchronized Storage getBulkImportProxyStorage(TenantIdentifier tenantIdentifier)
throws InvalidConfigException, IOException, TenantOrAppNotFoundException, DbInitException {
String userPoolId = StorageLayer.getStorage(tenantIdentifier, main).getUserPoolId();
Expand All @@ -229,7 +252,7 @@ private synchronized Storage getBulkImportProxyStorage(TenantIdentifier tenantId
throw new TenantOrAppNotFoundException(tenantIdentifier);
}

private Storage[] getAllProxyStoragesForApp(Main main, AppIdentifier appIdentifier)
private synchronized Storage[] getAllProxyStoragesForApp(Main main, AppIdentifier appIdentifier)
throws StorageTransactionLogicException {

try {
Expand Down
Loading

0 comments on commit 21e91fc

Please sign in to comment.