diff --git a/Makefile b/Makefile index d760dd1..0be5a83 100755 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ all: build quick-restart logs build: mkdir -p bin - CGO_ENABLED=0 GOOS=linux go build -o bin/go-proxy src/go-proxy/*.go + CGO_ENABLED=0 GOOS=linux go build -pgo=auto -o bin/go-proxy src/go-proxy/*.go up: docker compose up -d --build go-proxy diff --git a/README.md b/README.md index d404fc1..f3f8b29 100755 --- a/README.md +++ b/README.md @@ -1,14 +1,15 @@ # go-proxy -A simple auto docker reverse proxy for home use. *Written in **Go*** +A simple auto docker reverse proxy for home use. \*Written in **Go\*** In the examples domain `x.y.z` is used, replace them with your domain ## Table of content -- [Features](#features) -- [Why am I making this](#why-am-i-making-this) +- [Key Points](#key-points) - [How to use](#how-to-use) + - [Binary] (#binary) + - [Docker] (#docker) - [Configuration](#configuration) - [Single Port Configuration](#single-port-configuration-example) - [Multiple Ports Configuration](#multiple-ports-configuration-example) @@ -20,8 +21,9 @@ In the examples domain `x.y.z` is used, replace them with your domain - [Build it yourself](#build-it-yourself) - [Getting SSL certs](#getting-ssl-certs) -## Features +## Key Points +- fast, nearly no performance penalty for end users when comparing to direct IP connections (See [benchmarks](#benchmarks)) - auto detect reverse proxies from docker - additional reverse proxies from provider yaml file - allow multiple docker / file providers by custom `config.yml` file @@ -29,30 +31,38 @@ In the examples domain `x.y.z` is used, replace them with your domain - path matching - HTTP proxy - TCP/UDP Proxy -- HTTP round robin load balance support (same subdomain and path across containers replicas) -- Auto hot-reload when container start / die / stop. +- HTTP round robin load balance support (same subdomain and path across different hosts) +- Auto hot-reload on container start / die / stop or config changes. - Simple panel to see all reverse proxies and health (visit port [panel port] of go-proxy `https://*.y.z:[panel port]`) - ![panel screenshot](screenshots/panel.png) + ![panel screenshot](screenshots/panel.png) -## Why am I making this +## How to use (docker) -1. It's fun. -2. I have tried different reverse proxy services, i.e. [nginx proxy manager](https://nginxproxymanager.com/), [traefik](https://github.com/traefik/traefik), [nginx-proxy](https://github.com/nginx-proxy/nginx-proxy). I have found that `traefik` is not easy to use, and I don't want to click buttons every time I spin up a new container (`nginx proxy manager`). For `nginx-proxy` I found it buggy and quite unusable. +1. Download and extract the latest release (or clone the repository if you want to try out experimental features) +2. Copy `config.example.yml` to `config.yml` and modify the content to fit your needs +3. Do the same for `providers.example.yml` +4. See [Binary](#binary) or [docker](#docker) -## How to use +### Binary + 1. (Optional) Prepare your certificates in `certs/` to enable https. See [Getting SSL Certs](#getting-ssl-certs) + - cert / chain / fullchain: ./certs/cert.crt + - private key: ./certs/priv.key + 2. run the binary `bin/go-proxy` + 3. enjoy -1. Download and extract the latest release +### Docker + 1. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml` -2. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml` + 2. Add networks to make sure it is in the same network with other containers, or make sure `proxy..host` is reachable -3. Add networks to make sure it is in the same network with other containers, or make sure `proxy..host` is reachable + 3. (Optional) Mount your SSL certs to enable https. See [Getting SSL Certs](#getting-ssl-certs) + - cert / chain / fullchain -> /app/certs/cert.crt + - private key -> /app/certs/priv.key -4. (Optional) Mount your SSL certs. See [Getting SSL Certs](#getting-ssl-certs) + 4. Start `go-proxy` with `docker compose up -d` or `make up`. -5. Start `go-proxy` with `docker compose up -d` or `make up`. - -6. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy` + 5. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy` In case the network of your container is in subnet `172.16.0.0/16` (bridge), and vpn network is under `100.64.0.0/10` (i.e. tailscale) @@ -63,9 +73,9 @@ In the examples domain `x.y.z` is used, replace them with your domain `docker network inspect $(docker network ls | awk '$3 == "bridge" { print $1}') | jq -r '.[] | .Name + " " + .IPAM.Config[0].Subnet' -` -7. start your docker app, and visit .y.z + 6. start your docker app, and visit .y.z -8. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies + 7. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies ## Known issues @@ -88,9 +98,12 @@ However, there are some labels you can manipulate with: - tcp/udp: is in format of `[:]` - when `listeningPort` is omitted (not suggested), a free port will be used automatically. - `targetPort` must be a number, or the predefined names (see [stream.go](src/go-proxy/stream.go#L28)) +- `no_tls_verify`: whether skip tls verify when scheme is https + - defaults to false - `proxy..path`: path matching (for http proxy only) - defaults to empty - `proxy..path_mode`: mode for path handling + - defaults to empty - allowed: \, forward, sub - empty: remove path prefix from URL when proxying @@ -153,7 +166,7 @@ app-db: - proxy.app-db.scheme=tcp # Optional (first free port will be used for listening port) - - proxy.app-db.port=20000:postgres + - proxy.app-db.port=20000:postgres # In go-proxy go-proxy: @@ -189,43 +202,84 @@ A: Make sure the container is running, and \ matches any container na Benchmarked with `wrk` connecting `traefik/whoami`'s `/bench` endpoint -Direct connection +Remote benchmark (client running wrk and `go-proxy` server are different devices) -```shell -% wrk -t20 -c100 -d10s --latency http://homelab:4999/bench -Running 10s test @ http://homelab:4999/bench - 20 threads and 100 connections - Thread Stats Avg Stdev Max +/- Stdev - Latency 3.74ms 1.19ms 19.94ms 81.53% - Req/Sec 1.35k 103.96 1.60k 73.60% - Latency Distribution - 50% 3.46ms - 75% 4.16ms - 90% 4.98ms - 99% 8.04ms - 269696 requests in 10.01s, 32.41MB read -Requests/sec: 26950.35 -Transfer/sec: 3.24MB -``` +- Direct connection -With **go-proxy** reverse proxy + ```shell + root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://10.0.100.1/bench + Running 30s test @ http://10.0.100.1/bench + 10 threads and 200 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 4.34ms 1.16ms 22.76ms 85.77% + Req/Sec 4.63k 435.14 5.47k 90.07% + Latency Distribution + 50% 3.95ms + 75% 4.71ms + 90% 5.68ms + 99% 8.61ms + 1383812 requests in 30.02s, 166.28MB read + Requests/sec: 46100.87 + Transfer/sec: 5.54MB + ``` -```shell -% wrk -t20 -c100 -d10s --latency https://whoami.mydomain.com/bench -Running 10s test @ https://whoami.6uo.me/bench - 20 threads and 100 connections - Thread Stats Avg Stdev Max +/- Stdev - Latency 4.02ms 2.13ms 47.49ms 95.14% - Req/Sec 1.28k 139.15 1.47k 91.67% - Latency Distribution - 50% 3.60ms - 75% 4.36ms - 90% 5.29ms - 99% 8.83ms - 253874 requests in 10.02s, 24.70MB read -Requests/sec: 25342.46 -Transfer/sec: 2.47MB -``` +- With reverse proxy + + ```shell + root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://bench.6uo.me/bench + Running 30s test @ http://bench.6uo.me/bench + 10 threads and 200 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 4.50ms 1.44ms 27.53ms 86.48% + Req/Sec 4.48k 375.00 5.12k 84.73% + Latency Distribution + 50% 4.09ms + 75% 5.06ms + 90% 6.03ms + 99% 9.41ms + 1338996 requests in 30.01s, 160.90MB read + Requests/sec: 44616.36 + Transfer/sec: 5.36MB + ``` + +Local benchmark (client running wrk and `go-proxy` server are under same proxmox host but different LXCs) + +- Direct connection + + ``` + root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://10.0.100.1/bench + Running 10s test @ http://10.0.100.1/bench + 10 threads and 200 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 434.08us 539.35us 8.76ms 85.28% + Req/Sec 67.71k 6.31k 87.21k 71.20% + Latency Distribution + 50% 153.00us + 75% 646.00us + 90% 1.18ms + 99% 2.38ms + 6739591 requests in 10.01s, 809.85MB read + Requests/sec: 673608.15 + Transfer/sec: 80.94MB + ``` + +- With reverse proxy + ``` + root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench + Running 10s test @ http://bench.6uo.me/bench + 10 threads and 200 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 1.78ms 5.49ms 117.53ms 99.00% + Req/Sec 16.31k 2.30k 21.01k 86.69% + Latency Distribution + 50% 1.12ms + 75% 1.88ms + 90% 2.80ms + 99% 7.27ms + 1634774 requests in 10.10s, 196.44MB read + Requests/sec: 161858.70 + Transfer/sec: 19.45MB + ``` ## Memory usage diff --git a/bin/go-proxy b/bin/go-proxy index d52be93..bb48d19 100755 Binary files a/bin/go-proxy and b/bin/go-proxy differ diff --git a/compose.example.yml b/compose.example.yml index dd0eb26..be3284a 100755 --- a/compose.example.yml +++ b/compose.example.yml @@ -7,8 +7,8 @@ services: networks: # ^also add here - default # environment: - # - VERBOSITY=1 # LOG LEVEL (optional, defaults to 1) - # - DEBUG=1 # (optional, enable only for debug) + # - GOPROXY_DEBUG=1 # (optional, enable only for debug) + # - GOPROXY_REDIRECT_HTTP=0 # (optional, uncomment to disable http redirect (http -> https)) ports: - 80:80 # http # - 443:443 # optional, https diff --git a/entrypoint.sh b/entrypoint.sh index f82d652..eee77eb 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -3,13 +3,9 @@ if [ "$1" == "restart" ]; then echo "restarting" killall go-proxy fi -if [ -z "$VERBOSITY" ]; then - VERBOSITY=1 -fi -echo "starting with verbosity $VERBOSITY" > log/go-proxy.log if [ "$DEBUG" == "1" ]; then - /app/go-proxy -v=$VERBOSITY --log_dir=log --stderrthreshold=0 2>> log/go-proxy.log & + /app/go-proxy 2> log/go-proxy.log & tail -f /dev/null else - /app/go-proxy -v=$VERBOSITY --logtostderr=1 + /app/go-proxy fi \ No newline at end of file diff --git a/go.mod b/go.mod index 6f06e9a..344ac56 100755 --- a/go.mod +++ b/go.mod @@ -3,20 +3,18 @@ module github.com/yusing/go-proxy go 1.21.7 require ( + github.com/docker/cli v25.0.4+incompatible github.com/docker/docker v25.0.4+incompatible github.com/fsnotify/fsnotify v1.7.0 - github.com/golang/glog v1.2.0 + github.com/sirupsen/logrus v1.9.3 golang.org/x/net v0.22.0 gopkg.in/yaml.v3 v3.0.1 ) -require github.com/sirupsen/logrus v1.9.3 // indirect - require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/containerd/log v0.1.0 // indirect github.com/distribution/reference v0.5.0 // indirect - github.com/docker/cli v25.0.4+incompatible github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -36,6 +34,7 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/mod v0.16.0 // indirect golang.org/x/sys v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.19.0 // indirect gotest.tools/v3 v3.5.1 // indirect diff --git a/go.sum b/go.sum index 54b5ba1..9a9fd82 100755 --- a/go.sum +++ b/go.sum @@ -30,8 +30,6 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= -github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= diff --git a/providers.example.yml b/providers.example.yml index 13a125e..12d410e 100644 --- a/providers.example.yml +++ b/providers.example.yml @@ -9,3 +9,5 @@ app: # alias path: # optional path_mode: + # optional + notlsverify: false \ No newline at end of file diff --git a/src/go-proxy/config.go b/src/go-proxy/config.go index a2cf52f..33f5ddc 100644 --- a/src/go-proxy/config.go +++ b/src/go-proxy/config.go @@ -3,73 +3,95 @@ package main import ( "fmt" "os" + "sync" - "github.com/fsnotify/fsnotify" - "github.com/golang/glog" "gopkg.in/yaml.v3" ) -type Config struct { - Providers map[string]*Provider `yaml:",flow"` +// commented out if unused +type Config interface { + // Load() error + MustLoad() + // MustReload() + // Reload() error + StartProviders() + StopProviders() + WatchChanges() + StopWatching() } -var config *Config +func NewConfig() Config { + cfg := &config{} + cfg.watcher = NewFileWatcher( + configPath, + cfg.MustReload, // OnChange + func() { os.Exit(1) }, // OnDelete + ) + return cfg +} + +func (cfg *config) Load() error { + cfg.mutex.Lock() + defer cfg.mutex.Unlock() + + // unload if any + if cfg.Providers != nil { + for _, p := range cfg.Providers { + p.StopAllRoutes() + } + } + cfg.Providers = make(map[string]*Provider) -func ReadConfig() (*Config, error) { - config := Config{} data, err := os.ReadFile(configPath) - if err != nil { - return nil, fmt.Errorf("unable to read config file: %v", err) + return fmt.Errorf("unable to read config file: %v", err) } - err = yaml.Unmarshal(data, &config) - - if err != nil { - return nil, fmt.Errorf("unable to parse config file: %v", err) + if err = yaml.Unmarshal(data, &cfg); err != nil { + return fmt.Errorf("unable to parse config file: %v", err) } - for name, p := range config.Providers { + for name, p := range cfg.Providers { p.name = name } - return &config, nil + return nil } -func ListenConfigChanges() { - watcher, err := fsnotify.NewWatcher() - if err != nil { - glog.Errorf("[Config] unable to create file watcher: %v", err) +func (cfg *config) MustLoad() { + if err := cfg.Load(); err != nil { + cfgl.Fatal(err) } - defer watcher.Close() +} - if err = watcher.Add(configPath); err != nil { - glog.Errorf("[Config] unable to watch file: %v", err) - return - } +func (cfg *config) Reload() error { + return cfg.Load() +} - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - switch { - case event.Has(fsnotify.Write): - glog.Infof("[Config] file change detected") - for _, p := range config.Providers { - p.StopAllRoutes() - } - config, err = ReadConfig() - if err != nil { - glog.Fatalf("[Config] unable to read config: %v", err) - } - StartAllRoutes() - case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename): - glog.Fatalf("[Config] file renamed / deleted") - } - case err := <-watcher.Errors: - glog.Errorf("[Config] File watcher error: %s", err) - } - } -} \ No newline at end of file +func (cfg *config) MustReload() { + cfg.MustLoad() +} + +func (cfg *config) StartProviders() { + // Providers have their own mutex, no lock needed + ParallelForEachValue(cfg.Providers, (*Provider).StartAllRoutes) +} + +func (cfg *config) StopProviders() { + // Providers have their own mutex, no lock needed + ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes) +} + +func (cfg *config) WatchChanges() { + cfg.watcher.Start() +} + +func (cfg *config) StopWatching() { + cfg.watcher.Stop() +} + +type config struct { + Providers map[string]*Provider `yaml:",flow"` + watcher Watcher + mutex sync.Mutex +} diff --git a/src/go-proxy/constants.go b/src/go-proxy/constants.go index 2b8fa9e..9000863 100644 --- a/src/go-proxy/constants.go +++ b/src/go-proxy/constants.go @@ -1,9 +1,13 @@ package main import ( + "crypto/tls" "net" "net/http" + "os" "time" + + "github.com/sirupsen/logrus" ) var ( @@ -75,6 +79,12 @@ var transport = &http.Transport{ MaxIdleConnsPerHost: 1000, } +var transportNoTLS = func() *http.Transport { + var clone = transport.Clone() + clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + return clone +}() + const clientUrlFromEnv = "FROM_ENV" const configPath = "config.yml" @@ -84,3 +94,13 @@ const StreamStopListenTimeout = 1 * time.Second const templateFile = "templates/panel.html" const udpBufferSize = 1500 + +var logLevel = func() logrus.Level { + switch os.Getenv("GOPROXY_DEBUG") { + case "1", "true": + logrus.SetLevel(logrus.DebugLevel) + } + return logrus.GetLevel() +}() + +var redirectHTTP = os.Getenv("GOPROXY_REDIRECT_HTTP") != "0" && os.Getenv("GOPROXY_REDIRECT_HTTP") != "false" diff --git a/src/go-proxy/docker_provider.go b/src/go-proxy/docker_provider.go index 1591fef..5f8a80e 100755 --- a/src/go-proxy/docker_provider.go +++ b/src/go-proxy/docker_provider.go @@ -5,12 +5,10 @@ import ( "net/http" "reflect" "strings" - "time" "github.com/docker/cli/cli/connhelper" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "golang.org/x/net/context" ) @@ -45,16 +43,17 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP isRemote := clientIP != "" for _, alias := range aliases { + l := p.l.WithField("container", container_name).WithField("alias", alias) config := NewProxyConfig(p) prefix := fmt.Sprintf("proxy.%s.", alias) for label, value := range container.Labels { err := p.setConfigField(&config, label, value, prefix) if err != nil { - p.Errorf("Build", "%v", err) + l.Error(err) } err = p.setConfigField(&config, label, value, wildcardPrefix) if err != nil { - p.Errorf("Build", "%v", err) + l.Error(err) } } if config.Port == "" { @@ -62,7 +61,7 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP } if config.Port == "0" { // no ports exposed or specified - p.Logf("Build", "no ports exposed for %s, ignored", container_name) + l.Info("no ports exposed, ignored") continue } if config.Scheme == "" { @@ -84,7 +83,7 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP } } if !isValidScheme(config.Scheme) { - p.Warningf("Build", "unsupported scheme: %s, using http", container_name, config.Scheme) + l.Warnf("unsupported scheme: %s, using http", config.Scheme) config.Scheme = "http" } if config.Host == "" { @@ -177,33 +176,6 @@ func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) { return cfgs, nil } -func (p *Provider) grWatchDockerChanges() { - p.stopWatching = make(chan struct{}) - - filter := filters.NewArgs( - filters.Arg("type", "container"), - filters.Arg("event", "start"), - filters.Arg("event", "die"), // 'stop' already triggering 'die' - ) - msgChan, errChan := p.dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter}) - - for { - select { - case <-p.stopWatching: - return - case msg := <-msgChan: - // TODO: handle actor only - p.Logf("Event", "container %s %s caused rebuild", msg.Actor.Attributes["name"], msg.Action) - p.StopAllRoutes() - p.StartAllRoutes() - case err := <-errChan: - p.Logf("Event", "error %s", err) - time.Sleep(100 * time.Millisecond) - msgChan, errChan = p.dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter}) - } - } -} - // var dockerUrlRegex = regexp.MustCompile(`^(?P\w+)://(?P[^:]+)(?P:\d+)?(?P/.*)?$`) func getPublicPort(p types.Port) uint16 { return p.PublicPort } diff --git a/src/go-proxy/file_provider.go b/src/go-proxy/file_provider.go index 381e536..40a8efc 100644 --- a/src/go-proxy/file_provider.go +++ b/src/go-proxy/file_provider.go @@ -3,9 +3,7 @@ package main import ( "fmt" "os" - "time" - "github.com/fsnotify/fsnotify" "gopkg.in/yaml.v3" ) @@ -39,39 +37,3 @@ func (p *Provider) getFileProxyConfigs() ([]*ProxyConfig, error) { return nil, err } } - -func (p *Provider) grWatchFileChanges() { - watcher, err := fsnotify.NewWatcher() - if err != nil { - p.Errorf("Watcher", "unable to create file watcher: %v", err) - } - defer watcher.Close() - - if err = watcher.Add(p.Value); err != nil { - p.Errorf("Watcher", "unable to watch file %q: %v", p.Value, err) - return - } - - for { - select { - case <-p.stopWatching: - return - case event, ok := <-watcher.Events: - if !ok { - return - } - switch { - case event.Has(fsnotify.Write): - p.Logf("Watcher", "file change detected") - p.StopAllRoutes() - p.StartAllRoutes() - case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename): - p.Logf("Watcher", "file renamed / deleted") - p.StopAllRoutes() - } - case err := <-watcher.Errors: - time.Sleep(100 * time.Millisecond) - p.Errorf("Watcher", "File watcher error: %s", err) - } - } -} diff --git a/src/go-proxy/functional.go b/src/go-proxy/functional.go new file mode 100644 index 0000000..771d56f --- /dev/null +++ b/src/go-proxy/functional.go @@ -0,0 +1,39 @@ +package main + +import "sync" + +func ParallelForEach[T interface{}](obj []T, do func(T)) { + var wg sync.WaitGroup + wg.Add(len(obj)) + for _, v := range obj { + go func(v T) { + do(v) + wg.Done() + }(v) + } + wg.Wait() +} + +func ParallelForEachValue[K comparable, V interface{}](obj map[K]V, do func(V)) { + var wg sync.WaitGroup + wg.Add(len(obj)) + for _, v := range obj { + go func(v V) { + do(v) + wg.Done() + }(v) + } + wg.Wait() +} + +func ParallelForEachKeyValue[K comparable, V interface{}](obj map[K]V, do func(K, V)) { + var wg sync.WaitGroup + wg.Add(len(obj)) + for k, v := range obj { + go func(k K, v V) { + do(k, v) + wg.Done() + }(k, v) + } + wg.Wait() +} \ No newline at end of file diff --git a/src/go-proxy/http_route.go b/src/go-proxy/http_route.go index 0a6c345..c0c6ff2 100755 --- a/src/go-proxy/http_route.go +++ b/src/go-proxy/http_route.go @@ -2,20 +2,57 @@ package main import ( "fmt" + "net/http" - "net/http/httputil" "net/url" "strings" - "github.com/golang/glog" + "github.com/sirupsen/logrus" ) +/** +A small mod on net/http/httputil.ReverseProxy + +Before mod: +root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench +Running 10s test @ http://bench.6uo.me/bench + 10 threads and 200 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 3.02ms 4.34ms 102.70ms 94.90% + Req/Sec 8.06k 1.17k 9.99k 79.86% + Latency Distribution + 50% 2.38ms + 75% 4.00ms + 90% 5.93ms + 99% 11.90ms + 808813 requests in 10.10s, 78.68MB read +Requests/sec: 80079.47 +Transfer/sec: 7.79MB + +After mod: +root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench +Running 10s test @ http://bench.6uo.me/bench + 10 threads and 200 connections + Thread Stats Avg Stdev Max +/- Stdev + Latency 1.77ms 5.64ms 118.14ms 99.07% + Req/Sec 16.59k 2.22k 19.65k 87.30% + Latency Distribution + 50% 1.11ms + 75% 1.85ms + 90% 2.74ms + 99% 6.68ms + 1665286 requests in 10.10s, 200.11MB read +Requests/sec: 164880.11 +Transfer/sec: 19.81MB +**/ type HTTPRoute struct { Alias string Url *url.URL Path string PathMode string - Proxy *httputil.ReverseProxy + Proxy *ReverseProxy + + l logrus.FieldLogger } func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) { @@ -24,8 +61,14 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) { return nil, err } - proxy := httputil.NewSingleHostReverseProxy(url) - proxy.Transport = transport + var tr *http.Transport + if config.NoTLSVerify { + tr = transportNoTLS + } else { + tr = transport + } + + proxy := NewSingleHostReverseProxy(url, tr) if !isValidProxyPathMode(config.PathMode) { return nil, fmt.Errorf("invalid path mode: %s", config.PathMode) @@ -37,22 +80,21 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) { Path: config.Path, Proxy: proxy, PathMode: config.PathMode, + l: hrlog.WithFields(logrus.Fields{ + "alias": config.Alias, + "path": config.Path, + "path_mode": config.PathMode, + }), } - director := proxy.Director - proxy.Director = nil - - initRewrite := func(pr *httputil.ProxyRequest) { - director(pr.Out) - } - rewrite := initRewrite + var rewrite func(*ProxyRequest) switch { case config.Path == "", config.PathMode == ProxyPathMode_Forward: - break + rewrite = proxy.Rewrite case config.PathMode == ProxyPathMode_Sub: - rewrite = func(pr *httputil.ProxyRequest) { - initRewrite(pr) + rewrite = func(pr *ProxyRequest) { + proxy.Rewrite(pr) // disable compression pr.Out.Header.Set("Accept-Encoding", "identity") // remove path prefix @@ -61,9 +103,7 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) { route.Proxy.ModifyResponse = func(r *http.Response) error { contentType, ok := r.Header["Content-Type"] if !ok || len(contentType) == 0 { - if glog.V(3) { - glog.Infof("[Path sub] unknown content type for %s", r.Request.URL.String()) - } + route.l.Debug("unknown content type for", r.Request.URL.String()) return nil } // disable cache @@ -76,28 +116,27 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) { case strings.HasPrefix(contentType[0], "application/javascript"): err = utils.respJSSubPath(r, config.Path) default: - glog.V(4).Infof("[Path sub] unknown content type(s): %s", contentType) + route.l.Debug("unknown content type(s): ", contentType) } if err != nil { - err = fmt.Errorf("[Path sub] failed to remove path prefix %s: %v", config.Path, err) + err = fmt.Errorf("failed to remove path prefix %s: %v", config.Path, err) + route.l.WithField("action", "path_sub").Error(err) r.Status = err.Error() r.StatusCode = http.StatusInternalServerError } return err } default: - rewrite = func(pr *httputil.ProxyRequest) { - initRewrite(pr) + rewrite = func(pr *ProxyRequest) { + proxy.Rewrite(pr) pr.Out.URL.Path = strings.TrimPrefix(pr.Out.URL.Path, config.Path) } } - if glog.V(3) { - route.Proxy.Rewrite = func(pr *httputil.ProxyRequest) { + if logLevel == logrus.DebugLevel { + route.Proxy.Rewrite = func(pr *ProxyRequest) { rewrite(pr) - r := pr.In - glog.Infof("[Request] %s %s%s", r.Method, r.Host, r.URL.Path) - glog.V(5).InfoDepthf(1, "Headers: %v", r.Header) + route.l.Debug("Request headers: ", pr.In.Header) } } else { route.Proxy.Rewrite = rewrite @@ -114,7 +153,7 @@ func (p *httpLoadBalancePool) Pick() *HTTPRoute { } func (r *HTTPRoute) RemoveFromRoutes() { - routes.HTTPRoutes.Delete(r.Alias) + httpRoutes.Delete(r.Alias) } // dummy implementation for Route interface @@ -144,24 +183,30 @@ func redirectToTLS(w http.ResponseWriter, r *http.Request) { func findHTTPRoute(host string, path string) (*HTTPRoute, error) { subdomain := strings.Split(host, ".")[0] - routeMap, ok := routes.HTTPRoutes.UnsafeGet(subdomain) - if !ok { - return nil, fmt.Errorf("no matching route for subdomain %s", subdomain) + routeMap, ok := httpRoutes.UnsafeGet(subdomain) + if ok { + return routeMap.FindMatch(path) } - return routeMap.FindMatch(path) + return nil, fmt.Errorf("no matching route for subdomain %s", subdomain) } func httpProxyHandler(w http.ResponseWriter, r *http.Request) { route, err := findHTTPRoute(r.Host, r.URL.Path) if err != nil { - err = fmt.Errorf("[Request] failed %s %s%s, error: %v", + err = fmt.Errorf("request failed %s %s%s, error: %v", r.Method, r.Host, r.URL.Path, err, ) + logrus.Error(err) http.Error(w, err.Error(), http.StatusNotFound) return } route.Proxy.ServeHTTP(w, r) } + +// alias -> (path -> routes) +type HTTPRoutes = SafeMap[string, *pathPoolMap] + +var httpRoutes HTTPRoutes = NewSafeMap[string](newPathPoolMap) diff --git a/src/go-proxy/httputil_mod.go b/src/go-proxy/httputil_mod.go new file mode 100644 index 0000000..cb588b6 --- /dev/null +++ b/src/go-proxy/httputil_mod.go @@ -0,0 +1,833 @@ +package main + +// A small mod on net/http/httputils +// that doubled the performance + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "net" + "net/http" + "net/http/httptrace" + "net/textproto" + "net/url" + "strings" + "time" + + "golang.org/x/net/http/httpguts" +) + +// A ProxyRequest contains a request to be rewritten by a [ReverseProxy]. +type ProxyRequest struct { + // In is the request received by the proxy. + // The Rewrite function must not modify In. + In *http.Request + + // Out is the request which will be sent by the proxy. + // The Rewrite function may modify or replace this request. + // Hop-by-hop headers are removed from this request + // before Rewrite is called. + Out *http.Request +} + +// SetURL routes the outbound request to the scheme, host, and base path +// provided in target. If the target's path is "/base" and the incoming +// request was for "/dir", the target request will be for "/base/dir". +// +// SetURL rewrites the outbound Host header to match the target's host. +// To preserve the inbound request's Host header (the default behavior +// of [NewSingleHostReverseProxy]): +// +// rewriteFunc := func(r *httputil.ProxyRequest) { +// r.SetURL(url) +// r.Out.Host = r.In.Host +// } +func (r *ProxyRequest) SetURL(target *url.URL) { + rewriteRequestURL(r.Out, target) + r.Out.Host = "" +} + +// SetXForwarded sets the X-Forwarded-For, X-Forwarded-Host, and +// X-Forwarded-Proto headers of the outbound request. +// +// - The X-Forwarded-For header is set to the client IP address. +// - The X-Forwarded-Host header is set to the host name requested +// by the client. +// - The X-Forwarded-Proto header is set to "http" or "https", depending +// on whether the inbound request was made on a TLS-enabled connection. +// +// If the outbound request contains an existing X-Forwarded-For header, +// SetXForwarded appends the client IP address to it. To append to the +// inbound request's X-Forwarded-For header (the default behavior of +// [ReverseProxy] when using a Director function), copy the header +// from the inbound request before calling SetXForwarded: +// +// rewriteFunc := func(r *httputil.ProxyRequest) { +// r.Out.Header["X-Forwarded-For"] = r.In.Header["X-Forwarded-For"] +// r.SetXForwarded() +// } +func (r *ProxyRequest) SetXForwarded() { + clientIP, _, err := net.SplitHostPort(r.In.RemoteAddr) + if err == nil { + prior := r.Out.Header["X-Forwarded-For"] + if len(prior) > 0 { + clientIP = strings.Join(prior, ", ") + ", " + clientIP + } + r.Out.Header.Set("X-Forwarded-For", clientIP) + } else { + r.Out.Header.Del("X-Forwarded-For") + } + r.Out.Header.Set("X-Forwarded-Host", r.In.Host) + if r.In.TLS == nil { + r.Out.Header.Set("X-Forwarded-Proto", "http") + } else { + r.Out.Header.Set("X-Forwarded-Proto", "https") + } +} + +// ReverseProxy is an HTTP Handler that takes an incoming request and +// sends it to another server, proxying the response back to the +// client. +// +// 1xx responses are forwarded to the client if the underlying +// transport supports ClientTrace.Got1xxResponse. +type ReverseProxy struct { + // Rewrite must be a function which modifies + // the request into a new request to be sent + // using Transport. Its response is then copied + // back to the original client unmodified. + // Rewrite must not access the provided ProxyRequest + // or its contents after returning. + // + // The Forwarded, X-Forwarded, X-Forwarded-Host, + // and X-Forwarded-Proto headers are removed from the + // outbound request before Rewrite is called. See also + // the ProxyRequest.SetXForwarded method. + // + // Unparsable query parameters are removed from the + // outbound request before Rewrite is called. + // The Rewrite function may copy the inbound URL's + // RawQuery to the outbound URL to preserve the original + // parameter string. Note that this can lead to security + // issues if the proxy's interpretation of query parameters + // does not match that of the downstream server. + // + // At most one of Rewrite or Director may be set. + Rewrite func(*ProxyRequest) + + // The transport used to perform proxy requests. + // If nil, http.DefaultTransport is used. + Transport http.RoundTripper + + // FlushInterval specifies the flush interval + // to flush to the client while copying the + // response body. + // If zero, no periodic flushing is done. + // A negative value means to flush immediately + // after each write to the client. + // The FlushInterval is ignored when ReverseProxy + // recognizes a response as a streaming response, or + // if its ContentLength is -1; for such responses, writes + // are flushed to the client immediately. + FlushInterval time.Duration + + // ErrorLog specifies an optional logger for errors + // that occur when attempting to proxy the request. + // If nil, logging is done via the log package's standard logger. + ErrorLog *log.Logger + + // BufferPool optionally specifies a buffer pool to + // get byte slices for use by io.CopyBuffer when + // copying HTTP response bodies. + BufferPool BufferPool + + // ModifyResponse is an optional function that modifies the + // Response from the backend. It is called if the backend + // returns a response at all, with any HTTP status code. + // If the backend is unreachable, the optional ErrorHandler is + // called without any call to ModifyResponse. + // + // If ModifyResponse returns an error, ErrorHandler is called + // with its error value. If ErrorHandler is nil, its default + // implementation is used. + ModifyResponse func(*http.Response) error + + // ErrorHandler is an optional function that handles errors + // reaching the backend or errors from ModifyResponse. + // + // If nil, the default is to log the provided error and return + // a 502 Status Bad Gateway response. + ErrorHandler func(http.ResponseWriter, *http.Request, error) +} + +// A BufferPool is an interface for getting and returning temporary +// byte slices for use by [io.CopyBuffer]. +type BufferPool interface { + Get() []byte + Put([]byte) +} + +func singleJoiningSlash(a, b string) string { + aslash := strings.HasSuffix(a, "/") + bslash := strings.HasPrefix(b, "/") + switch { + case aslash && bslash: + return a + b[1:] + case !aslash && !bslash: + return a + "/" + b + } + return a + b +} + +func joinURLPath(a, b *url.URL) (path, rawpath string) { + if a.RawPath == "" && b.RawPath == "" { + return singleJoiningSlash(a.Path, b.Path), "" + } + // Same as singleJoiningSlash, but uses EscapedPath to determine + // whether a slash should be added + apath := a.EscapedPath() + bpath := b.EscapedPath() + + aslash := strings.HasSuffix(apath, "/") + bslash := strings.HasPrefix(bpath, "/") + + switch { + case aslash && bslash: + return a.Path + b.Path[1:], apath + bpath[1:] + case !aslash && !bslash: + return a.Path + "/" + b.Path, apath + "/" + bpath + } + return a.Path + b.Path, apath + bpath +} + +// NewSingleHostReverseProxy returns a new [ReverseProxy] that routes +// URLs to the scheme, host, and base path provided in target. If the +// target's path is "/base" and the incoming request was for "/dir", +// the target request will be for /base/dir. +// +// NewSingleHostReverseProxy does not rewrite the Host header. +// +// To customize the ReverseProxy behavior beyond what +// NewSingleHostReverseProxy provides, use ReverseProxy directly +// with a Rewrite function. The ProxyRequest SetURL method +// may be used to route the outbound request. (Note that SetURL, +// unlike NewSingleHostReverseProxy, rewrites the Host header +// of the outbound request by default.) +// +// proxy := &ReverseProxy{ +// Rewrite: func(r *ProxyRequest) { +// r.SetURL(target) +// r.Out.Host = r.In.Host // if desired +// }, +// } +func NewSingleHostReverseProxy(target *url.URL, transport *http.Transport) *ReverseProxy { + return &ReverseProxy{Rewrite: func(pr *ProxyRequest) { + rewriteRequestURL(pr.Out, target) + }, Transport: transport} +} + +func rewriteRequestURL(req *http.Request, target *url.URL) { + targetQuery := target.RawQuery + req.URL.Scheme = target.Scheme + req.URL.Host = target.Host + req.URL.Path, req.URL.RawPath = joinURLPath(target, req.URL) + if targetQuery == "" || req.URL.RawQuery == "" { + req.URL.RawQuery = targetQuery + req.URL.RawQuery + } else { + req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery + } +} + +func copyHeader(dst, src http.Header) { + for k, vv := range src { + for _, v := range vv { + dst.Add(k, v) + } + } +} + +// Hop-by-hop headers. These are removed when sent to the backend. +// As of RFC 7230, hop-by-hop headers are required to appear in the +// Connection header field. These are the headers defined by the +// obsoleted RFC 2616 (section 13.5.1) and are used for backward +// compatibility. +// var hopHeaders = []string{ +// "Connection", +// "Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google +// "Keep-Alive", +// "Proxy-Authenticate", +// "Proxy-Authorization", +// "Te", // canonicalized version of "TE" +// "Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522 +// "Transfer-Encoding", +// "Upgrade", +// } + +// NOTE: getErrorHandler and DefaultErrorHandler removed + +func (p *ReverseProxy) errorHandler(rw http.ResponseWriter, _ *http.Request, err error) { + p.logf("http: proxy error: %v", err) + rw.WriteHeader(http.StatusBadGateway) +} + +// modifyResponse conditionally runs the optional ModifyResponse hook +// and reports whether the request should proceed. +func (p *ReverseProxy) modifyResponse(rw http.ResponseWriter, res *http.Response, req *http.Request) bool { + if p.ModifyResponse == nil { + return true + } + if err := p.ModifyResponse(res); err != nil { + res.Body.Close() + p.errorHandler(rw, req, err) + return false + } + return true +} + +func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + transport := p.Transport + // Note: removed + // if transport == nil { + // transport = http.DefaultTransport + // } + + ctx := req.Context() + if ctx.Done() != nil { + // CloseNotifier predates context.Context, and has been + // entirely superseded by it. If the request contains + // a Context that carries a cancellation signal, don't + // bother spinning up a goroutine to watch the CloseNotify + // channel (if any). + // + // If the request Context has a nil Done channel (which + // means it is either context.Background, or a custom + // Context implementation with no cancellation signal), + // then consult the CloseNotifier if available. + } else if cn, ok := rw.(http.CloseNotifier); ok { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + defer cancel() + notifyChan := cn.CloseNotify() + go func() { + select { + case <-notifyChan: + cancel() + case <-ctx.Done(): + } + }() + } + + outreq := req.Clone(ctx) + if req.ContentLength == 0 { + outreq.Body = nil // Issue 16036: nil Body for http.Transport retries + } + if outreq.Body != nil { + // Reading from the request body after returning from a handler is not + // allowed, and the RoundTrip goroutine that reads the Body can outlive + // this handler. This can lead to a crash if the handler panics (see + // Issue 46866). Although calling Close doesn't guarantee there isn't + // any Read in flight after the handle returns, in practice it's safe to + // read after closing it. + defer outreq.Body.Close() + } + if outreq.Header == nil { + outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate + } + + // NOTE: removed + // if (p.Director != nil) == (p.Rewrite != nil) { + // p.errorHandler(rw, req, errors.New("ReverseProxy must have exactly one of Director or Rewrite set")) + // return + // } + + // if p.Director != nil { + // p.Director(outreq) + // if outreq.Form != nil { + // outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery) + // } + // } + outreq.Close = false + + reqUpType := upgradeType(outreq.Header) + if !IsPrint(reqUpType) { + p.errorHandler(rw, req, fmt.Errorf("client tried to switch to invalid protocol %q", reqUpType)) + return + } + // NOTE: removed + // removeHopByHopHeaders(outreq.Header) + + // Issue 21096: tell backend applications that care about trailer support + // that we support trailers. (We do, but we don't go out of our way to + // advertise that unless the incoming client request thought it was worth + // mentioning.) Note that we look at req.Header, not outreq.Header, since + // the latter has passed through removeHopByHopHeaders. + if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") { + outreq.Header.Set("Te", "trailers") + } + + // After stripping all the hop-by-hop connection headers above, add back any + // necessary for protocol upgrades, such as for websockets. + if reqUpType != "" { + outreq.Header.Set("Connection", "Upgrade") + outreq.Header.Set("Upgrade", reqUpType) + } + + // NOTE: removed + // if p.Rewrite != nil { + // Strip client-provided forwarding headers. + // The Rewrite func may use SetXForwarded to set new values + // for these or copy the previous values from the inbound request. + // outreq.Header.Del("Forwarded") + // outreq.Header.Del("X-Forwarded-For") + // outreq.Header.Del("X-Forwarded-Host") + // outreq.Header.Del("X-Forwarded-Proto") + + // NOTE: removed + // Remove unparsable query parameters from the outbound request. + // outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery) + + pr := &ProxyRequest{ + In: req, + Out: outreq, + } + pr.SetXForwarded() // NOTE: added + p.Rewrite(pr) + outreq = pr.Out + // NOTE: removed + // } else { + // if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil { + // // If we aren't the first proxy retain prior + // // X-Forwarded-For information as a comma+space + // // separated list and fold multiple headers into one. + // prior, ok := outreq.Header["X-Forwarded-For"] + // omit := ok && prior == nil // Issue 38079: nil now means don't populate the header + // if len(prior) > 0 { + // clientIP = strings.Join(prior, ", ") + ", " + clientIP + // } + // if !omit { + // outreq.Header.Set("X-Forwarded-For", clientIP) + // } + // } + // } + + if _, ok := outreq.Header["User-Agent"]; !ok { + // If the outbound request doesn't have a User-Agent header set, + // don't send the default Go HTTP client User-Agent. + outreq.Header.Set("User-Agent", "") + } + + trace := &httptrace.ClientTrace{ + Got1xxResponse: func(code int, header textproto.MIMEHeader) error { + h := rw.Header() + // copyHeader(h, http.Header(header)) + for k, vv := range header { + for _, v := range vv { + h.Add(k, v) + } + } + rw.WriteHeader(code) + + // Clear headers, it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses + clear(h) + return nil + }, + } + outreq = outreq.WithContext(httptrace.WithClientTrace(outreq.Context(), trace)) + + res, err := transport.RoundTrip(outreq) + if err != nil { + p.errorHandler(rw, outreq, err) + return + } + + // Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc) + if res.StatusCode == http.StatusSwitchingProtocols { + if !p.modifyResponse(rw, res, outreq) { + return + } + p.handleUpgradeResponse(rw, outreq, res) + return + } + + // NOTE: removed + // removeHopByHopHeaders(res.Header) + + if !p.modifyResponse(rw, res, outreq) { + return + } + + copyHeader(rw.Header(), res.Header) + + // The "Trailer" header isn't included in the Transport's response, + // at least for *http.Transport. Build it up from Trailer. + announcedTrailers := len(res.Trailer) + if announcedTrailers > 0 { + trailerKeys := make([]string, 0, len(res.Trailer)) + for k := range res.Trailer { + trailerKeys = append(trailerKeys, k) + } + rw.Header().Add("Trailer", strings.Join(trailerKeys, ", ")) + } + + rw.WriteHeader(res.StatusCode) + + // NOTE: changing this line extremely improve throughput + // err = p.copyResponse(rw, res.Body, p.flushInterval(res)) + _, err = io.Copy(rw, res.Body) + if err != nil { + defer res.Body.Close() + // note: removed + // Since we're streaming the response, if we run into an error all we can do + // is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler + // on read error while copying body. + // if !shouldPanicOnCopyError(req) { + // p.logf("suppressing panic for copyResponse error in test; copy error: %v", err) + // return + // } + panic(http.ErrAbortHandler) + } + res.Body.Close() // close now, instead of defer, to populate res.Trailer + + if len(res.Trailer) > 0 { + // Force chunking if we saw a response trailer. + // This prevents net/http from calculating the length for short + // bodies and adding a Content-Length. + http.NewResponseController(rw).Flush() + } + + if len(res.Trailer) == announcedTrailers { + copyHeader(rw.Header(), res.Trailer) + return + } + + for k, vv := range res.Trailer { + k = http.TrailerPrefix + k + for _, v := range vv { + rw.Header().Add(k, v) + } + } +} + +// var inOurTests bool // whether we're in our own tests + +// NOTE: removed +// shouldPanicOnCopyError reports whether the reverse proxy should +// panic with http.ErrAbortHandler. This is the right thing to do by +// default, but Go 1.10 and earlier did not, so existing unit tests +// weren't expecting panics. Only panic in our own tests, or when +// running under the HTTP server. +// func shouldPanicOnCopyError(req *http.Request) bool { +// if inOurTests { +// // Our tests know to handle this panic. +// return true +// } +// if req.Context().Value(http.ServerContextKey) != nil { +// // We seem to be running under an HTTP server, so +// // it'll recover the panic. +// return true +// } +// // Otherwise act like Go 1.10 and earlier to not break +// // existing tests. +// return false +// } + +// removeHopByHopHeaders removes hop-by-hop headers. +// +// func removeHopByHopHeaders(h http.Header) { +// // RFC 7230, section 6.1: Remove headers listed in the "Connection" header. +// for _, f := range h["Connection"] { +// for _, sf := range strings.Split(f, ",") { +// if sf = textproto.TrimString(sf); sf != "" { +// h.Del(sf) +// } +// } +// } +// // RFC 2616, section 13.5.1: Remove a set of known hop-by-hop headers. +// // This behavior is superseded by the RFC 7230 Connection header, but +// // preserve it for backwards compatibility. +// for _, f := range hopHeaders { +// h.Del(f) +// } +// } + +// NOTE: removed +// flushInterval returns the p.FlushInterval value, conditionally +// overriding its value for a specific request/response. +// func (p *ReverseProxy) flushInterval(res *http.Response) time.Duration { +// resCT := res.Header.Get("Content-Type") + +// // For Server-Sent Events responses, flush immediately. +// // The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream +// if baseCT, _, _ := mime.ParseMediaType(resCT); baseCT == "text/event-stream" { +// return -1 // negative means immediately +// } + +// // We might have the case of streaming for which Content-Length might be unset. +// if res.ContentLength == -1 { +// return -1 +// } + +// return p.FlushInterval +// } + +// NOTE: removed +// func (p *ReverseProxy) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error { +// var w io.Writer = dst + +// if flushInterval != 0 { +// mlw := &maxLatencyWriter{ +// dst: dst, +// flush: http.NewResponseController(dst).Flush, +// latency: flushInterval, +// } +// defer mlw.stop() + +// // set up initial timer so headers get flushed even if body writes are delayed +// mlw.flushPending = true +// mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush) + +// w = mlw +// } + +// var buf []byte +// if p.BufferPool != nil { +// buf = p.BufferPool.Get() +// defer p.BufferPool.Put(buf) +// } +// _, err := p.copyBuffer(w, src, buf) +// return err +// } + +// copyBuffer returns any write errors or non-EOF read errors, and the amount +// of bytes written. + +// NOTE: removed +// func (p *ReverseProxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) { +// if len(buf) == 0 { +// buf = make([]byte, 32*1024) +// } +// var written int64 +// for { +// nr, rerr := src.Read(buf) +// if rerr != nil && rerr != io.EOF && rerr != context.Canceled { +// p.logf("httputil: ReverseProxy read error during body copy: %v", rerr) +// } +// if nr > 0 { +// nw, werr := dst.Write(buf[:nr]) +// if nw > 0 { +// written += int64(nw) +// } +// if werr != nil { +// return written, werr +// } +// if nr != nw { +// return written, io.ErrShortWrite +// } +// } +// if rerr != nil { +// if rerr == io.EOF { +// rerr = nil +// } +// return written, rerr +// } +// } +// } + +func (p *ReverseProxy) logf(format string, args ...any) { + if p.ErrorLog != nil { + p.ErrorLog.Printf(format, args...) + } else { + hrlog.Printf(format, args...) + } +} + +// NOTE: removed +// type maxLatencyWriter struct { +// dst io.Writer +// flush func() error +// latency time.Duration // non-zero; negative means to flush immediately + +// mu sync.Mutex // protects t, flushPending, and dst.Flush +// t *time.Timer +// flushPending bool +// } + +// NOTE: removed +// func (m *maxLatencyWriter) Write(p []byte) (n int, err error) { +// m.mu.Lock() +// defer m.mu.Unlock() +// n, err = m.dst.Write(p) +// if m.latency < 0 { +// m.flush() +// return +// } +// if m.flushPending { +// return +// } +// if m.t == nil { +// m.t = time.AfterFunc(m.latency, m.delayedFlush) +// } else { +// m.t.Reset(m.latency) +// } +// m.flushPending = true +// return +// } + +// func (m *maxLatencyWriter) delayedFlush() { +// m.mu.Lock() +// defer m.mu.Unlock() +// if !m.flushPending { // if stop was called but AfterFunc already started this goroutine +// return +// } +// m.flush() +// m.flushPending = false +// } + +// func (m *maxLatencyWriter) stop() { +// m.mu.Lock() +// defer m.mu.Unlock() +// m.flushPending = false +// if m.t != nil { +// m.t.Stop() +// } +// } + +func upgradeType(h http.Header) string { + if !httpguts.HeaderValuesContainsToken(h["Connection"], "Upgrade") { + return "" + } + return h.Get("Upgrade") +} + +func (p *ReverseProxy) handleUpgradeResponse(rw http.ResponseWriter, req *http.Request, res *http.Response) { + reqUpType := upgradeType(req.Header) + resUpType := upgradeType(res.Header) + if !IsPrint(resUpType) { // We know reqUpType is ASCII, it's checked by the caller. + p.errorHandler(rw, req, fmt.Errorf("backend tried to switch to invalid protocol %q", resUpType)) + } + if !strings.EqualFold(reqUpType, resUpType) { + p.errorHandler(rw, req, fmt.Errorf("backend tried to switch protocol %q when %q was requested", resUpType, reqUpType)) + return + } + + backConn, ok := res.Body.(io.ReadWriteCloser) + if !ok { + p.errorHandler(rw, req, fmt.Errorf("internal error: 101 switching protocols response with non-writable body")) + return + } + + rc := http.NewResponseController(rw) + conn, brw, hijackErr := rc.Hijack() + if errors.Is(hijackErr, http.ErrNotSupported) { + p.errorHandler(rw, req, fmt.Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw)) + return + } + + backConnCloseCh := make(chan bool) + go func() { + // Ensure that the cancellation of a request closes the backend. + // See issue https://golang.org/issue/35559. + select { + case <-req.Context().Done(): + case <-backConnCloseCh: + } + backConn.Close() + }() + defer close(backConnCloseCh) + + if hijackErr != nil { + p.errorHandler(rw, req, fmt.Errorf("hijack failed on protocol switch: %v", hijackErr)) + return + } + defer conn.Close() + + copyHeader(rw.Header(), res.Header) + + res.Header = rw.Header() + res.Body = nil // so res.Write only writes the headers; we have res.Body in backConn above + if err := res.Write(brw); err != nil { + p.errorHandler(rw, req, fmt.Errorf("response write: %v", err)) + return + } + if err := brw.Flush(); err != nil { + p.errorHandler(rw, req, fmt.Errorf("response flush: %v", err)) + return + } + errc := make(chan error, 1) + // NOTE: removed + // spc := switchProtocolCopier{user: conn, backend: backConn} + // go spc.copyToBackend(errc) + // go spc.copyFromBackend(errc) + go func() { + _, err := io.Copy(conn, backConn) + errc <- err + }() + go func() { + _, err := io.Copy(backConn, conn) + errc <- err + }() + <-errc +} + +// NOTE: removed +// switchProtocolCopier exists so goroutines proxying data back and +// forth have nice names in stacks. +// type switchProtocolCopier struct { +// user, backend io.ReadWriter +// } + +// func (c switchProtocolCopier) copyFromBackend(errc chan<- error) { +// _, err := io.Copy(c.user, c.backend) +// errc <- err +// } + +// func (c switchProtocolCopier) copyToBackend(errc chan<- error) { +// _, err := io.Copy(c.backend, c.user) +// errc <- err +// } + +// NOTE: removed +// func cleanQueryParams(s string) string { +// reencode := func(s string) string { +// v, _ := url.ParseQuery(s) +// return v.Encode() +// } +// for i := 0; i < len(s); { +// switch s[i] { +// case ';': +// return reencode(s) +// case '%': +// if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) { +// return reencode(s) +// } +// i += 3 +// default: +// i++ +// } +// } +// return s +// } + +// func ishex(c byte) bool { +// switch { +// case '0' <= c && c <= '9': +// return true +// case 'a' <= c && c <= 'f': +// return true +// case 'A' <= c && c <= 'F': +// return true +// } +// return false +// } + +func IsPrint(s string) bool { + for i := 0; i < len(s); i++ { + if s[i] < ' ' || s[i] > '~' { + return false + } + } + return true +} diff --git a/src/go-proxy/loggers.go b/src/go-proxy/loggers.go new file mode 100644 index 0000000..2b24d3d --- /dev/null +++ b/src/go-proxy/loggers.go @@ -0,0 +1,10 @@ +package main + +import "github.com/sirupsen/logrus" + +var palog = logrus.WithField("component", "panel") +var prlog = logrus.WithField("component", "provider") +var cfgl = logrus.WithField("component", "config") +var hrlog = logrus.WithField("component", "http_proxy") +var srlog = logrus.WithField("component", "stream") +var wlog = logrus.WithField("component", "watcher") diff --git a/src/go-proxy/main.go b/src/go-proxy/main.go index 0ad0565..4b99a10 100755 --- a/src/go-proxy/main.go +++ b/src/go-proxy/main.go @@ -3,10 +3,12 @@ package main import ( "flag" "net/http" + "os" + "os/signal" "runtime" - "time" + "syscall" - "github.com/golang/glog" + log "github.com/sirupsen/logrus" ) func main() { @@ -15,59 +17,65 @@ func main() { flag.Parse() runtime.GOMAXPROCS(runtime.NumCPU()) - go func() { - for range time.Tick(100 * time.Millisecond) { - glog.Flush() - } - }() + log.SetFormatter(&log.TextFormatter{ + ForceColors: true, + DisableColors: false, + FullTimestamp: true, + }) - if config, err = ReadConfig(); err != nil { - glog.Fatal("unable to read config: ", err) - } + InitFSWatcher() + InitDockerWatcher() - StartAllRoutes() - go ListenConfigChanges() - - mux := http.NewServeMux() - mux.HandleFunc("/", httpProxyHandler) + cfg := NewConfig() + cfg.MustLoad() + cfg.StartProviders() + cfg.WatchChanges() var certAvailable = utils.fileOK(certPath) && utils.fileOK(keyPath) go func() { - glog.Infoln("starting http server on port 80") - if certAvailable { + log.Info("starting http server on port 80") + if certAvailable && redirectHTTP { err = http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS)) } else { - err = http.ListenAndServe(":80", mux) + err = http.ListenAndServe(":80", http.HandlerFunc(httpProxyHandler)) } if err != nil { - glog.Fatal("HTTP server error", err) + log.Fatal("HTTP server error: ", err) } }() go func() { - glog.Infoln("starting http panel on port 8080") + log.Infof("starting http panel on port 8080") err := http.ListenAndServe(":8080", http.HandlerFunc(panelHandler)) if err != nil { - glog.Fatal("HTTP server error", err) + log.Warning("HTTP panel error: ", err) } }() if certAvailable { go func() { - glog.Infoln("starting https panel on port 8443") - err := http.ListenAndServeTLS(":8443", certPath, keyPath, http.HandlerFunc(panelHandler)) + log.Info("starting https server on port 443") + err = http.ListenAndServeTLS(":443", certPath, keyPath, http.HandlerFunc(httpProxyHandler)) if err != nil { - glog.Fatal("http server error", err) + log.Fatal("https server error: ", err) } }() go func() { - glog.Infoln("starting https server on port 443") - err = http.ListenAndServeTLS(":443", certPath, keyPath, mux) + log.Info("starting https panel on port 8443") + err := http.ListenAndServeTLS(":8443", certPath, keyPath, http.HandlerFunc(panelHandler)) if err != nil { - glog.Fatal("https server error: ", err) + log.Warning("http panel error: ", err) } }() } - <-make(chan struct{}) + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT) + signal.Notify(sig, syscall.SIGTERM) + signal.Notify(sig, syscall.SIGHUP) + + <-sig + cfg.StopProviders() + close(fsWatcherStop) + close(dockerWatcherStop) } diff --git a/src/go-proxy/panel.go b/src/go-proxy/panel.go index a3b7d60..f4205ab 100755 --- a/src/go-proxy/panel.go +++ b/src/go-proxy/panel.go @@ -6,8 +6,6 @@ import ( "net/http" "net/url" "time" - - "github.com/golang/glog" ) var healthCheckHttpClient = &http.Client{ @@ -32,6 +30,7 @@ func panelHandler(w http.ResponseWriter, r *http.Request) { panelCheckTargetHealth(w, r) return default: + palog.Errorf("%s not found", r.URL.Path) http.NotFound(w, r) return } @@ -46,12 +45,22 @@ func panelIndex(w http.ResponseWriter, r *http.Request) { tmpl, err := template.ParseFiles(templateFile) if err != nil { + palog.Error(err) http.Error(w, err.Error(), http.StatusInternalServerError) return } - err = tmpl.Execute(w, &routes) + type allRoutes struct { + HTTPRoutes HTTPRoutes + StreamRoutes StreamRoutes + } + + err = tmpl.Execute(w, allRoutes{ + HTTPRoutes: httpRoutes, + StreamRoutes: streamRoutes, + }) if err != nil { + palog.Error(err) http.Error(w, err.Error(), http.StatusInternalServerError) } } @@ -71,7 +80,7 @@ func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) { url, err := url.Parse(targetUrl) if err != nil { - glog.Infof("[Panel] failed to parse %s, error: %v", targetUrl, err) + palog.Infof("failed to parse url %q, error: %v", targetUrl, err) http.Error(w, err.Error(), http.StatusBadRequest) return } diff --git a/src/go-proxy/path_pool_map.go b/src/go-proxy/path_pool_map.go index 92b2263..9e2761b 100644 --- a/src/go-proxy/path_pool_map.go +++ b/src/go-proxy/path_pool_map.go @@ -9,8 +9,8 @@ type pathPoolMap struct { SafeMap[string, *httpLoadBalancePool] } -func newPathPoolMap() pathPoolMap { - return pathPoolMap{ +func newPathPoolMap() *pathPoolMap { + return &pathPoolMap{ NewSafeMap[string](NewHTTPLoadBalancePool), } } @@ -21,8 +21,7 @@ func (m pathPoolMap) Add(path string, route *HTTPRoute) { } func (m pathPoolMap) FindMatch(pathGot string) (*HTTPRoute, error) { - pool := m.Iterator() - for pathWant, v := range pool { + for pathWant, v := range m.Iterator() { if strings.HasPrefix(pathGot, pathWant) { return v.Pick(), nil } diff --git a/src/go-proxy/provider.go b/src/go-proxy/provider.go index 28d00b6..a657c6b 100644 --- a/src/go-proxy/provider.go +++ b/src/go-proxy/provider.go @@ -5,7 +5,7 @@ import ( "sync" "github.com/docker/docker/client" - "github.com/golang/glog" + "github.com/sirupsen/logrus" ) type Provider struct { @@ -13,111 +13,75 @@ type Provider struct { Value string name string - stopWatching chan struct{} + watcher Watcher routes map[string]Route // id -> Route dockerClient *client.Client mutex sync.Mutex + l logrus.FieldLogger } -func (p *Provider) GetProxyConfigs() ([]*ProxyConfig, error) { +func (p *Provider) Setup() error { + var cfgs []*ProxyConfig + var err error + + p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": p.name}) + switch p.Kind { case ProviderKind_Docker: - return p.getDockerProxyConfigs() + cfgs, err = p.getDockerProxyConfigs() + p.watcher = NewDockerWatcher(p.dockerClient, p.ReloadRoutes) case ProviderKind_File: - return p.getFileProxyConfigs() + cfgs, err = p.getFileProxyConfigs() + p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes) default: // this line should never be reached - return nil, fmt.Errorf("unknown provider kind %q", p.Kind) - } -} - -func StartAllRoutes() { - var wg sync.WaitGroup - wg.Add(len(config.Providers)) - for _, p := range config.Providers { - go func(p *Provider) { - p.StartAllRoutes() - wg.Done() - }(p) - } - wg.Wait() -} - -func (p *Provider) StopAllRoutes() { - p.mutex.Lock() - defer p.mutex.Unlock() - - if p.stopWatching == nil { - return + return fmt.Errorf("unknown provider kind") } - close(p.stopWatching) - p.stopWatching = nil - if p.dockerClient != nil { - p.dockerClient.Close() - } - - var wg sync.WaitGroup - wg.Add(len(p.routes)) - - for _, route := range p.routes { - go func(r Route) { - r.StopListening() - r.RemoveFromRoutes() - wg.Done() - }(route) - } - wg.Wait() - p.routes = make(map[string]Route) -} - -func (p *Provider) StartAllRoutes() { - p.mutex.Lock() - defer p.mutex.Unlock() - - p.routes = make(map[string]Route) - - cfgs, err := p.GetProxyConfigs() if err != nil { - p.Logf("Build", "unable to get proxy configs: %v", err) - return + return err } + p.l.Infof("loaded %d proxy configurations", len(cfgs)) for _, cfg := range cfgs { r, err := NewRoute(cfg) if err != nil { - p.Logf("Build", "error creating route %q: %v", cfg.Alias, err) + p.l.Errorf("error creating route %s: %v", cfg.Alias, err) continue } r.SetupListen() r.Listen() p.routes[cfg.GetID()] = r } - p.WatchChanges() - p.Logf("Build", "built %d routes", len(p.routes)) - p.stopWatching = make(chan struct{}) + return nil } -func (p *Provider) WatchChanges() { - switch p.Kind { - case ProviderKind_Docker: - go p.grWatchDockerChanges() - case ProviderKind_File: - go p.grWatchFileChanges() - default: - // this line should never be reached - p.Errorf("unknown provider kind %q", p.Kind) +func (p *Provider) StartAllRoutes() { + p.routes = make(map[string]Route) + err := p.Setup() + if err != nil { + p.l.Error(err) + return } + p.watcher.Start() } -func (p *Provider) Logf(t string, s string, args ...interface{}) { - glog.Infof("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...) +func (p *Provider) StopAllRoutes() { + p.watcher.Stop() + p.dockerClient = nil + + ParallelForEachValue(p.routes, func(r Route) { + r.StopListening() + r.RemoveFromRoutes() + }) + + p.routes = make(map[string]Route) } -func (p *Provider) Errorf(t string, s string, args ...interface{}) { - glog.Errorf("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...) -} +func (p *Provider) ReloadRoutes() { + p.mutex.Lock() + defer p.mutex.Unlock() -func (p *Provider) Warningf(t string, s string, args ...interface{}) { - glog.Warningf("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...) -} + p.StopAllRoutes() + p.StartAllRoutes() +} \ No newline at end of file diff --git a/src/go-proxy/proxy_config.go b/src/go-proxy/proxy_config.go index 52a04d0..68533ac 100644 --- a/src/go-proxy/proxy_config.go +++ b/src/go-proxy/proxy_config.go @@ -8,6 +8,7 @@ type ProxyConfig struct { Host string Port string LoadBalance string // docker provider only + NoTLSVerify bool // http proxy only Path string // http proxy only PathMode string `yaml:"path_mode"` // http proxy only diff --git a/src/go-proxy/route.go b/src/go-proxy/route.go index a6a8124..b676e2f 100755 --- a/src/go-proxy/route.go +++ b/src/go-proxy/route.go @@ -2,15 +2,8 @@ package main import ( "fmt" - "sync" ) -type Routes struct { - HTTPRoutes SafeMap[string, pathPoolMap] // alias -> (path -> routes) - StreamRoutes SafeMap[string, StreamRoute] // id -> target - Mutex sync.Mutex -} - type Route interface { SetupListen() Listen() @@ -21,22 +14,22 @@ type Route interface { func NewRoute(cfg *ProxyConfig) (Route, error) { if isStreamScheme(cfg.Scheme) { id := cfg.GetID() - if routes.StreamRoutes.Contains(id) { + if streamRoutes.Contains(id) { return nil, fmt.Errorf("duplicated %s stream %s, ignoring", cfg.Scheme, id) } route, err := NewStreamRoute(cfg) if err != nil { return nil, err } - routes.StreamRoutes.Set(id, route) + streamRoutes.Set(id, route) return route, nil } else { - routes.HTTPRoutes.Ensure(cfg.Alias) + httpRoutes.Ensure(cfg.Alias) route, err := NewHTTPRoute(cfg) if err != nil { return nil, err } - routes.HTTPRoutes.Get(cfg.Alias).Add(cfg.Path, route) + httpRoutes.Get(cfg.Alias).Add(cfg.Path, route) return route, nil } } @@ -59,11 +52,7 @@ func isStreamScheme(s string) bool { return false } -func initRoutes() *Routes { - r := Routes{} - r.HTTPRoutes = NewSafeMap[string](newPathPoolMap) - r.StreamRoutes = NewSafeMap[string, StreamRoute]() - return &r -} +// id -> target +type StreamRoutes = SafeMap[string, StreamRoute] -var routes = initRoutes() \ No newline at end of file +var streamRoutes = NewSafeMap[string, StreamRoute]() diff --git a/src/go-proxy/stream_route.go b/src/go-proxy/stream_route.go index 769dbbd..618f562 100755 --- a/src/go-proxy/stream_route.go +++ b/src/go-proxy/stream_route.go @@ -8,15 +8,14 @@ import ( "sync" "time" - "github.com/golang/glog" + "github.com/sirupsen/logrus" ) type StreamRoute interface { Route - Logf(string, ...interface{}) - PrintError(error) ListeningUrl() string TargetUrl() string + Logger() logrus.FieldLogger closeListeners() closeChannel() @@ -36,6 +35,7 @@ type StreamRouteBase struct { id string wg sync.WaitGroup stopChann chan struct{} + l logrus.FieldLogger } func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) { @@ -47,8 +47,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) { port_split := strings.Split(config.Port, ":") if len(port_split) != 2 { - glog.Infof(`[Build] %s: Invalid stream port %s, `+ - `assuming it's targetPort`, config.Alias, config.Port) + cfgl.Warnf("Invalid port %s, assuming it is target port", config.Port) srcPort = "0" dstPort = config.Port } else { @@ -63,8 +62,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) { srcPortInt, err := strconv.Atoi(srcPort) if err != nil { return nil, fmt.Errorf( - "[Build] %s: Unrecognized stream source port %s, ignoring", - config.Alias, srcPort, + "invalid stream source port %s, ignoring", srcPort, ) } @@ -73,8 +71,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) { dstPortInt, err := strconv.Atoi(dstPort) if err != nil { return nil, fmt.Errorf( - "[Build] %s: Unrecognized stream target port %s, ignoring", - config.Alias, dstPort, + "invalid stream target port %s, ignoring", dstPort, ) } @@ -100,6 +97,11 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) { id: config.GetID(), wg: sync.WaitGroup{}, stopChann: make(chan struct{}), + l: srlog.WithFields(logrus.Fields{ + "alias": config.Alias, + "src": fmt.Sprintf("%s://:%d", srcScheme, srcPortInt), + "dst": fmt.Sprintf("%s://%s:%d", dstScheme, config.Host, dstPortInt), + }), }, nil } @@ -114,29 +116,6 @@ func NewStreamRoute(config *ProxyConfig) (StreamRoute, error) { } } -func (route *StreamRouteBase) PrintError(err error) { - if err == nil { - return - } - glog.Errorf("[%s -> %s] %s: %v", - route.ListeningScheme, - route.TargetScheme, - route.Alias, - err, - ) -} - -func (route *StreamRouteBase) Logf(format string, v ...interface{}) { - glog.Infof("[%s -> %s] %s: "+format, - append([]interface{}{ - route.ListeningScheme, - route.TargetScheme, - route.Alias}, - v..., - )..., - ) -} - func (route *StreamRouteBase) ListeningUrl() string { return fmt.Sprintf("%s:%v", route.ListeningScheme, route.ListeningPort) } @@ -145,21 +124,25 @@ func (route *StreamRouteBase) TargetUrl() string { return fmt.Sprintf("%s://%s:%v", route.TargetScheme, route.TargetHost, route.TargetPort) } +func (route *StreamRouteBase) Logger() logrus.FieldLogger { + return route.l +} + func (route *StreamRouteBase) SetupListen() { if route.ListeningPort == 0 { freePort, err := utils.findUseFreePort(20000) if err != nil { - route.PrintError(err) + route.l.Error(err) return } route.ListeningPort = freePort - route.Logf("Assigned free port %v", route.ListeningPort) + route.l.Info("Assigned free port", route.ListeningPort) } - route.Logf("Listening on %s", route.ListeningUrl()) + route.l.Info("Listening on", route.ListeningUrl()) } func (route *StreamRouteBase) RemoveFromRoutes() { - routes.StreamRoutes.Delete(route.id) + streamRoutes.Delete(route.id) } func (route *StreamRouteBase) wait() { @@ -175,7 +158,8 @@ func (route *StreamRouteBase) unmarkPort() { } func stopListening(route StreamRoute) { - route.Logf("Stopping listening") + l := route.Logger() + l.Debug("Stopping listening") route.closeChannel() route.closeListeners() @@ -189,10 +173,10 @@ func stopListening(route StreamRoute) { select { case <-done: - route.Logf("Stopped listening") + l.Info("Stopped listening") return case <-time.After(StreamStopListenTimeout): - route.Logf("timed out waiting for connections") + l.Error("timed out waiting for connections") return } -} \ No newline at end of file +} diff --git a/src/go-proxy/tcp_route.go b/src/go-proxy/tcp_route.go index d9d7209..2ebcbad 100755 --- a/src/go-proxy/tcp_route.go +++ b/src/go-proxy/tcp_route.go @@ -7,8 +7,6 @@ import ( "net" "sync" "time" - - "github.com/golang/glog" ) const tcpDialTimeout = 5 * time.Second @@ -37,7 +35,7 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) { func (route *TCPRoute) Listen() { in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.ListeningPort)) if err != nil { - route.PrintError(err) + route.l.Error(err) return } route.listener = in @@ -68,7 +66,7 @@ func (route *TCPRoute) grAcceptConnections() { default: conn, err := route.listener.Accept() if err != nil { - route.PrintError(err) + route.l.Error(err) continue } route.connChan <- conn @@ -101,7 +99,7 @@ func (route *TCPRoute) grHandleConnection(clientConn net.Conn) { dialer := &net.Dialer{} serverConn, err := dialer.DialContext(ctx, route.TargetScheme, serverAddr) if err != nil { - glog.Infof("[Stream Dial] %v", err) + route.l.WithField("stage", "dial").Infof("%v", err) return } route.tcpPipe(clientConn, serverConn) @@ -118,13 +116,13 @@ func (route *TCPRoute) tcpPipe(src net.Conn, dest net.Conn) { go func() { _, err := io.Copy(src, dest) - route.PrintError(err) + route.l.Error(err) close() wg.Done() }() go func() { _, err := io.Copy(dest, src) - route.PrintError(err) + route.l.Error(err) close() wg.Done() }() diff --git a/src/go-proxy/udp_route.go b/src/go-proxy/udp_route.go index 6fb7e5d..45834ce 100755 --- a/src/go-proxy/udp_route.go +++ b/src/go-proxy/udp_route.go @@ -5,6 +5,8 @@ import ( "io" "net" "sync" + + "github.com/sirupsen/logrus" ) type UDPRoute struct { @@ -46,13 +48,13 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) { func (route *UDPRoute) Listen() { source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%v", route.ListeningPort)) if err != nil { - route.PrintError(err) + route.l.Error(err) return } target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort)) if err != nil { - route.PrintError(err) + route.l.Error(err) source.Close() return } @@ -93,7 +95,7 @@ func (route *UDPRoute) grAcceptConnections() { default: conn, err := route.accept() if err != nil { - route.PrintError(err) + route.l.Error(err) continue } route.connChan <- conn @@ -112,7 +114,7 @@ func (route *UDPRoute) grHandleConnections() { go func() { err := route.handleConnection(conn) if err != nil { - route.PrintError(err) + route.l.Error(err) } }() } @@ -133,8 +135,16 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error { route.connMapMutex.Unlock() } + var forwarder func(*UDPConn, net.Conn) error + + if logLevel == logrus.DebugLevel { + forwarder = route.forwardReceivedDebug + } else { + forwarder = route.forwardReceivedReal + } + // initiate connection to target - err = route.forwardReceived(conn, route.targetConn) + err = forwarder(conn, route.targetConn) if err != nil { return err } @@ -150,7 +160,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error { return err } // forward to source - err = route.forwardReceived(conn, srcConn) + err = forwarder(conn, srcConn) if err != nil { return err } @@ -160,7 +170,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error { continue } // forward to target - err = route.forwardReceived(conn, route.targetConn) + err = forwarder(conn, route.targetConn) if err != nil { return err } @@ -209,13 +219,7 @@ func (route *UDPRoute) readFrom(src net.Conn, buffer []byte) (*UDPConn, error) { }, nil } -func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) error { - route.Logf( - "forwarding %d bytes %s -> %s", - receivedConn.nReceived, - receivedConn.remoteAddr.String(), - dest.RemoteAddr().String(), - ) +func (route *UDPRoute) forwardReceivedReal(receivedConn *UDPConn, dest net.Conn) error { nWritten, err := dest.Write(receivedConn.bytesReceived) if nWritten != receivedConn.nReceived { @@ -224,3 +228,12 @@ func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) err return err } + +func (route *UDPRoute) forwardReceivedDebug(receivedConn *UDPConn, dest net.Conn) error { + route.l.WithField("size", receivedConn.nReceived).Debugf( + "forwarding from %s to %s", + receivedConn.remoteAddr.String(), + dest.RemoteAddr().String(), + ) + return route.forwardReceivedReal(receivedConn, dest) +} \ No newline at end of file diff --git a/src/go-proxy/utils.go b/src/go-proxy/utils.go index e590c5d..5e394b8 100755 --- a/src/go-proxy/utils.go +++ b/src/go-proxy/utils.go @@ -14,7 +14,7 @@ import ( "sync" "time" - "github.com/golang/glog" + "github.com/sirupsen/logrus" xhtml "golang.org/x/net/html" ) @@ -111,10 +111,9 @@ func tryAppendPathPrefixImpl(pOrig, pAppend string) string { var tryAppendPathPrefix func(string, string) string var _ = func() int { - if glog.V(4) { + if logLevel == logrus.DebugLevel { tryAppendPathPrefix = func(s1, s2 string) string { replaced := tryAppendPathPrefixImpl(s1, s2) - glog.Infof("[Path sub] %s -> %s", s1, replaced) return replaced } } else { @@ -192,4 +191,4 @@ func (*Utils) respJSSubPath(r *http.Response, p string) error { func (*Utils) fileOK(path string) bool { _, err := os.Stat(path) return err == nil -} \ No newline at end of file +} diff --git a/src/go-proxy/watcher.go b/src/go-proxy/watcher.go new file mode 100644 index 0000000..65e5386 --- /dev/null +++ b/src/go-proxy/watcher.go @@ -0,0 +1,195 @@ +package main + +import ( + "path" + "sync" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/client" + "github.com/fsnotify/fsnotify" + "github.com/sirupsen/logrus" + + "golang.org/x/net/context" +) + +type Watcher interface { + Start() + Stop() +} + +type watcherBase struct { + name string // for log / error output + kind string // for log / error output + onChange func() + l logrus.FieldLogger +} + +type fileWatcher struct { + *watcherBase + path string + onDelete func() +} + +type dockerWatcher struct { + *watcherBase + client *client.Client + stop chan struct{} + wg sync.WaitGroup +} + +func newWatcher(kind string, name string, onChange func()) *watcherBase { + return &watcherBase{ + kind: kind, + name: name, + onChange: onChange, + l: wlog.WithFields(logrus.Fields{"kind": kind, "name": name}), + } +} +func NewFileWatcher(p string, onChange func(), onDelete func()) Watcher { + return &fileWatcher{ + watcherBase: newWatcher("File", path.Base(p), onChange), + path: p, + onDelete: onDelete, + } +} + +func NewDockerWatcher(c *client.Client, onChange func()) Watcher { + return &dockerWatcher{ + watcherBase: newWatcher("Docker", c.DaemonHost(), onChange), + client: c, + stop: make(chan struct{}, 1), + } +} + +func (w *fileWatcher) Start() { + if fsWatcher == nil { + return + } + err := fsWatcher.Add(w.path) + if err != nil { + w.l.Error("failed to start: ", err) + } + fileWatchMap.Set(w.path, w) +} + +func (w *fileWatcher) Stop() { + fileWatchMap.Delete(w.path) + err := fsWatcher.Remove(w.path) + if err != nil { + w.l.WithField("action", "stop").Error(err) + } +} + +func (w *dockerWatcher) Start() { + dockerWatchMap.Set(w.name, w) + w.wg.Add(1) + go func() { + w.watch() + w.wg.Done() + }() +} + +func (w *dockerWatcher) Stop() { + close(w.stop) + w.stop = nil + dockerWatchMap.Delete(w.name) + w.wg.Wait() +} + +func InitFSWatcher() { + w, err := fsnotify.NewWatcher() + if err != nil { + wlog.Errorf("unable to create file watcher: %v", err) + return + } + fsWatcher = w + go watchFiles() +} + +func InitDockerWatcher() { + // stop all docker client on watcher stop + go func() { + <-dockerWatcherStop + stopAllDockerClients() + }() +} + +func stopAllDockerClients() { + ParallelForEachValue( + dockerWatchMap.Iterator(), + func(w *dockerWatcher) { + w.Stop() + err := w.client.Close() + if err != nil { + w.l.WithField("action", "stop").Error(err) + } + w.client = nil + }, + ) +} + +func watchFiles() { + defer fsWatcher.Close() + for { + select { + case event, ok := <-fsWatcher.Events: + if !ok { + wlog.Error("file watcher channel closed") + return + } + w, ok := fileWatchMap.UnsafeGet(event.Name) + if !ok { + wlog.Errorf("watcher for %s not found", event.Name) + } + switch { + case event.Has(fsnotify.Write): + w.l.Info("File change detected") + w.onChange() + case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename): + w.l.Info("File renamed / deleted") + w.onDelete() + } + case err := <-fsWatcher.Errors: + wlog.Error(err) + } + } +} + +func (w *dockerWatcher) watch() { + filter := filters.NewArgs( + filters.Arg("type", "container"), + filters.Arg("event", "start"), + filters.Arg("event", "die"), // 'stop' already triggering 'die' + ) + listen := func() (<-chan events.Message, <-chan error) { + return w.client.Events(context.Background(), types.EventsOptions{Filters: filter}) + } + msgChan, errChan := listen() + + for { + select { + case <-w.stop: + return + case msg := <-msgChan: + w.l.Info("container", msg.Actor.Attributes["name"], msg.Action) + w.onChange() + case err := <-errChan: + w.l.Errorf("%s, retrying in 1s", err) + time.Sleep(1 * time.Second) + msgChan, errChan = listen() + } + } +} + +var fsWatcher *fsnotify.Watcher +var ( + fileWatchMap = NewSafeMap[string, *fileWatcher]() + dockerWatchMap = NewSafeMap[string, *dockerWatcher]() +) +var ( + fsWatcherStop = make(chan struct{}, 1) + dockerWatcherStop = make(chan struct{}, 1) +)