golang并发控制

在多线程编程中,我经常会用到2个方面的线程控制,一是启停线程,二是控制线程数量,在golang中,启动的是协程,一个比线程更好的东西。为了实现这两个功能,在github  fork  grtm 并进行了功能增强

更多例子请看 github.com/fy138/grtm

一、启停线程

package main

import (
	"fmt"
	//	"runtime"
	"time"

	"github.com/fy138/grtm"
)

func myfunc(me interface{}) {
	fmt.Println("hello+" + me.(string))
	time.Sleep(time.Second * 2)
}
func main() {
	gm := grtm.NewGrManager()
	//在创建gm后新建一个进程接收出错信息
	go func(gm *grtm.GrManager) {
		for {
			select {
			case err := <-gm.ErrChan:
				fmt.Println("Received error:", err.Error())
			case notify := <-gm.NotiChan:
				fmt.Println("Received Notify:", notify)
			}
		}
	}(gm)

	gm.NewLoopGoroutine("myfunc", myfunc, "1")
	gm.NewLoopGoroutine("myfunc2", myfunc, "2")
	time.Sleep(time.Second * 3)

	gm.StopLoopGoroutine("aaaaaa")
	time.Sleep(time.Second * 3)

	gm.StopLoopGoroutine("myfunc2")
	time.Sleep(time.Second * 3)

	gm.NewLoopGoroutine("myfunc", myfunc, "1")
	time.Sleep(time.Second * 3)

	for {
		for k, v := range gm.GetAllTask() {
			fmt.Printf("task name:%s,task id:%d,task name2:%s\n", k, v.Gid, v.Name)
		}
		fmt.Printf("NumTask:%d\n", gm.GetTaskTotal())
		time.Sleep(time.Second * 1)
	}
}

输出是这样的

hello+1
hello+2
hello+1
hello+2
Received error: not found goroutine name :aaaaaa
hello+1
hello+2
hello+1
Received Notify: gid[1597969999]quit
hello+1
Received error: goroutine channel already defined: "myfunc"
hello+1
hello+1
task name:myfunc,task id:5577006791947779410,task name2:myfunc
NumTask:1
task name:myfunc,task id:5577006791947779410,task name2:myfunc
NumTask:1
hello+1
task name:myfunc,task id:5577006791947779410,task name2:myfunc
NumTask:1

二、限制线程数量

package main

import (
	"fmt"
	"runtime"
	"time"

	"github.com/fy138/grtm"
)

func main() {
	go func() {
		for {
			//get  goroutine total
			fmt.Println("go goroutines:", runtime.NumGoroutine())
			time.Sleep(time.Second * 1)
		}

	}()
	//建立线程池
	pool_1 := grtm.NewPool(3)
	pool_2 := grtm.NewPool(2)
	for i := 100; i >= 1; i-- {
		fmt.Println("I=", i)
		//通过通道来限制goroutine 数量
		/* 下面是第一种调用方法 */
		pool_1.LimitChan <- true //importan
		pool_1.AddTask(Download, i, "Download_1")

		/* 下面是第二种调用方法 */
		pool_2.LimitChan <- true //importan
		go func(i int, str string) {
			Download2(i, str)
			//函数执行完释放通道
			defer func() {
				<-pool_2.LimitChan
			}()
		}(i, "Download_2")

	}
	time.Sleep(time.Second * 20) //防止主线程提前退出
}

func Download(args ...interface{}) {
	time.Sleep(2 * time.Second)
	fmt.Printf("%s => %d \n", args[0].([]interface{})[1].(string), args[0].([]interface{})[0].(int))
}
func Download2(i int, str string) {
	time.Sleep(2 * time.Second)
	fmt.Printf("%s => %d \n", str, i)
}

输出结果

I= 100
go goroutines: 4
I= 99
I= 98
go goroutines: 9
Download_2 => 100
I= 97
Download_1 => 100
go goroutines: 9
Download_2 => 99
I= 96
Download_1 => 99
Download_1 => 98
go goroutines: 8
Download_2 => 98
I= 95
Download_1 => 97
go goroutines: 8
Download_2 => 97
I= 94
Download_1 => 96
go goroutines: 8
Download_2 => 96
I= 93
Download_1 => 95
go goroutines: 8
Download_2 => 95
I= 92
Download_1 => 94
go goroutines: 8
Download_2 => 94
I= 91