make sure tcp/udp Conn are closed on stop signal

This commit is contained in:
Vladimir Bauer 2019-10-24 21:51:26 +05:00 committed by Frank Denis
parent 220d418f2f
commit 6680faf665
7 changed files with 69 additions and 46 deletions

View File

@ -477,7 +477,7 @@ func ConfigLoad(proxy *Proxy, svcFlag *string) error {
}
proxy.showCerts = *showCerts || len(os.Getenv("SHOW_CERTS")) > 0
if proxy.showCerts {
proxy.listenAddresses = nil
proxy.listenAddresses = proxy.listenAddresses[0:0]
}
dlog.Noticef("dnscrypt-proxy %s", AppVersion)
if err := NetProbe(netprobeAddress, netprobeTimeout); err != nil {
@ -628,11 +628,11 @@ func (config *Config) loadSource(proxy *Proxy, requiredProps stamps.ServerInform
cfgSource.RefreshDelay = 72
}
source, sourceUrlsToPrefetch, err := NewSource(proxy.xTransport, cfgSource.URLs, cfgSource.MinisignKeyStr, cfgSource.CacheFile, cfgSource.FormatStr, time.Duration(cfgSource.RefreshDelay)*time.Hour)
proxy.urlsToPrefetch = append(proxy.urlsToPrefetch, sourceUrlsToPrefetch...)
if err != nil {
dlog.Criticalf("Unable to retrieve source [%s]: [%s]", cfgSourceName, err)
return err
}
proxy.urlsToPrefetch = append(proxy.urlsToPrefetch, sourceUrlsToPrefetch...)
registeredServers, err := source.Parse(cfgSource.Prefix)
if err != nil {
if len(registeredServers) == 0 {

View File

@ -111,8 +111,7 @@ func (app *App) Stop(service service.Service) error {
func (app *App) appMain() {
pidfile.Write()
app.proxy.StartProxy()
<-app.quit
app.proxy.StartProxy(app.quit)
app.wg.Done()
}

View File

@ -60,7 +60,7 @@ type Proxy struct {
forwardFile string
cloakFile string
pluginsGlobals PluginsGlobals
urlsToPrefetch []URLToPrefetch
urlsToPrefetch []*URLToPrefetch
clientsCount uint32
maxClients uint32
xTransport *XTransport
@ -74,7 +74,7 @@ type Proxy struct {
showCerts bool
}
func (proxy *Proxy) StartProxy() {
func (proxy *Proxy) StartProxy(quit <-chan struct{}) {
proxy.questionSizeEstimator = NewQuestionSizeEstimator()
if _, err := crypto_rand.Read(proxy.proxySecretKey[:]); err != nil {
dlog.Fatal(err)
@ -96,12 +96,16 @@ func (proxy *Proxy) StartProxy() {
// if 'userName' is not set, continue as before
if !(len(proxy.userName) > 0) {
if err := proxy.udpListenerFromAddr(listenUDPAddr); err != nil {
udpCloser, err := proxy.udpListenerFromAddr(listenUDPAddr)
if err != nil {
dlog.Fatal(err)
}
if err := proxy.tcpListenerFromAddr(listenTCPAddr); err != nil {
tcpCloser, err := proxy.tcpListenerFromAddr(listenTCPAddr)
if err != nil {
dlog.Fatal(err)
}
defer udpCloser.Close()
defer tcpCloser.Close()
} else {
// if 'userName' is set and we are the parent process
if !proxy.child {
@ -156,9 +160,11 @@ func (proxy *Proxy) StartProxy() {
if len(proxy.userName) > 0 && !proxy.child {
proxy.dropPrivilege(proxy.userName, FileDescriptors)
}
if err := proxy.SystemDListeners(); err != nil {
sdc, err := proxy.SystemDListeners()
if err != nil {
dlog.Fatal(err)
}
defer sdc.Close()
liveServers, err := proxy.serversInfo.refresh(proxy)
if liveServers > 0 {
proxy.certIgnoreTimestamp = false
@ -177,7 +183,7 @@ func (proxy *Proxy) StartProxy() {
dlog.Error(err)
dlog.Notice("dnscrypt-proxy is waiting for at least one server to be reachable")
}
proxy.prefetcher(&proxy.urlsToPrefetch)
go proxy.prefetcher()
if len(proxy.serversInfo.registeredServers) > 0 {
go func() {
for {
@ -193,30 +199,27 @@ func (proxy *Proxy) StartProxy() {
}
}()
}
<-quit
}
func (proxy *Proxy) prefetcher(urlsToPrefetch *[]URLToPrefetch) {
go func() {
for {
now := time.Now()
for i := range *urlsToPrefetch {
urlToPrefetch := &(*urlsToPrefetch)[i]
if now.After(urlToPrefetch.when) {
dlog.Debugf("Prefetching [%s]", urlToPrefetch.url)
if err := PrefetchSourceURL(proxy.xTransport, urlToPrefetch); err != nil {
dlog.Debugf("Prefetching [%s] failed: %s", urlToPrefetch.url, err)
} else {
dlog.Debugf("Prefetching [%s] succeeded. Next refresh scheduled for %v", urlToPrefetch.url, urlToPrefetch.when)
}
func (proxy *Proxy) prefetcher() {
for {
now := time.Now()
for _, urlToPrefetch := range proxy.urlsToPrefetch {
if now.After(urlToPrefetch.when) {
dlog.Debugf("Prefetching [%s]", urlToPrefetch.url)
if err := PrefetchSourceURL(proxy.xTransport, urlToPrefetch); err != nil {
dlog.Debugf("Prefetching [%s] failed: %s", urlToPrefetch.url, err)
} else {
dlog.Debugf("Prefetching [%s] succeeded. Next refresh scheduled for %v", urlToPrefetch.url, urlToPrefetch.when)
}
}
clocksmith.Sleep(60 * time.Second)
}
}()
clocksmith.Sleep(60 * time.Second)
}
}
func (proxy *Proxy) udpListener(clientPc *net.UDPConn) {
defer clientPc.Close()
for {
buffer := make([]byte, MaxDNSPacketSize-1)
length, clientAddr, err := clientPc.ReadFrom(buffer)
@ -236,18 +239,17 @@ func (proxy *Proxy) udpListener(clientPc *net.UDPConn) {
}
}
func (proxy *Proxy) udpListenerFromAddr(listenAddr *net.UDPAddr) error {
func (proxy *Proxy) udpListenerFromAddr(listenAddr *net.UDPAddr) (io.Closer, error) {
clientPc, err := net.ListenUDP("udp", listenAddr)
if err != nil {
return err
return nil, err
}
dlog.Noticef("Now listening to %v [UDP]", listenAddr)
go proxy.udpListener(clientPc)
return nil
return clientPc, nil
}
func (proxy *Proxy) tcpListener(acceptPc *net.TCPListener) {
defer acceptPc.Close()
for {
clientPc, err := acceptPc.Accept()
if err != nil {
@ -272,14 +274,14 @@ func (proxy *Proxy) tcpListener(acceptPc *net.TCPListener) {
}
}
func (proxy *Proxy) tcpListenerFromAddr(listenAddr *net.TCPAddr) error {
func (proxy *Proxy) tcpListenerFromAddr(listenAddr *net.TCPAddr) (io.Closer, error) {
acceptPc, err := net.ListenTCP("tcp", listenAddr)
if err != nil {
return err
return nil, err
}
dlog.Noticef("Now listening to %v [TCP]", listenAddr)
go proxy.tcpListener(acceptPc)
return nil
return acceptPc, nil
}
func (proxy *Proxy) prepareForRelay(ip net.IP, port int, encryptedQuery *[]byte) {

View File

@ -127,19 +127,19 @@ type URLToPrefetch struct {
when time.Time
}
func NewSource(xTransport *XTransport, urls []string, minisignKeyStr string, cacheFile string, formatStr string, refreshDelay time.Duration) (Source, []URLToPrefetch, error) {
func NewSource(xTransport *XTransport, urls []string, minisignKeyStr string, cacheFile string, formatStr string, refreshDelay time.Duration) (Source, []*URLToPrefetch, error) {
urlsToPrefetch := []*URLToPrefetch{}
source := Source{urls: urls}
if formatStr == "v2" {
source.format = SourceFormatV2
} else {
return source, []URLToPrefetch{}, fmt.Errorf("Unsupported source format: [%s]", formatStr)
return source, urlsToPrefetch, fmt.Errorf("Unsupported source format: [%s]", formatStr)
}
minisignKey, err := minisign.NewPublicKey(minisignKeyStr)
if err != nil {
return source, []URLToPrefetch{}, err
return source, urlsToPrefetch, err
}
now := time.Now()
urlsToPrefetch := []URLToPrefetch{}
sigCacheFile := cacheFile + ".minisig"
var sigStr, in string
@ -166,8 +166,8 @@ func NewSource(xTransport *XTransport, urls []string, minisignKeyStr string, cac
if len(preloadURL) > 0 {
url := preloadURL
sigURL := url + ".minisig"
urlsToPrefetch = append(urlsToPrefetch, URLToPrefetch{url: url, cacheFile: cacheFile, when: now.Add(delayTillNextUpdate)})
urlsToPrefetch = append(urlsToPrefetch, URLToPrefetch{url: sigURL, cacheFile: sigCacheFile, when: now.Add(sigDelayTillNextUpdate)})
urlsToPrefetch = append(urlsToPrefetch, &URLToPrefetch{url: url, cacheFile: cacheFile, when: now.Add(delayTillNextUpdate)})
urlsToPrefetch = append(urlsToPrefetch, &URLToPrefetch{url: sigURL, cacheFile: sigCacheFile, when: now.Add(sigDelayTillNextUpdate)})
}
if sigErr != nil && err == nil {
err = sigErr

View File

@ -1,5 +1,9 @@
package main
func (proxy *Proxy) SystemDListeners() error {
return nil
import (
"io"
)
func (proxy *Proxy) SystemDListeners() (io.Closer, error) {
return ioutil.NopCloser(nil), nil
}

View File

@ -2,6 +2,11 @@
package main
func (proxy *Proxy) SystemDListeners() error {
return nil
import (
"io"
"io/ioutil"
)
func (proxy *Proxy) SystemDListeners() (io.Closer, error) {
return ioutil.NopCloser(nil), nil
}

View File

@ -4,13 +4,23 @@ package main
import (
"fmt"
"io"
"net"
"github.com/coreos/go-systemd/activation"
"github.com/jedisct1/dlog"
)
func (proxy *Proxy) SystemDListeners() error {
type multiCloser []io.Closer
func (mc multiCloser) Close() (err error) {
for _, c := range mc {
err = c.Close()
}
return err
}
func (proxy *Proxy) SystemDListeners() (io.Closer, error) {
files := activation.Files(true)
if len(files) > 0 {
@ -19,22 +29,25 @@ func (proxy *Proxy) SystemDListeners() error {
}
dlog.Warn("Systemd sockets are untested and unsupported - use at your own risk")
}
var mc multiCloser
for i, file := range files {
defer file.Close()
ok := false
if listener, err := net.FileListener(file); err == nil {
dlog.Noticef("Wiring systemd TCP socket #%d, %s, %s", i, file.Name(), listener.Addr())
ok = true
mc = append(mc, listener)
go proxy.tcpListener(listener.(*net.TCPListener))
} else if pc, err := net.FilePacketConn(file); err == nil {
dlog.Noticef("Wiring systemd UDP socket #%d, %s, %s", i, file.Name(), pc.LocalAddr())
ok = true
mc = append(mc, pc)
go proxy.udpListener(pc.(*net.UDPConn))
}
if !ok {
return fmt.Errorf("Could not wire systemd socket #%d, %s", i, file.Name())
return nil, fmt.Errorf("Could not wire systemd socket #%d, %s", i, file.Name())
}
}
return nil
return mc, nil
}