2021
在系统栈中创建新的线程,它会每隔一段时间检查 Go 语言运行时,它在内部启动了一个不会中止的循环。
类似于守护进程
usleep 微秒(1/1000毫秒) nanosleep
func sysmon() {
sched.nmsys++
checkdead()
lasttrace := int64(0)
idle := 0
delay := uint32(0)
for {
if idle == 0 {
delay = 20
} else if idle > 50 {
delay *= 2
}
if delay > 10*1000 {
delay = 10 * 1000
}
usleep(delay)
...
}
}
//初始的休眠时间是 20μs;
//最长的休眠时间是 10ms;
//当系统监控在 50 个循环中都没有唤醒 Goroutine 时,休眠时间在每个循环都会倍增;
计时器的实现分别经历了以下几个过程:
Go 1.14 版本之后,每个处理器单独管理计时器并通过网络轮询器触发;
type timer struct {
pp puintptr
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
nextwhen int64
status uint32
}
// runtime.timer 只是计时器运行时的私有结构体,对外暴露的计时器使用 time.Timer
type Timer struct {
C <-chan Time
r runtimeTimer
}
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
func sendTime(c interface{}, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}
time.Ticker
time.Timer
time.Tick
time.After
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
func Tick(d Duration) <-chan Time {
if d <= 0 {
return nil
}
return NewTicker(d).C
}
time.AfterFunc
AfterFunc 返回的 Timer 根本不会使用到通道 C,返回的计时器不会被触发,只能用于调用 Stop 和 Reset 方法。
func AfterFunc(d Duration, f func()) *Timer {
t := &Timer{
r: runtimeTimer{
when: when(d),
f: goFunc,
arg: f,
},
}
startTimer(&t.r)
return t
}
(*time.Timer).Reset
需要注意的地方是使用 Reset
时需要确保 t.C
通道被释放时才能调用,以防止发生资源竞争的问题,因为在清空 channel 和计数器到期之间存在竞争,我们无法正确使用 Reset 返回值。Reset 方法必须作用于已停止或已过期的 channel 上。
假如在 Stop 调用期间触发了定时器,且 channel 存在未消费的数据, 则 C 会存在一个值。将导致后续读取是错误的。
for {
select {
case <-other:
case <-time.After(period):
}
}
for {
timer := time.NewTimer(period)
select {
case <-other:
case <-timer.C:
}
timer.Stop()
}
//为了使 C 上传递的消息有效,C 应该在每次 重置 之前被消费完。
timer := time.NewTimer(period)
for {
if !timer.Stop() {
<-timer.C
}
timer.Reset(period)
select {
case <-other:
case <-timer.C:
}
}
timer := time.NewTimer(period)
for {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(period)
select {
case <-other:
case <-timer.C:
}
}
time.Ticker.Reset
用户态,内核态切换耗费资源和时间,所以应该减少系统调用的操作
一个进程的上下文可以分为三个部分:用户级上下文、寄存器上下文以及系统级上下文。
(1)用户级上下文: 正文、数据、用户堆栈以及共享存储区;
(2)寄存器上下文: 通用寄存器、程序寄存器(IP)、处理器状态寄存器(EFLAGS)、栈指针(ESP);
(3)系统级上下文: 进程控制块task_struct、内存管理信息(mm_struct、vm_area_struct、pgd、pte)、内核栈。
进行进程切换就是上下文切换(context switch).操作系统必须对上面提到的全部信息进行切换
模式切换最主要的任务只是切换进程寄存器上下文的切换
一个中断上下文,通常都会始终占有CPU
只包含 40 多行代码; 程序中只能存在一个活跃线程,由 G-M 模型组成;
允许运行多线程的程序; 全局锁导致竞争严重;线程需要经常互相传递可运行的 Goroutine,引入了大量的延迟;
引入了处理器 P,构成了目前的 G-M-P 模型; 在处理器 P 的基础上实现了基于工作窃取的调度器; 在某些情况下,Goroutine 不会让出线程,进而造成饥饿问题; 时间过长的垃圾回收(Stop-the-world,STW)会导致程序长时间无法工作;
基于协作的抢占式调度器 - 1.2 ~ 1.13 通过编译器在函数调用时插入抢占检查指令,在函数调用时检查当前 Goroutine 是否发起了抢占请求,实现基于协作的抢占式调度; Goroutine 可能会因为垃圾回收和循环长时间占用资源导致程序暂停;
实现基于信号的真抢占式调度; 垃圾回收在扫描栈时会触发抢占调度; 抢占的时间点不够多,还不能覆盖全部的边缘情况;
对运行时的各种资源进行分区; 实现非常复杂,到今天还没有提上日程;
Go语言栈空间来运行被调用的函数。如果空间不足,Go的运行环境就会分配更多的栈空间。因为有了这个检查机制,一个goroutine的初始栈可以很小。这样Go程序员就可以把goroutine作为相对廉价的资源来使用。
Go 1.2 :协程的堆栈大小从 4Kb 增加到 8Kb。
Go 1.4 :协程的堆栈大小从 8Kb 减小到 2Kb。
堆栈的大小之所以改变是因为堆栈分配的策略改变了
长时间等待,不需要cpu的状态
操作系统中的 I/O 多路复用机制和 Go 语言的运行时,在两个不同体系之间构建了桥梁
在不同操作系统上的 I/O 操作,使用平台特定的函数实现了多个版本的网络轮询模块:
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go
func netpoll(delay int64) gList {
var waitms int32
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
var events [128]epollevent
retry:
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
if waitms > 0 {
return gList{}
}
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
...
continue
}
var mode int32
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
...
if mode != 0 {
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
netpollready(&toRun, pd, mode)
}
}
return toRun
}
我们能对channel进行的操作只有4种:
关闭channel (通过close()函数)
先入先出
先从 Channel 读取数据的 Goroutine 会先接收到数据;
先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
Channel 是一个用于同步和通信的有锁队列,使用互斥锁解决程序中可能存在的线程竞争问题
Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,目前通过 CAS 实现的无锁 Channel 没有提供先进先出的特性,所以该提案暂时也被搁浅了。
Channel 在运行时使用 runtime.hchan
结构体表示
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
lock mutex
}
sync.Mutex 互斥锁
type Mutex struct {
state int32
sema uint32
}
//1s=10^3ms(毫秒)=10^6μs(微秒)=10^9ns(纳秒)=10^12ps(皮秒)=10^15fs(飞秒)=10^18as(阿秒)=10^21zm(仄秒)=10^24ym(幺秒)
正常模式和饥饿模式
package main
import (
"sync"
"time"
)
func main() {
done1 := make(chan bool, 1)
done2 := make(chan bool, 1)
var mu sync.Mutex
// goroutine 1
go func() {
for {
select {
case <-done1:
return
default:
mu.Lock()
time.Sleep(100 * time.Millisecond)
mu.Unlock()
}
}
}()
// goroutine 2
go func() {
for {
select {
case <-done2:
return
default:
time.Sleep(100 * time.Millisecond)
mu.Lock()
mu.Unlock()
}
}
}()
time.Sleep(1000 * time.Millisecond)
done1 <- true
done2 <- true
}
互斥锁的加锁过程,它涉及自旋、信号量以及调度等概念:
mutexLocked
加锁;mutexLocked
状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE
指令消耗 CPU 时间等待锁的释放;runtime.sync_runtime_SemacquireMutex
将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;互斥锁的解锁过程:
sync.Mutex.Unlock
会直接抛出异常;mutexLocked
标志位;当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease
唤醒对应的 Goroutine;
读写互斥锁 sync.RWMutex
是细粒度的互斥锁
type RWMutex struct {
w Mutex
writerSem uint32
readerSem uint32
readerCount int32
readerWait int32
}
//w — 复用互斥锁提供的能力;
//writerSem 和 readerSem — 分别用于写等待读和读等待写:
//readerCount 存储了当前正在执行的读操作数量;
//readerWait 表示当写操作被阻塞时等待的读操作个数;
写操作使用 sync.RWMutex.Lock
和 sync.RWMutex.Unlock
方法;
读操作使用 sync.RWMutex.RLock
和 sync.RWMutex.RUnlock
方法;
读锁和写锁的关系:
并发竞争
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var count int64
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 10; j++ {
atomic.AddInt64(&count, int64(j))
//count++
}
}()
}
wg.Wait()
fmt.Println(count)
}
//抢占,go1.8 自旋几次再阻塞(占用资源),go1.9 饥饿模式(性能不会明显下降)
goroutine 同步
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(5)
for i := 0; i < 5; i++ {
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
wg.Wait()
}
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}
package main
import (
"fmt"
"time"
)
func main() {
stop := make(chan bool)
go subProcess(stop)
time.Sleep(5 * time.Second)
fmt.Println("可以了,通知监控停止")
stop <- true
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(3 * time.Second)
}
func subProcess(stop chan bool) {
for {
select {
case <-stop:
fmt.Println("监控退出,停止了...")
return
default:
time.Sleep(time.Second)
}
}
}
//如有很多goroutine,并且这些goroutine还衍生了其他goroutine,此时chan就比较困难解决这样的问题了
sync.Cond
Go 语言标准库中还包含条件变量 sync.Cond
,它可以让一组的 Goroutine 都在满足特定条件时被唤醒。
sync.Once
Go 语言标准库中 sync.Once
可以保证在 Go 程序运行期间的某段代码只会执行一次。
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
go watch(ctx, "【监控1】")
go watch(ctx, "【监控2】")
go watch(ctx, "【监控3】")
time.Sleep(5 * time.Second)
fmt.Println("可以了,通知监控停止")
cancel()
//为了检测监控过是否停止,如果没有监控输出,就表示停止了
time.Sleep(3 * time.Second)
}
func watch(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println(name, "监控退出,停止了...")
return
default:
fmt.Println(name, "goroutine监控中...")
time.Sleep(time.Second)
}
}
}
//跟踪goroutine的方案才可以达到控制的目的,go为我们提供了Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
Context使用原则