🛠️ #46 inited the least latency routing (#70)
- Add a new routing strategy that picks the model with the lowest average latency
- Add basic test coverage for the config-building logic
roma-glushko authored Jan 14, 2024
1 parent 2957360 commit 6aec59f
Showing 22 changed files with 693 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
@@ -111,7 +111,7 @@ jobs:
fail_ci_if_error: false

api-docs:
name: Tests
name: OpenAPI Specs
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
3 changes: 0 additions & 3 deletions Makefile
@@ -27,9 +27,6 @@ lint: install-checkers ## Lint the source code
@$(CHECKER_BIN)/gofumpt -l -w .
@echo "🧹 Vetting go.mod.."
@go vet ./...


static-checks: install-checkers ## Static Analysis
@echo "🧹 GoCI Lint.."
@golangci-lint run ./...

39 changes: 22 additions & 17 deletions docs/docs.go
@@ -208,7 +208,7 @@ const docTemplate = `{
"type": "object",
"properties": {
"timeout": {
"type": "integer"
"type": "string"
}
}
},
@@ -239,6 +239,23 @@ const docTemplate = `{
}
}
},
"latency.Config": {
"type": "object",
"properties": {
"decay": {
"description": "Weight of new latency measurements",
"type": "number"
},
"update_interval": {
"description": "How often gateway should probe models with not the lowest response latency",
"type": "string"
},
"warmup_samples": {
"description": "The number of latency probes required to init moving average",
"type": "integer"
}
}
},
"openai.Config": {
"type": "object",
"required": [
@@ -335,6 +352,9 @@ const docTemplate = `{
"description": "Model instance ID (unique in scope of the router)",
"type": "string"
},
"latency": {
"$ref": "#/definitions/latency.Config"
},
"openai": {
"$ref": "#/definitions/openai.Config"
}
@@ -389,25 +409,10 @@ const docTemplate = `{
},
"strategy": {
"description": "strategy on picking the next model to serve the request",
"allOf": [
{
"$ref": "#/definitions/routing.Strategy"
}
]
"type": "string"
}
}
},
"routing.Strategy": {
"type": "string",
"enum": [
"round-robin",
"priority"
],
"x-enum-varnames": [
"RoundRobin",
"Priority"
]
},
"schemas.ChatMessage": {
"type": "object",
"properties": {
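The new `latency.Config` schema above exposes three knobs: `decay` (the weight of each new measurement), `warmup_samples` (how many probes are needed before the average is meaningful), and `update_interval` (how often slower models get re-probed). A minimal sketch of the exponentially weighted moving average these fields imply (hypothetical code, not the actual `glide/pkg/routers/latency` implementation):

```go
package main

import "fmt"

// movingAverage is a hypothetical sketch of an exponentially weighted
// moving average with a warm-up phase, mirroring the decay and
// warmup_samples fields of latency.Config.
type movingAverage struct {
	decay         float64 // weight of each new measurement
	warmupSamples int     // samples needed before the average is usable
	count         int
	value         float64
}

// Add folds a new latency sample into the average. During warm-up it
// uses a plain arithmetic mean; afterwards it applies exponential decay.
func (m *movingAverage) Add(sample float64) {
	m.count++
	if m.count <= m.warmupSamples {
		m.value += (sample - m.value) / float64(m.count)
		return
	}

	m.value = m.decay*sample + (1-m.decay)*m.value
}

// WarmedUp reports whether enough samples were seen to trust the average.
func (m *movingAverage) WarmedUp() bool {
	return m.count >= m.warmupSamples
}

func main() {
	avg := movingAverage{decay: 0.06, warmupSamples: 3}

	for _, ms := range []float64{120, 150, 130, 500} {
		avg.Add(ms)
	}

	fmt.Printf("avg=%.1fms warmed_up=%v\n", avg.value, avg.WarmedUp())
}
```

A low `decay` makes the average stable against one-off slow responses; the warm-up phase avoids routing decisions based on a single sample.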
39 changes: 22 additions & 17 deletions docs/swagger.json
@@ -205,7 +205,7 @@
"type": "object",
"properties": {
"timeout": {
"type": "integer"
"type": "string"
}
}
},
@@ -236,6 +236,23 @@
}
}
},
"latency.Config": {
"type": "object",
"properties": {
"decay": {
"description": "Weight of new latency measurements",
"type": "number"
},
"update_interval": {
"description": "How often gateway should probe models with not the lowest response latency",
"type": "string"
},
"warmup_samples": {
"description": "The number of latency probes required to init moving average",
"type": "integer"
}
}
},
"openai.Config": {
"type": "object",
"required": [
@@ -332,6 +349,9 @@
"description": "Model instance ID (unique in scope of the router)",
"type": "string"
},
"latency": {
"$ref": "#/definitions/latency.Config"
},
"openai": {
"$ref": "#/definitions/openai.Config"
}
@@ -386,25 +406,10 @@
},
"strategy": {
"description": "strategy on picking the next model to serve the request",
"allOf": [
{
"$ref": "#/definitions/routing.Strategy"
}
]
"type": "string"
}
}
},
"routing.Strategy": {
"type": "string",
"enum": [
"round-robin",
"priority"
],
"x-enum-varnames": [
"RoundRobin",
"Priority"
]
},
"schemas.ChatMessage": {
"type": "object",
"properties": {
28 changes: 17 additions & 11 deletions docs/swagger.yaml
@@ -59,7 +59,7 @@ definitions:
clients.ClientConfig:
properties:
timeout:
type: integer
type: string
type: object
http.ErrorSchema:
properties:
@@ -78,6 +78,19 @@
$ref: '#/definitions/routers.LangRouterConfig'
type: array
type: object
latency.Config:
properties:
decay:
description: Weight of new latency measurements
type: number
update_interval:
description: How often gateway should probe models with not the lowest response
latency
type: string
warmup_samples:
description: The number of latency probes required to init moving average
type: integer
type: object
openai.Config:
properties:
baseUrl:
@@ -141,6 +154,8 @@ definitions:
id:
description: Model instance ID (unique in scope of the router)
type: string
latency:
$ref: '#/definitions/latency.Config'
openai:
$ref: '#/definitions/openai.Config'
required:
@@ -175,21 +190,12 @@ definitions:
description: Unique router ID
type: string
strategy:
allOf:
- $ref: '#/definitions/routing.Strategy'
description: strategy on picking the next model to serve the request
type: string
required:
- models
- routers
type: object
routing.Strategy:
enum:
- round-robin
- priority
type: string
x-enum-varnames:
- RoundRobin
- Priority
schemas.ChatMessage:
properties:
content:
2 changes: 1 addition & 1 deletion pkg/providers/clients/config.go
@@ -3,7 +3,7 @@ package clients
import "time"

type ClientConfig struct {
Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout" swaggertype:"primitive,integer"`
Timeout *time.Duration `yaml:"timeout,omitempty" json:"timeout" swaggertype:"primitive,string"`
}

func DefaultClientConfig() *ClientConfig {
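The `swaggertype` change above documents `timeout` as a duration string (for example "10s") rather than a raw integer. The commit only changes how the field is described in the OpenAPI spec, not how it is parsed; assuming Go-style duration strings, the standard-library call for such values is `time.ParseDuration`:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical example value, shown only to illustrate the
	// duration-string format the spec now documents.
	timeout, err := time.ParseDuration("10s")
	if err != nil {
		panic(err)
	}

	fmt.Println(timeout.Seconds()) // 10
}
```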
22 changes: 16 additions & 6 deletions pkg/providers/config.go
@@ -4,6 +4,8 @@ import (
"errors"
"fmt"

"glide/pkg/routers/latency"

"glide/pkg/providers/clients"

"glide/pkg/routers/health"
@@ -18,7 +20,8 @@ var ErrProviderNotFound = errors.New("provider not found")
type LangModelConfig struct {
ID string `yaml:"id" json:"id" validate:"required"` // Model instance ID (unique in scope of the router)
Enabled bool `yaml:"enabled" json:"enabled"` // Is the model enabled?
ErrorBudget health.ErrorBudget `yaml:"error_budget" json:"error_budget" swaggertype:"primitive,string"`
ErrorBudget *health.ErrorBudget `yaml:"error_budget" json:"error_budget" swaggertype:"primitive,string"`
Latency *latency.Config `yaml:"latency" json:"latency"`
Client *clients.ClientConfig `yaml:"client" json:"client"`
OpenAI *openai.Config `yaml:"openai" json:"openai"`
AzureOpenAI *azureopenai.Config `yaml:"azureopenai" json:"azureopenai"`
@@ -32,26 +35,33 @@ func DefaultLangModelConfig() *LangModelConfig {
Enabled: true,
Client: clients.DefaultClientConfig(),
ErrorBudget: health.DefaultErrorBudget(),
Latency: latency.DefaultConfig(),
}
}

func (c *LangModelConfig) ToModel(tel *telemetry.Telemetry) (*LangModel, error) {
var client LangModelProvider

var err error

if c.OpenAI != nil {
client, err := openai.NewClient(c.OpenAI, c.Client, tel)
client, err = openai.NewClient(c.OpenAI, c.Client, tel)

if err != nil {
return nil, fmt.Errorf("error initing openai client: %v", err)
}

return NewLangModel(c.ID, client, c.ErrorBudget), nil
}

if c.AzureOpenAI != nil {
client, err := azureopenai.NewClient(c.AzureOpenAI, c.Client, tel)
client, err = azureopenai.NewClient(c.AzureOpenAI, c.Client, tel)

if err != nil {
return nil, fmt.Errorf("error initing azureopenai client: %v", err)
}
}

return NewLangModel(c.ID, client, c.ErrorBudget), nil
if client != nil {
return NewLangModel(c.ID, client, *c.ErrorBudget, *c.Latency), nil
}

return nil, ErrProviderNotFound
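One subtlety in this hunk: the old code used `client, err := ...` inside each branch, which declares a new `client` that shadows the outer variable, so the commit switches to a shared `var err error` plus plain assignment and a single `return` at the bottom. A stripped-down illustration of the shadowing pitfall (hypothetical types, not glide code):

```go
package main

import "fmt"

type provider interface{ Name() string }

type openAI struct{}

func (openAI) Name() string { return "openai" }

func newOpenAI() (provider, error) { return openAI{}, nil }

func main() {
	var client provider

	useOpenAI := true
	if useOpenAI {
		// ':=' declares a brand-new 'client' scoped to this block,
		// shadowing the outer variable declared above.
		client, err := newOpenAI()
		if err != nil {
			return
		}
		fmt.Println("inner client:", client.Name()) // inner client: openai
	}

	// The outer 'client' was never assigned, so it is still nil here.
	fmt.Println("outer client is nil:", client == nil) // true
}
```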
41 changes: 31 additions & 10 deletions pkg/providers/provider.go
@@ -3,9 +3,11 @@ package providers
import (
"context"
"errors"
"time"

"glide/pkg/providers/clients"
"glide/pkg/routers/health"
"glide/pkg/routers/latency"

"glide/pkg/api/schemas"
)
@@ -19,27 +21,33 @@ type LangModelProvider interface {
type Model interface {
ID() string
Healthy() bool
Latency() *latency.MovingAverage
LatencyUpdateInterval() *time.Duration
}

type LanguageModel interface {
Model
LangModelProvider
}

// LangModel
// LangModel wraps a provider client and extends it with health & latency tracking
type LangModel struct {
modelID string
client LangModelProvider
rateLimit *health.RateLimitTracker
errorBudget *health.TokenBucket // TODO: centralize provider API health tracking in the registry
modelID string
client LangModelProvider
rateLimit *health.RateLimitTracker
errorBudget *health.TokenBucket // TODO: centralize provider API health tracking in the registry
latency *latency.MovingAverage
latencyUpdateInterval *time.Duration
}

func NewLangModel(modelID string, client LangModelProvider, budget health.ErrorBudget) *LangModel {
func NewLangModel(modelID string, client LangModelProvider, budget health.ErrorBudget, latencyConfig latency.Config) *LangModel {
return &LangModel{
modelID: modelID,
client: client,
rateLimit: health.NewRateLimitTracker(),
errorBudget: health.NewTokenBucket(budget.TimePerTokenMicro(), budget.Budget()),
modelID: modelID,
client: client,
rateLimit: health.NewRateLimitTracker(),
errorBudget: health.NewTokenBucket(budget.TimePerTokenMicro(), budget.Budget()),
latency: latency.NewMovingAverage(latencyConfig.Decay, latencyConfig.WarmupSamples),
latencyUpdateInterval: latencyConfig.UpdateInterval,
}
}

@@ -51,13 +59,26 @@ func (m *LangModel) Provider() string {
return m.client.Provider()
}

func (m *LangModel) Latency() *latency.MovingAverage {
return m.latency
}

func (m *LangModel) LatencyUpdateInterval() *time.Duration {
return m.latencyUpdateInterval
}

func (m *LangModel) Healthy() bool {
return !m.rateLimit.Limited() && m.errorBudget.HasTokens()
}

func (m *LangModel) Chat(ctx context.Context, request *schemas.UnifiedChatRequest) (*schemas.UnifiedChatResponse, error) {
// TODO: we may want to track time-to-first-byte to "normalize" response latency wrt response size
startedAt := time.Now()
resp, err := m.client.Chat(ctx, request)

// Do we want to track latency in case of errors as well?
m.latency.Add(float64(time.Since(startedAt)))

if err == nil {
// successful response
resp.ModelID = m.modelID
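With `Healthy()`, `Latency()`, and `LatencyUpdateInterval()` exposed on the model, a least-latency strategy can scan the healthy models and pick the one with the lowest moving average. A rough sketch under assumed accessor names (the actual routing strategy added by this commit is in files not shown above):

```go
package main

import "fmt"

// model is a pared-down stand-in for the Model interface in provider.go;
// AvgLatency is a hypothetical accessor standing in for Latency().
type model interface {
	ID() string
	Healthy() bool
	AvgLatency() float64
}

type staticModel struct {
	id      string
	healthy bool
	avgMs   float64
}

func (s staticModel) ID() string          { return s.id }
func (s staticModel) Healthy() bool       { return s.healthy }
func (s staticModel) AvgLatency() float64 { return s.avgMs }

// pickLeastLatency returns the healthy model with the lowest moving-average
// latency, or nil when every model is unhealthy.
func pickLeastLatency(models []model) model {
	var best model

	for _, m := range models {
		if !m.Healthy() {
			continue
		}

		if best == nil || m.AvgLatency() < best.AvgLatency() {
			best = m
		}
	}

	return best
}

func main() {
	models := []model{
		staticModel{"openai-gpt4", true, 820},
		staticModel{"openai-gpt35", true, 310},
		staticModel{"azure-gpt4", false, 95},
	}

	if m := pickLeastLatency(models); m != nil {
		fmt.Println("routing to:", m.ID()) // routing to: openai-gpt35
	}
}
```

The `update_interval` field suggests the gateway also periodically re-probes the slower models so they are not starved of traffic forever once they recover.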