Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: define the interface and http api of preheat
Browse files Browse the repository at this point in the history
Signed-off-by: lowzj <[email protected]>
  • Loading branch information
lowzj committed Mar 24, 2020
1 parent 7179243 commit da7adf1
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 3 deletions.
35 changes: 33 additions & 2 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,10 @@ paths:
description: "bad parameter"
schema:
$ref: '#/definitions/Error'
409:
description: "preheat task already exists"
schema:
$ref: '#/definitions/Error'
500:
$ref: "#/responses/500ErrorResponse"

Expand Down Expand Up @@ -628,6 +632,28 @@ paths:
500:
$ref: "#/responses/500ErrorResponse"

delete:
summary: "Delete a preheat task"
description: |
delete a preheat task
produces:
- "application/json"
parameters:
- name: id
in: path
required: true
description: "ID of preheat task"
type: string
responses:
200:
description: "no error"
404:
description: "no such preheat task"
schema:
$ref: "#/responses/404ErrorResponse"
500:
$ref: "#/responses/500ErrorResponse"

/task/metrics:
post:
summary: "upload dfclient download metrics"
Expand Down Expand Up @@ -1152,7 +1178,7 @@ definitions:
taskId:
type: "string"
description: |
the taskID of the piece.
the taskID of the piece.
srcCid:
type: "string"
description: |
Expand Down Expand Up @@ -1219,13 +1245,18 @@ definitions:
type: "object"
description: |
Request option of creating a preheat task in supernode.
required:
- type
- url
properties:
type:
type: "string"
enum: ["image", "file"]
description: |
this must be image or file
url:
type: "string"
minLength: 3
description: "the image or file location"
filter:
type: "string"
Expand Down Expand Up @@ -1283,7 +1314,7 @@ definitions:
status:
type: "string"
description: |
The status of Dfget download process.
The status of Dfget download process.
enum: ["WAITING", "RUNNING", "FAILED", "SUCCESS"]
peerID:
type: "string"
Expand Down
52 changes: 52 additions & 0 deletions supernode/daemon/mgr/preheat/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package preheat

import (
"context"
"fmt"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

var _ mgr.PreheatManager = &Manager{}

// Manager is an implementation of interface PreheatManager.
type Manager struct {
}

func NewManager(cfg *config.Config) (mgr.PreheatManager, error) {
return &Manager{}, nil
}

func (m *Manager) Create(ctx context.Context, task *types.PreheatCreateRequest) (id string, err error) {
return "", fmt.Errorf("not implement")
}

func (m *Manager) Get(ctx context.Context, id string) (task *mgr.PreheatTask, err error) {
return nil, fmt.Errorf("not implement")
}

func (m *Manager) Delete(ctx context.Context, id string) (err error) {
return fmt.Errorf("not implement")
}

func (m *Manager) GetAll(ctx context.Context) (task []*mgr.PreheatTask, err error) {
return nil, fmt.Errorf("not implement")
}
90 changes: 90 additions & 0 deletions supernode/daemon/mgr/preheat_mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mgr

import (
"context"

"github.com/dragonflyoss/Dragonfly/apis/types"
)

// PreheatStatus
type PreheatStatus string

const (
// Waiting the preheat task is waiting for executing
Waiting PreheatStatus = "WAITING"

// Running the preheat task is running
Running PreheatStatus = "RUNNING"

// Success the preheat task is finished and successfully
Success PreheatStatus = "SUCCESS"

// Failed the preheat task is finished and failed
Failed PreheatStatus = "FAILED"
)

var preheatStatusEnums = []PreheatStatus{Waiting, Running, Success, Failed}

// IsValid tests the value of the instance whether is valid.
func (ps PreheatStatus) IsValid() bool {
for _, v := range preheatStatusEnums {
if v == ps {
return true
}
}
return false
}

// String return the string value of the instance.
func (ps PreheatStatus) String() string {
return (string)(ps)
}

// PreheatTask store the detailed preheat task information.
type PreheatTask struct {
ID string
URL string
Type string
Filter string
Identifier string
Headers map[string]string

ParentID string
Children []string
Status PreheatStatus
StartTime int64
FinishTime int64
ErrorMsg string
}

// PreheatManager
type PreheatManager interface {
// Create creates a preheat task to cache data in supernode, thus accelerating the
// p2p downloading.
Create(ctx context.Context, task *types.PreheatCreateRequest) (id string, err error)

// Get gets detailed preheat task information by id.
Get(ctx context.Context, id string) (task *PreheatTask, err error)

// Delete deletes a preheat task by id.
Delete(ctx context.Context, id string) (err error)

// GetAll gets all preheat tasks that unexpired.
GetAll(ctx context.Context) (task []*PreheatTask, err error)
}
155 changes: 155 additions & 0 deletions supernode/server/preheat_bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package server

import (
"context"
"encoding/json"
"io"
"net/http"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"

"github.com/go-openapi/strfmt"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
)

// ---------------------------------------------------------------------------
// handlers of preheat http apis

func (s *Server) createPreheatTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
request := &types.PreheatCreateRequest{}
if err := parseRequest(req.Body, request, request.Validate); err != nil {
return err
}
preheatID, err := s.PreheatMgr.Create(ctx, request)
if err != nil {
return err
}
resp := types.PreheatCreateResponse{ID: preheatID}
return EncodeResponse(rw, http.StatusCreated, resp)
}

