Skip to content

Commit

Permalink
PR improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
belimawr committed Oct 11, 2023
1 parent 2842714 commit 92b25f5
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions pkg/component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,21 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
return result, nil
}

// dataStreamAndSource is a generic way to represent proto mesages
// that contain a source field and a datastream field.
type dataStreamAndSource interface {
GetDataStream() *proto.DataStream
GetSource() *structpb.Struct
}

func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
ds := raw.GetDataStream()
func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) {
if ds == nil {
ds = &proto.DataStream{}
}

cfg, err := config.NewConfigFrom(source.AsMap())
if err != nil {
return nil, fmt.Errorf("cannot generate config from source field: %w", err)
}

// Create a temporary struct to unpack the configuration.
// Unpack correctly handles any flattened fields like
// data_stream.type. So all we need to do is to call Unpack,
// ensure the DataStream does not have a different value,
// them merge them both.
tmp := struct {
DataStream struct {
Dataset string `config:"dataset" yaml:"dataset"`
Expand All @@ -129,11 +131,6 @@ func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
} `config:"data_stream" yaml:"data_stream"`
}{}

cfg, err := config.NewConfigFrom(raw.GetSource().AsMap())
if err != nil {
return nil, fmt.Errorf("cannot generate config from source field: %w", err)
}

if err := cfg.Unpack(&tmp); err != nil {
return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
}
Expand All @@ -151,17 +148,17 @@ func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
}

ret := &proto.DataStream{
Dataset: merge(tmp.DataStream.Dataset, ds.Dataset),
Type: merge(tmp.DataStream.Type, ds.Type),
Namespace: merge(tmp.DataStream.Namespace, ds.Namespace),
Source: raw.GetDataStream().GetSource(),
Dataset: valueOrDefault(tmp.DataStream.Dataset, ds.Dataset),
Type: valueOrDefault(tmp.DataStream.Type, ds.Type),
Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace),
Source: ds.GetSource(),
}

return ret, nil
}

// merge returns b if a is an empty string
func merge(a, b string) string {
// valueOrDefault returns b if a is an empty string
func valueOrDefault(a, b string) string {
if a == "" {
return b
}
Expand All @@ -170,13 +167,13 @@ func merge(a, b string) string {

func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error {
var err error
unitConfig.DataStream, err = deDotDataStream(unitConfig)
unitConfig.DataStream, err = deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource())
if err != nil {
return fmt.Errorf("could not parse data_stream from input: %w", err)
}

for i, stream := range unitConfig.Streams {
stream.DataStream, err = deDotDataStream(stream)
stream.DataStream, err = deDotDataStream(stream.GetDataStream(), stream.GetSource())
if err != nil {
return fmt.Errorf("could not parse data_stream from stream [%d]: %w",
i, err)
Expand Down

0 comments on commit 92b25f5

Please sign in to comment.