Skip to content

Commit

Permalink
chore: Go Sql Online Store (feast-dev#2446)
Browse files Browse the repository at this point in the history
* Initial structure for go sqlite online store

Signed-off-by: Kevin Zhang <[email protected]>

* Somewhat intermediate state

Signed-off-by: Kevin Zhang <[email protected]>

* Add sqlite online store for go for testing

Signed-off-by: Kevin Zhang <[email protected]>

* Revert

Signed-off-by: Kevin Zhang <[email protected]>

* Revert

Signed-off-by: Kevin Zhang <[email protected]>

* Clean up

Signed-off-by: Kevin Zhang <[email protected]>

* Address review issues

Signed-off-by: Kevin Zhang <[email protected]>

* Fix/address issues

Signed-off-by: Kevin Zhang <[email protected]>

* lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Make integration test work

Signed-off-by: Kevin Zhang <[email protected]>

* Fix tests

Signed-off-by: Kevin Zhang <[email protected]>

* Fix tests

Signed-off-by: Kevin Zhang <[email protected]>

* debugging

Signed-off-by: Kevin Zhang <[email protected]>

* Debug

Signed-off-by: Kevin Zhang <[email protected]>

* Debug

Signed-off-by: Kevin Zhang <[email protected]>

* Debug

Signed-off-by: Kevin Zhang <[email protected]>

* Debug

Signed-off-by: Kevin Zhang <[email protected]>

* Debug

Signed-off-by: Kevin Zhang <[email protected]>

* Debug

Signed-off-by: Kevin Zhang <[email protected]>

* Remove feature_repo files

Signed-off-by: Kevin Zhang <[email protected]>

* update gitignore

Signed-off-by: Kevin Zhang <[email protected]>

* Clean up code

Signed-off-by: Kevin Zhang <[email protected]>

* Update go mod

Signed-off-by: Kevin Zhang <[email protected]>

* Update makefile

Signed-off-by: Kevin Zhang <[email protected]>

* Fix gitignore issue

Signed-off-by: Kevin Zhang <[email protected]>

* Update makefile

Signed-off-by: Kevin Zhang <[email protected]>

* Update makefile

Signed-off-by: Kevin Zhang <[email protected]>

* Update makefile

Signed-off-by: Kevin Zhang <[email protected]>

* Update makefile

Signed-off-by: Kevin Zhang <[email protected]>

* Update makefile

Signed-off-by: Kevin Zhang <[email protected]>

* Revert worfklow

Signed-off-by: Kevin Zhang <[email protected]>

* Update build path

Signed-off-by: Kevin Zhang <[email protected]>

* remove

Signed-off-by: Kevin Zhang <[email protected]>

* rename

Signed-off-by: Kevin Zhang <[email protected]>

* Address review

Signed-off-by: Kevin Zhang <[email protected]>

* fix tests

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* see if this fixes test

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* revert

Signed-off-by: Kevin Zhang <[email protected]>

* Will add in separate pr to update cryptography

Signed-off-by: Kevin Zhang <[email protected]>
Signed-off-by: joostvan <[email protected]>
  • Loading branch information
kevjumba authored and joostvan committed Mar 30, 2022
1 parent c222364 commit 3789cf3
Show file tree
Hide file tree
Showing 12 changed files with 711 additions and 39 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
infra/scripts/*.conf
go/internal/test/feature_repo

# Translations
*.mo
Expand Down Expand Up @@ -204,7 +205,7 @@ ui/.pnp
ui/.pnp.js
ui/coverage
ui/build
ui/feature_repo/data/online.db
ui/feature_repo/data/online.db
ui/feature_repo/registry.db
ui/.vercel

Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ compile-go-lib: install-go-proto-dependencies install-go-ci-dependencies
python -m pip install pybindgen==0.22.0
cd sdk/python && python setup.py build_go_lib

# Needs feast package to setup the feature store
test-go: compile-protos-go
pip install -e "sdk/python[ci]"
go test ./...

format-go:
Expand Down
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,25 @@ require (
github.com/go-python/gopy v0.4.0
github.com/go-redis/redis/v8 v8.11.4
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.1.2
github.com/google/uuid v1.2.0
github.com/mattn/go-sqlite3 v1.14.12
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
)

require (
github.com/apache/thrift v0.16.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v2.0.0+incompatible // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/pierrec/lz4/v4 v4.1.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/exp v0.0.0-20211028214138-64b4c8e87d1a // indirect
Expand All @@ -31,7 +35,6 @@ require (
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20220118154757-00ab72f36ad5 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
Expand Down
215 changes: 210 additions & 5 deletions go.sum

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions go/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
Expand All @@ -24,6 +25,9 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request
}, nil
}

// Returns an object containing the response to GetOnlineFeatures.
// Metadata contains featurenames that corresponds to the number of rows in response.Results.
// Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format.
func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) {
featuresOrService, err := s.fs.ParseFeatures(request.GetKind())
if err != nil {
Expand All @@ -36,6 +40,9 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s
featuresOrService.FeatureService,
request.GetEntities(),
request.GetFullFeatureNames())
if err != nil {
return nil, err
}

resp := &serving.GetOnlineFeaturesResponse{
Results: make([]*serving.GetOnlineFeaturesResponse_FeatureVector, 0),
Expand Down
95 changes: 78 additions & 17 deletions go/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,42 @@ package main

import (
"context"
"net"
"path/filepath"
"reflect"
"runtime"
"testing"

"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/test"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
"net"
"path/filepath"
"runtime"
"testing"
)

// Return absolute path to the test_repo directory regardless of the working directory
func getRepoPath() string {
func getRepoPath(basePath string) string {
// Get the file path of this source file, regardless of the working directory
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("couldn't find file path of the test file")
if basePath == "" {
_, filename, _, ok := runtime.Caller(0)
if !ok {
panic("couldn't find file path of the test file")
}
return filepath.Join(filename, "..", "..", "feature_repo")
} else {
return filepath.Join(basePath, "feature_repo")
}
return filepath.Join(filename, "..", "..", "feature_repo")
}

func getClient(ctx context.Context) (serving.ServingServiceClient, func()) {
// Starts a new grpc server, registers the serving service and returns a client.
func getClient(ctx context.Context, basePath string) (serving.ServingServiceClient, func()) {
buffer := 1024 * 1024
listener := bufconn.Listen(buffer)

server := grpc.NewServer()
config, err := feast.NewRepoConfigFromFile(getRepoPath())
config, err := feast.NewRepoConfigFromFile(getRepoPath(basePath))
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -58,21 +67,73 @@ func getClient(ctx context.Context) (serving.ServingServiceClient, func()) {
}

func TestGetFeastServingInfo(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
ctx := context.Background()
client, closer := getClient(ctx)
// Pregenerated using `feast init`.
dir := "."
err := test.SetupFeatureRepo(dir)
assert.Nil(t, err)
defer test.CleanUpRepo(dir)
client, closer := getClient(ctx, dir)
defer closer()
response, err := client.GetFeastServingInfo(ctx, &serving.GetFeastServingInfoRequest{})
assert.Nil(t, err)
assert.Equal(t, feastServerVersion, response.Version)
}

func TestGetOnlineFeatures(t *testing.T) {
t.Skip("@todo(achals): feature_repo isn't checked in yet")
func TestGetOnlineFeaturesSqlite(t *testing.T) {
ctx := context.Background()
client, closer := getClient(ctx)
// Pregenerated using `feast init`.
dir := "."
err := test.SetupFeatureRepo(dir)
assert.Nil(t, err)
defer test.CleanUpRepo(dir)
client, closer := getClient(ctx, dir)
defer closer()
response, err := client.GetOnlineFeatures(ctx, &serving.GetOnlineFeaturesRequest{})
entities := make(map[string]*types.RepeatedValue)
entities["driver_id"] = &types.RepeatedValue{
Val: []*types.Value{
{Val: &types.Value_Int64Val{Int64Val: 1001}},
{Val: &types.Value_Int64Val{Int64Val: 1003}},
{Val: &types.Value_Int64Val{Int64Val: 1005}},
},
}
request := &serving.GetOnlineFeaturesRequest{
Kind: &serving.GetOnlineFeaturesRequest_Features{
Features: &serving.FeatureList{
Val: []string{"driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate", "driver_hourly_stats:avg_daily_trips"},
},
},
Entities: entities,
}
response, err := client.GetOnlineFeatures(ctx, request)
expectedEntityValuesResp := []*types.Value{
{Val: &types.Value_Int64Val{Int64Val: 1001}},
{Val: &types.Value_Int64Val{Int64Val: 1003}},
{Val: &types.Value_Int64Val{Int64Val: 1005}},
}
expectedFeatureNamesResp := []string{"driver_id", "conv_rate", "acc_rate", "avg_daily_trips"}
assert.Nil(t, err)
assert.NotNil(t, response)
rows, err := test.ReadParquet(filepath.Join(dir, "feature_repo", "data", "driver_stats.parquet"))
assert.Nil(t, err)
entityKeys := map[int64]bool{1001: true, 1003: true, 1005: true}
correctFeatures := test.GetLatestFeatures(rows, entityKeys)
expectedConvRateValues := []*types.Value{}
expectedAccRateValues := []*types.Value{}
expectedAvgDailyTripsValues := []*types.Value{}

for _, key := range []int64{1001, 1003, 1005} {
expectedConvRateValues = append(expectedConvRateValues, &types.Value{Val: &types.Value_FloatVal{FloatVal: correctFeatures[key].Conv_rate}})
expectedAccRateValues = append(expectedAccRateValues, &types.Value{Val: &types.Value_FloatVal{FloatVal: correctFeatures[key].Acc_rate}})
expectedAvgDailyTripsValues = append(expectedAvgDailyTripsValues, &types.Value{Val: &types.Value_Int64Val{Int64Val: int64(correctFeatures[key].Avg_daily_trips)}})
}
// Columnar so get in column format row by row should have column names of all features
assert.Equal(t, len(response.Results), 4)

assert.True(t, reflect.DeepEqual(response.Results[0].Values, expectedEntityValuesResp))
assert.True(t, reflect.DeepEqual(response.Results[1].Values, expectedConvRateValues))
assert.True(t, reflect.DeepEqual(response.Results[2].Values, expectedAccRateValues))
assert.True(t, reflect.DeepEqual(response.Results[3].Values, expectedAvgDailyTripsValues))

assert.True(t, reflect.DeepEqual(response.Metadata.FeatureNames.Val, expectedFeatureNamesResp))
}
7 changes: 3 additions & 4 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (fs *FeatureStore) GetOnlineFeatures(
}
result := make([]*FeatureVector, 0)
arrowMemory := memory.NewGoAllocator()

for _, groupRef := range groupedRefs {
featureData, err := fs.readFromOnlineStore(ctx, groupRef.entityKeys, groupRef.featureViewNames, groupRef.featureNames)
if err != nil {
Expand Down Expand Up @@ -348,7 +349,6 @@ func (fs *FeatureStore) getFeatureViewsToUseByFeatureRefs(features []string, hid
fvs := make(map[string]*FeatureView)
requestFvs := make(map[string]*RequestFeatureView)
odFvs := make(map[string]*OnDemandFeatureView)

featureViews, err := fs.listFeatureViews(hideDummyEntity)
if err != nil {
return nil, nil, nil, nil, err
Expand Down Expand Up @@ -405,7 +405,6 @@ func (fs *FeatureStore) getFeatureViewsToUseByFeatureRefs(features []string, hid
" feature view %s and that you have registered it by running \"apply\"", featureViewName, featureViewName)
}
}

return fvs, fvsToUse, requestFvsToUse, odFvsToUse, nil
}

Expand Down Expand Up @@ -561,9 +560,9 @@ func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*p
requestedFeatureNames []string,
) ([][]FeatureData, error) {
numRows := len(entityRows)
entityRowsValue := make([]prototypes.EntityKey, numRows)
entityRowsValue := make([]*prototypes.EntityKey, numRows)
for index, entityKey := range entityRows {
entityRowsValue[index] = prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
entityRowsValue[index] = &prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues}
}
return fs.onlineStore.OnlineRead(ctx, entityRowsValue, requestedFeatureViewNames, requestedFeatureNames)
}
Expand Down
8 changes: 5 additions & 3 deletions go/internal/feast/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package feast
import (
"context"
"fmt"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/golang/protobuf/ptypes/timestamp"
Expand Down Expand Up @@ -33,7 +34,7 @@ type OnlineStore interface {
// Feature object as pointers in GetOnlineFeaturesResponse)
// => allocate memory for each field once in OnlineRead
// and reuse them in GetOnlineFeaturesResponse?
OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
// Destruct must be call once user is done using OnlineStore
// This is to comply with the Connector since we have to close the plugin
Destruct()
Expand All @@ -51,12 +52,13 @@ func getOnlineStoreType(onlineStoreConfig map[string]interface{}) (string, bool)
func NewOnlineStore(config *RepoConfig) (OnlineStore, error) {
onlineStoreType, ok := getOnlineStoreType(config.OnlineStore)
if !ok {
return nil, fmt.Errorf("could not get online store type from online store config: %+v", config.OnlineStore)
onlineStore, err := NewSqliteOnlineStore(config.Project, config, config.OnlineStore)
return onlineStore, err
}
if onlineStoreType == "redis" {
onlineStore, err := NewRedisOnlineStore(config.Project, config.OnlineStore)
return onlineStore, err
} else {
return nil, fmt.Errorf("%s online store type is currently not supported; only Redis is supported", onlineStoreType)
return nil, fmt.Errorf("%s online store type is currently not supported; only redis and sqlite are supported", onlineStoreType)
}
}
11 changes: 6 additions & 5 deletions go/internal/feast/redisonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ import (
"encoding/binary"
"errors"
"fmt"
"sort"
"strconv"
"strings"

"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/feast-dev/feast/go/protos/feast/types"
"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/proto"
"github.com/spaolacci/murmur3"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"sort"
"strconv"
"strings"
)

type redisType int
Expand Down Expand Up @@ -117,7 +118,7 @@ func getRedisType(onlineStoreConfig map[string]interface{}) (redisType, error) {
return t, nil
}

func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error) {
featureCount := len(featureNames)
index := featureCount
featureViewIndices := make(map[string]int)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []types.En
redisKeyToEntityIndex := make(map[string]int)
for i := 0; i < len(entityKeys); i++ {

var key, err = buildRedisKey(r.project, &entityKeys[i])
var key, err = buildRedisKey(r.project, entityKeys[i])
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 3789cf3

Please sign in to comment.