diff --git a/bk/trace_common.go b/bk/trace_common.go index 0c998d7..c2deb67 100644 --- a/bk/trace_common.go +++ b/bk/trace_common.go @@ -3,6 +3,7 @@ package backtrace import ( "context" "errors" + "fmt" "net" "sort" "strings" @@ -10,19 +11,19 @@ import ( "sync/atomic" "time" + . "github.com/oneclickvirt/defaultset" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" - . "github.com/oneclickvirt/defaultset" ) // DefaultConfig is the default configuration for Tracer. var DefaultConfig = Config{ - Delay: 50 * time.Millisecond, - Timeout: 500 * time.Millisecond, - MaxHops: 15, - Count: 1, - Networks: []string{"ip4:icmp", "ip4:ip", "ip6:ipv6-icmp", "ip6:ip"}, + Delay: 50 * time.Millisecond, + Timeout: 500 * time.Millisecond, + MaxHops: 15, + Count: 1, + Networks: []string{"ip4:icmp", "ip4:ip", "ip6:ipv6-icmp", "ip6:ip"}, } // DefaultTracer is a tracer with DefaultConfig. @@ -117,37 +118,37 @@ func (t *Tracer) NewSession(ip net.IP) (*Session, error) { } func (t *Tracer) init() { - // 初始化IPv4连接 - for _, network := range t.Networks { - if strings.HasPrefix(network, "ip4") { - t.conn, t.err = t.listen(network, t.Addr) - if t.err == nil { - go t.serve(t.conn) - break - } - } - } - // 初始化IPv6连接 - for _, network := range t.Networks { - if strings.HasPrefix(network, "ip6") { - conn, err := net.ListenIP(network, t.Addr) - if err == nil { - t.ipv6conn = ipv6.NewPacketConn(conn) - err = t.ipv6conn.SetControlMessage(ipv6.FlagHopLimit|ipv6.FlagSrc|ipv6.FlagDst|ipv6.FlagInterface, true) - if err != nil { - if EnableLoger { + // 初始化IPv4连接 + for _, network := range t.Networks { + if strings.HasPrefix(network, "ip4") { + t.conn, t.err = t.listen(network, t.Addr) + if t.err == nil { + go t.serve(t.conn) + break + } + } + } + // 初始化IPv6连接 + for _, network := range t.Networks { + if strings.HasPrefix(network, "ip6") { + conn, err := net.ListenIP(network, t.Addr) + if err == nil { + t.ipv6conn = ipv6.NewPacketConn(conn) + err = t.ipv6conn.SetControlMessage(ipv6.FlagHopLimit|ipv6.FlagSrc|ipv6.FlagDst|ipv6.FlagInterface, true) + if err != nil { + if EnableLoger { InitLogger() defer Logger.Sync() - Logger.Info("设置IPv6控制消息失败: " + err.Error()) - } - t.ipv6conn.Close() - continue - } - go t.serveIPv6(t.ipv6conn) - break - } - } - } + Logger.Info("设置IPv6控制消息失败: " + err.Error()) + } + t.ipv6conn.Close() + continue + } + go t.serveIPv6(t.ipv6conn) + break + } + } + } } // Close closes listening socket. @@ -178,89 +179,112 @@ func (t *Tracer) serve(conn *net.IPConn) error { } func (t *Tracer) serveData(from net.IP, b []byte) error { - if from.To4() == nil { - // IPv6 处理 - msg, err := icmp.ParseMessage(ProtocolIPv6ICMP, b) - if err != nil { - return err - } - if msg.Type == ipv6.ICMPTypeEchoReply { - echo := msg.Body.(*icmp.Echo) - return t.serveReply(from, &packet{from, uint16(echo.ID), 1, time.Now()}) - } - b = getReplyData(msg) - if len(b) < ipv6.HeaderLen { - return errMessageTooShort - } - switch b[0] >> 4 { - case ipv6.Version: - ip, err := ipv6.ParseHeader(b) - if err != nil { - return err - } - return t.serveReply(ip.Dst, &packet{from, uint16(ip.FlowLabel), ip.HopLimit, time.Now()}) - default: - return errUnsupportedProtocol - } - } else { - // 原有的IPv4处理逻辑 - msg, err := icmp.ParseMessage(ProtocolICMP, b) - if err != nil { - return err - } - if msg.Type == ipv4.ICMPTypeEchoReply { - echo := msg.Body.(*icmp.Echo) - return t.serveReply(from, &packet{from, uint16(echo.ID), 1, time.Now()}) - } - b = getReplyData(msg) - if len(b) < ipv4.HeaderLen { - return errMessageTooShort - } - switch b[0] >> 4 { - case ipv4.Version: - ip, err := ipv4.ParseHeader(b) - if err != nil { - return err - } - return t.serveReply(ip.Dst, &packet{from, uint16(ip.ID), ip.TTL, time.Now()}) - default: - return errUnsupportedProtocol - } - } + if from.To4() == nil { + // IPv6处理 + msg, err := icmp.ParseMessage(ProtocolIPv6ICMP, b) + if err != nil { + if EnableLoger { + Logger.Warn("解析IPv6 ICMP消息失败: " + err.Error()) + } + return err + } + // 记录所有收到的消息类型,帮助调试 + if EnableLoger { + Logger.Info(fmt.Sprintf("收到IPv6 ICMP消息: 类型=%v, 代码=%v", msg.Type, msg.Code)) + } + // 处理不同类型的ICMP消息 + if msg.Type == ipv6.ICMPTypeEchoReply { + if echo, ok := msg.Body.(*icmp.Echo); ok { + if EnableLoger { + Logger.Info(fmt.Sprintf("处理IPv6回显应答: ID=%d, Seq=%d", echo.ID, echo.Seq)) + } + return t.serveReply(from, &packet{from, uint16(echo.ID), 1, time.Now()}) + } + } else if msg.Type == ipv6.ICMPTypeTimeExceeded { + // 时间超过(这是traceroute的关键响应类型) + b = getReplyData(msg) + if len(b) < ipv6.HeaderLen { + if EnableLoger { + Logger.Warn("IPv6时间超过消息太短") + } + return errMessageTooShort + } + // 解析原始IPv6包头 + if b[0]>>4 == ipv6.Version { + ip, err := ipv6.ParseHeader(b) + if err != nil { + if EnableLoger { + Logger.Warn("解析IPv6头部失败: " + err.Error()) + } + return err + } + if EnableLoger { + Logger.Info(fmt.Sprintf("处理IPv6时间超过: 目标=%v, FlowLabel=%d, HopLimit=%d", + ip.Dst, ip.FlowLabel, ip.HopLimit)) + } + return t.serveReply(ip.Dst, &packet{from, uint16(ip.FlowLabel), ip.HopLimit, time.Now()}) + } + } + } else { + // 原有的IPv4处理逻辑 + msg, err := icmp.ParseMessage(ProtocolICMP, b) + if err != nil { + return err + } + if msg.Type == ipv4.ICMPTypeEchoReply { + echo := msg.Body.(*icmp.Echo) + return t.serveReply(from, &packet{from, uint16(echo.ID), 1, time.Now()}) + } + b = getReplyData(msg) + if len(b) < ipv4.HeaderLen { + return errMessageTooShort + } + switch b[0] >> 4 { + case ipv4.Version: + ip, err := ipv4.ParseHeader(b) + if err != nil { + return err + } + return t.serveReply(ip.Dst, &packet{from, uint16(ip.ID), ip.TTL, time.Now()}) + default: + return errUnsupportedProtocol + } + } + return nil } func (t *Tracer) sendRequest(dst net.IP, ttl int) (*packet, error) { - id := uint16(atomic.AddUint32(&t.seq, 1)) - var b []byte - req := &packet{dst, id, ttl, time.Now()} - if dst.To4() == nil { - // IPv6 - b := newPacketV6(id, dst, ttl) - if t.ipv6conn != nil { - cm := &ipv6.ControlMessage{ - HopLimit: ttl, - } - _, err := t.ipv6conn.WriteTo(b, cm, &net.IPAddr{IP: dst}) - if err != nil { - if EnableLoger { + id := uint16(atomic.AddUint32(&t.seq, 1)) + var b []byte + req := &packet{dst, id, ttl, time.Now()} + if dst.To4() == nil { + // IPv6 + b := newPacketV6(id, dst, ttl) + if t.ipv6conn != nil { + cm := &ipv6.ControlMessage{ + HopLimit: ttl, + } + _, err := t.ipv6conn.WriteTo(b, cm, &net.IPAddr{IP: dst}) + if err != nil { + if EnableLoger { InitLogger() defer Logger.Sync() - Logger.Info("发送IPv6请求失败: " + err.Error()) - } - return nil, err - } - return req, nil - } - return nil, errors.New("IPv6连接不可用") - } else { - // IPv4 - b = newPacketV4(id, dst, ttl) - _, err := t.conn.WriteToIP(b, &net.IPAddr{IP: dst}) - if err != nil { - return nil, err - } - return req, nil - } + Logger.Info("发送IPv6请求失败: " + err.Error()) + } + return nil, err + } + return req, nil + } + return nil, errors.New("IPv6连接不可用") + } else { + // IPv4 + b = newPacketV4(id, dst, ttl) + _, err := t.conn.WriteToIP(b, &net.IPAddr{IP: dst}) + if err != nil { + return nil, err + } + return req, nil + } } func (t *Tracer) addSession(s *Session) { @@ -284,11 +308,41 @@ func (t *Tracer) removeSession(s *Session) { } } +// func (t *Tracer) serveReply(dst net.IP, res *packet) error { +// t.mu.RLock() +// defer t.mu.RUnlock() +// a := t.sess[string(shortIP(dst))] +// for _, s := range a { +// s.handle(res) +// } +// return nil +// } + func (t *Tracer) serveReply(dst net.IP, res *packet) error { + if EnableLoger { + Logger.Info(fmt.Sprintf("处理回复: 目标=%v, 来源=%v, ID=%d, TTL=%d", + dst, res.IP, res.ID, res.TTL)) + } + // 确保使用正确的IP格式进行查找 + shortDst := shortIP(dst) t.mu.RLock() defer t.mu.RUnlock() - a := t.sess[string(shortIP(dst))] + // 调试输出会话信息 + if EnableLoger && len(t.sess) > 0 { + for ip, sessions := range t.sess { + Logger.Info(fmt.Sprintf("会话信息: IP=%v, 会话数=%d", + net.IP([]byte(ip)), len(sessions))) + } + } + // 查找对应的会话 + a := t.sess[string(shortDst)] + if len(a) == 0 && EnableLoger { + Logger.Warn(fmt.Sprintf("找不到目标IP=%v的会话", dst)) + } for _, s := range a { + if EnableLoger { + Logger.Info(fmt.Sprintf("处理会话响应: 会话目标=%v", s.ip)) + } s.handle(res) } return nil @@ -348,16 +402,71 @@ func (s *Session) isDone(ttl int) bool { return true } +// func (s *Session) handle(res *packet) { +// now := res.Time +// n := 0 +// var req *packet +// s.mu.Lock() +// for _, r := range s.probes { +// if now.Sub(r.Time) > s.t.Timeout { +// continue +// } +// if r.ID == res.ID { +// req = r +// continue +// } +// s.probes[n] = r +// n++ +// } +// s.probes = s.probes[:n] +// s.mu.Unlock() +// if req == nil { +// return +// } +// hops := req.TTL - res.TTL + 1 +// if hops < 1 { +// hops = 1 +// } +// select { +// case s.ch <- &Reply{ +// IP: res.IP, +// RTT: res.Time.Sub(req.Time), +// Hops: hops, +// }: +// default: +// } +// } + func (s *Session) handle(res *packet) { now := res.Time n := 0 var req *packet + if EnableLoger { + Logger.Info(fmt.Sprintf("处理会话响应: 会话目标=%v, 响应源=%v, ID=%d, TTL=%d", + s.ip, res.IP, res.ID, res.TTL)) + } s.mu.Lock() + // 打印出所有待处理的探测包 + if EnableLoger && len(s.probes) > 0 { + Logger.Info(fmt.Sprintf("当前会话有 %d 个待处理的探测包", len(s.probes))) + for i, probe := range s.probes { + Logger.Info(fmt.Sprintf("探测包 #%d: ID=%d, TTL=%d, 时间=%v", + i, probe.ID, probe.TTL, probe.Time)) + } + } + // 查找匹配的请求包 for _, r := range s.probes { if now.Sub(r.Time) > s.t.Timeout { + if EnableLoger { + Logger.Info(fmt.Sprintf("探测包超时: ID=%d, TTL=%d", r.ID, r.TTL)) + } continue } - if r.ID == res.ID { + // 对于IPv6,可能需要松散匹配 + if r.ID == res.ID || (res.IP.To4() == nil && res.ID == 0) { + if EnableLoger { + Logger.Info(fmt.Sprintf("找到匹配的探测包: ID=%d, TTL=%d", r.ID, r.TTL)) + } req = r continue } @@ -367,19 +476,32 @@ func (s *Session) handle(res *packet) { s.probes = s.probes[:n] s.mu.Unlock() if req == nil { + if EnableLoger { + Logger.Warn(fmt.Sprintf("未找到匹配的探测包: 响应ID=%d", res.ID)) + } return } hops := req.TTL - res.TTL + 1 if hops < 1 { hops = 1 } + if EnableLoger { + Logger.Info(fmt.Sprintf("创建响应: IP=%v, RTT=%v, Hops=%d", + res.IP, res.Time.Sub(req.Time), hops)) + } select { case s.ch <- &Reply{ IP: res.IP, RTT: res.Time.Sub(req.Time), Hops: hops, }: + if EnableLoger { + Logger.Info("响应已发送到通道") + } default: + if EnableLoger { + Logger.Warn("发送响应到通道失败,通道已满") + } } }