add weight base strategy

This commit is contained in:
miladj 2024-02-11 06:42:10 +03:30
parent 6d2b1e95cc
commit ef3a81f686
2 changed files with 61 additions and 8 deletions

View File

@ -27,6 +27,7 @@ type Forwarder struct {
latency int64 latency int64
intface string // local interface or ip address intface string // local interface or ip address
handlers []StatusHandler handlers []StatusHandler
weight int64
} }
// ForwarderFromURL parses `forward=` command value and returns a new forwarder. // ForwarderFromURL parses `forward=` command value and returns a new forwarder.
@ -97,6 +98,17 @@ func (f *Forwarder) parseOption(option string) error {
} }
f.SetPriority(uint32(priority)) f.SetPriority(uint32(priority))
var weight int64
w := query.Get("weight")
if w != "" {
weight, err = strconv.ParseInt(w, 10, 32)
if err == nil && weight > 0 {
f.SetWeight(weight)
} else if weight < 0 {
log.F("weight should be more than 0, ignore weight value")
}
}
f.intface = query.Get("interface") f.intface = query.Get("interface")
return err return err
@ -204,3 +216,11 @@ func (f *Forwarder) Latency() int64 {
func (f *Forwarder) SetLatency(l int64) { func (f *Forwarder) SetLatency(l int64) {
atomic.StoreInt64(&f.latency, l) atomic.StoreInt64(&f.latency, l)
} }
func (f *Forwarder) Weight() int64 {
return f.weight
}
// SetWeight sets the weight of forwarder.
func (f *Forwarder) SetWeight(w int64) {
f.weight = w
}

View File

@ -3,6 +3,7 @@ package rule
import ( import (
"errors" "errors"
"hash/fnv" "hash/fnv"
"math/rand"
"net" "net"
"net/url" "net/url"
"path/filepath" "path/filepath"
@ -33,6 +34,7 @@ type FwdrGroup struct {
index uint32 index uint32
priority uint32 priority uint32
next func(addr string) *Forwarder next func(addr string) *Forwarder
randomGenerator *rand.Rand
} }
// NewFwdrGroup returns a new forward group. // NewFwdrGroup returns a new forward group.
@ -88,6 +90,10 @@ func newFwdrGroup(name string, fwdrs []*Forwarder, c *Strategy) *FwdrGroup {
case "dh": case "dh":
p.next = p.scheduleDH p.next = p.scheduleDH
log.F("[strategy] %s: %d forwarders forward in destination hashing mode.", name, count) log.F("[strategy] %s: %d forwarders forward in destination hashing mode.", name, count)
case "wb":
p.next = p.scheduleWB
p.randomGenerator = rand.New(rand.NewSource(time.Now().UnixNano()))
log.F("[strategy] %s: %d forwarders forward in Weight base mode.", name, count)
default: default:
p.next = p.scheduleRR p.next = p.scheduleRR
log.F("[strategy] %s: not supported forward mode '%s', use round robin mode for %d forwarders.", name, c.Strategy, count) log.F("[strategy] %s: not supported forward mode '%s', use round robin mode for %d forwarders.", name, c.Strategy, count)
@ -316,3 +322,30 @@ func (p *FwdrGroup) scheduleDH(dstAddr string) *Forwarder {
fnv1a.Write([]byte(dstAddr)) fnv1a.Write([]byte(dstAddr))
return p.avail[fnv1a.Sum32()%uint32(len(p.avail))] return p.avail[fnv1a.Sum32()%uint32(len(p.avail))]
} }
// Weight base
func (p *FwdrGroup) scheduleWB(dstAddr string) *Forwarder {
totalWeight := int64(0)
for _, f := range p.avail {
totalWeight += f.Weight()
}
if totalWeight <= 0 {
log.F("total weight is zero switch to rr mode")
return p.scheduleRR(dstAddr)
}
r := p.randomGenerator.Int63n(totalWeight)
var sum int64 = 0
for _, f := range p.avail {
weight := f.Weight()
if weight == 0 {
return f
}
sum += weight
if r < sum {
return f
}
}
// Shouldn't reach here
//just in case
return p.avail[0]
}