From 2da08294ac6dadff1dbcb2d8327b0ea4846d5105 Mon Sep 17 00:00:00 2001 From: yuanbohan Date: Mon, 26 Feb 2024 14:34:33 +0800 Subject: [PATCH] feat: merge client with streamClient (#23) * merge client with streamClient * version and docs --- README.md | 74 ++++++------ client/client.go => client.go | 102 +++++++++++++++- client/stream_client.go | 128 -------------------- client/stream_client_test.go | 152 ------------------------ client/client_test.go => client_test.go | 116 +++++++++++++++++- config/config.go => config.go | 14 +-- examples/README.md | 17 +++ examples/schema/README.md | 22 ++-- examples/schema/main.go | 53 +++++---- examples/tag/README.md | 22 ++-- examples/tag/main.go | 54 +++++---- config/options/options.go => options.go | 8 +- version.go | 2 +- 13 files changed, 341 insertions(+), 423 deletions(-) rename client/client.go => client.go (53%) delete mode 100644 client/stream_client.go delete mode 100644 client/stream_client_test.go rename client/client_test.go => client_test.go (84%) rename config/config.go => config.go (89%) create mode 100644 examples/README.md rename config/options/options.go => options.go (91%) diff --git a/README.md b/README.md index d081279..7bbc493 100644 --- a/README.md +++ b/README.md @@ -6,11 +6,6 @@ Provide API to insert data into GreptimeDB. -## Basic Example - -- [schema](examples/schema/main.go) -- [tag](examples/tag/main.go) - ## How To Use ### Installation @@ -19,39 +14,29 @@ Provide API to insert data into GreptimeDB. go get -u github.com/GreptimeTeam/greptimedb-ingester-go ``` -### Example - -#### Config - -Initiate a Config for Client or StreamClient +### Import ```go -cfg := config.New(""). - WithAuth("", ""). - WithDatabase(database) -``` - -##### Options - -- keepalive +import greptime "github.com/GreptimeTeam/greptimedb-ingester-go" -```go -cfg = cfg.WithKeepalive(30*time.Second, 5*time.Second) ``` -#### Client or StreamClient +### Example -- Client +#### Config + +Initiate a Config for Client ```go -cli, err := client.New(cfg) +cfg := greptime.NewConfig(""). + WithAuth("", ""). + WithDatabase("") ``` -- StreamClient +#### Client ```go - -stream, err := client.NewStreamClient(cfg) +cli, err := greptime.NewClient(cfg) ``` #### Insert & StreamInsert @@ -81,14 +66,14 @@ The **GreptimeDB** column is for the datatypes supported in library, and the **G | BOOLEAN, BOOL | bool | | | STRING | string | | | BINARY, BYTES | []byte | | -| DATE | Int or time.Time | the day elapsed since 1970-1-1 | -| DATETIME | Int or time.Time | the millisecond elapsed since 1970-1-1 | -| TIMESTAMP_SECOND | Int or time.Time | | -| TIMESTAMP_MILLISECOND, TIMESTAMP | Int or time.Time | | -| TIMESTAMP_MICROSECOND | Int or time.Time | | -| TIMESTAMP_NANOSECOND | Int or time.Time | | +| DATE | *Int* or time.Time | the day elapsed since 1970-1-1 | +| DATETIME | *Int* or time.Time | the millisecond elapsed since 1970-1-1 | +| TIMESTAMP_SECOND | *Int* or time.Time | | +| TIMESTAMP_MILLISECOND, TIMESTAMP | *Int* or time.Time | | +| TIMESTAMP_MICROSECOND | *Int* or time.Time | | +| TIMESTAMP_NANOSECOND | *Int* or time.Time | | -NOTE: Int is for all of Integer and Unsigned Integer in Go +NOTE: *Int* is for all of Integer and Unsigned Integer in Go ##### With Schema predefined @@ -97,6 +82,11 @@ you can define schema via Table and Column, and then AddRow to include the real ###### define table schema, and add rows ```go +import( + "github.com/GreptimeTeam/greptimedb-ingester-go/table" + "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" +) + tbl, err := table.New("") tbl.AddTagColumn("id", types.INT64) @@ -108,16 +98,18 @@ err := tbl.AddRow(2, "127.0.0.2", time.Now()) ... ``` -###### Client Write into GreptimeDB +###### Write into GreptimeDB ```go resp, err := cli.Write(context.Background(), tbl) ``` -###### StreamClient Send into GreptimeDB +###### Stream Write into GreptimeDB ```go -err := streamClient.Send(context.Background(), tbl) +err := cli.StreamWrite(context.Background(), tbl) +... +affected, err := cli.CloseStream(ctx) ``` ##### With Struct Tag @@ -130,7 +122,7 @@ If you prefer ORM style, and define column-field relationship via struct field t - `tag`, `field`, `timestamp` is for [SemanticType][data-model], and the value is ignored - `column` is to define the column name - `type` is to define the data type. if type is timestamp, `precision` is supported -- the metadata separator is `;`, and the key value separator is `:` +- the metadata separator is `;` and the key value separator is `:` type supported is the same as described [Datatypes supported](#datatypes-supported), and case insensitive @@ -166,16 +158,18 @@ monitors := []Monitor{ } ``` -###### Client Create into GreptimeDB +###### Create into GreptimeDB ```go resp, err := cli.Create(context.Background(), monitors) ``` -###### StreamClient Create into GreptimeDB +###### Stream Create into GreptimeDB ```go -err := streamClient.Create(context.Background(), monitors) +err := cli.StreamCreate(context.Background(), monitors) +... +affected, err := cli.CloseStream(ctx) ``` #### Query diff --git a/client/client.go b/client.go similarity index 53% rename from client/client.go rename to client.go index 23e12c1..4048332 100644 --- a/client/client.go +++ b/client.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package client +package greptime import ( "context" @@ -20,7 +20,6 @@ import ( gpb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" "google.golang.org/grpc" - "github.com/GreptimeTeam/greptimedb-ingester-go/config" "github.com/GreptimeTeam/greptimedb-ingester-go/request" "github.com/GreptimeTeam/greptimedb-ingester-go/request/header" "github.com/GreptimeTeam/greptimedb-ingester-go/schema" @@ -30,13 +29,15 @@ import ( // Client helps to write data into GreptimeDB. A Client is safe for concurrent // use by multiple goroutines,you can have one Client instance in your application. type Client struct { - cfg *config.Config + cfg *Config client gpb.GreptimeDatabaseClient + + stream gpb.GreptimeDatabase_HandleRequestsClient } -// New helps to create the greptimedb client, which will be responsible write data into GreptimeDB. -func New(cfg *config.Config) (*Client, error) { +// NewClient helps to create the greptimedb client, which will be responsible write data into GreptimeDB. +func NewClient(cfg *Config) (*Client, error) { conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...) if err != nil { return nil, err @@ -116,3 +117,94 @@ func (c *Client) Create(ctx context.Context, body any) (*gpb.GreptimeResponse, e return c.Write(ctx, tbl) } + +// StreamWrite is to send the data into GreptimeDB via explicit schema. +// +// tbl, err := table.New() +// +// // add column at first. This is to define the schema of the table. +// tbl.AddTagColumn("tag1", types.INT64) +// tbl.AddFieldColumn("field1", types.STRING) +// tbl.AddFieldColumn("field2", types.FLOAT64) +// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND) +// +// // you can add multiple row(s). This is the real data. +// tbl.AddRow(1, "hello", 1.1, time.Now()) +// +// // send data into GreptimeDB +// resp, err := client.StreamWrite(context.Background(), tbl) +func (c *Client) StreamWrite(ctx context.Context, tables ...*table.Table) error { + if c.stream == nil { + stream, err := c.client.HandleRequests(ctx) + if err != nil { + return err + } + c.stream = stream + } + + header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) + request_, err := request.New(header_, tables...).Build() + if err != nil { + return err + } + return c.stream.Send(request_) +} + +// StreamCreate is like [StreamWrite] to send the data into GreptimeDB, but schema is defined in the struct tag. +// +// type monitor struct { +// ID int64 `greptime:"tag;column:id;type:int64"` +// Host string `greptime:"tag;column:host;type:string"` +// Memory uint64 `greptime:"field;column:memory;type:uint64"` +// Cpu float64 `greptime:"field;column:cpu;type:float64"` +// Temperature int64 `greptime:"field;column:temperature;type:int64"` +// Running bool `greptime:"field;column:running;type:boolean"` +// Ts time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"` +// } +// +// func (monitor) TableName() string { +// return monitorTableName +// } +// +// monitors := []monitor{ +// { +// ID: randomId(), +// Host: "127.0.0.1", +// Memory: 1, +// Cpu: 1.0, +// Temperature: -1, +// Ts: time1, +// Running: true, +// }, +// { +// ID: randomId(), +// Host: "127.0.0.2", +// Memory: 2, +// Cpu: 2.0, +// Temperature: -2, +// Ts: time2, +// Running: true, +// }, +// } +// +// resp, err := client.StreamCreate(context.Background(), monitors) +func (c *Client) StreamCreate(ctx context.Context, body any) error { + tbl, err := schema.Parse(body) + if err != nil { + return err + } + return c.StreamWrite(ctx, tbl) +} + +// CloseStream closes the stream. Once we’ve finished writing our client’s requests to the stream +// using client.StreamWrite or client.StreamCreate, we need to call client.CloseStream to let +// GreptimeDB know that we’ve finished writing and are expecting to receive a response. +func (c *Client) CloseStream(ctx context.Context) (*gpb.AffectedRows, error) { + resp, err := c.stream.CloseAndRecv() + if err != nil { + return nil, err + } + + c.stream = nil + return resp.GetAffectedRows(), nil +} diff --git a/client/stream_client.go b/client/stream_client.go deleted file mode 100644 index 6491ecd..0000000 --- a/client/stream_client.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2024 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package client - -import ( - "context" - - gpb "github.com/GreptimeTeam/greptime-proto/go/greptime/v1" - "google.golang.org/grpc" - - "github.com/GreptimeTeam/greptimedb-ingester-go/config" - "github.com/GreptimeTeam/greptimedb-ingester-go/request" - "github.com/GreptimeTeam/greptimedb-ingester-go/request/header" - "github.com/GreptimeTeam/greptimedb-ingester-go/schema" - "github.com/GreptimeTeam/greptimedb-ingester-go/table" -) - -// StreamClient is only for inserting -type StreamClient struct { - cfg *config.Config - - client gpb.GreptimeDatabase_HandleRequestsClient -} - -func NewStreamClient(cfg *config.Config) (*StreamClient, error) { - conn, err := grpc.Dial(cfg.GetEndpoint(), cfg.Options().Build()...) - if err != nil { - return nil, err - } - - client, err := gpb.NewGreptimeDatabaseClient(conn).HandleRequests(context.Background()) - if err != nil { - return nil, err - } - - return &StreamClient{client: client, cfg: cfg}, nil -} - -// Send is to send the data into GreptimeDB via explicit schema. -// -// tbl, err := table.New() -// -// // add column at first. This is to define the schema of the table. -// tbl.AddTagColumn("tag1", types.INT64) -// tbl.AddFieldColumn("field1", types.STRING) -// tbl.AddFieldColumn("field2", types.FLOAT64) -// tbl.AddTimestampColumn("timestamp", types.TIMESTAMP_MILLISECOND) -// -// // you can add multiple row(s). This is the real data. -// tbl.AddRow(1, "hello", 1.1, time.Now()) -// -// // send data into GreptimeDB -// resp, err := streamClient.Send(context.Background(), tbl) -func (c *StreamClient) Send(ctx context.Context, tables ...*table.Table) error { - header_ := header.New(c.cfg.Database).WithAuth(c.cfg.Username, c.cfg.Password) - request_, err := request.New(header_, tables...).Build() - if err != nil { - return err - } - return c.client.Send(request_) -} - -// Create is like [Send] to send the data into GreptimeDB, but schema is defined in the struct tag. -// -// type monitor struct { -// ID int64 `greptime:"tag;column:id;type:int64"` -// Host string `greptime:"tag;column:host;type:string"` -// Memory uint64 `greptime:"field;column:memory;type:uint64"` -// Cpu float64 `greptime:"field;column:cpu;type:float64"` -// Temperature int64 `greptime:"field;column:temperature;type:int64"` -// Running bool `greptime:"field;column:running;type:boolean"` -// Ts time.Time `greptime:"timestamp;column:ts;type:timestamp;precision:millisecond"` -// } -// -// func (monitor) TableName() string { -// return monitorTableName -// } -// -// monitors := []monitor{ -// { -// ID: randomId(), -// Host: "127.0.0.1", -// Memory: 1, -// Cpu: 1.0, -// Temperature: -1, -// Ts: time1, -// Running: true, -// }, -// { -// ID: randomId(), -// Host: "127.0.0.2", -// Memory: 2, -// Cpu: 2.0, -// Temperature: -2, -// Ts: time2, -// Running: true, -// }, -// } -// -// resp, err := streamClient.Create(context.Background(), monitors) -func (c *StreamClient) Create(ctx context.Context, body any) error { - tbl, err := schema.Parse(body) - if err != nil { - return err - } - return c.Send(ctx, tbl) -} - -func (c *StreamClient) CloseAndRecv(ctx context.Context) (*gpb.AffectedRows, error) { - resp, err := c.client.CloseAndRecv() - if err != nil { - return nil, err - } - - return resp.GetAffectedRows(), nil -} diff --git a/client/stream_client_test.go b/client/stream_client_test.go deleted file mode 100644 index 9ee7bdf..0000000 --- a/client/stream_client_test.go +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2024 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package client - -import ( - "context" - "fmt" - "log" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/GreptimeTeam/greptimedb-ingester-go/config" - tbl "github.com/GreptimeTeam/greptimedb-ingester-go/table" - "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" -) - -func newStreamClient() *StreamClient { - cfg := config.New(host). - WithPort(grpcPort). - WithDatabase(database). - WithKeepalive(30*time.Second, 5*time.Second) - - client, err := NewStreamClient(cfg) - if err != nil { - log.Fatalf("failed to create client: %s", err.Error()) - } - return client -} - -func TestStreamWrite(t *testing.T) { - loc, err := time.LoadLocation(timezone) - assert.Nil(t, err) - ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() - time1 := time.UnixMilli(ts1).In(loc) - ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() - time2 := time.UnixMilli(ts2).In(loc) - - monitors := []monitor{ - { - ID: randomId(), - Host: "127.0.0.1", - Memory: 1, - Cpu: 1.0, - Temperature: -1, - Ts: time1, - Running: true, - }, - { - ID: randomId(), - Host: "127.0.0.2", - Memory: 2, - Cpu: 2.0, - Temperature: -2, - Ts: time2, - Running: true, - }, - } - - table, err := tbl.New(monitorTableName) - assert.Nil(t, err) - - assert.Nil(t, table.AddTagColumn("id", types.INT64)) - assert.Nil(t, table.AddTagColumn("host", types.STRING)) - assert.Nil(t, table.AddFieldColumn("memory", types.UINT64)) - assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64)) - assert.Nil(t, table.AddFieldColumn("temperature", types.INT64)) - assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN)) - assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) - - for _, monitor := range monitors { - err := table.AddRow(monitor.ID, monitor.Host, - monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running, - monitor.Ts) - assert.Nil(t, err) - } - - streamClient := newStreamClient() - err = streamClient.Send(context.Background(), table) - assert.Nil(t, err) - affected, err := streamClient.CloseAndRecv(context.Background()) - assert.EqualValues(t, 2, affected.GetValue()) - assert.Nil(t, err) - - monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) - assert.Nil(t, err) - - assert.Equal(t, len(monitors), len(monitors_)) - - for i, monitor_ := range monitors_ { - assert.Equal(t, monitors[i], monitor_) - } -} - -func TestStreamCreate(t *testing.T) { - loc, err := time.LoadLocation(timezone) - assert.Nil(t, err) - ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() - time1 := time.UnixMilli(ts1).In(loc) - ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() - time2 := time.UnixMilli(ts2).In(loc) - - monitors := []monitor{ - { - ID: randomId(), - Host: "127.0.0.1", - Memory: 1, - Cpu: 1.0, - Temperature: -1, - Ts: time1, - Running: true, - }, - { - ID: randomId(), - Host: "127.0.0.2", - Memory: 2, - Cpu: 2.0, - Temperature: -2, - Ts: time2, - Running: true, - }, - } - - streamClient := newStreamClient() - err = streamClient.Create(context.Background(), monitors) - assert.Nil(t, err) - affected, err := streamClient.CloseAndRecv(context.Background()) - assert.EqualValues(t, 2, affected.GetValue()) - assert.Nil(t, err) - - monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) - assert.Nil(t, err) - - assert.Equal(t, len(monitors), len(monitors_)) - - for i, monitor_ := range monitors_ { - assert.Equal(t, monitors[i], monitor_) - } -} diff --git a/client/client_test.go b/client_test.go similarity index 84% rename from client/client_test.go rename to client_test.go index 5123272..72d5342 100644 --- a/client/client_test.go +++ b/client_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package client +package greptime import ( "context" @@ -30,7 +30,6 @@ import ( "gorm.io/driver/mysql" "gorm.io/gorm" - "github.com/GreptimeTeam/greptimedb-ingester-go/config" tbl "github.com/GreptimeTeam/greptimedb-ingester-go/table" "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" ) @@ -38,7 +37,6 @@ import ( //TODO(yuanbohan): // unmatched length of columns in rows and columns in schema // support pointer -// write pojo // timeout test var ( @@ -147,12 +145,12 @@ func (p *Mysql) AllDatatypes() ([]datatype, error) { } func newClient() *Client { - cfg := config.New(host). + cfg := NewConfig(host). WithPort(grpcPort). WithDatabase(database). WithKeepalive(30*time.Second, 5*time.Second) - client, err := New(cfg) + client, err := NewClient(cfg) if err != nil { log.Fatalf("failed to create client: %s", err.Error()) } @@ -516,3 +514,111 @@ func TestInsertAllDatatypes(t *testing.T) { // MySQL protocol only supports microsecond precision for TIMESTAMP assert.EqualValues(t, time_.UnixNano()/1000, result.TIMESTAMP_NANOSECOND_INT.UnixNano()/1000) } + +func TestStreamWrite(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + } + + table, err := tbl.New(monitorTableName) + assert.Nil(t, err) + + assert.Nil(t, table.AddTagColumn("id", types.INT64)) + assert.Nil(t, table.AddTagColumn("host", types.STRING)) + assert.Nil(t, table.AddFieldColumn("memory", types.UINT64)) + assert.Nil(t, table.AddFieldColumn("cpu", types.FLOAT64)) + assert.Nil(t, table.AddFieldColumn("temperature", types.INT64)) + assert.Nil(t, table.AddFieldColumn("running", types.BOOLEAN)) + assert.Nil(t, table.AddTimestampColumn("ts", types.TIMESTAMP_MILLISECOND)) + + for _, monitor := range monitors { + err := table.AddRow(monitor.ID, monitor.Host, + monitor.Memory, monitor.Cpu, monitor.Temperature, monitor.Running, + monitor.Ts) + assert.Nil(t, err) + } + + err = cli.StreamWrite(context.Background(), table) + assert.Nil(t, err) + affected, err := cli.CloseStream(context.Background()) + assert.EqualValues(t, 2, affected.GetValue()) + assert.Nil(t, err) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) + assert.Nil(t, err) + + assert.Equal(t, len(monitors), len(monitors_)) + + for i, monitor_ := range monitors_ { + assert.Equal(t, monitors[i], monitor_) + } +} + +func TestStreamCreate(t *testing.T) { + loc, err := time.LoadLocation(timezone) + assert.Nil(t, err) + ts1 := time.Now().Add(-1 * time.Minute).UnixMilli() + time1 := time.UnixMilli(ts1).In(loc) + ts2 := time.Now().Add(-2 * time.Minute).UnixMilli() + time2 := time.UnixMilli(ts2).In(loc) + + monitors := []monitor{ + { + ID: randomId(), + Host: "127.0.0.1", + Memory: 1, + Cpu: 1.0, + Temperature: -1, + Ts: time1, + Running: true, + }, + { + ID: randomId(), + Host: "127.0.0.2", + Memory: 2, + Cpu: 2.0, + Temperature: -2, + Ts: time2, + Running: true, + }, + } + + err = cli.StreamCreate(context.Background(), monitors) + assert.Nil(t, err) + affected, err := cli.CloseStream(context.Background()) + assert.EqualValues(t, 2, affected.GetValue()) + assert.Nil(t, err) + + monitors_, err := db.Query(fmt.Sprintf("select * from %s where id in %s order by host asc", monitorTableName, getMonitorsIds(monitors))) + assert.Nil(t, err) + + assert.Equal(t, len(monitors), len(monitors_)) + + for i, monitor_ := range monitors_ { + assert.Equal(t, monitors[i], monitor_) + } +} diff --git a/config/config.go b/config.go similarity index 89% rename from config/config.go rename to config.go index cc96d2b..8b2527e 100644 --- a/config/config.go +++ b/config.go @@ -12,13 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package config +package greptime import ( "fmt" "time" - - "github.com/GreptimeTeam/greptimedb-ingester-go/config/options" ) // Config is to define how the Client behaves. @@ -42,8 +40,8 @@ type Config struct { keepaliveTimeout time.Duration } -// New helps to init Config with host only -func New(host string) *Config { +// NewConfig helps to init Config with host only +func NewConfig(host string) *Config { return &Config{ Host: host, Port: 4001, @@ -79,12 +77,12 @@ func (c *Config) GetEndpoint() string { return fmt.Sprintf("%s:%d", c.Host, c.Port) } -func (c *Config) Options() *options.Options { +func (c *Config) Options() *Options { if c.keepaliveInterval == 0 && c.keepaliveTimeout == 0 { return nil } - keepalive := options.NewKeepaliveOptions() + keepalive := NewKeepaliveOptions() if c.keepaliveInterval != 0 { keepalive.WithInterval(c.keepaliveInterval) @@ -94,5 +92,5 @@ func (c *Config) Options() *options.Options { keepalive.WithTimeout(c.keepaliveTimeout) } - return options.New(keepalive) + return NewOptions(keepalive) } diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..fd3852c --- /dev/null +++ b/examples/README.md @@ -0,0 +1,17 @@ +# Insert data into GreptimeDB + +## Start local GreptimeDB instance via Docker + +```shell +docker run --rm -p 4000-4003:4000-4003 \ +--name greptime greptime/greptimedb standalone start \ +--http-addr 0.0.0.0:4000 \ +--rpc-addr 0.0.0.0:4001 \ +--mysql-addr 0.0.0.0:4002 \ +--postgres-addr 0.0.0.0:4003 +``` + +## Insert + +- [schema](schema/README.md) +- [tag](tag/README.md) diff --git a/examples/schema/README.md b/examples/schema/README.md index 0a33aa3..4d1cdf2 100644 --- a/examples/schema/README.md +++ b/examples/schema/README.md @@ -1,16 +1,5 @@ # Insert data into GreptimeDB -## start GreptimeDB via Docker - -```shell -docker run --rm -p 4000-4003:4000-4003 \ ---name greptime greptime/greptimedb standalone start \ ---http-addr 0.0.0.0:4000 \ ---rpc-addr 0.0.0.0:4001 \ ---mysql-addr 0.0.0.0:4002 \ ---postgres-addr 0.0.0.0:4003 -``` - ## Insert ```go @@ -20,7 +9,8 @@ go run main.go Output: ```log -2024/02/07 11:26:26 affected rows: 2 +2024/02/18 11:06:20 affected rows: 2 +2024/02/18 11:06:20 affected rows: 2 ``` ## Query @@ -49,8 +39,10 @@ mysql> select * from monitors_with_schema; +------+-------+-------------+----------------------------+ | id | host | temperature | timestamp | +------+-------+-------------+----------------------------+ -| 1 | hello | 1.1 | 2024-02-07 03:26:26.467898 | -| 2 | hello | 2.2 | 2024-02-07 03:26:26.467900 | +| 1 | hello | 1.1 | 2024-02-18 03:12:05.033351 | +| 1 | hello | 1.1 | 2024-02-18 03:12:05.158068 | +| 2 | hello | 2.2 | 2024-02-18 03:12:05.033352 | +| 2 | hello | 2.2 | 2024-02-18 03:12:05.158076 | +------+-------+-------------+----------------------------+ -2 rows in set (0.03 sec) +4 rows in set (0.01 sec) ``` diff --git a/examples/schema/main.go b/examples/schema/main.go index c293ef8..c77cec9 100644 --- a/examples/schema/main.go +++ b/examples/schema/main.go @@ -19,34 +19,26 @@ import ( "log" "time" - "github.com/GreptimeTeam/greptimedb-ingester-go/client" - "github.com/GreptimeTeam/greptimedb-ingester-go/config" + greptime "github.com/GreptimeTeam/greptimedb-ingester-go" "github.com/GreptimeTeam/greptimedb-ingester-go/table" "github.com/GreptimeTeam/greptimedb-ingester-go/table/types" ) var ( - cli *client.Client - stream *client.StreamClient + client *greptime.Client ) func init() { - cfg := config.New("127.0.0.1").WithDatabase("public") + cfg := greptime.NewConfig("127.0.0.1").WithDatabase("public") - cli_, err := client.New(cfg) + cli_, err := greptime.NewClient(cfg) if err != nil { log.Panic(err) } - cli = cli_ - - stream_, err := client.NewStreamClient(cfg) - if err != nil { - log.Panic(err) - } - stream = stream_ + client = cli_ } -func main() { +func data() *table.Table { tbl, err := table.New("monitors_with_schema") if err != nil { log.Println(err) @@ -73,18 +65,31 @@ func main() { log.Println(err) } - { // client write data into GreptimeDB - resp, err := cli.Write(context.Background(), tbl) - if err != nil { - log.Println(err) - } - log.Printf("affected rows: %d\n", resp.GetAffectedRows().GetValue()) + return tbl +} + +func write() { + resp, err := client.Write(context.Background(), data()) + if err != nil { + log.Println(err) } + log.Printf("affected rows: %d\n", resp.GetAffectedRows().GetValue()) +} - { // stream client send data into GreptimeDB - if err := stream.Send(context.Background(), tbl); err != nil { - log.Println(err) - } +func streamWrite() { + ctx := context.Background() + if err := client.StreamWrite(ctx, data()); err != nil { + log.Println(err) } + affected, err := client.CloseStream(ctx) + if err != nil { + log.Fatalln(err) + } + log.Printf("affected rows: %d\n", affected.GetValue()) +} +func main() { + write() + time.Sleep(time.Millisecond * 100) + streamWrite() } diff --git a/examples/tag/README.md b/examples/tag/README.md index 947f775..b3cf89f 100644 --- a/examples/tag/README.md +++ b/examples/tag/README.md @@ -1,16 +1,5 @@ # Insert data into GreptimeDB -## start GreptimeDB via Docker - -```shell -docker run --rm -p 4000-4003:4000-4003 \ ---name greptime greptime/greptimedb standalone start \ ---http-addr 0.0.0.0:4000 \ ---rpc-addr 0.0.0.0:4001 \ ---mysql-addr 0.0.0.0:4002 \ ---postgres-addr 0.0.0.0:4003 -``` - ## Insert ```go @@ -26,7 +15,8 @@ Output: ## Query ```shell -mysql -h 127.0.0.1 -P 4002 public +2024/02/18 11:14:54 affected rows: 2 +2024/02/18 11:14:54 affected rows: 2 ``` ```shell @@ -49,8 +39,10 @@ mysql> select * from monitors_with_tag; +------+-----------+--------+------+-------------+---------+----------------------------+ | id | host | memory | cpu | temperature | running | ts | +------+-----------+--------+------+-------------+---------+----------------------------+ -| 1 | 127.0.0.1 | 1 | 1 | -1 | 1 | 2024-02-07 03:29:31.625000 | -| 2 | 127.0.0.2 | 2 | 2 | -2 | 1 | 2024-02-07 03:29:31.625000 | +| 1 | 127.0.0.1 | 1 | 1 | -1 | 1 | 2024-02-18 03:14:54.116000 | +| 1 | 127.0.0.1 | 1 | 1 | -1 | 1 | 2024-02-18 03:14:54.242000 | +| 2 | 127.0.0.2 | 2 | 2 | -2 | 1 | 2024-02-18 03:14:54.116000 | +| 2 | 127.0.0.2 | 2 | 2 | -2 | 1 | 2024-02-18 03:14:54.242000 | +------+-----------+--------+------+-------------+---------+----------------------------+ -2 rows in set (0.01 sec) +4 rows in set (0.01 sec) ``` diff --git a/examples/tag/main.go b/examples/tag/main.go index 5cd5841..0b628b4 100644 --- a/examples/tag/main.go +++ b/examples/tag/main.go @@ -19,29 +19,21 @@ import ( "log" "time" - "github.com/GreptimeTeam/greptimedb-ingester-go/client" - "github.com/GreptimeTeam/greptimedb-ingester-go/config" + greptime "github.com/GreptimeTeam/greptimedb-ingester-go" ) var ( - cli *client.Client - stream *client.StreamClient + client *greptime.Client ) func init() { - cfg := config.New("127.0.0.1").WithDatabase("public") + cfg := greptime.NewConfig("127.0.0.1").WithDatabase("public") - cli_, err := client.New(cfg) + cli_, err := greptime.NewClient(cfg) if err != nil { log.Panic(err) } - cli = cli_ - - stream_, err := client.NewStreamClient(cfg) - if err != nil { - log.Panic(err) - } - stream = stream_ + client = cli_ } type Monitor struct { @@ -58,8 +50,8 @@ func (Monitor) TableName() string { return "monitors_with_tag" } -func main() { - monitors := []Monitor{ +func data() []Monitor { + return []Monitor{ { ID: 1, Host: "127.0.0.1", @@ -79,18 +71,30 @@ func main() { Running: true, }, } +} - { // client write data into GreptimeDB - resp, err := cli.Create(context.Background(), monitors) - if err != nil { - log.Fatal(err) - } - log.Printf("affected rows: %d\n", resp.GetAffectedRows().GetValue()) +func create() { + resp, err := client.Create(context.Background(), data()) + if err != nil { + log.Fatal(err) } + log.Printf("affected rows: %d\n", resp.GetAffectedRows().GetValue()) +} - { // stream client send data into GreptimeDB - if err := stream.Create(context.Background(), monitors); err != nil { - log.Fatal(err) - } +func streamCreate() { + ctx := context.Background() + if err := client.StreamCreate(ctx, data()); err != nil { + log.Println(err) + } + affected, err := client.CloseStream(ctx) + if err != nil { + log.Fatalln(err) } + log.Printf("affected rows: %d\n", affected.GetValue()) +} + +func main() { + create() + time.Sleep(time.Millisecond * 100) + streamCreate() } diff --git a/config/options/options.go b/options.go similarity index 91% rename from config/options/options.go rename to options.go index fbeccfb..2a19f9c 100644 --- a/config/options/options.go +++ b/options.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package options +package greptime import ( "time" @@ -20,12 +20,10 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" - - greptime "github.com/GreptimeTeam/greptimedb-ingester-go" ) var ( - uaOpt = grpc.WithUserAgent("greptimedb-ingester-go/" + greptime.Version) + uaOpt = grpc.WithUserAgent("greptimedb-ingester-go/" + Version) // TODO(yuanbohan): SecurityOptions insecureOpt = grpc.WithTransportCredentials(insecure.NewCredentials()) @@ -38,7 +36,7 @@ type Options struct { keepalive *KeepaliveOption } -func New(keepalive *KeepaliveOption) *Options { +func NewOptions(keepalive *KeepaliveOption) *Options { return &Options{ keepalive: keepalive, } diff --git a/version.go b/version.go index 09c1798..a5b5e89 100644 --- a/version.go +++ b/version.go @@ -14,4 +14,4 @@ package greptime -const Version = "v0.1.1" // THIS MUST BE THE SAME AS THE VERSION in GitHub release +const Version = "v0.2.0" // THIS MUST BE THE SAME AS THE VERSION in GitHub release