diff --git a/json_handler.go b/json_handler.go index 16e9e6e..650ba42 100644 --- a/json_handler.go +++ b/json_handler.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "strconv" "strings" "time" @@ -110,6 +111,11 @@ func getFlattenedFields(v map[string]interface{}) map[string]string { extValues[key] = valTyped.String() case string: extValues[key] = fmt.Sprintf("%q", valTyped) + case []interface{}: + flattenedArrayFields := getFlattenedArrayFields(valTyped) + for k, v := range flattenedArrayFields { + extValues[key+"."+k] = v + } case map[string]interface{}: flattenedFields := getFlattenedFields(valTyped) for keyNested, valStr := range flattenedFields { @@ -122,6 +128,37 @@ func getFlattenedFields(v map[string]interface{}) map[string]string { return extValues } +func getFlattenedArrayFields(data []interface{}) map[string]string { + flattened := make(map[string]string) + for i, v := range data { + switch vt := v.(type) { + case json.Number: + if z, err := vt.Int64(); err == nil { + flattened[strconv.Itoa(i)] = fmt.Sprintf("%d", z) + } else if f, err := vt.Float64(); err == nil { + flattened[strconv.Itoa(i)] = fmt.Sprintf("%g", f) + } else { + flattened[strconv.Itoa(i)] = vt.String() + } + case string: + flattened[strconv.Itoa(i)] = vt + case []interface{}: + flattenedArrayFields := getFlattenedArrayFields(vt) + for k, v := range flattenedArrayFields { + flattened[fmt.Sprintf("%d.%s", i, k)] = v + } + case map[string]interface{}: + flattenedFields := getFlattenedFields(vt) + for k, v := range flattenedFields { + flattened[fmt.Sprintf("%d.%s", i, k)] = v + } + default: + flattened[strconv.Itoa(i)] = fmt.Sprintf("%v", vt) + } + } + return flattened +} + // UnmarshalJSON sets the fields of the handler. func (h *JSONHandler) UnmarshalJSON(data []byte) bool { @@ -183,6 +220,11 @@ func (h *JSONHandler) UnmarshalJSON(data []byte) bool { h.Fields[key] = v.String() case string: h.Fields[key] = fmt.Sprintf("%q", v) + case []interface{}: + flattenedArrayFields := getFlattenedArrayFields(v) + for k, v := range flattenedArrayFields { + h.Fields[key+"."+k] = v + } case map[string]interface{}: flattenedFields := getFlattenedFields(v) for keyNested, val := range flattenedFields { diff --git a/json_handler_test.go b/json_handler_test.go index 638f224..cf039bc 100644 --- a/json_handler_test.go +++ b/json_handler_test.go @@ -165,3 +165,37 @@ func TestJsonHandler_TryHandle_LargeNumbers(t *testing.T) { require.Equal(t, "1.2345", h.Fields["storage.some.float"]) require.Equal(t, "1730187806608637000", h.Fields["storage.session.id"]) } + +func TestJsonHandler_TryHandle_FlattendArrayFields(t *testing.T) { + handler := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()} + ev := new(typesv1.StructuredLogEvent) + raw := []byte(`{"peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}],"storage":{"session.id":1730187806608637000, "some": {"float": 1.2345}}}`) + if !handler.TryHandle(raw, ev) { + t.Fatalf("failed to handle log") + } + require.Equal(t, "\"10.244.0.126:8083\"", handler.Fields["peers.0.ID"]) + require.Equal(t, "\"10.244.0.126:8083\"", handler.Fields["peers.0.URI"]) + require.Equal(t, "\"10.244.0.206:8083\"", handler.Fields["peers.1.ID"]) + require.Equal(t, "\"10.244.0.206:8083\"", handler.Fields["peers.1.URI"]) + require.Equal(t, "\"10.244.1.150:8083\"", handler.Fields["peers.2.ID"]) + require.Equal(t, "\"10.244.1.150:8083\"", handler.Fields["peers.2.URI"]) +} + +func TestJsonHandler_TryHandle_FlattenedArrayFields_NestedArray(t *testing.T) { + handler := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()} + ev := new(typesv1.StructuredLogEvent) + raw := []byte(`{"peers":[[1,2,3.14],[4,50.55,[6,7]],["hello","world"],{"foo":"bar"}]}`) + if !handler.TryHandle(raw, ev) { + t.Fatalf("failed to handle log") + } + require.Equal(t, "1", handler.Fields["peers.0.0"]) + require.Equal(t, "2", handler.Fields["peers.0.1"]) + require.Equal(t, "3.14", handler.Fields["peers.0.2"]) + require.Equal(t, "4", handler.Fields["peers.1.0"]) + require.Equal(t, "50.55", handler.Fields["peers.1.1"]) + require.Equal(t, "6", handler.Fields["peers.1.2.0"]) + require.Equal(t, "7", handler.Fields["peers.1.2.1"]) + require.Equal(t, "hello", handler.Fields["peers.2.0"]) + require.Equal(t, "world", handler.Fields["peers.2.1"]) + require.Equal(t, "\"bar\"", handler.Fields["peers.3.foo"]) +} diff --git a/scanner_test.go b/scanner_test.go index 2012820..692c453 100644 --- a/scanner_test.go +++ b/scanner_test.go @@ -204,6 +204,196 @@ func TestFlatteningNestedObjects_simple(t *testing.T) { } } +func TestFlatteningNestedObjects_with_arrays(t *testing.T) { + payload := `{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","source":{"function":"main.realMain.func5.1","file":"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go","line":407},"msg":"galaxycache peers updated","selfURI":"10.244.0.126:8083","peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]}` + + now := time.Date(2024, 12, 9, 0, 0, 0, 0, time.UTC) + want := []*typesv1.LogEvent{ + { + ParsedAt: timestamppb.New(now), + Raw: []byte(`{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","source":{"function":"main.realMain.func5.1","file":"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go","line":407},"msg":"galaxycache peers updated","selfURI":"10.244.0.126:8083","peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]}`), + Structured: &typesv1.StructuredLogEvent{ + Timestamp: timestamppb.New(time.Date(2024, 12, 5, 6, 40, 35, 247902137, time.UTC)), + Lvl: "DEBUG", + Msg: "galaxycache peers updated", + Kvs: []*typesv1.KV{ + { + Key: "selfURI", + Value: "\"10.244.0.126:8083\"", + }, + { + Key: "source.function", + Value: "\"main.realMain.func5.1\"", + }, + { + Key: "source.file", + Value: "\"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go\"", + }, + { + Key: "source.line", + Value: "407", + }, + { + Key: "peers.0.ID", + Value: "\"10.244.0.126:8083\"", + }, + { + Key: "peers.0.URI", + Value: "\"10.244.0.126:8083\"", + }, + { + Key: "peers.1.ID", + Value: "\"10.244.0.206:8083\"", + }, + { + Key: "peers.1.URI", + Value: "\"10.244.0.206:8083\"", + }, + { + Key: "peers.2.ID", + Value: "\"10.244.1.150:8083\"", + }, + { + Key: "peers.2.URI", + Value: "\"10.244.1.150:8083\"", + }, + }, + }, + }, + } + + src := strings.NewReader(payload) + opts := DefaultOptions() + opts.timeNow = func() time.Time { + return now + } + + sink := bufsink.NewSizedBufferedSink(100, nil) + ctx := context.Background() + err := Scan(ctx, src, sink, opts) + require.NoError(t, err) + + got := sink.Buffered + require.Equal(t, len(want), len(got)) // assume that there's no skipped log events + + n := len(want) + for i := 0; i < n; i++ { + actualKvs := make(map[string]string) + for _, kv := range got[i].Structured.Kvs { + actualKvs[kv.Key] = kv.Value + } + expectedKvs := make(map[string]string) + for _, kv := range want[i].Structured.Kvs { + expectedKvs[kv.Key] = kv.Value + } + require.Equal(t, got[i].ParsedAt, want[i].ParsedAt) + require.Equal(t, got[i].Raw, want[i].Raw) + require.Equal(t, got[i].Structured.Timestamp, want[i].Structured.Timestamp) + require.Equal(t, got[i].Structured.Msg, want[i].Structured.Msg) + require.Equal(t, got[i].Structured.Lvl, want[i].Structured.Lvl) + require.Equal(t, expectedKvs, actualKvs) + } +} + +func TestFlatteningNestedObjects_with_nested_arrays(t *testing.T) { + payload := `{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","msg":"galaxycache peers updated","peers":[[1,2,3],[4,5,6],[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]]}` + + now := time.Date(2024, 12, 9, 0, 0, 0, 0, time.UTC) + want := []*typesv1.LogEvent{ + { + ParsedAt: timestamppb.New(now), + Raw: []byte(`{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","msg":"galaxycache peers updated","peers":[[1,2,3],[4,5,6],[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]]}`), + Structured: &typesv1.StructuredLogEvent{ + Timestamp: timestamppb.New(time.Date(2024, 12, 5, 6, 40, 35, 247902137, time.UTC)), + Lvl: "DEBUG", + Msg: "galaxycache peers updated", + Kvs: []*typesv1.KV{ + { + Key: "peers.0.0", + Value: "1", + }, + { + Key: "peers.0.1", + Value: "2", + }, + { + Key: "peers.0.2", + Value: "3", + }, + { + Key: "peers.1.0", + Value: "4", + }, + { + Key: "peers.1.1", + Value: "5", + }, + { + Key: "peers.1.2", + Value: "6", + }, + { + Key: "peers.2.0.ID", + Value: "\"10.244.0.126:8083\"", + }, + { + Key: "peers.2.0.URI", + Value: "\"10.244.0.126:8083\"", + }, + { + Key: "peers.2.1.ID", + Value: "\"10.244.0.206:8083\"", + }, + { + Key: "peers.2.1.URI", + Value: "\"10.244.0.206:8083\"", + }, + { + Key: "peers.2.2.ID", + Value: "\"10.244.1.150:8083\"", + }, + { + Key: "peers.2.2.URI", + Value: "\"10.244.1.150:8083\"", + }, + }, + }, + }, + } + + src := strings.NewReader(payload) + opts := DefaultOptions() + opts.timeNow = func() time.Time { + return now + } + + sink := bufsink.NewSizedBufferedSink(100, nil) + ctx := context.Background() + err := Scan(ctx, src, sink, opts) + require.NoError(t, err) + + got := sink.Buffered + require.Equal(t, len(want), len(got)) // assume that there's no skipped log events + + n := len(want) + for i := 0; i < n; i++ { + actualKvs := make(map[string]string) + for _, kv := range got[i].Structured.Kvs { + actualKvs[kv.Key] = kv.Value + } + expectedKvs := make(map[string]string) + for _, kv := range want[i].Structured.Kvs { + expectedKvs[kv.Key] = kv.Value + } + require.Equal(t, got[i].ParsedAt, want[i].ParsedAt) + require.Equal(t, got[i].Raw, want[i].Raw) + require.Equal(t, got[i].Structured.Timestamp, want[i].Structured.Timestamp) + require.Equal(t, got[i].Structured.Msg, want[i].Structured.Msg) + require.Equal(t, got[i].Structured.Lvl, want[i].Structured.Lvl) + require.Equal(t, expectedKvs, actualKvs) + } +} + func pjsonslice[E proto.Message](m []E) string { sb := strings.Builder{} for _, e := range m {