Skip to content

Commit

Permalink
Add feature logging to http server
Browse files Browse the repository at this point in the history
Signed-off-by: Tsotne Tabidze <[email protected]>
  • Loading branch information
Tsotne Tabidze committed May 17, 2022
1 parent 54c3da8 commit 4b2f0a8
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go/internal/feast/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques
fmt.Printf("Couldn't instantiate logger for feature service %s: %+v", featuresOrService.FeatureService.Name, err)
}

err = logger.Log(entityValuesMap, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId)
err = logger.Log(request.Entities, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId)
if err != nil {
fmt.Printf("LoggerImpl error[%s]: %+v", featuresOrService.FeatureService.Name, err)
}
Expand Down
37 changes: 37 additions & 0 deletions go/internal/feast/server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"github.com/feast-dev/feast/go/internal/feast"
"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/internal/feast/server/logging"
"github.com/feast-dev/feast/go/protos/feast/serving"
prototypes "github.com/feast-dev/feast/go/protos/feast/types"
"github.com/feast-dev/feast/go/types"
"net/http"
)

Expand Down Expand Up @@ -152,6 +154,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {
featureService, err = s.fs.GetFeatureService(*request.FeatureService)
if err != nil {
http.Error(w, fmt.Sprintf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError)
return
}
}
entitiesProto := make(map[string]*prototypes.RepeatedValue)
Expand All @@ -173,6 +176,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {

if err != nil {
http.Error(w, fmt.Sprintf("Error getting feature vector: %+v", err), http.StatusInternalServerError)
return
}

var featureNames []string
Expand Down Expand Up @@ -209,9 +213,42 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) {

if err != nil {
http.Error(w, fmt.Sprintf("Error encoding response: %+v", err), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")

if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil {
logger, err := s.loggingService.GetOrCreateLogger(featureService)
if err != nil {
http.Error(w, fmt.Sprintf("Couldn't instantiate logger for feature service %s: %+v", featureService.Name, err), http.StatusInternalServerError)
return
}

requestId := GenerateRequestId()

// Note: we're converting arrow to proto for feature logging. In the future we should
// base feature logging on arrow so that we don't have to do this extra conversion.
var featureVectorProtos []*serving.GetOnlineFeaturesResponse_FeatureVector
for _, vector := range featureVectors[len(request.Entities):] {
values, err := types.ArrowValuesToProtoValues(vector.Values)
if err != nil {
http.Error(w, fmt.Sprintf("Couldn't convert arrow values into protobuf: %+v", err), http.StatusInternalServerError)
return
}
featureVectorProtos = append(featureVectorProtos, &serving.GetOnlineFeaturesResponse_FeatureVector{
Values: values,
Statuses: vector.Statuses,
EventTimestamps: vector.Timestamps,
})
}

err = logger.Log(entitiesProto, featureVectorProtos, featureNames, requestContextProto, requestId)
if err != nil {
http.Error(w, fmt.Sprintf("LoggerImpl error[%s]: %+v", featureService.Name, err), http.StatusInternalServerError)
return
}
}
}

func (s *httpServer) Serve(host string, port int) error {
Expand Down
8 changes: 4 additions & 4 deletions go/internal/feast/server/logging/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type LogSink interface {
}

type Logger interface {
Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error
Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error
}

type LoggerImpl struct {
Expand Down Expand Up @@ -207,7 +207,7 @@ func getFullFeatureName(featureViewName string, featureName string) string {
return fmt.Sprintf("%s__%s", featureViewName, featureName)
}

func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error {
func (l *LoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error {
if len(featureVectors) == 0 {
return nil
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur
if !ok {
return errors.Errorf("Missing join key %s in log data", joinKey)
}
entityValues[idx] = rows[rowIdx]
entityValues[idx] = rows.Val[rowIdx]
}

requestDataValues := make([]*types.Value, len(l.schema.RequestData))
Expand Down Expand Up @@ -283,6 +283,6 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur

type DummyLoggerImpl struct{}

func (l *DummyLoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error {
func (l *DummyLoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error {
return nil
}
12 changes: 11 additions & 1 deletion go/internal/feast/server/logging/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,17 @@ func TestLogAndFlushToFile(t *testing.T) {
assert.Nil(t, err)

assert.Nil(t, logger.Log(
map[string][]*types.Value{"driver_id": {{Val: &types.Value_Int32Val{Int32Val: 111}}}},
map[string]*types.RepeatedValue{
"driver_id": {
Val: []*types.Value{
{
Val: &types.Value_Int32Val{
Int32Val: 111,
},
},
},
},
},
[]*serving.GetOnlineFeaturesResponse_FeatureVector{
{
Values: []*types.Value{{Val: &types.Value_DoubleVal{DoubleVal: 2.0}}},
Expand Down

0 comments on commit 4b2f0a8

Please sign in to comment.