Basic load balancing/failover

Try to send queries to one of the two fastest servers
This commit is contained in:
Frank Denis 2018-01-10 16:42:14 +01:00
parent 9eeb799d6e
commit fa22cc32d7
3 changed files with 46 additions and 9 deletions

View File

@ -7,7 +7,7 @@ import (
)
const (
EwmaDecay = 100.0
SizeEstimatorEwmaDecay = 100.0
)
type QuestionSizeEstimator struct {
@ -17,7 +17,7 @@ type QuestionSizeEstimator struct {
}
func NewQuestionSizeEstimator() QuestionSizeEstimator {
return QuestionSizeEstimator{minQuestionSize: InitialMinQuestionSize, ewma: ewma.NewMovingAverage(EwmaDecay)}
return QuestionSizeEstimator{minQuestionSize: InitialMinQuestionSize, ewma: ewma.NewMovingAverage(SizeEstimatorEwmaDecay)}
}
func (questionSizeEstimator *QuestionSizeEstimator) MinQuestionSize() int {

View File

@ -169,6 +169,7 @@ func (proxy *Proxy) processIncomingQuery(serverInfo *ServerInfo, serverProto str
if err != nil {
return
}
serverInfo.noticeBegin(proxy)
var response []byte
if serverProto == "udp" {
response, err = proxy.exchangeWithUDPServer(serverInfo, encryptedQuery, clientNonce)
@ -176,7 +177,7 @@ func (proxy *Proxy) processIncomingQuery(serverInfo *ServerInfo, serverProto str
response, err = proxy.exchangeWithTCPServer(serverInfo, encryptedQuery, clientNonce)
}
if err != nil {
serverInfo.noticeFailure()
serverInfo.noticeFailure(proxy)
return
}
if clientAddr != nil {
@ -195,9 +196,10 @@ func (proxy *Proxy) processIncomingQuery(serverInfo *ServerInfo, serverProto str
} else {
response, err = PrefixWithSize(response)
if err != nil {
serverInfo.noticeFailure()
serverInfo.noticeFailure(proxy)
return
}
clientPc.Write(response)
}
serverInfo.noticeSuccess(proxy)
}

View File

@ -10,9 +10,14 @@ import (
"sync"
"time"
"github.com/VividCortex/ewma"
"golang.org/x/crypto/ed25519"
)
const (
RTTEwmaDecay = 10.0
)
type ServerStamp struct {
serverAddrStr string
serverPkStr string
@ -33,6 +38,7 @@ func NewServerStampFromLegacy(serverAddrStr string, serverPkStr string, provider
}
type ServerInfo struct {
sync.RWMutex
MagicQuery [8]byte
ServerPk [32]byte
SharedKey [32]byte
@ -41,6 +47,8 @@ type ServerInfo struct {
Timeout time.Duration
UDPAddr *net.UDPAddr
TCPAddr *net.TCPAddr
lastActionTS time.Time
rtt ewma.MovingAverage
}
type ServersInfo struct {
@ -56,6 +64,7 @@ func (serversInfo *ServersInfo) registerServer(proxy *Proxy, name string, stamp
if err != nil {
return err
}
newServer.rtt = ewma.NewMovingAverage(RTTEwmaDecay)
for i, oldServer := range serversInfo.inner {
if oldServer.Name == newServer.Name {
serversInfo.inner[i] = newServer
@ -78,14 +87,21 @@ func (serversInfo *ServersInfo) refresh(proxy *Proxy) {
}
func (serversInfo *ServersInfo) getOne() *ServerInfo {
serversInfo.RLock()
serversInfo.Lock()
defer serversInfo.Unlock()
serversCount := len(serversInfo.inner)
if serversCount <= 0 {
serversInfo.RUnlock()
return nil
}
serverInfo := &serversInfo.inner[rand.Intn(serversCount)]
serversInfo.RUnlock()
candidate := rand.Intn(serversCount)
if candidate == 0 {
return &serversInfo.inner[candidate]
}
if serversInfo.inner[candidate].rtt.Value() < serversInfo.inner[0].rtt.Value() {
serversInfo.inner[candidate], serversInfo.inner[0] = serversInfo.inner[0], serversInfo.inner[candidate]
}
candidate = Min(serversCount, 2)
serverInfo := &serversInfo.inner[candidate]
return serverInfo
}
@ -119,5 +135,24 @@ func (serversInfo *ServersInfo) fetchServerInfo(proxy *Proxy, name string, stamp
return serverInfo, nil
}
func (serverInfo *ServerInfo) noticeFailure() {
func (serverInfo *ServerInfo) noticeFailure(proxy *Proxy) {
serverInfo.Lock()
serverInfo.rtt.Set(float64(proxy.timeout.Nanoseconds()))
serverInfo.Unlock()
}
func (serverInfo *ServerInfo) noticeBegin(proxy *Proxy) {
serverInfo.Lock()
serverInfo.lastActionTS = time.Now()
serverInfo.Unlock()
}
func (serverInfo *ServerInfo) noticeSuccess(proxy *Proxy) {
now := time.Now()
serverInfo.Lock()
elapsed := now.Sub(serverInfo.lastActionTS) / 1024
if elapsed > 0 {
serverInfo.rtt.Add(float64(elapsed))
}
serverInfo.Unlock()
}