From 035e15df5c8faea31fc121003a346d5e4c8a1a49 Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Sun, 26 Aug 2018 01:25:22 +0800 Subject: [PATCH] strategy: optimized code --- proxy/forwarder.go | 8 ++-- strategy/scheduler.go | 49 ---------------------- strategy/strategy.go | 98 +++++++++++++++++++++++++++++-------------- 3 files changed, 70 insertions(+), 85 deletions(-) delete mode 100644 strategy/scheduler.go diff --git a/proxy/forwarder.go b/proxy/forwarder.go index 2e7f6b1..641e947 100644 --- a/proxy/forwarder.go +++ b/proxy/forwarder.go @@ -10,8 +10,8 @@ import ( "github.com/nadoo/glider/common/log" ) -// StatusChangedHandler function will be called when the forwarder's status changed -type StatusChangedHandler func(*Forwarder) +// StatusHandler function will be called when the forwarder's status changed +type StatusHandler func(*Forwarder) // Forwarder is a forwarder type Forwarder struct { @@ -23,7 +23,7 @@ type Forwarder struct { failures uint32 latency int64 intface string // local interface or ip address - handlers []StatusChangedHandler + handlers []StatusHandler } // ForwarderFromURL parses `forward=` command value and returns a new forwarder @@ -107,7 +107,7 @@ func (f *Forwarder) IncFailures() { } // AddHandler adds a custom handler to handle the status change event -func (f *Forwarder) AddHandler(h StatusChangedHandler) { +func (f *Forwarder) AddHandler(h StatusHandler) { f.handlers = append(f.handlers, h) } diff --git a/strategy/scheduler.go b/strategy/scheduler.go deleted file mode 100644 index 13e669f..0000000 --- a/strategy/scheduler.go +++ /dev/null @@ -1,49 +0,0 @@ -package strategy - -import ( - "hash/fnv" - - "github.com/nadoo/glider/proxy" -) - -func (d *Dialer) scheduleRR(dstAddr string) *proxy.Forwarder { - d.mu.Lock() - defer d.mu.Unlock() - - idx := d.IncIndex() % int32(len(d.valid)) - d.SetIndex(idx) - return d.valid[idx] -} - -func (d *Dialer) scheduleHA(dstAddr string) *proxy.Forwarder { - d.mu.Lock() - defer d.mu.Unlock() - - return d.valid[0] -} - -func (d *Dialer) scheduleLHA(dstAddr string) *proxy.Forwarder { - d.mu.Lock() - defer d.mu.Unlock() - - fwdr := d.valid[0] - lowest := fwdr.Latency() - for _, f := range d.valid { - if f.Latency() < lowest { - lowest = f.Latency() - fwdr = f - } - } - - return fwdr -} - -func (d *Dialer) scheduleDH(dstAddr string) *proxy.Forwarder { - d.mu.Lock() - defer d.mu.Unlock() - - fnv1a := fnv.New32a() - fnv1a.Write([]byte(dstAddr)) - idx := fnv1a.Sum32() % uint32(len(d.valid)) - return d.valid[idx] -} diff --git a/strategy/strategy.go b/strategy/strategy.go index 7bd5d87..3d7e6d9 100644 --- a/strategy/strategy.go +++ b/strategy/strategy.go @@ -2,6 +2,7 @@ package strategy import ( "bytes" + "hash/fnv" "io" "net" "sort" @@ -37,12 +38,12 @@ func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } // Dialer . type Dialer struct { - config *Config - fwdrs priSlice - valid []*proxy.Forwarder - mu sync.Mutex - index int32 - priority uint32 + config *Config + fwdrs priSlice + available []*proxy.Forwarder + mu sync.RWMutex + index uint32 + priority uint32 nextForwarder func(addr string) *proxy.Forwarder } @@ -79,9 +80,7 @@ func newDialer(fwdrs []*proxy.Forwarder, c *Config) *Dialer { d := &Dialer{fwdrs: fwdrs, config: c} sort.Sort(d.fwdrs) - d.mu.Lock() - d.valid = d.fwdrs - d.mu.Unlock() + d.initAvailable() if strings.IndexByte(d.config.CheckWebSite, ':') == -1 { d.config.CheckWebSite += ":80" @@ -127,24 +126,38 @@ func (d *Dialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.A // NextDialer returns the next dialer func (d *Dialer) NextDialer(dstAddr string) proxy.Dialer { + d.mu.RLock() + defer d.mu.RUnlock() return d.nextForwarder(dstAddr) } -// Index returns the active forwarder's Index of rrDialer -func (d *Dialer) Index() int32 { return atomic.LoadInt32(&d.index) } - -// SetIndex sets the active forwarder's Index of rrDialer -func (d *Dialer) SetIndex(p int32) { atomic.StoreInt32(&d.index, p) } - -// IncIndex increase the index by 1 -func (d *Dialer) IncIndex() int32 { return atomic.AddInt32(&d.index, 1) } - // Priority returns the active priority of rrDialer func (d *Dialer) Priority() uint32 { return atomic.LoadUint32(&d.priority) } // SetPriority sets the active priority of rrDialer func (d *Dialer) SetPriority(p uint32) { atomic.StoreUint32(&d.priority, p) } +// initAvailable traverse d.fwdrs and init the available forwarder slice +func (d *Dialer) initAvailable() { + for _, f := range d.fwdrs { + if f.Enabled() { + d.SetPriority(f.Priority()) + break + } + } + + d.available = nil + for _, f := range d.fwdrs { + if f.Enabled() && f.Priority() >= d.Priority() { + d.available = append(d.available, f) + } + } + + if len(d.available) == 0 { + d.available = append(d.available, d.fwdrs[0]) + } +} + // OnStatusChanged will be called when fwdr's status changed func (d *Dialer) OnStatusChanged(fwdr *proxy.Forwarder) { d.mu.Lock() @@ -152,32 +165,24 @@ func (d *Dialer) OnStatusChanged(fwdr *proxy.Forwarder) { if fwdr.Enabled() { if fwdr.Priority() == d.Priority() { - d.valid = append(d.valid, fwdr) + d.available = append(d.available, fwdr) } else if fwdr.Priority() > d.Priority() { - d.SetPriority(fwdr.Priority()) - - d.valid = nil - for _, f := range d.fwdrs { - if f.Enabled() && f.Priority() >= d.Priority() { - d.valid = append(d.valid, f) - } - } + d.initAvailable() } } if !fwdr.Enabled() { - for i, f := range d.valid { + for i, f := range d.available { if f == fwdr { - d.valid[i], d.valid = d.valid[len(d.valid)-1], d.valid[:len(d.valid)-1] + d.available[i], d.available = d.available[len(d.available)-1], d.available[:len(d.available)-1] break } } } - if len(d.valid) == 0 { - d.valid = append(d.valid, d.fwdrs[0]) + if len(d.available) == 0 { + d.initAvailable() } - } // Check implements the Checker interface @@ -232,3 +237,32 @@ func (d *Dialer) check(i int) { rc.Close() } } + +func (d *Dialer) scheduleRR(dstAddr string) *proxy.Forwarder { + idx := atomic.AddUint32(&d.index, 1) % uint32(len(d.available)) + return d.available[idx] +} + +func (d *Dialer) scheduleHA(dstAddr string) *proxy.Forwarder { + return d.available[0] +} + +func (d *Dialer) scheduleLHA(dstAddr string) *proxy.Forwarder { + fwdr := d.available[0] + lowest := fwdr.Latency() + for _, f := range d.available { + if f.Latency() < lowest { + lowest = f.Latency() + fwdr = f + } + } + + return fwdr +} + +func (d *Dialer) scheduleDH(dstAddr string) *proxy.Forwarder { + fnv1a := fnv.New32a() + fnv1a.Write([]byte(dstAddr)) + idx := fnv1a.Sum32() % uint32(len(d.available)) + return d.available[idx] +}