Skip to content

Commit

Permalink
新增reset函数,TODO测试
Browse files Browse the repository at this point in the history
  • Loading branch information
guonaihong committed Feb 10, 2024
1 parent ff3d299 commit cdf2966
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 4 deletions.
4 changes: 3 additions & 1 deletion min_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (m *minHeap) removeTimeNode(node *minHeapNode) {

func (m *minHeap) resetTimeNode(node *minHeapNode, d time.Duration) {
m.mu.Lock()
heap.Push(&m.minHeaps, node)
node.userExpire = d
node.absExpire = time.Now().Add(d)
heap.Fix(&m.minHeaps, int(node.index))
select {
case m.chAdd <- struct{}{}:
default:
Expand Down
2 changes: 1 addition & 1 deletion min_heap_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (m *minHeapNode) Stop() {
m.root.removeTimeNode(m)
}
func (m *minHeapNode) Reset(d time.Duration) {

m.root.resetTimeNode(m, d)
}

func (m *minHeapNode) Next(now time.Time) time.Time {
Expand Down
2 changes: 2 additions & 0 deletions time_wheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func (t *timeWheel) AfterFunc(expire time.Duration, callback func()) TimeNoder {
node := &timeNode{
expire: uint64(expire),
callback: callback,
root: t,
}

return t.add(node, jiffies)
Expand All @@ -148,6 +149,7 @@ func (t *timeWheel) ScheduleFunc(userExpire time.Duration, callback func()) Time
expire: uint64(expire),
callback: callback,
isSchedule: true,
root: t,
}

return t.add(node, jiffies)
Expand Down
18 changes: 16 additions & 2 deletions time_wheel_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type timeNode struct {
list unsafe.Pointer //存放表头信息
version uint64 //保存节点版本信息
isSchedule bool

root *timeWheel
list.Head
}

Expand Down Expand Up @@ -90,6 +90,20 @@ func (t *timeNode) Stop() {
cpyList.Del(&t.Head)
}

func (t *timeNode) Reset(d time.Duration) {
// warning: 该函数目前没有稳定
func (t *timeNode) Reset(expire time.Duration) {
cpyList := (*Time)(atomic.LoadPointer(&t.list))
cpyList.Lock()
defer cpyList.Unlock()
// TODO: 这里有一个问题,如果在执行Reset的时候,这个节点已经被移动到tmp链表
// if atomic.LoadUint64(&t.version) != atomic.LoadUint64(&cpyList.version) {
// return
// }
cpyList.Del(&t.Head)
jiffies := atomic.LoadUint64(&t.root.jiffies)

expire = expire/(time.Millisecond*10) + time.Duration(jiffies)
t.expire = uint64(expire)

t.root.add(t, atomic.LoadUint64(&t.root.jiffies))
}

0 comments on commit cdf2966

Please sign in to comment.