Go(七)Go线程、信道、并发编程

Go并发理论是 基于CSP(Communication Sequential Process)模型,提倡不要通过共享内存来通通讯,而是通过共享通讯来共享内存

一、线程、协程

goroutine是由 Go 运行时管理的轻量级线程,只需要一个关键字go即可启动一个线程并执行;goroutine在相同的地址空间中运行,因此在访问共享的内存时必须进行同步,可使用sync实现同步。
Go原生的支持并发,这也是Go的一大特点。更像协程coroutine,是非抢占式的多任务处理,由协程主动交出控制权;不是系统层面的多任务,而是编译器、解释器、虚拟机层面的多任务;多个协程可以再一个或多个线程上运行

goroutine可能切换点
1、I/O,select
2、信道channel
3、等待锁
4、函数调用
5、runtime.Gosched()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func threadFun(index int)  {
for {
//Printf是io操作 或有协程测切换 主动交出资源
fmt.Printf("第 %d 个线程....\n",index)
}
}

func RoutineDemo() {
//开启10个线程
for i := 0; i < 10; i ++ {
go threadFun(i)
}
time.Sleep(time.Millisecond) //休眠一毫秒

var arr [5]int
for i := 0; i < 5; i ++ {
go func(i int) {
for {
arr[i] ++
//手动交出协程
runtime.Gosched()
}
}(i)
}
time.Sleep(time.Millisecond) //休眠一毫秒
fmt.Println(arr)//[632 791 755 545 785] 并不是平均的
}

二、信道

信道是带有类型的管道,你可以通过它用信道操作符<-(“箭头”就是数据流的方向)来发送或者接收值。

ch <- v // 将 v 发送至信道 ch。
v := <-ch // 从 ch 接收值并赋予 v。

和映射与切片一样,信道在使用前必须创建

ch := make(chan int)

默认情况下,发送和接收操作在另一端准备好之前都会阻塞。这使得Go协程可以在没有显式的锁或竞态变量的情况下进行同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
//工作者
func worker(c chan int,id int) {
for {
//从channel接收值
n := <-c;
fmt.Printf("第 %d 个worker 从channel接收数据 %c \n", id, n)
}
}

//创建worker
func createWorker(id int) chan int {
c := make(chan int)
// 去工作接收信息
go worker(c, id)
return c
}

//创建worker 返回的channel只能接收数据 只能向channel发送数据
func createWorkeSend(id int) chan<- int {
c := make(chan int)
// 去工作接收信息
go worker(c, id)
return c
}

//创建worker 返回的channel只能接收receive数据 不能像返回值发送数据
func createWorkeReceive(id int) <-chan int {
c := make(chan int)
// 去工作接收信息
go func() {
//n := <-c;
c <- id
fmt.Printf("第 %d 个worker 从channel接收数据 %c \n", id, <-c)
}()
return c
}

//带缓冲的信道
func bufChannel() {
c := make(chan int,2)
c <- 1
c <- 2
fmt.Println(<-c)

}

func ChannelDemo() {
//创建一个信道
c := make(chan int)
//开启协程接收channel
go func() {
for {
n := <- c
fmt.Println(n)
}
}()

//向channel传递值
c <- 2
c <- 4
time.Sleep(time.Millisecond)

fmt.Println("=====接收channel的函数=====")
var channels [5]chan int
for i:= 0; i<5; i++ {
channels[i] = make(chan int)
go worker(channels[i],i)
}

//向channel发送值
for i:= 0; i<5; i++ {
channels[i] <- 'A' + i
}
time.Sleep(time.Millisecond)

fmt.Println("======返回channel的函数======")
var channels2 [5]chan int
for i:= 0; i<5; i++ {
channels2[i] = createWorker(i)
}

for i:= 0; i<5; i++ {
channels[i] <- 'A' + i
}

time.Sleep(time.Millisecond)

fmt.Println("======只能向返回的channel的发送数据 不能向返回channel接收数据======")
var channels3 [5]chan<- int
for i:= 0; i<5; i++ {
channels3[i] = createWorkeSend(i)
}

for i:= 0; i<5; i++ {
channels3[i] <- 'A' + i
//不能取数据 不能从channel取出数据 send-only
//n := <- channels3[i]
}

time.Sleep(time.Millisecond)

fmt.Println("======只能向返回channel接收数据 不能像返回的channel发送数据======")
var channels4 [5]<-chan int
for i:= 0; i<5; i++ {
channels4[i] = createWorkeReceive(i)
}

for i:= 0; i<5; i++ {
//不能像channel发送数据 只能接收数据 receive-only
//channels4[i] <- 'A' + i

n := <- channels4[i]
fmt.Println(n)
}

time.Sleep(time.Millisecond)

fmt.Println("======带缓冲的信道======")
bufChannel()
}

