runtime.md 16 KB

2021

系统监控 sysmon

在系统栈中创建新的线程,它会每隔一段时间检查 Go 语言运行时,它在内部启动了一个不会中止的循环。

类似于守护进程

usleep 微秒(1/1000毫秒) nanosleep

func sysmon() {
	sched.nmsys++
	checkdead()

	lasttrace := int64(0)
	idle := 0
	delay := uint32(0)
	for {
		if idle == 0 {
			delay = 20
		} else if idle > 50 {
			delay *= 2
		}
		if delay > 10*1000 {
			delay = 10 * 1000
		}
		usleep(delay)
		...
	}
}
//初始的休眠时间是 20μs;
//最长的休眠时间是 10ms;
//当系统监控在 50 个循环中都没有唤醒 Goroutine 时,休眠时间在每个循环都会倍增;
  • 运行计时器 — 获取下一个需要被触发的计时器;
  • 轮询网络 — 获取需要处理的到期文件描述符;
  • 抢占处理器 — 抢占运行时间较长的或者处于系统调用的 Goroutine;
  • 垃圾回收 — 在满足条件时触发垃圾收集回收内存;

计时器

计时器的实现分别经历了以下几个过程:

  • Go 1.9 版本之前,所有的计时器由全局唯一的四叉堆维护;
  • Go 1.10 ~ 1.13,全局使用 64 个四叉堆维护全部的计时器,每个处理器(P)创建的计时器会由对应的四叉堆维护;
  • Go 1.14 版本之后,每个处理器单独管理计时器并通过网络轮询器触发;

    type timer struct {
    	pp puintptr
    
    	when     int64
    	period   int64
    	f        func(interface{}, uintptr)
    	arg      interface{}
    	seq      uintptr
    	nextwhen int64
    	status   uint32
    }
    
    // runtime.timer 只是计时器运行时的私有结构体,对外暴露的计时器使用 time.Timer 
    
    type Timer struct {
    	C <-chan Time
    	r runtimeTimer
    }
    
    func NewTimer(d Duration) *Timer {
    	c := make(chan Time, 1)
    	t := &Timer{
    		C: c,
    		r: runtimeTimer{
    			when: when(d),
    			f:    sendTime,
    			arg:  c,
    		},
    	}
    	startTimer(&t.r)
    	return t
    }
    
    func sendTime(c interface{}, seq uintptr) {
    	select {
    	case c.(chan Time) <- Now():
    	default:
    	}
    }
    

time.Ticker

time.Timer

time.Tick

time.After

func After(d Duration) <-chan Time {
	return NewTimer(d).C
}

func Tick(d Duration) <-chan Time {
	if d <= 0 {
		return nil
	}
	return NewTicker(d).C
}

time.AfterFunc

AfterFunc 返回的 Timer 根本不会使用到通道 C,返回的计时器不会被触发,只能用于调用 Stop 和 Reset 方法。

func AfterFunc(d Duration, f func()) *Timer {
	t := &Timer{
		r: runtimeTimer{
			when: when(d),
			f:    goFunc,
			arg:  f,
		},
	}
	startTimer(&t.r)
	return t
}

(*time.Timer).Reset

需要注意的地方是使用 Reset 时需要确保 t.C 通道被释放时才能调用,以防止发生资源竞争的问题,因为在清空 channel 和计数器到期之间存在竞争,我们无法正确使用 Reset 返回值。Reset 方法必须作用于已停止或已过期的 channel 上。

假如在 Stop 调用期间触发了定时器,且 channel 存在未消费的数据, 则 C 会存在一个值。将导致后续读取是错误的。

    for {
        select {
        case <-other:
        case <-time.After(period):
        }
    }

    for {
        timer := time.NewTimer(period)
        select {
        case <-other:
        case <-timer.C:
        }
        timer.Stop()
    }

