Revert "refactoring of pull 980"

This reverts commit 6fa420a8e0.
This commit is contained in:
Frank Denis 2019-10-31 17:36:59 +01:00
parent 7636a78d74
commit fb1fc14317
5 changed files with 71 additions and 71 deletions

View File

@ -8,6 +8,7 @@ import (
"math/rand" "math/rand"
"os" "os"
"os/signal" "os/signal"
"sync"
"github.com/facebookgo/pidfile" "github.com/facebookgo/pidfile"
"github.com/jedisct1/dlog" "github.com/jedisct1/dlog"
@ -20,6 +21,8 @@ const (
) )
type App struct { type App struct {
wg sync.WaitGroup
quit chan struct{}
proxy *Proxy proxy *Proxy
flags *ConfigFlags flags *ConfigFlags
} }
@ -67,6 +70,7 @@ func main() {
} }
app := &App{ app := &App{
quit: make(chan struct{}),
flags: &flags, flags: &flags,
} }
svc, err := service.New(app, svcConfig) svc, err := service.New(app, svcConfig)
@ -96,6 +100,7 @@ func main() {
} }
return return
} }
app.wg.Add(1)
if svc != nil { if svc != nil {
if err = svc.Run(); err != nil { if err = svc.Run(); err != nil {
dlog.Fatal(err) dlog.Fatal(err)
@ -107,7 +112,7 @@ func main() {
app.signalWatch() app.signalWatch()
app.Start(nil) app.Start(nil)
} }
app.proxy.ConnCloseWait() app.wg.Wait()
dlog.Notice("Stopped.") dlog.Notice("Stopped.")
} }
@ -129,13 +134,14 @@ func (app *App) Stop(service service.Service) error {
os.Remove(pidFilePath) os.Remove(pidFilePath)
} }
dlog.Notice("Quit signal received...") dlog.Notice("Quit signal received...")
app.proxy.Stop() close(app.quit)
return nil return nil
} }
func (app *App) appMain() { func (app *App) appMain() {
pidfile.Write() pidfile.Write()
app.proxy.StartProxy() app.proxy.StartProxy(app.quit)
app.wg.Done()
} }
func (app *App) signalWatch() { func (app *App) signalWatch() {
@ -144,6 +150,6 @@ func (app *App) signalWatch() {
go func() { go func() {
<-quit <-quit
signal.Stop(quit) signal.Stop(quit)
app.proxy.Stop() close(app.quit)
}() }()
} }

View File

@ -7,7 +7,6 @@ import (
"io/ioutil" "io/ioutil"
"net" "net"
"os" "os"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -73,13 +72,9 @@ type Proxy struct {
queryMeta []string queryMeta []string
routes *map[string][]string routes *map[string][]string
showCerts bool showCerts bool
wg *sync.WaitGroup
quit chan struct{}
} }
// StartProxy is blocking func (proxy *Proxy) StartProxy(quit <-chan struct{}) {
func (proxy *Proxy) StartProxy() {
proxy.questionSizeEstimator = NewQuestionSizeEstimator() proxy.questionSizeEstimator = NewQuestionSizeEstimator()
if _, err := crypto_rand.Read(proxy.proxySecretKey[:]); err != nil { if _, err := crypto_rand.Read(proxy.proxySecretKey[:]); err != nil {
dlog.Fatal(err) dlog.Fatal(err)
@ -101,12 +96,16 @@ func (proxy *Proxy) StartProxy() {
// if 'userName' is not set, continue as before // if 'userName' is not set, continue as before
if !(len(proxy.userName) > 0) { if !(len(proxy.userName) > 0) {
if err := proxy.udpListenerFromAddr(listenUDPAddr); err != nil { udpCloser, err := proxy.udpListenerFromAddr(listenUDPAddr)
if err != nil {
dlog.Fatal(err) dlog.Fatal(err)
} }
if err := proxy.tcpListenerFromAddr(listenTCPAddr); err != nil { tcpCloser, err := proxy.tcpListenerFromAddr(listenTCPAddr)
if err != nil {
dlog.Fatal(err) dlog.Fatal(err)
} }
defer udpCloser.Close()
defer tcpCloser.Close()
} else { } else {
// if 'userName' is set and we are the parent process // if 'userName' is set and we are the parent process
if !proxy.child { if !proxy.child {
@ -128,13 +127,8 @@ func (proxy *Proxy) StartProxy() {
if err != nil { if err != nil {
dlog.Fatalf("Unable to switch to a different user: %v", err) dlog.Fatalf("Unable to switch to a different user: %v", err)
} }
proxy.wg.Add(1) defer listenerUDP.Close()
go func() { defer listenerTCP.Close()
defer proxy.wg.Done()
<-proxy.quit
listenerUDP.Close()
listenerTCP.Close()
}()
FileDescriptors = append(FileDescriptors, fdUDP) FileDescriptors = append(FileDescriptors, fdUDP)
FileDescriptors = append(FileDescriptors, fdTCP) FileDescriptors = append(FileDescriptors, fdTCP)
@ -154,11 +148,9 @@ func (proxy *Proxy) StartProxy() {
FileDescriptorNum++ FileDescriptorNum++
dlog.Noticef("Now listening to %v [UDP]", listenUDPAddr) dlog.Noticef("Now listening to %v [UDP]", listenUDPAddr)
proxy.wg.Add(1)
go proxy.udpListener(listenerUDP.(*net.UDPConn)) go proxy.udpListener(listenerUDP.(*net.UDPConn))
dlog.Noticef("Now listening to %v [TCP]", listenAddrStr) dlog.Noticef("Now listening to %v [TCP]", listenAddrStr)
proxy.wg.Add(1)
go proxy.tcpListener(listenerTCP.(*net.TCPListener)) go proxy.tcpListener(listenerTCP.(*net.TCPListener))
} }
} }
@ -168,9 +160,11 @@ func (proxy *Proxy) StartProxy() {
if len(proxy.userName) > 0 && !proxy.child { if len(proxy.userName) > 0 && !proxy.child {
proxy.dropPrivilege(proxy.userName, FileDescriptors) proxy.dropPrivilege(proxy.userName, FileDescriptors)
} }
if err := proxy.SystemDListeners(); err != nil { sdc, err := proxy.SystemDListeners()
if err != nil {
dlog.Fatal(err) dlog.Fatal(err)
} }
defer sdc.Close()
liveServers, err := proxy.serversInfo.refresh(proxy) liveServers, err := proxy.serversInfo.refresh(proxy)
if liveServers > 0 { if liveServers > 0 {
proxy.certIgnoreTimestamp = false proxy.certIgnoreTimestamp = false
@ -191,18 +185,21 @@ func (proxy *Proxy) StartProxy() {
} }
go proxy.prefetcher() go proxy.prefetcher()
if len(proxy.serversInfo.registeredServers) > 0 { if len(proxy.serversInfo.registeredServers) > 0 {
for { go func() {
delay := proxy.certRefreshDelay for {
if liveServers == 0 { delay := proxy.certRefreshDelay
delay = proxy.certRefreshDelayAfterFailure if liveServers == 0 {
delay = proxy.certRefreshDelayAfterFailure
}
clocksmith.Sleep(delay)
liveServers, _ = proxy.serversInfo.refresh(proxy)
if liveServers > 0 {
proxy.certIgnoreTimestamp = false
}
} }
clocksmith.Sleep(delay) }()
liveServers, _ = proxy.serversInfo.refresh(proxy)
if liveServers > 0 {
proxy.certIgnoreTimestamp = false
}
}
} }
<-quit
} }
func (proxy *Proxy) prefetcher() { func (proxy *Proxy) prefetcher() {
@ -223,11 +220,6 @@ func (proxy *Proxy) prefetcher() {
} }
func (proxy *Proxy) udpListener(clientPc *net.UDPConn) { func (proxy *Proxy) udpListener(clientPc *net.UDPConn) {
go func() {
defer proxy.wg.Done()
<-proxy.quit
clientPc.Close()
}()
for { for {
buffer := make([]byte, MaxDNSPacketSize-1) buffer := make([]byte, MaxDNSPacketSize-1)
length, clientAddr, err := clientPc.ReadFrom(buffer) length, clientAddr, err := clientPc.ReadFrom(buffer)
@ -247,23 +239,17 @@ func (proxy *Proxy) udpListener(clientPc *net.UDPConn) {
} }
} }
func (proxy *Proxy) udpListenerFromAddr(listenAddr *net.UDPAddr) error { func (proxy *Proxy) udpListenerFromAddr(listenAddr *net.UDPAddr) (io.Closer, error) {
clientPc, err := net.ListenUDP("udp", listenAddr) clientPc, err := net.ListenUDP("udp", listenAddr)
if err != nil { if err != nil {
return err return nil, err
} }
dlog.Noticef("Now listening to %v [UDP]", listenAddr) dlog.Noticef("Now listening to %v [UDP]", listenAddr)
proxy.wg.Add(1)
go proxy.udpListener(clientPc) go proxy.udpListener(clientPc)
return nil return clientPc, nil
} }
func (proxy *Proxy) tcpListener(acceptPc *net.TCPListener) { func (proxy *Proxy) tcpListener(acceptPc *net.TCPListener) {
go func() {
defer proxy.wg.Done()
<-proxy.quit
acceptPc.Close()
}()
for { for {
clientPc, err := acceptPc.Accept() clientPc, err := acceptPc.Accept()
if err != nil { if err != nil {
@ -288,15 +274,14 @@ func (proxy *Proxy) tcpListener(acceptPc *net.TCPListener) {
} }
} }
func (proxy *Proxy) tcpListenerFromAddr(listenAddr *net.TCPAddr) error { func (proxy *Proxy) tcpListenerFromAddr(listenAddr *net.TCPAddr) (io.Closer, error) {
acceptPc, err := net.ListenTCP("tcp", listenAddr) acceptPc, err := net.ListenTCP("tcp", listenAddr)
if err != nil { if err != nil {
return err return nil, err
} }
dlog.Noticef("Now listening to %v [TCP]", listenAddr) dlog.Noticef("Now listening to %v [TCP]", listenAddr)
proxy.wg.Add(1)
go proxy.tcpListener(acceptPc) go proxy.tcpListener(acceptPc)
return nil return acceptPc, nil
} }
func (proxy *Proxy) prepareForRelay(ip net.IP, port int, encryptedQuery *[]byte) { func (proxy *Proxy) prepareForRelay(ip net.IP, port int, encryptedQuery *[]byte) {
@ -525,20 +510,8 @@ func (proxy *Proxy) processIncomingQuery(serverInfo *ServerInfo, clientProto str
} }
} }
func (proxy *Proxy) Stop() {
if proxy.quit != nil {
close(proxy.quit)
}
}
func (proxy *Proxy) ConnCloseWait() {
proxy.wg.Wait()
}
func NewProxy() *Proxy { func NewProxy() *Proxy {
return &Proxy{ return &Proxy{
serversInfo: NewServersInfo(), serversInfo: NewServersInfo(),
wg: new(sync.WaitGroup),
quit: make(chan struct{}),
} }
} }

View File

@ -2,6 +2,11 @@
package main package main
func (proxy *Proxy) SystemDListeners() error { import (
return nil "io"
"io/ioutil"
)
func (proxy *Proxy) SystemDListeners() (io.Closer, error) {
return ioutil.NopCloser(nil), nil
} }

View File

@ -2,6 +2,11 @@
package main package main
func (proxy *Proxy) SystemDListeners() error { import (
return nil "io"
"io/ioutil"
)
func (proxy *Proxy) SystemDListeners() (io.Closer, error) {
return ioutil.NopCloser(nil), nil
} }

View File

@ -4,13 +4,23 @@ package main
import ( import (
"fmt" "fmt"
"io"
"net" "net"
"github.com/coreos/go-systemd/activation" "github.com/coreos/go-systemd/activation"
"github.com/jedisct1/dlog" "github.com/jedisct1/dlog"
) )
func (proxy *Proxy) SystemDListeners() error { type multiCloser []io.Closer
func (mc multiCloser) Close() (err error) {
for _, c := range mc {
err = c.Close()
}
return err
}
func (proxy *Proxy) SystemDListeners() (io.Closer, error) {
files := activation.Files(true) files := activation.Files(true)
if len(files) > 0 { if len(files) > 0 {
@ -19,24 +29,25 @@ func (proxy *Proxy) SystemDListeners() error {
} }
dlog.Warn("Systemd sockets are untested and unsupported - use at your own risk") dlog.Warn("Systemd sockets are untested and unsupported - use at your own risk")
} }
var mc multiCloser
for i, file := range files { for i, file := range files {
defer file.Close() defer file.Close()
ok := false ok := false
if listener, err := net.FileListener(file); err == nil { if listener, err := net.FileListener(file); err == nil {
dlog.Noticef("Wiring systemd TCP socket #%d, %s, %s", i, file.Name(), listener.Addr()) dlog.Noticef("Wiring systemd TCP socket #%d, %s, %s", i, file.Name(), listener.Addr())
ok = true ok = true
proxy.wg.Add(1) mc = append(mc, listener)
go proxy.tcpListener(listener.(*net.TCPListener)) go proxy.tcpListener(listener.(*net.TCPListener))
} else if pc, err := net.FilePacketConn(file); err == nil { } else if pc, err := net.FilePacketConn(file); err == nil {
dlog.Noticef("Wiring systemd UDP socket #%d, %s, %s", i, file.Name(), pc.LocalAddr()) dlog.Noticef("Wiring systemd UDP socket #%d, %s, %s", i, file.Name(), pc.LocalAddr())
ok = true ok = true
proxy.wg.Add(1) mc = append(mc, pc)
go proxy.udpListener(pc.(*net.UDPConn)) go proxy.udpListener(pc.(*net.UDPConn))
} }
if !ok { if !ok {
return fmt.Errorf("Could not wire systemd socket #%d, %s", i, file.Name()) return nil, fmt.Errorf("Could not wire systemd socket #%d, %s", i, file.Name())
} }
} }
return nil return mc, nil
} }