diff --git a/pumps/logzio.go b/pumps/logzio.go index 49b43166e..2d9a7fd00 100644 --- a/pumps/logzio.go +++ b/pumps/logzio.go @@ -150,13 +150,19 @@ func (p *LogzioPump) WriteData(ctx context.Context, data []interface{}) error { "raw_response": decoded.RawResponse, "ip_address": decoded.IPAddress, } + "ip_address": decoded.IPAddress, + } + p.log.Info(p.GetName() + " Initialized") event, err := json.Marshal(mapping) if err != nil { return fmt.Errorf("failed to marshal decoded data: %s", err) } - p.sender.Send(event) + err = p.sender.Send(event) +if err != nil { + return fmt.Errorf("failed to send event: %s", err) +} } p.log.Info("Purged ", len(data), " records...") diff --git a/pumps/mgo_helper_test.go b/pumps/mgo_helper_test.go index c439efd42..a85a51517 100644 --- a/pumps/mgo_helper_test.go +++ b/pumps/mgo_helper_test.go @@ -37,7 +37,7 @@ func (c *Conn) ConnectDb() { if c.Store == nil { var err error c.Store, err = persistent.NewPersistentStorage(&persistent.ClientOpts{ - Type: "mgo", + Type: "mongo-go", ConnectionString: dbAddr, }) if err != nil { diff --git a/pumps/mongo.go b/pumps/mongo.go index 479b087e9..875c17f06 100644 --- a/pumps/mongo.go +++ b/pumps/mongo.go @@ -353,11 +353,6 @@ func (m *MongoPump) ensureIndexes(collectionName string) error { } func (m *MongoPump) connect() { - if m.dbConf.MongoDriverType == "" { - // Default to mgo - m.dbConf.MongoDriverType = persistent.Mgo - } - store, err := persistent.NewPersistentStorage(&persistent.ClientOpts{ ConnectionString: m.dbConf.MongoURL, UseSSL: m.dbConf.MongoUseSSL, @@ -367,7 +362,7 @@ func (m *MongoPump) connect() { SSLPEMKeyfile: m.dbConf.MongoSSLPEMKeyfile, SessionConsistency: m.dbConf.MongoSessionConsistency, ConnectionTimeout: m.timeout, - Type: m.dbConf.MongoDriverType, + Type: getMongoDriverType(m.dbConf.MongoDriverType), DirectConnection: m.dbConf.MongoDirectConnection, }) if err != nil { @@ -547,3 +542,12 @@ func (m *MongoPump) WriteUptimeData(data []interface{}) { m.log.Error("Problem inserting to mongo collection: ", err) } } + +func getMongoDriverType(driverType string) string { + if driverType == "" { + // Default to mongo-go + return persistent.OfficialMongo + } + + return driverType +} diff --git a/pumps/mongo_aggregate.go b/pumps/mongo_aggregate.go index 97362447f..c65925b28 100644 --- a/pumps/mongo_aggregate.go +++ b/pumps/mongo_aggregate.go @@ -215,11 +215,6 @@ func (m *MongoAggregatePump) Init(config interface{}) error { func (m *MongoAggregatePump) connect() { var err error - if m.dbConf.MongoDriverType == "" { - // Default to mgo - m.dbConf.MongoDriverType = persistent.Mgo - } - m.store, err = persistent.NewPersistentStorage(&persistent.ClientOpts{ ConnectionString: m.dbConf.MongoURL, UseSSL: m.dbConf.MongoUseSSL, @@ -229,7 +224,7 @@ func (m *MongoAggregatePump) connect() { SSLPEMKeyfile: m.dbConf.MongoSSLPEMKeyfile, SessionConsistency: m.dbConf.MongoSessionConsistency, ConnectionTimeout: m.timeout, - Type: m.dbConf.MongoDriverType, + Type: getMongoDriverType(m.dbConf.MongoDriverType), DirectConnection: m.dbConf.MongoDirectConnection, }) if err != nil { diff --git a/pumps/mongo_selective.go b/pumps/mongo_selective.go index 1f0d78650..1aa639f5d 100644 --- a/pumps/mongo_selective.go +++ b/pumps/mongo_selective.go @@ -113,11 +113,6 @@ func (m *MongoSelectivePump) Init(config interface{}) error { func (m *MongoSelectivePump) connect() { var err error - if m.dbConf.MongoDriverType == "" { - // Default to mgo - m.dbConf.MongoDriverType = persistent.Mgo - } - m.store, err = persistent.NewPersistentStorage(&persistent.ClientOpts{ ConnectionString: m.dbConf.MongoURL, UseSSL: m.dbConf.MongoUseSSL, @@ -127,7 +122,7 @@ func (m *MongoSelectivePump) connect() { SSLPEMKeyfile: m.dbConf.MongoSSLPEMKeyfile, SessionConsistency: m.dbConf.MongoSessionConsistency, ConnectionTimeout: m.timeout, - Type: m.dbConf.MongoDriverType, + Type: getMongoDriverType(m.dbConf.MongoDriverType), DirectConnection: m.dbConf.MongoDirectConnection, }) if err != nil { diff --git a/pumps/mongo_test.go b/pumps/mongo_test.go index d0a0f888b..ecae6633e 100644 --- a/pumps/mongo_test.go +++ b/pumps/mongo_test.go @@ -657,3 +657,35 @@ func TestMongoPump_WriteData(t *testing.T) { return records })) } +func TestGetMongoDriverType(t *testing.T) { + tests := []struct { + name string + driverType string + want string + }{ + { + name: "Empty driver type", + driverType: "", + want: persistent.OfficialMongo, + }, + { + name: "mongo-go driver type", + driverType: persistent.OfficialMongo, + want: persistent.OfficialMongo, + }, + { + name: "mgo driver type", + driverType: persistent.Mgo, + want: persistent.Mgo, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := getMongoDriverType(tt.driverType) + if got != tt.want { + t.Errorf("got %v, want %v", got, tt.want) + } + }) + } +}