diff --git a/.arcconfig b/.arcconfig deleted file mode 100644 index 52351a40..00000000 --- a/.arcconfig +++ /dev/null @@ -1,4 +0,0 @@ -{ - "project_id" : "Pegasus", - "conduit_uri" : "https://phabricator.d.xiaomi.net/" -} diff --git a/pom.xml b/pom.xml index 72519ab1..c6025866 100755 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,17 @@ + + org.apache.maven.plugins + maven-jar-plugin + + + + true + + + + org.apache.maven.plugins maven-source-plugin diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java b/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java index 90748650..c7a92ba9 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/FutureGroup.java @@ -21,8 +21,11 @@ public void waitAllCompleteOrOneFail(int timeoutMillis) throws PException { waitAllCompleteOrOneFail(null, timeoutMillis); } - // Waits until all future tasks complete but terminate if one fails. - // `results` is nullable + /** + * Waits until all future tasks complete but terminate if one fails. + * + * @param results is nullable, each element is the result of the Future. + */ public void waitAllCompleteOrOneFail(List results, int timeoutMillis) throws PException { int timeLimit = timeoutMillis; long duration = 0; @@ -32,7 +35,6 @@ public void waitAllCompleteOrOneFail(List results, int timeoutMillis) th long startTs = System.currentTimeMillis(); fu.await(timeLimit); duration = System.currentTimeMillis() - startTs; - assert duration >= 0; timeLimit -= duration; } catch (Exception e) { throw new PException("async task #[" + i + "] await failed: " + e.toString()); diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java index 53f3d4b2..7692ec17 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PException.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PException.java @@ -15,21 +15,22 @@ */ public class PException extends Exception { private static final long serialVersionUID = 4436491238550521203L; + private static final String versionPrefix = loadVersion() + ": "; public PException() { super(); } public PException(String message, Throwable cause) { - super(message, cause); + super(versionPrefix + message, cause); } public PException(String message) { - super(message); + super(versionPrefix + message); } public PException(Throwable cause) { - super(cause); + super(versionPrefix + cause.toString(), cause); } static PException threadInterrupted(String tableName, InterruptedException e) { @@ -47,4 +48,12 @@ static PException timeout(String tableName, int timeout, TimeoutException e) { "[table=%s, timeout=%dms] Timeout on Future await: %s", tableName, timeout, e.getMessage()))); } + + private static String loadVersion() { + String ver = PException.class.getPackage().getImplementationVersion(); + if (ver == null) { + return "{version}"; + } + return ver; + } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java index 7a9f8de1..f99c52c4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/PegasusTable.java @@ -1730,7 +1730,7 @@ public List getUnorderedScanners(int maxSplitCount, Sca return ret; } - private void handleReplicaException( + static void handleReplicaException( DefaultPromise promise, client_operator op, Table table, int timeout) { gpid gPid = op.get_gpid(); ReplicaConfiguration replicaConfiguration = diff --git a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java index 56b6f54c..90db903a 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java +++ b/src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java @@ -4,6 +4,16 @@ package com.xiaomi.infra.pegasus.client; +import com.xiaomi.infra.pegasus.apps.update_request; +import com.xiaomi.infra.pegasus.base.blob; +import com.xiaomi.infra.pegasus.base.error_code; +import com.xiaomi.infra.pegasus.base.gpid; +import com.xiaomi.infra.pegasus.operator.rrdb_put_operator; +import com.xiaomi.infra.pegasus.rpc.KeyHasher; +import com.xiaomi.infra.pegasus.rpc.async.ClusterManager; +import com.xiaomi.infra.pegasus.rpc.async.TableHandler; +import io.netty.util.concurrent.DefaultPromise; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import org.junit.Assert; import org.junit.Test; @@ -13,7 +23,7 @@ public class TestPException { public void testThreadInterrupted() throws Exception { PException ex = PException.threadInterrupted("test", new InterruptedException("intxxx")); Assert.assertEquals( - "com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx", + "{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx", ex.getMessage()); } @@ -21,7 +31,49 @@ public void testThreadInterrupted() throws Exception { public void testTimeout() throws Exception { PException ex = PException.timeout("test", 1000, new TimeoutException("tmxxx")); Assert.assertEquals( - "com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=test, timeout=1000ms] Timeout on Future await: tmxxx", + "{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=test, timeout=1000ms] Timeout on Future await: tmxxx", ex.getMessage()); } + + @Test + public void testVersion() { + // Test the constructors of PException + + PException ex = new PException("test"); + Assert.assertEquals("{version}: test", ex.getMessage()); + + ex = new PException("test", new TimeoutException()); + Assert.assertEquals("{version}: test", ex.getMessage()); + } + + @Test + public void testHandleReplicationException() throws Exception { + String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"}; + ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList); + TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT); + DefaultPromise promise = table.newPromise(); + update_request req = new update_request(new blob(), new blob(), 100); + gpid gpid = table.getGpidByHash(1); + rrdb_put_operator op = new rrdb_put_operator(gpid, table.getTableName(), req, 0); + op.rpc_error.errno = error_code.error_types.ERR_OBJECT_NOT_FOUND; + + // set failure in promise, the exception is thrown as ExecutionException. + PegasusTable.handleReplicaException(promise, op, table, 1000); + try { + promise.get(); + } catch (ExecutionException e) { + TableHandler.ReplicaConfiguration replicaConfig = table.getReplicaConfig(gpid.get_pidx()); + String server = replicaConfig.primary.get_ip() + ":" + replicaConfig.primary.get_port(); + + String msg = + String.format( + "com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_OBJECT_NOT_FOUND: [table=temp,operation=put,replicaServer=%s,gpid=(%s)] The replica server doesn't serve this partition!", + server, gpid.toString()); + Assert.assertEquals(e.getMessage(), msg); + return; + } catch (InterruptedException e) { + Assert.fail(); + } + Assert.fail(); + } }