strategy: add the ability to handle forwarder status change events

This commit is contained in:
nadoo 2018-08-25 23:56:18 +08:00
parent eb0b17bea1
commit bec2c1fd63
7 changed files with 254 additions and 289 deletions

View File

@ -10,6 +10,9 @@ import (
"github.com/nadoo/glider/common/log"
)
// StatusChangedHandler function will be called when the forwarder's status changed
type StatusChangedHandler func(*Forwarder)
// Forwarder is a forwarder
type Forwarder struct {
Dialer
@ -20,6 +23,7 @@ type Forwarder struct {
failures uint32
latency int64
intface string // local interface or ip address
handlers []StatusChangedHandler
}
// ForwarderFromURL parses `forward=` command value and returns a new forwarder
@ -102,15 +106,28 @@ func (f *Forwarder) IncFailures() {
atomic.AddUint32(&f.failures, 1)
}
// AddHandler adds a custom handler to handle the status change event
func (f *Forwarder) AddHandler(h StatusChangedHandler) {
f.handlers = append(f.handlers, h)
}
// Enable the forwarder
func (f *Forwarder) Enable() {
atomic.StoreUint32(&f.disabled, 0)
if atomic.CompareAndSwapUint32(&f.disabled, 1, 0) {
for _, h := range f.handlers {
h(f)
}
}
atomic.StoreUint32(&f.failures, 0)
}
// Disable the forwarder
func (f *Forwarder) Disable() {
atomic.StoreUint32(&f.disabled, 1)
if atomic.CompareAndSwapUint32(&f.disabled, 0, 1) {
for _, h := range f.handlers {
h(f)
}
}
}
// Enabled returns the status of forwarder

View File

@ -1,31 +0,0 @@
package strategy
import (
"hash/fnv"
"net"
"github.com/nadoo/glider/proxy"
)
// destination hashing dialer
type dhDialer struct{ *rrDialer }
// newDHDialer .
func newDHDialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer {
return &dhDialer{rrDialer: newRRDialer(dialers, webhost, duration)}
}
func (dh *dhDialer) NextDialer(dstAddr string) proxy.Dialer {
fnv1a := fnv.New32a()
fnv1a.Write([]byte(dstAddr))
idx := fnv1a.Sum32() % uint32(len(dh.fwdrs))
return dh.fwdrs[idx]
}
func (dh *dhDialer) Dial(network, addr string) (net.Conn, error) {
return dh.NextDialer(addr).Dial(network, addr)
}
func (dh *dhDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
return dh.NextDialer(addr).DialUDP(network, addr)
}

View File

@ -1,31 +0,0 @@
package strategy
import (
"net"
"github.com/nadoo/glider/proxy"
)
// high availability dialer
type haDialer struct{ *rrDialer }
// newHADialer .
func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer {
return &haDialer{rrDialer: newRRDialer(dialers, webhost, duration)}
}
func (ha *haDialer) NextDialer(dstAddr string) proxy.Dialer {
d := ha.fwdrs[ha.Index()]
if !d.Enabled() || d.Priority() < ha.Priority() {
d = ha.nextDialer(dstAddr)
}
return d
}
func (ha *haDialer) Dial(network, addr string) (net.Conn, error) {
return ha.NextDialer(addr).Dial(network, addr)
}
func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
return ha.NextDialer(addr).DialUDP(network, addr)
}

View File

@ -1,46 +0,0 @@
package strategy
import (
"net"
"github.com/nadoo/glider/proxy"
)
// latency based high availability dialer
type lhaDialer struct{ *rrDialer }
// newLHADialer .
func newLHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer {
return &lhaDialer{rrDialer: newRRDialer(dialers, webhost, duration)}
}
func (lha *lhaDialer) NextDialer(dstAddr string) proxy.Dialer {
idx := lha.Index()
var lowest int64
for i, fwder := range lha.fwdrs {
if fwder.Enabled() {
lha.SetPriority(fwder.Priority())
lowest = fwder.Latency()
idx = int32(i)
break
}
}
for i, fwder := range lha.fwdrs {
if fwder.Enabled() && fwder.Priority() >= lha.Priority() && fwder.Latency() < lowest {
lowest = fwder.Latency()
idx = int32(i)
}
}
lha.SetIndex(idx)
return lha.fwdrs[idx]
}
func (lha *lhaDialer) Dial(network, addr string) (net.Conn, error) {
return lha.NextDialer(addr).Dial(network, addr)
}
func (lha *lhaDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
return lha.NextDialer(addr).DialUDP(network, addr)
}

