Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: have package version prefixed in PException #63

Merged
merged 3 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .arcconfig

This file was deleted.

11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ public void waitAllCompleteOrOneFail(int timeoutMillis) throws PException {
waitAllCompleteOrOneFail(null, timeoutMillis);
}

// Waits until all future tasks complete but terminate if one fails.
// `results` is nullable
/**
* Waits until all future tasks complete but terminate if one fails.
*
* @param results is nullable, each element is the result of the Future.
*/
public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) throws PException {
int timeLimit = timeoutMillis;
long duration = 0;
Expand All @@ -32,7 +35,6 @@ public void waitAllCompleteOrOneFail(List<Result> results, int timeoutMillis) th
long startTs = System.currentTimeMillis();
fu.await(timeLimit);
duration = System.currentTimeMillis() - startTs;
assert duration >= 0;
timeLimit -= duration;
} catch (Exception e) {
throw new PException("async task #[" + i + "] await failed: " + e.toString());
Expand Down
15 changes: 12 additions & 3 deletions src/main/java/com/xiaomi/infra/pegasus/client/PException.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,22 @@
*/
public class PException extends Exception {
private static final long serialVersionUID = 4436491238550521203L;
private static final String versionPrefix = loadVersion() + ": ";

public PException() {
super();
}

public PException(String message, Throwable cause) {
super(message, cause);
super(versionPrefix + message, cause);
}

public PException(String message) {
super(message);
super(versionPrefix + message);
}

public PException(Throwable cause) {
super(cause);
super(versionPrefix + cause.toString(), cause);
}

static PException threadInterrupted(String tableName, InterruptedException e) {
Expand All @@ -47,4 +48,12 @@ static PException timeout(String tableName, int timeout, TimeoutException e) {
"[table=%s, timeout=%dms] Timeout on Future await: %s",
tableName, timeout, e.getMessage())));
}

private static String loadVersion() {
String ver = PException.class.getPackage().getImplementationVersion();
if (ver == null) {
return "{version}";
}
return ver;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1730,7 +1730,7 @@ public List<PegasusScannerInterface> getUnorderedScanners(int maxSplitCount, Sca
return ret;
}

private void handleReplicaException(
static void handleReplicaException(
DefaultPromise promise, client_operator op, Table table, int timeout) {
gpid gPid = op.get_gpid();
ReplicaConfiguration replicaConfiguration =
Expand Down
56 changes: 54 additions & 2 deletions src/test/java/com/xiaomi/infra/pegasus/client/TestPException.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@

package com.xiaomi.infra.pegasus.client;

import com.xiaomi.infra.pegasus.apps.update_request;
import com.xiaomi.infra.pegasus.base.blob;
import com.xiaomi.infra.pegasus.base.error_code;
import com.xiaomi.infra.pegasus.base.gpid;
import com.xiaomi.infra.pegasus.operator.rrdb_put_operator;
import com.xiaomi.infra.pegasus.rpc.KeyHasher;
import com.xiaomi.infra.pegasus.rpc.async.ClusterManager;
import com.xiaomi.infra.pegasus.rpc.async.TableHandler;
import io.netty.util.concurrent.DefaultPromise;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -13,15 +23,57 @@ public class TestPException {
public void testThreadInterrupted() throws Exception {
PException ex = PException.threadInterrupted("test", new InterruptedException("intxxx"));
Assert.assertEquals(
"com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx",
"{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_THREAD_INTERRUPTED: [table=test] Thread was interrupted: intxxx",
ex.getMessage());
}

@Test
public void testTimeout() throws Exception {
PException ex = PException.timeout("test", 1000, new TimeoutException("tmxxx"));
Assert.assertEquals(
"com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=test, timeout=1000ms] Timeout on Future await: tmxxx",
"{version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_TIMEOUT: [table=test, timeout=1000ms] Timeout on Future await: tmxxx",
ex.getMessage());
}

@Test
public void testVersion() {
// Test the constructors of PException

PException ex = new PException("test");
Assert.assertEquals("{version}: test", ex.getMessage());

ex = new PException("test", new TimeoutException());
Assert.assertEquals("{version}: test", ex.getMessage());
}

foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testHandleReplicationException() throws Exception {
String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"};
ClusterManager manager = new ClusterManager(1000, 1, false, null, 60, metaList);
TableHandler table = manager.openTable("temp", KeyHasher.DEFAULT);
DefaultPromise<Void> promise = table.newPromise();
update_request req = new update_request(new blob(), new blob(), 100);
gpid gpid = table.getGpidByHash(1);
rrdb_put_operator op = new rrdb_put_operator(gpid, table.getTableName(), req, 0);
op.rpc_error.errno = error_code.error_types.ERR_OBJECT_NOT_FOUND;

// set failure in promise, the exception is thrown as ExecutionException.
PegasusTable.handleReplicaException(promise, op, table, 1000);
try {
promise.get();
} catch (ExecutionException e) {
TableHandler.ReplicaConfiguration replicaConfig = table.getReplicaConfig(gpid.get_pidx());
String server = replicaConfig.primary.get_ip() + ":" + replicaConfig.primary.get_port();

String msg =
String.format(
"com.xiaomi.infra.pegasus.client.PException: {version}: com.xiaomi.infra.pegasus.rpc.ReplicationException: ERR_OBJECT_NOT_FOUND: [table=temp,operation=put,replicaServer=%s,gpid=(%s)] The replica server doesn't serve this partition!",
server, gpid.toString());
Assert.assertEquals(e.getMessage(), msg);
return;
} catch (InterruptedException e) {
Assert.fail();
}
Assert.fail();
}
}