Skip to content

Commit

Permalink
merge: #13658
Browse files Browse the repository at this point in the history
13658: [Backport stable/8.2] fix(broker): report health on change only r=megglos a=backport-action

# Description
Backport of #13655 to `stable/8.2`.

relates to #13650

Co-authored-by: Meggle (Sebastian Bathke) <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and megglos authored Jul 27, 2023
2 parents e385c98 + 00f10f7 commit 7705bb2
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.camunda.zeebe.util.health.HealthReport;
import io.camunda.zeebe.util.health.HealthStatus;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/**
Expand Down Expand Up @@ -57,19 +58,12 @@ private void updateHealthStatus() {
healthReport = HealthReport.healthy(this);
}

if (previousStatus != healthReport) {
if (!Objects.equals(previousStatus, healthReport)) {
switch (healthReport.getStatus()) {
case HEALTHY:
failureListeners.forEach(FailureListener::onRecovered);
break;
case UNHEALTHY:
failureListeners.forEach((l) -> l.onFailure(healthReport));
break;
case DEAD:
failureListeners.forEach((l) -> l.onUnrecoverableFailure(healthReport));
break;
default:
break;
case HEALTHY -> failureListeners.forEach(FailureListener::onRecovered);
case UNHEALTHY -> failureListeners.forEach((l) -> l.onFailure(healthReport));
case DEAD -> failureListeners.forEach((l) -> l.onUnrecoverableFailure(healthReport));
default -> {}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerRule;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthIssue;
import io.camunda.zeebe.util.health.HealthReport;
import io.camunda.zeebe.util.health.HealthStatus;
import java.time.Duration;
Expand Down Expand Up @@ -400,6 +401,91 @@ public void shouldReportUnhealthyPerDefault() {
assertThat(healthReport.getIssue().message()).contains("Services not installed");
}

@Test
public void shouldCallOnFailureOnceForSameHealthIssue() {
// given
schedulerRule.submitActor(partition);
schedulerRule.workUntilDone();

final FailureListener failureListener = mock(FailureListener.class);
doNothing().when(failureListener).onFailure(any());

final var captor = ArgumentCaptor.forClass(ZeebePartitionHealth.class);
verify(healthMonitor).registerComponent(any(), captor.capture());
final var zeebePartitionHealth = captor.getValue();
zeebePartitionHealth.addFailureListener(failureListener);

when(transition.getHealthIssue()).thenReturn(HealthIssue.of("it's over"));

// when
final HealthReport healthReport1 = zeebePartitionHealth.getHealthReport();
assertThat(healthReport1.getStatus()).isEqualTo(HealthStatus.UNHEALTHY);

final HealthReport healthReport2 = zeebePartitionHealth.getHealthReport();

// then
assertThat(healthReport1).isEqualTo(healthReport2);
verify(failureListener, times(1)).onFailure(any());
}

@Test
public void shouldCallOnFailureOnHealthIssueChange() {
// given
schedulerRule.submitActor(partition);
schedulerRule.workUntilDone();

final FailureListener failureListener = mock(FailureListener.class);
doNothing().when(failureListener).onFailure(any());

final var captor = ArgumentCaptor.forClass(ZeebePartitionHealth.class);
verify(healthMonitor).registerComponent(any(), captor.capture());
final var zeebePartitionHealth = captor.getValue();
zeebePartitionHealth.addFailureListener(failureListener);

when(transition.getHealthIssue()).thenReturn(HealthIssue.of("it's over"));

// when
final HealthReport healthReport1 = zeebePartitionHealth.getHealthReport();
assertThat(healthReport1.getStatus()).isEqualTo(HealthStatus.UNHEALTHY);

when(transition.getHealthIssue()).thenReturn(HealthIssue.of("it's something else"));
final HealthReport healthReport2 = zeebePartitionHealth.getHealthReport();
assertThat(healthReport2.getStatus()).isEqualTo(HealthStatus.UNHEALTHY);

// then
assertThat(healthReport1).isNotEqualTo(healthReport2);
verify(failureListener, times(2)).onFailure(any());
}

@Test
public void shouldCallOnRecoveredOnceWhenHealthy() {
// given
schedulerRule.submitActor(partition);
schedulerRule.workUntilDone();

final FailureListener failureListener = mock(FailureListener.class);
doNothing().when(failureListener).onFailure(any());
doNothing().when(failureListener).onRecovered();

final var captor = ArgumentCaptor.forClass(ZeebePartitionHealth.class);
verify(healthMonitor).registerComponent(any(), captor.capture());
final var zeebePartitionHealth = captor.getValue();
zeebePartitionHealth.addFailureListener(failureListener);

// when
partition.onNewRole(Role.LEADER, 1);
schedulerRule.workUntilDone();

final HealthReport healthReport1 = zeebePartitionHealth.getHealthReport();
assertThat(healthReport1.getStatus()).isEqualTo(HealthStatus.HEALTHY);

final HealthReport healthReport2 = zeebePartitionHealth.getHealthReport();
assertThat(healthReport2.getStatus()).isEqualTo(HealthStatus.HEALTHY);
// then
assertThat(healthReport1).isEqualTo(healthReport2);
verify(failureListener, times(1)).onRecovered();
}

@Test
public void shouldReportHealthyAfterTransition() {
// given
Expand Down

0 comments on commit 7705bb2

Please sign in to comment.