diff --git a/go/internal/feast/server/grpc_server.go b/go/internal/feast/server/grpc_server.go index 6040880959..c47d185d6c 100644 --- a/go/internal/feast/server/grpc_server.go +++ b/go/internal/feast/server/grpc_server.go @@ -86,7 +86,7 @@ func (s *grpcServingServiceServer) GetOnlineFeatures(ctx context.Context, reques fmt.Printf("Couldn't instantiate logger for feature service %s: %+v", featuresOrService.FeatureService.Name, err) } - err = logger.Log(entityValuesMap, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId) + err = logger.Log(request.Entities, resp.Results[len(request.Entities):], resp.Metadata.FeatureNames.Val[len(request.Entities):], request.RequestContext, requestId) if err != nil { fmt.Printf("LoggerImpl error[%s]: %+v", featuresOrService.FeatureService.Name, err) } diff --git a/go/internal/feast/server/http_server.go b/go/internal/feast/server/http_server.go index d4d1b7524b..d126f848f9 100644 --- a/go/internal/feast/server/http_server.go +++ b/go/internal/feast/server/http_server.go @@ -7,7 +7,9 @@ import ( "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/server/logging" + "github.com/feast-dev/feast/go/protos/feast/serving" prototypes "github.com/feast-dev/feast/go/protos/feast/types" + "github.com/feast-dev/feast/go/types" "net/http" ) @@ -152,6 +154,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { featureService, err = s.fs.GetFeatureService(*request.FeatureService) if err != nil { http.Error(w, fmt.Sprintf("Error getting feature service from registry: %+v", err), http.StatusInternalServerError) + return } } entitiesProto := make(map[string]*prototypes.RepeatedValue) @@ -173,6 +176,7 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { if err != nil { http.Error(w, fmt.Sprintf("Error getting feature vector: %+v", err), http.StatusInternalServerError) + return } var featureNames []string @@ -209,9 +213,42 @@ func (s *httpServer) getOnlineFeatures(w http.ResponseWriter, r *http.Request) { if err != nil { http.Error(w, fmt.Sprintf("Error encoding response: %+v", err), http.StatusInternalServerError) + return } w.Header().Set("Content-Type", "application/json") + + if featureService != nil && featureService.LoggingConfig != nil && s.loggingService != nil { + logger, err := s.loggingService.GetOrCreateLogger(featureService) + if err != nil { + http.Error(w, fmt.Sprintf("Couldn't instantiate logger for feature service %s: %+v", featureService.Name, err), http.StatusInternalServerError) + return + } + + requestId := GenerateRequestId() + + // Note: we're converting arrow to proto for feature logging. In the future we should + // base feature logging on arrow so that we don't have to do this extra conversion. + var featureVectorProtos []*serving.GetOnlineFeaturesResponse_FeatureVector + for _, vector := range featureVectors[len(request.Entities):] { + values, err := types.ArrowValuesToProtoValues(vector.Values) + if err != nil { + http.Error(w, fmt.Sprintf("Couldn't convert arrow values into protobuf: %+v", err), http.StatusInternalServerError) + return + } + featureVectorProtos = append(featureVectorProtos, &serving.GetOnlineFeaturesResponse_FeatureVector{ + Values: values, + Statuses: vector.Statuses, + EventTimestamps: vector.Timestamps, + }) + } + + err = logger.Log(entitiesProto, featureVectorProtos, featureNames, requestContextProto, requestId) + if err != nil { + http.Error(w, fmt.Sprintf("LoggerImpl error[%s]: %+v", featureService.Name, err), http.StatusInternalServerError) + return + } + } } func (s *httpServer) Serve(host string, port int) error { diff --git a/go/internal/feast/server/logging/logger.go b/go/internal/feast/server/logging/logger.go index d7ed1fbe18..cbf1c3439a 100644 --- a/go/internal/feast/server/logging/logger.go +++ b/go/internal/feast/server/logging/logger.go @@ -42,7 +42,7 @@ type LogSink interface { } type Logger interface { - Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error + Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error } type LoggerImpl struct { @@ -207,7 +207,7 @@ func getFullFeatureName(featureViewName string, featureName string) string { return fmt.Sprintf("%s__%s", featureViewName, featureName) } -func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error { +func (l *LoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error { if len(featureVectors) == 0 { return nil } @@ -250,7 +250,7 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur if !ok { return errors.Errorf("Missing join key %s in log data", joinKey) } - entityValues[idx] = rows[rowIdx] + entityValues[idx] = rows.Val[rowIdx] } requestDataValues := make([]*types.Value, len(l.schema.RequestData)) @@ -283,6 +283,6 @@ func (l *LoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featur type DummyLoggerImpl struct{} -func (l *DummyLoggerImpl) Log(joinKeyToEntityValues map[string][]*types.Value, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error { +func (l *DummyLoggerImpl) Log(joinKeyToEntityValues map[string]*types.RepeatedValue, featureVectors []*serving.GetOnlineFeaturesResponse_FeatureVector, featureNames []string, requestData map[string]*types.RepeatedValue, requestId string) error { return nil } diff --git a/go/internal/feast/server/logging/logger_test.go b/go/internal/feast/server/logging/logger_test.go index 5625b05a76..4ce883c75b 100644 --- a/go/internal/feast/server/logging/logger_test.go +++ b/go/internal/feast/server/logging/logger_test.go @@ -90,7 +90,17 @@ func TestLogAndFlushToFile(t *testing.T) { assert.Nil(t, err) assert.Nil(t, logger.Log( - map[string][]*types.Value{"driver_id": {{Val: &types.Value_Int32Val{Int32Val: 111}}}}, + map[string]*types.RepeatedValue{ + "driver_id": { + Val: []*types.Value{ + { + Val: &types.Value_Int32Val{ + Int32Val: 111, + }, + }, + }, + }, + }, []*serving.GetOnlineFeaturesResponse_FeatureVector{ { Values: []*types.Value{{Val: &types.Value_DoubleVal{DoubleVal: 2.0}}},