Skip to content

Commit

Permalink
Make clients closeable (#94)
Browse files Browse the repository at this point in the history
* Go close - start

* Fixed bug and added logs

* Updated samples

* Fixed receiver

* Stop gofunc on done

* make done

* close done first

* don't explicitly close headersPool, to avoid race condition.

* fmt

* Pr fixes

* fmt
  • Loading branch information
AsafMah authored May 8, 2022
1 parent 6614580 commit a4b257a
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 2 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ client, err := kusto.New(endpoint, authorizer)
if err != nil {
panic("add error handling")
}
// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()
```
endpoint represents the Kusto endpoint. This will resemble: "https://<instance>.<region>.kusto.windows.net".

Expand Down Expand Up @@ -174,6 +176,8 @@ in, err := ingest.New(kustoClient, "database", "table")
if err != nil {
panic("add error handling")
}
// Be sure to close the ingestor when you're done. (Error handling omitted for brevity.)
defer in.Close()
```

#### From a File
Expand Down
9 changes: 9 additions & 0 deletions kusto/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -201,3 +202,11 @@ func (c *conn) execute(ctx context.Context, execType int, db string, query Stmt,

return execResp{reqHeader: header, respHeader: resp.Header, frameCh: frameCh}, nil
}

func (c *conn) Close() error {
if closer, ok := c.auth.(io.Closer); ok {
return closer.Close()
}

return nil
}
16 changes: 16 additions & 0 deletions kusto/data/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,3 +401,19 @@ func GetKustoError(err error) (*Error, bool) {
}
return nil, false
}

type CombinedError struct {
Errors []error
}

func (c CombinedError) Error() string {
result := ""
for _, err := range c.Errors {
result += fmt.Sprintf("'%s';", err.Error())
}
return result
}

func GetCombinedError(errs ...error) *CombinedError {
return &CombinedError{Errors: errs}
}
17 changes: 17 additions & 0 deletions kusto/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"sync"

"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/conn"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/properties"
"github.com/Azure/azure-kusto-go/kusto/ingest/internal/queued"
Expand All @@ -15,6 +16,7 @@ import (
)

type Ingestor interface {
io.Closer
FromFile(ctx context.Context, fPath string, options ...FileOption) (*Result, error)
FromReader(ctx context.Context, reader io.Reader, options ...FileOption) (*Result, error)
}
Expand Down Expand Up @@ -240,3 +242,18 @@ func (i *Ingestion) newProp() properties.All {
},
}
}