//为了使 C 上传递的消息有效,C 应该在每次 重置 之前被消费完。
    timer := time.NewTimer(period)
    for {
        if !timer.Stop() {
            <-timer.C
        }
        timer.Reset(period)
        select {
        case <-other:
        case <-timer.C:
        }
    }

	timer := time.NewTimer(period)
	for {
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}

		timer.Reset(period)
		select {
		case <-other:
		case <-timer.C:
		}
	}

time.Ticker.Reset

内核态、用户态

用户态,内核态切换耗费资源和时间,所以应该减少系统调用的操作

  • 基于数据收发的基本原理,系统利用阻塞提高了 CPU 利用率
  • 为了优化上线文切换,设计了“IO 多路复用”,等着收到一批数据,再一次批量的处理数据
  • 为了优化“内核与监视进程的交互”,设计了三个版本的 API(select,poll,epoll)

一个进程的上下文可以分为三个部分:用户级上下文、寄存器上下文以及系统级上下文。

​ (1)用户级上下文: 正文、数据、用户堆栈以及共享存储区;

(2)寄存器上下文: 通用寄存器、程序寄存器(IP)、处理器状态寄存器(EFLAGS)、栈指针(ESP);
(3)系统级上下文: 进程控制块task_struct、内存管理信息(mm_struct、vm_area_struct、pgd、pte)、内核栈。

进行进程切换就是上下文切换(context switch).操作系统必须对上面提到的全部信息进行切换

模式切换最主要的任务只是切换进程寄存器上下文的切换

一个中断上下文,通常都会始终占有CPU

调度器

  • 单线程调度器 · 0.x

只包含 40 多行代码; 程序中只能存在一个活跃线程,由 G-M 模型组成;

  • 多线程调度器 · 1.0

允许运行多线程的程序; 全局锁导致竞争严重;线程需要经常互相传递可运行的 Goroutine,引入了大量的延迟;

  • 任务窃取调度器 · 1.1

引入了处理器 P,构成了目前的 G-M-P 模型; 在处理器 P 的基础上实现了基于工作窃取的调度器; 在某些情况下,Goroutine 不会让出线程,进而造成饥饿问题; 时间过长的垃圾回收(Stop-the-world,STW)会导致程序长时间无法工作;

  • 抢占式调度器 · 1.2 ~ 至今

基于协作的抢占式调度器 - 1.2 ~ 1.13 通过编译器在函数调用时插入抢占检查指令,在函数调用时检查当前 Goroutine 是否发起了抢占请求,实现基于协作的抢占式调度; Goroutine 可能会因为垃圾回收和循环长时间占用资源导致程序暂停;

  • 基于信号的抢占式调度器 - 1.14 ~ 至今

实现基于信号的真抢占式调度; 垃圾回收在扫描栈时会触发抢占调度; 抢占的时间点不够多,还不能覆盖全部的边缘情况;

  • 非均匀存储访问调度器 · 提案

对运行时的各种资源进行分区; 实现非常复杂,到今天还没有提上日程;

goroutine的栈管理

  • Go语言栈空间来运行被调用的函数。如果空间不足,Go的运行环境就会分配更多的栈空间。因为有了这个检查机制,一个goroutine的初始栈可以很小。这样Go程序员就可以把goroutine作为相对廉价的资源来使用。

  • Go 1.2 :协程的堆栈大小从 4Kb 增加到 8Kb。

  • Go 1.4 :协程的堆栈大小从 8Kb 减小到 2Kb。

  • 堆栈的大小之所以改变是因为堆栈分配的策略改变了

goroutine的阻塞

长时间等待,不需要cpu的状态

  • 使用channel
  • 使用waitGroup
  • sync.Mutex锁
  • 阻塞的系统调用 (读写文件)
  • time.sleep

网络轮询器

操作系统中的 I/O 多路复用机制和 Go 语言的运行时,在两个不同体系之间构建了桥梁