带缓冲的信道
信道可以是带缓冲的。将缓冲长度作为第二个参数提供给 make 来初始化一个带缓冲的信道,仅当信道的缓冲区填满后,向其发送数据时才会阻塞。当缓冲区为空时,接受方会阻塞。

ch := make(chan int, 100)

信道可以作为的函数的参数和返回值

func 函数名称(c chan int,其他参数…){函数体}
func 函数名称(参数…) chan int {函数体}

1
2
3
4
5
6
7
//带缓冲的信道
func bufChannel() {
c := make(chan int,2)
c <- 1
c <- 2
fmt.Println(<-c)
}

range 和 close
发送者可通过 close 关闭一个信道来表示没有需要发送的值了。接收者可以通过为接收表达式分配第二个参数来测试信道是否被关闭:若没有值可以接收且信道已被关闭,那么在执行完

v, ok := <-ch

之后 ok 会被设置为 false
循环 for i := range c 会不断从信道接收值,直到它被关闭。

注意

  • 只有发送者才能关闭信道,而接收者不能。向一个已经关闭的信道发送数据会引发程序恐慌(panic)。
  • 信道与文件不同,通常情况下无需关闭它们。只有在必须告诉接收者不再有需要发送的值时才有必要关闭,例如终止一个 range 循环。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    //斐波那契
    func fibonacci(n int, c chan int) {
    x, y := 0, 1
    for i := 0; i < n; i++ {
    c <- x
    x, y = y, x+y
    }
    //关闭一个信道来表示没有需要发送的值了
    close(c)
    }

    func ChannelDemo() {
    c := make(chan int, 10)
    go fibonacci(cap(c), c)
    for i := range c {
    fmt.Println(i)
    }
    }

等待任务结束
等待一个或多个任务的执行可使用sync.WaitGroup来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func doTaskWG(id int,c chan int, wg *sync.WaitGroup)  {
for n := range c{
fmt.Printf("任务接收第 %d 个任务,值为 %c \n", id, n)
wg.Done()//结束
}
}

type taskWG struct {
c chan int
wg *sync.WaitGroup
}

func createTaskWG(id int,wg *sync.WaitGroup) taskWG{
t := taskWG{
c: make(chan int),
wg: wg,
}
go doTaskWG(id, t.c, wg)
return t
}

func WaitGroupDemo() {
var wg sync.WaitGroup
var workers [5]taskWG
for i:=0; i<5; i++ {
workers[i] = createTaskWG(i, &wg);
}

wg.Add(10)//10个任务
for i, worker := range workers {
worker.c <- 'A' + i
}

for i, worker := range workers {
worker.c <- 'a' + i
}
//等待任务结束
wg.Wait()
}

select 语句
select 语句使一个 Go 协程可以等待多个通信操作,select 会阻塞到某个分支可以继续执行为止,这时就会执行该分支。当多个分支都准备好时会随机选择一个执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func genChannel() chan int {
c := make(chan int)
go func() {
i := 0
for {
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
c <- i
i++
}
}()
return c
}

func SelectDemo() {
var c1, c2 = genChannel(),genChannel()
//非阻塞是的接收值
//三秒结束
tm := time.After(time.Second * 3);
for {
select {
case n := <-c1 :
fmt.Printf("从c1接收到值%d \n",n)
case n := <-c2 :
fmt.Printf("从c2接收到值%d \n",n)
case <-tm:
fmt.Printf("程序执行结束....\n")
return
//default:
// fmt.Printf("没有值\n")
}
}
}