fixed findFreePort nil reference or conflict, made buildRoutes goroutine

This commit is contained in:
yusing 2024-03-02 22:56:22 +00:00
parent 2d14b11655
commit 0d9c6e72cc
11 changed files with 186 additions and 126 deletions

View file

@ -11,7 +11,7 @@ In the examples domain `x.y.z` is used, replace them with your domain
- [How to use](#how-to-use)
- [Configuration](#configuration)
- [Single Port Configuration](#single-port-configuration-example)
- [Multiple Configuration](#multiple-configuration-example)
- [Multiple Ports Configuration](#multiple-ports-configuration-example)
- [TCP/UDP Configuration](#tcpudp-configuration-example)
- [Troubleshooting](#troubleshooting)
- [Benchmarks](#benchmarks)
@ -99,7 +99,7 @@ whoami:
# 2. visit https://apps.y.z/whoami
```
### Multiple configuration example
### Multiple ports configuration example
```yaml
minio:
@ -198,7 +198,7 @@ It takes ~ 0.1-0.4MB for each HTTP Proxy, and <2MB for each TCP/UDP Proxy
1. [Install go](https://go.dev/doc/install) if not already
2. Get dependencies with `go get`
2. get dependencies with `sh scripts/get.sh`
3. build binary with `sh scripts/build.sh`

Binary file not shown.

2
scripts/get.sh Normal file
View file

@ -0,0 +1,2 @@
#!/bin/sh
go get -d -u ./src/go-proxy

View file

@ -7,6 +7,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
@ -17,6 +18,7 @@ import (
)
type ProxyConfig struct {
id string
Alias string
Scheme string
Host string
@ -28,10 +30,15 @@ func NewProxyConfig() ProxyConfig {
return ProxyConfig{}
}
func (cfg *ProxyConfig) UpdateId() {
cfg.id = fmt.Sprintf("%s-%s-%s-%s-%s", cfg.Alias, cfg.Scheme, cfg.Host, cfg.Port, cfg.Path)
}
var dockerClient *client.Client
func buildContainerRoute(container types.Container) {
var aliases []string
var wg sync.WaitGroup
container_name := strings.TrimPrefix(container.Names[0], "/")
aliases_label, ok := container.Labels["proxy.aliases"]
@ -101,12 +108,19 @@ func buildContainerRoute(container types.Container) {
}
}
config.Alias = alias
createProxy(config)
config.UpdateId()
wg.Add(1)
go func() {
createRoute(&config)
wg.Done()
}()
}
wg.Wait()
}
func buildRoutes() {
initProxyMaps()
initRoutes()
containerSlice, err := dockerClient.ContainerList(context.Background(), container.ListOptions{})
if err != nil {
log.Fatal(err)

View file

@ -1,32 +0,0 @@
package main
import (
"net"
"net/http"
"time"
)
func healthCheckHttp(targetUrl string) error {
// try HEAD first
// if HEAD is not allowed, try GET
resp, err := healthCheckHttpClient.Head(targetUrl)
if resp != nil {
defer resp.Body.Close()
}
if err != nil && resp != nil && resp.StatusCode == http.StatusMethodNotAllowed {
_, err = healthCheckHttpClient.Get(targetUrl)
}
if resp != nil {
defer resp.Body.Close()
}
return err
}
func healthCheckStream(scheme string, host string) error {
conn, err := net.DialTimeout(scheme, host, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
return nil
}

View file

@ -20,7 +20,8 @@ func main() {
log.Fatal(err)
}
buildRoutes()
log.Printf("[Build] built %v reverse proxies", countProxies())
log.Printf("[Build] built %v reverse proxies", countRoutes())
beginListenStreams()
go func() {
filter := filters.NewArgs(
@ -34,8 +35,10 @@ func main() {
for msg := range msgs {
// TODO: handle actor only
log.Printf("[Event] %s %s caused rebuild", msg.Action, msg.Actor.Attributes["name"])
endListenStreams()
buildRoutes()
log.Printf("[Build] rebuilt %v reverse proxies", countProxies())
log.Printf("[Build] rebuilt %v reverse proxies", countRoutes())
beginListenStreams()
}
}()

View file

@ -51,7 +51,7 @@ func panelIndex(w http.ResponseWriter, r *http.Request) {
return
}
err = tmpl.Execute(w, routes)
err = tmpl.Execute(w, &routes)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
@ -79,9 +79,9 @@ func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) {
scheme := url.Scheme
if isStreamScheme(scheme) {
err = healthCheckStream(scheme, url.Host)
err = utils.healthCheckStream(scheme, url.Host)
} else {
err = healthCheckHttp(targetUrl)
err = utils.healthCheckHttp(targetUrl)
}
if err != nil {

View file

@ -3,30 +3,27 @@ package main
import (
"fmt"
"log"
"net"
"net/url"
"sync"
)
type Routes struct {
HTTPRoutes map[string][]HTTPRoute // subdomain/alias -> path
StreamRoutes map[string]*StreamRoute // port -> target
HTTPRoutes map[string][]HTTPRoute // id -> path
StreamRoutes map[string]*StreamRoute // id -> target
Mutex sync.Mutex
}
var routes = Routes{
HTTPRoutes: make(map[string][]HTTPRoute),
StreamRoutes: make(map[string]*StreamRoute),
Mutex: sync.Mutex{},
}
var routesMutex = sync.Mutex{}
var streamSchemes = []string{"tcp", "udp"} // TODO: support "tcp:udp", "udp:tcp"
var httpSchemes = []string{"http", "https"}
var validSchemes = append(streamSchemes, httpSchemes...)
var lastFreePort int
func isValidScheme(scheme string) bool {
for _, v := range validSchemes {
if v == scheme {
@ -45,36 +42,24 @@ func isStreamScheme(scheme string) bool {
return false
}
func initProxyMaps() {
routesMutex.Lock()
defer routesMutex.Unlock()
func initRoutes() {
routes.Mutex.Lock()
defer routes.Mutex.Unlock()
lastFreePort = 20000
oldStreamRoutes := routes.StreamRoutes
utils.resetPortsInUse()
routes.StreamRoutes = make(map[string]*StreamRoute)
routes.HTTPRoutes = make(map[string][]HTTPRoute)
var wg sync.WaitGroup
wg.Add(len(oldStreamRoutes))
defer wg.Wait()
for _, route := range oldStreamRoutes {
go func(r *StreamRoute) {
r.Cancel()
wg.Done()
}(route)
}
}
func countProxies() int {
func countRoutes() int {
return len(routes.HTTPRoutes) + len(routes.StreamRoutes)
}
func createProxy(config ProxyConfig) {
func createRoute(config *ProxyConfig) {
if isStreamScheme(config.Scheme) {
_, inMap := routes.StreamRoutes[config.Port]
_, inMap := routes.StreamRoutes[config.id]
if inMap {
log.Printf("[Build] Duplicated stream :%s, ignoring", config.Port)
log.Printf("[Build] Duplicated stream %s, ignoring", config.id)
return
}
route, err := NewStreamRoute(config)
@ -82,9 +67,11 @@ func createProxy(config ProxyConfig) {
log.Println(err)
return
}
routes.StreamRoutes[config.Port] = route
go route.listenStream()
routes.Mutex.Lock()
routes.StreamRoutes[config.id] = route
routes.Mutex.Unlock()
} else {
routes.Mutex.Lock()
_, inMap := routes.HTTPRoutes[config.Alias]
if !inMap {
routes.HTTPRoutes[config.Alias] = make([]HTTPRoute, 0)
@ -94,27 +81,6 @@ func createProxy(config ProxyConfig) {
log.Fatal(err)
}
routes.HTTPRoutes[config.Alias] = append(routes.HTTPRoutes[config.Alias], NewHTTPRoute(url, config.Path))
routes.Mutex.Unlock()
}
}
func findFreePort() (int, error) {
var portStr string
var l net.Listener
var err error = nil
for lastFreePort <= 21000 {
portStr = fmt.Sprintf(":%d", lastFreePort)
l, err = net.Listen("tcp", portStr)
lastFreePort++
if err != nil {
l.Close()
return lastFreePort, nil
}
}
l, err = net.Listen("tcp", ":0")
if err != nil {
return -1, fmt.Errorf("unable to find free port: %v", err)
}
// NOTE: may not be after 20000
return l.Addr().(*net.TCPAddr).Port, nil
}

View file

@ -33,6 +33,7 @@ var imageNamePortMap = map[string]string{
"mssql": "1433",
"memcached": "11211",
"rabbitmq": "5672",
"mongo": "27017",
}
var extraNamePortMap = map[string]string{
"dns": "53",
@ -56,9 +57,7 @@ var namePortMap = func() map[string]string {
const UDPStreamType = "udp"
const TCPStreamType = "tcp"
func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) {
port_split := strings.Split(config.Port, ":")
func NewStreamRoute(config *ProxyConfig) (*StreamRoute, error) {
var streamType string = TCPStreamType
var srcPort string
var dstPort string
@ -67,21 +66,13 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) {
var srcUDPAddr *net.UDPAddr = nil
var dstUDPAddr *net.UDPAddr = nil
port_split := strings.Split(config.Port, ":")
if len(port_split) != 2 {
warnMsg := fmt.Sprintf(`[Build] Invalid stream port %s, `+
`should be <listeningPort>:<targetPort>`, config.Port)
freePort, err := findFreePort()
if err != nil {
return nil, fmt.Errorf("%s and %s", warnMsg, err)
}
srcPort = fmt.Sprintf("%d", freePort)
log.Printf(`[Build] Invalid stream port %s, `+
`should be <listeningPort>:<targetPort>, `+
`assuming it is targetPort`, config.Port)
srcPort = "0"
dstPort = config.Port
fmt.Printf(`%s, assuming %s is targetPort and `+
`using free port %s as listeningPort`,
warnMsg,
srcPort,
dstPort,
)
} else {
srcPort = port_split[0]
dstPort = port_split[1]
@ -94,8 +85,8 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) {
_, err := strconv.Atoi(dstPort)
if err != nil {
return nil, fmt.Errorf(
"[Build] Unrecognized stream target port %s, ignoring",
dstPort,
"[Build] %s: Unrecognized stream target port %s, ignoring",
config.Alias, dstPort,
)
}
@ -125,6 +116,12 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) {
}
}
lsPort, err := strconv.Atoi(srcPort)
if err != nil {
return nil, err
}
utils.markPortInUse(lsPort)
ctx, cancel := context.WithCancel(context.Background())
route := StreamRoute{
@ -140,28 +137,29 @@ func NewStreamRoute(config ProxyConfig) (*StreamRoute, error) {
Cancel: cancel,
}
if streamType == UDPStreamType {
return (*StreamRoute)(unsafe.Pointer(&UDPRoute{
StreamRoute: route,
ConnMap: make(map[net.Addr]*net.UDPConn),
ConnMapMutex: sync.Mutex{},
QueueSize: atomic.Int32{},
SourceUDPAddr: srcUDPAddr,
TargetUDPAddr: dstUDPAddr,
})), nil
if streamType == TCPStreamType {
return &route, nil
}
return &route, nil
return (*StreamRoute)(unsafe.Pointer(&UDPRoute{
StreamRoute: route,
ConnMap: make(map[net.Addr]*net.UDPConn),
ConnMapMutex: sync.Mutex{},
QueueSize: atomic.Int32{},
SourceUDPAddr: srcUDPAddr,
TargetUDPAddr: dstUDPAddr,
})), nil
}
func (route *StreamRoute) PrintError(err error) {
if err == nil {
return
}
log.Printf("[Stream] %s => %s error: %v", route.ListeningUrl(), route.TargetUrl(), err)
log.Printf("[Stream] %s (%s => %s) error: %v", route.Alias, route.ListeningUrl(), route.TargetUrl(), err)
}
func (route *StreamRoute) ListeningUrl() string {
return fmt.Sprintf("%s://:%s", route.ListeningScheme, route.ListeningPort)
return fmt.Sprintf("%s:%s", route.ListeningScheme, route.ListeningPort)
}
func (route *StreamRoute) TargetUrl() string {
@ -169,9 +167,40 @@ func (route *StreamRoute) TargetUrl() string {
}
func (route *StreamRoute) listenStream() {
if route.ListeningPort == "0" {
freePort, err := utils.findFreePort(20000)
if err != nil {
route.PrintError(err)
return
}
route.ListeningPort = fmt.Sprintf("%d", freePort)
utils.markPortInUse(freePort)
}
if route.Type == UDPStreamType {
listenUDP((*UDPRoute)(unsafe.Pointer(route)))
} else {
listenTCP(route)
}
}
}
func beginListenStreams() {
for _, route := range routes.StreamRoutes {
go route.listenStream()
}
}
func endListenStreams() {
var wg sync.WaitGroup
wg.Add(len(routes.StreamRoutes))
defer wg.Wait()
routes.Mutex.Lock()
defer routes.Mutex.Unlock()
for _, route := range routes.StreamRoutes {
go func(r *StreamRoute) {
r.Cancel()
wg.Done()
}(route)
}
}

View file

@ -69,10 +69,10 @@ func udpLoop(route *UDPRoute, in *net.UDPConn, out *net.UDPConn, buffer []byte,
defer route.QueueSize.Add(-1)
defer wg.Done()
in.SetReadDeadline(time.Now().Add(udpListenTimeout))
var nRead int
var nWritten int
in.SetReadDeadline(time.Now().Add(udpListenTimeout))
nRead, srcAddr, err := in.ReadFromUDP(buffer)
if err != nil {

78
src/go-proxy/utils.go Normal file
View file

@ -0,0 +1,78 @@
package main
import (
"fmt"
"net"
"net/http"
"sync"
"time"
)
type Utils struct {
PortsInUse map[int]bool
portsInUseMutex sync.Mutex
}
var utils = &Utils{
PortsInUse: make(map[int]bool),
portsInUseMutex: sync.Mutex{},
}
func (u *Utils) findFreePort(startingPort int) (int, error) {
for port := startingPort; port <= startingPort+100 && port <= 65535; port++ {
if u.PortsInUse[port] {
continue
}
addr := fmt.Sprintf(":%d", port)
l, err := net.Listen("tcp", addr)
if err == nil {
l.Close()
return port, nil
}
}
l, err := net.Listen("tcp", ":0")
if err == nil {
l.Close()
// NOTE: may not be after 20000
return l.Addr().(*net.TCPAddr).Port, nil
}
return -1, fmt.Errorf("unable to find free port: %v", err)
}
func (u *Utils) resetPortsInUse() {
u.portsInUseMutex.Lock()
defer u.portsInUseMutex.Unlock()
for port := range u.PortsInUse {
u.PortsInUse[port] = false
}
}
func (u* Utils) markPortInUse(port int) {
u.portsInUseMutex.Lock()
defer u.portsInUseMutex.Unlock()
u.PortsInUse[port] = true
}
func (*Utils) healthCheckHttp(targetUrl string) error {
// try HEAD first
// if HEAD is not allowed, try GET
resp, err := healthCheckHttpClient.Head(targetUrl)
if resp != nil {
defer resp.Body.Close()
}
if err != nil && resp != nil && resp.StatusCode == http.StatusMethodNotAllowed {
_, err = healthCheckHttpClient.Get(targetUrl)
}
if resp != nil {
defer resp.Body.Close()
}
return err
}
func (*Utils) healthCheckStream(scheme string, host string) error {
conn, err := net.DialTimeout(scheme, host, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
return nil
}