diff --git a/examples/rpc/client.go b/examples/rpc/client.go old mode 100644 new mode 100755 index 229d3581..8af1d375 --- a/examples/rpc/client.go +++ b/examples/rpc/client.go @@ -3,10 +3,13 @@ package main import ( typesv1alpha1 "buf.build/gen/go/chain4travel/camino-messenger-protocol/protocolbuffers/go/cmp/types/v1alpha1" "context" + "encoding/csv" + "flag" "fmt" - "log" "os" "sort" + "strconv" + "sync" "time" "buf.build/gen/go/chain4travel/camino-messenger-protocol/grpc/go/cmp/services/accommodation/v1alpha1/accommodationv1alpha1grpc" @@ -21,6 +24,8 @@ import ( ) func main() { + var mu sync.Mutex + var wg sync.WaitGroup var logger *zap.Logger cfg := zap.NewDevelopmentConfig() cfg.Level = zap.NewAtomicLevelAt(zap.DebugLevel) @@ -28,20 +33,43 @@ func main() { sLogger := logger.Sugar() logger.Sync() - argsWithoutProg := os.Args[1:] - unencrypted := len(argsWithoutProg) == 0 + times := flag.Int("requests", 1, "Repeat the request n times") + host := flag.String("host", "127.0.0.1", "Distributor bot host") + port := flag.Int("port", 9092, "Distributor bot port") + recipient := flag.String("recipient", "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", "Recipient address (format: @t-kopernikus[...]:matrix.camino.network") + caCertFile := flag.String("ca-cert-file", "", "CA certificate file (optional)") + flag.Parse() + ppConfig := config.PartnerPluginConfig{ - Host: "127.0.0.1", - Port: 9092, - Unencrypted: unencrypted, + Host: *host, + Port: *port, + Unencrypted: *caCertFile == "", } - if !unencrypted { - ppConfig.CACertFile = argsWithoutProg[0] + ppConfig.CACertFile = *caCertFile + + loadTestData := make([][]string, *times) + for i := 0; i < *times; i++ { + loadTestData[i] = make([]string, 6) + wg.Add(1) + go func(counter int) { + defer wg.Done() + createClientAndRunRequest(counter, ppConfig, sLogger, *recipient, loadTestData, mu) + }(i) } + + wg.Wait() + + if len(loadTestData) > 1 || len(loadTestData) == 1 && loadTestData[0][0] != "" { // otherwise no data have been recorded + persistToCSV(loadTestData) + } +} + +func createClientAndRunRequest(i int, ppConfig config.PartnerPluginConfig, sLogger *zap.SugaredLogger, recipient string, loadTestData [][]string, mu sync.Mutex) { c := client.NewClient(&ppConfig, sLogger) err := c.Start() if err != nil { - panic(err) + fmt.Errorf("error starting client: %v", err) + return } request := &accommodationv1alpha1.AccommodationSearchRequest{ Header: nil, @@ -60,12 +88,8 @@ func main() { }, } - err = c.Start() - if err != nil { - panic(err) - } md := metadata.New(map[string]string{ - "recipient": "@t-kopernikus1tyewqsap6v8r8wghg7qn7dyfzg2prtcrw04ke3:matrix.camino.network", + "recipient": recipient, }) ctx := metadata.NewOutgoingContext(context.Background(), md) @@ -74,16 +98,25 @@ func main() { begin := time.Now() resp, err := ass.AccommodationSearch(ctx, request, grpc.Header(&header)) if err != nil { - log.Fatal(err) + sLogger.Errorf("error when performing search: %v", err) + return } totalTime := time.Since(begin) - fmt.Printf("Total time|%s|%s\n", resp.Metadata.SearchId, totalTime) + fmt.Println(totalTime.Milliseconds()) + //fmt.Printf("Total time(ms)|%s|%d\n", resp.Metadata.SearchId.GetValue(), totalTime.Milliseconds()) metadata := &internalmetadata.Metadata{} err = metadata.FromGrpcMD(header) if err != nil { - fmt.Print("error extracting metadata") + sLogger.Errorf("error extracting metadata: %v", err) } + addToDataset(int64(i), totalTime.Milliseconds(), resp, metadata, loadTestData, mu) + + c.Shutdown() +} + +func addToDataset(counter int64, totalTime int64, resp *accommodationv1alpha1.AccommodationSearchResponse, metadata *internalmetadata.Metadata, loadTestData [][]string, mu sync.Mutex) { + var data []string var entries []struct { Key string Value int64 @@ -101,10 +134,56 @@ func main() { return entries[i].Value < entries[j].Value }) lastValue := int64(0) + data = append(data, strconv.FormatInt(counter+1, 10)) + data = append(data, strconv.FormatInt(totalTime, 10)) for _, entry := range entries { - fmt.Printf("%s|%s|%d|%d|%f\n", entry.Key, resp.Metadata.SearchId, entry.Value, entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime.Milliseconds())) + + if entry.Key == "request-gateway-request" { + lastValue = entry.Value + continue //skip + } + if entry.Key == "processor-request" { + + //lastValue = entry.Value + continue //skip + } + fmt.Printf("%d|%s|%s|%d|%.2f\n", entry.Value, entry.Key, resp.Metadata.SearchId.GetValue(), entry.Value-lastValue, float32(entry.Value-lastValue)/float32(totalTime)) + + data = append(data, strconv.FormatInt(entry.Value-lastValue, 10)) lastValue = entry.Value } - c.Shutdown() + mu.Lock() + loadTestData[counter] = data + mu.Unlock() +} +func persistToCSV(dataset [][]string) { + // Open a new CSV file + file, err := os.Create("load_test_data.csv") + if err != nil { + fmt.Println("Error creating CSV file:", err) + return + } + defer file.Close() + + // Create a CSV writer + writer := csv.NewWriter(file) + defer writer.Flush() + + // Write the header row + header := []string{"Request ID", "Total Time", "distributor -> matrix", "matrix -> provider", "provider -> matrix", "matrix -> distributor", "process-response"} + if err := writer.Write(header); err != nil { + fmt.Println("Error writing header:", err) + return + } + + // Write the load test data rows + for _, dataRow := range dataset { + if err := writer.Write(dataRow); err != nil { + fmt.Println("Error writing data row:", err) + return + } + } + + fmt.Println("CSV file created successfully.") } diff --git a/internal/matrix/matrix_messenger.go b/internal/matrix/matrix_messenger.go index 0db34e5c..148f156e 100644 --- a/internal/matrix/matrix_messenger.go +++ b/internal/matrix/matrix_messenger.go @@ -79,9 +79,8 @@ func (m *messenger) StartReceiver(botMode uint) (string, error) { if !completed { return // partial messages are not passed down to the msgChannel } - fmt.Printf("received-message: |%s|%d\n", completeMsg.Metadata.RequestID, t.UnixMilli()) - completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s", m.Checkpoint(), "received"), t.UnixMilli()) - completeMsg.Metadata.Stamp(fmt.Sprintf("%s-%s", m.Checkpoint(), "assembled")) + completeMsg.Metadata.StampOn(fmt.Sprintf("matrix-sent-%s", completeMsg.MsgType), evt.Timestamp) + completeMsg.Metadata.StampOn(fmt.Sprintf("%s-%s-%s", m.Checkpoint(), "received", completeMsg.MsgType), t.UnixMilli()) m.mu.Lock() m.msgChannel <- messaging.Message{ diff --git a/scripts/printReport.sh b/scripts/printReport.sh new file mode 100755 index 00000000..37fa31ca --- /dev/null +++ b/scripts/printReport.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +if [ $# -eq 0 ]; then + echo "Usage: $0 " + exit 1 +fi + +filename=$1 + +if [ ! -f "$filename" ]; then + echo "File not found: $filename" + exit 1 +fi + +# Read data from the file into an array +mapfile -t data < "$filename" + +# Function to calculate the average of an array +calculate_average() { + local sum=0 + local count=${#data[@]} + for value in "${data[@]}"; do + sum=$((sum + value)) + done + echo "scale=2; $sum / $count" | bc +} + +# Sort the array +sorted_data=($(for i in "${data[@]}"; do echo $i; done | sort -n)) + +# Calculate min, max, median, and average +min=${sorted_data[0]} +max=${sorted_data[-1]} +median=${sorted_data[${#sorted_data[@]}/2]} +average=$(calculate_average) +total=${#data[@]} +# Print the results +echo "Min: $min" +echo "Max: $max" +echo "Median: $median" +echo "Average: $average" +echo "Total: $total" diff --git a/scripts/sendXRequests.sh b/scripts/sendXRequests.sh index 3a12edb7..9e1a4849 100755 --- a/scripts/sendXRequests.sh +++ b/scripts/sendXRequests.sh @@ -16,13 +16,4 @@ times_to_run=$1 # Change the path to your Go file below go_file_path="examples/rpc/client.go" - -# Loop to run the Go file X times in parallel -for ((i=1; i<=$times_to_run; i++)) -do -# echo "Sending $i request..." - go run $go_file_path & -done - -# Wait for all background processes to finish -wait \ No newline at end of file +go run $go_file_path $times_to_run