Go 知识量:6 - 35 - 115
在Go语言中,可以使用goroutine和channel来实现并发版本的"Hello, World"。下面是一个简单的示例:
package main import ( "fmt" "time" ) func main() { ch := make(chan string) // 创建一个字符串类型的通道 go func() { // 启动一个协程执行任务 ch <- "Hello, World" // 向通道发送数据 }() // 主线程休眠一段时间,以便协程有足够的时间发送数据到通道 time.Sleep(1 * time.Second) // 从通道接收数据并打印 msg := <-ch // 从通道接收数据 fmt.Println(msg) // 输出:Hello, World }
在上面的示例中,创建了一个字符串类型的通道ch。然后,使用匿名函数启动了一个协程,该协程向通道发送字符串"Hello, World"。主线程通过休眠1秒来模拟其他任务,以便协程有足够的时间发送数据到通道。最后,主线程从通道接收数据并打印出来。
在并发编程中,生产者/消费者模型是一个经典的例子,用于平衡生产线程和消费线程的工作能力,以提高程序处理数据的速度。
生产者/消费者模型的基本思想是:生产者负责生成一定量的数据放入缓冲区,然后通知消费者线程从缓冲区中取出数据进行处理。当缓冲区满时,生产者线程会被阻塞,直到消费者线程从缓冲区中取出一些数据腾出空间。同样,当缓冲区为空时,消费者线程会被阻塞,直到生产者线程生成一些数据放入缓冲区。
在Go语言中,可以使用通道(channel)来实现生产者/消费者模型。通道是一种用于协程之间进行通信和同步的机制。通过通道,生产者和消费者可以发送和接收数据。
以下是一个简单的Go语言生产者/消费者模型示例:
package main import ( "fmt" "time" ) func producer(ch chan<- int) { for i := 0; i < 5; i++ { ch <- i // 将数据发送到通道 time.Sleep(time.Millisecond * 500) // 模拟生产数据的耗时操作 } close(ch) // 关闭通道,表示没有更多的数据要发送了 } func consumer(ch <-chan int) { for num := range ch { // 从通道接收数据 fmt.Println("Received:", num) } } func main() { ch := make(chan int) // 创建一个整数类型的通道 go producer(ch) // 启动生产者协程 consumer(ch) // 启动消费者协程 time.Sleep(time.Second * 2) // 主线程休眠一段时间,以便观察输出结果 }
在上面的示例中,producer函数模拟生产数据的操作,它将数据发送到通道ch中。consumer函数模拟消费数据的操作,它从通道ch中接收数据并打印出来。在main函数中,创建了一个整数类型的通道ch,然后启动了生产者和消费者协程。最后,主线程休眠一段时间以便观察输出结果。
发布/订阅(publish-subscribe)模型在Go语言中也有多种实现方式。这个模型是一种消息传递范式,其中消息的生产者称为发布者(publisher),而消息的消费者称为订阅者(subscriber)。发布/订阅模型与传统的生产者/消费者模型有所不同,它允许多个订阅者监听一个或多个主题,并在这些主题上接收相应的消息。
在Go语言中,可以使用内置的sync.WaitGroup和channel来实现发布/订阅模型。下面是一个简单的示例,展示了如何使用这些工具来构建一个发布/订阅模型:
package main import ( "fmt" "sync" ) type PubSub struct { subscribers []chan string mutex sync.Mutex } func NewPubSub() *PubSub { return &PubSub{} } func (ps *PubSub) Subscribe() chan string { ps.mutex.Lock() defer ps.mutex.Unlock() ch := make(chan string) ps.subscribers = append(ps.subscribers, ch) return ch } func (ps *PubSub) Publish(msg string) { ps.mutex.Lock() defer ps.mutex.Unlock() for _, ch := range ps.subscribers { go func(ch chan string) { ch <- msg // 将消息发送到订阅的通道 }(ch) } } func main() { pubsub := NewPubSub() ch1 := pubsub.Subscribe() ch2 := pubsub.Subscribe() go func() { // 模拟订阅者监听消息并处理 for msg := range ch1 { fmt.Println("Subscriber 1 received:", msg) } }() go func() { // 模拟订阅者监听消息并处理 for msg := range ch2 { fmt.Println("Subscriber 2 received:", msg) } }() pubsub.Publish("Hello, subscribers!") // 发布消息到所有订阅的通道 time.Sleep(time.Second * 2) // 主线程休眠一段时间,以便观察输出结果 }
在上面的示例中,定义了一个PubSub结构体,它维护了一个订阅者的通道列表。Subscribe方法用于创建一个新的通道并添加到订阅者的列表中,而Publish方法则将消息发送到所有订阅者的通道。使用sync.Mutex对访问订阅者的列表进行互斥操作,以确保并发安全。在main函数中,创建了一个PubSub实例,并使用Subscribe方法创建了两个订阅者的通道。然后,启动了两个协程来模拟订阅者监听并处理消息。最后,调用Publish方法发布一条消息,该消息将被发送到所有订阅者的通道。主线程休眠一段时间以便观察输出结果。
在Go语言中,可以使用goroutine和channel来控制并发数。Goroutine是Go语言中轻量级的线程,它可以与其他goroutine并发执行。Channel是用于goroutine之间进行通信的管道,它可以用来传递数据。
要控制并发数,可以使用缓冲通道(buffered channel)和带有select语句的default分支。通过设置通道的缓冲大小,可以限制并发执行的goroutine数量。当缓冲区已满时,发送操作会被阻塞,直到有接收操作从通道中取出数据。同样地,当缓冲区为空时,接收操作会被阻塞,直到有发送操作向通道中添加数据。
以下是一个示例代码,演示如何使用缓冲通道控制并发数:
package main import ( "fmt" "time" ) func worker(id int, jobs <-chan int, results chan<- int) { for j := range jobs { fmt.Println("Worker", id, "started job", j) time.Sleep(time.Second) // 模拟耗时操作 fmt.Println("Worker", id, "finished job", j) results <- j * 2 // 将结果发送到结果通道 } } func main() { const ( numJobs = 10 numWorkers = 3 ) // 创建输入和输出通道 jobs := make(chan int, numJobs) results := make(chan int, numJobs) // 启动指定数量的worker goroutine for i := 1; i <= numWorkers; i++ { go worker(i, jobs, results) } // 发送作业到输入通道 for j := 1; j <= numJobs; j++ { jobs <- j } close(jobs) // 关闭输入通道,表示没有更多的作业要发送了 // 从输出通道接收结果并打印 for a := 1; a <= numJobs; a++ { <-results // 从通道接收数据,如果通道为空则阻塞直到有数据可用 fmt.Println("Result", a, "is", <-results) // 打印结果 } }
在上述示例中,创建了一个输入通道jobs和一个输出通道results。然后,启动了指定数量的worker goroutine来处理作业。通过向jobs通道发送作业,可以控制并发执行的goroutine数量。通过限制jobs通道的缓冲大小,可以控制并发执行的goroutine数量。在本例中,设置了numJobs作为缓冲大小,这意味着最多同时执行numJobs个goroutine。当所有的作业都被发送后,关闭了jobs通道,表示没有更多的作业要发送了。最后,从results通道接收结果并打印出来。通过使用带有select语句的default分支,可以处理接收操作被阻塞的情况。如果results通道为空,接收操作会被阻塞直到有数据可用。在本例中,使用了<-results语法来从通道接收数据。
在Go语言中,要实现并发的安全退出,可以使用以下几种方法:
1. 使用context包:context包提供了一种机制来取消正在运行的goroutine,并且可以通过传递一个context.Context对象来控制goroutine的执行。可以使用context.WithCancel函数创建一个带有取消功能的上下文,然后在需要的时候调用cancelFunc()来取消goroutine的执行。在goroutine中,可以使用select语句和<-ctx.Done()来监听取消信号,并在接收到信号后进行清理操作或退出。
package main import ( "context" "fmt" "time" ) func worker(ctx context.Context) { for { select { case <-ctx.Done(): // 接收到取消信号,退出goroutine return default: // 执行任务 fmt.Println("Worker is running") time.Sleep(time.Second) } } } func main() { ctx, cancel := context.WithCancel(context.Background()) go worker(ctx) // 模拟一些工作 time.Sleep(5 * time.Second) cancel() // 取消goroutine的执行 // 等待一段时间确保goroutine退出 time.Sleep(time.Second) fmt.Println("Main goroutine exiting") }
2. 使用sync.WaitGroup:sync.WaitGroup提供了一种机制来等待一组goroutine完成。可以在每个goroutine启动时调用Add(1)来增加等待计数器,并在goroutine退出时调用Done()来减少等待计数器。在主goroutine中,可以使用Wait()方法等待所有goroutine完成。在需要安全退出时,可以调用Wait()方法来阻塞主goroutine,直到所有goroutine都退出。
package main import ( "fmt" "sync" "time" ) func worker(wg *sync.WaitGroup) { defer wg.Done() // 减少等待计数器,表示goroutine退出 for i := 0; i < 5; i++ { fmt.Println("Worker is running") time.Sleep(time.Second) } } func main() { var wg sync.WaitGroup wg.Add(1) // 增加等待计数器,表示有一个goroutine需要等待完成 go worker(&wg) // 启动goroutine并传入WaitGroup的指针 // 模拟一些工作 time.Sleep(5 * time.Second) wg.Wait() // 等待所有goroutine完成,阻塞主goroutine直到所有goroutine退出 fmt.Println("Main goroutine exiting") }
Copyright © 2017-Now pnotes.cn. All Rights Reserved.
编程学习笔记 保留所有权利
MARK:3.0.0.20240214.P35
From 2017.2.6