Skip to content

Commit

Permalink
feat: have package version prefixed in PException (XiaoMi#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and foreverneverer committed Nov 7, 2019
1 parent 80076f4 commit 7012e6f
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 13 deletions.
4 changes: 0 additions & 4 deletions .arcconfig

This file was deleted.

11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ public void waitAllCompleteOrOneFail(int timeoutMillis) throws PException {
waitAllCompleteOrOneFail(null, timeoutMillis);
}

// Waits until all future tasks complete but terminate if one fails.
// `results` is nullable
/**
* Waits until all future tasks complete but terminate if one fails.
*
* @param results is nullable, each element is the result of the Future.
*/
public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) throws PException {
int timeLimit = timeoutMillis;
long duration = 0;
Expand All @@ -32,7 +35,6 @@ public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) th
long startTs = System.currentTimeMillis();
fu.await(timeLimit);
duration = System.currentTimeMillis() - startTs;
assert duration >= 0;
timeLimit -= duration;
} catch (Exception e) {
throw new PException("async task #[" + i + "] await failed: " + e.toString());
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/client/PException.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
*/
public class PException extends Exception {
private static final long serialVersionUID = 4436491238550521203L;
private static final String versionPrefix = loadVersion() + ": ";

public PException() {
super();
}

public PException(String message, Throwable cause) {
super(message, cause);
super(versionPrefix + message, cause);
}

public PException(String message) {
super(message);
super(versionPrefix + message);
}

public PException(Throwable cause) {
super(cause);
super(versionPrefix + cause.toString(), cause);
}

static PException threadInterrupted(String tableName, InterruptedException e) {
Expand All @@ -47,4 +48,12 @@ static PException timeout(String tableName, int timeout, TimeoutException e) {
"[table=%s, timeout=%dms] Timeout on Future await: %s",
tableName, timeout, e.getMessage())));
}

private static String loadVersion() {
String ver = PException.class.getPackage().getImplementationVersion();
if (ver == null) {
return "{version}";
}
return ver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1730,7 +1730,7 @@ public List<PegasusScannerInterface> getUnorderedScanners(int maxSplitCount, Sca
return ret;
}

private void handleReplicaException(
static void handleReplicaException(
DefaultPromise promise, client_operator op, Table table, int timeout) {
gpid gPid = op.get_gpid();
ReplicaConfiguration replicaConfiguration =
Expand Down
56 changes: 54 additions & 2 deletions src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.apps.update_request;
import com.xiaomi.infra.pegasus.base.blob;
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.base.gpid;
import com.xiaomi.infra.pegasus.operator.rrdb_put_operator;
import com.xiaomi.infra.pegasus.rpc.KeyHasher;
import com.xiaomi.infra.pegasus.rpc.async.ClusterManager;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import io.netty.util.concurrent.DefaultPromise;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -13,15 +23,57 @@ public class TestPException {
public void testThreadInterrupted() throws Exception {
PException ex = PException.threadInterrupted("test", new InterruptedException("intxxx"));
Assert.assertEquals(
"com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx",
"{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx",
ex.getMessage());
}

@Test
public void testTimeout() throws Exception {
PException ex = PException.timeout("test", 1000, new TimeoutException("tmxxx"));
Assert.assertEquals(
"com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=test, timeout=1000ms] Timeout on Future await: tmxxx",
"{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=test, timeout=1000ms] Timeout on Future await: tmxxx",
ex.getMessage());
}

@Test
public void testVersion() {
// Test the constructors of PException

PException ex = new PException("test");
Assert.assertEquals("{version}: test", ex.getMessage());

ex = new PException("test", new TimeoutException());
Assert.assertEquals("{version}: test", ex.getMessage());
}

@Test
public void testHandleReplicationException() throws Exception {
String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"};
ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList);
TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT);
DefaultPromise<Void> promise = table.newPromise();
update_request req = new update_request(new blob(), new blob(), 100);
gpid gpid = table.getGpidByHash(1);
rrdb_put_operator op = new rrdb_put_operator(gpid, table.getTableName(), req, 0);
op.rpc_error.errno = error_code.error_types.ERR_OBJECT_NOT_FOUND;

// set failure in promise, the exception is thrown as ExecutionException.
PegasusTable.handleReplicaException(promise, op, table, 1000);
try {
promise.get();
} catch (ExecutionException e) {
TableHandler.ReplicaConfiguration replicaConfig = table.getReplicaConfig(gpid.get_pidx());
String server = replicaConfig.primary.get_ip() + ":" + replicaConfig.primary.get_port();

String msg =
String.format(
"com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_OBJECT_NOT_FOUND: [table=temp,operation=put,replicaServer=%s,gpid=(%s)] The replica server doesn't serve this partition!",
server, gpid.toString());
Assert.assertEquals(e.getMessage(), msg);
return;
} catch (InterruptedException e) {
Assert.fail();
}
Assert.fail();
}
}

0 comments on commit 7012e6f

Please sign in to comment.