Skip to content

Commit

Permalink
[modify] refine producer and consumer examples
Browse files Browse the repository at this point in the history
[fix] set body to nil when send http GET or DELETE
  • Loading branch information
shabicheng committed Jul 14, 2021
1 parent d1c9ae9 commit 7b3f372
Show file tree
Hide file tree
Showing 10 changed files with 26 additions and 68 deletions.
2 changes: 1 addition & 1 deletion consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ consumerWorker.Start()

```
ch:=make(chan os.Signal) //将os信号值作为信道
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
consumerWorker.Start()
if _, ok := <-ch; ok { // 当获取到os停止信号以后,例如ctrl+c触发的os信号,会调用消费者退出方法进行退出。
consumerWorker.StopAndWait()
Expand Down
9 changes: 5 additions & 4 deletions example/consumer/copy_data/copy_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package main

import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/consumer"
"github.com/go-kit/kit/log/level"
"os"
"os/signal"

sls "github.com/aliyun/aliyun-log-go-sdk"
consumerLibrary "github.com/aliyun/aliyun-log-go-sdk/consumer"
"github.com/go-kit/kit/log/level"
)

// README :
Expand Down Expand Up @@ -47,7 +48,7 @@ func main() {
}
consumerWorker := consumerLibrary.InitConsumerWorker(option, process)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
consumerWorker.Start()
if _, ok := <-ch; ok {
level.Info(consumerWorker.Logger).Log("msg", "get stop signal, start to stop consumer worker", "consumer worker name", option.ConsumerName)
Expand Down
2 changes: 1 addition & 1 deletion example/producer/native_pb_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func main() {
producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
producerInstance := producer.InitProducer(producerConfig)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
var m sync.WaitGroup
for i := 0; i < 10; i++ {
Expand Down
9 changes: 5 additions & 4 deletions example/producer/performance_test_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package main

import (
"fmt"
"github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"github.com/gogo/protobuf/proto"
"math/rand"
"os"
"os/signal"
"runtime"
"time"

sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/aliyun/aliyun-log-go-sdk/producer"
"github.com/gogo/protobuf/proto"
)

var valueList [][]*string
Expand All @@ -28,7 +29,7 @@ func main() {

producerInstance := producer.InitProducer(producerConfig)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
fmt.Println("start send logs")
for i := 0; i < 10; i++ {
Expand Down
2 changes: 1 addition & 1 deletion example/producer/producer_simple_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func main() {
producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
producerInstance := producer.InitProducer(producerConfig)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
var m sync.WaitGroup
for i := 0; i < 10; i++ {
Expand Down
2 changes: 1 addition & 1 deletion example/producer/simple_callback_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
producerInstance := producer.InitProducer(producerConfig)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
var m sync.WaitGroup
callBack := &Callback{}
Expand Down
2 changes: 1 addition & 1 deletion example/producer/use_ststoken_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func main() {
producerConfig.UpdateStsToken = updateStsToken
producerInstance := producer.InitProducer(producerConfig)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
var m sync.WaitGroup
for i := 0; i < 10; i++ {
Expand Down
54 changes: 8 additions & 46 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,27 +754,15 @@ func (s *LogStore) UpdateIndexString(indexStr string) error {

// DeleteIndex ...
func (s *LogStore) DeleteIndex() error {
type Body struct {
project string `json:"projectName"`
store string `json:"logstoreName"`
}

body, err := json.Marshal(Body{
project: s.project.Name,
store: s.Name,
})
if err != nil {
return err
}

h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
}

uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "DELETE", uri, h, body)
r, err := request(s.project, "DELETE", uri, h, nil)
if r != nil {
r.Body.Close()
}
Expand All @@ -783,27 +771,14 @@ func (s *LogStore) DeleteIndex() error {

// GetIndex ...
func (s *LogStore) GetIndex() (*Index, error) {
type Body struct {
project string `json:"projectName"`
store string `json:"logstoreName"`
}

body, err := json.Marshal(Body{
project: s.project.Name,
store: s.Name,
})
if err != nil {
return nil, err
}

h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
"x-log-bodyrawsize": "0",
"Accept-Encoding": "deflate",
}

uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "GET", uri, h, body)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
Expand All @@ -820,27 +795,14 @@ func (s *LogStore) GetIndex() (*Index, error) {

// GetIndexString ...
func (s *LogStore) GetIndexString() (string, error) {
type Body struct {
project string `json:"projectName"`
store string `json:"logstoreName"`
}

body, err := json.Marshal(Body{
project: s.project.Name,
store: s.Name,
})
if err != nil {
return "", err
}

h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
"Accept-Encoding": "deflate", // TODO: support lz4
"x-log-bodyrawsize": "0",
"Accept-Encoding": "deflate",
}

uri := fmt.Sprintf("/logstores/%s/index", s.Name)
r, err := request(s.project, "GET", uri, h, body)
r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
return "", err
}
Expand Down
2 changes: 1 addition & 1 deletion producer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ producerConfig.AccessKeyID = os.Getenv("AccessKeyID")
producerConfig.AccessKeySecret = os.Getenv("AccessKeySecret")
producerInstance:=producer.InitProducer(producerConfig)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start() // 启动producer实例
```

Expand Down
10 changes: 2 additions & 8 deletions producer/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"time"
)



type Callback struct {
t *testing.T
}
Expand All @@ -34,7 +32,6 @@ func (callback *Callback) Fail(result *Result) {
callback.t.Error("Failed to get error code")
}


}

func TestProducer_CallBack(t *testing.T) {
Expand All @@ -44,7 +41,7 @@ func TestProducer_CallBack(t *testing.T) {
producerConfig.AccessKeySecret = ""
producerInstance := InitProducer(producerConfig)
ch := make(chan os.Signal)
signal.Notify(ch)
signal.Notify(ch, os.Kill, os.Interrupt)
producerInstance.Start()
var m sync.WaitGroup
callBack := &Callback{}
Expand All @@ -68,7 +65,4 @@ func TestProducer_CallBack(t *testing.T) {
producerInstance.Close(60000)
}




}
}

0 comments on commit 7b3f372

Please sign in to comment.