strategy: run init when no forwarders available

This commit is contained in:
nadoo 2020-04-03 20:31:59 +08:00
parent 1204cf30fd
commit fd3bdb8f56

View File

@ -36,11 +36,10 @@ func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
type Proxy struct { type Proxy struct {
config *Config config *Config
fwdrs priSlice fwdrs priSlice
available []*Forwarder avail []*Forwarder // avaliable forwarders
mu sync.RWMutex mu sync.RWMutex
index uint32 index uint32
priority uint32 priority uint32
next func(addr string) *Forwarder next func(addr string) *Forwarder
} }
@ -67,38 +66,38 @@ func NewProxy(s []string, c *Config) *Proxy {
// newProxy returns a new rrProxy // newProxy returns a new rrProxy
func newProxy(fwdrs []*Forwarder, c *Config) *Proxy { func newProxy(fwdrs []*Forwarder, c *Config) *Proxy {
d := &Proxy{fwdrs: fwdrs, config: c} p := &Proxy{fwdrs: fwdrs, config: c}
sort.Sort(d.fwdrs) sort.Sort(p.fwdrs)
d.initAvailable() p.init()
if strings.IndexByte(d.config.CheckWebSite, ':') == -1 { if strings.IndexByte(p.config.CheckWebSite, ':') == -1 {
d.config.CheckWebSite += ":80" p.config.CheckWebSite += ":80"
} }
switch c.Strategy { switch c.Strategy {
case "rr": case "rr":
d.next = d.scheduleRR p.next = p.scheduleRR
log.F("[strategy] forward to remote servers in round robin mode.") log.F("[strategy] forward to remote servers in round robin mode.")
case "ha": case "ha":
d.next = d.scheduleHA p.next = p.scheduleHA
log.F("[strategy] forward to remote servers in high availability mode.") log.F("[strategy] forward to remote servers in high availability mode.")
case "lha": case "lha":
d.next = d.scheduleLHA p.next = p.scheduleLHA
log.F("[strategy] forward to remote servers in latency based high availability mode.") log.F("[strategy] forward to remote servers in latency based high availability mode.")
case "dh": case "dh":
d.next = d.scheduleDH p.next = p.scheduleDH
log.F("[strategy] forward to remote servers in destination hashing mode.") log.F("[strategy] forward to remote servers in destination hashing mode.")
default: default:
d.next = d.scheduleRR p.next = p.scheduleRR
log.F("[strategy] not supported forward mode '%s', use round robin mode.", c.Strategy) log.F("[strategy] not supported forward mode '%s', use round robin mode.", c.Strategy)
} }
for _, f := range fwdrs { for _, f := range fwdrs {
f.AddHandler(d.onStatusChanged) f.AddHandler(p.onStatusChanged)
} }
return d return p
} }
// Dial connects to the address addr on the network net. // Dial connects to the address addr on the network net.
@ -126,7 +125,7 @@ func (p *Proxy) NextForwarder(dstAddr string) *Forwarder {
p.mu.RLock() p.mu.RLock()
defer p.mu.RUnlock() defer p.mu.RUnlock()
if len(p.available) == 0 { if len(p.avail) == 0 {
return p.fwdrs[0] return p.fwdrs[0]
} }
@ -139,8 +138,8 @@ func (p *Proxy) Priority() uint32 { return atomic.LoadUint32(&p.priority) }
// SetPriority sets the active priority of daler. // SetPriority sets the active priority of daler.
func (p *Proxy) SetPriority(pri uint32) { atomic.StoreUint32(&p.priority, pri) } func (p *Proxy) SetPriority(pri uint32) { atomic.StoreUint32(&p.priority, pri) }
// initAvailable traverse d.fwdrs and init the available forwarder slice. // init traverse d.fwdrs and init the available forwarder slice.
func (p *Proxy) initAvailable() { func (p *Proxy) init() {
for _, f := range p.fwdrs { for _, f := range p.fwdrs {
if f.Enabled() { if f.Enabled() {
p.SetPriority(f.Priority()) p.SetPriority(f.Priority())
@ -148,17 +147,17 @@ func (p *Proxy) initAvailable() {
} }
} }
p.available = nil p.avail = nil
for _, f := range p.fwdrs { for _, f := range p.fwdrs {
if f.Enabled() && f.Priority() >= p.Priority() { if f.Enabled() && f.Priority() >= p.Priority() {
p.available = append(p.available, f) p.avail = append(p.avail, f)
} }
} }
if len(p.available) == 0 { if len(p.avail) == 0 {
// no available forwarders, set priority to 0 to check all forwarders in check func // no available forwarders, set priority to 0 to check all forwarders in check func
p.SetPriority(0) p.SetPriority(0)
log.F("[strategy] no available forwarders, just use: %s, please check your config file or network settings", p.fwdrs[0].Addr()) log.F("[strategy] no available forwarders, please check your config file or network settings", p.fwdrs[0].Addr())
} }
} }
@ -170,23 +169,23 @@ func (p *Proxy) onStatusChanged(fwdr *Forwarder) {
if fwdr.Enabled() { if fwdr.Enabled() {
log.F("[strategy] %s changed status from Disabled to Enabled ", fwdr.Addr()) log.F("[strategy] %s changed status from Disabled to Enabled ", fwdr.Addr())
if fwdr.Priority() == p.Priority() { if fwdr.Priority() == p.Priority() {
p.available = append(p.available, fwdr) p.avail = append(p.avail, fwdr)
} else if fwdr.Priority() > p.Priority() { } else if fwdr.Priority() > p.Priority() {
p.initAvailable() p.init()
} }
} else { } else {
log.F("[strategy] %s changed status from Enabled to Disabled", fwdr.Addr()) log.F("[strategy] %s changed status from Enabled to Disabled", fwdr.Addr())
for i, f := range p.available { for i, f := range p.avail {
if f == fwdr { if f == fwdr {
p.available[i], p.available = p.available[len(p.available)-1], p.available[:len(p.available)-1] p.avail[i], p.avail = p.avail[len(p.avail)-1], p.avail[:len(p.avail)-1]
break break
} }
} }
} }
// if len(p.available) == 0 { if len(p.avail) == 0 {
// p.initAvailable() p.init()
// } }
} }
// Check implements the Checker interface. // Check implements the Checker interface.
@ -256,19 +255,19 @@ func (p *Proxy) check(i int) {
// Round Robin // Round Robin
func (p *Proxy) scheduleRR(dstAddr string) *Forwarder { func (p *Proxy) scheduleRR(dstAddr string) *Forwarder {
return p.available[atomic.AddUint32(&p.index, 1)%uint32(len(p.available))] return p.avail[atomic.AddUint32(&p.index, 1)%uint32(len(p.avail))]
} }
// High Availability // High Availability
func (p *Proxy) scheduleHA(dstAddr string) *Forwarder { func (p *Proxy) scheduleHA(dstAddr string) *Forwarder {
return p.available[0] return p.avail[0]
} }
// Latency based High Availability // Latency based High Availability
func (p *Proxy) scheduleLHA(dstAddr string) *Forwarder { func (p *Proxy) scheduleLHA(dstAddr string) *Forwarder {
fwdr := p.available[0] fwdr := p.avail[0]
lowest := fwdr.Latency() lowest := fwdr.Latency()
for _, f := range p.available { for _, f := range p.avail {
if f.Latency() < lowest { if f.Latency() < lowest {
lowest = f.Latency() lowest = f.Latency()
fwdr = f fwdr = f
@ -281,5 +280,5 @@ func (p *Proxy) scheduleLHA(dstAddr string) *Forwarder {
func (p *Proxy) scheduleDH(dstAddr string) *Forwarder { func (p *Proxy) scheduleDH(dstAddr string) *Forwarder {
fnv1a := fnv.New32a() fnv1a := fnv.New32a()
fnv1a.Write([]byte(dstAddr)) fnv1a.Write([]byte(dstAddr))
return p.available[fnv1a.Sum32()%uint32(len(p.available))] return p.avail[fnv1a.Sum32()%uint32(len(p.avail))]
} }