From e1c318990b5005328e0e198bc57d2bfe54f356cd Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Sun, 12 Aug 2018 18:50:44 +0800 Subject: [PATCH] strategy: support priority now (need to check) --- main.go | 2 +- proxy/forwarder.go | 9 ++++--- strategy/strategy.go | 63 ++++++++++++++++++++++++++------------------ 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/main.go b/main.go index 09d6f72..8a10f46 100644 --- a/main.go +++ b/main.go @@ -72,7 +72,7 @@ func main() { // Proxy Servers for _, listen := range conf.Listen { - local, err := proxy.ServerFromURL(listen, proxy.NewForwarder(dialer)) + local, err := proxy.ServerFromURL(listen, dialer) if err != nil { log.Fatal(err) } diff --git a/proxy/forwarder.go b/proxy/forwarder.go index d88aa5a..a64a2c8 100644 --- a/proxy/forwarder.go +++ b/proxy/forwarder.go @@ -6,6 +6,8 @@ import ( "strconv" "strings" "sync/atomic" + + "github.com/nadoo/glider/common/log" ) // Forwarder is a forwarder @@ -70,7 +72,7 @@ func (f *Forwarder) Dial(network, addr string) (c net.Conn, err error) { // TODO: proxy timeout, target timeout? if err != nil { atomic.AddUint32(&f.failures, 1) - // log.F("forward dial failed, %d", f.failures) + log.F("forward dial failed, %d, addr: %s", f.failures, f.addr) } return c, err @@ -83,13 +85,12 @@ func (f *Forwarder) Failures() uint32 { // Enable . func (f *Forwarder) Enable() { - atomic.StoreUint32(&f.failures, 0) - atomic.StoreUint32(&f.failures, 0) + atomic.StoreUint32(&f.disabled, 0) } // Disable . func (f *Forwarder) Disable() { - atomic.StoreUint32(&f.failures, 1) + atomic.StoreUint32(&f.disabled, 1) } // Enabled . diff --git a/strategy/strategy.go b/strategy/strategy.go index 34608d7..12321fd 100644 --- a/strategy/strategy.go +++ b/strategy/strategy.go @@ -4,8 +4,8 @@ import ( "bytes" "io" "net" + "sort" "strings" - "sync" "time" "github.com/nadoo/glider/common/log" @@ -55,12 +55,17 @@ func NewDialer(s []string, c *Config) proxy.Dialer { return dialer } -// rrDialer is the base struct of strategy dialer -type rrDialer struct { - fwdrs []*proxy.Forwarder - idx int +type forwarderSlice []*proxy.Forwarder - status sync.Map +func (p forwarderSlice) Len() int { return len(p) } +func (p forwarderSlice) Less(i, j int) bool { return p[i].Priority > p[j].Priority } +func (p forwarderSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +// rrDialer is a rr dialer +type rrDialer struct { + fwdrs forwarderSlice + idx int + priority int // for checking website string @@ -68,14 +73,16 @@ type rrDialer struct { } // newRRDialer returns a new rrDialer -func newRRDialer(fwdrs []*proxy.Forwarder, website string, interval int) *rrDialer { - rr := &rrDialer{fwdrs: fwdrs} - +func newRRDialer(fs []*proxy.Forwarder, website string, interval int) *rrDialer { + rr := &rrDialer{fwdrs: fs} rr.website = website rr.interval = interval - for k := range fwdrs { - rr.status.Store(k, true) + sort.Sort(rr.fwdrs) + rr.priority = rr.fwdrs[0].Priority + + for k := range rr.fwdrs { + log.F("k: %d, %s, priority: %d", k, rr.fwdrs[k].Addr(), rr.fwdrs[k].Priority) go rr.checkDialer(k) } @@ -97,12 +104,24 @@ func (rr *rrDialer) nextDialer(dstAddr string) *proxy.Forwarder { rr.idx = 0 } + for _, fwder := range rr.fwdrs { + if fwder.Enabled() { + rr.priority = fwder.Priority + break + } + } + + if rr.fwdrs[rr.idx].Priority < rr.priority { + rr.idx = 0 + } + found := false for i := 0; i < n; i++ { rr.idx = (rr.idx + 1) % n - result, ok := rr.status.Load(rr.idx) - if ok && result.(bool) { + if rr.fwdrs[rr.idx].Enabled() && + rr.fwdrs[rr.idx].Priority >= rr.priority { found = true + rr.priority = rr.fwdrs[rr.idx].Priority break } } @@ -140,7 +159,7 @@ func (rr *rrDialer) checkDialer(idx int) { startTime := time.Now() c, err := d.Dial("tcp", rr.website) if err != nil { - rr.status.Store(idx, false) + rr.fwdrs[idx].Disable() log.F("[check] %s -> %s, set to DISABLED. error in dial: %s", d.Addr(), rr.website, err) continue } @@ -149,15 +168,15 @@ func (rr *rrDialer) checkDialer(idx int) { _, err = io.ReadFull(c, buf) if err != nil { - rr.status.Store(idx, false) + rr.fwdrs[idx].Disable() log.F("[check] %s -> %s, set to DISABLED. error in read: %s", d.Addr(), rr.website, err) } else if bytes.Equal([]byte("HTTP"), buf) { - rr.status.Store(idx, true) + rr.fwdrs[idx].Enable() retry = 2 dialTime := time.Since(startTime) log.F("[check] %s -> %s, set to ENABLED. connect time: %s", d.Addr(), rr.website, dialTime.String()) } else { - rr.status.Store(idx, false) + rr.fwdrs[idx].Disable() log.F("[check] %s -> %s, set to DISABLED. server response: %s", d.Addr(), rr.website, buf) } @@ -177,22 +196,16 @@ func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy func (ha *haDialer) Dial(network, addr string) (net.Conn, error) { d := ha.fwdrs[ha.idx] - - result, ok := ha.status.Load(ha.idx) - if ok && !result.(bool) { + if !d.Enabled() { d = ha.nextDialer(addr) } - return d.Dial(network, addr) } func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { d := ha.fwdrs[ha.idx] - - result, ok := ha.status.Load(ha.idx) - if ok && !result.(bool) { + if !d.Enabled() { d = ha.nextDialer(addr) } - return d.DialUDP(network, addr) }