Skip to content

Commit

Permalink
Eliminate NPE after recovering from a temporary name resolution failu…
Browse files Browse the repository at this point in the history
…re. (#11298)

* Eliminate NPE after recovering from a temporary name resolution failure.

* Add test case for 2 failing subchannels to make sure it causes channel to go into TF.
  • Loading branch information
larry-safran authored Jun 21, 2024
1 parent 0fea7dd commit 9b39b3e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,32 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {

@Override
public void handleNameResolutionError(Status error) {
if (rawConnectivityState == SHUTDOWN) {
return;
}

for (SubchannelData subchannelData : subchannels.values()) {
subchannelData.getSubchannel().shutdown();
}
subchannels.clear();
if (addressIndex != null) {
addressIndex.updateGroups(null);
}
rawConnectivityState = TRANSIENT_FAILURE;
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
}

void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
ConnectivityState newState = stateInfo.getState();

SubchannelData subchannelData = subchannels.get(getAddress(subchannel));
// Shutdown channels/previously relevant subchannels can still callback with state updates.
// To prevent pickers from returning these obsolete subchannels, this logic
// is included to check if the current list of active subchannels includes this subchannel.
SubchannelData subchannelData = subchannels.get(getAddress(subchannel));
if (subchannelData == null || subchannelData.getSubchannel() != subchannel) {
return;
}

if (newState == SHUTDOWN) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,37 @@ public void nameResolutionSuccessAfterError() {
assertEquals(mockSubchannel1, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
}

@Test
public void nameResolutionTemporaryError() {
List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0));
InOrder inOrder = inOrder(mockHelper, mockSubchannel1);
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener1 = stateListenerCaptor.getValue();
stateListener1.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertEquals(mockSubchannel1, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

loadBalancer.handleNameResolutionError(
Status.UNAVAILABLE.withDescription("nameResolutionError"));
inOrder.verify(mockHelper).updateBalancingState(
eq(TRANSIENT_FAILURE), any(SubchannelPicker.class));

loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(servers).setAttributes(affinity).build());
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener2 = stateListenerCaptor.getValue();

assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

stateListener2.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
assertEquals(mockSubchannel1, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
}


@Test
public void nameResolutionErrorWithStateChanges() {
List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0));
Expand Down Expand Up @@ -725,6 +756,33 @@ public void requestConnection() {
}
}

@Test
public void failChannelWhenSubchannelsFail() {
List<EquivalentAddressGroup> newServers = Lists.newArrayList(servers.get(0), servers.get(1));
when(mockSubchannel1.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(0)));
when(mockSubchannel2.getAllAddresses()).thenReturn(Lists.newArrayList(servers.get(1)));

// accept resolved addresses
loadBalancer.acceptResolvedAddresses(
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2);
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
verify(mockHelper).createSubchannel(createArgsCaptor.capture());
inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
assertNull(pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());

inOrder.verify(mockSubchannel1).requestConnection();
stateListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));

inOrder.verify(mockSubchannel2).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener2 = stateListenerCaptor.getValue();
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));

inOrder.verify(mockHelper).updateBalancingState(eq(TRANSIENT_FAILURE), pickerCaptor.capture());
assertEquals(CONNECTION_ERROR, pickerCaptor.getValue().pickSubchannel(mockArgs).getStatus());
}

@Test
public void updateAddresses_emptyEagList_returns_false() {
loadBalancer.acceptResolvedAddresses(
Expand Down

0 comments on commit 9b39b3e

Please sign in to comment.