Skip to content

Commit

Permalink
Improve: Passing workerId to WorkerStat & Skip wait worker close if m…
Browse files Browse the repository at this point in the history
…aster executes failed (#292)

* Improve code

* Fix ci
  • Loading branch information
coderzc authored Dec 6, 2023
1 parent 2be0c28 commit e0b484a
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ComputeManager {
private static final Logger LOG = Log.logger(ComputeManager.class);
private static final String PREFIX = "partition-compute-executor-%s";

private final int workerId;
private final ComputerContext context;
private final Managers managers;

Expand All @@ -54,7 +55,8 @@ public class ComputeManager {
private final MessageSendManager sendManager;
private final ExecutorService computeExecutor;

public ComputeManager(ComputerContext context, Managers managers) {
public ComputeManager(int workerId, ComputerContext context, Managers managers) {
this.workerId = workerId;
this.context = context;
this.managers = managers;
this.partitions = new HashMap<>();
Expand All @@ -73,7 +75,7 @@ private Integer partitionComputeThreadNum(Config config) {
}

public WorkerStat input() {
WorkerStat workerStat = new WorkerStat();
WorkerStat workerStat = new WorkerStat(this.workerId);
this.recvManager.waitReceivedAllMessages();

Map<Integer, PeekableIterator<KvEntry>> vertices =
Expand Down Expand Up @@ -142,7 +144,7 @@ public void takeRecvedMessages() {
public WorkerStat compute(WorkerContext context, int superstep) {
this.sendManager.startSend(MessageType.MSG);

WorkerStat workerStat = new WorkerStat();
WorkerStat workerStat = new WorkerStat(this.workerId);
Map<Integer, PartitionStat> stats = new ConcurrentHashMap<>();

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class MasterService implements Closeable {
private final Managers managers;

private volatile boolean inited;
private volatile boolean failed;
private volatile boolean closed;
private Config config;
private volatile Bsp4Master bsp4Master;
Expand Down Expand Up @@ -153,7 +154,9 @@ public synchronized void close() {

this.masterComputation.close(new DefaultMasterContext());

this.bsp4Master.waitWorkersCloseDone();
if (!failed) {
this.bsp4Master.waitWorkersCloseDone();
}

this.managers.closeAll(this.config);

Expand Down Expand Up @@ -183,97 +186,103 @@ public void execute() {
this.checkInited();

LOG.info("{} MasterService execute", this);
/*
* Step 1: Determines which superstep to start from, and resume this
* superstep.
*/
int superstep = this.superstepToResume();
LOG.info("{} MasterService resume from superstep: {}",
this, superstep);
try {
/*
* Step 1: Determines which superstep to start from, and resume this
* superstep.
*/
int superstep = this.superstepToResume();
LOG.info("{} MasterService resume from superstep: {}",
this, superstep);

/*
* TODO: Get input splits from HugeGraph if resume from
* Constants.INPUT_SUPERSTEP.
*/
this.bsp4Master.masterResumeDone(superstep);
/*
* TODO: Get input splits from HugeGraph if resume from
* Constants.INPUT_SUPERSTEP.
*/
this.bsp4Master.masterResumeDone(superstep);

/*
* Step 2: Input superstep for loading vertices and edges.
* This step may be skipped if resume from other superstep than
* Constants.INPUT_SUPERSTEP.
*/
SuperstepStat superstepStat;
watcher.start();
if (superstep == Constants.INPUT_SUPERSTEP) {
superstepStat = this.inputstep();
superstep++;
} else {
// TODO: Get superstepStat from bsp service.
superstepStat = null;
}
watcher.stop();
LOG.info("{} MasterService input step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));
E.checkState(superstep <= this.maxSuperStep,
"The superstep {} can't be > maxSuperStep {}",
superstep, this.maxSuperStep);

watcher.reset();
watcher.start();
// Step 3: Iteration computation of all supersteps.
for (; superstepStat.active(); superstep++) {
LOG.info("{} MasterService superstep {} started",
this, superstep);
/*
* Superstep iteration. The steps in each superstep are:
* 1) Master waits workers superstep prepared.
* 2) All managers call beforeSuperstep.
* 3) Master signals the workers that the master prepared
* superstep.
* 4) Master waits the workers do vertex computation.
* 5) Master signal the workers that all workers have finished
* vertex computation.
* 6) Master waits the workers end the superstep, and get
* superstepStat.
* 7) Master compute whether to continue the next superstep
* iteration.
* 8) All managers call afterSuperstep.
* 9) Master signals the workers with superstepStat, and workers
* know whether to continue the next superstep iteration.
* Step 2: Input superstep for loading vertices and edges.
* This step may be skipped if resume from other superstep than
* Constants.INPUT_SUPERSTEP.
*/
this.bsp4Master.waitWorkersStepPrepareDone(superstep);
this.managers.beforeSuperstep(this.config, superstep);
this.bsp4Master.masterStepPrepareDone(superstep);

this.bsp4Master.waitWorkersStepComputeDone(superstep);
this.bsp4Master.masterStepComputeDone(superstep);
List<WorkerStat> workerStats =
this.bsp4Master.waitWorkersStepDone(superstep);
superstepStat = SuperstepStat.from(workerStats);
SuperstepContext context = new SuperstepContext(superstep,
superstepStat);
// Call master compute(), note the worker afterSuperstep() is done
boolean masterContinue = this.masterComputation.compute(context);
if (this.finishedIteration(masterContinue, context)) {
superstepStat.inactivate();
SuperstepStat superstepStat;
watcher.start();
if (superstep == Constants.INPUT_SUPERSTEP) {
superstepStat = this.inputstep();
superstep++;
} else {
// TODO: Get superstepStat from bsp service.
superstepStat = null;
}
this.managers.afterSuperstep(this.config, superstep);
this.bsp4Master.masterStepDone(superstep, superstepStat);

LOG.info("{} MasterService superstep {} finished",
this, superstep);
watcher.stop();
LOG.info("{} MasterService input step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));
E.checkState(superstep <= this.maxSuperStep,
"The superstep {} can't be > maxSuperStep {}",
superstep, this.maxSuperStep);

watcher.reset();
watcher.start();
// Step 3: Iteration computation of all supersteps.
for (; superstepStat.active(); superstep++) {
LOG.info("{} MasterService superstep {} started",
this, superstep);
/*
* Superstep iteration. The steps in each superstep are:
* 1) Master waits workers superstep prepared.
* 2) All managers call beforeSuperstep.
* 3) Master signals the workers that the master prepared
* superstep.
* 4) Master waits the workers do vertex computation.
* 5) Master signal the workers that all workers have finished
* vertex computation.
* 6) Master waits the workers end the superstep, and get
* superstepStat.
* 7) Master compute whether to continue the next superstep
* iteration.
* 8) All managers call afterSuperstep.
* 9) Master signals the workers with superstepStat, and workers
* know whether to continue the next superstep iteration.
*/
this.bsp4Master.waitWorkersStepPrepareDone(superstep);
this.managers.beforeSuperstep(this.config, superstep);
this.bsp4Master.masterStepPrepareDone(superstep);

this.bsp4Master.waitWorkersStepComputeDone(superstep);
this.bsp4Master.masterStepComputeDone(superstep);
List<WorkerStat> workerStats =
this.bsp4Master.waitWorkersStepDone(superstep);
superstepStat = SuperstepStat.from(workerStats);
SuperstepContext context = new SuperstepContext(superstep,
superstepStat);
// Call master compute(), note the worker afterSuperstep() is done
boolean masterContinue = this.masterComputation.compute(context);
if (this.finishedIteration(masterContinue, context)) {
superstepStat.inactivate();
}
this.managers.afterSuperstep(this.config, superstep);
this.bsp4Master.masterStepDone(superstep, superstepStat);

LOG.info("{} MasterService superstep {} finished",
this, superstep);
}
watcher.stop();
LOG.info("{} MasterService compute step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));

watcher.reset();
watcher.start();
// Step 4: Output superstep for outputting results.
this.outputstep();
watcher.stop();
LOG.info("{} MasterService output step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));
} catch (Throwable throwable) {
LOG.error("{} MasterService execute failed", this, throwable);
failed = true;
throw throwable;
}
watcher.stop();
LOG.info("{} MasterService compute step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));

watcher.reset();
watcher.start();
// Step 4: Output superstep for outputting results.
this.outputstep();
watcher.stop();
LOG.info("{} MasterService output step cost: {}",
this, TimeUtil.readableTime(watcher.getTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void init(Config config) {
dm.connect(worker.id(), worker.hostname(), worker.dataPort());
}

this.computeManager = new ComputeManager(this.context, this.managers);
this.computeManager = new ComputeManager(this.workerInfo.id(), this.context, this.managers);

this.managers.initedAll(this.config);
LOG.info("{} WorkerService initialized", this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git"
git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain

cd hugegraph-toolchain
mvn install -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp

cd hugegraph-loader
tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void setup() {
this.connectionId = new ConnectionId(new InetSocketAddress("localhost",
8081),
0);
this.computeManager = new ComputeManager(context(), this.managers);
this.computeManager = new ComputeManager(0, context(), this.managers);
}

@After
Expand Down

0 comments on commit e0b484a

Please sign in to comment.