Skip to content

Commit

Permalink
Avoid high frequently retries of sync (#1245)
Browse files Browse the repository at this point in the history
* Avoid high frequently retries, To #38346130

Signed-off-by: cheyang <[email protected]>

* Avoid high frequently retries, To #38346130

Signed-off-by: cheyang <[email protected]>

* Avoid high frequently retries, To #38346130

Signed-off-by: cheyang <[email protected]>

* Set default sync duration, To #38346130

Signed-off-by: cheyang <[email protected]>

* Set default sync duration, To #38346130

Signed-off-by: cheyang <[email protected]>

* Set default sync duration, To #38346130

Signed-off-by: cheyang <[email protected]>

* Set default sync duration, To #38346130

Signed-off-by: cheyang <[email protected]>

* Set default sync duration, To #38346130

Signed-off-by: cheyang <[email protected]>
  • Loading branch information
cheyang authored Dec 10, 2021
1 parent 21a662f commit 7b5df4b
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 2 deletions.
1 change: 1 addition & 0 deletions charts/fluid/fluid/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@
### 0.7.0

* Add mountPropagation for registrar
* Add syncRetryDuration
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ spec:
- name: CRITICAL_FUSE_POD
value: {{ .Values.runtime.criticalFusePod | quote }}
{{- end }}
{{- if .Values.runtime.syncRetryDuration }}
- name: FLUID_SYNC_RETRY_DURATION
value: {{ .Values.runtime.syncRetryDuration | quote }}
{{- end }}
ports:
- containerPort: 8080
name: metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ spec:
- name: CRITICAL_FUSE_POD
value: {{ .Values.runtime.criticalFusePod | quote }}
{{- end }}
{{- if .Values.runtime.syncRetryDuration }}
- name: FLUID_SYNC_RETRY_DURATION
value: {{ .Values.runtime.syncRetryDuration | quote }}
{{- end }}
ports:
- containerPort: 8080
name: metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ spec:
- name: CRITICAL_FUSE_POD
value: {{ .Values.runtime.criticalFusePod | quote }}
{{- end }}
{{- if .Values.runtime.syncRetryDuration }}
- name: FLUID_SYNC_RETRY_DURATION
value: {{ .Values.runtime.syncRetryDuration | quote }}
{{- end }}
ports:
- containerPort: 8080
name: metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ spec:
- name: CRITICAL_FUSE_POD
value: {{ .Values.runtime.criticalFusePod | quote }}
{{- end }}
{{- if .Values.runtime.syncRetryDuration }}
- name: FLUID_SYNC_RETRY_DURATION
value: {{ .Values.runtime.syncRetryDuration | quote }}
{{- end }}
ports:
- containerPort: 8080
name: metrics
Expand Down
1 change: 1 addition & 0 deletions charts/fluid/fluid/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ csi:

runtime:
criticalFusePod: true
syncRetryDuration: 15s
mountRoot: /runtime-mnt
alluxio:
runtimeWorkers: 3
Expand Down
54 changes: 54 additions & 0 deletions pkg/ddc/base/runtime_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package base

import (
"os"
"reflect"
"testing"
"time"

"github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
fakeutils "github.com/fluid-cloudnative/fluid/pkg/utils/fake"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

func Test_convertToTieredstoreInfo(t *testing.T) {
Expand Down Expand Up @@ -417,3 +422,52 @@ func TestGetRuntimeInfo(t *testing.T) {
})
}
}

func TestGetSyncRetryDuration(t *testing.T) {

_, err := getSyncRetryDuration()
if err != nil {
t.Errorf("Failed to getSyncRetryDuration %v", err)
}

os.Setenv(syncRetryDurationEnv, "s")
_, err = getSyncRetryDuration()
if err == nil {
t.Errorf("Expect to get err, but got nil")
}

os.Setenv(syncRetryDurationEnv, "3s")
d, err := getSyncRetryDuration()
if err != nil {
t.Errorf("Failed to getSyncRetryDuration %v", err)
}
if d == nil {
t.Errorf("Failed to set the duration, expect %v, got %v", time.Duration(3*time.Second), d)
}
}

func TestPermitSync(t *testing.T) {

id := "test id"
ctx := cruntime.ReconcileRequestContext{
NamespacedName: types.NamespacedName{
Name: "hbase",
Namespace: "fluid",
},
Log: log.NullLogger{},
}

templateEngine := NewTemplateEngine(nil, id, ctx)
permit := templateEngine.permitSync(types.NamespacedName{Namespace: ctx.Namespace, Name: ctx.Namespace})
if permit {
t.Errorf("expect not permit, but got %v", permit)
}

templateEngine.setTimeOfLastSync()
templateEngine.syncRetryDuration = 1 * time.Microsecond
time.Sleep(1 * time.Second)
permit = templateEngine.permitSync(types.NamespacedName{Namespace: ctx.Namespace, Name: ctx.Namespace})
if !permit {
t.Errorf("expect permit, but got %v", permit)
}
}
13 changes: 13 additions & 0 deletions pkg/ddc/base/syncs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"k8s.io/apimachinery/pkg/types"
)

