glider/strategy/forward.go

187 lines
3.9 KiB
Go
Raw Normal View History

package strategy
import (
"net"
"net/url"
"strconv"
"strings"
"sync/atomic"
2018-08-13 00:42:59 +08:00
"github.com/nadoo/glider/common/log"
"github.com/nadoo/glider/proxy"
)
2018-08-26 01:25:22 +08:00
// StatusHandler is a callback that receives the forwarder whose status has
// changed. Handlers registered on a Forwarder are invoked by Enable and
// Disable when the disabled flag actually flips.
type StatusHandler func(*Forwarder)
// Forwarder is a forwarder
type Forwarder struct {
proxy.Dialer
2018-08-23 00:01:31 +08:00
addr string
priority uint32
maxFailures uint32 // maxfailures to set to Disabled
disabled uint32
failures uint32
latency int64
intface string // local interface or ip address
2018-08-26 01:25:22 +08:00
handlers []StatusHandler
}
// ForwarderFromURL parses `forward=` command value and returns a new forwarder
func ForwarderFromURL(s, intface string) (f *Forwarder, err error) {
f = &Forwarder{}
ss := strings.Split(s, "#")
if len(ss) > 1 {
err = f.parseOption(ss[1])
}
iface := intface
if f.intface != "" && f.intface != intface {
iface = f.intface
}
var d proxy.Dialer
d, err = proxy.NewDirect(iface)
if err != nil {
return nil, err
}
for _, url := range strings.Split(ss[0], ",") {
d, err = proxy.DialerFromURL(url, d)
if err != nil {
return nil, err
}
}
f.Dialer = d
f.addr = d.Addr()
2018-12-19 23:33:58 +08:00
// set forwarder to disabled by default
f.Disable()
return f, err
}
// DirectForwarder returns a forwarder that wraps the dialer created by
// proxy.NewDirect(intface). It returns nil when that dialer cannot be
// created; the reason is logged because the signature has no error result.
//
// Unlike ForwarderFromURL, the returned forwarder is not disabled.
func DirectForwarder(intface string) *Forwarder {
	d, err := proxy.NewDirect(intface)
	if err != nil {
		log.F("[forwarder] create direct dialer on %q: %s", intface, err)
		return nil
	}

	return &Forwarder{Dialer: d, addr: d.Addr()}
}
func (f *Forwarder) parseOption(option string) error {
query, err := url.ParseQuery(option)
if err != nil {
return err
}
var priority uint64
p := query.Get("priority")
if p != "" {
priority, err = strconv.ParseUint(p, 10, 32)
}
2018-08-23 00:01:31 +08:00
f.SetPriority(uint32(priority))
2018-08-18 23:59:21 +08:00
f.intface = query.Get("interface")
return err
}
// Addr returns the address string stored in the forwarder.
func (f *Forwarder) Addr() string {
	address := f.addr
	return address
}
// Dial .
func (f *Forwarder) Dial(network, addr string) (c net.Conn, err error) {
c, err = f.Dialer.Dial(network, addr)
if err != nil {
2018-08-23 00:01:31 +08:00
f.IncFailures()
}
return c, err
}
// Failures returns the current failure count of the forwarder.
func (f *Forwarder) Failures() uint32 {
	count := atomic.LoadUint32(&f.failures)
	return count
}
2018-08-23 00:01:31 +08:00
// IncFailures increase the failuer count by 1
func (f *Forwarder) IncFailures() {
failures := atomic.AddUint32(&f.failures, 1)
log.F("[forwarder] %s recorded %d failures, maxfailures: %d", f.addr, failures, f.MaxFailures())
if f.MaxFailures() != 0 && failures >= f.MaxFailures() && f.Enabled() {
log.F("[forwarder] %s reaches maxfailures %d", f.addr, f.MaxFailures())
f.Disable()
}
2018-08-23 00:01:31 +08:00
}
// AddHandler adds a custom handler to handle the status change event
2018-08-26 01:25:22 +08:00
func (f *Forwarder) AddHandler(h StatusHandler) {
f.handlers = append(f.handlers, h)
}
// Enable marks the forwarder as enabled and resets its failure count.
// The registered status handlers run only when the forwarder was disabled
// before the call; the failure count is reset afterwards in every case.
func (f *Forwarder) Enable() {
	wasDisabled := atomic.CompareAndSwapUint32(&f.disabled, 1, 0)
	if wasDisabled {
		notify := f.handlers
		for i := 0; i < len(notify); i++ {
			notify[i](f)
		}
	}

	atomic.StoreUint32(&f.failures, 0)
}
// Disable marks the forwarder as disabled. The registered status handlers
// run only when the forwarder was enabled before the call.
func (f *Forwarder) Disable() {
	if !atomic.CompareAndSwapUint32(&f.disabled, 0, 1) {
		// Already disabled: the status did not change, nobody is notified.
		return
	}

	notify := f.handlers
	for i := range notify {
		notify[i](f)
	}
}
// Enabled reports whether the forwarder is currently enabled, i.e. whether
// its disabled flag is not set.
func (f *Forwarder) Enabled() bool {
	flag := atomic.LoadUint32(&f.disabled)
	return !isTrue(flag)
}
// isTrue reports whether the lowest bit of n is set.
func isTrue(n uint32) bool {
	return n&1 != 0
}
2018-08-23 00:01:31 +08:00
// Priority returns the forwarder's priority value.
func (f *Forwarder) Priority() uint32 {
	value := atomic.LoadUint32(&f.priority)
	return value
}
// SetPriority stores l as the forwarder's priority value.
func (f *Forwarder) SetPriority(l uint32) {
	target := &f.priority
	atomic.StoreUint32(target, l)
}
// MaxFailures returns the failure threshold of the forwarder. IncFailures
// treats a value of 0 as "never disable automatically".
func (f *Forwarder) MaxFailures() uint32 {
	limit := atomic.LoadUint32(&f.maxFailures)
	return limit
}
// SetMaxFailures stores l as the failure threshold of the forwarder.
func (f *Forwarder) SetMaxFailures(l uint32) {
	target := &f.maxFailures
	atomic.StoreUint32(target, l)
}
// Latency returns the latency value currently stored in the forwarder.
func (f *Forwarder) Latency() int64 {
	value := atomic.LoadInt64(&f.latency)
	return value
}
// SetLatency stores l as the forwarder's latency value.
func (f *Forwarder) SetLatency(l int64) {
	target := &f.latency
	atomic.StoreInt64(target, l)
}