Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: define the interface and http api of preheat
Browse files Browse the repository at this point in the history
Signed-off-by: lowzj <[email protected]>
  • Loading branch information
lowzj committed Mar 12, 2020
1 parent bc16744 commit e97798f
Show file tree
Hide file tree
Showing 6 changed files with 313 additions and 1 deletion.
9 changes: 9 additions & 0 deletions apis/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,10 @@ paths:
description: "bad parameter"
schema:
$ref: '#/definitions/Error'
409:
description: "preheat task already exists"
schema:
$ref: '#/definitions/Error'
500:
$ref: "#/responses/500ErrorResponse"

Expand Down Expand Up @@ -1212,13 +1216,18 @@ definitions:
type: "object"
description: |
Request option of creating a preheat task in supernode.
required:
- type
- url
properties:
type:
type: "string"
enum: ["image", "file"]
description: |
this must be image or file
url:
type: "string"
minLength: 3
description: "the image or file location"
filter:
type: "string"
Expand Down
52 changes: 52 additions & 0 deletions supernode/daemon/mgr/preheat/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package preheat

import (
"context"
"fmt"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/supernode/config"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

var _ mgr.PreheatManager = &Manager{}

// Manager is an implementation of interface PreheatManager.
type Manager struct {
}

func NewManager(cfg *config.Config) (mgr.PreheatManager, error) {
return &Manager{}, nil
}

func (m *Manager) Create(ctx context.Context, task *types.PreheatCreateRequest) (id string, err error) {
return "", fmt.Errorf("not implement")
}

func (m *Manager) Get(ctx context.Context, id string) (task *mgr.PreheatTask, err error) {
return nil, fmt.Errorf("not implement")
}

func (m *Manager) Delete(ctx context.Context, id string) (err error) {
return fmt.Errorf("not implement")
}

func (m *Manager) GetAll(ctx context.Context) (task []*mgr.PreheatTask, err error) {
return nil, fmt.Errorf("not implement")
}
90 changes: 90 additions & 0 deletions supernode/daemon/mgr/preheat_mgr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package mgr

import (
"context"

"github.com/dragonflyoss/Dragonfly/apis/types"
)

// PreheatStatus
type PreheatStatus string

const (
// Waiting the preheat task is waiting for executing
Waiting PreheatStatus = "WAITING"

// Running the preheat task is running
Running PreheatStatus = "RUNNING"

// Success the preheat task is finished and successfully
Success PreheatStatus = "SUCCESS"

// Failed the preheat task is finished and failed
Failed PreheatStatus = "FAILED"
)

var preheatStatusEnums = []PreheatStatus{Waiting, Running, Success, Failed}

// IsValid tests the value of the instance whether is valid.
func (ps PreheatStatus) IsValid() bool {
for _, v := range preheatStatusEnums {
if v == ps {
return true
}
}
return false
}

// String return the string value of the instance.
func (ps PreheatStatus) String() string {
return (string)(ps)
}

// PreheatTask store the detailed preheat task information.
type PreheatTask struct {
ID string
URL string
Type string
Filter string
Identifier string
Headers map[string]string

ParentID string
Children []string
Status PreheatStatus
StartTime int64
FinishTime int64
ErrorMsg string
}

// PreheatManager
type PreheatManager interface {
// Create creates a preheat task to cache data in supernode, thus accelerating the
// p2p downloading.
Create(ctx context.Context, task *types.PreheatCreateRequest) (id string, err error)

// Get gets detailed preheat task information by id.
Get(ctx context.Context, id string) (task *PreheatTask, err error)

// Delete deletes a preheat task by id.
Delete(ctx context.Context, id string) (err error)

// GetAll gets all preheat tasks that unexpired.
GetAll(ctx context.Context) (task []*PreheatTask, err error)
}
70 changes: 70 additions & 0 deletions supernode/server/preheat_bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package server

import (
"context"
"net/http"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/go-openapi/strfmt"
"github.com/gorilla/mux"
)

func (s *Server) createPreheatTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
request := &types.PreheatCreateRequest{}
if err := parseRequest(req.Body, request, request.Validate); err != nil {
return err
}
preheatID, err := s.PreheatMgr.Create(ctx, request)
if err != nil {
return err
}
resp := types.PreheatCreateResponse{ID: preheatID}
return EncodeResponse(rw, http.StatusCreated, resp)
}

func (s *Server) getAllPreheatTasks(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
tasks, err := s.PreheatMgr.GetAll(ctx)
if err != nil {
return err
}
return EncodeResponse(rw, http.StatusOK, tasks)
}

