使用golang监控目录文件变化

想写个程序,监控目录和文件的变化。由于要监控的目录非常大,所以打算对每个目录派生一个goroutine来并发遍历。但程序运行时发现,同时打开的目录非常多,以致系统出错。我们先来看看这个失败的程序,目录较小时它是没有问题的。

// main.go
package main

import (
	//	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	//	"regexp"
	"runtime"
	"strings"

	"github.com/fsnotify/fsnotify"
)

// Fm bundles the state of one recursive file monitor: the root directory,
// the fsnotify watcher, and the channels used to coordinate its goroutines.
type Fm struct {
	// Basedir is the root directory to monitor; Init stores it after
	// converting slashes with filepath.FromSlash.
	Basedir string
	// Watcher is the fsnotify watcher created by Init; every discovered
	// path is registered with it.
	Watcher *fsnotify.Watcher
	// Wdone is the channel Start blocks on. Nothing in this file sends on
	// it, so Start waits indefinitely.
	Wdone   chan bool
	// Dirchan carries paths found by walkdir to lister, which adds them to
	// Watcher. Init creates it with a buffer of 1000 entries.
	Dirchan chan string
}

// Init prepares fm for monitoring basedir.
//
// It records the base directory (with slashes converted to the OS
// separator), creates the fsnotify watcher, allocates the Wdone and Dirchan
// channels, and starts a goroutine that dispatches watcher events to
// process_event and logs watcher errors. The process exits via log.Fatal if
// the watcher cannot be created.
func (fm *Fm) Init(basedir string) {
	fm.Basedir = filepath.FromSlash(basedir)

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal("create watcher error:", err)
	}
	fm.Watcher = watcher
	fm.Wdone = make(chan bool)
	fm.Dirchan = make(chan string, 1000)

	go func() {
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					// The channel was closed (the watcher was shut down).
					// Without this check the loop would spin forever on
					// zero-value events.
					return
				}
				fm.process_event(event)

			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				log.Println("watcher error:", err)
			}
		}
	}()
}

// walkdir recursively scans path and queues everything that should be
// watched onto fm.Dirchan: the directory itself, every non-directory entry
// in it, and (recursively) every subdirectory.
//
// Subdirectories whose name starts with "." or contains "lost+found" are
// skipped. The process exits via log.Fatal if a directory cannot be read.
//
// The recursion is deliberately synchronous. Spawning a goroutine per
// subdirectory creates an unbounded number of goroutines, each holding a
// directory open, on large trees. Callers that want the scan in the
// background run the top-level call in a goroutine, as Start does.
func (fm *Fm) walkdir(path string) {
	fm.Dirchan <- path

	entries, err := ioutil.ReadDir(path)
	if err != nil {
		log.Fatal("opendir:", err)
	}

	for _, fi := range entries {
		name := fi.Name()
		fpath := filepath.Join(path, name)

		if !fi.IsDir() {
			fm.Dirchan <- fpath
			continue
		}

		// A "." prefix also covers "..", so no separate check is needed.
		if strings.HasPrefix(name, ".") || strings.Contains(name, "lost+found") {
			continue
		}
		fm.walkdir(fpath)
	}
}
// lister drains fm.Dirchan forever, registering each received path with
// fm.Watcher. The process exits via log.Fatal on the first path that
// cannot be added.
func (fm *Fm) lister() {
	for {
		target := <-fm.Dirchan
		if err := fm.Watcher.Add(target); err != nil {
			log.Fatal("add watcher error:", err)
		}
	}
}
// Start launches the directory scan (walkdir) and the watch registration
// loop (lister) in background goroutines, then blocks until a value is
// received on fm.Wdone. The watcher is closed when Start returns.
func (fm *Fm) Start() {
	// Registered first so the watcher is released however Start exits.
	defer fm.Watcher.Close()

	go fm.walkdir(fm.Basedir)
	go fm.lister()

	<-fm.Wdone
}
// process_event reacts to a single fsnotify event:
//
//   - Create: start watching the new path and log it.
//   - Rename / Remove: log it and stop watching the path.
//   - Write: log it.
//
// event.Op is a bit mask and one event may carry several operations at
// once, so each operation is tested with a mask rather than compared for
// equality; an equality switch silently drops combined events.
func (fm *Fm) process_event(event fsnotify.Event) {
	if event.Op&fsnotify.Create != 0 {
		if err := fm.Watcher.Add(event.Name); err != nil {
			log.Println("add watcher error:", err)
		}
		log.Println("create:", event.Name)
	}

	if event.Op&(fsnotify.Rename|fsnotify.Remove) != 0 {
		log.Println("remove:", event.Name)
		// Best effort: the path is already gone, so a failure to drop its
		// watch is not actionable and is intentionally ignored.
		_ = fm.Watcher.Remove(event.Name)
	}

	if event.Op&fsnotify.Write != 0 {
		log.Println("write:", event.Name)
	}
}
// main monitors the directory tree given as the first command-line
// argument, using half of the available CPUs.
func main() {
	// NumCPU()/2 is 0 on a single-core machine, and GOMAXPROCS(0) only
	// queries the setting, so change it only when the result is positive.
	if n := runtime.NumCPU() / 2; n > 0 {
		runtime.GOMAXPROCS(n)
	}
	/*
	  Large trees may need higher inotify limits, for example:

	  echo 50000000 > /proc/sys/fs/inotify/max_user_watches
	  echo 327679 > /proc/sys/fs/inotify/max_queued_events
	*/
	if len(os.Args) < 2 {
		// Indexing os.Args[1] without this check panics when no directory
		// is supplied.
		log.Fatal("usage: pass the directory to watch as the first argument")
	}

	filem := new(Fm)
	filem.Init(os.Args[1])
	filem.Start()
}