在不同操作系统上的 I/O 操作,使用平台特定的函数实现了多个版本的网络轮询模块:

  • src/runtime/netpoll_epoll.go
  • src/runtime/netpoll_kqueue.go
  • src/runtime/netpoll_solaris.go
  • src/runtime/netpoll_windows.go
  • src/runtime/netpoll_aix.go
  • src/runtime/netpoll_fake.go

    func netpoll(delay int64) gList {
    	var waitms int32
    	if delay < 0 {
    		waitms = -1
    	} else if delay == 0 {
    		waitms = 0
    	} else if delay < 1e6 {
    		waitms = 1
    	} else if delay < 1e15 {
    		waitms = int32(delay / 1e6)
    	} else {
    		waitms = 1e9
    	}
    	
    	var events [128]epollevent
    retry:
    	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    	if n < 0 {
    		if waitms > 0 {
    			return gList{}
    		}
    		goto retry
    	}
    	
    	var toRun gList
    	for i := int32(0); i < n; i++ {
    		ev := &events[i]
    		if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
    			...
    			continue
    		}
    
    		var mode int32
    		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
    			mode += 'r'
    		}
    		...
    		if mode != 0 {
    			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
    			pd.everr = false
    			netpollready(&toRun, pd, mode)
    		}
    	}
    	return toRun
    }
    
    

channel

我们能对channel进行的操作只有4种:

  • 创建channel (通过make()函数)
  • 放入数据 (通过 channel <- data 操作) 
  • 取出数据 (通过 <-channel 操作)
  • 关闭channel (通过close()函数)

  • 先入先出

先从 Channel 读取数据的 Goroutine 会先接收到数据;

先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

  • 无锁管道

Channel 是一个用于同步和通信的有锁队列,使用互斥锁解决程序中可能存在的线程竞争问题

Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,目前通过 CAS 实现的无锁 Channel 没有提供先进先出的特性,所以该提案暂时也被搁浅了。

Channel 在运行时使用 runtime.hchan 结构体表示

type hchan struct {
	qcount   uint
	dataqsiz uint
	buf      unsafe.Pointer
	elemsize uint16
	closed   uint32
	elemtype *_type
	sendx    uint
	recvx    uint
	recvq    waitq
	sendq    waitq

	lock mutex
}

同步原语与锁

  • sync.Mutex 互斥锁

    type Mutex struct {
    	state int32
    	sema  uint32
    }
    
    //1s=10^3ms(毫秒)=10^6μs(微秒)=10^9ns(纳秒)=10^12ps(皮秒)=10^15fs(飞秒)=10^18as(阿秒)=10^21zm(仄秒)=10^24ym(幺秒) 
    
  • 正常模式和饥饿模式

    package main
    
    import (
    	"sync"
    	"time"
    )
    
    func main() {
    	done1 := make(chan bool, 1)
    	done2 := make(chan bool, 1)
    	var mu sync.Mutex
    
    	// goroutine 1
    	go func() {
    		for {
    			select {
    			case <-done1:
    				return
    			default:
    				mu.Lock()
    				time.Sleep(100 * time.Millisecond)
    				mu.Unlock()
    			}
    		}
    	}()
    
    	// goroutine 2
    	go func() {
    		for {
    			select {
    			case <-done2:
    				return
    			default:
    				time.Sleep(100 * time.Millisecond)
    				mu.Lock()
    				mu.Unlock()
    			}
    		}
    	}()
    
    	time.Sleep(1000 * time.Millisecond)
    	done1 <- true
    	done2 <- true
    }
    

互斥锁的加锁过程,它涉及自旋、信号量以及调度等概念:

  • 如果互斥锁处于初始化状态,会通过置位 mutexLocked 加锁;
  • 如果互斥锁处于 mutexLocked 状态并且在普通模式下工作,会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
  • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式;
  • 互斥锁在正常情况下会通过 runtime.sync_runtime_SemacquireMutex 将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒;
  • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,那么它会将互斥锁切换回正常模式;

