Go

Go 知识量:6 - 35 - 115

1.6 常见的并发模式><

并发版本的“Hello, World”- 1.6.1 -

在Go语言中,可以使用goroutine和channel来实现并发版本的"Hello, World"。下面是一个简单的示例:

package main  
  
import (  
 "fmt"  
 "time"  
)  
  
func main() {  
 ch := make(chan string) // 创建一个字符串类型的通道  
  
 go func() { // 启动一个协程执行任务  
 ch <- "Hello, World" // 向通道发送数据  
 }()  
  
 // 主线程休眠一段时间,以便协程有足够的时间发送数据到通道  
 time.Sleep(1 * time.Second)  
  
 // 从通道接收数据并打印  
 msg := <-ch // 从通道接收数据  
 fmt.Println(msg) // 输出:Hello, World  
}

在上面的示例中,创建了一个字符串类型的通道ch。然后,使用匿名函数启动了一个协程,该协程向通道发送字符串"Hello, World"。主线程通过休眠1秒来模拟其他任务,以便协程有足够的时间发送数据到通道。最后,主线程从通道接收数据并打印出来。

生产者/消费者模型- 1.6.2 -

在并发编程中,生产者/消费者模型是一个经典的例子,用于平衡生产线程和消费线程的工作能力,以提高程序处理数据的速度。

生产者/消费者模型的基本思想是:生产者负责生成一定量的数据放入缓冲区,然后通知消费者线程从缓冲区中取出数据进行处理。当缓冲区满时,生产者线程会被阻塞,直到消费者线程从缓冲区中取出一些数据腾出空间。同样,当缓冲区为空时,消费者线程会被阻塞,直到生产者线程生成一些数据放入缓冲区。

在Go语言中,可以使用通道(channel)来实现生产者/消费者模型。通道是一种用于协程之间进行通信和同步的机制。通过通道,生产者和消费者可以发送和接收数据。

以下是一个简单的Go语言生产者/消费者模型示例:

package main  
  
import (  
 "fmt"  
 "time"  
)  
  
func producer(ch chan<- int) {  
 for i := 0; i < 5; i++ {  
 ch <- i // 将数据发送到通道  
 time.Sleep(time.Millisecond * 500) // 模拟生产数据的耗时操作  
 }  
 close(ch) // 关闭通道,表示没有更多的数据要发送了  
}  
  
func consumer(ch <-chan int) {  
 for num := range ch { // 从通道接收数据  
 fmt.Println("Received:", num)  
 }  
}  
  
func main() {  
 ch := make(chan int) // 创建一个整数类型的通道  
 go producer(ch)       // 启动生产者协程  
 consumer(ch)           // 启动消费者协程  
 time.Sleep(time.Second * 2) // 主线程休眠一段时间,以便观察输出结果  
}

在上面的示例中,producer函数模拟生产数据的操作,它将数据发送到通道ch中。consumer函数模拟消费数据的操作,它从通道ch中接收数据并打印出来。在main函数中,创建了一个整数类型的通道ch,然后启动了生产者和消费者协程。最后,主线程休眠一段时间以便观察输出结果。

发布/订阅模型- 1.6.3 -

发布/订阅(publish-subscribe)模型在Go语言中也有多种实现方式。这个模型是一种消息传递范式,其中消息的生产者称为发布者(publisher),而消息的消费者称为订阅者(subscriber)。发布/订阅模型与传统的生产者/消费者模型有所不同,它允许多个订阅者监听一个或多个主题,并在这些主题上接收相应的消息。

在Go语言中,可以使用内置的sync.WaitGroup和channel来实现发布/订阅模型。下面是一个简单的示例,展示了如何使用这些工具来构建一个发布/订阅模型:

package main  
  
import (  
 "fmt"  
 "sync"  
)  
  
type PubSub struct {  
 subscribers []chan string  
 mutex       sync.Mutex  
}  
  
func NewPubSub() *PubSub {  
 return &PubSub{}  
}  
  
func (ps *PubSub) Subscribe() chan string {  
 ps.mutex.Lock()  
 defer ps.mutex.Unlock()  
  
 ch := make(chan string)  
 ps.subscribers = append(ps.subscribers, ch)  
 return ch  
}  
  
func (ps *PubSub) Publish(msg string) {  
 ps.mutex.Lock()  
 defer ps.mutex.Unlock()  
  
 for _, ch := range ps.subscribers {  
 go func(ch chan string) {  
 ch <- msg // 将消息发送到订阅的通道  
 }(ch)  
 }  
}  
  
func main() {  
 pubsub := NewPubSub()  
 ch1 := pubsub.Subscribe()  
 ch2 := pubsub.Subscribe()  
  
 go func() { // 模拟订阅者监听消息并处理  
 for msg := range ch1 {  
 fmt.Println("Subscriber 1 received:", msg)  
 }  
 }()  
  
 go func() { // 模拟订阅者监听消息并处理  
 for msg := range ch2 {  
 fmt.Println("Subscriber 2 received:", msg)  
 }  
 }()  
  
 pubsub.Publish("Hello, subscribers!") // 发布消息到所有订阅的通道  
 time.Sleep(time.Second * 2) // 主线程休眠一段时间,以便观察输出结果  
}

