Skip to content

Commit

Permalink
feat: Use a daemon thread to monitor the go feature server exclusively (
Browse files Browse the repository at this point in the history
#2391)

* feat: Use a daemon thread to monitor the go feature server exclusively

Also reenable the tests for lifecycle management of the goserver

Signed-off-by: Achal Shah <[email protected]>

* fix workflow

Signed-off-by: Achal Shah <[email protected]>

* Fix setup.py build_python_protos

Signed-off-by: Achal Shah <[email protected]>

* CR comments

Signed-off-by: Achal Shah <[email protected]>

* mkdir is needed

Signed-off-by: Achal Shah <[email protected]>

* explicit commit

Signed-off-by: Achal Shah <[email protected]>

* flush file explicitly

Signed-off-by: Achal Shah <[email protected]>

* Flush registry changes

Signed-off-by: Achal Shah <[email protected]>

* Signal handler can only be triggered from main thread

Signed-off-by: Achal Shah <[email protected]>

* join on background thread when cancelling and update test

Signed-off-by: Achal Shah <[email protected]>

* fix redis

Signed-off-by: Achal Shah <[email protected]>

* add back go_build.py

Signed-off-by: Achal Shah <[email protected]>

* wait on grpc connection check

Signed-off-by: Achal Shah <[email protected]>

* Add a more robust wait

Signed-off-by: Achal Shah <[email protected]>

* Add a sleep

Signed-off-by: Achal Shah <[email protected]>

* Remove explicitt cleanup

Signed-off-by: Achal Shah <[email protected]>

* More defensive

Signed-off-by: Achal Shah <[email protected]>

* Even more defensive

Signed-off-by: Achal Shah <[email protected]>

* Stop join

Signed-off-by: Achal Shah <[email protected]>

* Clean up the process to ensure next one is fine

Signed-off-by: Achal Shah <[email protected]>

* join thread

Signed-off-by: Achal Shah <[email protected]>

* remove a sleep

Signed-off-by: Achal Shah <[email protected]>

* Cleanup

Signed-off-by: Achal Shah <[email protected]>
  • Loading branch information
achals authored Mar 10, 2022
1 parent ae133fd commit 0bb5e8c
Show file tree
Hide file tree
Showing 18 changed files with 181 additions and 237 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ jobs:
run: |
pip install --upgrade "pip>=21.3.1"
- name: Install dependencies
run: make install-go-ci-dependencies
run: make install-go-proto-dependencies
- name: Lint go
run: make lint-go
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
with:
go-version: 1.17.7
- name: Install dependencies
run: make install-go-ci-dependencies
run: make install-go-proto-dependencies
- name: Compile protos
run: make compile-protos-go
- name: Test
Expand Down
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ build: protos build-java build-docker build-html

# Python SDK

install-python-ci-dependencies: install-go-ci-dependencies
install-python-ci-dependencies: install-go-proto-dependencies
cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-ci-requirements.txt
cd sdk/python && COMPILE_GO=true python setup.py develop

Expand Down Expand Up @@ -125,19 +125,21 @@ build-java-no-tests:

# Go SDK

install-go-ci-dependencies:
install-go-proto-dependencies:
go install google.golang.org/protobuf/cmd/[email protected]
go install google.golang.org/grpc/cmd/[email protected]

compile-protos-go: install-go-ci-dependencies
install-protoc-dependencies:
pip install grpcio-tools==1.34.0

compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
python sdk/python/setup.py build_go_protos

compile-go-feature-server: compile-protos-go
go mod tidy
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver

test-go: install-go-ci-dependencies
test-go: compile-protos-go
go test ./...

format-go:
Expand Down
1 change: 0 additions & 1 deletion go/cmd/goserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type FeastEnvConfig struct {
}

// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus

func main() {

var feastEnvConfig FeastEnvConfig
Expand Down
Binary file not shown.
26 changes: 10 additions & 16 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ class FeatureStore:

@log_exceptions
def __init__(
self,
repo_path: Optional[str] = None,
config: Optional[RepoConfig] = None,
go_server_use_thread: bool = False,
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
"""
Creates a FeatureStore object.
Expand All @@ -135,7 +132,6 @@ def __init__(
self._registry._initialize_registry()
self._provider = get_provider(self.config, self.repo_path)
self._go_server = None
self._go_server_use_thread = go_server_use_thread

@log_exceptions
def version(self) -> str:
Expand Down Expand Up @@ -733,6 +729,10 @@ def apply(
service.name, project=self.project, commit=False
)

# If a go server is running, kill it so that it can be recreated in `update_infra` with
# the latest registry state.
self.kill_go_server()

self._get_provider().update_infra(
project=self.project,
tables_to_delete=views_to_delete if not partial else [],
Expand All @@ -754,6 +754,8 @@ def teardown(self):

entities = self.list_entities()

self.kill_go_server()

self._get_provider().teardown_infra(self.project, tables, entities)
self._registry.teardown()

Expand Down Expand Up @@ -1233,11 +1235,8 @@ def get_online_features(
if self.config.go_feature_server:
# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = GoServer(
str(self.repo_path.absolute()),
self.config,
self._go_server_use_thread,
)
self._go_server = GoServer(str(self.repo_path.absolute()), self.config,)
self._go_server._shared_connection._check_grpc_connection()
return self._go_server.get_online_features(
features, columnar, full_feature_names
)
Expand Down Expand Up @@ -1860,12 +1859,7 @@ def serve_transformations(self, port: int) -> None:
def kill_go_server(self):
if self._go_server:
self._go_server.kill_go_server_explicitly()

def set_go_server_use_thread(self, use: bool):
if self._go_server:
self._go_server.set_use_thread(use)
else:
self._go_server_use_thread = use
self._go_server = None


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
Expand Down
Loading

0 comments on commit 0bb5e8c

Please sign in to comment.