Skip to content

Commit

Permalink
Go Coordinator with Database as the System Catalog (chroma-core#1274)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
This PR adds the coordinator implemented in go, focusing on the system catalog functionality. 
 - Improvements & Bug fixes
	 - ...
 - New functionality
	 - Add go coordinator to with database as the system catalog

## Test plan
*How are these changes tested?*

- [ ] test_systems.py
- [ ] Example and property based test in the go coordinator

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*

---------

Co-authored-by: Liquan Pei <[email protected]>
  • Loading branch information
Ishiihara authored Oct 25, 2023
1 parent 8af4cac commit ee1c364
Show file tree
Hide file tree
Showing 71 changed files with 9,571 additions and 16 deletions.
17 changes: 11 additions & 6 deletions bin/cluster-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
set -e

function cleanup {
# Restore the previous kube context
kubectl config use-context $PREV_CHROMA_KUBE_CONTEXT
# Kill the tunnel process
kill $TUNNEL_PID
minikube delete -p chroma-test
# Restore the previous kube context
kubectl config use-context $PREV_CHROMA_KUBE_CONTEXT
# Kill the tunnel process
kill $TUNNEL_PID
minikube delete -p chroma-test
}

trap cleanup EXIT
Expand All @@ -25,6 +25,7 @@ minikube addons enable ingress-dns -p chroma-test
# Setup docker to build inside the minikube cluster and build the image
eval $(minikube -p chroma-test docker-env)
docker build -t server:latest -f Dockerfile .
docker build -t chroma-coordinator:latest -f go/coordinator/Dockerfile .

# Apply the kubernetes manifests
kubectl apply -f k8s/deployment
Expand All @@ -36,7 +37,7 @@ kubectl apply -f k8s/test
kubectl wait --namespace chroma --for=condition=Ready pods --all --timeout=300s

# Run mini kube tunnel in the background to expose the service
minikube tunnel -p chroma-test &
minikube tunnel -c true -p chroma-test &
TUNNEL_PID=$!

# Wait for the tunnel to be ready. There isn't an easy way to check if the tunnel is ready. So we just wait for 10 seconds
Expand All @@ -45,8 +46,12 @@ sleep 10
export CHROMA_CLUSTER_TEST_ONLY=1
export CHROMA_SERVER_HOST=$(kubectl get svc server -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}')
export PULSAR_BROKER_URL=$(kubectl get svc pulsar -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}')
export CHROMA_COORDINATOR_HOST=$(kubectl get svc coordinator -n chroma -o=jsonpath='{.status.loadBalancer.ingress[0].ip}')
export CHROMA_SERVER_GRPC_PORT="50051"

echo "Chroma Server is running at port $CHROMA_SERVER_HOST"
echo "Pulsar Broker is running at port $PULSAR_BROKER_URL"
echo "Chroma Coordinator is running at port $CHROMA_COORDINATOR_HOST"

echo testing: python -m pytest "$@"
python -m pytest "$@"
13 changes: 13 additions & 0 deletions bin/reset.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

eval $(minikube -p chroma-test docker-env)

docker build -t chroma-coordinator:latest -f go/coordinator/Dockerfile .

kubectl delete deployment coordinator -n chroma

# Apply the kubernetes manifests
kubectl apply -f k8s/deployment
kubectl apply -f k8s/crd
kubectl apply -f k8s/cr
kubectl apply -f k8s/test
8 changes: 8 additions & 0 deletions chromadb/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@
hypothesis.settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "dev"))


NOT_CLUSTER_ONLY = os.getenv("CHROMA_CLUSTER_TEST_ONLY") != "1"

def skip_if_not_cluster() -> pytest.MarkDecorator:
return pytest.mark.skipif(
NOT_CLUSTER_ONLY,
reason="Requires Kubernetes to be running with a valid config",
)

def find_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
Expand Down
18 changes: 17 additions & 1 deletion chromadb/test/db/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,24 @@ def grpc_with_mock_server() -> Generator[SysDB, None, None]:
yield client


# def grpc_with_real_server() -> Generator[SysDB, None, None]:
# system = System(
# Settings(
# allow_reset=True,
# chroma_collection_assignment_policy_impl="chromadb.test.db.test_system.MockAssignmentPolicy",
# )
# )
# client = system.instance(GrpcSysDB)
# system.start()
# client.reset_and_wait_for_ready()
# yield client


def db_fixtures() -> List[Callable[[], Generator[SysDB, None, None]]]:
return [sqlite, sqlite_persistent, grpc_with_mock_server]
if "CHROMA_CLUSTER_TEST_ONLY" in os.environ:
return [grpc_with_real_server]
else:
return [sqlite, sqlite_persistent, grpc_with_mock_server]


@pytest.fixture(scope="module", params=db_fixtures())
Expand Down
10 changes: 1 addition & 9 deletions chromadb/test/segment/distributed/test_memberlist_provider.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Tests the CustomResourceMemberlist provider
import threading
from chromadb.test.conftest import skip_if_not_cluster
from kubernetes import client, config
import pytest
import os
Expand All @@ -12,15 +13,6 @@
)
import time

NOT_CLUSTER_ONLY = os.getenv("CHROMA_CLUSTER_TEST_ONLY") != "1"


def skip_if_not_cluster() -> pytest.MarkDecorator:
return pytest.mark.skipif(
NOT_CLUSTER_ONLY,
reason="Requires Kubernetes to be running with a valid config",
)


# Used for testing to update the memberlist CRD
def update_memberlist(n: int, memberlist_name: str = "worker-memberlist") -> Memberlist:
Expand Down
23 changes: 23 additions & 0 deletions go/coordinator/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
FROM golang:1.20-alpine as build

