Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1456 from ansinlee/master
Browse files Browse the repository at this point in the history
add preheat implements
  • Loading branch information
garfield009 authored Aug 25, 2020
2 parents 4906cd1 + 9aafd32 commit 5c732d5
Show file tree
Hide file tree
Showing 16 changed files with 1,010 additions and 14 deletions.
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ jobs:

unit-test-golang:
docker:
- image: circleci/golang:1.12.10
- image: circleci/golang:1.13.15
working_directory: /go/src/github.com/dragonflyoss/Dragonfly
steps:
- checkout
Expand All @@ -88,7 +88,7 @@ jobs:

api-integration-test:
docker:
- image: circleci/golang:1.12.10
- image: circleci/golang:1.13.15
working_directory: /go/src/github.com/dragonflyoss/Dragonfly
steps:
- checkout
Expand All @@ -104,7 +104,7 @@ jobs:

release:
docker:
- image: circleci/golang:1.12.10
- image: circleci/golang:1.13.15
working_directory: /go/src/github.com/dragonflyoss/Dragonfly
steps:
- checkout
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.12.10-alpine as builder
FROM golang:1.13.15-alpine as builder

WORKDIR /go/src/github.com/dragonflyoss/Dragonfly
RUN apk --no-cache add bash make gcc libc-dev git
Expand Down
6 changes: 5 additions & 1 deletion Dockerfile.supernode
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.12.10-alpine as builder
FROM golang:1.13.15-alpine as builder

WORKDIR /go/src/github.com/dragonflyoss/Dragonfly
RUN apk --no-cache add bash make gcc libc-dev git
Expand All @@ -9,6 +9,7 @@ COPY . /go/src/github.com/dragonflyoss/Dragonfly
# write the resulting executable to the dir /opt/dragonfly/df-supernode.
ARG GOPROXY
RUN make build-supernode && make install-supernode
RUN make build-client && make install-client

FROM dragonflyoss/nginx:apline

Expand All @@ -17,6 +18,9 @@ RUN apk --no-cache add ca-certificates bash
COPY --from=builder /go/src/github.com/dragonflyoss/Dragonfly/hack/start-supernode.sh /root/start.sh
COPY --from=builder /go/src/github.com/dragonflyoss/Dragonfly/hack/supernode-nginx.conf /etc/nginx/nginx.conf
COPY --from=builder /opt/dragonfly/df-supernode/supernode /opt/dragonfly/df-supernode/supernode
COPY --from=builder /opt/dragonfly/df-client /opt/dragonfly/df-client
RUN ln -s /opt/dragonfly/df-client/dfget /usr/local/bin/dfget


# supernode will listen 8001,8002 in default.
EXPOSE 8001 8002
Expand Down
3 changes: 3 additions & 0 deletions apis/types/preheat_info.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion hack/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ DFDAEMON_BINARY_NAME=dfdaemon
DFGET_BINARY_NAME=dfget
SUPERNODE_BINARY_NAME=supernode
PKG=github.com/dragonflyoss/Dragonfly
BUILD_IMAGE=golang:1.12.10
BUILD_IMAGE=golang:1.13.15
VERSION=$(git describe --tags "$(git rev-list --tags --max-count=1)")
REVISION=$(git rev-parse --short HEAD)
DATE=$(date "+%Y%m%d-%H:%M:%S")
Expand Down
68 changes: 68 additions & 0 deletions supernode/daemon/mgr/preheat/base_preaheater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package preheat