func (s *Server) getAllPreheatTasks(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
tasks, err := s.PreheatMgr.GetAll(ctx)
if err != nil {
return err
}
return EncodeResponse(rw, http.StatusOK, tasks)
}

func (s *Server) getPreheatTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
id := mux.Vars(req)["id"]
task, err := s.PreheatMgr.Get(ctx, id)
if err != nil {
return err
}
resp := types.PreheatInfo{
ID: task.ID,
FinishTime: strfmt.DateTime{},
StartTime: strfmt.DateTime{},
Status: task.Status.String(),
}
return EncodeResponse(rw, http.StatusOK, resp)
}

func (s *Server) deletePreheatTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
id := mux.Vars(req)["id"]
if err := s.PreheatMgr.Delete(ctx, id); err != nil {
return err
}
return EncodeResponse(rw, http.StatusOK, true)
}

// ---------------------------------------------------------------------------
// helper functions

type validateFunc func(registry strfmt.Registry) error

func parseRequest(body io.Reader, request interface{}, validator validateFunc) error {
if err := json.NewDecoder(body).Decode(request); err != nil {
if err == io.EOF {
return errortypes.New(http.StatusBadRequest, "empty body")
}
return errortypes.New(http.StatusBadRequest, err.Error())
}
if validator != nil {
if err := validator(strfmt.NewFormats()); err != nil {
return errortypes.New(http.StatusBadRequest, err.Error())
}
}
return nil
}

// initPreheatHandlers register preheat apis
func initPreheatHandlers(s *Server, r *mux.Router) {
handlers := []*HandlerSpec{
{Method: http.MethodPost, Path: "/preheats", HandlerFunc: s.createPreheatTask},
{Method: http.MethodGet, Path: "/preheats", HandlerFunc: s.getAllPreheatTasks},
{Method: http.MethodGet, Path: "/preheats/{id}", HandlerFunc: s.getPreheatTask},
{Method: http.MethodDelete, Path: "/preheats/{id}", HandlerFunc: s.deletePreheatTask},
}
// register API
for _, h := range handlers {
if h != nil {
r.Path(versionMatcher + h.Path).Methods(h.Method).
Handler(m.instrumentHandler(h.Path, postPreheatHandler(h.HandlerFunc)))
r.Path("/api/v1" + h.Path).Methods(h.Method).
Handler(m.instrumentHandler(h.Path, postPreheatHandler(h.HandlerFunc)))
r.Path(h.Path).Methods(h.Method).
Handler(m.instrumentHandler(h.Path, postPreheatHandler(h.HandlerFunc)))
}
}
}

func postPreheatHandler(h Handler) http.HandlerFunc {
pctx := context.Background()

return func(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

// Start to handle request.
err := h(ctx, w, req)
if err != nil {
// Handle error if request handling fails.
handlePreheatErrorResponse(w, err)
}
logrus.Debugf("%s %v err:%v", req.Method, req.URL, err)
}
}

func handlePreheatErrorResponse(w http.ResponseWriter, err error) {
var (
code int
errMsg string
)

// By default, daemon side returns code 500 if error happens.
code = http.StatusInternalServerError
if e, ok := err.(*errortypes.DfError); ok {
code = e.Code
errMsg = e.Msg
}

_ = EncodeResponse(w, code, types.ErrorResponse{
Code: int64(code),
Message: errMsg,
})
}
2 changes: 1 addition & 1 deletion supernode/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/version"

"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -75,6 +74,7 @@ func initRoute(s *Server) *mux.Router {
r.Path(h.Path).Methods(h.Method).Handler(m.instrumentHandler(h.Path, filter(h.HandlerFunc)))
}
}
initPreheatHandlers(s, r)

if s.Config.Debug || s.Config.EnableProfiler {
r.PathPrefix("/debug/pprof/cmdline").HandlerFunc(pprof.Cmdline)
Expand Down
Loading

0 comments on commit da7adf1

Please sign in to comment.