Skip to content

Commit

Permalink
feat(destination): bqstream - add support for batch of properties (#2367
Browse files Browse the repository at this point in the history
)
  • Loading branch information
akashrpo authored Oct 14, 2022
1 parent 8444433 commit 73baf76
Show file tree
Hide file tree
Showing 3 changed files with 268 additions and 43 deletions.
64 changes: 64 additions & 0 deletions mocks/services/streammanager/bqstream/mock_bqstream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 51 additions & 23 deletions services/streammanager/bqstream/bqstreammanager.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:generate mockgen -destination=../../../mocks/services/streammanager/bqstream/mock_bqstream.go -package mock_bqstream github.com/rudderlabs/rudder-server/services/streammanager/bqstream BQClient

package bqstream

import (
Expand Down Expand Up @@ -25,15 +27,33 @@ type Config struct {
TableId string `json:"tableId"`
}

// https://stackoverflow.com/questions/55951812/insert-into-bigquery-without-a-well-defined-struct
type GenericRecord map[string]bigquery.Value

type BQClient interface {
Put(ctx context.Context, datasetID, tableID string, records []*GenericRecord) error
Close() error
}

type BQStreamProducer struct {
Opts common.Opts
Client BQClient
}

type Client struct {
bqClient *bigquery.Client
opts common.Opts
}

// https://stackoverflow.com/questions/55951812/insert-into-bigquery-without-a-well-defined-struct
type genericRecord map[string]bigquery.Value
func (c *Client) Put(ctx context.Context, datasetID, tableID string, records []*GenericRecord) error {
bqInserter := c.bqClient.Dataset(datasetID).Table(tableID).Inserter()
return bqInserter.Put(ctx, records)
}

func (c *Client) Close() error {
return c.bqClient.Close()
}

func (rec genericRecord) Save() (map[string]bigquery.Value, string, error) {
func (rec GenericRecord) Save() (map[string]bigquery.Value, string, error) {
var insertID string
if columnVal, isInsertIdPresent := rec["insertId"]; isInsertIdPresent {
insertID = columnVal.(string)
Expand All @@ -44,12 +64,12 @@ func (rec genericRecord) Save() (map[string]bigquery.Value, string, error) {

var pkgLogger logger.Logger

func init() {
func Init() {
pkgLogger = logger.NewLogger().Child("streammanager").Child("bqstream")
}

type BQStreamProducer struct {
client *Client
func init() {
Init()
}

func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*BQStreamProducer, error) {
Expand Down Expand Up @@ -78,28 +98,37 @@ func NewProducer(destination *backendconfig.DestinationT, o common.Opts) (*BQStr
if err != nil {
return nil, err
}
return &BQStreamProducer{client: &Client{bqClient: bqClient, opts: o}}, nil
return &BQStreamProducer{Client: &Client{bqClient: bqClient}, Opts: o}, nil
}

func (producer *BQStreamProducer) Produce(jsonData json.RawMessage, _ interface{}) (statusCode int, respStatus, responseMessage string) {
client := producer.client
bqClient := client.bqClient
o := client.opts
client := producer.Client
if client == nil {
return http.StatusBadRequest, "Failure", "[BQStream] error :: invalid client"
}
parsedJSON := gjson.ParseBytes(jsonData)
dsId := parsedJSON.Get("datasetId").String()
tblId := parsedJSON.Get("tableId").String()
props := parsedJSON.Get("properties").String()
props := parsedJSON.Get("properties")

var genericRec *genericRecord
err := json.Unmarshal([]byte(props), &genericRec)
if err != nil {
return http.StatusBadRequest, "Failure", createErr(err, "error in unmarshalling data").Error()
var genericRecs []*GenericRecord
if props.IsArray() {
err := json.Unmarshal([]byte(props.String()), &genericRecs)
if err != nil {
return http.StatusBadRequest, "Failure", createErr(err, "error in unmarshalling data").Error()
}
} else {
var genericRec *GenericRecord
err := json.Unmarshal([]byte(props.String()), &genericRec)
if err != nil {
return http.StatusBadRequest, "Failure", createErr(err, "error in unmarshalling data").Error()
}
genericRecs = append(genericRecs, genericRec)
}
bqInserter := bqClient.Dataset(dsId).Table(tblId).Inserter()
ctx, cancel := context.WithTimeout(context.Background(), o.Timeout)
defer cancel()
err = bqInserter.Put(ctx, genericRec)

ctx, cancel := context.WithTimeout(context.Background(), producer.Opts.Timeout)
defer cancel()
err := client.Put(ctx, dsId, tblId, genericRecs)
if err != nil {
if ctx.Err() != nil && errors.Is(err, context.DeadlineExceeded) {
return http.StatusGatewayTimeout, "Failure", createErr(err, "timeout in data insertion").Error()
Expand All @@ -111,13 +140,12 @@ func (producer *BQStreamProducer) Produce(jsonData json.RawMessage, _ interface{
}

func (producer *BQStreamProducer) Close() error {
client := producer.client
client := producer.Client
if client == nil {
return createErr(nil, "error while trying to close the client")
}
bqClient := client.bqClient

err := bqClient.Close()
err := client.Close()
if err != nil {
return createErr(err, "error while closing the client")
}
Expand Down
Loading

0 comments on commit 73baf76

Please sign in to comment.