Support server refresh concurrency (#2537)
* simultaneously refresh all servers * Add `cert_refresh_concurrency` --------- Co-authored-by: YX Hao <lifenjoiner@163.com>
This commit is contained in:
parent
3be53642fe
commit
49e3570c2c
|
@ -42,6 +42,7 @@ type Config struct {
|
|||
Timeout int `toml:"timeout"`
|
||||
KeepAlive int `toml:"keepalive"`
|
||||
Proxy string `toml:"proxy"`
|
||||
CertRefreshConcurrency int `toml:"cert_refresh_concurrency"`
|
||||
CertRefreshDelay int `toml:"cert_refresh_delay"`
|
||||
CertIgnoreTimestamp bool `toml:"cert_ignore_timestamp"`
|
||||
EphemeralKeys bool `toml:"dnscrypt_ephemeral_keys"`
|
||||
|
@ -116,6 +117,7 @@ func newConfig() Config {
|
|||
LocalDoH: LocalDoHConfig{Path: "/dns-query"},
|
||||
Timeout: 5000,
|
||||
KeepAlive: 5,
|
||||
CertRefreshConcurrency: 10,
|
||||
CertRefreshDelay: 240,
|
||||
HTTP3: false,
|
||||
CertIgnoreTimestamp: false,
|
||||
|
@ -437,6 +439,7 @@ func ConfigLoad(proxy *Proxy, flags *ConfigFlags) error {
|
|||
if config.ForceTCP {
|
||||
proxy.mainProto = "tcp"
|
||||
}
|
||||
proxy.certRefreshConcurrency = Max(1, config.CertRefreshConcurrency)
|
||||
proxy.certRefreshDelay = time.Duration(Max(60, config.CertRefreshDelay)) * time.Minute
|
||||
proxy.certRefreshDelayAfterFailure = time.Duration(10 * time.Second)
|
||||
proxy.certIgnoreTimestamp = config.CertIgnoreTimestamp
|
||||
|
|
|
@ -183,6 +183,12 @@ keepalive = 30
|
|||
# use_syslog = true
|
||||
|
||||
|
||||
## The maximum concurrency to reload certificates from the resolvers.
|
||||
## Default is 10.
|
||||
|
||||
# cert_refresh_concurrency = 10
|
||||
|
||||
|
||||
## Delay, in minutes, after which certificates are reloaded
|
||||
|
||||
cert_refresh_delay = 240
|
||||
|
|
|
@ -74,6 +74,7 @@ type Proxy struct {
|
|||
certRefreshDelayAfterFailure time.Duration
|
||||
timeout time.Duration
|
||||
certRefreshDelay time.Duration
|
||||
certRefreshConcurrency int
|
||||
cacheSize int
|
||||
logMaxBackups int
|
||||
logMaxAge int
|
||||
|
|
|
@ -224,17 +224,34 @@ func (serversInfo *ServersInfo) refresh(proxy *Proxy) (int, error) {
|
|||
dlog.Debug("Refreshing certificates")
|
||||
serversInfo.RLock()
|
||||
// Appending registeredServers slice from sources may allocate new memory.
|
||||
registeredServers := make([]RegisteredServer, len(serversInfo.registeredServers))
|
||||
serversCount := len(serversInfo.registeredServers)
|
||||
registeredServers := make([]RegisteredServer, serversCount)
|
||||
copy(registeredServers, serversInfo.registeredServers)
|
||||
serversInfo.RUnlock()
|
||||
countChannel := make(chan struct{}, proxy.certRefreshConcurrency)
|
||||
errorChannel := make(chan error, serversCount)
|
||||
for i := range registeredServers {
|
||||
countChannel <- struct{}{}
|
||||
go func(registeredServer *RegisteredServer) {
|
||||
err := serversInfo.refreshServer(proxy, registeredServer.name, registeredServer.stamp)
|
||||
if err == nil {
|
||||
proxy.xTransport.internalResolverReady = true
|
||||
}
|
||||
errorChannel <- err
|
||||
<-countChannel
|
||||
}(®isteredServers[i])
|
||||
}
|
||||
liveServers := 0
|
||||
var err error
|
||||
for _, registeredServer := range registeredServers {
|
||||
if err = serversInfo.refreshServer(proxy, registeredServer.name, registeredServer.stamp); err == nil {
|
||||
for i := 0; i < serversCount; i++ {
|
||||
err = <-errorChannel
|
||||
if err == nil {
|
||||
liveServers++
|
||||
proxy.xTransport.internalResolverReady = true
|
||||
}
|
||||
}
|
||||
if liveServers > 0 {
|
||||
err = nil
|
||||
}
|
||||
serversInfo.Lock()
|
||||
sort.SliceStable(serversInfo.inner, func(i, j int) bool {
|
||||
return serversInfo.inner[i].initialRtt < serversInfo.inner[j].initialRtt
|
||||
|
|
Loading…
Reference in New Issue