import (
"sync"

"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

var _ Preheater = &BasePreheater{}

type BasePreheater struct {}

/**
* The type of this preheater
*/
func (p *BasePreheater) Type() string {
panic("not implement")
}

/**
* Create a worker to preheat the task.
*/
func (p *BasePreheater) NewWorker(task *mgr.PreheatTask , service *PreheatService) IWorker {
panic("not implement")
}

/**
* cancel the running task
*/
func (p *BasePreheater) Cancel(id string) {
woker, ok := workerMap.Load(id)
if !ok {
return
}
woker.(IWorker).Stop()
}

/**
* remove a running preheat task
*/
func (p *BasePreheater) Remove(id string) {
p.Cancel(id)
workerMap.Delete(id)
}

/**
* add a worker to workerMap.
*/
func (p *BasePreheater) addWorker(id string, worker IWorker) {
workerMap.Store(id, worker)
}

var workerMap = new(sync.Map)
115 changes: 115 additions & 0 deletions supernode/daemon/mgr/preheat/base_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package preheat

import (
"runtime/debug"
"sync/atomic"
"time"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

const TIMEOUT = 30 * 60

var _ IWorker = &BaseWorker{}

type IWorker interface{
Run()
Stop()
query() chan error
preRun() bool
failed(errMsg string)
afterRun()
}

type BaseWorker struct {
Task *mgr.PreheatTask
Preheater Preheater
PreheatService *PreheatService
stop *atomic.Value
worker IWorker
}

func newBaseWorker(task *mgr.PreheatTask, preheater Preheater, preheatService *PreheatService) *BaseWorker {
worker := &BaseWorker{
Task: task,
Preheater: preheater,
PreheatService: preheatService,
stop: new(atomic.Value),
}
worker.worker = worker
return worker
}

func (w *BaseWorker) Run() {
go func() {
defer func(){
e := recover()
if e != nil {
debug.PrintStack()
}
}()

if w.worker.preRun() {
timer := time.NewTimer(time.Second*TIMEOUT)
ch := w.worker.query()
select {
case <-timer.C:
w.worker.failed("timeout")
case err := <-ch:
if err != nil {
w.worker.failed(err.Error())
}
}
}
w.worker.afterRun()
}()
}

func (w *BaseWorker) Stop() {
w.stop.Store(true)
}

func (w *BaseWorker) isRunning() bool {
return w.stop.Load() == nil
}

func (w *BaseWorker) preRun() bool {
panic("not implement")
}

func (w *BaseWorker) afterRun() {
w.Preheater.Remove(w.Task.ID)
}

func (w *BaseWorker) query() chan error {
panic("not implement")
}

func (w *BaseWorker) succeed() {
w.Task.FinishTime = time.Now().UnixNano()/int64(time.Millisecond)
w.Task.Status = types.PreheatStatusSUCCESS
w.PreheatService.Update(w.Task.ID, w.Task)
}

func (w *BaseWorker) failed(errMsg string) {
w.Task.FinishTime = time.Now().UnixNano()/int64(time.Millisecond)
w.Task.Status = types.PreheatStatusFAILED
w.Task.ErrorMsg = errMsg
w.PreheatService.Update(w.Task.ID, w.Task)
}
109 changes: 109 additions & 0 deletions supernode/daemon/mgr/preheat/file_preaheater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package preheat

import (
"errors"
"fmt"
"github.com/sirupsen/logrus"
"time"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

func init() {
RegisterPreheater("file", &FilePreheat{BasePreheater:new(BasePreheater)})
logrus.StandardLogger().SetLevel(logrus.DebugLevel)
}

type FilePreheat struct {
*BasePreheater
}

func (p *FilePreheat) Type() string {
return "file"
}

/**
* Create a worker to preheat the task.
*/
func (p *FilePreheat) NewWorker(task *mgr.PreheatTask , service *PreheatService) IWorker {
worker := &FileWorker{BaseWorker: newBaseWorker(task, p, service)}
worker.worker = worker
p.addWorker(task.ID, worker)
return worker
}

type FileWorker struct {
*BaseWorker
progress *PreheatProgress
}

func (w *FileWorker) preRun() bool {
w.Task.Status = types.PreheatStatusRUNNING
w.PreheatService.Update(w.Task.ID, w.Task)
var err error
w.progress, err = w.PreheatService.ExecutePreheat(w.Task)
if err != nil {
w.failed(err.Error())
return false
}
return true
}

func (w *FileWorker) afterRun() {
if w.progress != nil {
w.progress.cmd.Process.Kill()
}
w.BaseWorker.afterRun()
}

func (w *FileWorker) query() chan error {
result := make(chan error, 1)
go func(){
time.Sleep(time.Second*2)
for w.isRunning() {
if w.Task.FinishTime > 0 {
w.Preheater.Cancel(w.Task.ID)
return
}
if w.progress == nil {
w.succeed()
return
}
status := w.progress.cmd.ProcessState
if status != nil && status.Exited() {
if !status.Success() {
errMsg := fmt.Sprintf("dfget failed: %s err: %s", status.String(), w.progress.errmsg.String())
w.failed(errMsg)
w.Preheater.Cancel(w.Task.ID)
result <- errors.New(errMsg)
return
} else {
w.succeed()
w.Preheater.Cancel(w.Task.ID)
result <- nil
return
}
}

time.Sleep(time.Second*10)
}
}()
return result
}

Loading

0 comments on commit 5c732d5

Please sign in to comment.