View File

@ -1,161 +0,0 @@
package strategy
import (
"bytes"
"io"
"net"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/nadoo/glider/common/log"
"github.com/nadoo/glider/proxy"
)
// forwarder slice orderd by priority
type priSlice []*proxy.Forwarder
func (p priSlice) Len() int { return len(p) }
func (p priSlice) Less(i, j int) bool { return p[i].Priority() > p[j].Priority() }
func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// rrDialer is a round robin dialer
type rrDialer struct {
fwdrs priSlice
index int32
priority uint32
website string
interval int
}
// newRRDialer returns a new rrDialer
func newRRDialer(fwdrs []*proxy.Forwarder, website string, interval int) *rrDialer {
rr := &rrDialer{fwdrs: fwdrs}
sort.Sort(rr.fwdrs)
rr.website = website
if strings.IndexByte(rr.website, ':') == -1 {
rr.website += ":80"
}
rr.interval = interval
return rr
}
func (rr *rrDialer) Addr() string { return "STRATEGY" }
func (rr *rrDialer) Dial(network, addr string) (net.Conn, error) {
return rr.NextDialer(addr).Dial(network, addr)
}
func (rr *rrDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
return rr.NextDialer(addr).DialUDP(network, addr)
}
func (rr *rrDialer) NextDialer(dstAddr string) proxy.Dialer { return rr.nextDialer(dstAddr) }
func (rr *rrDialer) nextDialer(dstAddr string) *proxy.Forwarder {
n := int32(len(rr.fwdrs))
if n == 1 {
return rr.fwdrs[0]
}
for _, fwder := range rr.fwdrs {
if fwder.Enabled() {
rr.SetPriority(fwder.Priority())
break
}
}
idx := rr.Index()
if rr.fwdrs[idx].Priority() < rr.Priority() {
idx = 0
}
found := false
var i int32
for i = 0; i < n; i++ {
idx = (idx + 1) % n
if rr.fwdrs[idx].Enabled() &&
rr.fwdrs[idx].Priority() >= rr.Priority() {
found = true
rr.SetPriority(rr.fwdrs[idx].Priority())
break
}
}
if !found {
rr.SetPriority(0)
log.F("NO AVAILABLE PROXY FOUND! please check your network or proxy server settings.")
}
rr.SetIndex(idx)
return rr.fwdrs[idx]
}
// Index returns the active forwarder's Index of rrDialer
func (rr *rrDialer) Index() int32 { return atomic.LoadInt32(&rr.index) }
// SetIndex sets the active forwarder's Index of rrDialer
func (rr *rrDialer) SetIndex(p int32) { atomic.StoreInt32(&rr.index, p) }
// Priority returns the active priority of rrDialer
func (rr *rrDialer) Priority() uint32 { return atomic.LoadUint32(&rr.priority) }
// SetPriority sets the active priority of rrDialer
func (rr *rrDialer) SetPriority(p uint32) { atomic.StoreUint32(&rr.priority, p) }
// Check implements the Checker interface
func (rr *rrDialer) Check() {
for i := 0; i < len(rr.fwdrs); i++ {
go rr.check(i)
}
}
func (rr *rrDialer) check(i int) {
f := rr.fwdrs[i]
retry := 1
buf := make([]byte, 4)
for {
time.Sleep(time.Duration(rr.interval) * time.Second * time.Duration(retry>>1))
retry <<= 1
if retry > 16 {
retry = 16
}
if f.Priority() < rr.Priority() {
continue
}
startTime := time.Now()
rc, err := f.Dial("tcp", rr.website)
if err != nil {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority(), rr.website, err)
continue
}
rc.Write([]byte("GET / HTTP/1.0\r\n\r\n"))
_, err = io.ReadFull(rc, buf)
if err != nil {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority(), rr.website, err)
} else if bytes.Equal([]byte("HTTP"), buf) {
f.Enable()
retry = 2
readTime := time.Since(startTime)
f.SetLatency(int64(readTime))
log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority(), rr.website, readTime.String())
} else {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority(), rr.website, buf)
}
rc.Close()
}
}

