glider/strategy/rr.go
2018-08-24 01:15:56 +08:00

162 lines
3.8 KiB
Go

package strategy
import (
"bytes"
"io"
"net"
"sort"
"strings"
"sync/atomic"
"time"
"github.com/nadoo/glider/common/log"
"github.com/nadoo/glider/proxy"
)
// priSlice is a slice of forwarders that can be sorted by priority via
// sort.Sort; its Less method places higher priority values first.
type priSlice []*proxy.Forwarder
// Len returns the number of forwarders held in the slice.
func (p priSlice) Len() int {
	return len(p)
}
// Less reports whether the forwarder at index i has a strictly greater
// priority than the one at index j, which yields a descending sort order.
func (p priSlice) Less(i, j int) bool {
	left, right := p[i].Priority(), p[j].Priority()
	return left > right
}
// Swap exchanges the forwarders stored at indexes i and j.
func (p priSlice) Swap(i, j int) {
	held := p[i]
	p[i] = p[j]
	p[j] = held
}
// rrDialer is a round robin dialer: each dial is handed to the next enabled
// forwarder whose priority is at least the currently active priority.
type rrDialer struct {
	// fwdrs holds the candidate forwarders; newRRDialer sorts it so that
	// higher priorities come first.
	fwdrs priSlice
	// index is the position in fwdrs of the most recently selected forwarder.
	// It is read and written through Index/SetIndex using sync/atomic.
	index int32
	// priority is the currently active priority level. It is read and written
	// through Priority/SetPriority using sync/atomic.
	priority uint32
	// website is the address dialed by the health check; newRRDialer appends
	// ":80" when the configured value contains no ':'.
	website string
	// interval is the base delay between health checks, in seconds.
	interval int
}
// newRRDialer builds an rrDialer over fwdrs.
//
// The forwarders are sorted in place by descending priority (the caller's
// slice shares its backing array with the dialer). website is the address
// used by the health check; ":80" is appended when it contains no ':'.
// interval is the base health-check period in seconds.
func newRRDialer(fwdrs []*proxy.Forwarder, website string, interval int) *rrDialer {
	if !strings.Contains(website, ":") {
		website += ":80"
	}

	ordered := priSlice(fwdrs)
	sort.Sort(ordered)

	return &rrDialer{
		fwdrs:    ordered,
		website:  website,
		interval: interval,
	}
}
// Addr returns the fixed label "STRATEGY"; the dialer itself has no single
// network address.
func (rr *rrDialer) Addr() string {
	const label = "STRATEGY"
	return label
}
// Dial picks the next dialer for addr via NextDialer and uses it to open a
// connection to addr on the given network.
func (rr *rrDialer) Dial(network, addr string) (net.Conn, error) {
	next := rr.NextDialer(addr)
	return next.Dial(network, addr)
}
// DialUDP picks the next dialer for addr via NextDialer and forwards the
// DialUDP call to it, returning that dialer's results unchanged.
func (rr *rrDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
	next := rr.NextDialer(addr)
	return next.DialUDP(network, addr)
}
// NextDialer returns the forwarder selected by nextDialer for dstAddr,
// exposed as a proxy.Dialer.
func (rr *rrDialer) NextDialer(dstAddr string) proxy.Dialer {
	fwdr := rr.nextDialer(dstAddr)
	return fwdr
}
// nextDialer selects the forwarder to use for the next connection.
//
// With exactly one forwarder it is returned unconditionally. Otherwise the
// active priority is first raised/lowered to that of the first enabled
// forwarder in the (priority-sorted) list, and the list is then scanned
// round-robin, starting just after the last used index, for an enabled
// forwarder whose priority is at least the active priority. When none
// qualifies, the active priority is reset to 0, a message is logged, and the
// forwarder at the starting index is returned anyway.
//
// dstAddr is not consulted by the selection.
func (rr *rrDialer) nextDialer(dstAddr string) *proxy.Forwarder {
	total := int32(len(rr.fwdrs))
	if total == 1 {
		return rr.fwdrs[0]
	}

	// Adopt the priority of the first enabled forwarder as the active one.
	for _, candidate := range rr.fwdrs {
		if !candidate.Enabled() {
			continue
		}
		rr.SetPriority(candidate.Priority())
		break
	}

	// Restart from the head of the list when the last used forwarder sits
	// below the active priority.
	pos := rr.Index()
	if rr.fwdrs[pos].Priority() < rr.Priority() {
		pos = 0
	}

	available := false
	for tried := int32(0); tried < total; tried++ {
		pos = (pos + 1) % total
		candidate := rr.fwdrs[pos]
		if !candidate.Enabled() || candidate.Priority() < rr.Priority() {
			continue
		}
		available = true
		rr.SetPriority(candidate.Priority())
		break
	}

	if !available {
		rr.SetPriority(0)
		log.F("NO AVAILABLE PROXY FOUND! please check your network or proxy server settings.")
	}

	rr.SetIndex(pos)
	return rr.fwdrs[pos]
}
// Index atomically loads and returns the index of the active forwarder.
func (rr *rrDialer) Index() int32 {
	return atomic.LoadInt32(&rr.index)
}
// SetIndex atomically stores p as the index of the active forwarder.
func (rr *rrDialer) SetIndex(p int32) {
	atomic.StoreInt32(&rr.index, p)
}
// Priority atomically loads and returns the dialer's active priority.
func (rr *rrDialer) Priority() uint32 {
	return atomic.LoadUint32(&rr.priority)
}
// SetPriority atomically stores p as the dialer's active priority.
func (rr *rrDialer) SetPriority(p uint32) {
	atomic.StoreUint32(&rr.priority, p)
}
// Check implements the Checker interface. It launches one check goroutine
// per forwarder and returns without waiting for them.
func (rr *rrDialer) Check() {
	for i := range rr.fwdrs {
		go rr.check(i)
	}
}
// check runs the health-check loop for the forwarder at index i. It never
// returns and is meant to be started in its own goroutine (see Check).
//
// On every pass it sleeps, then — unless the forwarder's priority is below
// the dialer's active priority — dials rr.website through the forwarder,
// sends a minimal HTTP/1.0 GET and reads the first four bytes of the reply.
// The forwarder is enabled (and its latency recorded) when those bytes are
// "HTTP"; it is disabled on a dial, write or read error, or on any other
// reply.
func (rr *rrDialer) check(i int) {
	f := rr.fwdrs[i]
	retry := 1
	buf := make([]byte, 4)

	for {
		// Backoff: the wait is interval * (retry/2) seconds. retry starts at 1
		// (no wait on the first pass), doubles on every pass up to 16 (a wait
		// of 8 * interval), and is reset to 2 after a successful check.
		time.Sleep(time.Duration(rr.interval) * time.Second * time.Duration(retry>>1))

		retry <<= 1
		if retry > 16 {
			retry = 16
		}

		// Forwarders below the active priority are not probed on this pass.
		if f.Priority() < rr.Priority() {
			continue
		}

		startTime := time.Now()
		rc, err := f.Dial("tcp", rr.website)
		if err != nil {
			f.Disable()
			log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority(), rr.website, err)
			continue
		}

		// A failed write means the probe never reached the server; report it
		// as such instead of letting it surface later as a read error.
		if _, err = rc.Write([]byte("GET / HTTP/1.0\r\n\r\n")); err != nil {
			f.Disable()
			log.F("[check] %s(%d) -> %s, DISABLED. error in write: %s", f.Addr(), f.Priority(), rr.website, err)
			rc.Close()
			continue
		}

		// NOTE(review): no deadline is set on rc here; unless the forwarder's
		// Dial applies one, a server that never answers blocks this loop.
		_, err = io.ReadFull(rc, buf)
		switch {
		case err != nil:
			f.Disable()
			log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority(), rr.website, err)
		case bytes.Equal([]byte("HTTP"), buf):
			f.Enable()
			retry = 2
			// Measured from just before the dial until the reply prefix was read.
			readTime := time.Since(startTime)
			f.SetLatency(int64(readTime))
			log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority(), rr.website, readTime.String())
		default:
			f.Disable()
			log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority(), rr.website, buf)
		}

		rc.Close()
	}
}