func (s *Server) getPreheatTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
id := mux.Vars(req)["id"]
task, err := s.PreheatMgr.Get(ctx, id)
if err != nil {
return err
}
resp := types.PreheatInfo{
ID: task.ID,
FinishTime: strfmt.DateTime{},
StartTime: strfmt.DateTime{},
Status: task.Status.String(),
}
return EncodeResponse(rw, http.StatusOK, resp)
}

func (s *Server) deletePreheatTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) error {
id := mux.Vars(req)["id"]
if err := s.PreheatMgr.Delete(ctx, id); err != nil {
return err
}
return EncodeResponse(rw, http.StatusOK, true)
}
85 changes: 84 additions & 1 deletion supernode/server/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ package server
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/pprof"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/pkg/errortypes"
"github.com/dragonflyoss/Dragonfly/version"

"github.com/go-openapi/strfmt"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
)

// versionMatcher defines to parse version url path.
Expand Down Expand Up @@ -75,6 +79,7 @@ func initRoute(s *Server) *mux.Router {
r.Path(h.Path).Methods(h.Method).Handler(m.instrumentHandler(h.Path, filter(h.HandlerFunc)))
}
}
initPreheatHandlers(s, r)

if s.Config.Debug || s.Config.EnableProfiler {
r.PathPrefix("/debug/pprof/cmdline").HandlerFunc(pprof.Cmdline)
Expand All @@ -86,6 +91,26 @@ func initRoute(s *Server) *mux.Router {
return r
}

func initPreheatHandlers(s *Server, r *mux.Router) {
handlers := []*HandlerSpec{
{Method: http.MethodPost, Path: "/preheats", HandlerFunc: s.createPreheatTask},
{Method: http.MethodGet, Path: "/preheats", HandlerFunc: s.getAllPreheatTasks},
{Method: http.MethodGet, Path: "/preheats/{id}", HandlerFunc: s.getPreheatTask},
{Method: http.MethodDelete, Path: "/preheats/{id}", HandlerFunc: s.deletePreheatTask},
}
// register API
for _, h := range handlers {
if h != nil {
r.Path(versionMatcher + h.Path).Methods(h.Method).
Handler(m.instrumentHandler(h.Path, postPreheatHandler(h.HandlerFunc)))
r.Path("/api/v1" + h.Path).Methods(h.Method).
Handler(m.instrumentHandler(h.Path, postPreheatHandler(h.HandlerFunc)))
r.Path(h.Path).Methods(h.Method).
Handler(m.instrumentHandler(h.Path, postPreheatHandler(h.HandlerFunc)))
}
}
}

func handleMetrics(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) {
promhttp.Handler().ServeHTTP(rw, req)
return nil
Expand All @@ -107,6 +132,37 @@ func filter(handler Handler) http.HandlerFunc {
}
}

func postPreheatHandler(h Handler) http.HandlerFunc {
pctx := context.Background()

return func(w http.ResponseWriter, req *http.Request) {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

// Start to handle request.
err := h(ctx, w, req)
if err != nil {
// Handle error if request handling fails.
handlePreheatErrorResponse(w, err)
}
logrus.Debugf("%v err:%v", req.URL, err)
}
}

type validateFunc func(registry strfmt.Registry) error

func parseRequest(body io.Reader, request interface{}, validator validateFunc) error {
if err := json.NewDecoder(body).Decode(request); err != nil {
return errors.Wrap(errortypes.ErrInvalidValue, err.Error())
}
if validator != nil {
if err := validator(strfmt.NewFormats()); err != nil {
return errors.Wrap(errortypes.ErrInvalidValue, err.Error())
}
}
return nil
}

// EncodeResponse encodes response in json.
func EncodeResponse(rw http.ResponseWriter, statusCode int, data interface{}) error {
rw.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -135,3 +191,30 @@ func HandleErrorResponse(w http.ResponseWriter, err error) {
}
enc.Encode(resp)
}

func handlePreheatErrorResponse(w http.ResponseWriter, err error) {
var (
code int
errMsg string
)

// By default, daemon side returns code 500 if error happens.
code = http.StatusInternalServerError
if errortypes.IsInvalidValue(err) {
e, _ := errors.Cause(err).(errortypes.DfError)
code = http.StatusBadRequest
errMsg = e.Msg
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
enc := json.NewEncoder(w)
enc.SetEscapeHTML(false)

resp := types.ErrorResponse{
Code: int64(code),
Message: errMsg,
}

_ = enc.Encode(resp)
}
Loading

0 comments on commit e97798f

Please sign in to comment.