Skip to content

Commit

Permalink
feat: support redis/influx ping (lf-edge#2850)
Browse files Browse the repository at this point in the history
Signed-off-by: yisaer <[email protected]>
  • Loading branch information
Yisaer authored and ngjaying committed Jul 24, 2024
1 parent 565fefb commit 6d3a9c2
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 1 deletion.
28 changes: 28 additions & 0 deletions extensions/impl/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,34 @@ func (m *influxSink) Connect(ctx api.StreamContext) (err error) {
return err
}

func (m *influxSink) Ping(_ string, props map[string]interface{}) (err error) {
if err = m.Provision(nil, props); err != nil {
return err
}
var insecureSkip bool
if m.tlsconf != nil {
insecureSkip = m.tlsconf.InsecureSkipVerify
}
m.cli, err = client.NewHTTPClient(client.HTTPConfig{
Addr: m.conf.Addr,
Username: m.conf.Username,
Password: m.conf.Password,
InsecureSkipVerify: insecureSkip,
TLSConfig: m.tlsconf,
})
if err != nil {
return fmt.Errorf("error creating influx client: %s", err)
}
defer func() {
if m.cli != nil {
m.cli.Close()
}
}()
// Test connection. Put it here to avoid server connection when running test in Configure
_, _, err = m.cli.Ping(time.Second * 10)
return err
}

func (m *influxSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
return m.collect(ctx, item.ToMap())
}
Expand Down
4 changes: 3 additions & 1 deletion extensions/impl/influx2/influx2.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func (m *influxSink2) Ping(_ string, props map[string]interface{}) error {
}
m.cli = client.NewClientWithOptions(m.conf.Addr, m.conf.Token, options)
defer func() {
m.cli.Close()
if m.cli != nil {
m.cli.Close()
}
}()
pingable, err := m.cli.Ping(context.Background())
if err != nil || !pingable {
Expand Down
17 changes: 17 additions & 0 deletions internal/io/redis/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package redis

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -42,6 +43,22 @@ type lookupSource struct {
cli *redis.Client
}

func (s *lookupSource) Ping(dataSource string, props map[string]interface{}) error {
err := s.Validate(props)
if err != nil {
return err
}
s.cli = redis.NewClient(&redis.Options{
Addr: s.c.Addr,
Username: s.c.Username,
Password: s.c.Password,
DB: s.db, // use default DB
})
defer s.cli.Close()
_, err = s.cli.Ping(context.Background()).Result()
return err
}

func (s *lookupSource) Provision(ctx api.StreamContext, props map[string]any) error {
return s.Validate(props)
}
Expand Down
9 changes: 9 additions & 0 deletions internal/io/redis/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,12 @@ func TestLookupSourceDB(t *testing.T) {
require.Error(t, err)
require.Equal(t, "redis lookup source db should be in range 0-15", err.Error())
}

func TestLookUpPingRedis(t *testing.T) {
s := &lookupSource{}
prop := map[string]interface{}{
"addr": addr,
"datatype": "string",
}
require.NoError(t, s.Ping("1", prop))
}
18 changes: 18 additions & 0 deletions internal/io/redis/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package redis

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -93,6 +94,23 @@ func (r *RedisSink) Validate(props map[string]any) error {
return nil
}

func (r *RedisSink) Ping(dataSource string, props map[string]interface{}) error {
if err := r.Validate(props); err != nil {
return err
}
cli := redis.NewClient(&redis.Options{
Addr: r.c.Addr,
Username: r.c.Username,
Password: r.c.Password,
DB: r.c.Db, // use default DB
})
_, err := cli.Ping(context.Background()).Result()
defer func() {
cli.Close()
}()
return err
}

func (r *RedisSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
return r.save(ctx, item.ToMap())
}
Expand Down

0 comments on commit 6d3a9c2

Please sign in to comment.