diff --git a/proxy/forwarder.go b/proxy/forwarder.go index 4998378..9ae27e0 100644 --- a/proxy/forwarder.go +++ b/proxy/forwarder.go @@ -13,14 +13,13 @@ import ( // Forwarder is a forwarder type Forwarder struct { Dialer - Priority int - MaxFailures uint32 //maxfailures to set to Disabled - - addr string - disabled uint32 - failures uint32 - latency int64 - intface string // local interface or ip address + addr string + priority uint32 + maxFailures uint32 // maxfailures to set to Disabled + disabled uint32 + failures uint32 + latency int64 + intface string // local interface or ip address } // ForwarderFromURL parses `forward=` command value and returns a new forwarder @@ -67,7 +66,7 @@ func (f *Forwarder) parseOption(option string) error { if p != "" { priority, err = strconv.ParseUint(p, 10, 32) } - f.Priority = int(priority) + f.SetPriority(uint32(priority)) f.intface = query.Get("interface") @@ -83,8 +82,8 @@ func (f *Forwarder) Addr() string { func (f *Forwarder) Dial(network, addr string) (c net.Conn, err error) { c, err = f.Dialer.Dial(network, addr) if err != nil { - atomic.AddUint32(&f.failures, 1) - if f.Failures() >= f.MaxFailures { + f.IncFailures() + if f.Failures() >= f.MaxFailures() { f.Disable() log.F("[forwarder] %s reaches maxfailures, set to disabled", f.addr) } @@ -98,6 +97,11 @@ func (f *Forwarder) Failures() uint32 { return atomic.LoadUint32(&f.failures) } +// IncFailures increase the failuer count by 1 +func (f *Forwarder) IncFailures() { + atomic.AddUint32(&f.failures, 1) +} + // Enable the forwarder func (f *Forwarder) Enable() { atomic.StoreUint32(&f.disabled, 0) @@ -118,6 +122,26 @@ func isTrue(n uint32) bool { return n&1 == 1 } +// Priority returns the priority of forwarder +func (f *Forwarder) Priority() uint32 { + return atomic.LoadUint32(&f.priority) +} + +// SetPriority sets the priority of forwarder +func (f *Forwarder) SetPriority(l uint32) { + atomic.StoreUint32(&f.priority, l) +} + +// MaxFailures returns the maxFailures of forwarder +func (f *Forwarder) MaxFailures() uint32 { + return atomic.LoadUint32(&f.maxFailures) +} + +// SetMaxFailures sets the maxFailures of forwarder +func (f *Forwarder) SetMaxFailures(l uint32) { + atomic.StoreUint32(&f.maxFailures, l) +} + // Latency returns the latency of forwarder func (f *Forwarder) Latency() int64 { return atomic.LoadInt64(&f.latency) diff --git a/strategy/ha.go b/strategy/ha.go new file mode 100644 index 0000000..ef55fc9 --- /dev/null +++ b/strategy/ha.go @@ -0,0 +1,31 @@ +package strategy + +import ( + "net" + + "github.com/nadoo/glider/proxy" +) + +// high availability forwarder +type haDialer struct{ *rrDialer } + +// newHADialer . +func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { + return &haDialer{rrDialer: newRRDialer(dialers, webhost, duration)} +} + +func (ha *haDialer) nextDialer(dstAddr string) *proxy.Forwarder { + d := ha.fwdrs[ha.index] + if !d.Enabled() { + d = ha.nextDialer(dstAddr) + } + return d +} + +func (ha *haDialer) Dial(network, addr string) (net.Conn, error) { + return ha.nextDialer(addr).Dial(network, addr) +} + +func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { + return ha.nextDialer(addr).DialUDP(network, addr) +} diff --git a/strategy/lha.go b/strategy/lha.go new file mode 100644 index 0000000..a45e62d --- /dev/null +++ b/strategy/lha.go @@ -0,0 +1,46 @@ +package strategy + +import ( + "net" + + "github.com/nadoo/glider/proxy" +) + +// latency based high availability forwarder +type lhaDialer struct{ *rrDialer } + +// newLHADialer . +func newLHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { + return &lhaDialer{rrDialer: newRRDialer(dialers, webhost, duration)} +} + +func (lha *lhaDialer) nextDialer(dstAddr string) *proxy.Forwarder { + idx := lha.Index() + var lowest int64 + for i, fwder := range lha.fwdrs { + if fwder.Enabled() { + lha.SetPriority(fwder.Priority()) + lowest = fwder.Latency() + idx = int32(i) + break + } + } + + for i, fwder := range lha.fwdrs { + if fwder.Enabled() && fwder.Priority() >= lha.Priority() && fwder.Latency() < lowest { + lowest = fwder.Latency() + idx = int32(i) + } + } + + lha.SetIndex(idx) + return lha.fwdrs[idx] +} + +func (lha *lhaDialer) Dial(network, addr string) (net.Conn, error) { + return lha.nextDialer(addr).Dial(network, addr) +} + +func (lha *lhaDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { + return lha.nextDialer(addr).DialUDP(network, addr) +} diff --git a/strategy/rr.go b/strategy/rr.go new file mode 100644 index 0000000..a2745bd --- /dev/null +++ b/strategy/rr.go @@ -0,0 +1,161 @@ +package strategy + +import ( + "bytes" + "io" + "net" + "sort" + "strings" + "sync/atomic" + "time" + + "github.com/nadoo/glider/common/log" + "github.com/nadoo/glider/proxy" +) + +// forwarder slice orderd by priority +type priSlice []*proxy.Forwarder + +func (p priSlice) Len() int { return len(p) } +func (p priSlice) Less(i, j int) bool { return p[i].Priority() > p[j].Priority() } +func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +// rrDialer is a round robin dialer +type rrDialer struct { + fwdrs priSlice + index int32 + priority uint32 + website string + interval int +} + +// newRRDialer returns a new rrDialer +func newRRDialer(fwdrs []*proxy.Forwarder, website string, interval int) *rrDialer { + rr := &rrDialer{fwdrs: fwdrs} + sort.Sort(rr.fwdrs) + + rr.website = website + if strings.IndexByte(rr.website, ':') == -1 { + rr.website += ":80" + } + + rr.interval = interval + return rr +} + +func (rr *rrDialer) Addr() string { return "STRATEGY" } + +func (rr *rrDialer) Dial(network, addr string) (net.Conn, error) { + return rr.nextDialer(addr).Dial(network, addr) +} + +func (rr *rrDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { + return rr.nextDialer(addr).DialUDP(network, addr) +} + +func (rr *rrDialer) NextDialer(dstAddr string) proxy.Dialer { return rr.nextDialer(dstAddr) } + +func (rr *rrDialer) nextDialer(dstAddr string) *proxy.Forwarder { + n := int32(len(rr.fwdrs)) + if n == 1 { + return rr.fwdrs[0] + } + + for _, fwder := range rr.fwdrs { + if fwder.Enabled() { + rr.SetPriority(fwder.Priority()) + break + } + } + + idx := rr.Index() + if rr.fwdrs[idx].Priority() < rr.Priority() { + idx = 0 + } + + found := false + var i int32 + for i = 0; i < n; i++ { + idx = (idx + 1) % n + if rr.fwdrs[idx].Enabled() && + rr.fwdrs[idx].Priority() >= rr.Priority() { + found = true + rr.SetPriority(rr.fwdrs[idx].Priority()) + break + } + } + + if !found { + rr.SetPriority(0) + log.F("NO AVAILABLE PROXY FOUND! please check your network or proxy server settings.") + } + + rr.SetIndex(idx) + + return rr.fwdrs[idx] +} + +// Check implements the Checker interface +func (rr *rrDialer) Check() { + for i := 0; i < len(rr.fwdrs); i++ { + go rr.check(i) + } +} + +func (rr *rrDialer) check(i int) { + f := rr.fwdrs[i] + retry := 1 + buf := make([]byte, 4) + + for { + time.Sleep(time.Duration(rr.interval) * time.Second * time.Duration(retry>>1)) + + retry <<= 1 + if retry > 16 { + retry = 16 + } + + if f.Priority() < rr.Priority() { + continue + } + + startTime := time.Now() + rc, err := f.Dial("tcp", rr.website) + if err != nil { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority(), rr.website, err) + continue + } + + rc.Write([]byte("GET / HTTP/1.0\r\n\r\n")) + + _, err = io.ReadFull(rc, buf) + if err != nil { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority(), rr.website, err) + } else if bytes.Equal([]byte("HTTP"), buf) { + f.Enable() + retry = 2 + readTime := time.Since(startTime) + f.SetLatency(int64(readTime)) + log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority(), rr.website, readTime.String()) + } else { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority(), rr.website, buf) + } + + rc.Close() + } +} + +// Index returns the active forwarder's Index of rrDialer +func (rr *rrDialer) Index() int32 { return atomic.LoadInt32(&rr.index) } + +// SetIndex sets the active forwarder's Index of rrDialer +func (rr *rrDialer) SetIndex(p int32) { atomic.StoreInt32(&rr.index, p) } + +// Priority returns the active priority of rrDialer +func (rr *rrDialer) Priority() uint32 { return atomic.LoadUint32(&rr.priority) } + +// SetPriority sets the active priority of rrDialer +func (rr *rrDialer) SetPriority(p uint32) { atomic.StoreUint32(&rr.priority, p) } diff --git a/strategy/strategy.go b/strategy/strategy.go index 8954435..21781a2 100644 --- a/strategy/strategy.go +++ b/strategy/strategy.go @@ -1,17 +1,15 @@ package strategy import ( - "bytes" - "io" - "net" - "sort" - "strings" - "time" - "github.com/nadoo/glider/common/log" "github.com/nadoo/glider/proxy" ) +// Checker is an interface of forwarder checker +type Checker interface { + Check() +} + // Config of strategy type Config struct { Strategy string @@ -21,11 +19,6 @@ type Config struct { IntFace string } -// Checker is an interface of forwarder checker -type Checker interface { - Check() -} - // NewDialer returns a new strategy dialer func NewDialer(s []string, c *Config) proxy.Dialer { var fwdrs []*proxy.Forwarder @@ -34,7 +27,7 @@ func NewDialer(s []string, c *Config) proxy.Dialer { if err != nil { log.Fatal(err) } - fwdr.MaxFailures = uint32(c.MaxFailures) + fwdr.SetMaxFailures(uint32(c.MaxFailures)) fwdrs = append(fwdrs, fwdr) } @@ -68,217 +61,3 @@ func NewDialer(s []string, c *Config) proxy.Dialer { return dialer } - -// slice orderd by priority -type priSlice []*proxy.Forwarder - -func (p priSlice) Len() int { return len(p) } -func (p priSlice) Less(i, j int) bool { return p[i].Priority > p[j].Priority } -func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -// rrDialer is a round robin dialer -// 1. find the highest priority which there's at least 1 dialer is enabled -// 2. choose a enabled dialer in that priority using round robin mode -type rrDialer struct { - fwdrs priSlice - - // may have data races, but doesn't matter - idx int - priority int - - // for checking - website string - interval int -} - -// newRRDialer returns a new rrDialer -func newRRDialer(fs []*proxy.Forwarder, website string, interval int) *rrDialer { - rr := &rrDialer{fwdrs: fs} - sort.Sort(rr.fwdrs) - - rr.website = website - if strings.IndexByte(rr.website, ':') == -1 { - rr.website += ":80" - } - - rr.interval = interval - - return rr -} - -func (rr *rrDialer) Addr() string { return "STRATEGY" } -func (rr *rrDialer) Dial(network, addr string) (net.Conn, error) { - return rr.NextDialer(addr).Dial(network, addr) -} - -func (rr *rrDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - return rr.NextDialer(addr).DialUDP(network, addr) -} - -func (rr *rrDialer) nextDialer(dstAddr string) *proxy.Forwarder { - n := len(rr.fwdrs) - if n == 1 { - rr.idx = 0 - } - - for _, fwder := range rr.fwdrs { - if fwder.Enabled() { - rr.priority = fwder.Priority - break - } - } - - if rr.fwdrs[rr.idx].Priority < rr.priority { - rr.idx = 0 - } - - found := false - for i := 0; i < n; i++ { - rr.idx = (rr.idx + 1) % n - if rr.fwdrs[rr.idx].Enabled() && - rr.fwdrs[rr.idx].Priority >= rr.priority { - found = true - rr.priority = rr.fwdrs[rr.idx].Priority - break - } - } - - if !found { - rr.priority = 0 - log.F("NO AVAILABLE PROXY FOUND! please check your network or proxy server settings.") - } - - return rr.fwdrs[rr.idx] -} - -func (rr *rrDialer) NextDialer(dstAddr string) proxy.Dialer { - return rr.nextDialer(dstAddr) -} - -// Check implements the Checker interface -func (rr *rrDialer) Check() { - for _, f := range rr.fwdrs { - go rr.checkDialer(f) - } -} - -// Check dialer -func (rr *rrDialer) checkDialer(f *proxy.Forwarder) { - retry := 1 - buf := make([]byte, 4) - - for { - time.Sleep(time.Duration(rr.interval) * time.Second * time.Duration(retry>>1)) - - retry <<= 1 - if retry > 16 { - retry = 16 - } - - // check forwarders whose priority not less than current priority only - if f.Priority < rr.priority { - // log.F("f.Priority:%d, rr.priority:%d", f.Priority, rr.priority) - continue - } - - startTime := time.Now() - rc, err := f.Dial("tcp", rr.website) - if err != nil { - f.Disable() - log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority, rr.website, err) - continue - } - - rc.Write([]byte("GET / HTTP/1.0\r\n\r\n")) - - _, err = io.ReadFull(rc, buf) - if err != nil { - f.Disable() - log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority, rr.website, err) - } else if bytes.Equal([]byte("HTTP"), buf) { - f.Enable() - retry = 2 - readTime := time.Since(startTime) - f.SetLatency(int64(readTime)) - log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority, rr.website, readTime.String()) - } else { - f.Disable() - log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority, rr.website, buf) - } - - rc.Close() - } -} - -// high availability forwarder -// 1. choose dialer whose priority is the highest -// 2. choose the first enabled dialer in that priority -type haDialer struct { - *rrDialer -} - -// newHADialer . -func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { - return &haDialer{rrDialer: newRRDialer(dialers, webhost, duration)} -} - -func (ha *haDialer) nextDialer(dstAddr string) *proxy.Forwarder { - d := ha.fwdrs[ha.idx] - if !d.Enabled() { - d = ha.nextDialer(dstAddr) - } - return d -} - -func (ha *haDialer) Dial(network, addr string) (net.Conn, error) { - d := ha.nextDialer(addr) - return d.Dial(network, addr) -} - -func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - d := ha.nextDialer(addr) - return d.DialUDP(network, addr) -} - -// latency based high availability forwarder -// 1. choose dialer whose priority is the highest -// 2. choose dialer with the lowest latency -type lhaDialer struct { - *rrDialer -} - -// newLHADialer . -func newLHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { - return &lhaDialer{rrDialer: newRRDialer(dialers, webhost, duration)} -} - -func (lha *lhaDialer) nextDialer(dstAddr string) *proxy.Forwarder { - var latency int64 - for i, fwder := range lha.fwdrs { - if fwder.Enabled() { - lha.priority = fwder.Priority - latency = fwder.Latency() - lha.idx = i - break - } - } - - for i, fwder := range lha.fwdrs { - if fwder.Enabled() && fwder.Priority >= lha.priority && fwder.Latency() < latency { - latency = fwder.Latency() - lha.idx = i - } - } - - return lha.fwdrs[lha.idx] -} - -func (lha *lhaDialer) Dial(network, addr string) (net.Conn, error) { - d := lha.nextDialer(addr) - return d.Dial(network, addr) -} - -func (lha *lhaDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - d := lha.nextDialer(addr) - return d.DialUDP(network, addr) -}