RUN apk add --no-cache make git build-base bash

ENV PATH=$PATH:/go/bin
ADD ./go/coordinator /src/chroma-coordinator

RUN cd /src/chroma-coordinator \
&& make

FROM alpine:3.17.3

RUN apk add --no-cache bash bash-completion

RUN mkdir /chroma-coordinator
WORKDIR /chroma-coordinator

COPY --from=build /src/chroma-coordinator/bin/chroma /chroma-coordinator/bin/chroma
ENV PATH=$PATH:/chroma-coordinator/bin

RUN chroma completion bash > ~/.bashrc

CMD /bin/bash
56 changes: 56 additions & 0 deletions go/coordinator/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
.PHONY: build
build:
go build -v -o bin/chroma ./cmd

test: build
go test -cover -race ./...

lint:
#brew install golangci-lint
golangci-lint run

clean:
rm -f bin/chroma

docker:
docker build -t chroma-coordinator:latest .

docker_multi_arch:
docker buildx build --platform linux/x86_64,linux/arm64 -t oxia:latest .

.PHONY: proto
proto:
cd proto && \
protoc \
--go_out=. \
--go_opt paths=source_relative \
--plugin protoc-gen-go="${GOPATH}/bin/protoc-gen-go" \
--go-grpc_out=. \
--go-grpc_opt paths=source_relative \
--plugin protoc-gen-go-grpc="${GOPATH}/bin/protoc-gen-go-grpc" \
--go-vtproto_out=. \
--go-vtproto_opt paths=source_relative \
--plugin protoc-gen-go-vtproto="${GOPATH}/bin/protoc-gen-go-vtproto" \
--go-vtproto_opt=features=marshal+unmarshal+size+pool+equal+clone \
*.proto

proto_clean:
rm -f */*.pb.go

proto_format:
#brew install clang-format
clang-format -i --style=Google proto/*.proto

proto_lint:
#go install github.com/yoheimuta/protolint/cmd/protoc-gen-protolint
protoc --proto_path ./proto \
--protolint_out . \
--protolint_opt config_dir_path=. \
--protolint_opt proto_root=./proto \
proto/*.proto

proto_doc:
#go install github.com/pseudomuto/protoc-gen-doc/cmd/protoc-gen-doc
protoc --doc_out=docs/proto --doc_opt=markdown,proto.md proto/*.proto

proto_quality: proto_format proto_lint
15 changes: 15 additions & 0 deletions go/coordinator/cmd/flag/flag.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package flag

import (
"fmt"

"github.com/spf13/cobra"
)

const (
DefaultGRPCPort = 50051
)

func GRPCAddr(cmd *cobra.Command, conf *string) {
cmd.Flags().StringVarP(conf, "grpc-addr", "g", fmt.Sprintf("0.0.0.0:%d", DefaultGRPCPort), "GRPC service bind address")
}
38 changes: 38 additions & 0 deletions go/coordinator/cmd/grpccoordinator/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package grpccoordinator

import (
"io"

"github.com/chroma/chroma-coordinator/cmd/flag"
"github.com/chroma/chroma-coordinator/internal/grpccoordinator"
"github.com/chroma/chroma-coordinator/internal/utils"

"github.com/spf13/cobra"
)

var (
conf = grpccoordinator.Config{}

Cmd = &cobra.Command{
Use: "coordinator",
Short: "Start a coordinator",
Long: `Long description`,
Run: exec,
}
)

func init() {
flag.GRPCAddr(Cmd, &conf.BindAddress)
Cmd.Flags().StringVar(&conf.Username, "username", "root", "MetaTable username")
Cmd.Flags().StringVar(&conf.Password, "password", "", "MetaTable password")
Cmd.Flags().StringVar(&conf.Address, "db-address", "127.0.0.1:3306", "MetaTable db address")
Cmd.Flags().StringVar(&conf.DBName, "db-name", "", "MetaTable db name")
Cmd.Flags().IntVar(&conf.MaxIdleConns, "max-idle-conns", 10, "MetaTable max idle connections")
Cmd.Flags().IntVar(&conf.MaxOpenConns, "max-open-conns", 10, "MetaTable max open connections")
}

func exec(*cobra.Command, []string) {
utils.RunProcess(func() (io.Closer, error) {
return grpccoordinator.New(conf)
})
}
37 changes: 37 additions & 0 deletions go/coordinator/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"fmt"
"os"

"github.com/chroma/chroma-coordinator/cmd/grpccoordinator"
"github.com/chroma/chroma-coordinator/internal/utils"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
"go.uber.org/automaxprocs/maxprocs"
)

var (
rootCmd = &cobra.Command{
Use: "chroma",
Short: "Chroma root command",
Long: `Chroma root command`,
}
)

func init() {
rootCmd.AddCommand(grpccoordinator.Cmd)
}

func main() {
utils.LogLevel = zerolog.DebugLevel
utils.ConfigureLogger()
if _, err := maxprocs.Set(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
if err := rootCmd.Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
46 changes: 46 additions & 0 deletions go/coordinator/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
module github.com/chroma/chroma-coordinator

go 1.20

require (
github.com/go-sql-driver/mysql v1.7.1
github.com/google/uuid v1.3.1
github.com/pingcap/log v1.1.0
github.com/rs/zerolog v1.31.0
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
go.uber.org/automaxprocs v1.5.3
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
gorm.io/driver/mysql v1.5.2
gorm.io/driver/sqlite v1.5.4
gorm.io/gorm v1.25.5
pgregory.net/rapid v1.1.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-sqlite3 v1.14.17 // indirect
github.com/pingcap/errors v0.11.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit ee1c364

Please sign in to comment.