Skip to content

Commit

Permalink
use ctrlC to kill query executing
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Jul 31, 2021
1 parent bacb3b7 commit 2c71fa3
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ vendorbuild: clean fmt
clean:
@rm -rf ${name} vendor

clean_all:
clean-all:
@rm -rf ${name} vendor box/blob.go

fmt:
Expand Down
8 changes: 4 additions & 4 deletions cli/icli.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"log"
"os"

"github.com/dutor/liner"
"github.com/jievince/liner"
"github.com/vesoft-inc/nebula-console/completer"
)

Expand All @@ -38,7 +38,7 @@ func NewiCli(historyFile, user string) Cli {

f, err := os.OpenFile(historyFile, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil {
log.Panicf("Open history file %s failed, %s", historyFile, err.Error())
log.Panicf("Open or create history file %s failed, %s", historyFile, err.Error())
}
defer f.Close()
c.ReadHistory(f)
Expand Down Expand Up @@ -121,9 +121,9 @@ func (l iCli) IsPlayingData() bool {

func (l *iCli) Close() {
defer l.terminal.Close()
f, err := os.Create(l.status.historyFile)
f, err := os.OpenFile(l.status.historyFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Panicf("Write history file %s failed, %s", l.status.historyFile, err.Error())
log.Panicf("Open or create history file %s failed, %s", l.status.historyFile, err.Error())
}
defer f.Close()
l.terminal.WriteHistory(f)
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ module github.com/vesoft-inc/nebula-console
go 1.11

require (
github.com/dutor/liner v1.2.2
github.com/jedib0t/go-pretty/v6 v6.0.5
github.com/jievince/liner v1.2.3
github.com/vesoft-inc/nebula-go/v2 v2.0.0-20210701060243-a0577f67f375
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
)

replace github.com/vesoft-inc/nebula-go/v2 => /home/jie.wang/nebula-go
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dutor/liner v1.2.2 h1:bet1doARkm1DKTuQ45DgMmgaKLJ9S4taUWQGH3kjlyo=
github.com/dutor/liner v1.2.2/go.mod h1:TLqJOMU/a2J4ualIteF7J7DkL2Tv3WE8hZs5o+ogcDI=
github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488 h1:A4KCT0mvTBkvb93gGN+efLPkrgTqmqMeaLDG51KVhMM=
github.com/facebook/fbthrift v0.31.1-0.20210223140454-614a73a42488/go.mod h1:2tncLx5rmw69e5kMBv/yJneERbzrr1yr5fdlnTbu8lU=
github.com/jedib0t/go-pretty/v6 v6.0.5 h1:oOo0/jSb3NEYKT6l1hhFXoX2UZnkanMuCE2DVT1mqnE=
github.com/jedib0t/go-pretty/v6 v6.0.5/go.mod h1:MTr6FgcfNdnN5wPVBzJ6mhJeDyiF0yBvS2TMXEV/XSU=
github.com/jievince/liner v1.2.3 h1:hpMEqBKkIg/RHzHCHZqbs19MDq8bT8b5o85D3o/Yggk=
github.com/jievince/liner v1.2.3/go.mod h1:6szfFB+ea00sIHdOn/4gDFoD6sa2UaUHR28Ca8QGj9U=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
Expand All @@ -17,8 +17,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/vesoft-inc/nebula-go/v2 v2.0.0-20210701060243-a0577f67f375 h1:7us19i0qiJffpnKeWMQTPxAjaYItR8xTVohSa3JZVqk=
github.com/vesoft-inc/nebula-go/v2 v2.0.0-20210701060243-a0577f67f375/go.mod h1:B7nR6+nOSo0umq/HkCmUfyRtYrJVOsNiPS9u4djDbSc=
golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 h1:nxC68pudNYkKU6jWhgrqdreuFiOQWj1Fs7T3VrH4Pjw=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
104 changes: 80 additions & 24 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"fmt"
"log"
"os"
"os/signal"
"path"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"

"github.com/vesoft-inc/nebula-console/box"
Expand Down Expand Up @@ -73,25 +75,26 @@ func playData(data string) (string, error) {
boxfilePath := "/" + data + ".ngql"
posixfilePath := "./data/" + data + ".ngql"
var c cli.Cli
// First find it in embeded box. If not found, then find it in the directory ./data/
if box.Has(boxfilePath) {
// First find it in directory ./data/. If not found, then find it in the embeded box
if fd, err := os.Open(posixfilePath); err == nil {
c = cli.NewnCli(fd, false, "", func() { fd.Close() })
} else if box.Has(boxfilePath) {
fileStr := string(box.Get(boxfilePath))
c = cli.NewnCli(strings.NewReader(fileStr), false, "", nil)
} else if fd, err := os.Open(posixfilePath); err == nil {
c = cli.NewnCli(fd, false, "", func() { fd.Close() })
} else {
return "", fmt.Errorf("file %s.ngql not existed in embed box and file directory ./data/ ", data)
}

c.PlayingData(true)
playingData = true
defer func() {
playingData = false
loopContinue = true
}()

defer c.PlayingData(false)
fmt.Printf("Start loading dataset %s...\n", data)
childSession, err := pool.GetSession(*username, *password)
if err != nil {
log.Panicf("Fail to create a new session from connection pool, %s", err.Error())
}
defer childSession.Release()
err = loop(childSession, c)
err := loop(c)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -241,8 +244,8 @@ func printResultSet(res *nebula.ResultSet, startTime time.Time) (duration time.D
// Loop the request util fatal or timeout
// We treat one line as one query
// Add line break yourself as `SHOW \<CR>HOSTS`
func loop(session *nebula.Session, c cli.Cli) error {
for {
func loop(c cli.Cli) error {
for loopContinue {
line, exit, err := c.ReadLine()
if err != nil {
return err
Expand All @@ -251,7 +254,7 @@ func loop(session *nebula.Session, c cli.Cli) error {
fmt.Println()
return nil
}
if len(line) == 0 {
if len(line) == 0 { // 1). The line input is empty, or 2). user presses ctrlC so the input is truncated
continue
}
// Console side command
Expand Down Expand Up @@ -299,6 +302,7 @@ func loop(session *nebula.Session, c cli.Cli) error {
}
g_repeats = 1
}
return fmt.Errorf("loop is stoped")
}

// Nebula Console version related
Expand Down Expand Up @@ -328,15 +332,15 @@ func init() {
flag.StringVar(file, "file", "", "The nGQL script file name")
}

func isFlagPassed(name string) bool {
found := false
flag.Visit(func(f *flag.Flag) {
if f.Name == name {
found = true
}
})
return found
}
// func isFlagPassed(name string) bool {
// found := false
// flag.Visit(func(f *flag.Flag) {
// if f.Name == name {
// found = true
// }
// })
// return found
// }

func validateFlags() {
if *port == -1 {
Expand All @@ -350,8 +354,54 @@ func validateFlags() {
}
}

// RegisterSignalHandler creates a 'listener' on a new goroutine which will notify the
// program if it receives an interrupt from the OS. We then handle this by calling
// our clean up procedure and exiting the program.
func RegisterSignalHandler(cli cli.Cli, toBeKilledSessionID int64) {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM) // ctrlC

go func() {
helperSession, err := pool.GetSession(*username, *password)
if err != nil {
log.Panicf("Fail to create a new session from connection pool, %s", err.Error())
}

for {
<-c
fmt.Println("^C")
if playingData {
loopContinue = false
} else {
killQueryStmt := fmt.Sprintf("SHOW ALL QUERIES | YIELD $-.SessionID AS sid, "+
"$-.ExecutionPlanID AS eid, $-.StartTime AS st "+
"WHERE $-.SessionID == %v | ORDER BY $-.st DESC | LIMIT 1 | "+
"KILL QUERY(session=$-.sid, plan=$-.eid)", toBeKilledSessionID)
res, err := helperSession.Execute(killQueryStmt)
if err != nil {
fmt.Println(err)
helperSession.Release()
// if call os.Exit, the defer stack of main() will not be executed.
// But I have no idea to execute the defer stack manually here, so direcyly call os.Exit.
os.Exit(1)
}
if !res.IsSucceed() {
fmt.Printf("[ERROR (%d)]: %s", res.GetErrorCode(), res.GetErrorMsg())
}
}

}
}()
}

var pool *nebula.ConnectionPool

var session *nebula.Session

var loopContinue bool

var playingData bool

func main() {
flag.Parse()

Expand Down Expand Up @@ -389,7 +439,7 @@ func main() {
}
defer pool.Close()

session, err := pool.GetSession(*username, *password)
session, err = pool.GetSession(*username, *password)
if err != nil {
log.Panicf("Fail to create a new session from connection pool, %s", err.Error())
}
Expand Down Expand Up @@ -418,8 +468,14 @@ func main() {
}

defer c.Close()
err = loop(session, c)

RegisterSignalHandler(c, session.GetSessionID())

loopContinue = true
err = loop(c)

if err != nil {
log.Panicf("Loop error, %s", err.Error())
}

}

0 comments on commit 2c71fa3

Please sign in to comment.