-
Notifications
You must be signed in to change notification settings - Fork 836
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
WIP controller <-> scheduler coordination on server status updates
- Loading branch information
Showing
17 changed files
with
1,050 additions
and
670 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
/* | ||
Copyright (c) 2024 Seldon Technologies Ltd. | ||
Use of this software is governed by | ||
(1) the license included in the LICENSE file or | ||
(2) if the license included in the LICENSE file is the Business Source License 1.1, | ||
the Change License after the Change Date as each is defined in accordance with the LICENSE file. | ||
*/ | ||
|
||
package v1alpha1 | ||
|
||
import "fmt" | ||
|
||
type ValidatedScalingSpec struct { | ||
Replicas uint32 | ||
MinReplicas uint32 | ||
MaxReplicas uint32 | ||
} | ||
|
||
func GetValidatedScalingSpec(replicas *int32, minReplicas *int32, maxReplicas *int32) (*ValidatedScalingSpec, error) { | ||
var validatedSpec ValidatedScalingSpec | ||
|
||
if replicas != nil && *replicas > 0 { | ||
validatedSpec.Replicas = uint32(*replicas) | ||
} else { | ||
if minReplicas != nil && *minReplicas > 0 { | ||
// set replicas to the min replicas when replicas is not set explicitly | ||
validatedSpec.Replicas = uint32(*minReplicas) | ||
} else { | ||
validatedSpec.Replicas = 1 | ||
} | ||
} | ||
|
||
if minReplicas != nil && *minReplicas > 0 { | ||
validatedSpec.MinReplicas = uint32(*minReplicas) | ||
if validatedSpec.Replicas < validatedSpec.MinReplicas { | ||
return nil, fmt.Errorf("number of replicas %d must be >= min replicas %d", validatedSpec.Replicas, validatedSpec.MinReplicas) | ||
} | ||
} else { | ||
validatedSpec.MinReplicas = 0 | ||
} | ||
|
||
if maxReplicas != nil && *maxReplicas > 0 { | ||
validatedSpec.MaxReplicas = uint32(*maxReplicas) | ||
if validatedSpec.Replicas > validatedSpec.MaxReplicas { | ||
return nil, fmt.Errorf("number of replicas %d must be <= min replicas %d", validatedSpec.Replicas, validatedSpec.MaxReplicas) | ||
} | ||
} else { | ||
validatedSpec.MaxReplicas = 0 | ||
} | ||
|
||
return &validatedSpec, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
/* | ||
Copyright (c) 2024 Seldon Technologies Ltd. | ||
Use of this software is governed by | ||
(1) the license included in the LICENSE file or | ||
(2) if the license included in the LICENSE file is the Business Source License 1.1, | ||
the Change License after the Change Date as each is defined in accordance with the LICENSE file. | ||
*/ | ||
|
||
package coordinator | ||
|
||
import ( | ||
"context" | ||
"reflect" | ||
|
||
busV3 "github.com/mustafaturan/bus/v3" | ||
log "github.com/sirupsen/logrus" | ||
) | ||
|
||
func (h *EventHub) RegisterServerEventHandler( | ||
name string, | ||
queueSize int, | ||
logger log.FieldLogger, | ||
handle func(event ServerEventMsg), | ||
) { | ||
events := make(chan ServerEventMsg, queueSize) | ||
h.addServerEventHandlerChannel(events) | ||
|
||
go func() { | ||
for e := range events { | ||
handle(e) | ||
} | ||
}() | ||
|
||
handler := h.newServerEventHandler(logger, events, handle) | ||
h.bus.RegisterHandler(name, handler) | ||
} | ||
|
||
func (h *EventHub) newServerEventHandler( | ||
logger log.FieldLogger, | ||
events chan ServerEventMsg, | ||
_ func(event ServerEventMsg), | ||
) busV3.Handler { | ||
handleServerEventMessage := func(_ context.Context, e busV3.Event) { | ||
l := logger.WithField("func", "handleServerEventMessage") | ||
l.Debugf("Received event on %s from %s (ID: %s, TxID: %s)", e.Topic, e.Source, e.ID, e.TxID) | ||
|
||
me, ok := e.Data.(ServerEventMsg) | ||
if !ok { | ||
l.Warnf( | ||
"Event (ID %s, TxID %s) on topic %s from %s is not a ServerEventMsg: %s", | ||
e.ID, | ||
e.TxID, | ||
e.Topic, | ||
e.Source, | ||
reflect.TypeOf(e.Data).String(), | ||
) | ||
return | ||
} | ||
|
||
h.lock.RLock() | ||
if h.closed { | ||
return | ||
} | ||
// Propagate the busV3.Event source to the ServerEventMsg | ||
// This is useful for logging, but also in case we want to distinguish | ||
// the action to take based on where the event came from. | ||
me.Source = e.Source | ||
events <- me | ||
h.lock.RUnlock() | ||
} | ||
|
||
return busV3.Handler{ | ||
Matcher: topicServerEvents, | ||
Handle: handleServerEventMessage, | ||
} | ||
} | ||
|
||
func (h *EventHub) addServerEventHandlerChannel(c chan ServerEventMsg) { | ||
h.lock.Lock() | ||
defer h.lock.Unlock() | ||
|
||
h.serverEventHandlerChannels = append(h.serverEventHandlerChannels, c) | ||
} | ||
|
||
func (h *EventHub) PublishServerEvent(source string, event ServerEventMsg) { | ||
err := h.bus.EmitWithOpts( | ||
context.Background(), | ||
topicServerEvents, | ||
event, | ||
busV3.WithSource(source), | ||
) | ||
if err != nil { | ||
h.logger.WithError(err).Errorf( | ||
"unable to publish server event message from %s to %s", | ||
source, | ||
topicServerEvents, | ||
) | ||
} | ||
} |
Oops, something went wrong.