From b38f8a8761b4d3d89fc201421a0278a21f8bb574 Mon Sep 17 00:00:00 2001 From: nadoo <287492+nadoo@users.noreply.github.com> Date: Mon, 26 Jul 2021 00:42:21 +0800 Subject: [PATCH] tproxy: use goroutine to serve sessions --- README.md | 2 +- proxy/tproxy/tproxy_linux.go | 102 +++++++++++++++++++---------------- 2 files changed, 56 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index a62cc27..13b74bc 100644 --- a/README.md +++ b/README.md @@ -375,7 +375,7 @@ Examples: - service=dhcpd,INTERFACE,START_IP,END_IP,LEASE_MINUTES[,MAC=IP,MAC=IP...] - e.g.: - service=dhcpd,eth1,192.168.1.100,192.168.1.199,720 - - service=dhcpd,eth2,192.168.2.100,192.168.2.199,720,fc:23:34:9e:25:01=192.168.2.101,fc:23:34:9e:25:02=192.168.2.102 + - service=dhcpd,eth2,192.168.2.100,192.168.2.199,720,fc:23:34:9e:25:01=192.168.2.101 ## Linux Service diff --git a/proxy/tproxy/tproxy_linux.go b/proxy/tproxy/tproxy_linux.go index 8285212..15d3f38 100644 --- a/proxy/tproxy/tproxy_linux.go +++ b/proxy/tproxy/tproxy_linux.go @@ -7,6 +7,7 @@ import ( "time" "github.com/nadoo/glider/log" + "github.com/nadoo/glider/pool" "github.com/nadoo/glider/proxy" ) @@ -20,6 +21,9 @@ func init() { proxy.RegisterServer("tproxy", NewTProxyServer) } +// nat mapping +var nm sync.Map + // NewTProxy returns a tproxy. func NewTProxy(s string, p proxy.Proxy) (*TProxy, error) { u, err := url.Parse(s) @@ -47,12 +51,17 @@ func (s *TProxy) ListenAndServe() { s.ListenAndServeUDP() } -// ListenAndServeTCP . +// ListenAndServeTCP listens and serves tcp. func (s *TProxy) ListenAndServeTCP() { log.F("[tproxy] tcp mode not supported now, please use 'redir' instead") } -// ListenAndServeUDP . +// Serve serves tcp conn. +func (s *TProxy) Serve(c net.Conn) { + log.F("[tproxy] func Serve: can not be called directly") +} + +// ListenAndServeUDP listens and serves udp. func (s *TProxy) ListenAndServeUDP() { laddr, err := net.ResolveUDPAddr("udp", s.addr) if err != nil { @@ -69,66 +78,65 @@ func (s *TProxy) ListenAndServeUDP() { log.F("[tproxyu] listening UDP on %s", s.addr) - var nm sync.Map - buf := make([]byte, proxy.UDPBufSize) - for { - n, lraddr, dstAddr, err := ReadFromUDP(lc, buf) + buf := pool.GetBuffer(proxy.UDPBufSize) + n, srcAddr, dstAddr, err := ReadFromUDP(lc, buf) if err != nil { log.F("[tproxyu] read error: %v", err) continue } - var session *natEntry - sessionKey := lraddr.String() + var session *Session + sessionKey := srcAddr.String() v, ok := nm.Load(sessionKey) if !ok && v == nil { - pc, dialer, writeTo, err := s.proxy.DialUDP("udp", dstAddr.String()) - if err != nil { - log.F("[tproxyu] dial to %s error: %v", dstAddr, err) - continue - } - - lpc, err := ListenPacket(dstAddr) - if err != nil { - log.F("[tproxyu] ListenPacket as %s error: %v", dstAddr, err) - pc.Close() - continue - } - - session = newNatEntry(pc, writeTo) + session = &Session{sessionKey, srcAddr, dstAddr, make(chan []byte, 32)} nm.Store(sessionKey, session) - - go func(lc net.PacketConn, pc net.PacketConn, lraddr *net.UDPAddr, key string) { - proxy.RelayUDP(lc, lraddr, pc, 2*time.Minute) - pc.Close() - nm.Delete(key) - }(lpc, pc, lraddr, sessionKey) - - log.F("[tproxyu] %s <-> %s via %s", lraddr, dstAddr, dialer.Addr()) - + go s.ServeSession(session) } else { - session = v.(*natEntry) - } - - _, err = session.WriteTo(buf[:n], session.writeTo) - if err != nil { - log.F("[tproxyu] writeTo %s error: %v", session.writeTo, err) + session = v.(*Session) } + session.msgQueue <- buf[:n] } } -// Serve . -func (s *TProxy) Serve(c net.Conn) { - log.F("[tproxy] func Serve: can not be called directly") +// ServeSession serves a udp session. +func (s *TProxy) ServeSession(session *Session) { + dstPC, dialer, writeTo, err := s.proxy.DialUDP("udp", session.dst.String()) + if err != nil { + log.F("[tproxyu] dial to %s error: %v", session.dst, err) + return + } + defer dstPC.Close() + + srcPC, err := ListenPacket(session.dst) + if err != nil { + log.F("[tproxyu] ListenPacket as %s error: %v", session.dst, err) + return + } + defer srcPC.Close() + + log.F("[tproxyu] %s <-> %s via %s", session.src, session.dst, dialer.Addr()) + + go func() { + proxy.RelayUDP(srcPC, session.src, dstPC, 2*time.Minute) + nm.Delete(session.key) + close(session.msgQueue) + }() + + for data := range session.msgQueue { + _, err = dstPC.WriteTo(data, writeTo) + if err != nil { + log.F("[tproxyu] writeTo %s error: %v", writeTo, err) + } + pool.PutBuffer(data) + } } -type natEntry struct { - net.PacketConn - writeTo net.Addr -} - -func newNatEntry(pc net.PacketConn, writeTo net.Addr) *natEntry { - return &natEntry{PacketConn: pc, writeTo: writeTo} +// Session is a udp session +type Session struct { + key string + src, dst *net.UDPAddr + msgQueue chan []byte }