From 6fa420a8e02b9cc732b527b8bd6cf32bbd616695 Mon Sep 17 00:00:00 2001 From: Vladimir Bauer Date: Thu, 31 Oct 2019 16:02:20 +0500 Subject: [PATCH] refactoring of pull 980 follow up on https://github.com/DNSCrypt/dnscrypt-proxy/pull/980#issuecomment-548153169 --- dnscrypt-proxy/main.go | 14 ++--- dnscrypt-proxy/proxy.go | 89 ++++++++++++++++++++----------- dnscrypt-proxy/systemd_android.go | 9 +--- dnscrypt-proxy/systemd_free.go | 9 +--- dnscrypt-proxy/systemd_linux.go | 21 ++------ 5 files changed, 71 insertions(+), 71 deletions(-) diff --git a/dnscrypt-proxy/main.go b/dnscrypt-proxy/main.go index 50613b43..797eab59 100644 --- a/dnscrypt-proxy/main.go +++ b/dnscrypt-proxy/main.go @@ -8,7 +8,6 @@ import ( "math/rand" "os" "os/signal" - "sync" "syscall" "github.com/facebookgo/pidfile" @@ -22,8 +21,6 @@ const ( ) type App struct { - wg sync.WaitGroup - quit chan struct{} proxy *Proxy flags *ConfigFlags } @@ -71,7 +68,6 @@ func main() { } app := &App{ - quit: make(chan struct{}), flags: &flags, } svc, err := service.New(app, svcConfig) @@ -101,7 +97,6 @@ func main() { } return } - app.wg.Add(1) if svc != nil { if err = svc.Run(); err != nil { dlog.Fatal(err) @@ -113,7 +108,7 @@ func main() { app.signalWatch() app.Start(nil) } - app.wg.Wait() + app.proxy.ConnCloseWait() dlog.Notice("Stopped.") } @@ -135,14 +130,13 @@ func (app *App) Stop(service service.Service) error { os.Remove(pidFilePath) } dlog.Notice("Quit signal received...") - close(app.quit) + app.proxy.Stop() return nil } func (app *App) appMain() { pidfile.Write() - app.proxy.StartProxy(app.quit) - app.wg.Done() + app.proxy.StartProxy() } func (app *App) signalWatch() { @@ -151,6 +145,6 @@ func (app *App) signalWatch() { go func() { <-quit signal.Stop(quit) - close(app.quit) + app.proxy.Stop() }() } diff --git a/dnscrypt-proxy/proxy.go b/dnscrypt-proxy/proxy.go index 36613e9c..6d274aea 100644 --- a/dnscrypt-proxy/proxy.go +++ b/dnscrypt-proxy/proxy.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net" "os" + "sync" "sync/atomic" "time" @@ -72,9 +73,13 @@ type Proxy struct { queryMeta []string routes *map[string][]string showCerts bool + + wg *sync.WaitGroup + quit chan struct{} } -func (proxy *Proxy) StartProxy(quit <-chan struct{}) { +// StartProxy is blocking +func (proxy *Proxy) StartProxy() { proxy.questionSizeEstimator = NewQuestionSizeEstimator() if _, err := crypto_rand.Read(proxy.proxySecretKey[:]); err != nil { dlog.Fatal(err) @@ -96,16 +101,12 @@ func (proxy *Proxy) StartProxy(quit <-chan struct{}) { // if 'userName' is not set, continue as before if !(len(proxy.userName) > 0) { - udpCloser, err := proxy.udpListenerFromAddr(listenUDPAddr) - if err != nil { + if err := proxy.udpListenerFromAddr(listenUDPAddr); err != nil { dlog.Fatal(err) } - tcpCloser, err := proxy.tcpListenerFromAddr(listenTCPAddr) - if err != nil { + if err := proxy.tcpListenerFromAddr(listenTCPAddr); err != nil { dlog.Fatal(err) } - defer udpCloser.Close() - defer tcpCloser.Close() } else { // if 'userName' is set and we are the parent process if !proxy.child { @@ -127,8 +128,13 @@ func (proxy *Proxy) StartProxy(quit <-chan struct{}) { if err != nil { dlog.Fatalf("Unable to switch to a different user: %v", err) } - defer listenerUDP.Close() - defer listenerTCP.Close() + proxy.wg.Add(1) + go func() { + defer proxy.wg.Done() + <-proxy.quit + listenerUDP.Close() + listenerTCP.Close() + }() FileDescriptors = append(FileDescriptors, fdUDP) FileDescriptors = append(FileDescriptors, fdTCP) @@ -148,9 +154,11 @@ func (proxy *Proxy) StartProxy(quit <-chan struct{}) { FileDescriptorNum++ dlog.Noticef("Now listening to %v [UDP]", listenUDPAddr) + proxy.wg.Add(1) go proxy.udpListener(listenerUDP.(*net.UDPConn)) dlog.Noticef("Now listening to %v [TCP]", listenAddrStr) + proxy.wg.Add(1) go proxy.tcpListener(listenerTCP.(*net.TCPListener)) } } @@ -160,11 +168,9 @@ func (proxy *Proxy) StartProxy(quit <-chan struct{}) { if len(proxy.userName) > 0 && !proxy.child { proxy.dropPrivilege(proxy.userName, FileDescriptors) } - sdc, err := proxy.SystemDListeners() - if err != nil { + if err := proxy.SystemDListeners(); err != nil { dlog.Fatal(err) } - defer sdc.Close() liveServers, err := proxy.serversInfo.refresh(proxy) if liveServers > 0 { proxy.certIgnoreTimestamp = false @@ -185,21 +191,18 @@ func (proxy *Proxy) StartProxy(quit <-chan struct{}) { } go proxy.prefetcher() if len(proxy.serversInfo.registeredServers) > 0 { - go func() { - for { - delay := proxy.certRefreshDelay - if liveServers == 0 { - delay = proxy.certRefreshDelayAfterFailure - } - clocksmith.Sleep(delay) - liveServers, _ = proxy.serversInfo.refresh(proxy) - if liveServers > 0 { - proxy.certIgnoreTimestamp = false - } + for { + delay := proxy.certRefreshDelay + if liveServers == 0 { + delay = proxy.certRefreshDelayAfterFailure } - }() + clocksmith.Sleep(delay) + liveServers, _ = proxy.serversInfo.refresh(proxy) + if liveServers > 0 { + proxy.certIgnoreTimestamp = false + } + } } - <-quit } func (proxy *Proxy) prefetcher() { @@ -220,6 +223,11 @@ func (proxy *Proxy) prefetcher() { } func (proxy *Proxy) udpListener(clientPc *net.UDPConn) { + go func() { + defer proxy.wg.Done() + <-proxy.quit + clientPc.Close() + }() for { buffer := make([]byte, MaxDNSPacketSize-1) length, clientAddr, err := clientPc.ReadFrom(buffer) @@ -239,17 +247,23 @@ func (proxy *Proxy) udpListener(clientPc *net.UDPConn) { } } -func (proxy *Proxy) udpListenerFromAddr(listenAddr *net.UDPAddr) (io.Closer, error) { +func (proxy *Proxy) udpListenerFromAddr(listenAddr *net.UDPAddr) error { clientPc, err := net.ListenUDP("udp", listenAddr) if err != nil { - return nil, err + return err } dlog.Noticef("Now listening to %v [UDP]", listenAddr) + proxy.wg.Add(1) go proxy.udpListener(clientPc) - return clientPc, nil + return nil } func (proxy *Proxy) tcpListener(acceptPc *net.TCPListener) { + go func() { + defer proxy.wg.Done() + <-proxy.quit + acceptPc.Close() + }() for { clientPc, err := acceptPc.Accept() if err != nil { @@ -274,14 +288,15 @@ func (proxy *Proxy) tcpListener(acceptPc *net.TCPListener) { } } -func (proxy *Proxy) tcpListenerFromAddr(listenAddr *net.TCPAddr) (io.Closer, error) { +func (proxy *Proxy) tcpListenerFromAddr(listenAddr *net.TCPAddr) error { acceptPc, err := net.ListenTCP("tcp", listenAddr) if err != nil { - return nil, err + return err } dlog.Noticef("Now listening to %v [TCP]", listenAddr) + proxy.wg.Add(1) go proxy.tcpListener(acceptPc) - return acceptPc, nil + return nil } func (proxy *Proxy) prepareForRelay(ip net.IP, port int, encryptedQuery *[]byte) { @@ -510,8 +525,20 @@ func (proxy *Proxy) processIncomingQuery(serverInfo *ServerInfo, clientProto str } } +func (proxy *Proxy) Stop() { + if proxy.quit != nil { + close(proxy.quit) + } +} + +func (proxy *Proxy) ConnCloseWait() { + proxy.wg.Wait() +} + func NewProxy() *Proxy { return &Proxy{ serversInfo: NewServersInfo(), + wg: new(sync.WaitGroup), + quit: make(chan struct{}), } } diff --git a/dnscrypt-proxy/systemd_android.go b/dnscrypt-proxy/systemd_android.go index 1c1abaac..715dab1e 100644 --- a/dnscrypt-proxy/systemd_android.go +++ b/dnscrypt-proxy/systemd_android.go @@ -2,11 +2,6 @@ package main -import ( - "io" - "io/ioutil" -) - -func (proxy *Proxy) SystemDListeners() (io.Closer, error) { - return ioutil.NopCloser(nil), nil +func (proxy *Proxy) SystemDListeners() error { + return nil } diff --git a/dnscrypt-proxy/systemd_free.go b/dnscrypt-proxy/systemd_free.go index 805689e0..b18f1b62 100644 --- a/dnscrypt-proxy/systemd_free.go +++ b/dnscrypt-proxy/systemd_free.go @@ -2,11 +2,6 @@ package main -import ( - "io" - "io/ioutil" -) - -func (proxy *Proxy) SystemDListeners() (io.Closer, error) { - return ioutil.NopCloser(nil), nil +func (proxy *Proxy) SystemDListeners() error { + return nil } diff --git a/dnscrypt-proxy/systemd_linux.go b/dnscrypt-proxy/systemd_linux.go index ca3a11cc..cc5d7e00 100644 --- a/dnscrypt-proxy/systemd_linux.go +++ b/dnscrypt-proxy/systemd_linux.go @@ -4,23 +4,13 @@ package main import ( "fmt" - "io" "net" "github.com/coreos/go-systemd/activation" "github.com/jedisct1/dlog" ) -type multiCloser []io.Closer - -func (mc multiCloser) Close() (err error) { - for _, c := range mc { - err = c.Close() - } - return err -} - -func (proxy *Proxy) SystemDListeners() (io.Closer, error) { +func (proxy *Proxy) SystemDListeners() error { files := activation.Files(true) if len(files) > 0 { @@ -29,25 +19,24 @@ func (proxy *Proxy) SystemDListeners() (io.Closer, error) { } dlog.Warn("Systemd sockets are untested and unsupported - use at your own risk") } - var mc multiCloser for i, file := range files { defer file.Close() ok := false if listener, err := net.FileListener(file); err == nil { dlog.Noticef("Wiring systemd TCP socket #%d, %s, %s", i, file.Name(), listener.Addr()) ok = true - mc = append(mc, listener) + proxy.wg.Add(1) go proxy.tcpListener(listener.(*net.TCPListener)) } else if pc, err := net.FilePacketConn(file); err == nil { dlog.Noticef("Wiring systemd UDP socket #%d, %s, %s", i, file.Name(), pc.LocalAddr()) ok = true - mc = append(mc, pc) + proxy.wg.Add(1) go proxy.udpListener(pc.(*net.UDPConn)) } if !ok { - return nil, fmt.Errorf("Could not wire systemd socket #%d, %s", i, file.Name()) + return fmt.Errorf("Could not wire systemd socket #%d, %s", i, file.Name()) } } - return mc, nil + return nil }