strategy: experimental latency based high availability mode supported

This commit is contained in:
nadoo 2018-08-14 19:33:18 +08:00
parent f4eab4d1b2
commit e82ea75cba
8 changed files with 173 additions and 71 deletions

View File

@ -43,7 +43,7 @@ func confInit() {
flag.StringVar(&conf.StrategyConfig.CheckWebSite, "checkwebsite", "www.apple.com", "proxy check HTTP(NOT HTTPS) website address, format: HOST[:PORT], default port: 80")
// TODO: change to checkinterval
flag.IntVar(&conf.StrategyConfig.CheckInterval, "checkduration", 30, "proxy check interval(seconds)")
flag.IntVar(&conf.StrategyConfig.MaxFailures, "maxfailures", 3, "max failures to change status to disabled")
flag.IntVar(&conf.StrategyConfig.MaxFailures, "maxfailures", 3, "max failures to change forwarder status to disabled")
flag.StringSliceUniqVar(&conf.RuleFile, "rulefile", nil, "rule file path")
flag.StringVar(&conf.RulesDir, "rules-dir", "", "rule file folder")

View File

@ -4,6 +4,7 @@ import (
"encoding/binary"
"io"
"net"
"sync"
"time"
"github.com/nadoo/glider/common/log"
@ -31,15 +32,19 @@ func NewServer(addr string, dialer proxy.Dialer, config *Config) (*Server, error
return s, err
}
// ListenAndServe .
func (s *Server) ListenAndServe() {
go s.ListenAndServeTCP()
s.ListenAndServeUDP()
// Start .
func (s *Server) Start() {
var wg sync.WaitGroup
wg.Add(2)
go s.ListenAndServeTCP(&wg)
go s.ListenAndServeUDP(&wg)
wg.Wait()
}
// ListenAndServeUDP .
func (s *Server) ListenAndServeUDP() {
func (s *Server) ListenAndServeUDP(wg *sync.WaitGroup) {
c, err := net.ListenPacket("udp", s.addr)
wg.Done()
if err != nil {
log.F("[dns] failed to listen on %s, error: %v", s.addr, err)
return
@ -82,12 +87,14 @@ func (s *Server) ListenAndServeUDP() {
}
// ListenAndServeTCP .
func (s *Server) ListenAndServeTCP() {
func (s *Server) ListenAndServeTCP(wg *sync.WaitGroup) {
l, err := net.Listen("tcp", s.addr)
wg.Done()
if err != nil {
log.F("[dns]-tcp error: %v", err)
return
}
defer l.Close()
log.F("[dns]-tcp listening TCP on %s", s.addr)

View File

@ -69,8 +69,8 @@ const (
var nextSeqNr uint32
var nativeEndian binary.ByteOrder
// IPSetManager struct
type IPSetManager struct {
// Manager struct
type Manager struct {
fd int
lsa syscall.SockaddrNetlink
@ -78,8 +78,8 @@ type IPSetManager struct {
domainSet sync.Map
}
// NewIPSetManager returns a IPSetManager
func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error) {
// NewManager returns a Manager
func NewManager(mainSet string, rules []*rule.Config) (*Manager, error) {
fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW, syscall.NETLINK_NETFILTER)
if err != nil {
log.F("%s", err)
@ -96,7 +96,7 @@ func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error
return nil, err
}
m := &IPSetManager{fd: fd, lsa: lsa, mainSet: mainSet}
m := &Manager{fd: fd, lsa: lsa, mainSet: mainSet}
CreateSet(fd, lsa, mainSet)
for _, r := range rules {
@ -133,7 +133,7 @@ func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error
}
// AddDomainIP implements the DNSAnswerHandler function, used to update ipset according to domainSet rule
func (m *IPSetManager) AddDomainIP(domain, ip string) error {
func (m *Manager) AddDomainIP(domain, ip string) error {
if ip != "" {
domainParts := strings.Split(domain, ".")
length := len(domainParts)

View File

@ -8,15 +8,15 @@ import (
"github.com/nadoo/glider/rule"
)
// IPSetManager struct
type IPSetManager struct{}
// Manager struct
type Manager struct{}
// NewIPSetManager returns a IPSetManager
func NewIPSetManager(mainSet string, rules []*rule.Config) (*IPSetManager, error) {
// NewManager returns a Manager
func NewManager(mainSet string, rules []*rule.Config) (*Manager, error) {
return nil, errors.New("ipset not supported on this os")
}
// AddDomainIP implements the DNSAnswerHandler function
func (m *IPSetManager) AddDomainIP(domain, ip string) error {
func (m *Manager) AddDomainIP(domain, ip string) error {
return errors.New("ipset not supported on this os")
}

15
main.go
View File

@ -30,17 +30,23 @@ import (
const VERSION = "0.6.7"
func main() {
// read configs
confInit()
// setup a log func
log.F = func(f string, v ...interface{}) {
if conf.Verbose {
stdlog.Printf(f, v...)
}
}
// global rule dialer
dialer := rule.NewDialer(conf.rules, strategy.NewDialer(conf.Forward, &conf.StrategyConfig))
ipsetM, _ := ipset.NewIPSetManager(conf.IPSet, conf.rules)
// DNS Server
// ipset manager
ipsetM, _ := ipset.NewManager(conf.IPSet, conf.rules)
// check and setup dns server
if conf.DNS != "" {
d, err := dns.NewServer(conf.DNS, dialer, &conf.DNSConfig)
if err != nil {
@ -62,9 +68,12 @@ func main() {
d.AddHandler(ipsetM.AddDomainIP)
}
go d.ListenAndServe()
d.Start()
}
// enable checkers
dialer.Check()
// Proxy Servers
for _, listen := range conf.Listen {
local, err := proxy.ServerFromURL(listen, dialer)

View File

@ -18,10 +18,10 @@ type Forwarder struct {
disabled uint32
failures uint32
MaxFailures uint32 //maxfailures to set to Disabled
latency int
latency int64
}
// ForwarderFromURL parses `forward=` command line and returns a new forwarder
// ForwarderFromURL parses `forward=` command value and returns a new forwarder
func ForwarderFromURL(s string) (f *Forwarder, err error) {
ss := strings.Split(s, "#")
var d Dialer
@ -71,7 +71,6 @@ func (f *Forwarder) Dial(network, addr string) (c net.Conn, err error) {
c, err = f.Dialer.Dial(network, addr)
if err != nil {
atomic.AddUint32(&f.failures, 1)
log.F("[forwarder] %s, dials %s, error:%s", f.addr, addr, err)
if f.Failures() >= f.MaxFailures {
f.Disable()
log.F("[forwarder] %s reaches maxfailures, set to disabled", f.addr)
@ -105,3 +104,13 @@ func (f *Forwarder) Enabled() bool {
func isTrue(n uint32) bool {
return n&1 == 1
}
// Latency returns the latency of forwarder
func (f *Forwarder) Latency() int64 {
return atomic.LoadInt64(&f.latency)
}
// SetLatency sets the latency of forwarder
func (f *Forwarder) SetLatency(l int64) {
atomic.StoreInt64(&f.latency, l)
}

View File

@ -13,6 +13,7 @@ import (
// Dialer struct
type Dialer struct {
gDialer proxy.Dialer
dialers []proxy.Dialer
domainMap sync.Map
ipMap sync.Map
@ -25,6 +26,7 @@ func NewDialer(rules []*Config, gDialer proxy.Dialer) *Dialer {
for _, r := range rules {
sDialer := strategy.NewDialer(r.Forward, &r.StrategyConfig)
rd.dialers = append(rd.dialers, sDialer)
for _, domain := range r.Domain {
rd.domainMap.Store(strings.ToLower(domain), sDialer)
@ -123,3 +125,16 @@ func (rd *Dialer) AddDomainIP(domain, ip string) error {
}
return nil
}
// Check .
func (rd *Dialer) Check() {
if checker, ok := rd.gDialer.(strategy.Checker); ok {
checker.Check()
}
for _, d := range rd.dialers {
if checker, ok := d.(strategy.Checker); ok {
checker.Check()
}
}
}

View File

@ -20,9 +20,13 @@ type Config struct {
MaxFailures int
}
// Checker is an interface of forwarder checker
type Checker interface {
Check()
}
// NewDialer returns a new strategy dialer
func NewDialer(s []string, c *Config) proxy.Dialer {
// global forwarders in xx.conf
var fwdrs []*proxy.Forwarder
for _, chain := range s {
fwdr, err := proxy.ForwarderFromURL(chain)
@ -49,6 +53,9 @@ func NewDialer(s []string, c *Config) proxy.Dialer {
case "ha":
dialer = newHADialer(fwdrs, c.CheckWebSite, c.CheckInterval)
log.F("forward to remote servers in high availability mode.")
case "lha":
dialer = newLHADialer(fwdrs, c.CheckWebSite, c.CheckInterval)
log.F("forward to remote servers in latency based high availability mode.")
default:
log.F("not supported forward mode '%s', just use the first forward server.", c.Strategy)
dialer = fwdrs[0]
@ -57,15 +64,20 @@ func NewDialer(s []string, c *Config) proxy.Dialer {
return dialer
}
type forwarderSlice []*proxy.Forwarder
// slice orderd by priority
type priSlice []*proxy.Forwarder
func (p forwarderSlice) Len() int { return len(p) }
func (p forwarderSlice) Less(i, j int) bool { return p[i].Priority > p[j].Priority }
func (p forwarderSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p priSlice) Len() int { return len(p) }
func (p priSlice) Less(i, j int) bool { return p[i].Priority > p[j].Priority }
func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// rrDialer is a rr dialer
// rrDialer is a round robin dialer
// 1. find the highest priority which there's at least 1 dialer is enabled
// 2. choose a enabled dialer in that priority using round robin mode
type rrDialer struct {
fwdrs forwarderSlice
fwdrs priSlice
// may have data races, but doesn't matter
idx int
priority int
@ -77,6 +89,7 @@ type rrDialer struct {
// newRRDialer returns a new rrDialer
func newRRDialer(fs []*proxy.Forwarder, website string, interval int) *rrDialer {
rr := &rrDialer{fwdrs: fs}
sort.Sort(rr.fwdrs)
rr.website = website
if strings.IndexByte(rr.website, ':') == -1 {
@ -84,14 +97,8 @@ func newRRDialer(fs []*proxy.Forwarder, website string, interval int) *rrDialer
}
rr.interval = interval
sort.Sort(rr.fwdrs)
rr.priority = rr.fwdrs[0].Priority
for k := range rr.fwdrs {
go rr.checkDialer(k)
}
return rr
}
@ -143,55 +150,64 @@ func (rr *rrDialer) NextDialer(dstAddr string) proxy.Dialer {
return rr.nextDialer(dstAddr)
}
// Check implements the Checker interface
func (rr *rrDialer) Check() {
for _, f := range rr.fwdrs {
go rr.checkDialer(f)
}
}
// Check dialer
func (rr *rrDialer) checkDialer(idx int) {
func (rr *rrDialer) checkDialer(f *proxy.Forwarder) {
retry := 1
buf := make([]byte, 4)
d := rr.fwdrs[idx]
for {
time.Sleep(time.Duration(rr.interval) * time.Second * time.Duration(retry>>1))
// check forwarders whose priority not less than current priority only
if d.Priority < rr.priority {
continue
}
retry <<= 1
if retry > 16 {
retry = 16
}
startTime := time.Now()
c, err := d.Dial("tcp", rr.website)
if err != nil {
rr.fwdrs[idx].Disable()
log.F("[check] %s -> %s, set to DISABLED. error in dial: %s", d.Addr(), rr.website, err)
// check forwarders whose priority not less than current priority only
if f.Priority < rr.priority {
// log.F("f.Priority:%d, rr.priority:%d", f.Priority, rr.priority)
continue
}
c.Write([]byte("GET / HTTP/1.0\r\n\r\n"))
_, err = io.ReadFull(c, buf)
startTime := time.Now()
rc, err := f.Dial("tcp", rr.website)
if err != nil {
rr.fwdrs[idx].Disable()
log.F("[check] %s -> %s, set to DISABLED. error in read: %s", d.Addr(), rr.website, err)
} else if bytes.Equal([]byte("HTTP"), buf) {
rr.fwdrs[idx].Enable()
retry = 2
dialTime := time.Since(startTime)
log.F("[check] %s -> %s, set to ENABLED. connect time: %s", d.Addr(), rr.website, dialTime.String())
} else {
rr.fwdrs[idx].Disable()
log.F("[check] %s -> %s, set to DISABLED. server response: %s", d.Addr(), rr.website, buf)
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. error in dial: %s", f.Addr(), f.Priority, rr.website, err)
continue
}
c.Close()
rc.Write([]byte("GET / HTTP/1.0\r\n\r\n"))
_, err = io.ReadFull(rc, buf)
if err != nil {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. error in read: %s", f.Addr(), f.Priority, rr.website, err)
} else if bytes.Equal([]byte("HTTP"), buf) {
f.Enable()
retry = 2
readTime := time.Since(startTime)
f.SetLatency(int64(readTime))
log.F("[check] %s(%d) -> %s, ENABLED. connect time: %s", f.Addr(), f.Priority, rr.website, readTime.String())
} else {
f.Disable()
log.F("[check] %s(%d) -> %s, DISABLED. server response: %s", f.Addr(), f.Priority, rr.website, buf)
}
rc.Close()
}
}
// high availability proxy
// high availability forwarder
// 1. choose dialer whose priority is the highest
// 2. choose the first enabled dialer in that priority
type haDialer struct {
*rrDialer
}
@ -201,18 +217,64 @@ func newHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy
return &haDialer{rrDialer: newRRDialer(dialers, webhost, duration)}
}
func (ha *haDialer) Dial(network, addr string) (net.Conn, error) {
func (ha *haDialer) nextDialer(dstAddr string) *proxy.Forwarder {
d := ha.fwdrs[ha.idx]
if !d.Enabled() {
d = ha.nextDialer(addr)
d = ha.nextDialer(dstAddr)
}
return d
}
func (ha *haDialer) Dial(network, addr string) (net.Conn, error) {
d := ha.nextDialer(addr)
return d.Dial(network, addr)
}
func (ha *haDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
d := ha.fwdrs[ha.idx]
if !d.Enabled() {
d = ha.nextDialer(addr)
}
d := ha.nextDialer(addr)
return d.DialUDP(network, addr)
}
// high availability forwarder
// 1. choose dialer whose priority is the highest
// 2. choose dialer whose letency it the lowest
type lhaDialer struct {
*rrDialer
}
// newLHADialer .
func newLHADialer(dialers []*proxy.Forwarder, webhost string, duration int) proxy.Dialer {
return &lhaDialer{rrDialer: newRRDialer(dialers, webhost, duration)}
}
func (lha *lhaDialer) nextDialer(dstAddr string) *proxy.Forwarder {
var latency int64
var d *proxy.Forwarder
for _, fwder := range lha.fwdrs {
if fwder.Enabled() {
lha.priority = fwder.Priority
latency = fwder.Latency()
d = fwder
break
}
}
for _, fwder := range lha.fwdrs {
if fwder.Enabled() && fwder.Priority >= lha.priority && fwder.Latency() < latency {
latency = fwder.Latency()
d = fwder
}
}
return d
}
func (lha *lhaDialer) Dial(network, addr string) (net.Conn, error) {
d := lha.nextDialer(addr)
return d.Dial(network, addr)
}
func (lha *lhaDialer) DialUDP(network, addr string) (pc net.PacketConn, writeTo net.Addr, err error) {
d := lha.nextDialer(addr)
return d.DialUDP(network, addr)
}