Skip to content

dimkus/go-clickhouse

 
 

Repository files navigation

go-clickhouse

Golang Yandex ClickHouse HTTP connector

ClickHouse manages extremely large volumes of data in a stable and sustainable manner. It currently powers Yandex.Metrica, world’s second largest web analytics platform, with over 13 trillion database records and over 20 billion events a day, generating customized reports on-the-fly, directly from non-aggregated data. This system was successfully implemented at CERN’s LHCb experiment to store and process metadata on 10bn events with over 1000 attributes per event registered in 2011.

Examples

Query rows

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
query := clickhouse.NewQuery("SELECT name, date FROM clicks")
iter := query.Iter(conn)
var (
    name string
    date string
)
for iter.Scan(&name, &date) {
    //
}
if iter.Error() != nil {
    log.Panicln(iter.Error())
}

Single insert

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
query, err := clickhouse.BuildInsert("clicks",
    clickhouse.Columns{"name", "date", "sourceip"},
    clickhouse.Row{"Test name", "2016-01-01 21:01:01", clickhouse.Func{"IPv4StringToNum", "192.0.2.192"}},
)
if err == nil {
    err = query.Exec(conn)
    if err == nil {
        //
    }
}

Auth and multiple insert

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
conn.AddParam("user", "{username}")
conn.AddParam("password", "{password}")

queryStr := `INSERT INTO clicks FORMAT TabSeparated
1	2017-09-27	10
2	2017-09-27	11
3	2017-09-27	12`

query := clickhouse.NewQuery(queryStr)
iter := query.Iter(conn)

if iter.Error() != nil {
    //
}

Fetch rows

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())

queryStr := `SELECT visit_id, visit_number FROM clicks ORDER BY created_at DESC LIMIT 5`
query := clickhouse.NewQuery(queryStr)

type fetch struct {
    VisitId     string  `json:"visit_id"`
    VisitNumber int     `json:"visit_number"`
}
fetchObj := []fetch{}

err := query.ExecScan(dbConnection, &fetchObj)

if err != nil {
    //
}

/**
fetchObj:
([]fetch)
  0(fetch)
    VisitId(string) "ufifgdwp0y0wfiqp-7887"
    VisitNumber(int) 1
  1(fetch)
    VisitId(string) "ufifgdwp0y0wfiww-5356"
    VisitNumber(int) 1
  2(fetch)
    VisitId(string) "ufifgdwp0y0wfiwt-408"
    VisitNumber(int) 2
...
*/

Fetch rows and stat

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())

queryStr := `SELECT visit_id, visit_number FROM clicks ORDER BY created_at DESC LIMIT 5`
query := clickhouse.NewQuery(queryStr)

type fetch struct {
    VisitId     string  `json:"visit_id"`
    VisitNumber int     `json:"visit_number"`
}
fetchObj := []fetch{}

err, stat := query.ExecScanStat(dbConnection, &fetchObj)

if err != nil {
    //
}

/**
fetchObj:
([]fetch)
  0(fetch)
    VisitId(string) "ufifgdwp0y0wfiqp-7887"
    VisitNumber(int) 1
  1(fetch)
    VisitId(string) "ufifgdwp0y0wfiww-5356"
    VisitNumber(int) 1
  2(fetch)
    VisitId(string) "ufifgdwp0y0wfiwt-408"
    VisitNumber(int) 2
...

stat:
(clickhouse.Stat)
   Rows(int) 25
   RowsBeforeLimitAtLeast(int) 73
   Stat(struct)
     Elapsed(float64) 0.003121279
     RowsRead(int) 74553
     BytesRead(int) 1043742
*/

External data for query processing

See documentation for details

conn := clickhouse.NewConn("localhost:8123", clickhouse.NewHttpTransport())
query := clickhouse.NewQuery("SELECT Num, Name FROM extdata")
query.AddExternal("extdata", "Num UInt32, Name String", []byte("1	first\n2	second")) // tab separated


iter := query.Iter(conn)
var (
    num  int
    name string
)
for iter.Scan(&num, &name) {
    //
}
if iter.Error() != nil {
    log.Panicln(iter.Error())
}

Cluster

Cluster is useful if you have several servers with same Distributed table (master). In this case you can send requests to random master to balance load.

  • cluster.Check() pings all connections and filters active ones
  • cluster.ActiveConn() returns random active connection
  • cluster.OnCheckError() is called when any connection fails

Important: You should call method Check() at least once after initialization, but we recommend to call it continuously, so ActiveConn() will always return filtered active connection.

http := clickhouse.NewHttpTransport()
conn1 := clickhouse.NewConn("host1", http)
conn2 := clickhouse.NewConn("host2", http)

cluster := clickhouse.NewCluster(conn1, conn2)
cluster.OnCheckError(func (c *clickhouse.Conn) {
    log.Fatalf("Clickhouse connection failed %s", c.Host)
})
// Ping connections every second
go func() {
    for {
        cluster.Check()
        time.Sleep(time.Second)
    }
}()

Transport options

Timeout

t := clickhouse.NewHttpTransport()
t.Timeout = time.Second * 5

conn := clickhouse.NewConn("host", t)

About

Golang ClickHouse HTTP connector

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Go 100.0%