Skip to content

Commit

Permalink
convert exceptions into CacheException prior calling to NearlineReque…
Browse files Browse the repository at this point in the history
…st#failed
  • Loading branch information
kofemann committed Apr 5, 2023
1 parent 6799f37 commit ef389a6
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 18 deletions.
36 changes: 22 additions & 14 deletions src/main/java/org/dcache/nearline/cta/CtaNearlineStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Empty;
import cta.admin.CtaAdmin.Version;
import diskCacheV111.util.CacheException;
import io.grpc.ChannelCredentials;
import io.grpc.ConnectivityState;
import io.grpc.Deadline;
Expand Down Expand Up @@ -206,9 +207,7 @@ public void onNext(CreateResponse createResponse) {
@Override
public void onError(Throwable t) {
LOGGER.error("Failed to submit create request {}", t.getMessage());
Exception e =
t instanceof Exception ? Exception.class.cast(t) : new Exception(t);
fr.failed(e);
fr.failed(asCacheException(t));
}

@Override
Expand All @@ -220,7 +219,7 @@ public void onCompleted() {
} catch (ExecutionException | InterruptedException e) {
Throwable t = Throwables.getRootCause(e);
LOGGER.error("Failed to activate flush request: {}", t.getMessage());
fr.failed(e);
fr.failed(asCacheException(t));
}
}

Expand Down Expand Up @@ -290,9 +289,7 @@ public void cancel() {
@Override
public void onError(Throwable t) {
LOGGER.error("Failed to submit archive request {}", t.getMessage());
Exception e =
t instanceof Exception ? Exception.class.cast(t) : new Exception(t);
r.failed(e);
r.failed(asCacheException(t));
}

@Override
Expand Down Expand Up @@ -343,7 +340,7 @@ public void completed(Set<Checksum> checksums) {
Throwable t = Throwables.getRootCause(e);
LOGGER.error("Failed to activate/allocate space for retrieve request: {}",
t.getMessage());
r.failed(e);
r.failed(asCacheException(t));
continue;
}

Expand Down Expand Up @@ -374,9 +371,7 @@ public void cancel() {
@Override
public void onError(Throwable t) {
LOGGER.error("Failed to submit stage request {}", t.getMessage());
Exception e =
t instanceof Exception ? Exception.class.cast(t) : new Exception(t);
r.failed(e);
r.failed(asCacheException(t));
}

@Override
Expand Down Expand Up @@ -409,9 +404,7 @@ public void onNext(Empty value) {
@Override
public void onError(Throwable t) {
LOGGER.error("Failed to submit stage request {}", t.getMessage());
Exception e =
t instanceof Exception ? Exception.class.cast(t) : new Exception(t);
r.failed(e);
r.failed(asCacheException(t));
}

@Override
Expand Down Expand Up @@ -575,4 +568,19 @@ int getPendingRequestsCount() {
PendingRequest getRequest(String id) {
return pendingRequests.get(id);
}


/**
* Convert a given Throwable into CacheException to ensure serialization.
* @param e origianl exception.
* @return corresponding cache exception
*/
private CacheException asCacheException(Throwable e) {

if (e.getClass().isAssignableFrom(CacheException.class)) {
return CacheException.class.cast(e);
}

return new CacheException(CacheException.UNEXPECTED_SYSTEM_EXCEPTION, e.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.mockito.Mockito.when;

import com.google.common.util.concurrent.Futures;
import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.GenericStorageInfo;
import eu.emi.security.authn.x509.impl.CertificateUtils;
import eu.emi.security.authn.x509.impl.CertificateUtils.Encoding;
Expand Down Expand Up @@ -199,7 +200,7 @@ public void testWithoutTLS() {
var request = mockedFlushRequest();
driver.flush(Set.of(request));
waitToComplete();
verify(request).failed(any(io.grpc.StatusRuntimeException.class));
verify(request).failed(any(CacheException.class));
}

@Test
Expand Down Expand Up @@ -637,7 +638,7 @@ public void testFailOnLostStageMessage() {
driver.stage(Set.of(request));

cta.waitToReply(4);
verify(request, times(1)).failed(any(io.grpc.StatusRuntimeException.class));
verify(request, times(1)).failed(any(CacheException.class));
}


Expand All @@ -653,7 +654,7 @@ public void testFailOnLostFlushMessage() {
driver.flush(Set.of(request));

cta.waitToReply(4);
verify(request, times(1)).failed(any(io.grpc.StatusRuntimeException.class));
verify(request, times(1)).failed(any(CacheException.class));
}

@Test
Expand All @@ -668,7 +669,7 @@ public void testFailOnLostDeleteMessage() {
driver.remove(Set.of(request));

cta.waitToReply(4);
verify(request, times(1)).failed(any(io.grpc.StatusRuntimeException.class));
verify(request, times(1)).failed(any(CacheException.class));
}

void waitToComplete() {
Expand Down

0 comments on commit ef389a6

Please sign in to comment.