strategy: support priority now (need to check)

This commit is contained in:
nadoo 2018-08-12 18:50:44 +08:00
parent a46ab20901
commit e1c318990b
3 changed files with 44 additions and 30 deletions

View File

@ -72,7 +72,7 @@ func main() {
// Proxy Servers // Proxy Servers
for _, listen := range conf.Listen { for _, listen := range conf.Listen {
local, err := proxy.ServerFromURL(listen, proxy.NewForwarder(dialer)) local, err := proxy.ServerFromURL(listen, dialer)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }

View File

@ -6,6 +6,8 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"github.com/nadoo/glider/common/log"
) )
// Forwarder is a forwarder // Forwarder is a forwarder
@ -70,7 +72,7 @@ func (f *Forwarder) Dial(network, addr string) (c net.Conn, err error) {
// TODO: proxy timeout, target timeout? // TODO: proxy timeout, target timeout?
if err != nil { if err != nil {
atomic.AddUint32(&f.failures, 1) atomic.AddUint32(&f.failures, 1)
// log.F("forward dial failed, %d", f.failures) log.F("forward dial failed, %d, addr: %s", f.failures, f.addr)
} }
return c, err return c, err
@ -83,13 +85,12 @@ func (f *Forwarder) Failures() uint32 {
// Enable . // Enable .
func (f *Forwarder) Enable() { func (f *Forwarder) Enable() {
atomic.StoreUint32(&f.failures, 0) atomic.StoreUint32(&f.disabled, 0)
atomic.StoreUint32(&f.failures, 0)
} }
// Disable . // Disable .
func (f *Forwarder) Disable() { func (f *Forwarder) Disable() {
atomic.StoreUint32(&f.failures, 1) atomic.StoreUint32(&f.disabled, 1)
} }
// Enabled . // Enabled .

View File

@ -4,8 +4,8 @@ import (
"bytes" "bytes"
"io" "io"
"net" "net"
"sort"
"strings" "strings"
"sync"
"time" "time"
"github.com/nadoo/glider/common/log" "github.com/nadoo/glider/common/log"
@ -55,12 +55,17 @@ func NewDialer(s []string, c *Config) proxy.Dialer {
return dialer return dialer
} }
// rrDialer is the base struct of strategy dialer type forwarderSlice []*proxy.Forwarder
type rrDialer struct {
fwdrs []*proxy.Forwarder
idx int
status sync.Map func (p forwarderSlice) Len() int { return len(p) }
func (p forwarderSlice) Less(i, j int) bool { return p[i].Priority > p[j].Priority }
func (p forwarderSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// rrDialer is a rr dialer
type rrDialer struct {
fwdrs forwarderSlice
idx int
priority int
// for checking // for checking
website string website string
@ -68,14 +73,16 @@ type rrDialer struct {
} }
// newRRDialer returns a new rrDialer // newRRDialer returns a new rrDialer
func newRRDialer(fwdrs []*proxy.Forwarder, website string, interval int) *rrDialer { func newRRDialer(fs []*proxy.Forwarder, website string, interval int) *rrDialer {
rr := &rrDialer{fwdrs: fwdrs} rr := &rrDialer{fwdrs: fs}
rr.website = website rr.website = website
rr.interval = interval rr.interval = interval
for k := range fwdrs { sort.Sort(rr.fwdrs)
rr.status.Store(k, true) rr.priority = rr.fwdrs[0].Priority
for k := range rr.fwdrs {
log.F("k: %d, %s, priority: %d", k, rr.fwdrs[k].Addr(), rr.fwdrs[k].Priority)
go rr.checkDialer(k) go rr.checkDialer(k)
} }
@ -97,12 +104,24 @@ func (rr *rrDialer) nextDialer(dstAddr string) *proxy.Forwarder {
rr.idx = 0 rr.idx = 0
} }
for _, fwder := range rr.fwdrs {
if fwder.Enabled() {
rr.priority = fwder.Priority
break
}
}
if rr.fwdrs[rr.idx].Priority < rr.priority {
rr.idx = 0
}
found := false found := false
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
rr.idx = (rr.idx + 1) % n rr.idx = (rr.idx + 1) % n
result, ok := rr.status.Load(rr.idx) if rr.fwdrs[rr.idx].Enabled() &&
if ok && result.(bool) { rr.fwdrs[rr.idx].Priority >= rr.priority {
found = true found = true
rr.priority = rr.fwdrs[rr.idx].Priority
break break
} }
} }
@ -140,7 +159,7 @@ func (rr *rrDialer) checkDialer(idx int) {
startTime := time.Now() startTime := time.Now()
c, err := d.Dial("tcp", rr.website) c, err := d.Dial("tcp", rr.website)
if err != nil { if err != nil {
rr.status.Store(idx, false) rr.fwdrs[idx].Disable()
log.F("[check] %s -> %s, set to DISABLED. error in dial: %s", d.Addr(), rr.website, err) log.F("[check] %s -> %s, set to DISABLED. error in dial: %s", d.Addr(), rr.website, err)
continue continue
} }
@ -149,15 +168,15 @@ func (rr *rrDialer) checkDialer(idx int) {
_, err = io.ReadFull(c, buf) _, err = io.ReadFull(c, buf)
if err != nil { if err != nil {
rr.status.Store(idx, false) rr.fwdrs[idx].Disable()
log.F("[check] %s -> %s, set to DISABLED. error in read: %s", d.Addr(), rr.website, err) log.F("[check] %s -> %s, set to DISABLED. error in read: %s", d.Addr(), rr.website, err)
} else if bytes.Equal([]byte("HTTP"), buf) { } else if bytes.Equal([]byte("HTTP"), buf) {
rr.status.Store(idx, true) rr.fwdrs[idx].Enable()
retry = 2 retry = 2
dialTime := time.Since(startTime) dialTime := time.Since(startTime)
log.F("[check] %s -> %s, set to ENABLED. connect time: %s", d.Addr(), rr.website, dialTime.String()) log.F("[check] %s -> %s, set to ENABLED. connect time: %s", d.Addr(), rr.website, dialTime.String())
} else { } else {
rr.status.Store(idx, false) rr.fwdrs[idx].Disable()
log.F("[check] %s -> %s, set to DISABLED. server response: %s", d.Addr(), rr.website, buf) log.F("[check] %s -> %s, set to DISABLED. server response: %s", d.Addr(), rr.website, buf)
} }
@ -177,22 +196,16 @@ func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy
func (ha *haDialer) Dial(network, addr string) (net.Conn, error) { func (ha *haDialer) Dial(network, addr string) (net.Conn, error) {
d := ha.fwdrs[ha.idx] d := ha.fwdrs[ha.idx]
if !d.Enabled() {
result, ok := ha.status.Load(ha.idx)
if ok && !result.(bool) {
d = ha.nextDialer(addr) d = ha.nextDialer(addr)
} }
return d.Dial(network, addr) return d.Dial(network, addr)
} }
func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) { func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
d := ha.fwdrs[ha.idx] d := ha.fwdrs[ha.idx]
if !d.Enabled() {
result, ok := ha.status.Load(ha.idx)
if ok && !result.(bool) {
d = ha.nextDialer(addr) d = ha.nextDialer(addr)
} }
return d.DialUDP(network, addr) return d.DialUDP(network, addr)
} }