Skip to content

Commit

Permalink
[Improvement][Master] Validate same content of input file when using …
Browse files Browse the repository at this point in the history
…task cache (#13298)

* support file content checksum

* fix bug where the injected storageOperate could be null
  • Loading branch information
Radeity authored Jan 3, 2023
1 parent 8a47992 commit ccad56e
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,9 @@ private Constants() {
public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server";
public static final String THREAD_NAME_ALERT_SERVER = "Alert-Server";

// suffix of the CRC32 checksum side-file paired with a resource file
// (the checksum itself is produced by FileUtils.getFileChecksum)
public static final String CRC_SUFFIX = ".crc";

/**
* complement date default cron string
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -264,4 +267,36 @@ public static boolean directoryTraversal(String filename) {
}
}

/**
* Calculate file checksum with CRC32 algorithm
* @param pathName
* @return checksum of file/dir
*/
/**
 * Calculate the checksum of a file or directory with the CRC32 algorithm.
 * <p>
 * For a regular file the CRC32 of its content is returned as a hex string.
 * For a directory, the checksums of all entries are concatenated; entries are
 * sorted first so the result does not depend on the platform-specific
 * iteration order of {@link File#list()}.
 *
 * @param pathName path of the file or directory to checksum
 * @return hex CRC32 of a file, or concatenated child checksums for a directory
 * @throws IOException if the path cannot be listed or read
 */
public static String getFileChecksum(String pathName) throws IOException {
    File file = new File(pathName);
    if (file.isDirectory()) {
        String[] subPaths = file.list();
        // File.list() returns null when the path does not denote a readable directory
        if (subPaths == null) {
            throw new IOException("Cannot list directory: " + pathName);
        }
        // sort so the concatenated checksum is deterministic across file systems
        Arrays.sort(subPaths);
        StringBuilder concatenatedCRC = new StringBuilder();
        for (String subPath : subPaths) {
            concatenatedCRC.append(getFileChecksum(pathName + FOLDER_SEPARATOR + subPath));
        }
        return concatenatedCRC.toString();
    }
    CRC32 crc32 = new CRC32();
    try (
            FileInputStream fileInputStream = new FileInputStream(pathName);
            CheckedInputStream checkedInputStream = new CheckedInputStream(fileInputStream, crc32)) {
        // drain the stream in chunks (not byte-by-byte); reading updates the CRC as a side effect
        byte[] buffer = new byte[8192];
        while (checkedInputStream.read(buffer) != -1) {
        }
    } catch (IOException e) {
        // keep the original exception as the cause so the failure is diagnosable
        throw new IOException("Calculate checksum error for path: " + pathName, e);
    }
    return Long.toHexString(crc32.getValue());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,28 @@ public void testDirectoryTraversal() {
Assertions.assertTrue(FileUtils.directoryTraversal(path));
}

@Test
void testGetFileChecksum() throws Exception {
    // files with identical content must share a checksum;
    // files with different content must not
    String pathA = "test/testFile1.txt";
    String pathB = "test/testFile2.txt";
    String pathC = "test/testFile3.txt";
    String contentA = "正正正faffdasfasdfas,한국어; 한글……にほんご\nfrançais";
    String contentB = "正正正faffdasfasdfas,한국어; 한글……にほん\nfrançais";
    FileUtils.writeContent2File(contentA, pathA);
    FileUtils.writeContent2File(contentB, pathB);
    FileUtils.writeContent2File(contentA, pathC);

    String sumA = FileUtils.getFileChecksum(pathA);
    String sumB = FileUtils.getFileChecksum(pathB);
    String sumC = FileUtils.getFileChecksum(pathC);

    Assertions.assertNotEquals(sumA, sumB);
    Assertions.assertEquals(sumA, sumC);

    // a directory checksum aggregates its children and must not throw
    String dirPath = "test/";
    Assertions.assertDoesNotThrow(
            () -> FileUtils.getFileChecksum(dirPath));
}

}
4 changes: 4 additions & 0 deletions dolphinscheduler-dao/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.dolphinscheduler</groupId>
<artifactId>dolphinscheduler-storage-api</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,42 @@

package org.apache.dolphinscheduler.dao.utils;

import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;

import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;

public class TaskCacheUtils {

protected static final Logger logger = LoggerFactory.getLogger(TaskCacheUtils.class);

private TaskCacheUtils() {
throw new IllegalStateException("Utility class");
}
Expand All @@ -54,15 +67,17 @@ private TaskCacheUtils() {
* 4. input VarPool, from upstream task and workflow global parameters
* @param taskInstance task instance
* @param taskExecutionContext taskExecutionContext
* @param storageOperate storageOperate
* @return cache key
*/
public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext) {
public static String generateCacheKey(TaskInstance taskInstance, TaskExecutionContext taskExecutionContext,
                                      StorageOperate storageOperate) {
    // the cache key is a SHA-256 digest over the "_"-joined identity elements of the task
    String rawKey = String.join("_",
            String.valueOf(taskInstance.getTaskCode()),
            String.valueOf(taskInstance.getTaskDefinitionVersion()),
            String.valueOf(taskInstance.getIsCache().getCode()),
            String.valueOf(taskInstance.getEnvironmentConfig()),
            getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate));
    return DigestUtils.sha256Hex(rawKey);
}
Expand Down Expand Up @@ -109,7 +124,8 @@ public static Pair<Integer, String> revertCacheKey(String tagCacheKey) {
* @param taskInstance task instance
* taskExecutionContext taskExecutionContext
*/
public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context) {
public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExecutionContext context,
StorageOperate storageOperate) {
JsonNode taskParams = JSONUtils.parseObject(taskInstance.getTaskParams());

// The set of input values considered from localParams in the taskParams
Expand All @@ -123,6 +139,12 @@ public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExec
// var pool value from upstream task
List<Property> varPool = JSONUtils.toList(taskInstance.getVarPool(), Property.class);

Map<String, String> fileCheckSumMap = new HashMap<>();
List<Property> fileInput = varPool.stream().filter(property -> property.getType().equals(DataType.FILE))
.collect(Collectors.toList());
fileInput.forEach(
property -> fileCheckSumMap.put(property.getProp(), getValCheckSum(property, context, storageOperate)));

// var pool value from workflow global parameters
if (context.getPrepareParamsMap() != null) {
Set<String> taskVarPoolSet = varPool.stream().map(Property::getProp).collect(Collectors.toSet());
Expand All @@ -139,9 +161,40 @@ public static String getTaskInputVarPoolData(TaskInstance taskInstance, TaskExec
.filter(property -> propertyInSet.contains(property.getProp()))
.sorted(Comparator.comparing(Property::getProp))
.collect(Collectors.toList());

varPool.forEach(property -> {
if (property.getType() == DataType.FILE) {
property.setValue(fileCheckSumMap.get(property.getValue()));
}
});
return JSONUtils.toJsonString(varPool);
}

/**
* get checksum from crc32 file of file property in varPool
* cache can be used if content of upstream output files are the same
* @param fileProperty
* @param context
* @param storageOperate
*/
/**
 * Fetch the checksum recorded in the ".crc" side-file of a FILE-typed property
 * in the var pool, and replace the property's value with that checksum.
 * Cache can then be reused whenever the content of upstream output files is the same.
 *
 * @param fileProperty FILE-typed property whose value is the resource path
 * @param context task execution context (tenant code and execute path)
 * @param storageOperate storage operator used to download the crc file
 * @return the checksum string, or "" if the crc file could not be downloaded/read
 */
public static String getValCheckSum(Property fileProperty, TaskExecutionContext context,
                                    StorageOperate storageOperate) {
    String resourceCRCPath = fileProperty.getValue() + CRC_SUFFIX;
    String resourceCRCWholePath = storageOperate.getResourceFileName(context.getTenantCode(), resourceCRCPath);
    String targetPath = String.format("%s/%s", context.getExecutePath(), resourceCRCPath);
    logger.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath);
    String crcString = "";
    try {
        storageOperate.download(context.getTenantCode(), resourceCRCWholePath, targetPath, false,
                true);
        // close the stream after reading to avoid leaking a file handle
        try (FileInputStream crcInputStream = new FileInputStream(targetPath)) {
            crcString = FileUtils.readFile2Str(crcInputStream);
        }
        fileProperty.setValue(crcString);
    } catch (IOException e) {
        // log the exception itself so the stack trace is not lost
        logger.error("Replace checksum failed for file property {}.", fileProperty.getProp(), e);
    }
    return crcString;
}

/**
* get var in set from task definition
* @param taskInstance task instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.dolphinscheduler.dao.utils;

import static org.apache.dolphinscheduler.common.constants.Constants.CRC_SUFFIX;

import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
Expand All @@ -35,13 +39,16 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class TaskCacheUtilsTest {

private TaskInstance taskInstance;

private TaskExecutionContext taskExecutionContext;

private StorageOperate storageOperate;

@BeforeEach
void setUp() {
String taskParams = "{\n" +
Expand Down Expand Up @@ -95,6 +102,7 @@ void setUp() {
prepareParamsMap.put("a", property);
taskExecutionContext.setPrepareParamsMap(prepareParamsMap);

storageOperate = Mockito.mock(StorageOperate.class);
}

@Test
Expand All @@ -121,25 +129,26 @@ void testGetScriptVarInSet() {

@Test
void TestGetTaskInputVarPoolData() {
    TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate);
    // only a=aa and c=cc influence the result:
    // b=bb is a fixed value, already covered by the task version,
    // and k=kk does not appear in the task params, so it is ignored
    String expected =
            "[{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"aa\"},{\"prop\":\"c\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"cc\"}]";
    Assertions.assertEquals(expected,
            TaskCacheUtils.getTaskInputVarPoolData(taskInstance, taskExecutionContext, storageOperate));
}

@Test
void TestGenerateCacheKey() {
String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
String cacheKeyBase = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
Property propertyI = new Property();
propertyI.setProp("i");
propertyI.setDirect(Direct.IN);
propertyI.setType(DataType.VARCHAR);
propertyI.setValue("ii");
taskExecutionContext.getPrepareParamsMap().put("i", propertyI);
String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
String cacheKeyNew = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// i will not influence the result, because task instance not use it
Assertions.assertEquals(cacheKeyBase, cacheKeyNew);

Expand All @@ -149,17 +158,17 @@ void TestGenerateCacheKey() {
propertyD.setType(DataType.VARCHAR);
propertyD.setValue("dd");
taskExecutionContext.getPrepareParamsMap().put("i", propertyD);
String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
String cacheKeyD = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// d will influence the result, because task instance use it
Assertions.assertNotEquals(cacheKeyBase, cacheKeyD);

taskInstance.setTaskDefinitionVersion(100);
String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
String cacheKeyE = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// task definition version is changed, so cache key changed
Assertions.assertNotEquals(cacheKeyD, cacheKeyE);

taskInstance.setEnvironmentConfig("export PYTHON_HOME=/bin/python3");
String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext);
String cacheKeyF = TaskCacheUtils.generateCacheKey(taskInstance, taskExecutionContext, storageOperate);
// EnvironmentConfig is changed, so cache key changed
Assertions.assertNotEquals(cacheKeyE, cacheKeyF);
}
Expand All @@ -169,4 +178,23 @@ void testGetCacheKey() {
String cacheKey = TaskCacheUtils.generateTagCacheKey(1, "123");
Assertions.assertEquals("1-123", cacheKey);
}

@Test
void testReplaceWithCheckSum() {
    // pre-write the ".crc" side-file that getValCheckSum reads back
    // (the mocked storageOperate downloads nothing, so the local file is used as-is)
    String content = "abcdefg";
    String filePath = "test/testFile.txt";
    FileUtils.writeContent2File(content, filePath + CRC_SUFFIX);

    Property property = new Property();
    property.setProp("f1");
    property.setValue("testFile.txt");
    property.setType(DataType.FILE);
    property.setDirect(Direct.IN);
    TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
    taskExecutionContext.setExecutePath("test");
    taskExecutionContext.setTenantCode("aaa");

    String crc = TaskCacheUtils.getValCheckSum(property, taskExecutionContext, storageOperate);
    // JUnit convention: expected value first, actual value second
    Assertions.assertEquals(content, crc);
    // the property value should have been replaced by the checksum
    Assertions.assertEquals(content, property.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.dao.utils.TaskCacheUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
Expand Down Expand Up @@ -104,6 +105,12 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
@Autowired
private TaskEventService taskEventService;

/**
* storage operator
*/
@Autowired(required = false)
private StorageOperate storageOperate;

/**
* consumer thread pool
*/
Expand Down Expand Up @@ -298,7 +305,7 @@ private boolean checkIsCacheExecution(TaskInstance taskInstance, TaskExecutionCo
return false;
}
// check if task is cache execution
String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context);
String cacheKey = TaskCacheUtils.generateCacheKey(taskInstance, context, storageOperate);
TaskInstance cacheTaskInstance = taskInstanceDao.findTaskInstanceByCacheKey(cacheKey);
// if we can find the cache task instance, we will add cache event, and return true.
if (cacheTaskInstance != null) {
Expand Down
Loading

0 comments on commit ccad56e

Please sign in to comment.