互斥锁的解锁过程:

  • 当互斥锁已经被解锁时,调用 sync.Mutex.Unlock 会直接抛出异常;
  • 当互斥锁处于饥饿模式时,将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
  • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;

  • 读写互斥锁 sync.RWMutex 是细粒度的互斥锁

    type RWMutex struct {
    	w           Mutex
    	writerSem   uint32
    	readerSem   uint32
    	readerCount int32
    	readerWait  int32
    }
    //w — 复用互斥锁提供的能力;
    //writerSem 和 readerSem — 分别用于写等待读和读等待写:
    //readerCount 存储了当前正在执行的读操作数量;
    //readerWait 表示当写操作被阻塞时等待的读操作个数;
    
  • 写操作使用 sync.RWMutex.Locksync.RWMutex.Unlock 方法;

  • 读操作使用 sync.RWMutex.RLocksync.RWMutex.RUnlock 方法;

  • 读锁和写锁的关系:

    • 调用 sync.RWMutex.Lock 尝试获取写锁时;每次 sync.RWMutex.RUnlock 都会将 readerCount 其减一,当它归零时该 Goroutine 会获得写锁;将 readerCount 减少 rwmutexMaxReaders 个数以阻塞后续的读操作;
    • 调用 sync.RWMutex.Unlock 释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁;
  • 并发竞争

    package main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    )
    
    func main() {
    	var count int64
    	wg := sync.WaitGroup{}
    
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			for j := 0; j < 10; j++ {
    				atomic.AddInt64(&count, int64(j))
    				//count++
    			}
    		}()
    	}
    	wg.Wait()
    	fmt.Println(count)
    }
    
    //抢占,go1.8 自旋几次再阻塞(占用资源),go1.9 饥饿模式(性能不会明显下降)
    
  • goroutine 同步

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	wg := sync.WaitGroup{}
    	wg.Add(5)
    	for i := 0; i < 5; i++ {
    		go func(i int) {
    			defer wg.Done()
    			fmt.Println(i)
    		}(i)
    	}
    	wg.Wait()
    }
    
    
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	wg := sync.WaitGroup{}
    	wg.Add(5)
    	for i := 0; i < 5; i++ {
    		go func() {
    			defer wg.Done()
    			fmt.Println(i)
    		}()
    	}
    	wg.Wait()
    }
    
    
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	stop := make(chan bool)
    	go subProcess(stop)
    
    	time.Sleep(5 * time.Second)
    	fmt.Println("可以了,通知监控停止")
    	stop <- true
    	//为了检测监控过是否停止,如果没有监控输出,就表示停止了
    	time.Sleep(3 * time.Second)
    }
    
    func subProcess(stop chan bool) {
    	for {
    		select {
    		case <-stop:
    			fmt.Println("监控退出,停止了...")
    			return
    		default:
    			time.Sleep(time.Second)
    		}
    	}
    }
    //如有很多goroutine,并且这些goroutine还衍生了其他goroutine,此时chan就比较困难解决这样的问题了
    

 sync.Cond

Go 语言标准库中还包含条件变量 sync.Cond,它可以让一组的 Goroutine 都在满足特定条件时被唤醒。

sync.Once

Go 语言标准库中 sync.Once 可以保证在 Go 程序运行期间的某段代码只会执行一次。

context

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go watch(ctx, "【监控1】")
	go watch(ctx, "【监控2】")
	go watch(ctx, "【监控3】")
	time.Sleep(5 * time.Second)
	fmt.Println("可以了,通知监控停止")
	cancel()
	//为了检测监控过是否停止,如果没有监控输出,就表示停止了
	time.Sleep(3 * time.Second)
}
func watch(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "监控退出,停止了...")
			return
		default:
			fmt.Println(name, "goroutine监控中...")
			time.Sleep(time.Second)
		}
	}
}
//跟踪goroutine的方案才可以达到控制的目的,go为我们提供了Context

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context
type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

type cancelCtx struct {
	Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}
  • Context使用原则

    • 不要把Context放在结构体中,要以参数的方式进行传递
    • 以Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位
    • 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
    • Context的Value相关方法应该传递必须的数据,不要什么数据都使用这个传递