// SyncReplicas syncs the replicas
func (t *TemplateEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) {
// Avoid the retires too frequently
if !t.permitSync(types.NamespacedName{Name: ctx.Name, Namespace: t.Context.Namespace}) {
return
}

defer utils.TimeTrack(time.Now(), "base.Sync", "ctx", ctx)
defer t.setTimeOfLastSync()

err = t.Implement.SyncMetadata()
if err != nil {
return
Expand Down Expand Up @@ -79,3 +87,8 @@ func (t *TemplateEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error)

return t.Implement.SyncScheduleInfoToCacheNodes()
}

func (t *TemplateEngine) setTimeOfLastSync() {
t.timeOfLastSync = time.Now()
t.Log.V(1).Info("Set timeOfLastSync", "timeOfLastSync", t.timeOfLastSync)
}
59 changes: 57 additions & 2 deletions pkg/ddc/base/template_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,34 @@ limitations under the License.
package base

import (
"fmt"
"os"
"time"

cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

"github.com/go-logr/logr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
syncRetryDurationEnv string = "FLUID_SYNC_RETRY_DURATION"

defaultSyncRetryDuration time.Duration = time.Duration(5 * time.Second)
)

// Use compiler to check if the struct implements all the interface
var _ Engine = (*TemplateEngine)(nil)

type TemplateEngine struct {
Implement
Id string
client.Client
Log logr.Logger
Context cruntime.ReconcileRequestContext
Log logr.Logger
Context cruntime.ReconcileRequestContext
syncRetryDuration time.Duration
timeOfLastSync time.Time
}

// NewTemplateEngine creates template engine
Expand All @@ -47,6 +60,18 @@ func NewTemplateEngine(impl Implement,
// Log: log,
}
b.Log = context.Log.WithValues("engine", context.RuntimeType).WithValues("id", id)
b.timeOfLastSync = time.Now()
duration, err := getSyncRetryDuration()
if err != nil {
b.Log.Error(err, "Failed to parse syncRetryDurationEnv: FLUID_SYNC_RETRY_DURATION, use the default setting")
}
if duration != nil {
b.syncRetryDuration = *duration
} else {
b.syncRetryDuration = defaultSyncRetryDuration
}
b.Log.Info("Set the syncRetryDuration", "syncRetryDuration", b.syncRetryDuration)

return b
}

Expand All @@ -59,3 +84,33 @@ func (t *TemplateEngine) ID() string {
func (t *TemplateEngine) Shutdown() error {
return t.Implement.Shutdown()
}

func getSyncRetryDuration() (d *time.Duration, err error) {
if value, existed := os.LookupEnv(syncRetryDurationEnv); existed {
duration, err := time.ParseDuration(value)
if err != nil {
return d, err
}
d = &duration
}
return
}

func (t *TemplateEngine) permitSync(key types.NamespacedName) (permit bool) {
if time.Since(t.timeOfLastSync) < t.syncRetryDuration {
info := fmt.Sprintf("Skipping engine.Sync(). Not permmitted until %v (syncRetryDuration %v) since timeOfLastSync %v.",
t.timeOfLastSync.Add(t.syncRetryDuration),
t.syncRetryDuration,
t.timeOfLastSync)
t.Log.Info(info, "name", key.Name, "namespace", key.Namespace)
} else {
permit = true
info := fmt.Sprintf("Processing engine.Sync(). permmitted %v (syncRetryDuration %v) since timeOfLastSync %v.",
t.timeOfLastSync.Add(t.syncRetryDuration),
t.syncRetryDuration,
t.timeOfLastSync)
t.Log.V(1).Info(info, "name", key.Name, "namespace", key.Namespace)
}

return
}
3 changes: 3 additions & 0 deletions pkg/ddc/base/template_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package base_test

import (
"context"
"os"

"reflect"
"testing"
Expand Down Expand Up @@ -80,6 +81,7 @@ var _ = Describe("TemplateEngine", func() {
BeforeEach(func() {
ctrl = gomock.NewController(GinkgoT())
impl = enginemock.NewMockImplement(ctrl)
os.Setenv("FLUID_SYNC_RETRY_DURATION", "0s")
t = base.NewTemplateEngine(impl, "default-test", fakeCtx)
})

Expand Down Expand Up @@ -279,3 +281,4 @@ func TestID(t *testing.T) {
t.Errorf("expected %s, get %s", templateEngine.Id, templateEngine.ID())
}
}

0 comments on commit 7b5df4b

Please sign in to comment.