func (i *Ingestion) Close() error {
i.mgr.Close()
var err error
err = i.fs.Close()
if i.streamConn != nil {
err2 := i.streamConn.Close()
if err == nil {
err = err2
} else {
err = errors.GetCombinedError(err, err2)
}
}
return err
}
5 changes: 5 additions & 0 deletions kusto/ingest/ingest_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ func ExampleIngestion_FromFile() {
if err != nil {
// Do something
}
// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()

ingestor, err := ingest.New(client, "database", "table")
if err != nil {
// Do something
}
// Closing the ingestor will not close the client (since the client may be used separately),
//but it is still important to close the ingestor when you're done.
defer ingestor.Close()

// Setup a maximum time for completion to be 10 minutes.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
Expand Down
4 changes: 4 additions & 0 deletions kusto/ingest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func (m mockClient) HttpClient() *http.Client {
return &http.Client{}
}

func (m mockClient) Close() error {
return nil
}

func (m mockClient) Auth() kusto.Authorization {
return m.auth
}
Expand Down
20 changes: 19 additions & 1 deletion kusto/ingest/internal/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Conn struct {
reqHeaders http.Header
headersPool chan http.Header
client *http.Client
done chan struct{}

inTest bool
}
Expand Down Expand Up @@ -82,6 +83,7 @@ func newWithoutValidation(endpoint string, auth kusto.Authorization, client *htt
reqHeaders: headers,
headersPool: make(chan http.Header, 100),
client: client,
done: make(chan struct{}),
}

// Fills a pool of headers to alleviate header copying timing at request time.
Expand Down Expand Up @@ -113,7 +115,13 @@ func (c *Conn) StreamIngest(ctx context.Context, db, table string, payload io.Re

headers := <-c.headersPool
go func() {
c.headersPool <- copyHeaders(c.reqHeaders)
header := copyHeaders(c.reqHeaders)
select {
case <-c.done:
return
case c.headersPool <- header:
return
}
}()

if clientRequestId != "" {
Expand Down Expand Up @@ -179,3 +187,13 @@ func copyHeaders(header http.Header) http.Header {
}
return headers
}

func (c *Conn) Close() error {
select {
case <-c.done:
return nil
default:
close(c.done)
return nil
}
}
7 changes: 7 additions & 0 deletions kusto/ingest/internal/queued/queued.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (

// Queued provides methods for taking data from various sources and ingesting it into Kusto using queued ingestion.
type Queued interface {
io.Closer
Local(ctx context.Context, from string, props properties.All) error
Reader(ctx context.Context, reader io.Reader, props properties.All) (string, error)
Blob(ctx context.Context, from string, fileSize int64, props properties.All) error
Expand Down Expand Up @@ -436,3 +437,9 @@ func IsLocalPath(s string) (bool, error) {

return true, nil
}

func (i *Ingestion) Close() error {
i.mgr.Close()
i.transferManager.Close()
return nil
}
10 changes: 9 additions & 1 deletion kusto/ingest/internal/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,15 @@ func New(client mgmter) (*Manager, error) {

// Close closes the manager. This stops any token refreshes.
func (m *Manager) Close() {
close(m.done)
for {
select {
case <-m.done:
return
default:
close(m.done)
return
}
}
}

func (m *Manager) renewResources() {
Expand Down
4 changes: 4 additions & 0 deletions kusto/ingest/internal/resources/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ type FsMock struct {
OnBlob func(ctx context.Context, from string, fileSize int64, props properties.All) error
}

func (f FsMock) Close() error {
return nil
}

func (f FsMock) Local(ctx context.Context, from string, props properties.All) error {
if f.OnLocal != nil {
return f.OnLocal(ctx, from, props)
Expand Down
12 changes: 12 additions & 0 deletions kusto/ingest/managed.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,15 @@ func (m *Managed) newProp() properties.All {
},
}
}

func (m *Managed) Close() error {
var err error
err = m.queued.Close()
err2 := m.streaming.Close()
if err == nil {
err = err2
} else {
err = errors.GetCombinedError(err, err2)
}
return nil
}
2 changes: 2 additions & 0 deletions kusto/ingest/query_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package ingest

import (
"context"
"io"
"net/http"

"github.com/Azure/azure-kusto-go/kusto"
)

type QueryClient interface {
io.Closer
Auth() kusto.Authorization
Endpoint() string
Query(ctx context.Context, db string, query kusto.Stmt, options ...kusto.QueryOption) (*kusto.RowIterator, error)
Expand Down
5 changes: 5 additions & 0 deletions kusto/ingest/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

type streamIngestor interface {
io.Closer
StreamIngest(ctx context.Context, db, table string, payload io.Reader, format properties.DataFormat, mappingName string, clientRequestId string) error
}

Expand Down Expand Up @@ -154,3 +155,7 @@ func (i *Streaming) newProp() properties.All {
},
}
}

func (i *Streaming) Close() error {
return i.streamConn.Close()
}
4 changes: 4 additions & 0 deletions kusto/ingest/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type fakeStreamIngestor struct {
onStreamIngest streamIngestFunc
}

func (f fakeStreamIngestor) Close() error {
return nil
}

func (f fakeStreamIngestor) StreamIngest(ctx context.Context, db, table string, payload io.Reader, format properties.DataFormat, mappingName string, clientRequestId string) error {
return f.onStreamIngest(ctx, db, table, payload, format, mappingName, clientRequestId)
}
Expand Down
18 changes: 18 additions & 0 deletions kusto/kusto.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kusto

import (
"context"
"io"
"net/http"
"net/url"
"reflect"
Expand All @@ -19,6 +20,7 @@ import (

// queryer provides for getting a stream of Kusto frames. Exists to allow fake Kusto streams in tests.
type queryer interface {
io.Closer
query(ctx context.Context, db string, query Stmt, options *queryOptions) (execResp, error)
mgmt(ctx context.Context, db string, query Stmt, options *mgmtOptions) (execResp, error)
}
Expand Down Expand Up @@ -431,3 +433,19 @@ func (*Client) contextSetup(ctx context.Context, mgmtCall bool) (context.Context
func (c *Client) HttpClient() *http.Client {
return c.http
}

func (c *Client) Close() error {
var err error
if c.conn != nil {
err = c.conn.Close()
}
if c.ingestConn != nil {
err2 := c.ingestConn.Close()
if err == nil {
err = err2
} else {
err = errors.GetCombinedError(err, err2)
}
}
return err
}
11 changes: 11 additions & 0 deletions kusto/kusto_examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func Example_simple() {
panic("add error handling")
}

// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()

ctx := context.Background()

// Query our database table "systemNodes" for the CollectionTimes and the NodeIds.
Expand Down Expand Up @@ -107,6 +110,8 @@ func Example_complex() {
if err != nil {
panic("add error handling")
}
// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()

ctx := context.Background()

Expand Down Expand Up @@ -169,6 +174,8 @@ func ExampleClient_Query_rows() {
if err != nil {
panic("add error handling")
}
// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()

ctx := context.Background()

Expand Down Expand Up @@ -212,6 +219,8 @@ func ExampleClient_Query_do() {
if err != nil {
panic("add error handling")
}
// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()

ctx := context.Background()

Expand Down Expand Up @@ -265,6 +274,8 @@ func ExampleClient_Query_struct() {
if err != nil {
panic("add error handling")
}
// Be sure to close the client when you're done. (Error handling omitted for brevity.)
defer client.Close()

ctx := context.Background()

Expand Down
4 changes: 4 additions & 0 deletions kusto/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (m *MockRows) Error(err error) error {
type mockConn struct {
}

func (m mockConn) Close() error {
return nil
}

func (m mockConn) query(_ context.Context, _ string, _ Stmt, _ *queryOptions) (execResp, error) {
return execResp{}, nil
}
Expand Down
Loading

0 comments on commit a4b257a

Please sign in to comment.