Use user credentials in S3 verticle
ncovercash committed Oct 27, 2023
1 parent f772b25 commit 28832d7
Showing 2 changed files with 126 additions and 163 deletions.
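
In short: before this commit, S3JobRunningVerticle authenticated as a system user (via SystemUserAuthService) and looked up the job creator's permissions through PermissionsClient before it could process a queue item. Afterwards, it builds its Okapi connection parameters synchronously from the URL, tenant, token, and permissions already stored on the DataImportQueueItem. Below is a minimal sketch of the new pattern, assuming the okapi-common XOkapiHeaders constants and the data-import-utils OkapiConnectionParams constructor that the diff uses; the helper class itself is hypothetical.

import io.vertx.core.Vertx;
import java.util.Map;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.okapi.common.XOkapiHeaders;
import org.folio.rest.jaxrs.model.DataImportQueueItem;

// Hypothetical helper: build connection parameters straight from the
// credentials captured when the job was enqueued. There is no system-user
// authentication round trip, so the result is available synchronously.
public final class QueueItemParamsSketch {

  private QueueItemParamsSketch() {}

  public static OkapiConnectionParams fromQueueItem(
    DataImportQueueItem queueItem,
    Vertx vertx
  ) {
    return new OkapiConnectionParams(
      Map.of(
        XOkapiHeaders.URL.toLowerCase(), queueItem.getOkapiUrl(),
        XOkapiHeaders.TENANT.toLowerCase(), queueItem.getTenant(),
        XOkapiHeaders.TOKEN.toLowerCase(), queueItem.getOkapiToken(),
        XOkapiHeaders.PERMISSIONS.toLowerCase(), queueItem.getOkapiPermissions()
      ),
      vertx
    );
  }
}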
232 changes: 91 additions & 141 deletions src/main/java/org/folio/service/file/S3JobRunningVerticle.java
@@ -4,7 +4,6 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonArray;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -29,9 +28,6 @@
import org.folio.rest.jaxrs.model.JobProfileInfo;
import org.folio.rest.jaxrs.model.StatusDto;
import org.folio.rest.jaxrs.model.StatusDto.ErrorStatus;
import org.folio.service.auth.PermissionsClient;
import org.folio.service.auth.PermissionsClient.PermissionUser;
import org.folio.service.auth.SystemUserAuthService;
import org.folio.service.processing.ParallelFileChunkingProcessor;
import org.folio.service.processing.ranking.ScoreService;
import org.folio.service.s3storage.MinioStorageService;
@@ -54,9 +50,7 @@ public class S3JobRunningVerticle extends AbstractVerticle {

private final DataImportQueueItemDao queueItemDao;
private final MinioStorageService minioStorageService;
private final PermissionsClient permissionsClient;
private final ScoreService scoreService;
private final SystemUserAuthService systemUserService;
private final UploadDefinitionService uploadDefinitionService;

private final ParallelFileChunkingProcessor fileProcessor;
@@ -71,9 +65,7 @@ public S3JobRunningVerticle(
Vertx vertx,
DataImportQueueItemDao queueItemDao,
MinioStorageService minioStorageService,
PermissionsClient permissionsClient,
ScoreService scoreService,
SystemUserAuthService systemUserService,
UploadDefinitionService uploadDefinitionService,
ParallelFileChunkingProcessor fileProcessor,
@Value("${ASYNC_PROCESSOR_POLL_INTERVAL_MS:5000}") int pollInterval,
@@ -84,8 +76,6 @@ public S3JobRunningVerticle(
this.queueItemDao = queueItemDao;

this.minioStorageService = minioStorageService;
this.permissionsClient = permissionsClient;
this.systemUserService = systemUserService;
this.scoreService = scoreService;
this.uploadDefinitionService = uploadDefinitionService;
this.fileProcessor = fileProcessor;
@@ -153,122 +143,73 @@ protected Future<QueueJob> processQueueItem(DataImportQueueItem queueItem) {
// on failure and success
AtomicReference<File> localFile = new AtomicReference<>();

return getConnectionParams(queueItem)
.compose(params ->
Future
.succeededFuture(new QueueJob().withQueueItem(queueItem))
.compose((QueueJob job) ->
createLocalFile(queueItem)
.map((File file) -> {
localFile.set(file);
return job.withFile(file);
})
)
.compose(job ->
uploadDefinitionService
.getJobExecutionById(queueItem.getJobExecutionId(), params)
.map(job::withJobExecution)
)
.compose(job ->
updateJobExecutionStatusSafely(
job.getJobExecution().getId(),
new StatusDto()
.withStatus(StatusDto.Status.PROCESSING_IN_PROGRESS),
params
)
.map(job)
)
.compose(this::downloadFromS3)
.compose(job ->
fileProcessor
.processFile(
job.getFile(),
job.getJobExecution().getId(),
// this is the only part used on our end
new JobProfileInfo()
.withDataType(
JobProfileInfo.DataType.fromValue(
job.getQueueItem().getDataType()
)
),
getUserConnectionParams(
job.getJobExecution().getUserId(),
params
)
)
.map(job)
)
.onFailure((Throwable err) -> {
LOGGER.error("Unable to start chunk {}", queueItem, err);

updateJobExecutionStatusSafely(
queueItem.getJobExecutionId(),
new StatusDto()
.withErrorStatus(ErrorStatus.FILE_PROCESSING_ERROR)
.withStatus(StatusDto.Status.ERROR),
params
);
})
.onSuccess((QueueJob result) ->
LOGGER.info(
"Completed processing job execution {}!",
queueItem.getJobExecutionId()
)
)
.onComplete((AsyncResult<QueueJob> v) -> {
queueItemDao.deleteQueueItemById(queueItem.getId());
OkapiConnectionParams params = getConnectionParams(queueItem);

File file = localFile.get();
if (file != null) {
vertx.fileSystem().delete(file.toString());
}
return Future
.succeededFuture(new QueueJob().withQueueItem(queueItem))
.compose((QueueJob job) ->
createLocalFile(queueItem)
.map((File file) -> {
localFile.set(file);
return job.withFile(file);
})
);
}

private OkapiConnectionParams getUserConnectionParams(
String userId,
OkapiConnectionParams params
) {
PermissionUser permissionUser = permissionsClient
.getPermissionsUserByUserId(params, userId)
.orElseThrow(() ->
LOGGER.throwing(
new IllegalStateException(
"User ID " + userId + "who created the job was not found"
)
.compose(job ->
uploadDefinitionService
.getJobExecutionById(queueItem.getJobExecutionId(), params)
.map(job::withJobExecution)
)
.compose(job ->
updateJobExecutionStatusSafely(
job.getJobExecution().getId(),
new StatusDto().withStatus(StatusDto.Status.PROCESSING_IN_PROGRESS),
params
)
.map(job)
)
.compose(this::downloadFromS3)
.compose(job ->
fileProcessor
.processFile(
job.getFile(),
job.getJobExecution().getId(),
// this is the only part used on our end
new JobProfileInfo()
.withDataType(
JobProfileInfo.DataType.fromValue(
job.getQueueItem().getDataType()
)
),
// we need to include the user ID here since some later checks in mod-invoice/etc use it
getConnectionParams(queueItem, job.getJobExecution().getUserId())
)
.map(job)
)
.onFailure((Throwable err) -> {
LOGGER.error("Unable to start chunk {}", queueItem, err);

updateJobExecutionStatusSafely(
queueItem.getJobExecutionId(),
new StatusDto()
.withErrorStatus(ErrorStatus.FILE_PROCESSING_ERROR)
.withStatus(StatusDto.Status.ERROR),
params
);
})
.onSuccess((QueueJob result) ->
LOGGER.info(
"Completed processing job execution {}!",
queueItem.getJobExecutionId()
)
);
)
.onComplete((AsyncResult<QueueJob> v) -> {
queueItemDao.deleteQueueItemById(queueItem.getId());

return new OkapiConnectionParams(
Map.of(
// shared from the system user
XOkapiHeaders.URL.toLowerCase(),
params.getOkapiUrl(),
XOkapiHeaders.TENANT.toLowerCase(),
params.getTenantId(),
XOkapiHeaders.TOKEN.toLowerCase(),
params.getToken(),
// provided since some checks are made against these in mod-invoice
XOkapiHeaders.USER_ID.toLowerCase(),
userId,
XOkapiHeaders.PERMISSIONS.toLowerCase(),
new JsonArray(
permissionUser
.getPermissions()
.stream()
.filter(permission ->
systemUserService.getPermissionsList().contains(permission) ||
systemUserService
.getOptionalPermissionsList()
.contains(permission)
)
.toList()
)
.toString()
),
vertx
);
File file = localFile.get();
if (file != null) {
vertx.fileSystem().delete(file.toString());
}
});
}

protected Future<Void> updateJobExecutionStatusSafely(
Expand Down Expand Up @@ -326,39 +267,48 @@ protected Future<File> createLocalFile(DataImportQueueItem queueItem) {
}

/**
* Authenticate and get connection parameters (Okapi URL/token)
* Get connection parameters (Okapi URL/token)
*/
protected Future<OkapiConnectionParams> getConnectionParams(
protected OkapiConnectionParams getConnectionParams(
DataImportQueueItem queueItem
) {
OkapiConnectionParams provisionalParams = new OkapiConnectionParams(
return new OkapiConnectionParams(
Map.of(
XOkapiHeaders.URL.toLowerCase(),
queueItem.getOkapiUrl(),
XOkapiHeaders.TENANT.toLowerCase(),
queueItem.getTenant(),
// filled right after, but we need tenant/URL to get the token
XOkapiHeaders.TOKEN.toLowerCase(),
""
queueItem.getOkapiToken(),
XOkapiHeaders.PERMISSIONS.toLowerCase(),
queueItem.getOkapiPermissions()
),
vertx
);
}

return systemUserService
.getAuthToken(provisionalParams)
.map(token ->
new OkapiConnectionParams(
Map.of(
XOkapiHeaders.URL.toLowerCase(),
queueItem.getOkapiUrl(),
XOkapiHeaders.TENANT.toLowerCase(),
queueItem.getTenant(),
XOkapiHeaders.TOKEN.toLowerCase(),
token
),
vertx
)
);
/**
* Get connection parameters (Okapi URL/token), including a user ID
*/
protected OkapiConnectionParams getConnectionParams(
DataImportQueueItem queueItem,
String userId
) {
return new OkapiConnectionParams(
Map.of(
XOkapiHeaders.URL.toLowerCase(),
queueItem.getOkapiUrl(),
XOkapiHeaders.TENANT.toLowerCase(),
queueItem.getTenant(),
XOkapiHeaders.TOKEN.toLowerCase(),
queueItem.getOkapiToken(),
XOkapiHeaders.PERMISSIONS.toLowerCase(),
queueItem.getOkapiPermissions(),
XOkapiHeaders.USER_ID.toLowerCase(),
userId
),
vertx
);
}

@Override
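The second file brings the unit tests in line. As orientation before the test diff, here is a usage sketch of the accessors those tests exercise. The wither methods on DataImportQueueItem and the OkapiConnectionParams getters come straight from the diff; the standalone class, the literal values, and the hand-built header map are illustrative only.

import io.vertx.core.Vertx;
import java.util.Map;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.jaxrs.model.DataImportQueueItem;

public class ConnectionParamsDemo {

  public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();

    // Values mirror the new tests; a real queue item carries the credentials
    // recorded when the job was submitted.
    DataImportQueueItem queueItem = new DataImportQueueItem()
      .withTenant("tenant")
      .withOkapiUrl("okapi-url")
      .withOkapiToken("token")
      .withOkapiPermissions("permissions");

    OkapiConnectionParams params = new OkapiConnectionParams(
      Map.of(
        "x-okapi-url", queueItem.getOkapiUrl(),
        "x-okapi-tenant", queueItem.getTenant(),
        "x-okapi-token", queueItem.getOkapiToken(),
        "x-okapi-permissions", queueItem.getOkapiPermissions(),
        "x-okapi-user-id", "user-id" // only present on the user-scoped overload
      ),
      vertx
    );

    System.out.println(params.getTenantId());                        // tenant
    System.out.println(params.getOkapiUrl());                        // okapi-url
    System.out.println(params.getToken());                           // token
    System.out.println(params.getHeaders().get("x-okapi-user-id"));  // user-id

    vertx.close();
  }
}
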
57 changes: 35 additions & 22 deletions src/test/java/org/folio/service/file/S3JobRunningVerticleUnitTest.java
@@ -34,10 +34,10 @@
import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.folio.dao.DataImportQueueItemDao;
import org.folio.dataimport.util.OkapiConnectionParams;
import org.folio.rest.jaxrs.model.DataImportQueueItem;
import org.folio.rest.jaxrs.model.JobExecution;
import org.folio.rest.jaxrs.model.StatusDto;
import org.folio.service.auth.SystemUserAuthService;
import org.folio.service.file.S3JobRunningVerticle.QueueJob;
import org.folio.service.processing.ParallelFileChunkingProcessor;
import org.folio.service.processing.ranking.ScoreService;
@@ -75,9 +75,6 @@ public class S3JobRunningVerticleUnitTest {
@Mock
ScoreService scoreService;

@Mock
SystemUserAuthService systemUserService;

@Mock
UploadDefinitionService uploadDefinitionService;

@@ -107,9 +104,7 @@ public void setUp() throws IOException {
mockVertx,
queueItemDao,
minioStorageService,
null,
scoreService,
systemUserService,
uploadDefinitionService,
fileProcessor,
POLL_INTERVAL,
@@ -119,25 +114,43 @@
}

@Test
public void testConnectionParams(TestContext context) {
when(systemUserService.getAuthToken(any()))
.thenReturn(Future.succeededFuture("token"));
public void testConnectionParams() {
OkapiConnectionParams params = verticle.getConnectionParams(
new DataImportQueueItem()
.withTenant("tenant")
.withOkapiUrl("okapi-url")
.withOkapiToken("token")
.withOkapiPermissions("permissions")
);

verticle
.getConnectionParams(
new DataImportQueueItem().withTenant("tenant").withOkapiUrl("okapi-url")
)
.onComplete(
context.asyncAssertSuccess(params -> {
assertThat(params.getTenantId(), is("tenant"));
assertThat(params.getOkapiUrl(), is("okapi-url"));
assertThat(params.getToken(), is("token"));
assertThat(params.getTenantId(), is("tenant"));
assertThat(params.getOkapiUrl(), is("okapi-url"));
assertThat(params.getToken(), is("token"));
assertThat(
params.getHeaders().get("x-okapi-permissions"),
is("permissions")
);
}

verify(systemUserService, times(1)).getAuthToken(any());
@Test
public void testConnectionParamsWithUserId() {
OkapiConnectionParams params = verticle.getConnectionParams(
new DataImportQueueItem()
.withTenant("tenant")
.withOkapiUrl("okapi-url")
.withOkapiToken("token")
.withOkapiPermissions("permissions"),
"user-id"
);

verifyNoMoreInteractions(systemUserService);
})
);
assertThat(params.getTenantId(), is("tenant"));
assertThat(params.getOkapiUrl(), is("okapi-url"));
assertThat(params.getToken(), is("token"));
assertThat(
params.getHeaders().get("x-okapi-permissions"),
is("permissions")
);
assertThat(params.getHeaders().get("x-okapi-user-id"), is("user-id"));
}

@Test

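A closing design note: the deleted getUserConnectionParams did not forward the job creator's permissions wholesale. It intersected them with the system user's required and optional permission sets before serializing the result into x-okapi-permissions, whereas the new code passes through whatever permissions were recorded on the queue item at enqueue time. The removed filtering amounted to the intersection sketched below, with stand-in permission lists since the real sets came from SystemUserAuthService.

import java.util.List;

public class PermissionIntersectionSketch {

  public static void main(String[] args) {
    // Stand-ins for the job creator's permissions and the system user's
    // configured required/optional permission sets.
    List<String> userPermissions = List.of("perm-a", "perm-b", "perm-c");
    List<String> requiredPermissions = List.of("perm-a");
    List<String> optionalPermissions = List.of("perm-c");

    // Only permissions the system user itself was configured with were kept.
    List<String> forwarded = userPermissions
      .stream()
      .filter(permission ->
        requiredPermissions.contains(permission) ||
        optionalPermissions.contains(permission)
      )
      .toList();

    System.out.println(forwarded); // [perm-a, perm-c]
  }
}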