49
strategy/scheduler.go Normal file
View File

@ -0,0 +1,49 @@
package strategy
import (
"hash/fnv"
"github.com/nadoo/glider/proxy"
)
func (d *Dialer) scheduleRR(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
idx := d.IncIndex() % int32(len(d.valid))
d.SetIndex(idx)
return d.valid[idx]
}
func (d *Dialer) scheduleHA(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
return d.valid[0]
}
func (d *Dialer) scheduleLHA(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
fwdr := d.valid[0]
lowest := fwdr.Latency()
for _, f := range d.valid {
if f.Latency() < lowest {
lowest = f.Latency()
fwdr = f
}
}
return fwdr
}
func (d *Dialer) scheduleDH(dstAddr string) *proxy.Forwarder {
d.mu.Lock()
defer d.mu.Unlock()
fnv1a := fnv.New32a()
fnv1a.Write([]byte(dstAddr))
idx := fnv1a.Sum32() % uint32(len(d.valid))
return d.valid[idx]
}

View File

@ -1,6 +1,15 @@
package strategy
import (
"bytes"
"io"
"net"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nadoo/glider/common/log"
"github.com/nadoo/glider/proxy"
)
@ -19,6 +28,25 @@ type Config struct {
IntFace string
}
// forwarder slice orderd by priority
type priSlice []*proxy.Forwarder
func (p priSlice) Len() int { return len(p) }
func (p priSlice) Less(i, j int) bool { return p[i].Priority() > p[j].Priority() }
func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// Dialer .
type Dialer struct {
config *Config
fwdrs priSlice
valid []*proxy.Forwarder
mu sync.Mutex
index int32
priority uint32
nextForwarder func(addr string) *proxy.Forwarder
}
// NewDialer returns a new strategy dialer
func NewDialer(s []string, c *Config) proxy.Dialer {
var fwdrs []*proxy.Forwarder
@ -43,24 +71,164 @@ func NewDialer(s []string, c *Config) proxy.Dialer {
return fwdrs[0]
}
var dialer proxy.Dialer
switch c.Strategy {
case "rr":
dialer = newRRDialer(fwdrs, c.CheckWebSite, c.CheckInterval)
log.F("forward to remote servers in round robin mode.")
case "ha":
dialer = newHADialer(fwdrs, c.CheckWebSite, c.CheckInterval)
log.F("forward to remote servers in high availability mode.")
case "lha":
dialer = newLHADialer(fwdrs, c.CheckWebSite, c.CheckInterval)
log.F("forward to remote servers in latency based high availability mode.")
case "dh":
dialer = newDHDialer(fwdrs, c.CheckWebSite, c.CheckInterval)
log.F("forward to remote servers in destination hashing mode.")
default:
log.F("not supported forward mode '%s', just use the first forward server.", c.Strategy)
dialer = fwdrs[0]
return newDialer(fwdrs, c)
}
// newDialer returns a new rrDialer
func newDialer(fwdrs []*proxy.Forwarder, c *Config) *Dialer {
d := &Dialer{fwdrs: fwdrs, config: c}
sort.Sort(d.fwdrs)
d.mu.Lock()
d.valid = d.fwdrs
d.mu.Unlock()
if strings.IndexByte(d.config.CheckWebSite, ':') == -1 {
d.config.CheckWebSite += ":80"
}
return dialer
switch c.Strategy {
case "rr":
d.nextForwarder = d.scheduleRR
log.F("forward to remote servers in round robin mode.")
case "ha":
d.nextForwarder = d.scheduleHA
log.F("forward to remote servers in high availability mode.")
case "lha":
d.nextForwarder = d.scheduleLHA
log.F("forward to remote servers in latency based high availability mode.")
case "dh":
d.nextForwarder = d.scheduleDH
log.F("forward to remote servers in destination hashing mode.")
default:
d.nextForwarder = d.scheduleRR
log.F("not supported forward mode '%s', use round robin mode.", c.Strategy)
}
for _, f := range fwdrs {
f.AddHandler(d.OnStatusChanged)
}
return d
}
// Addr returns forwarder's address
func (d *Dialer) Addr() string { return "STRATEGY" }
// Dial connects to the address addr on the network net
func (d *Dialer) Dial(network, addr string) (net.Conn, error) {
return d.NextDialer(addr).Dial(network, addr)
}
// DialUDP connects to the given address
func (d *Dialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
return d.NextDialer(addr).DialUDP(network, addr)
}
// NextDialer returns the next dialer
func (d *Dialer) NextDialer(dstAddr string) proxy.Dialer {
return d.nextForwarder(dstAddr)
}
// Index returns the active forwarder's Index of rrDialer
func (d *Dialer) Index() int32 { return atomic.LoadInt32(&d.index) }
// SetIndex sets the active forwarder's Index of rrDialer
func (d *Dialer) SetIndex(p int32) { atomic.StoreInt32(&d.index, p) }
// IncIndex increase the index by 1
func (d *Dialer) IncIndex() int32 { return atomic.AddInt32(&d.index, 1) }
// Priority returns the active priority of rrDialer
func (d *Dialer) Priority() uint32 { return atomic.LoadUint32(&d.priority) }
// SetPriority sets the active priority of rrDialer
func (d *Dialer) SetPriority(p uint32) { atomic.StoreUint32(&d.priority, p) }
// OnStatusChanged will be called when fwdr's status changed
func (d *Dialer) OnStatusChanged(fwdr *proxy.Forwarder) {
d.mu.Lock()
defer d.mu.Unlock()
if fwdr.Enabled() {
if fwdr.Priority() == d.Priority() {
d.valid = append(d.valid, fwdr)
} else if fwdr.Priority() > d.Priority() {
d.SetPriority(fwdr.Priority())
d.valid = nil
for _, f := range d.fwdrs {
if f.Enabled() && f.Priority() >= d.Priority() {
d.valid = append(d.valid, f)
}
}
}
}
if !fwdr.Enabled() {
for i, f := range d.valid {
if f == fwdr {
d.valid[i], d.valid = d.valid[len(d.valid)-1], d.valid[:len(d.valid)-1]
break
}
}
}
if len(d.valid) == 0 {
d.valid = append(d.valid, d.fwdrs[0])
}
}
// Check implements the Checker interface
func (d *Dialer) Check() {
for i := 0; i < len(d.fwdrs); i++ {
go d.check(i)
}
}
func (d *Dialer) check(i int) {
f := d.fwdrs[i]
retry := 1
buf := make([]byte, 4)
for {
time.Sleep(time.Duration(d.config.CheckInterval) * time.Second * time.Duration(retry>>1))
retry <<= 1
if retry > 16 {
retry = 16
}
if f.Priority() < d.Priority() {
continue
}
startTime := time.Now()
rc, err := f.Dial("tcp", d.config.CheckWebSite)
if err != nil {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, err)
continue
}
rc.Write([]byte("GET / HTTP/1.0\r\n\r\n"))
_, err = io.ReadFull(rc, buf)
if err != nil {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, err)
} else if bytes.Equal([]byte("HTTP"), buf) {
f.Enable()
retry = 2
readTime := time.Since(startTime)
f.SetLatency(int64(readTime))
log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, readTime.String())
} else {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority(), d.config.CheckWebSite, buf)
}
rc.Close()
}
}