check: stop checking when protocol not supported

This commit is contained in:
nadoo 2020-11-26 19:21:27 +08:00
parent 6820644073
commit 98ce20b295
9 changed files with 69 additions and 122 deletions

4
go.mod
View File

@ -14,9 +14,9 @@ require (
github.com/nadoo/ipset v0.3.0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/xtaci/kcp-go/v5 v5.6.1
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9
golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
golang.org/x/tools v0.0.0-20201124005743-911501bfb504 // indirect
golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b // indirect
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
)

8
go.sum
View File

@ -87,8 +87,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9 h1:phUcVbl53swtrUN8kQEXFhUxPlIlWyBfKmidCu7P95o=
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392 h1:xYJJ3S178yv++9zXV/hnr29plCAGO9vAFG9dorqaFQc=
golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
@ -141,8 +141,8 @@ golang.org/x/tools v0.0.0-20200425043458-8463f397d07c/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200808161706-5bf02b21f123/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174 h1:0rx0F4EjJNbxTuzWe0KjKcIzs+3VEb/Mrs/d1ciNz1c=
golang.org/x/tools v0.0.0-20201105001634-bc3cf281b174/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201124005743-911501bfb504 h1:jOKV2ysikH1GANB7t2LotmhyvkkPvl7HQoEXkV6slJA=
golang.org/x/tools v0.0.0-20201124005743-911501bfb504/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b h1:Lq5JUTFhiybGVf28jB6QRpqd13/JPOaCnET17PVzYJE=
golang.org/x/tools v0.0.0-20201125231158-b5590deeca9b/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -6,6 +6,10 @@ import (
"strings"
)
var (
ErrNotSupported = errors.New("not supported")
)
// Dialer is used to create connection.
type Dialer interface {
TCPDialer

View File

@ -1,7 +1,6 @@
package tcp
import (
"errors"
"net"
"net/url"
"strings"
@ -12,10 +11,9 @@ import (
// TCP struct.
type TCP struct {
addr string
dialer proxy.Dialer
proxy proxy.Proxy
server proxy.Server
addr string
}
func init() {
@ -47,19 +45,6 @@ func NewTCPDialer(s string, d proxy.Dialer) (proxy.Dialer, error) {
// NewTCPServer returns a tcp transport layer before the real server.
func NewTCPServer(s string, p proxy.Proxy) (proxy.Server, error) {
// transport := strings.Split(s, ",")
// prepare transport listener
// TODO: check here
// if len(transport) < 2 {
// return nil, errors.New("[tcp] malformd listener:" + s)
// }
// t.server, err = proxy.ServerFromURL(transport[1], p)
// if err != nil {
// return nil, err
// }
return NewTCP(s, nil, p)
}
@ -87,14 +72,6 @@ func (s *TCP) ListenAndServe() {
// Serve serves a connection.
func (s *TCP) Serve(c net.Conn) {
// we know the internal server will close the connection after serve
// defer c.Close()
if s.server != nil {
s.server.Serve(c)
return
}
defer c.Close()
if c, ok := c.(*net.TCPConn); ok {
@ -135,5 +112,5 @@ func (s *TCP) Dial(network, addr string) (net.Conn, error) {
// DialUDP connects to the given address via the proxy.
func (s *TCP) DialUDP(network, addr string) (net.PacketConn, net.Addr, error) {
return nil, nil, errors.New("tcp client does not support udp now")
return nil, nil, proxy.ErrNotSupported
}

View File

@ -1,6 +1,7 @@
package udp
import (
"fmt"
"net"
"net/url"
"sync"
@ -15,7 +16,6 @@ type UDP struct {
addr string
dialer proxy.Dialer
proxy proxy.Proxy
server proxy.Server
}
func init() {
@ -120,7 +120,7 @@ func (s *UDP) Addr() string {
// Dial connects to the address addr on the network net via the proxy.
func (s *UDP) Dial(network, addr string) (net.Conn, error) {
return s.dialer.Dial("udp", s.addr)
return nil, fmt.Errorf("can not dial tcp via udp dialer: %w", proxy.ErrNotSupported)
}
// DialUDP connects to the given address via the proxy.

View File

@ -2,18 +2,20 @@ package rule
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"os/exec"
"time"
"github.com/nadoo/glider/log"
"github.com/nadoo/glider/pool"
"github.com/nadoo/glider/proxy"
)
// Checker is a forwarder health checker.
type Checker interface {
Check(fwdr *Forwarder) (healthy bool)
Check(dialer proxy.Dialer) (elap time.Duration, err error)
}
type tcpChecker struct {
@ -25,34 +27,16 @@ func newTcpChecker(addr string, timeout time.Duration) *tcpChecker {
return &tcpChecker{addr, timeout}
}
func (c *tcpChecker) Check(fwdr *Forwarder) bool {
// Check implements the Checker interface.
func (c *tcpChecker) Check(dialer proxy.Dialer) (time.Duration, error) {
startTime := time.Now()
rc, err := fwdr.Dial("tcp", c.addr)
rc, err := dialer.Dial("tcp", c.addr)
if err != nil {
log.F("[check] tcp:%s(%d), FAILED. error in dial: %s", fwdr.Addr(), fwdr.Priority(), err)
fwdr.Disable()
return false
return 0, err
}
defer rc.Close()
if c.timeout > 0 {
rc.SetDeadline(time.Now().Add(c.timeout))
}
elapsed := time.Since(startTime)
fwdr.SetLatency(int64(elapsed))
if elapsed > c.timeout {
log.F("[check] tcp:%s(%d), FAILED. check timeout: %s", fwdr.Addr(), fwdr.Priority(), elapsed)
fwdr.Disable()
return false
}
log.F("[check] tcp:%s(%d), SUCCESS. elapsed: %s", fwdr.Addr(), fwdr.Priority(), elapsed)
fwdr.Enable()
return true
return time.Since(startTime), nil
}
type httpChecker struct {
@ -66,13 +50,12 @@ func newHttpChecker(addr, uri, expect string, timeout time.Duration) *httpChecke
return &httpChecker{addr, uri, expect, timeout}
}
func (c *httpChecker) Check(fwdr *Forwarder) bool {
// Check implements the Checker interface.
func (c *httpChecker) Check(dialer proxy.Dialer) (time.Duration, error) {
startTime := time.Now()
rc, err := fwdr.Dial("tcp", c.addr)
rc, err := dialer.Dial("tcp", c.addr)
if err != nil {
log.F("[check] %s(%d) -> http://%s, FAILED. error in dial: %s", fwdr.Addr(), fwdr.Priority(), c.addr, err)
fwdr.Disable()
return false
return 0, err
}
defer rc.Close()
@ -80,11 +63,9 @@ func (c *httpChecker) Check(fwdr *Forwarder) bool {
rc.SetDeadline(time.Now().Add(c.timeout))
}
_, err = io.WriteString(rc, "GET "+c.uri+" HTTP/1.1\r\nHost:"+c.addr+"\r\nConnection: close"+"\r\n\r\n")
if err != nil {
log.F("[check] %s(%d) -> http://%s, FAILED. error in write: %s", fwdr.Addr(), fwdr.Priority(), c.addr, err)
fwdr.Disable()
return false
if _, err = io.WriteString(rc,
"GET "+c.uri+" HTTP/1.1\r\nHost:"+c.addr+"\r\nConnection: close"+"\r\n\r\n"); err != nil {
return 0, err
}
r := pool.GetBufReader(rc)
@ -92,55 +73,30 @@ func (c *httpChecker) Check(fwdr *Forwarder) bool {
line, _, err := r.ReadLine()
if err != nil {
log.F("[check] %s(%d) -> http://%s, FAILED. error in read: %s", fwdr.Addr(), fwdr.Priority(), c.addr, err)
fwdr.Disable()
return false
return 0, err
}
if !bytes.Contains(line, []byte(c.expect)) {
log.F("[check] %s(%d) -> http://%s, FAILED. expect: %s, server response: %s", fwdr.Addr(), fwdr.Priority(), c.addr, c.expect, line)
fwdr.Disable()
return false
return 0, fmt.Errorf("expect: %s, got: %s", c.expect, line)
}
elapsed := time.Since(startTime)
fwdr.SetLatency(int64(elapsed))
if elapsed > c.timeout {
log.F("[check] %s(%d) -> http://%s, FAILED. check timeout: %s", fwdr.Addr(), fwdr.Priority(), c.addr, elapsed)
fwdr.Disable()
return false
return elapsed, errors.New("timeout")
}
log.F("[check] %s(%d) -> http://%s, SUCCESS. elapsed: %s", fwdr.Addr(), fwdr.Priority(), c.addr, elapsed)
fwdr.Enable()
return true
return elapsed, nil
}
type fileChecker struct {
path string
}
type fileChecker struct{ path string }
func newFileChecker(path string) *fileChecker {
return &fileChecker{path}
}
func newFileChecker(path string) *fileChecker { return &fileChecker{path} }
func (c *fileChecker) Check(fwdr *Forwarder) bool {
// Check implements the Checker interface.
func (c *fileChecker) Check(dialer proxy.Dialer) (time.Duration, error) {
cmd := exec.Command(c.path)
cmd.Stdout = os.Stdout
cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, "FORWARDER_ADDR="+fwdr.Addr())
err := cmd.Run()
if err != nil {
log.F("[check] file:%s(%d), FAILED. err: %s", fwdr.Addr(), fwdr.Priority(), err)
fwdr.Disable()
return false
}
log.F("[check] file:%s(%d), SUCCESS.", fwdr.Addr(), fwdr.Priority())
fwdr.Enable()
return true
cmd.Env = append(cmd.Env, "FORWARDER_ADDR="+dialer.Addr())
return 0, cmd.Run()
}

View File

@ -119,9 +119,10 @@ func (f *Forwarder) IncFailures() {
return
}
log.F("[forwarder] %s recorded %d failures, maxfailures: %d", f.addr, failures, f.MaxFailures())
// log.F("[forwarder] %s(%d) recorded %d failures, maxfailures: %d", f.addr, f.Priority(), failures, f.MaxFailures())
if failures >= f.MaxFailures() && f.Enabled() {
if failures == f.MaxFailures() && f.Enabled() {
log.F("[forwarder] %s(%d) reaches maxfailures: %d", f.addr, f.Priority(), f.MaxFailures())
f.Disable()
}
}

View File

@ -1,6 +1,7 @@
package rule
import (
"errors"
"hash/fnv"
"net"
"net/url"
@ -176,7 +177,6 @@ func (p *FwdrGroup) onStatusChanged(fwdr *Forwarder) {
// Check runs the forwarder checks.
func (p *FwdrGroup) Check() {
if len(p.fwdrs) == 1 {
p.config.MaxFailures = 0
log.F("[group] only 1 forwarder found, disable health checking")
return
}
@ -212,17 +212,18 @@ func (p *FwdrGroup) Check() {
case "file":
checker = newFileChecker(u.Host + u.Path)
default:
p.config.MaxFailures = 0
log.F("[group] invalid check config `%s`, disable health checking", p.config.Check)
return
}
log.F("[group] using check config: %s", p.config.Check)
for i := 0; i < len(p.fwdrs); i++ {
go p.check(p.fwdrs[i], checker)
}
}
func (p *FwdrGroup) check(f *Forwarder, checker Checker) {
func (p *FwdrGroup) check(fwdr *Forwarder, checker Checker) {
wait := uint8(0)
intval := time.Duration(p.config.CheckInterval) * time.Second
@ -230,27 +231,35 @@ func (p *FwdrGroup) check(f *Forwarder, checker Checker) {
time.Sleep(intval * time.Duration(wait))
// check all forwarders at least one time
if wait > 0 && (f.Priority() < p.Priority()) {
if wait > 0 && (fwdr.Priority() < p.Priority()) {
continue
}
if f.Enabled() && p.config.CheckDisabledOnly {
if fwdr.Enabled() && p.config.CheckDisabledOnly {
continue
}
if checker.Check(f) {
wait = 1
continue
elapsed, err := checker.Check(fwdr)
if err != nil {
if errors.Is(err, proxy.ErrNotSupported) {
fwdr.SetMaxFailures(0)
log.F("[check] %s(%d), %s, stop checking", fwdr.Addr(), fwdr.Priority(), err)
break
}
wait++
if wait > 16 {
wait = 16
}
log.F("[check] %s(%d), FAILED. error: %s", fwdr.Addr(), fwdr.Priority(), err)
fwdr.Disable()
}
if wait == 0 {
wait = 1
}
wait *= 2
if wait > 16 {
wait = 16
}
wait = 1
fwdr.Enable()
fwdr.SetLatency(int64(elapsed))
log.F("[check] %s(%d), SUCCESS. elapsed: %s", fwdr.Addr(), fwdr.Priority(), elapsed)
}
}

View File

@ -45,7 +45,7 @@ func NewProxy(mainForwarders []string, mainStrategy *Strategy, rules []*Config)
if len(mainForwarders) > 0 {
direct := NewFwdrGroup("", nil, mainStrategy)
for _, f := range rd.main.fwdrs {
host := strings.Split(f.addr, ":")[0]
host, _, _ := net.SplitHostPort(f.addr)
if ip := net.ParseIP(host); ip == nil {
rd.domainMap.Store(strings.ToLower(host), direct)
}