tproxy: use goroutine to serve sessions

This commit is contained in:
nadoo 2021-07-26 00:42:21 +08:00
parent d615dc087e
commit b38f8a8761
2 changed files with 56 additions and 48 deletions

View File

@ -375,7 +375,7 @@ Examples:
- service=dhcpd,INTERFACE,START_IP,END_IP,LEASE_MINUTES[,MAC=IP,MAC=IP...]
- e.g.:
- service=dhcpd,eth1,192.168.1.100,192.168.1.199,720
- service=dhcpd,eth2,192.168.2.100,192.168.2.199,720,fc:23:34:9e:25:01=192.168.2.101,fc:23:34:9e:25:02=192.168.2.102
- service=dhcpd,eth2,192.168.2.100,192.168.2.199,720,fc:23:34:9e:25:01=192.168.2.101
## Linux Service

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/nadoo/glider/log"
"github.com/nadoo/glider/pool"
"github.com/nadoo/glider/proxy"
)
@ -20,6 +21,9 @@ func init() {
proxy.RegisterServer("tproxy", NewTProxyServer)
}
// nat mapping
var nm sync.Map
// NewTProxy returns a tproxy.
func NewTProxy(s string, p proxy.Proxy) (*TProxy, error) {
u, err := url.Parse(s)
@ -47,12 +51,17 @@ func (s *TProxy) ListenAndServe() {
s.ListenAndServeUDP()
}
// ListenAndServeTCP .
// ListenAndServeTCP listens and serves tcp.
func (s *TProxy) ListenAndServeTCP() {
log.F("[tproxy] tcp mode not supported now, please use 'redir' instead")
}
// ListenAndServeUDP .
// Serve serves tcp conn.
func (s *TProxy) Serve(c net.Conn) {
log.F("[tproxy] func Serve: can not be called directly")
}
// ListenAndServeUDP listens and serves udp.
func (s *TProxy) ListenAndServeUDP() {
laddr, err := net.ResolveUDPAddr("udp", s.addr)
if err != nil {
@ -69,66 +78,65 @@ func (s *TProxy) ListenAndServeUDP() {
log.F("[tproxyu] listening UDP on %s", s.addr)
var nm sync.Map
buf := make([]byte, proxy.UDPBufSize)
for {
n, lraddr, dstAddr, err := ReadFromUDP(lc, buf)
buf := pool.GetBuffer(proxy.UDPBufSize)
n, srcAddr, dstAddr, err := ReadFromUDP(lc, buf)
if err != nil {
log.F("[tproxyu] read error: %v", err)
continue
}
var session *natEntry
sessionKey := lraddr.String()
var session *Session
sessionKey := srcAddr.String()
v, ok := nm.Load(sessionKey)
if !ok && v == nil {
pc, dialer, writeTo, err := s.proxy.DialUDP("udp", dstAddr.String())
if err != nil {
log.F("[tproxyu] dial to %s error: %v", dstAddr, err)
continue
}
lpc, err := ListenPacket(dstAddr)
if err != nil {
log.F("[tproxyu] ListenPacket as %s error: %v", dstAddr, err)
pc.Close()
continue
}
session = newNatEntry(pc, writeTo)
session = &Session{sessionKey, srcAddr, dstAddr, make(chan []byte, 32)}
nm.Store(sessionKey, session)
go func(lc net.PacketConn, pc net.PacketConn, lraddr *net.UDPAddr, key string) {
proxy.RelayUDP(lc, lraddr, pc, 2*time.Minute)
pc.Close()
nm.Delete(key)
}(lpc, pc, lraddr, sessionKey)
log.F("[tproxyu] %s <-> %s via %s", lraddr, dstAddr, dialer.Addr())
go s.ServeSession(session)
} else {
session = v.(*natEntry)
}
_, err = session.WriteTo(buf[:n], session.writeTo)
if err != nil {
log.F("[tproxyu] writeTo %s error: %v", session.writeTo, err)
session = v.(*Session)
}
session.msgQueue <- buf[:n]
}
}
// Serve .
func (s *TProxy) Serve(c net.Conn) {
log.F("[tproxy] func Serve: can not be called directly")
// ServeSession serves a udp session.
func (s *TProxy) ServeSession(session *Session) {
dstPC, dialer, writeTo, err := s.proxy.DialUDP("udp", session.dst.String())
if err != nil {
log.F("[tproxyu] dial to %s error: %v", session.dst, err)
return
}
defer dstPC.Close()
srcPC, err := ListenPacket(session.dst)
if err != nil {
log.F("[tproxyu] ListenPacket as %s error: %v", session.dst, err)
return
}
defer srcPC.Close()
log.F("[tproxyu] %s <-> %s via %s", session.src, session.dst, dialer.Addr())
go func() {
proxy.RelayUDP(srcPC, session.src, dstPC, 2*time.Minute)
nm.Delete(session.key)
close(session.msgQueue)
}()
for data := range session.msgQueue {
_, err = dstPC.WriteTo(data, writeTo)
if err != nil {
log.F("[tproxyu] writeTo %s error: %v", writeTo, err)
}
pool.PutBuffer(data)
}
}
type natEntry struct {
net.PacketConn
writeTo net.Addr
}
func newNatEntry(pc net.PacketConn, writeTo net.Addr) *natEntry {
return &natEntry{PacketConn: pc, writeTo: writeTo}
// Session is a udp session
type Session struct {
key string
src, dst *net.UDPAddr
msgQueue chan []byte
}