Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
Solve #1931, Support EC in rule
Browse files Browse the repository at this point in the history
  • Loading branch information
littlezhou committed Sep 17, 2018
1 parent 4099e56 commit 83a428c
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.smartdata.model.rule.TranslateResult;
import org.smartdata.rule.parser.SmartRuleStringParser;
import org.smartdata.rule.parser.TranslationContext;
import org.smartdata.server.engine.rule.ErasureCodingPlugin;
import org.smartdata.server.engine.rule.ExecutorScheduler;
import org.smartdata.server.engine.rule.FileCopy2S3Plugin;
import org.smartdata.server.engine.rule.FileCopyDrPlugin;
Expand Down Expand Up @@ -79,9 +80,12 @@ public RuleManager(
this.serverContext = context;
this.metaStore = context.getMetaStore();

RuleExecutorPluginManager.addPlugin(new FileCopyDrPlugin(context.getMetaStore()));
RuleExecutorPluginManager.addPlugin(new FileCopy2S3Plugin());
RuleExecutorPluginManager.addPlugin(new SmallFilePlugin(context, cmdletManager));
if (serverContext.getServiceMode() == ServiceMode.HDFS) {
RuleExecutorPluginManager.addPlugin(new FileCopyDrPlugin(context.getMetaStore()));
RuleExecutorPluginManager.addPlugin(new FileCopy2S3Plugin());
RuleExecutorPluginManager.addPlugin(new SmallFilePlugin(context, cmdletManager));
RuleExecutorPluginManager.addPlugin(new ErasureCodingPlugin(context));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.server.engine.rule;

import org.apache.hadoop.hdfs.DFSClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartdata.hdfs.CompatibilityHelperLoader;
import org.smartdata.hdfs.HadoopUtil;
import org.smartdata.metastore.MetaStore;
import org.smartdata.model.CmdletDescriptor;
import org.smartdata.model.ErasureCodingPolicyInfo;
import org.smartdata.model.RuleInfo;
import org.smartdata.model.rule.RuleExecutorPlugin;
import org.smartdata.model.rule.TranslateResult;
import org.smartdata.server.engine.ServerContext;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ErasureCodingPlugin implements RuleExecutorPlugin {
private ServerContext context;
private MetaStore metaStore;
private final Map<Long, List<String>> ecPolicies = new ConcurrentHashMap<>();
private final List<ErasureCodingPolicyInfo> ecInfos = new ArrayList<>();
private long lastUpdateTime = 0;
private URI nnUri = null;
private DFSClient client = null;

private static final Logger LOG =
LoggerFactory.getLogger(ErasureCodingPlugin.class);

public ErasureCodingPlugin(ServerContext context) {
this.context = context;
metaStore = context.getMetaStore();
try {
for (ErasureCodingPolicyInfo info : metaStore.getAllEcPolicies()) {
ecInfos.add(info);
}
} catch (Exception e) {
// ignore this
LOG.warn("Load ErasureCoding Policy failed!");
}
}


@Override
public void onNewRuleExecutor(RuleInfo ruleInfo,
TranslateResult tResult) {
long ruleId = ruleInfo.getId();
CmdletDescriptor des = tResult.getCmdDescriptor();
for (int i = 0; i < des.getActionSize(); i++) {
if (des.getActionName(i).equals("ec")) {
String policy = des.getActionArgs(i).get("-policy");
if (policy == null) {
continue;
}
if (!ecPolicies.containsKey(ruleId)) {
ecPolicies.put(ruleId, new ArrayList<String>());
}
ecPolicies.get(ruleId).add(policy);
}
}
}

private void initClient() {
try {
if (nnUri == null) {
nnUri = HadoopUtil.getNameNodeUri(context.getConf());
}
if (nnUri != null && client == null) {
client = HadoopUtil.getDFSClient(nnUri, context.getConf());
}
} catch (Exception e) {
LOG.error("Init client connection failed: " + e.getLocalizedMessage());
}
}

private void updateErasureCodingPolices() {
try {
initClient();
if (client == null) {
LOG.error("Failed to refresh EC policies due to can not setup connection to HDFS!");
return;
}
Map<Byte, String> idToPolicyName =
CompatibilityHelperLoader.getHelper().getErasueCodingPolicies(client);
if (idToPolicyName != null) {
ecInfos.clear();
for (Byte id : idToPolicyName.keySet()) {
ecInfos.add(new ErasureCodingPolicyInfo(id, idToPolicyName.get(id)));
}
metaStore.deleteAllEcPolicies();
metaStore.insertEcPolicies(ecInfos);
}
} catch (Exception e) {
LOG.warn("Failed to refresh EC policies!");
}
}

@Override
public boolean preExecution(RuleInfo ruleInfo,
TranslateResult tResult) {
if (!ecPolicies.containsKey(ruleInfo.getId())) {
return true;
}
List<String> polices = ecPolicies.get(ruleInfo.getId());
String notIn = null;
synchronized (ecInfos) {
for (String policy : polices) {
notIn = policy;
for (ErasureCodingPolicyInfo info : ecInfos) {
if (info.getEcPolicyName().equals(policy)) {
notIn = null;
break;
}
}
if (notIn != null) {
break;
}
}
}

if (notIn != null) {
synchronized (ecInfos) {
long curr = System.currentTimeMillis();
if (curr - lastUpdateTime >= 5000) {
LOG.info("Refresh EC policies for policy: " + notIn);
updateErasureCodingPolices();
lastUpdateTime = curr;
}
}
}
return true;
}

@Override
public List<String> preSubmitCmdlet(RuleInfo ruleInfo,
List<String> objects) {
return objects;
}

@Override
public CmdletDescriptor preSubmitCmdletDescriptor(RuleInfo ruleInfo,
TranslateResult tResult, CmdletDescriptor descriptor) {
return descriptor;
}

@Override
public void onRuleExecutorExit(RuleInfo ruleInfo) {
ecPolicies.remove(ruleInfo.getId());
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.server.engine.rule;

import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,14 @@ public void insertEcPolicies(List<ErasureCodingPolicyInfo> ecInfos) throws MetaS
}
}

public List<ErasureCodingPolicyInfo> getAllEcPolicies() throws MetaStoreException {
try {
return ecDao.getAllEcPolicies();
} catch (Exception e) {
throw new MetaStoreException(e);
}
}

public void deleteFileByPath(String path) throws MetaStoreException {
try {
fileInfoDao.deleteByPath(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public ErasureCodingPolicyInfo getEcPolicyByName(String policyName) throws MetaS
new Object[]{policyName}, new EcPolicyRowMapper());
}

public List<ErasureCodingPolicyInfo> getAllEcPolicies() throws MetaStoreException {
public List<ErasureCodingPolicyInfo> getAllEcPolicies() {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
return jdbcTemplate.query("SELECT * FROM " + TABLE_NAME, new EcPolicyRowMapper());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public class FileObject extends SmartObject {
PROPERTIES.put("isDir",
new Property("isDir", ValueType.BOOLEAN,
null, "file", "is_dir", false));
PROPERTIES.put("ecPolicy",
new Property("ecPolicy", ValueType.STRING,
null, "file", null, false,
"(SELECT policy_name FROM ec_policy WHERE id = file.ec_policy_id)"));
}

public FileObject() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public class SmartRuleStringParser {
optCond.put("cache", "not inCache");
optCond.put("uncache", "inCache");
optCond.put("sync", "unsynced");
optCond.put("ec", "1");
optCond.put("unec", "1");
}

List<RecognitionException> parseErrors = new ArrayList<RecognitionException>();
Expand Down Expand Up @@ -89,9 +91,23 @@ public TranslateResult translate() throws IOException {
if (cmdDes.getActionSize() != 1 || optCond.get(actName) == null) {
return tr;
}

String repl = optCond.get(actName);
if (cmdDes.getActionName(0).equals("ec") || cmdDes.getActionName(0).equals("unec")) {
String policy;
if (cmdDes.getActionName(0).equals("ec")) {
policy = cmdDes.getActionArgs(0).get("-policy");
if (policy == null) {
throw new IOException("Option '-policy' of 'ec' action not specified!");
}
} else {
policy = "replication";
}
repl = "ecPolicy != \"" + policy + "\"";
}
int[] condPosition = tr.getCondPosition();
String cond = rule.substring(condPosition[0], condPosition[1] + 1);
String optRule = rule.replace(cond, optCond.get(actName) + " and (" + cond + ")");
String optRule = rule.replace(cond, repl + " and (" + cond + ")");
return doTranslate(optRule);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Response submitCmdlet(String args) {
return new JsonResponse<>(Response.Status.CREATED, smartEngine.getCmdletManager()
.submitCmdlet(args)).build();
} catch (Exception e) {
logger.error("Exception in ActionRestApi while adding cmdlet", e);
logger.error("Exception in ActionRestApi while adding cmdlet: " + e.getLocalizedMessage());
return new JsonResponse<>(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage(), ExceptionUtils.getStackTrace(e)).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Response addRule(@FormParam("ruleText") String ruleText) {
logger.info("Adding rule: " + ruleText);
t = smartEngine.getRuleManager().submitRule(ruleText, RuleState.DISABLED);
} catch (Exception e) {
logger.error("Exception in RuleRestApi while adding rule: ", e.getMessage());
logger.error("Exception in RuleRestApi while adding rule: " + e.getLocalizedMessage());
return new JsonResponse<>(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage(), ExceptionUtils.getStackTrace(e)).build();
}
Expand Down

0 comments on commit 83a428c

Please sign in to comment.