Skip to content

Commit

Permalink
[fix] Python gateway can not upload to resource center
Browse files Browse the repository at this point in the history
in apache#12076 we refactor our resource center, remove the resource table,
it is a good refactor but it failed python api upload, this patch try
to fix python api upload function

ref: apache#12076
  • Loading branch information
zhongjiajie committed Nov 29, 2022
1 parent e180e16 commit 3f35baf
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectUser;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.Tenant;
Expand All @@ -59,6 +58,7 @@
import org.apache.dolphinscheduler.dao.mapper.ProjectUserMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import py4j.GatewayServer;
Expand Down Expand Up @@ -632,9 +632,10 @@ public Long getEnvironmentInfo(String environmentName) {
*
* @param userName user who query resource
* @param fullName full name of the resource
* @return StorageEntity object which contains necessary information about resource
*/
public Resource queryResourcesFileInfo(String userName, String fullName) {
return resourceService.queryResourcesFileInfo(userName, fullName);
public StorageEntity queryResourcesFileInfo(String userName, String fullName) throws Exception {
return resourceService.queryFileStatus(userName, fullName);
}

public String getGatewayVersion() {
Expand All @@ -647,14 +648,11 @@ public String getGatewayVersion() {
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
* @return StorageEntity object which contains necessary information about resource
*/
public Integer createOrUpdateResource(
String userName, String fullName, String description,
String resourceContent) {
return resourceService.createOrUpdateResource(userName, fullName, description, resourceContent);
public StorageEntity createOrUpdateResource(String userName, String fullName, String resourceContent) throws Exception {
return resourceService.createOrUpdateResource(userName, fullName, resourceContent);
}

@PostConstruct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.ProgramType;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import java.io.IOException;
Expand Down Expand Up @@ -170,28 +170,13 @@ Result<Object> onlineCreateResource(User loginUser, ResourceType type, String fi

/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc,
String content);

/**
* create or update resource.
* If the folder is not already created, it will be
* If the folder is not already created, it will be ignored and directly create the new file
*
* @param userName user who create or update resource
* @param fullName The fullname of resource.Includes path and suffix.
* @param description description of resource
* @param resourceContent content of resource
* @return id of resource
*/
Integer createOrUpdateResource(String userName, String fullName, String description, String resourceContent);
StorageEntity createOrUpdateResource(String userName, String fullName, String resourceContent) throws Exception;

/**
* updateProcessInstance resource
Expand Down Expand Up @@ -229,7 +214,7 @@ Result<Object> updateResourceContent(User loginUser, String fullName, String ten
* @param userName user who query resource
* @param fullName full name of the resource
*/
Resource queryResourcesFileInfo(String userName, String fullName);
StorageEntity queryFileStatus(String userName, String fullName) throws Exception;

/**
* delete DATA_TRANSFER data in resource center
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,103 +1499,23 @@ public Result<Object> onlineCreateResource(User loginUser, ResourceType type, St
return result;
}

/**
* create or update resource.
* If the folder is not already created, it will be
*
* @param loginUser user who create or update resource
* @param fileFullName The full name of resource.Includes path and suffix.
* @param desc description of resource
* @param content content of resource
* @return create result code
*/
@Override
@Transactional
public Result<Object> onlineCreateOrUpdateResourceWithDir(User loginUser, String fileFullName, String desc,
String content) {
// TODO: need update to third party service
if (checkResourceExists(fileFullName)) {
Resource resource = resourcesMapper.queryResource(fileFullName, ResourceType.FILE.ordinal()).get(0);
Result<Object> result = this.updateResourceContent(loginUser, fileFullName,
resource.getUserName(), content);
if (result.getCode() == Status.SUCCESS.getCode()) {
resource.setDescription(desc);
Map<String, Object> resultMap = new HashMap<>();
for (Map.Entry<Object, Object> entry : new BeanMap(resource).entrySet()) {
if (!Constants.CLASS.equalsIgnoreCase(entry.getKey().toString())) {
resultMap.put(entry.getKey().toString(), entry.getValue());
}
}
result.setData(resultMap);
}
return result;
} else {
String resourceSuffix = fileFullName.substring(fileFullName.indexOf(PERIOD) + 1);
String fileNameWithSuffix = fileFullName.substring(fileFullName.lastIndexOf(FOLDER_SEPARATOR) + 1);
String resourceDir = fileFullName.replace(fileNameWithSuffix, EMPTY_STRING);
String resourceName = fileNameWithSuffix.replace(PERIOD + resourceSuffix, EMPTY_STRING);
String[] dirNames = resourceDir.split(FOLDER_SEPARATOR);
int pid = -1;
StringBuilder currDirPath = new StringBuilder();
for (String dirName : dirNames) {
if (StringUtils.isNotEmpty(dirName)) {
pid = queryOrCreateDirId(loginUser, pid, currDirPath.toString(), dirName);
currDirPath.append(FOLDER_SEPARATOR).append(dirName);
}
}
return this.onlineCreateResource(
loginUser, ResourceType.FILE, resourceName, resourceSuffix, desc, content,
currDirPath.toString());
}
}

@Override
@Transactional
public Integer createOrUpdateResource(String userName, String fullName, String description,
String resourceContent) {
public StorageEntity createOrUpdateResource(String userName, String filepath, String resourceContent) throws Exception {
User user = userMapper.queryByUserNameAccurately(userName);
int suffixLabelIndex = fullName.indexOf(PERIOD);
int suffixLabelIndex = filepath.indexOf(PERIOD);
if (suffixLabelIndex == -1) {
String msg = String.format("The suffix of file can not be empty, fullName:%s.", fullName);
logger.warn(msg);
throw new IllegalArgumentException(msg);
}
if (!fullName.startsWith(FOLDER_SEPARATOR)) {
fullName = FOLDER_SEPARATOR + fullName;
}
Result<Object> createResult = onlineCreateOrUpdateResourceWithDir(
user, fullName, description, resourceContent);
if (createResult.getCode() == Status.SUCCESS.getCode()) {
Map<String, Object> resultMap = (Map<String, Object>) createResult.getData();
return (int) resultMap.get("id");
}
String msg = String.format("Create or update resource error, resourceName:%s.", fullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}

private int queryOrCreateDirId(User user, int pid, String currentDir, String dirName) {
String dirFullName = currentDir + FOLDER_SEPARATOR + dirName;
if (checkResourceExists(dirFullName)) {
List<Resource> resourceList = resourcesMapper.queryResource(dirFullName, ResourceType.FILE.ordinal());
return resourceList.get(0).getId();
} else {
// create dir
Result<Object> createDirResult = this.createDirectory(
user, dirName, EMPTY_STRING, ResourceType.FILE, pid, currentDir);
if (createDirResult.getCode() == Status.SUCCESS.getCode()) {
// Map<String, Object> resultMap = (Map<String, Object>) createDirResult.getData();
// return resultMap.get("id") == null ? -1 : (Integer) resultMap.get("id");
throw new IllegalArgumentException(String.format("Not allow create or update resources without extension name, filepath: %s", filepath));
}

// Since resource is kept in third party services, its id will always be -1.
return -1;
String defaultPath = storageOperate.getResDir(user.getTenantCode());
String fullName = defaultPath + filepath;

} else {
String msg = String.format("Create dir error, dirFullName:%s.", dirFullName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
Result<Object> result = uploadContentToStorage(user, fullName, user.getTenantCode(), resourceContent);
if (result.getCode() != Status.SUCCESS.getCode()) {
throw new ServiceException(result.getMsg());
}
return storageOperate.getFileStatus(fullName, defaultPath, user.getTenantCode(), ResourceType.FILE);
}

private void permissionPostHandle(ResourceType resourceType, User loginUser, Integer resourceId) {
Expand Down Expand Up @@ -1864,17 +1784,12 @@ public Map<String, Object> authorizeResourceTree(User loginUser, Integer userId)
}

@Override
public Resource queryResourcesFileInfo(String userName, String fileName) {
public StorageEntity queryFileStatus(String userName, String fileName) throws Exception {
// TODO: It is used in PythonGateway, should be revised
User user = userMapper.queryByUserNameAccurately(userName);
Result<Object> resourceResponse = this.queryResourceByFileName(user, fileName, ResourceType.FILE, "");
if (resourceResponse.getCode() != Status.SUCCESS.getCode()) {
String msg =
String.format("Query resource by fullName failed, userName:%s, fullName:%s", userName, fileName);
logger.error(msg);
throw new IllegalArgumentException(msg);
}
return (Resource) resourceResponse.getData();

String defaultPath = storageOperate.getResDir(user.getTenantCode());
return storageOperate.getFileStatus(defaultPath + fileName, defaultPath, user.getTenantCode(), ResourceType.FILE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.storage.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import java.util.Date;
Expand Down Expand Up @@ -103,36 +103,29 @@ public void testCreateResource() {
String resourceDir = "/dir1/dir2/";
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String resourceFullName = resourceDir + resourceName + "." + resourceSuffix;

int resourceId = 3;

Mockito.when(resourcesService.createOrUpdateResource(user.getUserName(), resourceFullName, desc, content))
.thenReturn(resourceId);

int id = pythonGateway.createOrUpdateResource(
user.getUserName(), resourceFullName, desc, content);
Assertions.assertEquals(id, resourceId);
Assertions.assertDoesNotThrow(() -> pythonGateway.createOrUpdateResource(user.getUserName(), resourceFullName, content));
}

@Test
public void testQueryResourcesFileInfo() {
public void testQueryResourcesFileInfo() throws Exception {
User user = getTestUser();
Resource resource = getTestResource();
Mockito.when(resourcesService.queryResourcesFileInfo(user.getUserName(), resource.getFullName()))
.thenReturn(resource);
Resource result = pythonGateway.queryResourcesFileInfo(user.getUserName(), resource.getFullName());
Assertions.assertEquals(result.getId(), resource.getId());
StorageEntity storageEntity = getTestResource();

Mockito.when(resourcesService.queryFileStatus(user.getUserName(), storageEntity.getFullName()))
.thenReturn(storageEntity);
StorageEntity result = pythonGateway.queryResourcesFileInfo(user.getUserName(), storageEntity.getFullName());
Assertions.assertEquals(result.getId(), storageEntity.getId());
}

private Resource getTestResource() {
Resource resource = new Resource();
resource.setId(1);
resource.setType(ResourceType.FILE);
resource.setFullName("/dev/test.py");
return resource;
private StorageEntity getTestResource() {
StorageEntity storageEntity = new StorageEntity();
storageEntity.setId(1);
storageEntity.setType(ResourceType.FILE);
storageEntity.setFullName("/dev/test.py");
return storageEntity;
}

private User getTestUser() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class ResourcesServiceTest {

private MockedStatic<PropertyUtils> mockedStaticPropertyUtils;

private Throwable exception;

@BeforeEach
public void setUp() {
mockedStaticFileUtils = Mockito.mockStatic(FileUtils.class);
Expand Down Expand Up @@ -570,67 +572,25 @@ public void testOnlineCreateResource() {
}

@Test
public void testOnlineCreateResourceWithDir() {
public void testCreateOrUpdateResource() throws Exception {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
User user = getUser();
user.setId(1);
Mockito.when(userMapper.queryByUserNameAccurately(user.getUserName())).thenReturn(getUser());

String dir1Path = "/dir1";
String dir2Path = "/dir2";
String resourceDir = dir1Path + dir2Path;
String resourceName = "test";
String resourceSuffix = "py";
String desc = "desc";
String content = "content";
String fullName = resourceDir + "/" + resourceName + "." + resourceSuffix;

Resource dir1 = new Resource();
dir1.setFullName(dir1Path);
dir1.setId(1);
dir1.setUserId(user.getId());
Resource dir2 = new Resource();
dir2.setFullName(resourceDir);
dir2.setUserId(user.getId());

Mockito.when(storageOperate.getDir(ResourceType.FILE, "123")).thenReturn("/dolphinscheduler/123/resources/");
Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/");
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
exception = Assertions.assertThrows(IllegalArgumentException.class, () -> resourcesService.createOrUpdateResource(user.getUserName(), "filename", "my-content"));
Assertions.assertTrue(exception.getMessage().contains("Not allow create or update resources without extension name"));

// SUCCESS
Mockito.when(storageOperate.getResDir(user.getTenantCode())).thenReturn("/dolphinscheduler/123/resources/");
Mockito.when(FileUtils.getUploadFilename(Mockito.anyString(), Mockito.anyString())).thenReturn("test");
Mockito.when(FileUtils.writeContent2File(Mockito.anyString(), Mockito.anyString())).thenReturn(true);
try {
Mockito.when(storageOperate.mkdir("123", "/dolphinscheduler/123/resources" + dir1Path)).thenReturn(true);
Mockito.when(storageOperate.mkdir("123", "/dolphinscheduler/123/resources" + dir2Path)).thenReturn(true);
} catch (IOException e) {
logger.error("create resource directory {} failed", fullName);
}
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(user.getTenantId())).thenReturn(getTenant());
Result<Object> result = resourcesService.onlineCreateOrUpdateResourceWithDir(user, fullName, desc, content);
Assertions.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
Mockito.when(storageOperate.getFileStatus(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(getStorageEntityResource());
StorageEntity storageEntity = resourcesService.createOrUpdateResource(user.getUserName(), "filename.txt", "my-content");
Assertions.assertNotNull(storageEntity);
Assertions.assertEquals("/dolphinscheduler/123/resources/ResourcesServiceTest", storageEntity.getFullName());
}

// TODO: revise this testcase after modifying PythonGateway.java
// @Test
// public void testQueryResourcesFileInfo() {
// User user = getUser();
// String userName = "test-user";
// Mockito.when(userMapper.queryByUserNameAccurately(userName)).thenReturn(user);
// Resource file = new Resource();
// file.setFullName("/dir/file1.py");
// file.setId(1);
// Mockito.when(resourcesMapper.queryResource(file.getFullName(), ResourceType.FILE.ordinal()))
// .thenReturn(Collections.singletonList(file));
// Mockito.when(resourcePermissionCheckService.operationPermissionCheck(
// AuthorizationType.RESOURCE_FILE_ID, null, user.getId(), ApiFuncIdentificationConstant.FILE_VIEW,
// serviceLogger)).thenReturn(true);
// Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(
// AuthorizationType.RESOURCE_FILE_ID, new Object[]{file.getId()}, user.getId(), serviceLogger))
// .thenReturn(true);
// Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
// Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
// Resource result = resourcesService.queryResourcesFileInfo(userName, file.getFullName());
// Assertions.assertEquals(file.getFullName(), result.getFullName());
// }

@Test
public void testUpdateResourceContent() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
Expand Down
Loading

0 comments on commit 3f35baf

Please sign in to comment.