源码目录 time/time.go (1.1.4.1)
数据结构
/// time/sleep.go
type Timer struct {
C <-chan Time // 时间到达时的通道
r runtimeTimer
}
/// time/time.go
type Ticker struct {
C <-chan Time
r runtimeTimer
}
/// time/sleep.go
type runtimeTimer struct {
pp uintptr
when int64 // 到期时间
period int64 // ticker的周期。timer只会触发一次,所有为空
f func(interface{}, uintptr) // 时间到达时触发的方法
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
/// runtime/runtime2.go
type p struct {
.....
timersLock mutex
timers []*timer // 每个P都有个timer集合。
....
}
从数据结构上看,timer和ticker其实是一样的。每个P都维护一个timer的最小堆
创建
func NewTicker(d Duration) *Ticker {
if d <= 0 { // 时间必须大于0
panic(errors.New("non-positive interval for NewTicker"))
}
c := make(chan Time, 1) // 通道容量为1,那ticker是否会堵塞呢?
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d), // 到期时间
period: int64(d), // 周期
f: sendTime, // 时间到达时往通道发送消息
arg: c,
},
}
startTimer(&t.r)
return t
}
// 时间到达时往通道发送消息
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default: // default 说明通道不会堵塞
}
}
// runtimeTimer的参数比tiker少了个period。因为timer只触发一次
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
timer的创建和ticker的创建基本一致,初始化通道和runtimeTimer。
添加timer到P
/// runtime/time.go
func startTimer(t *timer) {
addtimer(t)
}
func addtimer(t *timer) {
....
t.status = timerWaiting
addInitializedTimer(t)
}
func addInitializedTimer(t *timer) {
when := t.when
pp := getg().m.p.ptr() // 获取当前P
lock(&pp.timersLock)
cleantimers(pp) // 清理P中无用的timer
doaddtimer(pp, t) // 添加timer到p
unlock(&pp.timersLock)
wakeNetPoller(when)
}
func doaddtimer(pp *p, t *timer) {
...
t.pp.set(pp) // timer 和 p关联
i := len(pp.timers)
pp.timers = append(pp.timers, t) // timer加入P的timer集合
siftupTimer(pp.timers, i) // 调整最小堆,最先到达的时间在最前面
if t == pp.timers[0] { // 如果当前timer就是最小堆的一个,更新最先的到达时间
atomic.Store64(&pp.timer0When, uint64(t.when))
}
atomic.Xadd(&pp.numTimers, 1) // 更新timer数
}
将timer添加到当前P的最小堆中
调度
/// runtime/proc.go
func schedule() {
...
pp := _g_.m.p.ptr() // 获取当前P
...
checkTimers(pp, 0)
}
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
...
lock(&pp.timersLock)
adjusttimers(pp)
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 { // timers集合大于0,循环
if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break // checkTimers结束
}
ran = true
}
}
...
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
func runtimer(pp *p, now int64) int64 {
for {
t := pp.timers[0] // 最小堆的第一个
switch s := atomic.Load(&t.status); s {
case timerWaiting:
if t.when > now { // 时间还没到,返回到期时间,checkTimers的for循环也会终止
return t.when
}
if !atomic.Cas(&t.status, s, timerRunning) {
continue
}
runOneTimer(pp, t, now) // 时间到期
return 0
.....
}
}
}
func runOneTimer(pp *p, t *timer, now int64) {
f := t.f // timer的回调方法,创建时候设置的sendTime
arg := t.arg // 创建时候设置的通道
seq := t.seq
if t.period > 0 { // t.period > 0说明是ticker
delta := t.when - now
t.when += t.period * (1 + -delta/t.period) // 设置下一轮的到达时间
siftdownTimer(pp.timers, 0) // 调整最小堆
if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
badTimer()
}
updateTimer0When(pp) // 更新P的到达时间
} else { // 如果不是ticker 则移除出最小堆
dodeltimer0(pp)
if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
badTimer()
}
}
unlock(&pp.timersLock)
f(arg, seq) // 执行sendTime
lock(&pp.timersLock)
}
schedule() 是goroutine调度中的方法(参考我的另一篇博客), 获取当前p的timers最小堆中的一个timer,如果timer时间到期,则执行回调向通道发送消息,通知上层程序。如果是ticker则重新设置下次到达的时间,更新最小堆。
小结
timer和ticker的数据结构相同。每个P都有一个timer的最小堆。在runtime调度时,会获取当前P最小堆的一个timer,如果时间到期,则调用回调方法通过通道通知上层程序。如果timer是ticker则重新设置到期时间。