Skip to content

Commit

Permalink
test(portable): happy path fvt (#3107)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Aug 19, 2024
1 parent ba0684f commit 5f20f6a
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 13 deletions.
23 changes: 16 additions & 7 deletions .github/workflows/run_test_case.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,38 @@ jobs:
go build -o ../../../../plugins/portable/mirror/mirror .
cp mirror.json ../../../../plugins/portable/mirror
cd ../../../../
cp -r sdk/python/ekuiper sdk/python/example/pysam/
cp -r sdk/python/example/pysam plugins/portable/pysam
cp -r sdk/python/ekuiper plugins/portable/pysam/
cp test/lookup.json data/lookup.json
- name: Run test case
run: |
make failpoint-enable
go test -trimpath -tags="edgex msgpack script parquet test" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage.xml ./...
go test -trimpath -tags="edgex msgpack script parquet test" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage.xml $(go list ./... | grep -v "github.com/lf-edge/ekuiper/v2/fvt")
make failpoint-disable
- uses: actions/upload-artifact@v3
if: failure()
with:
name: stream.log
path: log/stream.log
- name: Clean plugins
run: |
rm -r plugins/portable/pysam
rm -r plugins/portable/mirror
- name: Run fvt
run: |
go test -trimpath -tags="edgex msgpack script parquet" --cover -covermode=atomic -coverpkg=./... -coverprofile=fvt_coverage.xml ./fvt
- uses: actions/upload-artifact@v3
if: failure()
with:
name: streamFvt.log
path: log/stream.log
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
files: coverage.xml,fvt_coverage.xml
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: false
verbose: true
- uses: actions/upload-artifact@v3
if: failure()
with:
name: stream.log
path: log/stream.log

run_fvt_tests:
needs:
Expand Down
2 changes: 1 addition & 1 deletion extensions/sources/random/random.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ default:
deduplicate: 0

ext:
interval: 100
interval: 150

dedup:
interval: 100
Expand Down
218 changes: 218 additions & 0 deletions fvt/portable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fvt

import (
"archive/zip"
"io"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

type PortableTestSuite struct {
suite.Suite
}

func TestPortableTestSuite(t *testing.T) {
suite.Run(t, new(PortableTestSuite))
}

func (s *ServerTestSuite) TestLC() {
streamSql := `{"sql": "create stream pyjsonStream () WITH (TYPE=\"pyjson\", FORMAT=\"json\")"}`
ruleSql := `{
"id": "rulePort1",
"sql": "SELECT * FROM pyjsonStream",
"actions": [
{
"print": {}
}
]
}`
//s.Run("create rule error when plugin not installed", func() {
// resp, err := client.CreateStream(streamSql)
// s.Require().NoError(err)
// s.T().Log(GetResponseText(resp))
// s.Require().Equal(http.StatusBadRequest, resp.StatusCode)
//
// resp, err = client.CreateRule(ruleSql)
// s.Require().NoError(err)
// s.T().Log(GetResponseText(resp))
// s.Require().Equal(http.StatusBadRequest, resp.StatusCode)
//})
s.Run("install plugin and check status", func() {
// zip the plugin dir
pysamDir := filepath.Join(PWD, "sdk", "python", "example", "pysam")
pysamZipPath := "/tmp/pysam.zip"
err := zipDirectory(pysamDir, pysamZipPath)
s.Require().NoError(err)
defer func() {
os.Remove(pysamZipPath)
}()
// install the plugin
resp, err := client.Post("plugins/portables", `{"name":"pysam","file":"file:///tmp/pysam.zip"}`)
s.Require().NoError(err)
s.Require().Equal(http.StatusCreated, resp.StatusCode)
})
s.Run("check plugin status", func() {
// check the plugin status
resp, err := client.Get("plugins/portables")
s.Require().NoError(err)
s.Require().Equal(http.StatusOK, resp.StatusCode)
payload, err := io.ReadAll(resp.Body)
s.NoError(err)
defer resp.Body.Close()
s.Require().Equal("[{\"name\":\"pysam\",\"version\":\"v1.0.0\",\"language\":\"python\",\"executable\":\"/home/runner/work/ekuiper/ekuiper/plugins/portable/pysam/pysam.py\",\"sources\":[\"pyjson\"],\"sinks\":[\"print\"],\"functions\":[\"revert\"]}]", string(payload))
})
s.Run("test rule with plugin", func() {
resp, err := client.CreateStream(streamSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

resp, err = client.CreateRule(ruleSql)
s.Require().NoError(err)
s.T().Log(GetResponseText(resp))
s.Require().Equal(http.StatusCreated, resp.StatusCode)

// Check rule status after a while
ticker := time.NewTicker(ConstantInterval)
defer ticker.Stop()
count := 20
for count > 0 {
<-ticker.C
count--
metrics, err := client.GetRuleStatus("rulePort1")
s.Require().NoError(err)
if metrics["sink_print_0_0_records_out_total"].(float64) > 0 {
break
}
}
metrics, err := client.GetRuleStatus("rulePort1")
s.Require().NoError(err)
s.Equal("running", metrics["status"])
s.T().Log(metrics)
sinkOut, ok := metrics["sink_print_0_0_records_out_total"]
s.True(ok)
if !ok {
s.T().Log(metrics)
}
s.True(sinkOut.(float64) > 0)
s.Equal("", metrics["source_pyjsonStream_0_last_exception"])
})
//s.Run("test rule restart", func() {
// resp, err := client.RestartRule("rulePort1")
// s.Require().NoError(err)
// s.Require().Equal(http.StatusOK, resp.StatusCode)
// // Check rule status after a while
// time.Sleep(ConstantInterval)
// metrics, err := client.GetRuleStatus("rulePort1")
// s.Require().NoError(err)
// s.Equal("running", metrics["status"])
// sinkOut, ok := metrics["sink_print_0_0_records_out_total"]
// s.True(ok)
// if !ok {
// s.T().Log(metrics)
// }
// s.True(sinkOut.(float64) > 0)
// s.Equal("", metrics["source_pyjsonStream_0_last_exception"])
//})
//s.Run("test plugin update", func() {})
s.Run("clean up", func() {
resp, err := client.DeleteRule("rulePort1")
s.NoError(err)
s.Equal(200, resp.StatusCode)

resp, err = client.DeleteStream("pyjsonStream")
s.NoError(err)
s.Equal(200, resp.StatusCode)
})
s.Run("delete plugin", func() {
resp, err := client.Delete("plugins/portables/pysam")
s.NoError(err)
s.Equal(http.StatusOK, resp.StatusCode)
})
}

// zipDirectory zips the contents of the specified source directory into the target zip file
func zipDirectory(source string, target string) error {
// Create the zip file
zipFile, err := os.Create(target)
if err != nil {
return err
}
defer zipFile.Close()

// Create a new zip writer
writer := zip.NewWriter(zipFile)
defer writer.Close()

// Walk through the source directory and add files to the zip
return filepath.Walk(source, func(file string, fi os.FileInfo, err error) error {
if err != nil {
return err
}

// Skip the source directory itself
if file == source {
return nil
}

// Create a header for the file
header, err := zip.FileInfoHeader(fi)
if err != nil {
return err
}

// Set the header name to the relative path
relPath, err := filepath.Rel(source, file)
if err != nil {
return err
}
header.Name = relPath

// If it's a directory, append a "/" to the name
if fi.IsDir() {
header.Name += "/"
}

// Create the writer for the file header
writer, err := writer.CreateHeader(header)
if err != nil {
return err
}

// If it's a file, write its content to the zip
if !fi.IsDir() {
fileReader, err := os.Open(file)
if err != nil {
return err
}
defer fileReader.Close()

// Copy the file content to the zip writer
_, err = io.Copy(writer, fileReader)
if err != nil {
return err
}
}

return nil
})
}
33 changes: 30 additions & 3 deletions fvt/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ func (sdk *SDK) Get(command string) (resp *http.Response, err error) {
return http.Get(sdk.baseUrl.JoinPath(command).String())
}

func (sdk *SDK) Post(command string, body string) (resp *http.Response, err error) {
return http.Post(sdk.baseUrl.JoinPath(command).String(), ContentTypeJson, bytes.NewBufferString(body))
}

func (sdk *SDK) Delete(command string) (resp *http.Response, err error) {
req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath(command).String(), nil)
if err != nil {
fmt.Println(err)
return
}
return sdk.httpClient.Do(req)
}

func (sdk *SDK) CreateStream(streamJson string) (resp *http.Response, err error) {
return http.Post(sdk.baseUrl.JoinPath("streams").String(), ContentTypeJson, bytes.NewBufferString(streamJson))
}
Expand All @@ -59,6 +72,10 @@ func (sdk *SDK) CreateRule(ruleJson string) (resp *http.Response, err error) {
return http.Post(sdk.baseUrl.JoinPath("rules").String(), ContentTypeJson, bytes.NewBufferString(ruleJson))
}

func (sdk *SDK) RestartRule(ruleId string) (resp *http.Response, err error) {
return http.Post(sdk.baseUrl.JoinPath("rules", ruleId, "restart").String(), ContentTypeJson, nil)
}

func (sdk *SDK) DeleteRule(name string) (resp *http.Response, err error) {
req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath("rules", name).String(), nil)
if err != nil {
Expand All @@ -68,19 +85,29 @@ func (sdk *SDK) DeleteRule(name string) (resp *http.Response, err error) {
return sdk.httpClient.Do(req)
}

func (sdk *SDK) GetRulStatus(name string) (metrics map[string]any, err error) {
func (sdk *SDK) GetRuleStatus(name string) (map[string]any, error) {
resp, err := http.Get(sdk.baseUrl.JoinPath("rules", name, "status").String())
if err != nil {
fmt.Println(err)
return
return nil, err
}
return GetResponseResultMap(resp)
}

func GetResponseText(resp *http.Response) (string, error) {
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
return string(b), err
}

func GetResponseResultMap(resp *http.Response) (result map[string]any, err error) {
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return
}
err = json.Unmarshal(body, &metrics)
err = json.Unmarshal(body, &result)
if err != nil {
fmt.Println(err)
return
Expand Down
3 changes: 2 additions & 1 deletion internal/plugin/portable/manager.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2023 EMQ Technologies Co., Ltd.
// Copyright 2021-2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -281,6 +281,7 @@ func (m *Manager) install(name, src string, shellParas []string) (resultErr erro
if err != nil {
return err
}
break
}
}
if pi == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/topotest/plugin_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
// The .so files must be in the plugins folder
func TestExtensions(t *testing.T) {
// Reset
streamList := []string{"ext", "ext2"}
streamList := []string{"ext"}
HandleStream(false, streamList, t)
tests := []RuleTest{
{
Expand Down

0 comments on commit 5f20f6a

Please sign in to comment.