Skip to content

Commit

Permalink
add metricstore send support (#261)
Browse files Browse the repository at this point in the history
* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support

* add metricstore send support
  • Loading branch information
EvanLjp authored Mar 27, 2024
1 parent 0ed38c6 commit 7b9b95d
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 34 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ go get -u github.com/aliyun/aliyun-log-go-sdk

5. **写数据**

这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失。
这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失, 对于MetricStore类型, 使用`PutLogsWithMetricStoreURL`API 发送数据可以提升大基数时间线下查询性能

```go
logs := []*sls.Log{}
Expand Down
1 change: 1 addition & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type ClientInterface interface {
MergeShards(project, logstore string, shardID int) (shards []*Shard, err error)

// #################### Log Operations #####################
PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error)
// PutLogs put logs into logstore.
// The callers should transform user logs into LogGroup.
PutLogs(project, logstore string, lg *LogGroup) (err error)
Expand Down
6 changes: 6 additions & 0 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKe
return ls.PostLogStoreLogs(lg, hashKey)
}

func (c *Client) PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error) {
ls := convertLogstore(c, project, logstore)
ls.useMetricStoreURL = true
return ls.PutLogs(lg)
}

// PostRawLogWithCompressType put raw log data to log service, no marshal
func (c *Client) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) {
ls := convertLogstore(c, project, logstore)
Expand Down
11 changes: 8 additions & 3 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type LogStore struct {
putLogCompressType int
EncryptConf *EncryptConf `json:"encrypt_conf,omitempty"`
ProductType string `json:"productType,omitempty"`
useMetricStoreURL bool
}

// Shard defines shard struct
Expand Down Expand Up @@ -303,8 +304,12 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
}
outLen = len(out)
}

uri := fmt.Sprintf("/logstores/%v", s.Name)
var uri string
if s.useMetricStoreURL {
uri = fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name)
} else {
uri = fmt.Sprintf("/logstores/%v", s.Name)
}
r, err := request(s.project, "POST", uri, h, out[:outLen])
if err != nil {
return NewClientError(err)
Expand All @@ -329,7 +334,7 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) {
return nil
}

if hashKey == nil || *hashKey == "" {
if hashKey == nil || *hashKey == "" || s.useMetricStoreURL {
// empty hash call PutLogs
return s.PutLogs(lg)
}
Expand Down
59 changes: 30 additions & 29 deletions producer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,35 +110,36 @@ func(callback *Callback)Fail(result *producer.Result){

## **producer配置详解**

| 参数 | 类型 | 描述 |
| ------------------- | ------ | ------------------------------------------------------------ |
| TotalSizeLnBytes | Int64 | 单个 producer 实例能缓存的日志大小上限,默认为 100MB。 |
| MaxIoWorkerCount | Int64 | 单个producer能并发的最多groutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能去配置。 |
| MaxBlockSec | Int | 如果 producer 可用空间不足,调用者在 send 方法上的最大阻塞时间,默认为 60 秒。<br/>如果超过这个时间后所需空间仍无法得到满足,send 方法会抛出TimeoutException。如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。如果您希望 send 方法一直阻塞直到所需空间得到满足,可将该值设为负数。 |
| MaxBatchSize | Int64 | 当一个 ProducerBatch 中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大可设置成 5MB。 |
| MaxBatchCount | Int | 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大可设置成 40960。 |
| LingerMs | Int64 | 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 毫秒。 |
| Retries | Int | 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。<br/>如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。 |
| MaxReservedAttempts | Int | 每个 ProducerBatch 每次被尝试发送都对应着一个 Attemp,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。<br/>该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 |
| BaseRetryBackoffMs | Int64 | 首次重试的退避时间,默认为 100 毫秒。 Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 |
| MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 |
| AdjustShargHash | Bool | 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。 |
| Buckets | Int | 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。<br/>如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。 |
| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 |
| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 |
| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 |
| LogMaxSize | Int | 单个日志存储数量,默认为10M。 |
| LogMaxBackups | Int | 日志轮转数量,默认为10。 |
| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 |
| Endpoint | String | 服务入口,关于如何确定project对应的服务入口可参考文章[服务入口](https://help.aliyun.com/document_detail/29008.html?spm=a2c4e.11153940.blogcont682761.14.446e7720gs96LB)|
| AccessKeyID | String | 账户的AK id。 |
| AccessKeySecret | String | 账户的AK 密钥。 |
|CredentialsProvider|Interface|可选,可自定义CredentialsProvider,来提供动态的 AccessKeyId/AccessKeySecret/StsToken,该接口应当缓存 AK,且必须线程安全|
| NoRetryStatusCodeList | []int | 用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400,404两个值。 |
| UpdateStsToken | Func | 函数类型,该函数内去实现自己的获取ststoken 的逻辑,producer 会自动刷新ststoken并放入client 当中。
| StsTokenShutDown | channel| 关闭ststoken 自动刷新的通讯信道,当该信道关闭时,不再自动刷新ststoken值。当producer关闭的时候,该参数不为nil值,则会主动调用close去关闭该信道停止ststoken的自动刷新。 |
| Region | String | 日志服务的区域,当签名版本使用 AuthV4 时必选。 例如cn-hangzhou。 |
| AuthVersion | String | 使用的签名版本,可选枚举值为 AuthV1, AuthV4。AuthV4 签名示例可参考程序 [producer_test.go](producer_test.go)|
| 参数 | 类型 | 描述 |
| ------------------- |-----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| TotalSizeLnBytes | Int64 | 单个 producer 实例能缓存的日志大小上限,默认为 100MB。 |
| MaxIoWorkerCount | Int64 | 单个producer能并发的最多groutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能去配置。 |
| MaxBlockSec | Int | 如果 producer 可用空间不足,调用者在 send 方法上的最大阻塞时间,默认为 60 秒。<br/>如果超过这个时间后所需空间仍无法得到满足,send 方法会抛出TimeoutException。如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。如果您希望 send 方法一直阻塞直到所需空间得到满足,可将该值设为负数。 |
| MaxBatchSize | Int64 | 当一个 ProducerBatch 中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大可设置成 5MB。 |
| MaxBatchCount | Int | 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大可设置成 40960。 |
| LingerMs | Int64 | 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 毫秒。 |
| Retries | Int | 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。<br/>如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。 |
| MaxReservedAttempts | Int | 每个 ProducerBatch 每次被尝试发送都对应着一个 Attemp,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。<br/>该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 |
| BaseRetryBackoffMs | Int64 | 首次重试的退避时间,默认为 100 毫秒。 Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 |
| MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 |
| AdjustShargHash | Bool | 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。 |
| Buckets | Int | 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。<br/>如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。 |
| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 |
| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 |
| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 |
| LogMaxSize | Int | 单个日志存储数量,默认为10M。 |
| LogMaxBackups | Int | 日志轮转数量,默认为10。 |
| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 |
| Endpoint | String | 服务入口,关于如何确定project对应的服务入口可参考文章[服务入口](https://help.aliyun.com/document_detail/29008.html?spm=a2c4e.11153940.blogcont682761.14.446e7720gs96LB)|
| AccessKeyID | String | 账户的AK id。 |
| AccessKeySecret | String | 账户的AK 密钥。 |
|CredentialsProvider| Interface | 可选,可自定义CredentialsProvider,来提供动态的 AccessKeyId/AccessKeySecret/StsToken,该接口应当缓存 AK,且必须线程安全 |
| NoRetryStatusCodeList | []int | 用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400,404两个值。 |
| UpdateStsToken | Func | 函数类型,该函数内去实现自己的获取ststoken 的逻辑,producer 会自动刷新ststoken并放入client 当中。
| StsTokenShutDown | channel | 关闭ststoken 自动刷新的通讯信道,当该信道关闭时,不再自动刷新ststoken值。当producer关闭的时候,该参数不为nil值,则会主动调用close去关闭该信道停止ststoken的自动刷新。 |
| Region | String | 日志服务的区域,当签名版本使用 AuthV4 时必选。 例如cn-hangzhou。 |
| AuthVersion | String | 使用的签名版本,可选枚举值为 AuthV1, AuthV4。AuthV4 签名示例可参考程序 [producer_test.go](producer_test.go)|
| UseMetricStoreURL | bool | 使用 Metricstore地址进行发送日志,可以提升大基数时间线下的查询性能。 |

## 关于性能

Expand Down
4 changes: 3 additions & 1 deletion producer/io_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
beginMs := GetTimeMs(time.Now().UnixNano())
var err error
if producerBatch.shardHash != nil {
if producerBatch.isUseMetricStoreUrl() {
err = ioWorker.client.PutLogsWithMetricStoreURL(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup)
} else if producerBatch.shardHash != nil {
err = ioWorker.client.PostLogStoreLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup, producerBatch.getShardHash())
} else {
err = ioWorker.client.PutLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup)
Expand Down
8 changes: 8 additions & 0 deletions producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ProducerBatch struct {
shardHash *string
result *Result
maxReservedAttempts int
useMetricStoreUrl bool
}

func generatePackId(source string) string {
Expand Down Expand Up @@ -76,6 +77,7 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs
logstore: logstore,
result: initResult(),
maxReservedAttempts: config.MaxReservedAttempts,
useMetricStoreUrl: config.UseMetricStoreURL,
}
if shardHash == "" {
producerBatch.shardHash = nil
Expand Down Expand Up @@ -114,6 +116,12 @@ func (producerBatch *ProducerBatch) getLogGroupCount() int {
return len(producerBatch.logGroup.GetLogs())
}

func (producerBatch *ProducerBatch) isUseMetricStoreUrl() bool {
defer producerBatch.lock.RUnlock()
producerBatch.lock.RLock()
return producerBatch.useMetricStoreUrl
}

func (producerBatch *ProducerBatch) addLogToLogGroup(log interface{}) {
defer producerBatch.lock.Unlock()
producerBatch.lock.Lock()
Expand Down
1 change: 1 addition & 0 deletions producer/producer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ProducerConfig struct {
LogTags []*sls.LogTag
GeneratePackId bool
CredentialsProvider sls.CredentialsProvider
UseMetricStoreURL bool

packLock sync.Mutex
packPrefix string
Expand Down
Loading

0 comments on commit 7b9b95d

Please sign in to comment.