上面程序有意思的地方是:递归遍历目录,并用channel在多个goroutine之间进行通讯;但是目录和文件很多的时候,产生的goroutine也非常多。

下面来个简单的例子,这个例子可以正常工作,速度还不错:140G的小文件目录,大约需要1.7G虚拟内存,实际占用大约500M左右。由于和sersync性能相差太远,所以暂时放弃了用它来做监控。

// main.go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/fsnotify/fsnotify"
)

//type MyWatcher *fsnotify.Watcher

// doev handles a single fsnotify event on behalf of watcher:
//
//   - Create: start watching the new path and log it.
//   - Rename / Remove: log it and stop watching the path.
//   - Write: log it.
//
// event.Op is a bit mask and one event may carry several operations at
// once, so each operation is tested with a mask rather than compared for
// equality; an equality switch silently drops combined events.
func doev(watcher *fsnotify.Watcher, event fsnotify.Event) {
	if event.Op&fsnotify.Create != 0 {
		if err := watcher.Add(event.Name); err != nil {
			log.Println("add watcher error:", err)
		}
		log.Println("create:", event.Name)
	}

	if event.Op&(fsnotify.Rename|fsnotify.Remove) != 0 {
		log.Println("remove:", event.Name)
		// Best effort: the path is already gone, so a failure to drop its
		// watch is not actionable and is intentionally ignored.
		_ = watcher.Remove(event.Name)
	}

	if event.Op&fsnotify.Write != 0 {
		log.Println("write:", event.Name)
	}
}
// main watches the directory tree rooted at the first command-line argument.
//
// It registers the root and every path found by filepath.Walk with a single
// fsnotify watcher, dispatches events to doev from a background goroutine,
// and then blocks forever (nothing sends on done).
func main() {
	if len(os.Args) < 2 {
		// Indexing os.Args[1] without this check panics when no directory
		// is supplied.
		log.Fatal("usage: pass the directory to watch as the first argument")
	}
	watchdir := os.Args[1]

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	done := make(chan bool)
	go func() {
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					// Channel closed: stop instead of spinning on
					// zero-value events.
					return
				}
				doev(watcher, event)

			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				log.Println("error:", err)
			}
		}
	}()

	// Adding the root first gives an immediate, fatal error for a bad path.
	if err := watcher.Add(watchdir); err != nil {
		log.Fatal(err)
	}

	err = filepath.Walk(watchdir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// Walk could not stat or read this path. Log it and continue
			// with the rest of the tree rather than discarding the error.
			log.Println("walk error:", path, err)
			return nil
		}
		if err := watcher.Add(path); err != nil {
			log.Fatal(err)
		}
		return nil
	})
	if err != nil {
		fmt.Printf("walk error [%v]\n", err)
	}

	<-done
}