diff --git a/README.md b/README.md index 33030911..b5ba62ab 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ go get -u github.com/aliyun/aliyun-log-go-sdk 5. **写数据** - 这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失。 + 这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失, 对于MetricStore类型, 使用`PutLogsWithMetricStoreURL`API 发送数据可以提升大基数时间线下查询性能 。 ```go logs := []*sls.Log{} diff --git a/client_interface.go b/client_interface.go index d74215f5..40321773 100644 --- a/client_interface.go +++ b/client_interface.go @@ -245,6 +245,7 @@ type ClientInterface interface { MergeShards(project, logstore string, shardID int) (shards []*Shard, err error) // #################### Log Operations ##################### + PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error) // PutLogs put logs into logstore. // The callers should transform user logs into LogGroup. PutLogs(project, logstore string, lg *LogGroup) (err error) diff --git a/client_store.go b/client_store.go index 09bdbc11..cd0c9efe 100644 --- a/client_store.go +++ b/client_store.go @@ -103,6 +103,12 @@ func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKe return ls.PostLogStoreLogs(lg, hashKey) } +func (c *Client) PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error) { + ls := convertLogstore(c, project, logstore) + ls.useMetricStoreURL = true + return ls.PutLogs(lg) +} + // PostRawLogWithCompressType put raw log data to log service, no marshal func (c *Client) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) { ls := convertLogstore(c, project, logstore) diff --git a/log_store.go b/log_store.go index 0ceb3016..6b6f251a 100644 --- a/log_store.go +++ b/log_store.go @@ -40,6 +40,7 @@ type LogStore struct { putLogCompressType int EncryptConf *EncryptConf `json:"encrypt_conf,omitempty"` ProductType string `json:"productType,omitempty"` + useMetricStoreURL bool } // Shard defines shard struct @@ -303,8 +304,12 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) { } outLen = len(out) } - - uri := fmt.Sprintf("/logstores/%v", s.Name) + var uri string + if s.useMetricStoreURL { + uri = fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name) + } else { + uri = fmt.Sprintf("/logstores/%v", s.Name) + } r, err := request(s.project, "POST", uri, h, out[:outLen]) if err != nil { return NewClientError(err) @@ -329,7 +334,7 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) { return nil } - if hashKey == nil || *hashKey == "" { + if hashKey == nil || *hashKey == "" || s.useMetricStoreURL { // empty hash call PutLogs return s.PutLogs(lg) } diff --git a/producer/README.md b/producer/README.md index ee5fde3a..65504aa8 100644 --- a/producer/README.md +++ b/producer/README.md @@ -110,35 +110,36 @@ func(callback *Callback)Fail(result *producer.Result){ ## **producer配置详解** -| 参数 | 类型 | 描述 | -| ------------------- | ------ | ------------------------------------------------------------ | -| TotalSizeLnBytes | Int64 | 单个 producer 实例能缓存的日志大小上限,默认为 100MB。 | -| MaxIoWorkerCount | Int64 | 单个producer能并发的最多groutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能去配置。 | -| MaxBlockSec | Int | 如果 producer 可用空间不足,调用者在 send 方法上的最大阻塞时间,默认为 60 秒。
如果超过这个时间后所需空间仍无法得到满足,send 方法会抛出TimeoutException。如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。如果您希望 send 方法一直阻塞直到所需空间得到满足,可将该值设为负数。 | -| MaxBatchSize | Int64 | 当一个 ProducerBatch 中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大可设置成 5MB。 | -| MaxBatchCount | Int | 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大可设置成 40960。 | -| LingerMs | Int64 | 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 毫秒。 | -| Retries | Int | 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。
如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。 | -| MaxReservedAttempts | Int | 每个 ProducerBatch 每次被尝试发送都对应着一个 Attemp,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。
该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 | -| BaseRetryBackoffMs | Int64 | 首次重试的退避时间,默认为 100 毫秒。 Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 | -| MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 | -| AdjustShargHash | Bool | 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。 | -| Buckets | Int | 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。
如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。 | -| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 | -| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 | -| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 | -| LogMaxSize | Int | 单个日志存储数量,默认为10M。 | -| LogMaxBackups | Int | 日志轮转数量,默认为10。 | -| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 | -| Endpoint | String | 服务入口,关于如何确定project对应的服务入口可参考文章[服务入口](https://help.aliyun.com/document_detail/29008.html?spm=a2c4e.11153940.blogcont682761.14.446e7720gs96LB)。 | -| AccessKeyID | String | 账户的AK id。 | -| AccessKeySecret | String | 账户的AK 密钥。 | -|CredentialsProvider|Interface|可选,可自定义CredentialsProvider,来提供动态的 AccessKeyId/AccessKeySecret/StsToken,该接口应当缓存 AK,且必须线程安全| -| NoRetryStatusCodeList | []int | 用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400,404两个值。 | -| UpdateStsToken | Func | 函数类型,该函数内去实现自己的获取ststoken 的逻辑,producer 会自动刷新ststoken并放入client 当中。 -| StsTokenShutDown | channel| 关闭ststoken 自动刷新的通讯信道,当该信道关闭时,不再自动刷新ststoken值。当producer关闭的时候,该参数不为nil值,则会主动调用close去关闭该信道停止ststoken的自动刷新。 | -| Region | String | 日志服务的区域,当签名版本使用 AuthV4 时必选。 例如cn-hangzhou。 | -| AuthVersion | String | 使用的签名版本,可选枚举值为 AuthV1, AuthV4。AuthV4 签名示例可参考程序 [producer_test.go](producer_test.go)。| +| 参数 | 类型 | 描述 | +| ------------------- |-----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| TotalSizeLnBytes | Int64 | 单个 producer 实例能缓存的日志大小上限,默认为 100MB。 | +| MaxIoWorkerCount | Int64 | 单个producer能并发的最多groutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能去配置。 | +| MaxBlockSec | Int | 如果 producer 可用空间不足,调用者在 send 方法上的最大阻塞时间,默认为 60 秒。
如果超过这个时间后所需空间仍无法得到满足,send 方法会抛出TimeoutException。如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。如果您希望 send 方法一直阻塞直到所需空间得到满足,可将该值设为负数。 | +| MaxBatchSize | Int64 | 当一个 ProducerBatch 中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大可设置成 5MB。 | +| MaxBatchCount | Int | 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大可设置成 40960。 | +| LingerMs | Int64 | 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 毫秒。 | +| Retries | Int | 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。
如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。 | +| MaxReservedAttempts | Int | 每个 ProducerBatch 每次被尝试发送都对应着一个 Attemp,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。
该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 | +| BaseRetryBackoffMs | Int64 | 首次重试的退避时间,默认为 100 毫秒。 Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 | +| MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 | +| AdjustShargHash | Bool | 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。 | +| Buckets | Int | 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。
如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。 | +| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 | +| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 | +| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 | +| LogMaxSize | Int | 单个日志存储数量,默认为10M。 | +| LogMaxBackups | Int | 日志轮转数量,默认为10。 | +| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 | +| Endpoint | String | 服务入口,关于如何确定project对应的服务入口可参考文章[服务入口](https://help.aliyun.com/document_detail/29008.html?spm=a2c4e.11153940.blogcont682761.14.446e7720gs96LB)。 | +| AccessKeyID | String | 账户的AK id。 | +| AccessKeySecret | String | 账户的AK 密钥。 | +|CredentialsProvider| Interface | 可选,可自定义CredentialsProvider,来提供动态的 AccessKeyId/AccessKeySecret/StsToken,该接口应当缓存 AK,且必须线程安全 | +| NoRetryStatusCodeList | []int | 用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400,404两个值。 | +| UpdateStsToken | Func | 函数类型,该函数内去实现自己的获取ststoken 的逻辑,producer 会自动刷新ststoken并放入client 当中。 +| StsTokenShutDown | channel | 关闭ststoken 自动刷新的通讯信道,当该信道关闭时,不再自动刷新ststoken值。当producer关闭的时候,该参数不为nil值,则会主动调用close去关闭该信道停止ststoken的自动刷新。 | +| Region | String | 日志服务的区域,当签名版本使用 AuthV4 时必选。 例如cn-hangzhou。 | +| AuthVersion | String | 使用的签名版本,可选枚举值为 AuthV1, AuthV4。AuthV4 签名示例可参考程序 [producer_test.go](producer_test.go)。 | +| UseMetricStoreURL | bool | 使用 Metricstore地址进行发送日志,可以提升大基数时间线下的查询性能。 | ## 关于性能 diff --git a/producer/io_worker.go b/producer/io_worker.go index 524c26a9..a0329ae6 100644 --- a/producer/io_worker.go +++ b/producer/io_worker.go @@ -45,7 +45,9 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) { level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server") beginMs := GetTimeMs(time.Now().UnixNano()) var err error - if producerBatch.shardHash != nil { + if producerBatch.isUseMetricStoreUrl() { + err = ioWorker.client.PutLogsWithMetricStoreURL(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup) + } else if producerBatch.shardHash != nil { err = ioWorker.client.PostLogStoreLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup, producerBatch.getShardHash()) } else { err = ioWorker.client.PutLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup) diff --git a/producer/producer_batch.go b/producer/producer_batch.go index c58df83c..c9f94338 100644 --- a/producer/producer_batch.go +++ b/producer/producer_batch.go @@ -28,6 +28,7 @@ type ProducerBatch struct { shardHash *string result *Result maxReservedAttempts int + useMetricStoreUrl bool } func generatePackId(source string) string { @@ -76,6 +77,7 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs logstore: logstore, result: initResult(), maxReservedAttempts: config.MaxReservedAttempts, + useMetricStoreUrl: config.UseMetricStoreURL, } if shardHash == "" { producerBatch.shardHash = nil @@ -114,6 +116,12 @@ func (producerBatch *ProducerBatch) getLogGroupCount() int { return len(producerBatch.logGroup.GetLogs()) } +func (producerBatch *ProducerBatch) isUseMetricStoreUrl() bool { + defer producerBatch.lock.RUnlock() + producerBatch.lock.RLock() + return producerBatch.useMetricStoreUrl +} + func (producerBatch *ProducerBatch) addLogToLogGroup(log interface{}) { defer producerBatch.lock.Unlock() producerBatch.lock.Lock() diff --git a/producer/producer_config.go b/producer/producer_config.go index ead72459..371c57bf 100644 --- a/producer/producer_config.go +++ b/producer/producer_config.go @@ -38,6 +38,7 @@ type ProducerConfig struct { LogTags []*sls.LogTag GeneratePackId bool CredentialsProvider sls.CredentialsProvider + UseMetricStoreURL bool packLock sync.Mutex packPrefix string diff --git a/token_auto_update_client.go b/token_auto_update_client.go index 3a4a7467..ed065ca4 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -741,6 +741,16 @@ func (c *TokenAutoUpdateClient) PutLogs(project, logstore string, lg *LogGroup) return } +func (c *TokenAutoUpdateClient) PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.PutLogsWithMetricStoreURL(project, logstore, lg) + if !c.processError(err) { + return + } + } + return +} + func (c *TokenAutoUpdateClient) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKey *string) (err error) { for i := 0; i < c.maxTryTimes; i++ { err = c.logClient.PostLogStoreLogs(project, logstore, lg, hashKey)