diff --git a/README.md b/README.md index d586a08..93ca817 100755 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ In the examples domain `x.y.z` is used, replace them with your domain - [How to use](#how-to-use) - [Configuration](#configuration) - [Single Port Configuration](#single-port-configuration-example) - - [Multiple Configuration](#multiple-configuration-example) + - [Multiple Ports Configuration](#multiple-ports-configuration-example) - [TCP/UDP Configuration](#tcpudp-configuration-example) - [Troubleshooting](#troubleshooting) - [Benchmarks](#benchmarks) @@ -99,7 +99,7 @@ whoami: # 2. visit https://apps.y.z/whoami ``` -### Multiple configuration example +### Multiple ports configuration example ```yaml minio: @@ -198,7 +198,7 @@ It takes ~ 0.1-0.4MB for each HTTP Proxy, and <2MB for each TCP/UDP Proxy 1. [Install go](https://go.dev/doc/install) if not already -2. Get dependencies with `go get` +2. get dependencies with `sh scripts/get.sh` 3. build binary with `sh scripts/build.sh` diff --git a/bin/go-proxy b/bin/go-proxy index 3d3c5db..a5afef0 100755 Binary files a/bin/go-proxy and b/bin/go-proxy differ diff --git a/scripts/get.sh b/scripts/get.sh new file mode 100644 index 0000000..de57f0e --- /dev/null +++ b/scripts/get.sh @@ -0,0 +1,2 @@ +#!/bin/sh +go get -d -u ./src/go-proxy \ No newline at end of file diff --git a/src/go-proxy/docker.go b/src/go-proxy/docker.go index 3cdf285..824714b 100644 --- a/src/go-proxy/docker.go +++ b/src/go-proxy/docker.go @@ -7,6 +7,7 @@ import ( "reflect" "sort" "strings" + "sync" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" @@ -17,6 +18,7 @@ import ( ) type ProxyConfig struct { + id string Alias string Scheme string Host string @@ -28,10 +30,15 @@ func NewProxyConfig() ProxyConfig { return ProxyConfig{} } +func (cfg *ProxyConfig) UpdateId() { + cfg.id = fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path) +} + var dockerClient *client.Client func buildContainerRoute(container types.Container) { var aliases []string + var wg sync.WaitGroup container_name := strings.TrimPrefix(container.Names[0], "/") aliases_label, ok := container.Labels["proxy.aliases"] @@ -101,12 +108,19 @@ func buildContainerRoute(container types.Container) { } } config.Alias = alias - createProxy(config) + config.UpdateId() + + wg.Add(1) + go func() { + createRoute(&config) + wg.Done() + }() } + wg.Wait() } func buildRoutes() { - initProxyMaps() + initRoutes() containerSlice, err := dockerClient.ContainerList(context.Background(), container.ListOptions{}) if err != nil { log.Fatal(err) diff --git a/src/go-proxy/healthcheck.go b/src/go-proxy/healthcheck.go deleted file mode 100644 index e249069..0000000 --- a/src/go-proxy/healthcheck.go +++ /dev/null @@ -1,32 +0,0 @@ -package main - -import ( - "net" - "net/http" - "time" -) - -func healthCheckHttp(targetUrl string) error { - // try HEAD first - // if HEAD is not allowed, try GET - resp, err := healthCheckHttpClient.Head(targetUrl) - if resp != nil { - defer resp.Body.Close() - } - if err != nil && resp != nil && resp.StatusCode == http.StatusMethodNotAllowed { - _, err = healthCheckHttpClient.Get(targetUrl) - } - if resp != nil { - defer resp.Body.Close() - } - return err -} - -func healthCheckStream(scheme string, host string) error { - conn, err := net.DialTimeout(scheme, host, 5*time.Second) - if err != nil { - return err - } - defer conn.Close() - return nil -} diff --git a/src/go-proxy/main.go b/src/go-proxy/main.go index c5bcf2f..527d95e 100644 --- a/src/go-proxy/main.go +++ b/src/go-proxy/main.go @@ -20,7 +20,8 @@ func main() { log.Fatal(err) } buildRoutes() - log.Printf("[Build] built %v reverse proxies", countProxies()) + log.Printf("[Build] built %v reverse proxies", countRoutes()) + beginListenStreams() go func() { filter := filters.NewArgs( @@ -34,8 +35,10 @@ func main() { for msg := range msgs { // TODO: handle actor only log.Printf("[Event] %s %s caused rebuild", msg.Action, msg.Actor.Attributes["name"]) + endListenStreams() buildRoutes() - log.Printf("[Build] rebuilt %v reverse proxies", countProxies()) + log.Printf("[Build] rebuilt %v reverse proxies", countRoutes()) + beginListenStreams() } }() diff --git a/src/go-proxy/panel.go b/src/go-proxy/panel.go index fcd5c66..720affc 100644 --- a/src/go-proxy/panel.go +++ b/src/go-proxy/panel.go @@ -51,7 +51,7 @@ func panelIndex(w http.ResponseWriter, r *http.Request) { return } - err = tmpl.Execute(w, routes) + err = tmpl.Execute(w, &routes) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -79,9 +79,9 @@ func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) { scheme := url.Scheme if isStreamScheme(scheme) { - err = healthCheckStream(scheme, url.Host) + err = utils.healthCheckStream(scheme, url.Host) } else { - err = healthCheckHttp(targetUrl) + err = utils.healthCheckHttp(targetUrl) } if err != nil { diff --git a/src/go-proxy/proxy.go b/src/go-proxy/route.go similarity index 52% rename from src/go-proxy/proxy.go rename to src/go-proxy/route.go index 1a26ac1..bebebe0 100644 --- a/src/go-proxy/proxy.go +++ b/src/go-proxy/route.go @@ -3,30 +3,27 @@ package main import ( "fmt" "log" - "net" "net/url" "sync" ) type Routes struct { - HTTPRoutes map[string][]HTTPRoute // subdomain/alias -> path - StreamRoutes map[string]*StreamRoute // port -> target + HTTPRoutes map[string][]HTTPRoute // id -> path + StreamRoutes map[string]*StreamRoute // id -> target + Mutex sync.Mutex } var routes = Routes{ HTTPRoutes: make(map[string][]HTTPRoute), StreamRoutes: make(map[string]*StreamRoute), + Mutex: sync.Mutex{}, } -var routesMutex = sync.Mutex{} var streamSchemes = []string{"tcp", "udp"} // TODO: support "tcp:udp", "udp:tcp" var httpSchemes = []string{"http", "https"} var validSchemes = append(streamSchemes, httpSchemes...) -var lastFreePort int - - func isValidScheme(scheme string) bool { for _, v := range validSchemes { if v == scheme { @@ -45,36 +42,24 @@ func isStreamScheme(scheme string) bool { return false } -func initProxyMaps() { - routesMutex.Lock() - defer routesMutex.Unlock() +func initRoutes() { + routes.Mutex.Lock() + defer routes.Mutex.Unlock() - lastFreePort = 20000 - oldStreamRoutes := routes.StreamRoutes + utils.resetPortsInUse() routes.StreamRoutes = make(map[string]*StreamRoute) routes.HTTPRoutes = make(map[string][]HTTPRoute) - - var wg sync.WaitGroup - wg.Add(len(oldStreamRoutes)) - defer wg.Wait() - - for _, route := range oldStreamRoutes { - go func(r *StreamRoute) { - r.Cancel() - wg.Done() - }(route) - } } -func countProxies() int { +func countRoutes() int { return len(routes.HTTPRoutes) + len(routes.StreamRoutes) } -func createProxy(config ProxyConfig) { +func createRoute(config *ProxyConfig) { if isStreamScheme(config.Scheme) { - _, inMap := routes.StreamRoutes[config.Port] + _, inMap := routes.StreamRoutes[config.id] if inMap { - log.Printf("[Build] Duplicated stream :%s, ignoring", config.Port) + log.Printf("[Build] Duplicated stream %s, ignoring", config.id) return } route, err := NewStreamRoute(config) @@ -82,9 +67,11 @@ func createProxy(config ProxyConfig) { log.Println(err) return } - routes.StreamRoutes[config.Port] = route - go route.listenStream() + routes.Mutex.Lock() + routes.StreamRoutes[config.id] = route + routes.Mutex.Unlock() } else { + routes.Mutex.Lock() _, inMap := routes.HTTPRoutes[config.Alias] if !inMap { routes.HTTPRoutes[config.Alias] = make([]HTTPRoute, 0) @@ -94,27 +81,6 @@ func createProxy(config ProxyConfig) { log.Fatal(err) } routes.HTTPRoutes[config.Alias] = append(routes.HTTPRoutes[config.Alias], NewHTTPRoute(url, config.Path)) + routes.Mutex.Unlock() } } - -func findFreePort() (int, error) { - var portStr string - var l net.Listener - var err error = nil - - for lastFreePort <= 21000 { - portStr = fmt.Sprintf(":%d", lastFreePort) - l, err = net.Listen("tcp", portStr) - lastFreePort++ - if err != nil { - l.Close() - return lastFreePort, nil - } - } - l, err = net.Listen("tcp", ":0") - if err != nil { - return -1, fmt.Errorf("unable to find free port: %v", err) - } - // NOTE: may not be after 20000 - return l.Addr().(*net.TCPAddr).Port, nil -} diff --git a/src/go-proxy/stream.go b/src/go-proxy/stream.go index 14c2dc1..e9b17d1 100644 --- a/src/go-proxy/stream.go +++ b/src/go-proxy/stream.go @@ -33,6 +33,7 @@ var imageNamePortMap = map[string]string{ "mssql": "1433", "memcached": "11211", "rabbitmq": "5672", + "mongo": "27017", } var extraNamePortMap = map[string]string{ "dns": "53", @@ -56,9 +57,7 @@ var namePortMap = func() map[string]string { const UDPStreamType = "udp" const TCPStreamType = "tcp" -func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) { - port_split := strings.Split(config.Port, ":") - +func NewStreamRoute(config *ProxyConfig) (*StreamRoute, error) { var streamType string = TCPStreamType var srcPort string var dstPort string @@ -67,21 +66,13 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) { var srcUDPAddr *net.UDPAddr = nil var dstUDPAddr *net.UDPAddr = nil + port_split := strings.Split(config.Port, ":") if len(port_split) != 2 { - warnMsg := fmt.Sprintf(`[Build] Invalid stream port %s, `+ - `should be :`, config.Port) - freePort, err := findFreePort() - if err != nil { - return nil, fmt.Errorf("%s and %s", warnMsg, err) - } - srcPort = fmt.Sprintf("%d", freePort) + log.Printf(`[Build] Invalid stream port %s, `+ + `should be :, `+ + `assuming it is targetPort`, config.Port) + srcPort = "0" dstPort = config.Port - fmt.Printf(`%s, assuming %s is targetPort and `+ - `using free port %s as listeningPort`, - warnMsg, - srcPort, - dstPort, - ) } else { srcPort = port_split[0] dstPort = port_split[1] @@ -94,8 +85,8 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) { _, err := strconv.Atoi(dstPort) if err != nil { return nil, fmt.Errorf( - "[Build] Unrecognized stream target port %s, ignoring", - dstPort, + "[Build] %s: Unrecognized stream target port %s, ignoring", + config.Alias, dstPort, ) } @@ -125,6 +116,12 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) { } } + lsPort, err := strconv.Atoi(srcPort) + if err != nil { + return nil, err + } + utils.markPortInUse(lsPort) + ctx, cancel := context.WithCancel(context.Background()) route := StreamRoute{ @@ -140,28 +137,29 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) { Cancel: cancel, } - if streamType == UDPStreamType { - return (*StreamRoute)(unsafe.Pointer(&UDPRoute{ - StreamRoute: route, - ConnMap: make(map[net.Addr]*net.UDPConn), - ConnMapMutex: sync.Mutex{}, - QueueSize: atomic.Int32{}, - SourceUDPAddr: srcUDPAddr, - TargetUDPAddr: dstUDPAddr, - })), nil + if streamType == TCPStreamType { + return &route, nil } - return &route, nil + + return (*StreamRoute)(unsafe.Pointer(&UDPRoute{ + StreamRoute: route, + ConnMap: make(map[net.Addr]*net.UDPConn), + ConnMapMutex: sync.Mutex{}, + QueueSize: atomic.Int32{}, + SourceUDPAddr: srcUDPAddr, + TargetUDPAddr: dstUDPAddr, + })), nil } func (route *StreamRoute) PrintError(err error) { if err == nil { return } - log.Printf("[Stream] %s => %s error: %v", route.ListeningUrl(), route.TargetUrl(), err) + log.Printf("[Stream] %s (%s => %s) error: %v", route.Alias, route.ListeningUrl(), route.TargetUrl(), err) } func (route *StreamRoute) ListeningUrl() string { - return fmt.Sprintf("%s://:%s", route.ListeningScheme, route.ListeningPort) + return fmt.Sprintf("%s:%s", route.ListeningScheme, route.ListeningPort) } func (route *StreamRoute) TargetUrl() string { @@ -169,9 +167,40 @@ func (route *StreamRoute) TargetUrl() string { } func (route *StreamRoute) listenStream() { + if route.ListeningPort == "0" { + freePort, err := utils.findFreePort(20000) + if err != nil { + route.PrintError(err) + return + } + route.ListeningPort = fmt.Sprintf("%d", freePort) + utils.markPortInUse(freePort) + } if route.Type == UDPStreamType { listenUDP((*UDPRoute)(unsafe.Pointer(route))) } else { listenTCP(route) } -} \ No newline at end of file +} + +func beginListenStreams() { + for _, route := range routes.StreamRoutes { + go route.listenStream() + } +} + +func endListenStreams() { + var wg sync.WaitGroup + wg.Add(len(routes.StreamRoutes)) + defer wg.Wait() + + routes.Mutex.Lock() + defer routes.Mutex.Unlock() + + for _, route := range routes.StreamRoutes { + go func(r *StreamRoute) { + r.Cancel() + wg.Done() + }(route) + } +} diff --git a/src/go-proxy/udp.go b/src/go-proxy/udp.go index d4df6cd..a07d737 100644 --- a/src/go-proxy/udp.go +++ b/src/go-proxy/udp.go @@ -69,10 +69,10 @@ func udpLoop(route *UDPRoute, in *net.UDPConn, out *net.UDPConn, buffer []byte, defer route.QueueSize.Add(-1) defer wg.Done() - in.SetReadDeadline(time.Now().Add(udpListenTimeout)) - var nRead int var nWritten int + + in.SetReadDeadline(time.Now().Add(udpListenTimeout)) nRead, srcAddr, err := in.ReadFromUDP(buffer) if err != nil { diff --git a/src/go-proxy/utils.go b/src/go-proxy/utils.go new file mode 100644 index 0000000..fef734d --- /dev/null +++ b/src/go-proxy/utils.go @@ -0,0 +1,78 @@ +package main + +import ( + "fmt" + "net" + "net/http" + "sync" + "time" +) + +type Utils struct { + PortsInUse map[int]bool + portsInUseMutex sync.Mutex +} +var utils = &Utils{ + PortsInUse: make(map[int]bool), + portsInUseMutex: sync.Mutex{}, +} + +func (u *Utils) findFreePort(startingPort int) (int, error) { + for port := startingPort; port <= startingPort+100 && port <= 65535; port++ { + if u.PortsInUse[port] { + continue + } + addr := fmt.Sprintf(":%d", port) + l, err := net.Listen("tcp", addr) + if err == nil { + l.Close() + return port, nil + } + } + l, err := net.Listen("tcp", ":0") + if err == nil { + l.Close() + // NOTE: may not be after 20000 + return l.Addr().(*net.TCPAddr).Port, nil + } + return -1, fmt.Errorf("unable to find free port: %v", err) +} + +func (u *Utils) resetPortsInUse() { + u.portsInUseMutex.Lock() + defer u.portsInUseMutex.Unlock() + for port := range u.PortsInUse { + u.PortsInUse[port] = false + } +} + +func (u* Utils) markPortInUse(port int) { + u.portsInUseMutex.Lock() + defer u.portsInUseMutex.Unlock() + u.PortsInUse[port] = true +} + +func (*Utils) healthCheckHttp(targetUrl string) error { + // try HEAD first + // if HEAD is not allowed, try GET + resp, err := healthCheckHttpClient.Head(targetUrl) + if resp != nil { + defer resp.Body.Close() + } + if err != nil && resp != nil && resp.StatusCode == http.StatusMethodNotAllowed { + _, err = healthCheckHttpClient.Get(targetUrl) + } + if resp != nil { + defer resp.Body.Close() + } + return err +} + +func (*Utils) healthCheckStream(scheme string, host string) error { + conn, err := net.DialTimeout(scheme, host, 5*time.Second) + if err != nil { + return err + } + defer conn.Close() + return nil +}