Skip to content

Commit

Permalink
streamClient.Create
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanbohan committed Feb 6, 2024
1 parent dd72587 commit 80375d5
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 9 deletions.
38 changes: 38 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,44 @@ func (c *Client) Write(ctx context.Context, tables ...*table.Table) (*gpb.Grepti
return c.client.Handle(ctx, request_)
}

// Create is like [Write] to write the data into GreptimeDB, but schema is defined in the struct tag.
//
// type monitor struct {
// ID int64 `greptime:"tag;column:id;type:int64"`
// Host string `greptime:"tag;column:host;type:string"`
// Memory uint64 `greptime:"field;column:memory;type:uint64"`
// Cpu float64 `greptime:"field;column:cpu;type:float64"`
// Temperature int64 `greptime:"field;column:temperature;type:int64"`
// Running bool `greptime:"field;column:running;type:boolean"`
// Ts time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
// }
//
// func (monitor) TableName() string {
// return monitorTableName
// }
//
// monitors := []monitor{
// {
// ID: randomId(),
// Host: "127.0.0.1",
// Memory: 1,
// Cpu: 1.0,
// Temperature: -1,
// Ts: time1,
// Running: true,
// },
// {
// ID: randomId(),
// Host: "127.0.0.2",
// Memory: 2,
// Cpu: 2.0,
// Temperature: -2,
// Ts: time2,
// Running: true,
// },
// }
//
// resp, err := client.Create(context.Background(), monitors)
func (c *Client) Create(ctx context.Context, body any) (*gpb.GreptimeResponse, error) {
tbl, err := schema.Parse(body)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func (datatype) TableName() string {
}

type monitor struct {
ID int64 `gorm:"primaryKey;column:id" greptime:"tag;column:id;type=int64"`
Host string `gorm:"primaryKey;column:host" greptime:"tag;column=host;type=string"`
Memory uint64 `gorm:"column:memory" greptime:"field;column=memory;type=uint64"`
Cpu float64 `gorm:"column:cpu" greptime:"field;column=cpu;type=float64"`
Temperature int64 `gorm:"column:temperature" greptime:"field;column=temperature;type=int64"`
Running bool `gorm:"column:running" greptime:"field;column=running;type=boolean"`
Ts time.Time `gorm:"column:ts" greptime:"timestamp;column=ts;type=timestamp;precision=millisecond"`
ID int64 `gorm:"primaryKey;column:id" greptime:"tag;column:id;type:int64"`
Host string `gorm:"primaryKey;column:host" greptime:"tag;column:host;type:string"`
Memory uint64 `gorm:"column:memory" greptime:"field;column:memory;type:uint64"`
Cpu float64 `gorm:"column:cpu" greptime:"field;column:cpu;type:float64"`
Temperature int64 `gorm:"column:temperature" greptime:"field;column:temperature;type:int64"`
Running bool `gorm:"column:running" greptime:"field;column:running;type:boolean"`
Ts time.Time `gorm:"column:ts" greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
}

func (monitor) TableName() string {
Expand Down
55 changes: 54 additions & 1 deletion client/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@ func NewStreamClient(cfg *config.Config) (*StreamClient, error) {
return &StreamClient{client: client, cfg: cfg}, nil
}

// Send is to send the data into GreptimeDB via explicit schema.
//
// tbl := table.New(<tableName>)
//
// // add column at first. This is to define the schema of the table.
// tbl.AddTagColumn("tag1", types.INT64)
// tbl.AddFieldColumn("field1", types.STRING)
// tbl.AddFieldColumn("field2", types.DOUBLE)
// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECONDS)
//
// // you can add multiple row(s). This is the real data.
// tbl.AddRow(1, "hello", 1.1, time.Now())
//
// // send data into GreptimeDB
// resp, err := streamClient.Send(context.Background(), tbl)
func (c *StreamClient) Send(ctx context.Context, tables ...*table.Table) error {
header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password)
request_, err := request.New(header_, tables...).Build()
Expand All @@ -57,7 +72,45 @@ func (c *StreamClient) Send(ctx context.Context, tables ...*table.Table) error {
return c.client.Send(request_)
}

func (c *StreamClient) Write(ctx context.Context, body any) error {
// Create is like [Send] to send the data into GreptimeDB, but schema is defined in the struct tag.
//
// type monitor struct {
// ID int64 `greptime:"tag;column:id;type:int64"`
// Host string `greptime:"tag;column:host;type:string"`
// Memory uint64 `greptime:"field;column:memory;type:uint64"`
// Cpu float64 `greptime:"field;column:cpu;type:float64"`
// Temperature int64 `greptime:"field;column:temperature;type:int64"`
// Running bool `greptime:"field;column:running;type:boolean"`
// Ts time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"`
// }
//
// func (monitor) TableName() string {
// return monitorTableName
// }
//
// monitors := []monitor{
// {
// ID: randomId(),
// Host: "127.0.0.1",
// Memory: 1,
// Cpu: 1.0,
// Temperature: -1,
// Ts: time1,
// Running: true,
// },
// {
// ID: randomId(),
// Host: "127.0.0.2",
// Memory: 2,
// Cpu: 2.0,
// Temperature: -2,
// Ts: time2,
// Running: true,
// },
// }
//
// resp, err := streamClient.Create(context.Background(), monitors)
func (c *StreamClient) Create(ctx context.Context, body any) error {
tbl, err := schema.Parse(body)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestStreamCreate(t *testing.T) {
},
}

err = streamClient.Write(context.Background(), monitors)
err = streamClient.Create(context.Background(), monitors)
assert.Nil(t, err)
affected, err := streamClient.CloseAndRecv(context.Background())
assert.EqualValues(t, 2, affected.GetValue())
Expand Down

0 comments on commit 80375d5

Please sign in to comment.