From e82ea75cbadc6baeac081714451cdf8e775576af Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Tue, 14 Aug 2018 19:33:18 +0800 Subject: [PATCH] strategy: experimental latency based high availability mode supported --- conf.go | 2 +- dns/server.go | 19 ++++-- ipset/ipset_linux.go | 12 ++-- ipset/ipset_other.go | 10 +-- main.go | 15 ++++- proxy/forwarder.go | 15 ++++- rule/rule.go | 15 +++++ strategy/strategy.go | 156 ++++++++++++++++++++++++++++++------------- 8 files changed, 173 insertions(+), 71 deletions(-) diff --git a/conf.go b/conf.go index b5ca0df..0f1ac4b 100644 --- a/conf.go +++ b/conf.go @@ -43,7 +43,7 @@ func confInit() { flag.StringVar(&conf.StrategyConfig.CheckWebSite, "checkwebsite", "www.apple.com", "proxy check HTTP(NOT HTTPS) website address, format: HOST[:PORT], default port: 80") // TODO: change to checkinterval flag.IntVar(&conf.StrategyConfig.CheckInterval, "checkduration", 30, "proxy check interval(seconds)") - flag.IntVar(&conf.StrategyConfig.MaxFailures, "maxfailures", 3, "max failures to change status to disabled") + flag.IntVar(&conf.StrategyConfig.MaxFailures, "maxfailures", 3, "max failures to change forwarder status to disabled") flag.StringSliceUniqVar(&conf.RuleFile, "rulefile", nil, "rule file path") flag.StringVar(&conf.RulesDir, "rules-dir", "", "rule file folder") diff --git a/dns/server.go b/dns/server.go index 13243d7..0531e11 100644 --- a/dns/server.go +++ b/dns/server.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "io" "net" + "sync" "time" "github.com/nadoo/glider/common/log" @@ -31,15 +32,19 @@ func NewServer(addr string, dialer proxy.Dialer, config *Config) (*Server, error return s, err } -// ListenAndServe . -func (s *Server) ListenAndServe() { - go s.ListenAndServeTCP() - s.ListenAndServeUDP() +// Start . +func (s *Server) Start() { + var wg sync.WaitGroup + wg.Add(2) + go s.ListenAndServeTCP(&wg) + go s.ListenAndServeUDP(&wg) + wg.Wait() } // ListenAndServeUDP . -func (s *Server) ListenAndServeUDP() { +func (s *Server) ListenAndServeUDP(wg *sync.WaitGroup) { c, err := net.ListenPacket("udp", s.addr) + wg.Done() if err != nil { log.F("[dns] failed to listen on %s, error: %v", s.addr, err) return @@ -82,12 +87,14 @@ func (s *Server) ListenAndServeUDP() { } // ListenAndServeTCP . -func (s *Server) ListenAndServeTCP() { +func (s *Server) ListenAndServeTCP(wg *sync.WaitGroup) { l, err := net.Listen("tcp", s.addr) + wg.Done() if err != nil { log.F("[dns]-tcp error: %v", err) return } + defer l.Close() log.F("[dns]-tcp listening TCP on %s", s.addr) diff --git a/ipset/ipset_linux.go b/ipset/ipset_linux.go index 5a75a9c..1b55268 100644 --- a/ipset/ipset_linux.go +++ b/ipset/ipset_linux.go @@ -69,8 +69,8 @@ const ( var nextSeqNr uint32 var nativeEndian binary.ByteOrder -// IPSetManager struct -type IPSetManager struct { +// Manager struct +type Manager struct { fd int lsa syscall.SockaddrNetlink @@ -78,8 +78,8 @@ type IPSetManager struct { domainSet sync.Map } -// NewIPSetManager returns a IPSetManager -func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error) { +// NewManager returns a Manager +func NewManager(mainSet string, rules []*rule.Config) (*Manager, error) { fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW, syscall.NETLINK_NETFILTER) if err != nil { log.F("%s", err) @@ -96,7 +96,7 @@ func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error return nil, err } - m := &IPSetManager{fd: fd, lsa: lsa, mainSet: mainSet} + m := &Manager{fd: fd, lsa: lsa, mainSet: mainSet} CreateSet(fd, lsa, mainSet) for _, r := range rules { @@ -133,7 +133,7 @@ func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error } // AddDomainIP implements the DNSAnswerHandler function, used to update ipset according to domainSet rule -func (m *IPSetManager) AddDomainIP(domain, ip string) error { +func (m *Manager) AddDomainIP(domain, ip string) error { if ip != "" { domainParts := strings.Split(domain, ".") length := len(domainParts) diff --git a/ipset/ipset_other.go b/ipset/ipset_other.go index 00ec7d8..ab9be65 100644 --- a/ipset/ipset_other.go +++ b/ipset/ipset_other.go @@ -8,15 +8,15 @@ import ( "github.com/nadoo/glider/rule" ) -// IPSetManager struct -type IPSetManager struct{} +// Manager struct +type Manager struct{} -// NewIPSetManager returns a IPSetManager -func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error) { +// NewManager returns a Manager +func NewManager(mainSet string, rules []*rule.Config) (*Manager, error) { return nil, errors.New("ipset not supported on this os") } // AddDomainIP implements the DNSAnswerHandler function -func (m *IPSetManager) AddDomainIP(domain, ip string) error { +func (m *Manager) AddDomainIP(domain, ip string) error { return errors.New("ipset not supported on this os") } diff --git a/main.go b/main.go index c42dfc2..8fd3b5b 100644 --- a/main.go +++ b/main.go @@ -30,17 +30,23 @@ import ( const VERSION = "0.6.7" func main() { + // read configs confInit() + + // setup a log func log.F = func(f string, v ...interface{}) { if conf.Verbose { stdlog.Printf(f, v...) } } + // global rule dialer dialer := rule.NewDialer(conf.rules, strategy.NewDialer(conf.Forward, &conf.StrategyConfig)) - ipsetM, _ := ipset.NewIPSetManager(conf.IPSet, conf.rules) - // DNS Server + // ipset manager + ipsetM, _ := ipset.NewManager(conf.IPSet, conf.rules) + + // check and setup dns server if conf.DNS != "" { d, err := dns.NewServer(conf.DNS, dialer, &conf.DNSConfig) if err != nil { @@ -62,9 +68,12 @@ func main() { d.AddHandler(ipsetM.AddDomainIP) } - go d.ListenAndServe() + d.Start() } + // enable checkers + dialer.Check() + // Proxy Servers for _, listen := range conf.Listen { local, err := proxy.ServerFromURL(listen, dialer) diff --git a/proxy/forwarder.go b/proxy/forwarder.go index 421acd9..f1e5a82 100644 --- a/proxy/forwarder.go +++ b/proxy/forwarder.go @@ -18,10 +18,10 @@ type Forwarder struct { disabled uint32 failures uint32 MaxFailures uint32 //maxfailures to set to Disabled - latency int + latency int64 } -// ForwarderFromURL parses `forward=` command line and returns a new forwarder +// ForwarderFromURL parses `forward=` command value and returns a new forwarder func ForwarderFromURL(s string) (f *Forwarder, err error) { ss := strings.Split(s, "#") var d Dialer @@ -71,7 +71,6 @@ func (f *Forwarder) Dial(network, addr string) (c net.Conn, err error) { c, err = f.Dialer.Dial(network, addr) if err != nil { atomic.AddUint32(&f.failures, 1) - log.F("[forwarder] %s, dials %s, error:%s", f.addr, addr, err) if f.Failures() >= f.MaxFailures { f.Disable() log.F("[forwarder] %s reaches maxfailures, set to disabled", f.addr) @@ -105,3 +104,13 @@ func (f *Forwarder) Enabled() bool { func isTrue(n uint32) bool { return n&1 == 1 } + +// Latency returns the latency of forwarder +func (f *Forwarder) Latency() int64 { + return atomic.LoadInt64(&f.latency) +} + +// SetLatency sets the latency of forwarder +func (f *Forwarder) SetLatency(l int64) { + atomic.StoreInt64(&f.latency, l) +} diff --git a/rule/rule.go b/rule/rule.go index 747c666..52aea36 100644 --- a/rule/rule.go +++ b/rule/rule.go @@ -13,6 +13,7 @@ import ( // Dialer struct type Dialer struct { gDialer proxy.Dialer + dialers []proxy.Dialer domainMap sync.Map ipMap sync.Map @@ -25,6 +26,7 @@ func NewDialer(rules []*Config, gDialer proxy.Dialer) *Dialer { for _, r := range rules { sDialer := strategy.NewDialer(r.Forward, &r.StrategyConfig) + rd.dialers = append(rd.dialers, sDialer) for _, domain := range r.Domain { rd.domainMap.Store(strings.ToLower(domain), sDialer) @@ -123,3 +125,16 @@ func (rd *Dialer) AddDomainIP(domain, ip string) error { } return nil } + +// Check . +func (rd *Dialer) Check() { + if checker, ok := rd.gDialer.(strategy.Checker); ok { + checker.Check() + } + + for _, d := range rd.dialers { + if checker, ok := d.(strategy.Checker); ok { + checker.Check() + } + } +} diff --git a/strategy/strategy.go b/strategy/strategy.go index 1f624d8..52ecabe 100644 --- a/strategy/strategy.go +++ b/strategy/strategy.go @@ -20,9 +20,13 @@ type Config struct { MaxFailures int } +// Checker is an interface of forwarder checker +type Checker interface { + Check() +} + // NewDialer returns a new strategy dialer func NewDialer(s []string, c *Config) proxy.Dialer { - // global forwarders in xx.conf var fwdrs []*proxy.Forwarder for _, chain := range s { fwdr, err := proxy.ForwarderFromURL(chain) @@ -49,6 +53,9 @@ func NewDialer(s []string, c *Config) proxy.Dialer { case "ha": dialer = newHADialer(fwdrs, c.CheckWebSite, c.CheckInterval) log.F("forward to remote servers in high availability mode.") + case "lha": + dialer = newLHADialer(fwdrs, c.CheckWebSite, c.CheckInterval) + log.F("forward to remote servers in latency based high availability mode.") default: log.F("not supported forward mode '%s', just use the first forward server.", c.Strategy) dialer = fwdrs[0] @@ -57,15 +64,20 @@ func NewDialer(s []string, c *Config) proxy.Dialer { return dialer } -type forwarderSlice []*proxy.Forwarder +// slice orderd by priority +type priSlice []*proxy.Forwarder -func (p forwarderSlice) Len() int { return len(p) } -func (p forwarderSlice) Less(i, j int) bool { return p[i].Priority > p[j].Priority } -func (p forwarderSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p priSlice) Len() int { return len(p) } +func (p priSlice) Less(i, j int) bool { return p[i].Priority > p[j].Priority } +func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -// rrDialer is a rr dialer +// rrDialer is a round robin dialer +// 1. find the highest priority which there's at least 1 dialer is enabled +// 2. choose a enabled dialer in that priority using round robin mode type rrDialer struct { - fwdrs forwarderSlice + fwdrs priSlice + + // may have data races, but doesn't matter idx int priority int @@ -77,6 +89,7 @@ type rrDialer struct { // newRRDialer returns a new rrDialer func newRRDialer(fs []*proxy.Forwarder, website string, interval int) *rrDialer { rr := &rrDialer{fwdrs: fs} + sort.Sort(rr.fwdrs) rr.website = website if strings.IndexByte(rr.website, ':') == -1 { @@ -84,14 +97,8 @@ func newRRDialer(fs []*proxy.Forwarder, website string, interval int) *rrDialer } rr.interval = interval - - sort.Sort(rr.fwdrs) rr.priority = rr.fwdrs[0].Priority - for k := range rr.fwdrs { - go rr.checkDialer(k) - } - return rr } @@ -143,55 +150,64 @@ func (rr *rrDialer) NextDialer(dstAddr string) proxy.Dialer { return rr.nextDialer(dstAddr) } +// Check implements the Checker interface +func (rr *rrDialer) Check() { + for _, f := range rr.fwdrs { + go rr.checkDialer(f) + } +} + // Check dialer -func (rr *rrDialer) checkDialer(idx int) { +func (rr *rrDialer) checkDialer(f *proxy.Forwarder) { retry := 1 buf := make([]byte, 4) - d := rr.fwdrs[idx] - for { time.Sleep(time.Duration(rr.interval) * time.Second * time.Duration(retry>>1)) - // check forwarders whose priority not less than current priority only - if d.Priority < rr.priority { - continue - } - retry <<= 1 if retry > 16 { retry = 16 } - startTime := time.Now() - c, err := d.Dial("tcp", rr.website) - if err != nil { - rr.fwdrs[idx].Disable() - log.F("[check] %s -> %s, set to DISABLED. error in dial: %s", d.Addr(), rr.website, err) + // check forwarders whose priority not less than current priority only + if f.Priority < rr.priority { + // log.F("f.Priority:%d, rr.priority:%d", f.Priority, rr.priority) continue } - c.Write([]byte("GET / HTTP/1.0\r\n\r\n")) - - _, err = io.ReadFull(c, buf) + startTime := time.Now() + rc, err := f.Dial("tcp", rr.website) if err != nil { - rr.fwdrs[idx].Disable() - log.F("[check] %s -> %s, set to DISABLED. error in read: %s", d.Addr(), rr.website, err) - } else if bytes.Equal([]byte("HTTP"), buf) { - rr.fwdrs[idx].Enable() - retry = 2 - dialTime := time.Since(startTime) - log.F("[check] %s -> %s, set to ENABLED. connect time: %s", d.Addr(), rr.website, dialTime.String()) - } else { - rr.fwdrs[idx].Disable() - log.F("[check] %s -> %s, set to DISABLED. server response: %s", d.Addr(), rr.website, buf) + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority, rr.website, err) + continue } - c.Close() + rc.Write([]byte("GET / HTTP/1.0\r\n\r\n")) + + _, err = io.ReadFull(rc, buf) + if err != nil { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority, rr.website, err) + } else if bytes.Equal([]byte("HTTP"), buf) { + f.Enable() + retry = 2 + readTime := time.Since(startTime) + f.SetLatency(int64(readTime)) + log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority, rr.website, readTime.String()) + } else { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority, rr.website, buf) + } + + rc.Close() } } -// high availability proxy +// high availability forwarder +// 1. choose dialer whose priority is the highest +// 2. choose the first enabled dialer in that priority type haDialer struct { *rrDialer } @@ -201,18 +217,64 @@ func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy return &haDialer{rrDialer: newRRDialer(dialers, webhost, duration)} } -func (ha *haDialer) Dial(network, addr string) (net.Conn, error) { +func (ha *haDialer) nextDialer(dstAddr string) *proxy.Forwarder { d := ha.fwdrs[ha.idx] if !d.Enabled() { - d = ha.nextDialer(addr) + d = ha.nextDialer(dstAddr) } + return d +} + +func (ha *haDialer) Dial(network, addr string) (net.Conn, error) { + d := ha.nextDialer(addr) return d.Dial(network, addr) } func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - d := ha.fwdrs[ha.idx] - if !d.Enabled() { - d = ha.nextDialer(addr) - } + d := ha.nextDialer(addr) + return d.DialUDP(network, addr) +} + +// high availability forwarder +// 1. choose dialer whose priority is the highest +// 2. choose dialer whose letency it the lowest +type lhaDialer struct { + *rrDialer +} + +// newLHADialer . +func newLHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { + return &lhaDialer{rrDialer: newRRDialer(dialers, webhost, duration)} +} + +func (lha *lhaDialer) nextDialer(dstAddr string) *proxy.Forwarder { + var latency int64 + var d *proxy.Forwarder + for _, fwder := range lha.fwdrs { + if fwder.Enabled() { + lha.priority = fwder.Priority + latency = fwder.Latency() + d = fwder + break + } + } + + for _, fwder := range lha.fwdrs { + if fwder.Enabled() && fwder.Priority >= lha.priority && fwder.Latency() < latency { + latency = fwder.Latency() + d = fwder + } + } + + return d +} + +func (lha *lhaDialer) Dial(network, addr string) (net.Conn, error) { + d := lha.nextDialer(addr) + return d.Dial(network, addr) +} + +func (lha *lhaDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { + d := lha.nextDialer(addr) return d.DialUDP(network, addr) }