This repository has been archived by the owner on Jan 3, 2023. It is now read-only.
Solve #1119, Refine Mover progress report (#1121)
littlezhou authored Sep 20, 2017
1 parent 92713ad commit 6b52d2d
Showing 9 changed files with 107 additions and 74 deletions.
@@ -77,7 +77,8 @@ public void moveInSameNode() throws Exception {
     }

     // Do move executor
-    MoverExecutor moverExecutor = new MoverExecutor(conf, 10, 500);
+    MoverStatus status = new MoverStatus();
+    MoverExecutor moverExecutor = new MoverExecutor(status, conf, 10, 500);
     int failedMoves = moverExecutor.executeMove(plan);
     Assert.assertEquals(0, failedMoves);

@@ -123,7 +124,8 @@ public void moveCrossNodes() throws Exception {
     }

     // Do mover executor
-    MoverExecutor moverExecutor = new MoverExecutor(conf, 10, 500);
+    MoverStatus status = new MoverStatus();
+    MoverExecutor moverExecutor = new MoverExecutor(status, conf, 10, 500);
     int failedMoves = moverExecutor.executeMove(plan);
     Assert.assertEquals(0, failedMoves);

@@ -59,6 +59,7 @@ public void init(Map<String, String> args) {
     if (plan != null) {
       Gson gson = new Gson();
       movePlan = gson.fromJson(plan, FileMovePlan.class);
+      status.setTotalBlocks(movePlan.getBlockIds().size());
     }
   }
 }
@@ -51,7 +51,7 @@ public MoverBasedMoveRunner(Configuration conf, MoverStatus actionStatus) {
   public void move(String file, FileMovePlan plan) throws Exception {
     int maxMoves = plan.getPropertyValueInt(FileMovePlan.MAX_CONCURRENT_MOVES, 10);
     int maxRetries = plan.getPropertyValueInt(FileMovePlan.MAX_NUM_RETRIES, 10);
-    MoverExecutor executor = new MoverExecutor(conf, maxRetries, maxMoves);
+    MoverExecutor executor = new MoverExecutor(actionStatus, conf, maxRetries, maxMoves);
     executor.executeMove(plan);
   }
 }
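
With this signature change, one MoverStatus instance travels the whole chain from the action to the executor, so the progress the executor records is the progress the action reports. A minimal sketch of the wiring implied by the hunks above (only the constructor and method signatures come from the diff; the surrounding setup is assumed):

```java
// Sketch of the status plumbing implied by this commit (setup code is assumed).
MoverStatus status = new MoverStatus();                  // shared progress object
MoverBasedMoveRunner runner = new MoverBasedMoveRunner(conf, status);
runner.move(file, plan); // internally: new MoverExecutor(status, conf, maxRetries, maxMoves)
float done = status.getPercentage();                     // 0.0f .. 1.0f as moves finish
```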
@@ -59,8 +59,11 @@ public class MoverExecutor {

   private Map<Long, Block> sourceBlockMap;
   private Map<String, DatanodeInfo> sourceDatanodeMap;
+  private MoverStatus status;

-  public MoverExecutor(Configuration conf, int maxRetryTimes, int maxConcurrentMoves) {
+  public MoverExecutor(MoverStatus status, Configuration conf,
+      int maxRetryTimes, int maxConcurrentMoves) {
+    this.status = status;
     this.conf = conf;
     this.maxRetryTimes = maxRetryTimes;
     this.maxConcurrentMoves = maxConcurrentMoves;
@@ -87,9 +90,17 @@ public void run() {
         }
       });
     }
-    while (!ReplicaMove.allMoveFinished(allMoves)) {
+
+    int[] stat = new int[2];
+    while (true) {
+      ReplicaMove.countStatus(allMoves, stat);
+      if (stat[0] == allMoves.size()) {
+        status.increaseMovedBlocks(stat[1]);
+        break;
+      }
       Thread.sleep(1000);
     }
+
     int remaining = ReplicaMove.refreshMoverList(allMoves);
     if (allMoves.size() == 0) {
       LOG.info("{} succeeded", this);
@@ -135,7 +146,8 @@ private void parseSchedulePlan(FileMovePlan plan) throws IOException {
       DatanodeInfo sourceDatanode = sourceDatanodeMap.get(sourceUuids.get(planIndex));
       StorageGroup source = new StorageGroup(sourceDatanode, sourceStorageTypes.get(planIndex));
       // build target
-      DatanodeInfo targetDatanode = CompatibilityHelperLoader.getHelper().newDatanodeInfo(targetIpAddrs.get(planIndex), targetXferPorts.get(planIndex));
+      DatanodeInfo targetDatanode = CompatibilityHelperLoader.getHelper()
+          .newDatanodeInfo(targetIpAddrs.get(planIndex), targetXferPorts.get(planIndex));
       StorageGroup target = new StorageGroup(targetDatanode, targetStorageTypes.get(planIndex));
       // generate single move
       ReplicaMove replicaMove = new ReplicaMove(block, source, target, nnc, saslClient);
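
The rewritten wait loop above replaces the old allMoveFinished() poll: it counts finished and successful moves in one pass and publishes the success count once the round completes. The same loop, restated with explanatory comments (all names are from the hunks; nothing new is added):

```java
int[] stat = new int[2];                       // stat[0] = finished moves, stat[1] = successful moves
while (true) {
  ReplicaMove.countStatus(allMoves, stat);     // one pass over all scheduled moves
  if (stat[0] == allMoves.size()) {            // every move in this round has finished
    status.increaseMovedBlocks(stat[1]);       // report only the successes as progress
    break;
  }
  Thread.sleep(1000);                          // poll once per second
}
int remaining = ReplicaMove.refreshMoverList(allMoves); // keep failures for the retry round
```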
@@ -21,77 +21,35 @@
  * ActionStatus of Mover tool.
  */
 public class MoverStatus {
-  private long totalBlocks;
-  private long totalSize;
-  private long movedBlocks;
-  private boolean totalValueSet;
+  private int totalBlocks; // each replica is counted as a block
+  private int movedBlocks;

   public MoverStatus() {
     totalBlocks = 0;
-    totalSize = 0;
     movedBlocks = 0;
-    totalValueSet = false;
   }

-  synchronized public float getPercentage() {
-    if (!totalValueSet) {
+  public float getPercentage() {
+    if (totalBlocks == 0) {
       return 0;
     }
     return movedBlocks * 1.0F / totalBlocks;
   }

-  synchronized public long getTotalSize() {
-    return totalSize;
-  }
-
-  synchronized public long getTotalBlocks() {
+  public int getTotalBlocks() {
     return totalBlocks;
   }

-  synchronized public void setTotalBlocks(long blocks) {
-    if (totalValueSet) {
-      return;
-    }
+  public void setTotalBlocks(int blocks) {
     totalBlocks = blocks;
   }

-  synchronized public long increaseTotalBlocks(long blocks) {
-    if (totalValueSet) {
-      return totalBlocks;
-    }
-    totalBlocks += blocks;
-    return totalBlocks;
-  }
-
-  synchronized public void setTotalSize(long size) {
-    if (totalValueSet) {
-      return;
-    }
-    totalSize = size;
-  }
-
-  synchronized public long increaseTotalSize(long size) {
-    if (totalValueSet) {
-      return totalSize;
-    }
-    totalSize += size;
-    return totalSize;
-  }

-  synchronized public long increaseMovedBlocks(long blocks) {
+  synchronized public int increaseMovedBlocks(int blocks) {
     movedBlocks += blocks;
     return movedBlocks;
   }

-  synchronized public void setMovedBlocks(long blocks) {
-    movedBlocks = blocks;
-  }
-
-  synchronized public long getMovedBlocks() {
+  public int getMovedBlocks() {
     return movedBlocks;
   }
-
-  synchronized public void completeTotalValueSet() {
-    totalValueSet = true;
-  }
 }
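
The slimmed class keeps just two counters, and the percentage is simply moved replicas over total replicas. A small illustrative use of the new contract (the numbers are made up):

```java
MoverStatus status = new MoverStatus();
status.setTotalBlocks(8);          // e.g. movePlan.getBlockIds().size(), as in MoveFileAction.init
status.increaseMovedBlocks(2);     // round one: 2 replicas moved successfully
status.increaseMovedBlocks(4);     // round two: 4 more
float p = status.getPercentage();  // 6 / 8 = 0.75f
```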
@@ -20,7 +20,6 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
@@ -185,6 +184,19 @@ public static boolean allMoveFinished(List<ReplicaMove> allMoves) {
     return true;
   }

+  public static void countStatus(List<ReplicaMove> allMoves, int[] ret) {
+    ret[0] = 0;
+    ret[1] = 0;
+    for (ReplicaMove move : allMoves) {
+      if (move.status.isFinished()) {
+        ret[0] += 1;
+        if (move.status.isSuccessful()) {
+          ret[1] += 1;
+        }
+      }
+    }
+  }
+
   /**
    * Remove successful moves and refresh the status of remaining ones for a new iteration.
    * @param allMoves
@@ -33,10 +33,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.smartdata.hdfs.CompatibilityHelperLoader;
+import org.smartdata.hdfs.scheduler.MovePlanStatistics;
 import org.smartdata.model.action.FileMovePlan;
 import org.smartdata.hdfs.action.move.DBlock;
 import org.smartdata.hdfs.action.move.MLocation;
-import org.smartdata.hdfs.action.move.MoverStatus;
 import org.smartdata.hdfs.action.move.Source;
 import org.smartdata.hdfs.action.move.StorageGroup;
 import org.smartdata.hdfs.action.move.StorageMap;
@@ -52,8 +52,8 @@
 /**
  * A processor to do Mover action.
  */
-public class MoverProcessor {
-  static final Logger LOG = LoggerFactory.getLogger(MoverProcessor.class);
+public class MovePlanMaker {
+  static final Logger LOG = LoggerFactory.getLogger(MovePlanMaker.class);

   private final DFSClient dfs;
   private NetworkTopology networkTopology;
@@ -62,19 +62,19 @@ public class MoverProcessor {

   private final BlockStoragePolicy[] blockStoragePolicies;
   private long movedBlocks = 0;
-  private final MoverStatus moverStatus;
+  private final MovePlanStatistics statistics;
   private FileMovePlan schedulePlan;

-  public MoverProcessor(DFSClient dfsClient, StorageMap storages,
-      NetworkTopology cluster, MoverStatus moverStatus) throws IOException {
+  public MovePlanMaker(DFSClient dfsClient, StorageMap storages,
+      NetworkTopology cluster, MovePlanStatistics statistics) throws IOException {
     this.dfs = dfsClient;
     this.storages = storages;
     this.networkTopology = cluster;
     this.retryCount = new AtomicInteger(1);
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<
         BlockStoragePolicySuite.ID_BIT_LENGTH];
     initStoragePolicies();
-    this.moverStatus = moverStatus;
+    this.statistics = statistics;
   }

private void initStoragePolicies() throws IOException {
@@ -152,8 +152,8 @@ private void processFile(String fullPath, HdfsLocatedFileStatus status) {
     final StorageTypeDiff diff =
         new StorageTypeDiff(types, CompatibilityHelperLoader.getHelper().getStorageTypes(lb));
     int remainingReplications = diff.removeOverlap(true);
-    moverStatus.increaseTotalSize(lb.getBlockSize() * remainingReplications);
-    moverStatus.increaseTotalBlocks(remainingReplications);
+    statistics.increaseTotalSize(lb.getBlockSize() * remainingReplications);
+    statistics.increaseTotalBlocks(remainingReplications);
     if (remainingReplications != 0) {
       scheduleMoveBlock(diff, lb);
     }
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.hdfs.scheduler;

public class MovePlanStatistics {
  private long totalBlocks;
  private long totalSize;

  public MovePlanStatistics() {
    totalBlocks = 0;
    totalSize = 0;
  }

  public MovePlanStatistics(long totalBlocks, long totalSize) {
    this.totalBlocks = totalBlocks;
    this.totalSize = totalSize;
  }

  public void increaseTotalBlocks(int numBlocks) {
    totalBlocks += numBlocks;
  }

  public void increaseTotalSize(long size) {
    totalSize += size;
  }

  public long getTotalBlocks() {
    return totalBlocks;
  }

  public long getTotalSize() {
    return totalSize;
  }
}
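
This new class takes over the plan-time totals (blocks and bytes scheduled to move) that MoverStatus previously mixed with runtime progress; the scheduler-side hunks below swap it in. A tiny illustrative use (the values are made up):

```java
MovePlanStatistics stats = new MovePlanStatistics();
stats.increaseTotalBlocks(3);                     // 3 replicas of a block need to move
stats.increaseTotalSize(3 * 128L * 1024 * 1024);  // e.g. assuming a 128 MB block size
long blocks = stats.getTotalBlocks();             // 3
long bytes = stats.getTotalSize();                // 402653184
```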
@@ -25,9 +25,8 @@
 import org.smartdata.hdfs.HadoopUtil;
 import org.smartdata.hdfs.action.HdfsAction;
 import org.smartdata.hdfs.action.MoveFileAction;
-import org.smartdata.hdfs.action.move.MoverStatus;
 import org.smartdata.hdfs.metric.fetcher.DatanodeStorageReportProcTask;
-import org.smartdata.hdfs.metric.fetcher.MoverProcessor;
+import org.smartdata.hdfs.metric.fetcher.MovePlanMaker;
 import org.smartdata.metastore.ActionSchedulerService;
 import org.smartdata.metastore.MetaStore;
 import org.smartdata.model.ActionInfo;
@@ -46,8 +45,8 @@

 public class MoverScheduler extends ActionSchedulerService {
   private DFSClient client;
-  private MoverStatus moverStatus;
-  private MoverProcessor processor;
+  private MovePlanStatistics statistics;
+  private MovePlanMaker planMaker;
   private URI nnUri;
   private long dnInfoUpdateInterval = 2 * 60 * 1000;
   private ScheduledExecutorService updateService;
@@ -64,7 +63,7 @@ public MoverScheduler(SmartContext context, MetaStore metaStore)

   public void init() throws IOException {
     this.client = new DFSClient(nnUri, getContext().getConf());
-    moverStatus = new MoverStatus();
+    statistics = new MovePlanStatistics();
     updateService = Executors.newScheduledThreadPool(1);
   }

@@ -78,7 +77,7 @@ public void start() throws IOException {
     DatanodeStorageReportProcTask task =
         new DatanodeStorageReportProcTask(client, getContext().getConf());
     task.run();
-    processor = new MoverProcessor(client, task.getStorages(), task.getNetworkTopology(), moverStatus);
+    planMaker = new MovePlanMaker(client, task.getStorages(), task.getNetworkTopology(), statistics);

     updateServiceFuture = updateService.scheduleAtFixedRate(
         new UpdateClusterInfoTask(task),
@@ -125,7 +124,7 @@ public ScheduleResult onSchedule(ActionInfo actionInfo, LaunchAction action) {

     try {
       client.setStoragePolicy(file, policy);
-      FileMovePlan plan = processor.processNamespace(new Path(file));
+      FileMovePlan plan = planMaker.processNamespace(new Path(file));
       plan.setNamenode(nnUri);
       action.getArgs().put(MoveFileAction.MOVE_PLAN, plan.toString());
       return ScheduleResult.SUCCESS;
@@ -159,7 +158,7 @@ public UpdateClusterInfoTask(DatanodeStorageReportProcTask task) {
     @Override
     public void run() {
       task.run();
-      processor.updateClusterInfo(task.getStorages(), task.getNetworkTopology());
+      planMaker.updateClusterInfo(task.getStorages(), task.getNetworkTopology());
     }
   }
 }
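
End to end, the scheduler builds the plan and the action executes it; progress now lives with the action, while the scheduler keeps only plan statistics. A condensed sketch of the hand-off, pieced together from the hunks in this commit (anything outside them is an assumption):

```java
// Scheduler side (MoverScheduler.onSchedule, condensed; error handling omitted):
client.setStoragePolicy(file, policy);
FileMovePlan plan = planMaker.processNamespace(new Path(file)); // fills `statistics` as it scans
plan.setNamenode(nnUri);
action.getArgs().put(MoveFileAction.MOVE_PLAN, plan.toString()); // plan rides along as an argument

// Action side (MoveFileAction.init, condensed, from the hunk near the top):
// movePlan = gson.fromJson(plan, FileMovePlan.class);
// status.setTotalBlocks(movePlan.getBlockIds().size());         // denominator for getPercentage()
```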
