Skip to content

Commit

Permalink
Introduce a basic web interface (pingcap#199)
Browse files Browse the repository at this point in the history
* restore,checkpoints: move checkpoints into its own package

This allows both the "restore" package to import the "web" package, and
allow the "web" package to use "checkpoints", without leading to circular
dependency.

* verification: implemented json.Marshaler for KVChecksum

* *: expose the current import progress to HTTP interface

* common: added "Pauser" synchronization primitive

* lightning: allows status address to reliably use port 0 for testing

* config: ensure AllIDs() return a deterministic order

* lightning,restore: support pausing, moving and deleting tasks through HTTP

Also fixed some goroutine leaks and crashes after canceling.

* common: fixed the bug where checksum is not cancelable

* config: added configlist.{MoveToFront, MoveToBack}

* web,lightning: added a web interface

* web: explain the web interface

* web: added OpenAPI (Swagger) spec of the HTTP API

* common: avoid double-close a channel

The channel may be double-closed given this sequence:

0. [B] p.Pause()
1. [A] p.Wait(ctx), run until the select
2. [B] p.Resume(), run until the for loop
3. [C] cancel the ctx
4. [A] continue from select, and close the channel
5. [B] continue the for loop, using the old copy of waiters, it will close
       the channel again, causing double-close error.

We just avoid closing the waiter when ctx expired.

* common: added a test to check for contended pause/resume flip

* common: fixed a potential race condition

* verification: change JSON field of checksum from cksum to checksum

* web: document the OpenAPI def and why we don't support webpack-dev-server

Fixed a potential typing error (see TypeStrong/atom-typescript#1053).

* config: prevents task ID conflict which may happen with a coarse clock

* restore: prevent encodeLoop panicking if deliverResult is closed?

* checkpoints,lightning: address comments
  • Loading branch information
kennytm authored Jul 5, 2019
1 parent 8601657 commit adda917
Show file tree
Hide file tree
Showing 60 changed files with 9,382 additions and 257 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ test_*/
*.ezdraw
vendor/
tools/bin/

# for the web interface
web/node_modules/
web/dist/
19 changes: 15 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ LIGHTNING_CTL_BIN := bin/tidb-lightning-ctl
FAILPOINT_CTL_BIN := tools/bin/failpoint-ctl
REVIVE_BIN := tools/bin/revive
GOLANGCI_LINT_BIN := tools/bin/golangci-lint
VFSGENDEV_BIN := tools/bin/vfsgendev
TEST_DIR := /tmp/lightning_test_result
# this is hard-coded unless we want to generate *.toml on fly.

Expand Down Expand Up @@ -38,15 +39,15 @@ endif

.PHONY: all build clean lightning lightning-ctl test lightning_for_integration_test \
integration_test coverage update ensure_failpoint_ctl failpoint_enable failpoint_disable \
check vet fmt revive
check vet fmt revive web

default: clean lightning lightning-ctl checksuccess

build:
$(GOBUILD)

clean:
rm -f $(LIGHTNING_BIN) $(LIGHTNING_CTRL_BIN) $(FAILPOINT_CTL_BIN) $(REVIVE_BIN)
rm -f $(LIGHTNING_BIN) $(LIGHTNING_CTRL_BIN) $(FAILPOINT_CTL_BIN) $(REVIVE_BIN) $(VFSGENDEV_BIN)

checksuccess:
@if [ -f $(LIGHTNING_BIN) ] && [ -f $(LIGHTNING_CTRL_BIN) ]; \
Expand All @@ -59,8 +60,18 @@ checksuccess:
@echo '// Code generated by ragel DO NOT EDIT.' | cat - tmp_parser.go | sed 's|//line |//.... |g' > $@
@rm tmp_parser.go

data_parsers: lightning/mydump/parser_generated.go lightning/mydump/csv_parser_generated.go
PATH="$(GOPATH)/bin":$(PATH) protoc -I. -I"$(GOPATH)/src" lightning/restore/file_checkpoints.proto --gogofaster_out=.
$(VFSGENDEV_BIN):
cd tools && $(GOBUILD) -o ../$(VFSGENDEV_BIN) github.com/shurcooL/vfsgen/cmd/vfsgendev

data_parsers: $(VFSGENDEV_BIN) lightning/mydump/parser_generated.go lightning/mydump/csv_parser_generated.go
PATH="$(GOPATH)/bin":$(PATH) protoc -I. -I"$(GOPATH)/src" lightning/checkpoints/file_checkpoints.proto --gogofaster_out=.
$(VFSGENDEV_BIN) -source='"github.com/pingcap/tidb-lightning/lightning/web".Res' && mv res_vfsdata.go lightning/web/

web:
cd web && npm install && npm run build

lightning_for_web:
$(GOBUILD) $(RACE_FLAG) -tags dev -ldflags '$(LDFLAGS)' -o $(LIGHTNING_BIN) cmd/tidb-lightning/main.go

lightning:
$(GOBUILD) $(RACE_FLAG) -ldflags '$(LDFLAGS)' -o $(LIGHTNING_BIN) cmd/tidb-lightning/main.go
Expand Down
11 changes: 7 additions & 4 deletions cmd/tidb-lightning/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package main

import (
"fmt"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -43,15 +42,19 @@ func main() {
app.Stop()
}()

go app.Serve()
logger := log.L()

err := app.GoServe()
if err != nil {
logger.Error("failed to start HTTP server", zap.Error(err))
return
}

var err error
if cfg.App.ServerMode {
err = app.RunServer()
} else {
err = app.RunOnce()
}
logger := log.L()
if err != nil {
logger.Error("tidb lightning encountered error", zap.Error(err))
} else {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90
github.com/satori/go.uuid v1.2.0
github.com/shurcooL/httpgzip v0.0.0-20190516014818-1c7afaae1203
go.uber.org/zap v1.10.0
golang.org/x/text v0.3.2
google.golang.org/grpc v1.21.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/shirou/gopsutil v2.18.10+incompatible h1:cy84jW6EVRPa5g9HAHrlbxMSIjBhDSX0OFYyMYminYs=
github.com/shirou/gopsutil v2.18.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371 h1:SWV2fHctRpRrp49VXJ6UZja7gU9QLHwRpIPBN89SKEo=
github.com/shurcooL/httpfs v0.0.0-20171119174359-809beceb2371/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/httpgzip v0.0.0-20190516014818-1c7afaae1203 h1:JcuOigWBeic6AhwUYIBLpo9h2ugpTfoMSJb1rhsvcLY=
github.com/shurcooL/httpgzip v0.0.0-20190516014818-1c7afaae1203/go.mod h1:919LwcH0M7/W4fcZ0/jy0qGght1GIhqyS/EgWGH2j5Q=
github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca h1:3fECS8atRjByijiI8yYiuwLwQ2ZxXobW7ua/8GRB3pI=
github.com/shurcooL/vfsgen v0.0.0-20181020040650-a97a25d856ca/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package restore
package checkpoints

import (
"context"
Expand Down Expand Up @@ -56,6 +56,8 @@ const (

const nodeID = 0

const WholeTableEngineID = math.MaxInt32

const (
// the table names to store each kind of checkpoint in the checkpoint database
// remember to increase the version number in case of incompatible change.
Expand Down Expand Up @@ -116,17 +118,50 @@ type ChunkCheckpoint struct {
Checksum verify.KVChecksum
}

func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint {
colPerm := make([]int, 0, len(ccp.ColumnPermutation))
colPerm = append(colPerm, ccp.ColumnPermutation...)
return &ChunkCheckpoint{
Key: ccp.Key,
ColumnPermutation: colPerm,
Chunk: ccp.Chunk,
Checksum: ccp.Checksum,
}
}

type EngineCheckpoint struct {
Status CheckpointStatus
Chunks []*ChunkCheckpoint // a sorted array
}

func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint {
chunks := make([]*ChunkCheckpoint, 0, len(engine.Chunks))
for _, chunk := range engine.Chunks {
chunks = append(chunks, chunk.DeepCopy())
}
return &EngineCheckpoint{
Status: engine.Status,
Chunks: chunks,
}
}

type TableCheckpoint struct {
Status CheckpointStatus
AllocBase int64
Engines map[int32]*EngineCheckpoint
}

func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint {
engines := make(map[int32]*EngineCheckpoint, len(cp.Engines))
for engineID, engine := range cp.Engines {
engines[engineID] = engine.DeepCopy()
}
return &TableCheckpoint{
Status: cp.Status,
AllocBase: cp.AllocBase,
Engines: engines,
}
}
func (cp *TableCheckpoint) CountChunks() int {
result := 0
for _, engine := range cp.Engines {
Expand Down Expand Up @@ -182,6 +217,40 @@ func (cpd *TableCheckpointDiff) String() string {
)
}

// Apply the diff to the existing chunk and engine checkpoints in `cp`.
func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff) {
if cpd.hasStatus {
cp.Status = cpd.status
}
if cpd.hasRebase {
cp.AllocBase = cpd.allocBase
}
for engineID, engineDiff := range cpd.engines {
engine := cp.Engines[engineID]
if engine == nil {
continue
}
if engineDiff.hasStatus {
engine.Status = engineDiff.status
}
for key, diff := range engineDiff.chunks {
index := sort.Search(len(engine.Chunks), func(i int) bool {
return !engine.Chunks[i].Key.less(&key)
})
if index >= len(engine.Chunks) {
continue
}
chunk := engine.Chunks[index]
if chunk.Key != key {
continue
}
chunk.Chunk.Offset = diff.pos
chunk.Chunk.PrevRowIDMax = diff.rowID
chunk.Checksum = diff.checksum
}
}
}

type TableCheckpointMerger interface {
// MergeInto the table checkpoint diff from a status update or chunk update.
// If there are multiple updates to the same table, only the last one will
Expand Down
Loading

0 comments on commit adda917

Please sign in to comment.