Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-192 Added Deleted and Init states in ServiceUnitState #19546

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2450,6 +2450,7 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private long namespaceBundleUnloadingTimeoutMs = 60000;

/**** --- Load Balancer Extension. --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
dynamic = true,
Expand Down Expand Up @@ -2525,6 +2526,22 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
)
private double loadBalancerBundleLoadReportPercentage = 10;

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "After this delay, the service-unit state channel tombstones any service units (e.g., bundles) "
+ "in semi-terminal states. For example, after splits, parent bundles will be `deleted`, "
+ "and then after this delay, the parent bundles' state will be `tombstoned` "
+ "in the service-unit state channel. "
+ "Pulsar does not immediately remove such semi-terminal states "
+ "to avoid unnecessary system confusion, "
+ "as the bundles in the `tombstoned` state might temporarily look available to reassign. "
+ "Rarely, one could lower this delay in order to aggressively clean "
+ "the service-unit state channel when there are a large number of bundles. "
+ "minimum value = 30 secs"
+ "(only used in load balancer extension logics)"
)
private long loadBalancerServiceUnitStateCleanUpDelayTimeInSeconds = 604800;

/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,8 @@ public void doNamespaceBundleSplit() {
throw new UnsupportedOperationException();
}

public ExtensibleLoadManagerImpl get() {
return loadManager;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,60 +24,47 @@
/**
* Defines the possible states for service units.
*
* The following diagram defines the valid state changes
*
* ┌───────────┐
* ┌──────────┤ released │◄────────┐
* │own └───────────┘ │release
* │ │
* │ │
* ▼ │
* ┌────────┐ assign(transfer) ┌─────┴────┐
* │ ├───────────────────►│ │
* │ owned │ │ assigned │
* │ │◄───────────────────┤ │
* └──┬─────┤ own └──────────┘
* │ ▲ │ ▲
* │ │ │ │
* │ │ └──────────────┐ │
* │ │ │ │
* │ │ unload │ │ assign(assignment)
* split │ │ │ │
* │ │ │ │
* │ │ create(child) │ │
* │ │ │ │
* ▼ │ │ │
* ┌─────┴─────┐ └─────►┌───┴──────┐
* │ │ │ │
* │ splitting ├────────────────► │ free │
* │ │ discard(parent)│ │
* └───────────┘ └──────────┘
* Refer to Service Unit State Channel in https://github.com/apache/pulsar/issues/16691 for additional details.
Demogorgon314 marked this conversation as resolved.
Show resolved Hide resolved
*/
public enum ServiceUnitState {

Free, // not owned by any broker (terminal state)
Init, // initializing the state. no previous state(terminal state)

Free, // not owned by any broker (semi-terminal state)

Owned, // owned by a broker (terminal state)

Assigned, // the ownership is assigned(but the assigned broker has not been notified the ownership yet)
Assigning, // the ownership is being assigned (e.g. the new ownership is being notified to the target broker)

Released, // the source broker's ownership has been released (e.g. the topic connections are closed)
Releasing, // the source broker's ownership is being released (e.g. the topic connections are being closed)

Splitting; // the service unit(e.g. bundle) is in the process of splitting.
Splitting, // the service unit is in the process of splitting. (e.g. the metadata store is being updated)

private static Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
// (Free -> Released | Splitting) transitions are required
// when the topic is compacted in the middle of transfer or split.
Free, Set.of(Owned, Assigned, Released, Splitting),
Owned, Set.of(Assigned, Splitting, Free),
Assigned, Set.of(Owned, Released, Free),
Released, Set.of(Owned, Free),
Splitting, Set.of(Free)
Deleted; // deleted in the system (semi-terminal state)

private static final Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(
// (Init -> all states) transitions are required
// when the topic is compacted in the middle of assign, transfer or split.
Init, Set.of(Free, Owned, Assigning, Releasing, Splitting, Deleted),
Free, Set.of(Assigning, Init),
Owned, Set.of(Assigning, Splitting, Releasing),
Assigning, Set.of(Owned, Releasing),
Releasing, Set.of(Owned, Free),
Splitting, Set.of(Deleted),
Deleted, Set.of(Init)
);

private static final Set<ServiceUnitState> inFlightStates = Set.of(
Assigning, Releasing, Splitting
);

public static boolean isValidTransition(ServiceUnitState from, ServiceUnitState to) {
Set<ServiceUnitState> transitions = validTransitions.get(from);
return transitions.contains(to);
}

public static boolean isInFlightState(ServiceUnitState state) {
return inFlightStates.contains(state);
}

}
Loading