-
Notifications
You must be signed in to change notification settings - Fork 7
/
main.go
169 lines (147 loc) · 5.31 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// Copyright 2022 IBM Corp.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"net/http"
"os"
"strconv"
fybrikEnv "fybrik.io/fybrik/pkg/environment"
"fybrik.io/fybrik/pkg/logging"
fybrikTLS "fybrik.io/fybrik/pkg/tls"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
"gopkg.in/yaml.v2"
api "fybrik.io/openmetadata-connector/datacatalog-go/go"
occ "fybrik.io/openmetadata-connector/pkg/openmetadata-connector-core"
"fybrik.io/openmetadata-connector/pkg/utils"
)
// Default file locations, used as the default values of the command-line flags.
const (
// DefaultConfigFile is the default value of the run command's "config" flag.
DefaultConfigFile = "/etc/conf/conf.yaml"
// DefaultCustomizationFile is the default value of the "customization" flag
// on both the run and prepare commands.
DefaultCustomizationFile = "./customization.yaml"
)
// Flag names and the log/error messages shared by the commands in this file.
const (
// Customization is the name of the flag that selects the customization file.
Customization = "customization"
// FailureToParseCustomizationFile is logged, and used as the returned error
// text, when the customization file cannot be decoded as YAML.
FailureToParseCustomizationFile = "failure to parse customization file"
// FailureToReadConfigFile is logged when the configuration file cannot be read.
FailureToReadConfigFile = "failure to read configuration file"
// FailureToReadCustomizationFile is logged when the customization file cannot be read.
FailureToReadCustomizationFile = "failure to read customization file"
// FileContainingTagsAndPropertiesNeeded is the usage text of the customization flag.
FileContainingTagsAndPropertiesNeeded = "File containing tags and custom properties needed for working with Fybrik"
// ParseCustomizationFileFailed is logged by a command when parseCustomizationFile
// returns an error, just before the command gives up.
ParseCustomizationFileFailed = "parseCustomizationFile() failed. Exiting"
)
// parseCustomizationFile reads the YAML file at customizationFile and decodes
// it into a generic map.
//
// Failures are logged through logger and also returned. A read failure is
// returned as the unmodified os.ReadFile error. A decode failure is returned
// as an error whose text starts with FailureToParseCustomizationFile and which
// wraps the underlying YAML error, so callers can still inspect the cause with
// errors.Is / errors.As.
func parseCustomizationFile(customizationFile string, logger *zerolog.Logger) (map[string]interface{}, error) {
	customizationFileBytes, err := os.ReadFile(customizationFile)
	if err != nil {
		logger.Error().Err(err).Msg(FailureToReadCustomizationFile)
		return nil, err
	}

	customization := make(map[string]interface{})
	if err := yaml.Unmarshal(customizationFileBytes, &customization); err != nil {
		logger.Error().Err(err).Msg(FailureToParseCustomizationFile)
		// Wrap instead of replacing: returning only the fixed message would
		// discard the decoder's own description of what went wrong.
		return nil, fmt.Errorf("%s: %w", FailureToParseCustomizationFile, err)
	}
	return customization, nil
}
// RunCmd defines the command for running the connector.
//
// The returned "run" command accepts two flags: "config" (default
// DefaultConfigFile) and the customization flag (default
// DefaultCustomizationFile). When executed it parses both YAML files, builds
// the OpenMetadata API service, controller and router, and then serves HTTP on
// the service's port, using TLS when fybrikEnv.IsUsingTLS() reports true.
//
// NOTE(review): every failure path logs and returns from Run, so Execute sees
// no error and the process exit status stays 0 — consider RunE if the exit
// code should reflect these failures.
func RunCmd() *cobra.Command {
	logger := logging.LogInit(logging.CONNECTOR, "OpenMetadata Connector")
	configFile := DefaultConfigFile
	customizationFile := DefaultCustomizationFile
	cmd := &cobra.Command{
		Use:   "run",
		Short: "Run the connector",
		Run: func(cmd *cobra.Command, args []string) {
			// TODO - add logging level and pretty logging
			customization, err := parseCustomizationFile(customizationFile, &logger)
			if err != nil {
				logger.Error().Msg(ParseCustomizationFileFailed)
				return
			}

			configFileBytes, err := os.ReadFile(configFile)
			if err != nil {
				logger.Error().Err(err).Msg(FailureToReadConfigFile)
				return
			}

			conf := make(map[string]interface{})
			if err := yaml.Unmarshal(configFileBytes, &conf); err != nil {
				logger.Error().Err(err).Msg("failure to parse config file")
				return
			}

			apiService := occ.NewOpenMetadataAPIService(conf, customization, &logger)
			apiController := occ.NewOpenMetadataAPIController(apiService)

			// Init the http client which is used to communicate with Vault and Openmetadata servers.
			utils.InitHTTPClient(&logger)

			router := api.NewRouter(apiController)

			// The server is identical in both modes except for TLSConfig, so
			// build it once and only add the TLS settings when required.
			server := &http.Server{
				Addr:              ":" + strconv.Itoa(apiService.Port),
				Handler:           router,
				ReadHeaderTimeout: occ.ReadHeaderTimeout,
			}

			if fybrikEnv.IsUsingTLS() {
				tlsConfig, err := fybrikTLS.GetServerConfig(&logger)
				if err != nil {
					// Attach the error so the log says why the TLS setup failed.
					logger.Error().Err(err).Msg("failed to get tls config")
					return
				}
				server.TLSConfig = tlsConfig
				logger.Info().Msg(occ.ServerStartingMsg)
				// Empty cert/key paths: the certificates come from TLSConfig.
				if err := server.ListenAndServeTLS("", ""); err != nil {
					logger.Error().Err(err).Msg("function ListenAndServeTLS returns error")
				}
				return
			}

			logger.Info().Msg(occ.ServerStartingMsg)
			if err := server.ListenAndServe(); err != nil {
				logger.Error().Err(err).Msg("function ListenAndServe returns error")
			}
		},
	}
	cmd.Flags().StringVar(&configFile, "config", configFile, "Configuration file")
	cmd.Flags().StringVar(&customizationFile, Customization, customizationFile,
		FileContainingTagsAndPropertiesNeeded)
	cmd.CompletionOptions.DisableDefaultCmd = true
	return cmd
}
// PrepareCmd builds the "prepare" command, which readies OpenMetadata for
// Fybrik by creating the tags and custom properties Fybrik needs.
//
// The command takes the customization flag (default DefaultCustomizationFile).
// Any failure is logged and the command simply returns.
func PrepareCmd() *cobra.Command {
	logger := logging.LogInit(logging.CONNECTOR, "Preparing OpenMetadata for Fybrik")
	customizationPath := DefaultCustomizationFile

	// prepare is the command body; it reads customizationPath at execution
	// time, i.e. after flag parsing has had a chance to override the default.
	prepare := func(_ *cobra.Command, _ []string) {
		settings, parseErr := parseCustomizationFile(customizationPath, &logger)
		if parseErr != nil {
			logger.Error().Msg(ParseCustomizationFileFailed)
			return
		}

		if ok, endpoint, user, password := utils.GetEnvironmentVariables(); ok {
			occ.PrepareOpenMetadataForFybrik(endpoint, user, password, settings, &logger)
		} else {
			logger.Error().Msg("failed to get environment variables. cannot proceed")
		}
	}

	prepareCmd := &cobra.Command{
		Use:   "prepare",
		Short: "Prepare OpenMetadata for Fybrik",
		Run:   prepare,
	}

	flags := prepareCmd.Flags()
	flags.StringVar(&customizationPath, Customization, customizationPath, FileContainingTagsAndPropertiesNeeded)
	prepareCmd.CompletionOptions.DisableDefaultCmd = true
	return prepareCmd
}
// RootCmd assembles the top-level "openmetadata-connector" CLI command and
// registers the run and prepare subcommands on it.
func RootCmd() *cobra.Command {
	root := &cobra.Command{
		Use:   "openmetadata-connector",
		Short: "Kubernetes based OpenMetadata data catalog connector for Fybrik",
	}
	// Arguments are evaluated left to right, so run is still built and
	// registered before prepare.
	root.AddCommand(RunCmd(), PrepareCmd())
	return root
}
// main runs the CLI and exits with status 1 when command execution fails.
func main() {
	// Run the cli
	if err := RootCmd().Execute(); err != nil {
		// Report failures on stderr so stdout carries only regular output.
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}