Skip to content

Latest commit

 

History

History
312 lines (272 loc) · 10.9 KB

README.md

File metadata and controls

312 lines (272 loc) · 10.9 KB

InfluxDB Client Go

CircleCI codecov License Slack Status

This repository contains the reference Go client for InfluxDB 2.

Features

Documentation

Go API docs is available at: https://pkg.go.dev/github.com/influxdata/influxdb-client-go

How To Use

Installation

Go 1.3 or later is required.

Add import github.com/influxdata/influxdb-client-go to your source code and sync dependencies or directly edit the go.mod file.

Basic Example

The following example demonstrates how to write data to InfluxDB 2 and read them back using the Flux language:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/influxdata/influxdb-client-go"
)

func main() {
    // create new client with default option for server url authenticate by token
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // user blocking write client for writes to desired bucket
    writeApi := client.WriteApiBlocking("my-org", "my-bucket")
    // create point using full params constructor 
    p := influxdb2.NewPoint("stat",
        map[string]string{"unit": "temperature"},
        map[string]interface{}{"avg": 24.5, "max": 45},
        time.Now())
    // write point immediately 
    writeApi.WritePoint(context.Background(), p)
    // create point using fluent style
    p = influxdb2.NewPointWithMeasurement("stat").
        AddTag("unit", "temperature").
        AddField("avg", 23.2).
        AddField("max", 45).
        SetTime(time.Now())
    writeApi.WritePoint(context.Background(), p)
    
    // Or write directly line protocol
    line := fmt.Sprintf("stat,unit=temperature avg=%f,max=%f", 23.5, 45.0)
    writeApi.WriteRecord(context.Background(), line)

    // get query client
    queryApi := client.QueryApi("my-org")
    // get parser flux query result
    result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Use Next() to iterate over query result lines
        for result.Next() {
            // Observe when there is new grouping key producing new table
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // read result
            fmt.Printf("row: %s\n", result.Record().String())
        }
        if result.Err() != nil {
            fmt.Printf("Query error: %s\n", result.Err().Error())
        }
    }
    // Ensures background processes finishes
    client.Close()
}

Options

The InfluxDBClient uses set of options to configure behavior. These are available in the Options object Creating a client instance using

client := influxdb2.NewClient("http://localhost:9999", "my-token")

will use the default options.

To set different configuration values, e.g. to set gzip compression and trust all server certificates, get default options and change what is needed:

client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token", 
    influxdb2.DefaultOptions().
        SetUseGZip(true).
        SetTlsConfig(&tls.Config{
            InsecureSkipVerify: true,
        }))

Writes

Client offers two ways of writing, non-blocking and blocking.

Non-blocking write client

Non-blocking write client uses implicit batching. Data are asynchronously written to the underlying buffer and they are automatically sent to a server when the size of the write buffer reaches the batch size, default 1000, or the flush interval, default 1s, times out. Writes are automatically retried on server back pressure.

This write client also offers synchronous blocking method to ensure that write buffer is flushed and all pending writes are finished, see Flush() method. Always use Close() method of the client to stop all background processes.

Asynchronous write client is recommended for frequent periodic writes.

package main

import (
    "fmt"
    "github.com/influxdata/influxdb-client-go"
    "math/rand"
    "time"
)

func main() {
    // Create client and set batch size to 20 
    client := influxdb2.NewClientWithOptions("http://localhost:9999", "my-token",
        influxdb2.DefaultOptions().SetBatchSize(20))
    // Get non-blocking write client
    writeApi := client.WriteApi("my-org","my-bucket")
    // write some points
    for i := 0; i <100; i++ {
        // create point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write asynchronously
        writeApi.WritePoint(p)
    }
    // Force all unwritten data to be sent
    writeApi.Flush()
    // Ensures background processes finishes
    client.Close()
}

Blocking write client

Blocking write client writes given point(s) synchronously. It doesn't have implicit batching. Batch is created from given set of points.

package main

import (
    "context"
    "fmt"
    "github.com/influxdata/influxdb-client-go"
    "math/rand"
    "time"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get blocking write client
    writeApi := client.WriteApiBlocking("my-org","my-bucket")
    // write some points
    for i := 0; i <100; i++ {
        // create data point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write synchronously
        err := writeApi.WritePoint(context.Background(), p)
        if err != nil {
            panic(err)
        }
    }
    // Ensures background processes finishes
    client.Close()
}

Queries

Query client offers two ways of retrieving query results, parsed representation in QueryTableResult and a raw result string.

QueryTableResult

QueryTableResult offers comfortable way how to deal with flux query CSV response. It parses CSV stream into FluxTableMetaData, FluxColumn and FluxRecord objects for easy reading the result.

package main

import (
    "context"
    "fmt"
    "github.com/influxdata/influxdb-client-go"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get query client
    queryApi := client.QueryApi("my-org")
    // get QueryTableResult
    result, err := queryApi.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`)
    if err == nil {
        // Iterate over query response
        for result.Next() {
            // Notice when group key has changed
            if result.TableChanged() {
                fmt.Printf("table: %s\n", result.TableMetadata().String())
            }
            // Access data
            fmt.Printf("value: %v\n", result.Record().Value())
        }
        // check for an error
        if result.Err() != nil {
            fmt.Printf("query parsing error: %s\n", result.Err().Error())
        }
    } else {
        panic(err)
    }
    // Ensures background processes finishes
    client.Close()
}

Raw

QueryRaw() returns raw, unparsed, query result string and process it on your own. Returned csv format
can be controlled by the third parameter, query dialect.

package main

import (
    "context"
    "fmt"
    "github.com/influxdata/influxdb-client-go"
)

func main() {
    // Create client
    client := influxdb2.NewClient("http://localhost:9999", "my-token")
    // Get query client
    queryApi := client.QueryApi("my-org")
    // Query and get complete result as a string
    // Use default dialect
    result, err := queryApi.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -1h) |> filter(fn: (r) => r._measurement == "stat")`, influxdb2.DefaultDialect())
    if err == nil {
        fmt.Println("QueryResult:")
        fmt.Println(result)
    } else {
        panic(err)
    }
    // Ensures background processes finishes
    client.Close()
}    

Contributing

If you would like to contribute code you can do through GitHub by forking the repository and sending a pull request into the master branch.

License

The InfluxDB 2 Go Client is released under the MIT License.