在上面的示例中,定义了一个PubSub结构体,它维护了一个订阅者的通道列表。Subscribe方法用于创建一个新的通道并添加到订阅者的列表中,而Publish方法则将消息发送到所有订阅者的通道。使用sync.Mutex对访问订阅者的列表进行互斥操作,以确保并发安全。在main函数中,创建了一个PubSub实例,并使用Subscribe方法创建了两个订阅者的通道。然后,启动了两个协程来模拟订阅者监听并处理消息。最后,调用Publish方法发布一条消息,该消息将被发送到所有订阅者的通道。主线程休眠一段时间以便观察输出结果。

控制并发数- 1.6.4 -

在Go语言中,可以使用goroutine和channel来控制并发数。Goroutine是Go语言中轻量级的线程,它可以与其他goroutine并发执行。Channel是用于goroutine之间进行通信的管道,它可以用来传递数据。

要控制并发数,可以使用缓冲通道(buffered channel)和带有select语句的default分支。通过设置通道的缓冲大小,可以限制并发执行的goroutine数量。当缓冲区已满时,发送操作会被阻塞,直到有接收操作从通道中取出数据。同样地,当缓冲区为空时,接收操作会被阻塞,直到有发送操作向通道中添加数据。

以下是一个示例代码,演示如何使用缓冲通道控制并发数:

package main  
  
import (  
 "fmt"  
 "time"  
)  
  
func worker(id int, jobs <-chan int, results chan<- int) {  
 for j := range jobs {  
 fmt.Println("Worker", id, "started job", j)  
 time.Sleep(time.Second) // 模拟耗时操作  
 fmt.Println("Worker", id, "finished job", j)  
 results <- j * 2 // 将结果发送到结果通道  
 }  
}  
  
func main() {  
 const (  
 numJobs   = 10  
 numWorkers = 3  
 )  
  
 // 创建输入和输出通道  
 jobs := make(chan int, numJobs)  
 results := make(chan int, numJobs)  
  
 // 启动指定数量的worker goroutine  
 for i := 1; i <= numWorkers; i++ {  
 go worker(i, jobs, results)  
 }  
  
 // 发送作业到输入通道  
 for j := 1; j <= numJobs; j++ {  
 jobs <- j  
 }  
 close(jobs) // 关闭输入通道,表示没有更多的作业要发送了  
  
 // 从输出通道接收结果并打印  
 for a := 1; a <= numJobs; a++ {  
 <-results // 从通道接收数据,如果通道为空则阻塞直到有数据可用  
 fmt.Println("Result", a, "is", <-results) // 打印结果  
 }  
}

在上述示例中,创建了一个输入通道jobs和一个输出通道results。然后,启动了指定数量的worker goroutine来处理作业。通过向jobs通道发送作业,可以控制并发执行的goroutine数量。通过限制jobs通道的缓冲大小,可以控制并发执行的goroutine数量。在本例中,设置了numJobs作为缓冲大小,这意味着最多同时执行numJobs个goroutine。当所有的作业都被发送后,关闭了jobs通道,表示没有更多的作业要发送了。最后,从results通道接收结果并打印出来。通过使用带有select语句的default分支,可以处理接收操作被阻塞的情况。如果results通道为空,接收操作会被阻塞直到有数据可用。在本例中,使用了<-results语法来从通道接收数据。

并发的安全退出- 1.6.5 -

在Go语言中,要实现并发的安全退出,可以使用以下几种方法:

1. 使用context包:context包提供了一种机制来取消正在运行的goroutine,并且可以通过传递一个context.Context对象来控制goroutine的执行。可以使用context.WithCancel函数创建一个带有取消功能的上下文,然后在需要的时候调用cancelFunc()来取消goroutine的执行。在goroutine中,可以使用select语句和<-ctx.Done()来监听取消信号,并在接收到信号后进行清理操作或退出。

package main  
  
import (  
 "context"  
 "fmt"  
 "time"  
)  
  
func worker(ctx context.Context) {  
 for {  
 select {  
 case <-ctx.Done():  
 // 接收到取消信号,退出goroutine  
 return  
 default:  
 // 执行任务  
 fmt.Println("Worker is running")  
 time.Sleep(time.Second)  
 }  
 }  
}  
  
func main() {  
 ctx, cancel := context.WithCancel(context.Background())  
 go worker(ctx)  
  
 // 模拟一些工作  
 time.Sleep(5 * time.Second)  
 cancel() // 取消goroutine的执行  
  
 // 等待一段时间确保goroutine退出  
 time.Sleep(time.Second)  
 fmt.Println("Main goroutine exiting")  
}

2. 使用sync.WaitGroup:sync.WaitGroup提供了一种机制来等待一组goroutine完成。可以在每个goroutine启动时调用Add(1)来增加等待计数器,并在goroutine退出时调用Done()来减少等待计数器。在主goroutine中,可以使用Wait()方法等待所有goroutine完成。在需要安全退出时,可以调用Wait()方法来阻塞主goroutine,直到所有goroutine都退出。

package main  
  
import (  
 "fmt"  
 "sync"  
 "time"  
)  
  
func worker(wg *sync.WaitGroup) {  
 defer wg.Done() // 减少等待计数器,表示goroutine退出  
 for i := 0; i < 5; i++ {  
 fmt.Println("Worker is running")  
 time.Sleep(time.Second)  
 }  
}  
  
func main() {  
 var wg sync.WaitGroup  
 wg.Add(1) // 增加等待计数器,表示有一个goroutine需要等待完成  
 go worker(&wg) // 启动goroutine并传入WaitGroup的指针  
  
 // 模拟一些工作  
 time.Sleep(5 * time.Second)  
 wg.Wait() // 等待所有goroutine完成,阻塞主goroutine直到所有goroutine退出  
 fmt.Println("Main goroutine exiting")  
}