strategy: optimized code

This commit is contained in:
nadoo 2018-08-26 01:25:22 +08:00
parent bec2c1fd63
commit 035e15df5c
3 changed files with 70 additions and 85 deletions

View File

@ -10,8 +10,8 @@ import (
"github.com/nadoo/glider/common/log"
)
// StatusChangedHandler function will be called when the forwarder's status changed
type StatusChangedHandler func(*Forwarder)
// StatusHandler function will be called when the forwarder's status changed
type StatusHandler func(*Forwarder)
// Forwarder is a forwarder
type Forwarder struct {
@ -23,7 +23,7 @@ type Forwarder struct {
failures uint32
latency int64
intface string // local interface or ip address
handlers []StatusChangedHandler
handlers []StatusHandler
}
// ForwarderFromURL parses `forward=` command value and returns a new forwarder
@ -107,7 +107,7 @@ func (f *Forwarder) IncFailures() {
}
// AddHandler adds a custom handler to handle the status change event
func (f *Forwarder) AddHandler(h StatusChangedHandler) {
func (f *Forwarder) AddHandler(h StatusHandler) {
f.handlers = append(f.handlers, h)
}

View File

@ -1,49 +0,0 @@
package strategy
import (
"hash/fnv"
"github.com/nadoo/glider/proxy"
)
func (d *Dialer) scheduleRR(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
idx := d.IncIndex() % int32(len(d.valid))
d.SetIndex(idx)
return d.valid[idx]
}
func (d *Dialer) scheduleHA(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
return d.valid[0]
}
func (d *Dialer) scheduleLHA(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
fwdr := d.valid[0]
lowest := fwdr.Latency()
for _, f := range d.valid {
if f.Latency() < lowest {
lowest = f.Latency()
fwdr = f
}
}
return fwdr
}
func (d *Dialer) scheduleDH(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
fnv1a := fnv.New32a()
fnv1a.Write([]byte(dstAddr))
idx := fnv1a.Sum32() % uint32(len(d.valid))
return d.valid[idx]
}

View File

@ -2,6 +2,7 @@ package strategy
import (
"bytes"
"hash/fnv"
"io"
"net"
"sort"
@ -37,12 +38,12 @@ func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Dialer .
type Dialer struct {
config *Config
fwdrs priSlice
valid []*proxy.Forwarder
mu sync.Mutex
index int32
priority uint32
config *Config
fwdrs priSlice
available []*proxy.Forwarder
mu sync.RWMutex
index uint32
priority uint32
nextForwarder func(addr string) *proxy.Forwarder
}
@ -79,9 +80,7 @@ func newDialer(fwdrs []*proxy.Forwarder, c *Config) *Dialer {
d := &Dialer{fwdrs: fwdrs, config: c}
sort.Sort(d.fwdrs)
d.mu.Lock()
d.valid = d.fwdrs
d.mu.Unlock()
d.initAvailable()
if strings.IndexByte(d.config.CheckWebSite, ':') == -1 {
d.config.CheckWebSite += ":80"
@ -127,24 +126,38 @@ func (d *Dialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.A
// NextDialer returns the next dialer
func (d *Dialer) NextDialer(dstAddr string) proxy.Dialer {
d.mu.RLock()
defer d.mu.RUnlock()
return d.nextForwarder(dstAddr)
}
// Index returns the active forwarder's Index of rrDialer
func (d *Dialer) Index() int32 { return atomic.LoadInt32(&d.index) }
// SetIndex sets the active forwarder's Index of rrDialer
func (d *Dialer) SetIndex(p int32) { atomic.StoreInt32(&d.index, p) }
// IncIndex increase the index by 1
func (d *Dialer) IncIndex() int32 { return atomic.AddInt32(&d.index, 1) }
// Priority returns the active priority of rrDialer
func (d *Dialer) Priority() uint32 { return atomic.LoadUint32(&d.priority) }
// SetPriority sets the active priority of rrDialer
func (d *Dialer) SetPriority(p uint32) { atomic.StoreUint32(&d.priority, p) }
// initAvailable traverse d.fwdrs and init the available forwarder slice
func (d *Dialer) initAvailable() {
for _, f := range d.fwdrs {
if f.Enabled() {
d.SetPriority(f.Priority())
break
}
}
d.available = nil
for _, f := range d.fwdrs {
if f.Enabled() && f.Priority() >= d.Priority() {
d.available = append(d.available, f)
}
}
if len(d.available) == 0 {
d.available = append(d.available, d.fwdrs[0])
}
}
// OnStatusChanged will be called when fwdr's status changed
func (d *Dialer) OnStatusChanged(fwdr *proxy.Forwarder) {
d.mu.Lock()
@ -152,32 +165,24 @@ func (d *Dialer) OnStatusChanged(fwdr *proxy.Forwarder) {
if fwdr.Enabled() {
if fwdr.Priority() == d.Priority() {
d.valid = append(d.valid, fwdr)
d.available = append(d.available, fwdr)
} else if fwdr.Priority() > d.Priority() {
d.SetPriority(fwdr.Priority())
d.valid = nil
for _, f := range d.fwdrs {
if f.Enabled() && f.Priority() >= d.Priority() {
d.valid = append(d.valid, f)
}
}
d.initAvailable()
}
}
if !fwdr.Enabled() {
for i, f := range d.valid {
for i, f := range d.available {
if f == fwdr {
d.valid[i], d.valid = d.valid[len(d.valid)-1], d.valid[:len(d.valid)-1]
d.available[i], d.available = d.available[len(d.available)-1], d.available[:len(d.available)-1]
break
}
}
}
if len(d.valid) == 0 {
d.valid = append(d.valid, d.fwdrs[0])
if len(d.available) == 0 {
d.initAvailable()
}
}
// Check implements the Checker interface
@ -232,3 +237,32 @@ func (d *Dialer) check(i int) {
rc.Close()
}
}
func (d *Dialer) scheduleRR(dstAddr string) *proxy.Forwarder {
idx := atomic.AddUint32(&d.index, 1) % uint32(len(d.available))
return d.available[idx]
}
func (d *Dialer) scheduleHA(dstAddr string) *proxy.Forwarder {
return d.available[0]
}
func (d *Dialer) scheduleLHA(dstAddr string) *proxy.Forwarder {
fwdr := d.available[0]
lowest := fwdr.Latency()
for _, f := range d.available {
if f.Latency() < lowest {
lowest = f.Latency()
fwdr = f
}
}
return fwdr
}
func (d *Dialer) scheduleDH(dstAddr string) *proxy.Forwarder {
fnv1a := fnv.New32a()
fnv1a.Write([]byte(dstAddr))
idx := fnv1a.Sum32() % uint32(len(d.available))
return d.available[idx]
}