glider/rule/group.go

319 lines
8.3 KiB
Go

package rule
import (
"errors"
"hash/fnv"
"net"
"net/url"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nadoo/glider/pkg/log"
"github.com/nadoo/glider/proxy"
)
// forwarder slice orderd by priority.
type priSlice []*Forwarder
func (p priSlice) Len() int { return len(p) }
func (p priSlice) Less(i, j int) bool { return p[i].Priority() > p[j].Priority() }
func (p priSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
// FwdrGroup is a forwarder group.
type FwdrGroup struct {
name string
config *Strategy
fwdrs priSlice
avail []*Forwarder // available forwarders
mu sync.RWMutex
index uint32
priority uint32
next func(addr string) *Forwarder
}
// NewFwdrGroup returns a new forward group.
func NewFwdrGroup(rulePath string, s []string, c *Strategy) *FwdrGroup {
var fwdrs []*Forwarder
for _, chain := range s {
fwdr, err := ForwarderFromURL(chain, c.IntFace,
time.Duration(c.DialTimeout)*time.Second, time.Duration(c.RelayTimeout)*time.Second)
if err != nil {
log.Fatal(err)
}
fwdr.SetMaxFailures(uint32(c.MaxFailures))
fwdrs = append(fwdrs, fwdr)
}
if len(fwdrs) == 0 {
// direct forwarder
direct, err := DirectForwarder(c.IntFace,
time.Duration(c.DialTimeout)*time.Second, time.Duration(c.RelayTimeout)*time.Second)
if err != nil {
log.Fatal(err)
}
fwdrs = append(fwdrs, direct)
c.Strategy = "rr"
}
name := strings.TrimSuffix(filepath.Base(rulePath), filepath.Ext(rulePath))
return newFwdrGroup(name, fwdrs, c)
}
// newFwdrGroup returns a new FwdrGroup.
func newFwdrGroup(name string, fwdrs []*Forwarder, c *Strategy) *FwdrGroup {
p := &FwdrGroup{name: name, fwdrs: fwdrs, config: c}
sort.Sort(p.fwdrs)
p.init()
// default scheduler
p.next = p.scheduleRR
// if there're more than 1 forwarders, we care about the strategy.
if count := len(fwdrs); count > 1 {
switch c.Strategy {
case "rr":
p.next = p.scheduleRR
log.F("[strategy] %s: %d forwarders forward in round robin mode.", name, count)
case "ha":
p.next = p.scheduleHA
log.F("[strategy] %s: %d forwarders forward in high availability mode.", name, count)
case "lha":
p.next = p.scheduleLHA
log.F("[strategy] %s: %d forwarders forward in latency based high availability mode.", name, count)
case "dh":
p.next = p.scheduleDH
log.F("[strategy] %s: %d forwarders forward in destination hashing mode.", name, count)
default:
p.next = p.scheduleRR
log.F("[strategy] %s: not supported forward mode '%s', use round robin mode for %d forwarders.", name, c.Strategy, count)
}
}
for _, f := range fwdrs {
f.AddHandler(p.onStatusChanged)
}
return p
}
// Dial connects to the address addr on the network net.
func (p *FwdrGroup) Dial(network, addr string) (net.Conn, proxy.Dialer, error) {
nd := p.NextDialer(addr)
c, err := nd.Dial(network, addr)
return c, nd, err
}
// DialUDP connects to the given address.
func (p *FwdrGroup) DialUDP(network, addr string) (pc net.PacketConn, dialer proxy.UDPDialer, err error) {
nd := p.NextDialer(addr)
pc, err = nd.DialUDP(network, addr)
return pc, nd, err
}
// NextDialer returns the next dialer.
func (p *FwdrGroup) NextDialer(dstAddr string) proxy.Dialer {
p.mu.RLock()
defer p.mu.RUnlock()
if len(p.avail) == 0 {
return p.fwdrs[atomic.AddUint32(&p.index, 1)%uint32(len(p.fwdrs))]
}
return p.next(dstAddr)
}
// Priority returns the active priority of dialer.
func (p *FwdrGroup) Priority() uint32 { return atomic.LoadUint32(&p.priority) }
// SetPriority sets the active priority of daler.
func (p *FwdrGroup) SetPriority(pri uint32) { atomic.StoreUint32(&p.priority, pri) }
// init traverse d.fwdrs and init the available forwarder slice.
func (p *FwdrGroup) init() {
for _, f := range p.fwdrs {
if f.Enabled() {
p.SetPriority(f.Priority())
break
}
}
p.avail = nil
for _, f := range p.fwdrs {
if f.Enabled() && f.Priority() >= p.Priority() {
p.avail = append(p.avail, f)
}
}
if len(p.avail) == 0 {
// no available forwarders, set priority to 0 to check all forwarders in check func
p.SetPriority(0)
// log.F("[group] no available forwarders, please check your config file or network settings")
}
}
// onStatusChanged will be called when fwdr's status changed.
func (p *FwdrGroup) onStatusChanged(fwdr *Forwarder) {
p.mu.Lock()
defer p.mu.Unlock()
if fwdr.Enabled() {
if fwdr.Priority() == p.Priority() {
p.avail = append(p.avail, fwdr)
} else if fwdr.Priority() > p.Priority() {
p.init()
}
log.F("[group] %s: %s(%d) changed status from DISABLED to ENABLED (%d of %d currently enabled)",
p.name, fwdr.Addr(), fwdr.Priority(), len(p.avail), len(p.fwdrs))
} else {
for i, f := range p.avail {
if f == fwdr {
p.avail[i], p.avail = p.avail[len(p.avail)-1], p.avail[:len(p.avail)-1]
break
}
}
log.F("[group] %s: %s(%d) changed status from ENABLED to DISABLED (%d of %d currently enabled)",
p.name, fwdr.Addr(), fwdr.Priority(), len(p.avail), len(p.fwdrs))
}
if len(p.avail) == 0 {
p.init()
}
}
// Check runs the forwarder checks.
func (p *FwdrGroup) Check() {
if len(p.fwdrs) == 1 {
log.F("[group] %s: only 1 forwarder found, disable health checking", p.name)
return
}
if !strings.Contains(p.config.Check, "://") {
p.config.Check += "://"
}
u, err := url.Parse(p.config.Check)
if err != nil {
log.F("[group] %s: parse check config error: %s, disable health checking", p.name, err)
return
}
addr := u.Host
timeout := time.Duration(p.config.CheckTimeout) * time.Second
var checker Checker
switch u.Scheme {
case "tcp":
checker = newTcpChecker(addr, timeout)
case "http", "https":
expect := "HTTP" // default: check the first 4 chars in response
params, _ := url.ParseQuery(u.Fragment)
if ex := params.Get("expect"); ex != "" {
expect = ex
}
checker = newHttpChecker(addr, u.RequestURI(), expect, timeout, u.Scheme == "https")
case "file":
checker = newFileChecker(u.Host + u.Path)
default:
log.F("[group] %s: unknown scheme in check config `%s`, disable health checking", p.name, p.config.Check)
return
}
log.F("[group] %s: using check config: %s", p.name, p.config.Check)
for i := 0; i < len(p.fwdrs); i++ {
go p.check(p.fwdrs[i], checker)
}
}
func (p *FwdrGroup) check(fwdr *Forwarder, checker Checker) {
wait := uint8(0)
intval := time.Duration(p.config.CheckInterval) * time.Second
for {
time.Sleep(intval * time.Duration(wait))
// check all forwarders at least one time
if wait > 0 && (fwdr.Priority() < p.Priority()) {
continue
}
if fwdr.Enabled() && p.config.CheckDisabledOnly {
continue
}
elapsed, err := checker.Check(fwdr)
if err != nil {
if errors.Is(err, proxy.ErrNotSupported) {
fwdr.SetMaxFailures(0)
log.F("[check] %s: %s(%d), %s, stop checking", p.name, fwdr.Addr(), fwdr.Priority(), err)
fwdr.Enable()
break
}
wait++
if wait > 16 {
wait = 16
}
log.F("[check] %s: %s(%d), FAILED. error: %s", p.name, fwdr.Addr(), fwdr.Priority(), err)
fwdr.Disable()
continue
}
wait = 1
p.setLatency(fwdr, elapsed)
log.F("[check] %s: %s(%d), SUCCESS. Elapsed: %dms, Latency: %dms.",
p.name, fwdr.Addr(), fwdr.Priority(), elapsed.Milliseconds(), time.Duration(fwdr.Latency()).Milliseconds())
fwdr.Enable()
}
}
func (p *FwdrGroup) setLatency(fwdr *Forwarder, elapsed time.Duration) {
newLatency := int64(elapsed)
if cnt := p.config.CheckLatencySamples; cnt > 1 {
if lastLatency := fwdr.Latency(); lastLatency > 0 {
newLatency = (lastLatency*(int64(cnt)-1) + int64(elapsed)) / int64(cnt)
}
}
fwdr.SetLatency(newLatency)
}
// Round Robin.
func (p *FwdrGroup) scheduleRR(dstAddr string) *Forwarder {
return p.avail[atomic.AddUint32(&p.index, 1)%uint32(len(p.avail))]
}
// High Availability.
func (p *FwdrGroup) scheduleHA(dstAddr string) *Forwarder {
return p.avail[0]
}
// Latency based High Availability.
func (p *FwdrGroup) scheduleLHA(dstAddr string) *Forwarder {
oldfwdr, newfwdr := p.avail[0], p.avail[0]
lowest := oldfwdr.Latency()
for _, f := range p.avail {
if f.Latency() < lowest {
lowest = f.Latency()
newfwdr = f
}
}
tolerance := int64(p.config.CheckTolerance) * int64(time.Millisecond)
if newfwdr.Latency() < (oldfwdr.Latency() - tolerance) {
return newfwdr
}
return oldfwdr
}
// Destination Hashing.
func (p *FwdrGroup) scheduleDH(dstAddr string) *Forwarder {
fnv1a := fnv.New32a()
fnv1a.Write([]byte(dstAddr))
return p.avail[fnv1a.Sum32()%uint32(len(p.avail))]
}