diff --git a/internal/io/sink/log_sink.go b/internal/io/sink/log_sink.go index d4d67be5c4..363f437b4a 100644 --- a/internal/io/sink/log_sink.go +++ b/internal/io/sink/log_sink.go @@ -1,4 +1,4 @@ -// Copyright 2021-2023 EMQ Technologies Co., Ltd. +// Copyright 2021-2024 EMQ Technologies Co., Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import ( "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/lf-edge/ekuiper/v2/internal/topo/collector" + "github.com/lf-edge/ekuiper/v2/pkg/cast" ) // NewLogSink log action, no properties now @@ -43,12 +44,12 @@ var QR = &QueryResult{LastFetch: time.Now()} func NewLogSinkToMemory() api.Sink { QR.Results = make([]string, 0, 10) return collector.Func(func(ctx api.StreamContext, data any) error { - result, ok := data.(string) - if !ok { + r, err := cast.ToString(data, cast.CONVERT_SAMEKIND) + if err != nil { return fmt.Errorf("result is not a string but got %v", data) } QR.Mux.Lock() - QR.Results = append(QR.Results, result) + QR.Results = append(QR.Results, r) QR.Mux.Unlock() return nil }) diff --git a/internal/server/rpc.go b/internal/server/rpc.go new file mode 100644 index 0000000000..2d22986877 --- /dev/null +++ b/internal/server/rpc.go @@ -0,0 +1,415 @@ +// Copyright 2021-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build rpc || !core + +package server + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/rpc" + "os" + "strings" + "time" + + "github.com/lf-edge/ekuiper/v2/internal/conf" + "github.com/lf-edge/ekuiper/v2/internal/io/sink" + "github.com/lf-edge/ekuiper/v2/internal/pkg/model" + "github.com/lf-edge/ekuiper/v2/internal/topo/rule" + "github.com/lf-edge/ekuiper/v2/pkg/cast" + "github.com/lf-edge/ekuiper/v2/pkg/infra" +) + +const QueryRuleId = "internal-ekuiper_query_rule" + +func init() { + servers["rpc"] = &rpcComp{} +} + +type rpcComp struct { + s *http.Server +} + +func (r *rpcComp) register() {} + +func (r *rpcComp) serve() { + // Start rpc service + server := new(Server) + portRpc := conf.Config.Basic.Port + ipRpc := conf.Config.Basic.Ip + rpcSrv := rpc.NewServer() + err := rpcSrv.Register(server) + if err != nil { + logger.Fatal("Format of service Server isn'restHttpType correct. ", err) + } + srvRpc := &http.Server{ + Addr: cast.JoinHostPortInt(ipRpc, portRpc), + WriteTimeout: time.Second * 15, + ReadTimeout: time.Second * 15, + IdleTimeout: time.Second * 60, + Handler: rpcSrv, + } + r.s = srvRpc + go func() { + if err = srvRpc.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Fatal("Error serving rpc service:", err) + } + }() + initQuery() +} + +func (r *rpcComp) close() { + if r.s != nil { + if err := r.s.Shutdown(context.TODO()); err != nil { + logger.Errorf("rpc server shutdown error: %v", err) + } + logger.Info("rpc server shutdown.") + } +} + +type Server int + +func (t *Server) CreateQuery(sql string, reply *string) error { + if _, ok := registry.Load(QueryRuleId); ok { + stopQuery() + } + tp, err := ruleProcessor.ExecQuery(QueryRuleId, sql) + if err != nil { + return err + } else { + rs := &rule.RuleState{RuleId: QueryRuleId, Topology: tp} + registry.Store(QueryRuleId, rs) + msg := fmt.Sprintf("Query was submit successfully.") + logger.Println(msg) + *reply = fmt.Sprint(msg) + } + return nil +} + +func stopQuery() { + if rs, ok := registry.Load(QueryRuleId); ok { + logger.Printf("stop the query.") + (*rs.Topology).Cancel() + registry.Delete(QueryRuleId) + } +} + +/** + * qid is not currently used. + */ +func (t *Server) GetQueryResult(_ string, reply *string) error { + if rs, ok := registry.Load(QueryRuleId); ok { + c := (*rs.Topology).GetContext() + if c != nil && c.Err() != nil { + return c.Err() + } + } + + sink.QR.LastFetch = time.Now() + sink.QR.Mux.Lock() + if len(sink.QR.Results) > 0 { + *reply = strings.Join(sink.QR.Results, "\n") + sink.QR.Results = make([]string, 0, 10) + } else { + *reply = "" + } + sink.QR.Mux.Unlock() + return nil +} + +func (t *Server) Stream(stream string, reply *string) error { + content, err := streamProcessor.ExecStmt(stream) + if err != nil { + return fmt.Errorf("Stream command error: %s", err) + } else { + for _, c := range content { + *reply = *reply + fmt.Sprintln(c) + } + } + return nil +} + +func (t *Server) CreateRule(rule *model.RPCArgDesc, reply *string) error { + id, err := createRule(rule.Name, rule.Json) + if err != nil { + return fmt.Errorf("Create rule %s error : %s.", id, err) + } else { + *reply = fmt.Sprintf("Rule %s was created successfully, please use 'bin/kuiper getstatus rule %s' command to get rule status.", rule.Name, rule.Name) + } + return nil +} + +func (t *Server) GetStatusRule(name string, reply *string) error { + if r, err := getRuleStatus(name); err != nil { + return err + } else { + *reply = r + } + return nil +} + +func (t *Server) GetTopoRule(name string, reply *string) error { + if r, err := getRuleTopo(name); err != nil { + return err + } else { + dst := &bytes.Buffer{} + if err = json.Indent(dst, cast.StringToBytes(r), "", " "); err != nil { + *reply = r + } else { + *reply = dst.String() + } + } + return nil +} + +func (t *Server) StartRule(name string, reply *string) error { + if err := startRule(name); err != nil { + return err + } else { + *reply = fmt.Sprintf("Rule %s was started", name) + } + return nil +} + +func (t *Server) StopRule(name string, reply *string) error { + *reply, _ = stopRule(name) + return nil +} + +func (t *Server) RestartRule(name string, reply *string) error { + err := restartRule(name) + if err != nil { + return err + } + *reply = fmt.Sprintf("Rule %s was restarted.", name) + return nil +} + +func (t *Server) DescRule(name string, reply *string) error { + r, err := ruleProcessor.ExecDesc(name) + if err != nil { + return fmt.Errorf("Desc rule error : %s.", err) + } else { + *reply = r + } + return nil +} + +func (t *Server) ShowRules(_ int, reply *string) error { + r, err := getAllRulesWithStatus() + if err != nil { + return fmt.Errorf("Show rule error : %s.", err) + } + if len(r) == 0 { + *reply = "No rule definitions are found." + } else { + result, err := json.Marshal(r) + if err != nil { + return fmt.Errorf("Show rule error : %s.", err) + } + dst := &bytes.Buffer{} + if err := json.Indent(dst, result, "", " "); err != nil { + return fmt.Errorf("Show rule error : %s.", err) + } + *reply = dst.String() + } + return nil +} + +func (t *Server) DropRule(name string, reply *string) error { + deleteRule(name) + r, err := ruleProcessor.ExecDrop(name) + if err != nil { + return fmt.Errorf("Drop rule error : %s.", err) + } else { + err := t.StopRule(name, reply) + if err != nil { + return err + } + } + *reply = r + return nil +} + +func (t *Server) ValidateRule(rule *model.RPCArgDesc, reply *string) error { + _, s, err := validateRule(rule.Name, rule.Json) + if s { + *reply = "The rule has been successfully validated and is confirmed to be correct." + } else { + *reply = err.Error() + } + return nil +} + +func (t *Server) Import(file string, reply *string) error { + f, err := os.Open(file) + if err != nil { + return fmt.Errorf("fail to read file %s: %v", file, err) + } + defer f.Close() + buf := new(bytes.Buffer) + _, err = io.Copy(buf, f) + if err != nil { + return fmt.Errorf("fail to convert file %s: %v", file, err) + } + content := buf.Bytes() + rules, counts, err := rulesetProcessor.Import(content) + if err != nil { + return fmt.Errorf("import ruleset error: %v", err) + } + infra.SafeRun(func() error { + for _, name := range rules { + rul, ee := ruleProcessor.GetRuleById(name) + if ee != nil { + logger.Error(ee) + continue + } + reply := recoverRule(rul) + if reply != "" { + logger.Error(reply) + } + } + return nil + }) + *reply = fmt.Sprintf("imported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2]) + return nil +} + +func (t *Server) Export(file string, reply *string) error { + f, err := os.Create(file) + if err != nil { + return err + } + exported, counts, err := rulesetProcessor.Export() + if err != nil { + return err + } + _, err = io.Copy(f, exported) + if err != nil { + return fmt.Errorf("fail to save to file %s:%v", file, err) + } + *reply = fmt.Sprintf("exported %d streams, %d tables and %d rules", counts[0], counts[1], counts[2]) + return nil +} + +func (t *Server) ImportConfiguration(arg *model.ImportDataDesc, reply *string) error { + file := arg.FileName + f, err := os.Open(file) + if err != nil { + return fmt.Errorf("fail to read file %s: %v", file, err) + } + defer f.Close() + buf := new(bytes.Buffer) + _, err = io.Copy(buf, f) + if err != nil { + return fmt.Errorf("fail to convert file %s: %v", file, err) + } + content := buf.Bytes() + partial := arg.Partial + + var result ImportConfigurationStatus + if !partial { + configurationReset() + result = configurationImport(context.Background(), content, arg.Stop) + } else { + result = configurationPartialImport(context.Background(), content) + } + marshal, _ := json.Marshal(result) + + dst := &bytes.Buffer{} + if err := json.Indent(dst, marshal, "", " "); err != nil { + return fmt.Errorf("import configuration error: %v", err) + } + *reply = dst.String() + + return nil +} + +func (t *Server) GetStatusImport(_ int, reply *string) error { + jsonRsp := configurationStatusExport() + result, err := json.Marshal(jsonRsp) + if err != nil { + return fmt.Errorf("Show rule error : %s.", err) + } + dst := &bytes.Buffer{} + if err := json.Indent(dst, result, "", " "); err != nil { + return fmt.Errorf("Show rule error : %s.", err) + } + *reply = dst.String() + + return nil +} + +func (t *Server) ExportConfiguration(arg *model.ExportDataDesc, reply *string) error { + rules := arg.Rules + file := arg.FileName + f, err := os.Create(file) + if err != nil { + return err + } + var jsonBytes []byte + // do not specify rules, export all + if len(rules) == 0 { + jsonBytes, err = configurationExport() + } else { + jsonBytes, err = ruleMigrationProcessor.ConfigurationPartialExport(rules) + } + if err != nil { + return err + } + _, err = io.Copy(f, bytes.NewReader(jsonBytes)) + if err != nil { + return fmt.Errorf("fail to save to file %s:%v", file, err) + } + *reply = fmt.Sprintf("export configuration success") + return nil +} + +func marshalDesc(m interface{}) (string, error) { + s, err := json.Marshal(m) + if err != nil { + return "", fmt.Errorf("invalid json %v", m) + } + dst := &bytes.Buffer{} + if err := json.Indent(dst, s, "", " "); err != nil { + return "", fmt.Errorf("indent json error %v", err) + } + return dst.String(), nil +} + +func initQuery() { + ticker := time.NewTicker(time.Second * 5) + go infra.SafeRun(func() error { + for { + <-ticker.C + if registry != nil { + if _, ok := registry.Load(QueryRuleId); !ok { + continue + } + + n := time.Now() + w := 10 * time.Second + if v := n.Sub(sink.QR.LastFetch); v >= w { + logger.Printf("The client seems no longer fetch the query result, stop the query now.") + stopQuery() + } + } + } + }) +} diff --git a/internal/server/rpc_plugin.go b/internal/server/rpc_plugin.go new file mode 100644 index 0000000000..4c4418229b --- /dev/null +++ b/internal/server/rpc_plugin.go @@ -0,0 +1,112 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build (rpc || !core) && (plugin || portable || !core) + +package server + +import ( + "encoding/json" + "fmt" + + "github.com/lf-edge/ekuiper/v2/internal/pkg/model" + "github.com/lf-edge/ekuiper/v2/internal/plugin" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +func (t *Server) CreatePlugin(arg *model.PluginDesc, reply *string) error { + pt := plugin.PluginType(arg.Type) + p, err := getPluginByJson(arg, pt) + if err != nil { + return fmt.Errorf("Create plugin error: %s", err) + } + if p.GetFile() == "" { + return fmt.Errorf("Create plugin error: Missing plugin file url.") + } + // define according to the build tag + err = t.doRegister(pt, p) + if err != nil { + return err + } + if err != nil { + return fmt.Errorf("Create plugin error: %s", err) + } else { + *reply = fmt.Sprintf("Plugin %s is created.", p.GetName()) + } + return nil +} + +func (t *Server) DropPlugin(arg *model.PluginDesc, reply *string) error { + pt := plugin.PluginType(arg.Type) + p, err := getPluginByJson(arg, pt) + if err != nil { + return fmt.Errorf("Drop plugin error: %s", err) + } + err = t.doDelete(pt, p.GetName(), arg.Stop) + if err != nil { + return fmt.Errorf("Drop plugin error: %s", err) + } else { + if pt == plugin.PORTABLE { + *reply = fmt.Sprintf("Plugin %s is dropped .", p.GetName()) + } else { + if arg.Stop { + *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper will be stopped.", p.GetName()) + } else { + *reply = fmt.Sprintf("Plugin %s is dropped and Kuiper must restart for the change to take effect.", p.GetName()) + } + } + } + + return nil +} + +func (t *Server) DescPlugin(arg *model.PluginDesc, reply *string) error { + pt := plugin.PluginType(arg.Type) + p, err := getPluginByJson(arg, pt) + if err != nil { + return fmt.Errorf("Describe plugin error: %s", err) + } + m, err := t.doDesc(pt, p.GetName()) + if err != nil { + return fmt.Errorf("Describe plugin error: %s", err) + } else { + r, err := marshalDesc(m) + if err != nil { + return fmt.Errorf("Describe plugin error: %v", err) + } + *reply = r + } + return nil +} + +func (t *Server) ShowPlugins(arg int, reply *string) error { + pt := plugin.PluginType(arg) + l, err := t.doShow(pt) + if err != nil { + return fmt.Errorf("Show plugin error: %s", err) + } + *reply = l + return nil +} + +func getPluginByJson(arg *model.PluginDesc, pt plugin.PluginType) (plugin.Plugin, error) { + p := plugin.NewPluginByType(pt) + if arg.Json != "" { + if err := json.Unmarshal(cast.StringToBytes(arg.Json), p); err != nil { + return nil, fmt.Errorf("Parse plugin %s error : %s.", arg.Json, err) + } + } + p.SetName(arg.Name) + return p, nil +} diff --git a/internal/server/rpc_plugin_both.go b/internal/server/rpc_plugin_both.go new file mode 100644 index 0000000000..bce6f7a7ea --- /dev/null +++ b/internal/server/rpc_plugin_both.go @@ -0,0 +1,77 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build (!core && !wasmedge) || (rpc && portable && plugin) + +package server + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/lf-edge/ekuiper/v2/internal/plugin" +) + +func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error { + if pt == plugin.PORTABLE { + return portableManager.Register(p) + } else { + return nativeManager.Register(pt, p) + } +} + +func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error { + if pt == plugin.PORTABLE { + return portableManager.Delete(name) + } else { + return nativeManager.Delete(pt, name, stopRun) + } +} + +func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) { + var ( + result interface{} + ok bool + ) + if pt == plugin.PORTABLE { + result, ok = portableManager.GetPluginInfo(name) + } else { + result, ok = nativeManager.GetPluginInfo(pt, name) + } + if !ok { + return nil, fmt.Errorf("not found") + } + return result, nil +} + +func (t *Server) doShow(pt plugin.PluginType) (string, error) { + var result string + if pt == plugin.PORTABLE { + l := portableManager.List() + jb, err := json.Marshal(l) + if err != nil { + return "", err + } + return string(jb), nil + } else { + l := nativeManager.List(pt) + if len(l) == 0 { + result = "No plugin is found." + } else { + result = strings.Join(l, "\n") + } + return result, nil + } +} diff --git a/internal/server/rpc_plugin_hasnative.go b/internal/server/rpc_plugin_hasnative.go new file mode 100644 index 0000000000..3a7559e2ed --- /dev/null +++ b/internal/server/rpc_plugin_hasnative.go @@ -0,0 +1,69 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !core || (rpc && plugin) + +package server + +import ( + "fmt" + "strings" + + "github.com/lf-edge/ekuiper/v2/internal/pkg/model" + "github.com/lf-edge/ekuiper/v2/internal/plugin" +) + +func (t *Server) RegisterPlugin(arg *model.PluginDesc, reply *string) error { + p, err := getPluginByJson(arg, plugin.FUNCTION) + if err != nil { + return fmt.Errorf("Register plugin functions error: %s", err) + } + if len(p.GetSymbols()) == 0 { + return fmt.Errorf("Register plugin functions error: Missing function list.") + } + err = nativeManager.RegisterFuncs(p.GetName(), p.GetSymbols()) + if err != nil { + return fmt.Errorf("Create plugin error: %s", err) + } else { + *reply = fmt.Sprintf("Plugin %s is created.", p.GetName()) + } + return nil +} + +func (t *Server) ShowUdfs(_ int, reply *string) error { + l := nativeManager.ListSymbols() + if len(l) == 0 { + l = append(l, "No udf is found.") + } + *reply = strings.Join(l, "\n") + return nil +} + +func (t *Server) DescUdf(arg string, reply *string) error { + m, ok := nativeManager.GetPluginBySymbol(plugin.FUNCTION, arg) + if !ok { + return fmt.Errorf("Describe udf error: not found") + } else { + j := map[string]string{ + "name": arg, + "plugin": m, + } + r, err := marshalDesc(j) + if err != nil { + return fmt.Errorf("Describe udf error: %v", err) + } + *reply = r + } + return nil +} diff --git a/internal/server/rpc_plugin_native.go b/internal/server/rpc_plugin_native.go new file mode 100644 index 0000000000..b7cf43b975 --- /dev/null +++ b/internal/server/rpc_plugin_native.go @@ -0,0 +1,67 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build plugin && rpc && core && !portable + +package server + +import ( + "fmt" + "strings" + + "github.com/lf-edge/ekuiper/v2/internal/plugin" +) + +func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error { + if pt == plugin.PORTABLE { + return fmt.Errorf("portable plugin support is disabled") + } else { + return nativeManager.Register(pt, p) + } +} + +func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error { + if pt == plugin.PORTABLE { + return fmt.Errorf("portable plugin support is disabled") + } else { + return nativeManager.Delete(pt, name, stopRun) + } +} + +func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) { + if pt == plugin.PORTABLE { + return nil, fmt.Errorf("portable plugin support is disabled") + } else { + r, ok := nativeManager.GetPluginInfo(pt, name) + if !ok { + return nil, fmt.Errorf("not found") + } + return r, nil + } +} + +func (t *Server) doShow(pt plugin.PluginType) (string, error) { + var result string + if pt == plugin.PORTABLE { + return "", fmt.Errorf("portable plugin support is disabled") + } else { + l := nativeManager.List(pt) + if len(l) == 0 { + result = "No plugin is found." + } else { + result = strings.Join(l, "\n") + } + return result, nil + } +} diff --git a/internal/server/rpc_plugin_portable.go b/internal/server/rpc_plugin_portable.go new file mode 100644 index 0000000000..ba0894e8ea --- /dev/null +++ b/internal/server/rpc_plugin_portable.go @@ -0,0 +1,65 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build portable && rpc && core && !plugin + +package server + +import ( + "encoding/json" + "fmt" + + "github.com/lf-edge/ekuiper/v2/internal/plugin" +) + +func (t *Server) doRegister(pt plugin.PluginType, p plugin.Plugin) error { + if pt == plugin.PORTABLE { + return portableManager.Register(p) + } else { + return fmt.Errorf("native plugin support is disabled") + } +} + +func (t *Server) doDelete(pt plugin.PluginType, name string, stopRun bool) error { + if pt == plugin.PORTABLE { + return portableManager.Delete(name) + } else { + return fmt.Errorf("native plugin support is disabled") + } +} + +func (t *Server) doDesc(pt plugin.PluginType, name string) (interface{}, error) { + if pt == plugin.PORTABLE { + r, ok := portableManager.GetPluginInfo(name) + if !ok { + return nil, fmt.Errorf("not found") + } + return r, nil + } else { + return nil, fmt.Errorf("native plugin support is disabled") + } +} + +func (t *Server) doShow(pt plugin.PluginType) (string, error) { + if pt == plugin.PORTABLE { + l := portableManager.List() + jb, err := json.Marshal(l) + if err != nil { + return "", err + } + return string(jb), nil + } else { + return "", fmt.Errorf("native plugin support is disabled") + } +} diff --git a/internal/server/rpc_schema.go b/internal/server/rpc_schema.go new file mode 100644 index 0000000000..ae7de75946 --- /dev/null +++ b/internal/server/rpc_schema.go @@ -0,0 +1,91 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !core || (rpc && schema) + +package server + +import ( + "encoding/json" + "fmt" + + "github.com/lf-edge/ekuiper/v2/internal/pkg/def" + "github.com/lf-edge/ekuiper/v2/internal/pkg/model" + "github.com/lf-edge/ekuiper/v2/internal/schema" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +func (t *Server) CreateSchema(arg *model.RPCTypedArgDesc, reply *string) error { + sd := &schema.Info{Type: def.SchemaType(arg.Type)} + if arg.Json != "" { + if err := json.Unmarshal(cast.StringToBytes(arg.Json), sd); err != nil { + return fmt.Errorf("Parse service %s error : %s.", arg.Json, err) + } + } + if sd.Name != arg.Name { + return fmt.Errorf("Create schema error: name mismatch.") + } + if sd.Content != "" && sd.FilePath != "" { + return fmt.Errorf("Invalid body: Cannot specify both content and file") + } + err := schema.Register(sd) + if err != nil { + return fmt.Errorf("Create schema error: %s", err) + } else { + *reply = fmt.Sprintf("Schema %s is created.", arg.Name) + } + return nil +} + +func (t *Server) DescSchema(arg *model.RPCTypedArgDesc, reply *string) error { + j, err := schema.GetSchema(def.SchemaType(arg.Type), arg.Name) + if err != nil { + return fmt.Errorf("Desc schema error : %s.", err) + } else if j == nil { + return fmt.Errorf("Desc schema error : not found.") + } else { + r, err := marshalDesc(j) + if err != nil { + return fmt.Errorf("Describe service error: %v", err) + } + *reply = r + } + return nil +} + +func (t *Server) DropSchema(arg *model.RPCTypedArgDesc, reply *string) error { + err := schema.DeleteSchema(def.SchemaType(arg.Type), arg.Name) + if err != nil { + return fmt.Errorf("Drop schema error : %s.", err) + } + *reply = fmt.Sprintf("Schema %s is dropped", arg.Name) + return nil +} + +func (t *Server) ShowSchemas(schemaType string, reply *string) error { + l, err := schema.GetAllForType(def.SchemaType(schemaType)) + if err != nil { + return fmt.Errorf("Show schemas error: %s.", err) + } + if len(l) == 0 { + *reply = "No schema definitions are found." + } else { + r, err := marshalDesc(l) + if err != nil { + return fmt.Errorf("Show service error: %v", err) + } + *reply = r + } + return nil +} diff --git a/internal/server/rpc_script.go b/internal/server/rpc_script.go new file mode 100644 index 0000000000..e7837003a0 --- /dev/null +++ b/internal/server/rpc_script.go @@ -0,0 +1,79 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build full || (rpc && script) + +package server + +import ( + "encoding/json" + "fmt" + + "github.com/lf-edge/ekuiper/v2/internal/plugin/js" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +func (t *Server) CreateScript(j string, reply *string) error { + sd := &js.Script{} + if err := json.Unmarshal(cast.StringToBytes(j), sd); err != nil { + return fmt.Errorf("Parse JavaScript function error : %s.", err) + } + err := js.GetManager().Create(sd) + if err != nil { + return fmt.Errorf("Create JavaScript function error: %s", err) + } else { + *reply = fmt.Sprintf("JavaScript function %s is created.", sd.Id) + } + return nil +} + +func (t *Server) DescScript(name string, reply *string) error { + j, err := js.GetManager().GetScript(name) + if err != nil { + return fmt.Errorf("Describe JavaScript function error : %s.", err) + } else { + r, err := marshalDesc(j) + if err != nil { + return fmt.Errorf("Describe JavaScript function error : %s.", err) + } + *reply = r + } + return nil +} + +func (t *Server) DropScript(name string, reply *string) error { + err := js.GetManager().Delete(name) + if err != nil { + return fmt.Errorf("Drop JavaScript function error : %s.", err) + } + *reply = fmt.Sprintf("JavaScript function %s is dropped", name) + return nil +} + +func (t *Server) ShowScripts(_ int, reply *string) error { + content, err := js.GetManager().List() + if err != nil { + return fmt.Errorf("Show JavaScript functions error: %s.", err) + } + if len(content) == 0 { + *reply = "No JavaScript functions are found." + } else { + r, err := marshalDesc(content) + if err != nil { + return fmt.Errorf("Show service error: %v", err) + } + *reply = r + } + return nil +} diff --git a/internal/server/rpc_script_test.go b/internal/server/rpc_script_test.go new file mode 100644 index 0000000000..0f31945bf8 --- /dev/null +++ b/internal/server/rpc_script_test.go @@ -0,0 +1,72 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build full || (rpc && script) + +package server + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/lf-edge/ekuiper/v2/internal/plugin/js" +) + +func init() { + // Wait for other db tests to finish to avoid db lock + for i := 0; i < 10; i++ { + if err := js.InitManager(); err != nil { + time.Sleep(10 * time.Millisecond) + } else { + break + } + } +} + +func TestFullLC(t *testing.T) { + // Create + s := new(Server) + reply := new(string) + err := s.CreateScript(`nojson`, reply) + assert.Error(t, err) + err = s.CreateScript(`{"id": "test", "source": "function() {}"}`, new(string)) + assert.Error(t, err) + err = s.CreateScript(`{"id": "test", "script": "function test(a, b) {return a+b}"}`, reply) + assert.NoError(t, err) + assert.Equal(t, "JavaScript function test is created.", *reply) + // Get + err = s.DescScript("test", reply) + assert.NoError(t, err) + assert.Equal(t, "{\n \"id\": \"test\",\n \"description\": \"\",\n \"script\": \"function test(a, b) {return a+b}\",\n \"isAgg\": false\n}", *reply) + // List + err = s.ShowScripts(0, reply) + assert.NoError(t, err) + assert.Equal(t, "[\n \"test\"\n]", *reply) + // Delete + err = s.DropScript("test", reply) + assert.NoError(t, err) + assert.Equal(t, "JavaScript function test is dropped", *reply) + // Get + err = s.DescScript("test", reply) + assert.Error(t, err) + // List + err = s.ShowScripts(0, reply) + assert.NoError(t, err) + assert.Equal(t, "No JavaScript functions are found.", *reply) + // Delete inexist + err = s.DropScript("test", reply) + assert.Error(t, err) +} diff --git a/internal/server/rpc_service.go b/internal/server/rpc_service.go new file mode 100644 index 0000000000..0097fee7bc --- /dev/null +++ b/internal/server/rpc_service.go @@ -0,0 +1,119 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !core || (rpc && service) + +package server + +import ( + "encoding/json" + "fmt" + + "github.com/lf-edge/ekuiper/v2/internal/pkg/model" + "github.com/lf-edge/ekuiper/v2/internal/service" + "github.com/lf-edge/ekuiper/v2/pkg/cast" +) + +func (t *Server) CreateService(arg *model.RPCArgDesc, reply *string) error { + sd := &service.ServiceCreationRequest{} + if arg.Json != "" { + if err := json.Unmarshal(cast.StringToBytes(arg.Json), sd); err != nil { + return fmt.Errorf("Parse service %s error : %s.", arg.Json, err) + } + } + if sd.Name != arg.Name { + return fmt.Errorf("Create service error: name mismatch.") + } + if sd.File == "" { + return fmt.Errorf("Create service error: Missing service file url.") + } + err := serviceManager.Create(sd) + if err != nil { + return fmt.Errorf("Create service error: %s", err) + } else { + *reply = fmt.Sprintf("Service %s is created.", arg.Name) + } + return nil +} + +func (t *Server) DescService(name string, reply *string) error { + s, err := serviceManager.Get(name) + if err != nil { + return fmt.Errorf("Desc service error : %s.", err) + } else { + r, err := marshalDesc(s) + if err != nil { + return fmt.Errorf("Describe service error: %v", err) + } + *reply = r + } + return nil +} + +func (t *Server) DescServiceFunc(name string, reply *string) error { + s, err := serviceManager.GetFunction(name) + if err != nil { + return fmt.Errorf("Desc service func error : %s.", err) + } else { + r, err := marshalDesc(s) + if err != nil { + return fmt.Errorf("Describe service func error: %v", err) + } + *reply = r + } + return nil +} + +func (t *Server) DropService(name string, reply *string) error { + err := serviceManager.Delete(name) + if err != nil { + return fmt.Errorf("Drop service error : %s.", err) + } + *reply = fmt.Sprintf("Service %s is dropped", name) + return nil +} + +func (t *Server) ShowServices(_ int, reply *string) error { + s, err := serviceManager.List() + if err != nil { + return fmt.Errorf("Show service error: %s.", err) + } + if len(s) == 0 { + *reply = "No service definitions are found." + } else { + r, err := marshalDesc(s) + if err != nil { + return fmt.Errorf("Show service error: %v", err) + } + *reply = r + } + return nil +} + +func (t *Server) ShowServiceFuncs(_ int, reply *string) error { + s, err := serviceManager.ListFunctions() + if err != nil { + return fmt.Errorf("Show service funcs error: %s.", err) + } + if len(s) == 0 { + *reply = "No service definitions are found." + } else { + r, err := marshalDesc(s) + if err != nil { + return fmt.Errorf("Show service funcs error: %v", err) + } + *reply = r + } + return nil +} diff --git a/internal/server/rpc_test.go b/internal/server/rpc_test.go new file mode 100644 index 0000000000..d196db4ba5 --- /dev/null +++ b/internal/server/rpc_test.go @@ -0,0 +1,238 @@ +// Copyright 2023-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "encoding/json" + "fmt" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/lf-edge/ekuiper/v2/internal/meta" + "github.com/lf-edge/ekuiper/v2/internal/pkg/model" + "github.com/lf-edge/ekuiper/v2/internal/plugin/native" + "github.com/lf-edge/ekuiper/v2/internal/plugin/portable" + "github.com/lf-edge/ekuiper/v2/internal/schema" + "github.com/lf-edge/ekuiper/v2/internal/service" +) + +type ServerTestSuite struct { + suite.Suite + s *Server +} + +func (suite *ServerTestSuite) SetupTest() { + suite.s = new(Server) + nativeManager, _ = native.InitManager() + portableManager, _ = portable.InitManager() + serviceManager, _ = service.InitManager() + _ = schema.InitRegistry() + meta.InitYamlConfigManager() +} + +func (suite *ServerTestSuite) TestStream() { + sql := `Create Stream test () WITH (FORMAT="JSON", type="simulator");` + var reply string + err := suite.s.Stream(sql, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Stream test is created.\n", reply) + + reply = "" + sql = "show streams;" + err = suite.s.Stream(sql, &reply) + assert.Nil(suite.T(), err) + + reply = "" + sql = "SELECT * FROM test;" + err = suite.s.CreateQuery(sql, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Query was submit successfully.", reply) + + var result string = "" + for i := 0; i < 5; i++ { + var queryresult string + time.Sleep(time.Second) + err = suite.s.GetQueryResult("test", &queryresult) + assert.Nil(suite.T(), err) + if queryresult != "" { + result += queryresult + break + } + } + assert.Equal(suite.T(), "[{\"humidity\":50,\"temperature\":22.5}]", result) + stopQuery() +} + +func (suite *ServerTestSuite) TestRule() { + sql := `Create Stream test () WITH (DATASOURCE="../internal/server/rpc_test_data/test.json", FORMAT="JSON", type="file");` + var reply string + err := suite.s.Stream(sql, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Stream test is created.\n", reply) + + reply = "" + rule := `{ + "sql": "SELECT * from test;", + "actions": [{ + "file": { + "path": "../internal/server/rpc_test_data/data/result.txt", + "interval": 5000, + "fileType": "lines", + "format": "json" + } + }] + }` + ruleId := "myRule" + args := &model.RPCArgDesc{Name: ruleId, Json: rule} + err = suite.s.ValidateRule(args, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "The rule has been successfully validated and is confirmed to be correct.", reply) + + reply = "" + rule = `{ + "sql": "SELECT * from test;" + }` + args = &model.RPCArgDesc{Name: ruleId, Json: rule} + err = suite.s.ValidateRule(args, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "invalid rule json: Missing rule actions.", reply) + + reply = "" + rule = `{ + "sql": "SELECT * from test;", + "actions": [{ + "file": { + "path": "../internal/server/rpc_test_data/data/result.txt", + "interval": 5000, + "fileType": "lines", + "format": "json" + } + }] + }` + args = &model.RPCArgDesc{Name: ruleId, Json: rule} + err = suite.s.CreateRule(args, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Rule myRule was created successfully, please use 'bin/kuiper getstatus rule myRule' command to get rule status.", reply) + + reply = "" + err = suite.s.GetStatusRule(ruleId, &reply) + assert.Nil(suite.T(), err) + + reply = "" + err = suite.s.ShowRules(1, &reply) + assert.Nil(suite.T(), err) + + reply = "" + err = suite.s.DescRule(ruleId, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "{\n \"sql\": \"SELECT * from test;\",\n \"actions\": [\n {\n \"file\": {\n \"path\": \"../internal/server/rpc_test_data/data/result.txt\",\n \"interval\": 5000,\n \"fileType\": \"lines\",\n \"format\": \"json\"\n }\n }\n ]\n}\n", reply) + + reply = "" + err = suite.s.GetTopoRule(ruleId, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "{\n \"sources\": [\n \"source_test\"\n ],\n \"edges\": {\n \"op_2_decoder\": [\n \"op_3_project\"\n ],\n \"op_3_project\": [\n \"op_file_0_0_transform\"\n ],\n \"op_file_0_0_transform\": [\n \"op_file_0_1_encode\"\n ],\n \"op_file_0_1_encode\": [\n \"sink_file_0\"\n ],\n \"source_test\": [\n \"op_2_decoder\"\n ]\n }\n}", reply) + + reply = "" + err = suite.s.StopRule(ruleId, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Rule myRule was stopped.", reply) + fmt.Println("rule stopped") + + reply = "" + err = suite.s.StartRule(ruleId, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Rule myRule was started", reply) + fmt.Println("rule started") + + reply = "" + err = suite.s.RestartRule(ruleId, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Rule myRule was restarted.", reply) + fmt.Println("rule restarted") + + reply = "" + err = suite.s.DropRule(ruleId, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "Rule myRule is dropped.", reply) +} + +func (suite *ServerTestSuite) TestImportAndExport() { + file := "rpc_test_data/import.json" + var reply string + err := suite.s.Import(file, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "imported 1 streams, 0 tables and 1 rules", reply) + + reply = "" + file = "rpc_test_data/export.json" + err = suite.s.Export(file, &reply) + assert.Nil(suite.T(), err) + os.Remove(file) +} + +func (suite *ServerTestSuite) TestConfiguration() { + importArg := model.ImportDataDesc{ + FileName: "rpc_test_data/import_configuration.json", + Stop: false, + Partial: false, + } + var reply string + err := suite.s.ImportConfiguration(&importArg, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "{\n \"ErrorMsg\": \"\",\n \"ConfigResponse\": {\n \"streams\": {},\n \"tables\": {},\n \"rules\": {},\n \"nativePlugins\": {},\n \"portablePlugins\": {},\n \"sourceConfig\": {},\n \"sinkConfig\": {},\n \"connectionConfig\": {},\n \"Service\": {},\n \"Schema\": {},\n \"uploads\": {},\n \"scripts\": {}\n }\n}", reply) + + reply = "" + err = suite.s.GetStatusImport(1, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "{\n \"streams\": {},\n \"tables\": {},\n \"rules\": {},\n \"nativePlugins\": {},\n \"portablePlugins\": {},\n \"sourceConfig\": {},\n \"sinkConfig\": {},\n \"connectionConfig\": {},\n \"Service\": {},\n \"Schema\": {},\n \"uploads\": {},\n \"scripts\": {}\n}", reply) + + reply = "" + exportArg := model.ExportDataDesc{ + FileName: "rpc_test_data/export_configuration.json", + Rules: []string{}, + } + err = suite.s.ExportConfiguration(&exportArg, &reply) + assert.Nil(suite.T(), err) + assert.Equal(suite.T(), "export configuration success", reply) + os.Remove("rpc_test_data/export_configuration.json") +} + +func (suite *ServerTestSuite) TearDownTest() { + // Clean up + sql := "DROP STREAM test;" + var reply string + _ = suite.s.Stream(sql, &reply) + _ = suite.s.DropRule("myRule", &reply) +} + +func TestServerTestSuite(t *testing.T) { + suite.Run(t, new(ServerTestSuite)) +} + +func TestGetStoppedMessage(t *testing.T) { + message := `"123","123","123"` + r, err := getStoppedState(message) + require.NoError(t, err) + v := map[string]string{} + err = json.Unmarshal([]byte(r), &v) + require.NoError(t, err) + require.Equal(t, "stopped", v["status"]) + require.Equal(t, message, v["message"]) +} diff --git a/internal/server/rpc_test_data/test.json b/internal/server/rpc_test_data/test.json deleted file mode 100644 index 0b1fb18833..0000000000 --- a/internal/server/rpc_test_data/test.json +++ /dev/null @@ -1,22 +0,0 @@ -[ - { - "id": 1, - "temperature": 20, - "humidity": 50 - }, - { - "id": 2, - "temperature": 21, - "humidity": 51 - }, - { - "id": 3, - "temperature": 22, - "humidity": 52 - }, - { - "id": 4, - "temperature": 23, - "humidity": 53 - } -] \ No newline at end of file