From bec2c1fd63f192b1b2e2feab3ed1741832d6bcfb Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Sat, 25 Aug 2018 23:56:18 +0800 Subject: [PATCH] strategy: add the ability to handle forwarder status change events --- proxy/forwarder.go | 21 ++++- strategy/dh.go | 31 ------- strategy/ha.go | 31 ------- strategy/lha.go | 46 ---------- strategy/rr.go | 161 --------------------------------- strategy/scheduler.go | 49 ++++++++++ strategy/strategy.go | 204 ++++++++++++++++++++++++++++++++++++++---- 7 files changed, 254 insertions(+), 289 deletions(-) delete mode 100644 strategy/dh.go delete mode 100644 strategy/ha.go delete mode 100644 strategy/lha.go delete mode 100644 strategy/rr.go create mode 100644 strategy/scheduler.go diff --git a/proxy/forwarder.go b/proxy/forwarder.go index 9ae27e0..2e7f6b1 100644 --- a/proxy/forwarder.go +++ b/proxy/forwarder.go @@ -10,6 +10,9 @@ import ( "github.com/nadoo/glider/common/log" ) +// StatusChangedHandler function will be called when the forwarder's status changed +type StatusChangedHandler func(*Forwarder) + // Forwarder is a forwarder type Forwarder struct { Dialer @@ -20,6 +23,7 @@ type Forwarder struct { failures uint32 latency int64 intface string // local interface or ip address + handlers []StatusChangedHandler } // ForwarderFromURL parses `forward=` command value and returns a new forwarder @@ -102,15 +106,28 @@ func (f *Forwarder) IncFailures() { atomic.AddUint32(&f.failures, 1) } +// AddHandler adds a custom handler to handle the status change event +func (f *Forwarder) AddHandler(h StatusChangedHandler) { + f.handlers = append(f.handlers, h) +} + // Enable the forwarder func (f *Forwarder) Enable() { - atomic.StoreUint32(&f.disabled, 0) + if atomic.CompareAndSwapUint32(&f.disabled, 1, 0) { + for _, h := range f.handlers { + h(f) + } + } atomic.StoreUint32(&f.failures, 0) } // Disable the forwarder func (f *Forwarder) Disable() { - atomic.StoreUint32(&f.disabled, 1) + if atomic.CompareAndSwapUint32(&f.disabled, 0, 1) { + for _, h := range f.handlers { + h(f) + } + } } // Enabled returns the status of forwarder diff --git a/strategy/dh.go b/strategy/dh.go deleted file mode 100644 index 47e2b93..0000000 --- a/strategy/dh.go +++ /dev/null @@ -1,31 +0,0 @@ -package strategy - -import ( - "hash/fnv" - "net" - - "github.com/nadoo/glider/proxy" -) - -// destination hashing dialer -type dhDialer struct{ *rrDialer } - -// newDHDialer . -func newDHDialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { - return &dhDialer{rrDialer: newRRDialer(dialers, webhost, duration)} -} - -func (dh *dhDialer) NextDialer(dstAddr string) proxy.Dialer { - fnv1a := fnv.New32a() - fnv1a.Write([]byte(dstAddr)) - idx := fnv1a.Sum32() % uint32(len(dh.fwdrs)) - return dh.fwdrs[idx] -} - -func (dh *dhDialer) Dial(network, addr string) (net.Conn, error) { - return dh.NextDialer(addr).Dial(network, addr) -} - -func (dh *dhDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - return dh.NextDialer(addr).DialUDP(network, addr) -} diff --git a/strategy/ha.go b/strategy/ha.go deleted file mode 100644 index ff8d2a2..0000000 --- a/strategy/ha.go +++ /dev/null @@ -1,31 +0,0 @@ -package strategy - -import ( - "net" - - "github.com/nadoo/glider/proxy" -) - -// high availability dialer -type haDialer struct{ *rrDialer } - -// newHADialer . -func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { - return &haDialer{rrDialer: newRRDialer(dialers, webhost, duration)} -} - -func (ha *haDialer) NextDialer(dstAddr string) proxy.Dialer { - d := ha.fwdrs[ha.Index()] - if !d.Enabled() || d.Priority() < ha.Priority() { - d = ha.nextDialer(dstAddr) - } - return d -} - -func (ha *haDialer) Dial(network, addr string) (net.Conn, error) { - return ha.NextDialer(addr).Dial(network, addr) -} - -func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - return ha.NextDialer(addr).DialUDP(network, addr) -} diff --git a/strategy/lha.go b/strategy/lha.go deleted file mode 100644 index 0e897d7..0000000 --- a/strategy/lha.go +++ /dev/null @@ -1,46 +0,0 @@ -package strategy - -import ( - "net" - - "github.com/nadoo/glider/proxy" -) - -// latency based high availability dialer -type lhaDialer struct{ *rrDialer } - -// newLHADialer . -func newLHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer { - return &lhaDialer{rrDialer: newRRDialer(dialers, webhost, duration)} -} - -func (lha *lhaDialer) NextDialer(dstAddr string) proxy.Dialer { - idx := lha.Index() - var lowest int64 - for i, fwder := range lha.fwdrs { - if fwder.Enabled() { - lha.SetPriority(fwder.Priority()) - lowest = fwder.Latency() - idx = int32(i) - break - } - } - - for i, fwder := range lha.fwdrs { - if fwder.Enabled() && fwder.Priority() >= lha.Priority() && fwder.Latency() < lowest { - lowest = fwder.Latency() - idx = int32(i) - } - } - - lha.SetIndex(idx) - return lha.fwdrs[idx] -} - -func (lha *lhaDialer) Dial(network, addr string) (net.Conn, error) { - return lha.NextDialer(addr).Dial(network, addr) -} - -func (lha *lhaDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - return lha.NextDialer(addr).DialUDP(network, addr) -} diff --git a/strategy/rr.go b/strategy/rr.go deleted file mode 100644 index ef5e514..0000000 --- a/strategy/rr.go +++ /dev/null @@ -1,161 +0,0 @@ -package strategy - -import ( - "bytes" - "io" - "net" - "sort" - "strings" - "sync/atomic" - "time" - - "github.com/nadoo/glider/common/log" - "github.com/nadoo/glider/proxy" -) - -// forwarder slice orderd by priority -type priSlice []*proxy.Forwarder - -func (p priSlice) Len() int { return len(p) } -func (p priSlice) Less(i, j int) bool { return p[i].Priority() > p[j].Priority() } -func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -// rrDialer is a round robin dialer -type rrDialer struct { - fwdrs priSlice - index int32 - priority uint32 - website string - interval int -} - -// newRRDialer returns a new rrDialer -func newRRDialer(fwdrs []*proxy.Forwarder, website string, interval int) *rrDialer { - rr := &rrDialer{fwdrs: fwdrs} - sort.Sort(rr.fwdrs) - - rr.website = website - if strings.IndexByte(rr.website, ':') == -1 { - rr.website += ":80" - } - - rr.interval = interval - return rr -} - -func (rr *rrDialer) Addr() string { return "STRATEGY" } - -func (rr *rrDialer) Dial(network, addr string) (net.Conn, error) { - return rr.NextDialer(addr).Dial(network, addr) -} - -func (rr *rrDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { - return rr.NextDialer(addr).DialUDP(network, addr) -} - -func (rr *rrDialer) NextDialer(dstAddr string) proxy.Dialer { return rr.nextDialer(dstAddr) } - -func (rr *rrDialer) nextDialer(dstAddr string) *proxy.Forwarder { - n := int32(len(rr.fwdrs)) - if n == 1 { - return rr.fwdrs[0] - } - - for _, fwder := range rr.fwdrs { - if fwder.Enabled() { - rr.SetPriority(fwder.Priority()) - break - } - } - - idx := rr.Index() - if rr.fwdrs[idx].Priority() < rr.Priority() { - idx = 0 - } - - found := false - var i int32 - for i = 0; i < n; i++ { - idx = (idx + 1) % n - if rr.fwdrs[idx].Enabled() && - rr.fwdrs[idx].Priority() >= rr.Priority() { - found = true - rr.SetPriority(rr.fwdrs[idx].Priority()) - break - } - } - - if !found { - rr.SetPriority(0) - log.F("NO AVAILABLE PROXY FOUND! please check your network or proxy server settings.") - } - - rr.SetIndex(idx) - - return rr.fwdrs[idx] -} - -// Index returns the active forwarder's Index of rrDialer -func (rr *rrDialer) Index() int32 { return atomic.LoadInt32(&rr.index) } - -// SetIndex sets the active forwarder's Index of rrDialer -func (rr *rrDialer) SetIndex(p int32) { atomic.StoreInt32(&rr.index, p) } - -// Priority returns the active priority of rrDialer -func (rr *rrDialer) Priority() uint32 { return atomic.LoadUint32(&rr.priority) } - -// SetPriority sets the active priority of rrDialer -func (rr *rrDialer) SetPriority(p uint32) { atomic.StoreUint32(&rr.priority, p) } - -// Check implements the Checker interface -func (rr *rrDialer) Check() { - for i := 0; i < len(rr.fwdrs); i++ { - go rr.check(i) - } -} - -func (rr *rrDialer) check(i int) { - f := rr.fwdrs[i] - retry := 1 - buf := make([]byte, 4) - - for { - time.Sleep(time.Duration(rr.interval) * time.Second * time.Duration(retry>>1)) - - retry <<= 1 - if retry > 16 { - retry = 16 - } - - if f.Priority() < rr.Priority() { - continue - } - - startTime := time.Now() - rc, err := f.Dial("tcp", rr.website) - if err != nil { - f.Disable() - log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority(), rr.website, err) - continue - } - - rc.Write([]byte("GET / HTTP/1.0\r\n\r\n")) - - _, err = io.ReadFull(rc, buf) - if err != nil { - f.Disable() - log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority(), rr.website, err) - } else if bytes.Equal([]byte("HTTP"), buf) { - f.Enable() - retry = 2 - readTime := time.Since(startTime) - f.SetLatency(int64(readTime)) - log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority(), rr.website, readTime.String()) - } else { - f.Disable() - log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority(), rr.website, buf) - } - - rc.Close() - } -} diff --git a/strategy/scheduler.go b/strategy/scheduler.go new file mode 100644 index 0000000..13e669f --- /dev/null +++ b/strategy/scheduler.go @@ -0,0 +1,49 @@ +package strategy + +import ( + "hash/fnv" + + "github.com/nadoo/glider/proxy" +) + +func (d *Dialer) scheduleRR(dstAddr string) *proxy.Forwarder { + d.mu.Lock() + defer d.mu.Unlock() + + idx := d.IncIndex() % int32(len(d.valid)) + d.SetIndex(idx) + return d.valid[idx] +} + +func (d *Dialer) scheduleHA(dstAddr string) *proxy.Forwarder { + d.mu.Lock() + defer d.mu.Unlock() + + return d.valid[0] +} + +func (d *Dialer) scheduleLHA(dstAddr string) *proxy.Forwarder { + d.mu.Lock() + defer d.mu.Unlock() + + fwdr := d.valid[0] + lowest := fwdr.Latency() + for _, f := range d.valid { + if f.Latency() < lowest { + lowest = f.Latency() + fwdr = f + } + } + + return fwdr +} + +func (d *Dialer) scheduleDH(dstAddr string) *proxy.Forwarder { + d.mu.Lock() + defer d.mu.Unlock() + + fnv1a := fnv.New32a() + fnv1a.Write([]byte(dstAddr)) + idx := fnv1a.Sum32() % uint32(len(d.valid)) + return d.valid[idx] +} diff --git a/strategy/strategy.go b/strategy/strategy.go index dcfe5bb..7bd5d87 100644 --- a/strategy/strategy.go +++ b/strategy/strategy.go @@ -1,6 +1,15 @@ package strategy import ( + "bytes" + "io" + "net" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + "github.com/nadoo/glider/common/log" "github.com/nadoo/glider/proxy" ) @@ -19,6 +28,25 @@ type Config struct { IntFace string } +// forwarder slice orderd by priority +type priSlice []*proxy.Forwarder + +func (p priSlice) Len() int { return len(p) } +func (p priSlice) Less(i, j int) bool { return p[i].Priority() > p[j].Priority() } +func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +// Dialer . +type Dialer struct { + config *Config + fwdrs priSlice + valid []*proxy.Forwarder + mu sync.Mutex + index int32 + priority uint32 + + nextForwarder func(addr string) *proxy.Forwarder +} + // NewDialer returns a new strategy dialer func NewDialer(s []string, c *Config) proxy.Dialer { var fwdrs []*proxy.Forwarder @@ -43,24 +71,164 @@ func NewDialer(s []string, c *Config) proxy.Dialer { return fwdrs[0] } - var dialer proxy.Dialer - switch c.Strategy { - case "rr": - dialer = newRRDialer(fwdrs, c.CheckWebSite, c.CheckInterval) - log.F("forward to remote servers in round robin mode.") - case "ha": - dialer = newHADialer(fwdrs, c.CheckWebSite, c.CheckInterval) - log.F("forward to remote servers in high availability mode.") - case "lha": - dialer = newLHADialer(fwdrs, c.CheckWebSite, c.CheckInterval) - log.F("forward to remote servers in latency based high availability mode.") - case "dh": - dialer = newDHDialer(fwdrs, c.CheckWebSite, c.CheckInterval) - log.F("forward to remote servers in destination hashing mode.") - default: - log.F("not supported forward mode '%s', just use the first forward server.", c.Strategy) - dialer = fwdrs[0] + return newDialer(fwdrs, c) +} + +// newDialer returns a new rrDialer +func newDialer(fwdrs []*proxy.Forwarder, c *Config) *Dialer { + d := &Dialer{fwdrs: fwdrs, config: c} + sort.Sort(d.fwdrs) + + d.mu.Lock() + d.valid = d.fwdrs + d.mu.Unlock() + + if strings.IndexByte(d.config.CheckWebSite, ':') == -1 { + d.config.CheckWebSite += ":80" } - return dialer + switch c.Strategy { + case "rr": + d.nextForwarder = d.scheduleRR + log.F("forward to remote servers in round robin mode.") + case "ha": + d.nextForwarder = d.scheduleHA + log.F("forward to remote servers in high availability mode.") + case "lha": + d.nextForwarder = d.scheduleLHA + log.F("forward to remote servers in latency based high availability mode.") + case "dh": + d.nextForwarder = d.scheduleDH + log.F("forward to remote servers in destination hashing mode.") + default: + d.nextForwarder = d.scheduleRR + log.F("not supported forward mode '%s', use round robin mode.", c.Strategy) + } + + for _, f := range fwdrs { + f.AddHandler(d.OnStatusChanged) + } + + return d +} + +// Addr returns forwarder's address +func (d *Dialer) Addr() string { return "STRATEGY" } + +// Dial connects to the address addr on the network net +func (d *Dialer) Dial(network, addr string) (net.Conn, error) { + return d.NextDialer(addr).Dial(network, addr) +} + +// DialUDP connects to the given address +func (d *Dialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { + return d.NextDialer(addr).DialUDP(network, addr) +} + +// NextDialer returns the next dialer +func (d *Dialer) NextDialer(dstAddr string) proxy.Dialer { + return d.nextForwarder(dstAddr) +} + +// Index returns the active forwarder's Index of rrDialer +func (d *Dialer) Index() int32 { return atomic.LoadInt32(&d.index) } + +// SetIndex sets the active forwarder's Index of rrDialer +func (d *Dialer) SetIndex(p int32) { atomic.StoreInt32(&d.index, p) } + +// IncIndex increase the index by 1 +func (d *Dialer) IncIndex() int32 { return atomic.AddInt32(&d.index, 1) } + +// Priority returns the active priority of rrDialer +func (d *Dialer) Priority() uint32 { return atomic.LoadUint32(&d.priority) } + +// SetPriority sets the active priority of rrDialer +func (d *Dialer) SetPriority(p uint32) { atomic.StoreUint32(&d.priority, p) } + +// OnStatusChanged will be called when fwdr's status changed +func (d *Dialer) OnStatusChanged(fwdr *proxy.Forwarder) { + d.mu.Lock() + defer d.mu.Unlock() + + if fwdr.Enabled() { + if fwdr.Priority() == d.Priority() { + d.valid = append(d.valid, fwdr) + } else if fwdr.Priority() > d.Priority() { + d.SetPriority(fwdr.Priority()) + + d.valid = nil + for _, f := range d.fwdrs { + if f.Enabled() && f.Priority() >= d.Priority() { + d.valid = append(d.valid, f) + } + } + } + } + + if !fwdr.Enabled() { + for i, f := range d.valid { + if f == fwdr { + d.valid[i], d.valid = d.valid[len(d.valid)-1], d.valid[:len(d.valid)-1] + break + } + } + } + + if len(d.valid) == 0 { + d.valid = append(d.valid, d.fwdrs[0]) + } + +} + +// Check implements the Checker interface +func (d *Dialer) Check() { + for i := 0; i < len(d.fwdrs); i++ { + go d.check(i) + } +} + +func (d *Dialer) check(i int) { + f := d.fwdrs[i] + retry := 1 + buf := make([]byte, 4) + + for { + time.Sleep(time.Duration(d.config.CheckInterval) * time.Second * time.Duration(retry>>1)) + + retry <<= 1 + if retry > 16 { + retry = 16 + } + + if f.Priority() < d.Priority() { + continue + } + + startTime := time.Now() + rc, err := f.Dial("tcp", d.config.CheckWebSite) + if err != nil { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, err) + continue + } + + rc.Write([]byte("GET / HTTP/1.0\r\n\r\n")) + + _, err = io.ReadFull(rc, buf) + if err != nil { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, err) + } else if bytes.Equal([]byte("HTTP"), buf) { + f.Enable() + retry = 2 + readTime := time.Since(startTime) + f.SetLatency(int64(readTime)) + log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, readTime.String()) + } else { + f.Disable() + log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, buf) + } + + rc.Close() + } }