large refactoring, bug fixes, performance improvement

This commit is contained in:
yusing 2024-03-18 04:51:59 +00:00
parent eee6ff4f15
commit a52b1bcadd
27 changed files with 1530 additions and 419 deletions

View file

@ -4,7 +4,7 @@ all: build quick-restart logs
build:
mkdir -p bin
CGO_ENABLED=0 GOOS=linux go build -o bin/go-proxy src/go-proxy/*.go
CGO_ENABLED=0 GOOS=linux go build -pgo=auto -o bin/go-proxy src/go-proxy/*.go
up:
docker compose up -d --build go-proxy

164
README.md
View file

@ -1,14 +1,15 @@
# go-proxy
A simple auto docker reverse proxy for home use. *Written in **Go***
A simple auto docker reverse proxy for home use. \*Written in **Go\***
In the examples domain `x.y.z` is used, replace them with your domain
## Table of content
- [Features](#features)
- [Why am I making this](#why-am-i-making-this)
- [Key Points](#key-points)
- [How to use](#how-to-use)
- [Binary] (#binary)
- [Docker] (#docker)
- [Configuration](#configuration)
- [Single Port Configuration](#single-port-configuration-example)
- [Multiple Ports Configuration](#multiple-ports-configuration-example)
@ -20,8 +21,9 @@ In the examples domain `x.y.z` is used, replace them with your domain
- [Build it yourself](#build-it-yourself)
- [Getting SSL certs](#getting-ssl-certs)
## Features
## Key Points
- fast, nearly no performance penalty for end users when comparing to direct IP connections (See [benchmarks](#benchmarks))
- auto detect reverse proxies from docker
- additional reverse proxies from provider yaml file
- allow multiple docker / file providers by custom `config.yml` file
@ -29,30 +31,38 @@ In the examples domain `x.y.z` is used, replace them with your domain
- path matching
- HTTP proxy
- TCP/UDP Proxy
- HTTP round robin load balance support (same subdomain and path across containers replicas)
- Auto hot-reload when container start / die / stop.
- HTTP round robin load balance support (same subdomain and path across different hosts)
- Auto hot-reload on container start / die / stop or config changes.
- Simple panel to see all reverse proxies and health (visit port [panel port] of go-proxy `https://*.y.z:[panel port]`)
![panel screenshot](screenshots/panel.png)
![panel screenshot](screenshots/panel.png)
## Why am I making this
## How to use (docker)
1. It's fun.
2. I have tried different reverse proxy services, i.e. [nginx proxy manager](https://nginxproxymanager.com/), [traefik](https://github.com/traefik/traefik), [nginx-proxy](https://github.com/nginx-proxy/nginx-proxy). I have found that `traefik` is not easy to use, and I don't want to click buttons every time I spin up a new container (`nginx proxy manager`). For `nginx-proxy` I found it buggy and quite unusable.
1. Download and extract the latest release (or clone the repository if you want to try out experimental features)
2. Copy `config.example.yml` to `config.yml` and modify the content to fit your needs
3. Do the same for `providers.example.yml`
4. See [Binary](#binary) or [docker](#docker)
## How to use
### Binary
1. (Optional) Prepare your certificates in `certs/` to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain: ./certs/cert.crt
- private key: ./certs/priv.key
2. run the binary `bin/go-proxy`
3. enjoy
1. Download and extract the latest release
### Docker
1. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml`
2. Copy content from [compose.example.yml](compose.example.yml) and create your own `compose.yml`
2. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable
3. Add networks to make sure it is in the same network with other containers, or make sure `proxy.<alias>.host` is reachable
3. (Optional) Mount your SSL certs to enable https. See [Getting SSL Certs](#getting-ssl-certs)
- cert / chain / fullchain -> /app/certs/cert.crt
- private key -> /app/certs/priv.key
4. (Optional) Mount your SSL certs. See [Getting SSL Certs](#getting-ssl-certs)
4. Start `go-proxy` with `docker compose up -d` or `make up`.
5. Start `go-proxy` with `docker compose up -d` or `make up`.
6. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
5. (Optional) If you are using ufw with vpn that drop all inbound traffic except vpn, run below to allow docker containers to connect to `go-proxy`
In case the network of your container is in subnet `172.16.0.0/16` (bridge),
and vpn network is under `100.64.0.0/10` (i.e. tailscale)
@ -63,9 +73,9 @@ In the examples domain `x.y.z` is used, replace them with your domain
`docker network inspect $(docker network ls | awk '$3 == "bridge" { print $1}') | jq -r '.[] | .Name + " " + .IPAM.Config[0].Subnet' -`
7. start your docker app, and visit <container_name>.y.z
6. start your docker app, and visit <container_name>.y.z
8. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies
7. check the logs with `docker compose logs` or `make logs` to see if there is any error, check panel at [panel port] for active proxies
## Known issues
@ -88,9 +98,12 @@ However, there are some labels you can manipulate with:
- tcp/udp: is in format of `[<listeningPort>:]<targetPort>`
- when `listeningPort` is omitted (not suggested), a free port will be used automatically.
- `targetPort` must be a number, or the predefined names (see [stream.go](src/go-proxy/stream.go#L28))
- `no_tls_verify`: whether skip tls verify when scheme is https
- defaults to false
- `proxy.<alias>.path`: path matching (for http proxy only)
- defaults to empty
- `proxy.<alias>.path_mode`: mode for path handling
- defaults to empty
- allowed: \<empty>, forward, sub
- empty: remove path prefix from URL when proxying
@ -153,7 +166,7 @@ app-db:
- proxy.app-db.scheme=tcp
# Optional (first free port will be used for listening port)
- proxy.app-db.port=20000:postgres
- proxy.app-db.port=20000:postgres
# In go-proxy
go-proxy:
@ -189,43 +202,84 @@ A: Make sure the container is running, and \<subdomain> matches any container na
Benchmarked with `wrk` connecting `traefik/whoami`'s `/bench` endpoint
Direct connection
Remote benchmark (client running wrk and `go-proxy` server are different devices)
```shell
% wrk -t20 -c100 -d10s --latency http://homelab:4999/bench
Running 10s test @ http://homelab:4999/bench
20 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.74ms 1.19ms 19.94ms 81.53%
Req/Sec 1.35k 103.96 1.60k 73.60%
Latency Distribution
50% 3.46ms
75% 4.16ms
90% 4.98ms
99% 8.04ms
269696 requests in 10.01s, 32.41MB read
Requests/sec: 26950.35
Transfer/sec: 3.24MB
```
- Direct connection
With **go-proxy** reverse proxy
```shell
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://10.0.100.1/bench
Running 30s test @ http://10.0.100.1/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.34ms 1.16ms 22.76ms 85.77%
Req/Sec 4.63k 435.14 5.47k 90.07%
Latency Distribution
50% 3.95ms
75% 4.71ms
90% 5.68ms
99% 8.61ms
1383812 requests in 30.02s, 166.28MB read
Requests/sec: 46100.87
Transfer/sec: 5.54MB
```
```shell
% wrk -t20 -c100 -d10s --latency https://whoami.mydomain.com/bench
Running 10s test @ https://whoami.6uo.me/bench
20 threads and 100 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.02ms 2.13ms 47.49ms 95.14%
Req/Sec 1.28k 139.15 1.47k 91.67%
Latency Distribution
50% 3.60ms
75% 4.36ms
90% 5.29ms
99% 8.83ms
253874 requests in 10.02s, 24.70MB read
Requests/sec: 25342.46
Transfer/sec: 2.47MB
```
- With reverse proxy
```shell
root@yusing-pc:~# wrk -t 10 -c 200 -d 30s --latency http://bench.6uo.me/bench
Running 30s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.50ms 1.44ms 27.53ms 86.48%
Req/Sec 4.48k 375.00 5.12k 84.73%
Latency Distribution
50% 4.09ms
75% 5.06ms
90% 6.03ms
99% 9.41ms
1338996 requests in 30.01s, 160.90MB read
Requests/sec: 44616.36
Transfer/sec: 5.36MB
```
Local benchmark (client running wrk and `go-proxy` server are under same proxmox host but different LXCs)
- Direct connection
```
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://10.0.100.1/bench
Running 10s test @ http://10.0.100.1/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 434.08us 539.35us 8.76ms 85.28%
Req/Sec 67.71k 6.31k 87.21k 71.20%
Latency Distribution
50% 153.00us
75% 646.00us
90% 1.18ms
99% 2.38ms
6739591 requests in 10.01s, 809.85MB read
Requests/sec: 673608.15
Transfer/sec: 80.94MB
```
- With reverse proxy
```
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.78ms 5.49ms 117.53ms 99.00%
Req/Sec 16.31k 2.30k 21.01k 86.69%
Latency Distribution
50% 1.12ms
75% 1.88ms
90% 2.80ms
99% 7.27ms
1634774 requests in 10.10s, 196.44MB read
Requests/sec: 161858.70
Transfer/sec: 19.45MB
```
## Memory usage

Binary file not shown.

View file

@ -7,8 +7,8 @@ services:
networks: # ^also add here
- default
# environment:
# - VERBOSITY=1 # LOG LEVEL (optional, defaults to 1)
# - DEBUG=1 # (optional, enable only for debug)
# - GOPROXY_DEBUG=1 # (optional, enable only for debug)
# - GOPROXY_REDIRECT_HTTP=0 # (optional, uncomment to disable http redirect (http -> https))
ports:
- 80:80 # http
# - 443:443 # optional, https

View file

@ -3,13 +3,9 @@ if [ "$1" == "restart" ]; then
echo "restarting"
killall go-proxy
fi
if [ -z "$VERBOSITY" ]; then
VERBOSITY=1
fi
echo "starting with verbosity $VERBOSITY" > log/go-proxy.log
if [ "$DEBUG" == "1" ]; then
/app/go-proxy -v=$VERBOSITY --log_dir=log --stderrthreshold=0 2>> log/go-proxy.log &
/app/go-proxy 2> log/go-proxy.log &
tail -f /dev/null
else
/app/go-proxy -v=$VERBOSITY --logtostderr=1
/app/go-proxy
fi

7
go.mod
View file

@ -3,20 +3,18 @@ module github.com/yusing/go-proxy
go 1.21.7
require (
github.com/docker/cli v25.0.4+incompatible
github.com/docker/docker v25.0.4+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/golang/glog v1.2.0
github.com/sirupsen/logrus v1.9.3
golang.org/x/net v0.22.0
gopkg.in/yaml.v3 v3.0.1
)
require github.com/sirupsen/logrus v1.9.3 // indirect
require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/cli v25.0.4+incompatible
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
@ -36,6 +34,7 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/mod v0.16.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.19.0 // indirect
gotest.tools/v3 v3.5.1 // indirect

2
go.sum
View file

@ -30,8 +30,6 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=

View file

@ -9,3 +9,5 @@ app: # alias
path:
# optional
path_mode:
# optional
notlsverify: false

View file

@ -3,73 +3,95 @@ package main
import (
"fmt"
"os"
"sync"
"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
"gopkg.in/yaml.v3"
)
type Config struct {
Providers map[string]*Provider `yaml:",flow"`
// commented out if unused
type Config interface {
// Load() error
MustLoad()
// MustReload()
// Reload() error
StartProviders()
StopProviders()
WatchChanges()
StopWatching()
}
var config *Config
func NewConfig() Config {
cfg := &config{}
cfg.watcher = NewFileWatcher(
configPath,
cfg.MustReload, // OnChange
func() { os.Exit(1) }, // OnDelete
)
return cfg
}
func (cfg *config) Load() error {
cfg.mutex.Lock()
defer cfg.mutex.Unlock()
// unload if any
if cfg.Providers != nil {
for _, p := range cfg.Providers {
p.StopAllRoutes()
}
}
cfg.Providers = make(map[string]*Provider)
func ReadConfig() (*Config, error) {
config := Config{}
data, err := os.ReadFile(configPath)
if err != nil {
return nil, fmt.Errorf("unable to read config file: %v", err)
return fmt.Errorf("unable to read config file: %v", err)
}
err = yaml.Unmarshal(data, &config)
if err != nil {
return nil, fmt.Errorf("unable to parse config file: %v", err)
if err = yaml.Unmarshal(data, &cfg); err != nil {
return fmt.Errorf("unable to parse config file: %v", err)
}
for name, p := range config.Providers {
for name, p := range cfg.Providers {
p.name = name
}
return &config, nil
return nil
}
func ListenConfigChanges() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
glog.Errorf("[Config] unable to create file watcher: %v", err)
func (cfg *config) MustLoad() {
if err := cfg.Load(); err != nil {
cfgl.Fatal(err)
}
defer watcher.Close()
}
if err = watcher.Add(configPath); err != nil {
glog.Errorf("[Config] unable to watch file: %v", err)
return
}
func (cfg *config) Reload() error {
return cfg.Load()
}
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
switch {
case event.Has(fsnotify.Write):
glog.Infof("[Config] file change detected")
for _, p := range config.Providers {
p.StopAllRoutes()
}
config, err = ReadConfig()
if err != nil {
glog.Fatalf("[Config] unable to read config: %v", err)
}
StartAllRoutes()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
glog.Fatalf("[Config] file renamed / deleted")
}
case err := <-watcher.Errors:
glog.Errorf("[Config] File watcher error: %s", err)
}
}
}
func (cfg *config) MustReload() {
cfg.MustLoad()
}
func (cfg *config) StartProviders() {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StartAllRoutes)
}
func (cfg *config) StopProviders() {
// Providers have their own mutex, no lock needed
ParallelForEachValue(cfg.Providers, (*Provider).StopAllRoutes)
}
func (cfg *config) WatchChanges() {
cfg.watcher.Start()
}
func (cfg *config) StopWatching() {
cfg.watcher.Stop()
}
type config struct {
Providers map[string]*Provider `yaml:",flow"`
watcher Watcher
mutex sync.Mutex
}

View file

@ -1,9 +1,13 @@
package main
import (
"crypto/tls"
"net"
"net/http"
"os"
"time"
"github.com/sirupsen/logrus"
)
var (
@ -75,6 +79,12 @@ var transport = &http.Transport{
MaxIdleConnsPerHost: 1000,
}
var transportNoTLS = func() *http.Transport {
var clone = transport.Clone()
clone.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
return clone
}()
const clientUrlFromEnv = "FROM_ENV"
const configPath = "config.yml"
@ -84,3 +94,13 @@ const StreamStopListenTimeout = 1 * time.Second
const templateFile = "templates/panel.html"
const udpBufferSize = 1500
var logLevel = func() logrus.Level {
switch os.Getenv("GOPROXY_DEBUG") {
case "1", "true":
logrus.SetLevel(logrus.DebugLevel)
}
return logrus.GetLevel()
}()
var redirectHTTP = os.Getenv("GOPROXY_REDIRECT_HTTP") != "0" && os.Getenv("GOPROXY_REDIRECT_HTTP") != "false"

View file

@ -5,12 +5,10 @@ import (
"net/http"
"reflect"
"strings"
"time"
"github.com/docker/cli/cli/connhelper"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"golang.org/x/net/context"
)
@ -45,16 +43,17 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
isRemote := clientIP != ""
for _, alias := range aliases {
l := p.l.WithField("container", container_name).WithField("alias", alias)
config := NewProxyConfig(p)
prefix := fmt.Sprintf("proxy.%s.", alias)
for label, value := range container.Labels {
err := p.setConfigField(&config, label, value, prefix)
if err != nil {
p.Errorf("Build", "%v", err)
l.Error(err)
}
err = p.setConfigField(&config, label, value, wildcardPrefix)
if err != nil {
p.Errorf("Build", "%v", err)
l.Error(err)
}
}
if config.Port == "" {
@ -62,7 +61,7 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
}
if config.Port == "0" {
// no ports exposed or specified
p.Logf("Build", "no ports exposed for %s, ignored", container_name)
l.Info("no ports exposed, ignored")
continue
}
if config.Scheme == "" {
@ -84,7 +83,7 @@ func (p *Provider) getContainerProxyConfigs(container types.Container, clientIP
}
}
if !isValidScheme(config.Scheme) {
p.Warningf("Build", "unsupported scheme: %s, using http", container_name, config.Scheme)
l.Warnf("unsupported scheme: %s, using http", config.Scheme)
config.Scheme = "http"
}
if config.Host == "" {
@ -177,33 +176,6 @@ func (p *Provider) getDockerProxyConfigs() ([]*ProxyConfig, error) {
return cfgs, nil
}
func (p *Provider) grWatchDockerChanges() {
p.stopWatching = make(chan struct{})
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
filters.Arg("event", "die"), // 'stop' already triggering 'die'
)
msgChan, errChan := p.dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
for {
select {
case <-p.stopWatching:
return
case msg := <-msgChan:
// TODO: handle actor only
p.Logf("Event", "container %s %s caused rebuild", msg.Actor.Attributes["name"], msg.Action)
p.StopAllRoutes()
p.StartAllRoutes()
case err := <-errChan:
p.Logf("Event", "error %s", err)
time.Sleep(100 * time.Millisecond)
msgChan, errChan = p.dockerClient.Events(context.Background(), types.EventsOptions{Filters: filter})
}
}
}
// var dockerUrlRegex = regexp.MustCompile(`^(?P<scheme>\w+)://(?P<host>[^:]+)(?P<port>:\d+)?(?P<path>/.*)?$`)
func getPublicPort(p types.Port) uint16 { return p.PublicPort }

View file

@ -3,9 +3,7 @@ package main
import (
"fmt"
"os"
"time"
"github.com/fsnotify/fsnotify"
"gopkg.in/yaml.v3"
)
@ -39,39 +37,3 @@ func (p *Provider) getFileProxyConfigs() ([]*ProxyConfig, error) {
return nil, err
}
}
func (p *Provider) grWatchFileChanges() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
p.Errorf("Watcher", "unable to create file watcher: %v", err)
}
defer watcher.Close()
if err = watcher.Add(p.Value); err != nil {
p.Errorf("Watcher", "unable to watch file %q: %v", p.Value, err)
return
}
for {
select {
case <-p.stopWatching:
return
case event, ok := <-watcher.Events:
if !ok {
return
}
switch {
case event.Has(fsnotify.Write):
p.Logf("Watcher", "file change detected")
p.StopAllRoutes()
p.StartAllRoutes()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
p.Logf("Watcher", "file renamed / deleted")
p.StopAllRoutes()
}
case err := <-watcher.Errors:
time.Sleep(100 * time.Millisecond)
p.Errorf("Watcher", "File watcher error: %s", err)
}
}
}

View file

@ -0,0 +1,39 @@
package main
import "sync"
func ParallelForEach[T interface{}](obj []T, do func(T)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for _, v := range obj {
go func(v T) {
do(v)
wg.Done()
}(v)
}
wg.Wait()
}
func ParallelForEachValue[K comparable, V interface{}](obj map[K]V, do func(V)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for _, v := range obj {
go func(v V) {
do(v)
wg.Done()
}(v)
}
wg.Wait()
}
func ParallelForEachKeyValue[K comparable, V interface{}](obj map[K]V, do func(K, V)) {
var wg sync.WaitGroup
wg.Add(len(obj))
for k, v := range obj {
go func(k K, v V) {
do(k, v)
wg.Done()
}(k, v)
}
wg.Wait()
}

View file

@ -2,20 +2,57 @@ package main
import (
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"github.com/golang/glog"
"github.com/sirupsen/logrus"
)
/**
A small mod on net/http/httputil.ReverseProxy
Before mod:
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.02ms 4.34ms 102.70ms 94.90%
Req/Sec 8.06k 1.17k 9.99k 79.86%
Latency Distribution
50% 2.38ms
75% 4.00ms
90% 5.93ms
99% 11.90ms
808813 requests in 10.10s, 78.68MB read
Requests/sec: 80079.47
Transfer/sec: 7.79MB
After mod:
root@http-benchmark-client:~# wrk -t 10 -c 200 -d 10s --latency http://bench.6uo.me/bench
Running 10s test @ http://bench.6uo.me/bench
10 threads and 200 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.77ms 5.64ms 118.14ms 99.07%
Req/Sec 16.59k 2.22k 19.65k 87.30%
Latency Distribution
50% 1.11ms
75% 1.85ms
90% 2.74ms
99% 6.68ms
1665286 requests in 10.10s, 200.11MB read
Requests/sec: 164880.11
Transfer/sec: 19.81MB
**/
type HTTPRoute struct {
Alias string
Url *url.URL
Path string
PathMode string
Proxy *httputil.ReverseProxy
Proxy *ReverseProxy
l logrus.FieldLogger
}
func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
@ -24,8 +61,14 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
return nil, err
}
proxy := httputil.NewSingleHostReverseProxy(url)
proxy.Transport = transport
var tr *http.Transport
if config.NoTLSVerify {
tr = transportNoTLS
} else {
tr = transport
}
proxy := NewSingleHostReverseProxy(url, tr)
if !isValidProxyPathMode(config.PathMode) {
return nil, fmt.Errorf("invalid path mode: %s", config.PathMode)
@ -37,22 +80,21 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
Path: config.Path,
Proxy: proxy,
PathMode: config.PathMode,
l: hrlog.WithFields(logrus.Fields{
"alias": config.Alias,
"path": config.Path,
"path_mode": config.PathMode,
}),
}
director := proxy.Director
proxy.Director = nil
initRewrite := func(pr *httputil.ProxyRequest) {
director(pr.Out)
}
rewrite := initRewrite
var rewrite func(*ProxyRequest)
switch {
case config.Path == "", config.PathMode == ProxyPathMode_Forward:
break
rewrite = proxy.Rewrite
case config.PathMode == ProxyPathMode_Sub:
rewrite = func(pr *httputil.ProxyRequest) {
initRewrite(pr)
rewrite = func(pr *ProxyRequest) {
proxy.Rewrite(pr)
// disable compression
pr.Out.Header.Set("Accept-Encoding", "identity")
// remove path prefix
@ -61,9 +103,7 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
route.Proxy.ModifyResponse = func(r *http.Response) error {
contentType, ok := r.Header["Content-Type"]
if !ok || len(contentType) == 0 {
if glog.V(3) {
glog.Infof("[Path sub] unknown content type for %s", r.Request.URL.String())
}
route.l.Debug("unknown content type for", r.Request.URL.String())
return nil
}
// disable cache
@ -76,28 +116,27 @@ func NewHTTPRoute(config *ProxyConfig) (*HTTPRoute, error) {
case strings.HasPrefix(contentType[0], "application/javascript"):
err = utils.respJSSubPath(r, config.Path)
default:
glog.V(4).Infof("[Path sub] unknown content type(s): %s", contentType)
route.l.Debug("unknown content type(s): ", contentType)
}
if err != nil {
err = fmt.Errorf("[Path sub] failed to remove path prefix %s: %v", config.Path, err)
err = fmt.Errorf("failed to remove path prefix %s: %v", config.Path, err)
route.l.WithField("action", "path_sub").Error(err)
r.Status = err.Error()
r.StatusCode = http.StatusInternalServerError
}
return err
}
default:
rewrite = func(pr *httputil.ProxyRequest) {
initRewrite(pr)
rewrite = func(pr *ProxyRequest) {
proxy.Rewrite(pr)
pr.Out.URL.Path = strings.TrimPrefix(pr.Out.URL.Path, config.Path)
}
}
if glog.V(3) {
route.Proxy.Rewrite = func(pr *httputil.ProxyRequest) {
if logLevel == logrus.DebugLevel {
route.Proxy.Rewrite = func(pr *ProxyRequest) {
rewrite(pr)
r := pr.In
glog.Infof("[Request] %s %s%s", r.Method, r.Host, r.URL.Path)
glog.V(5).InfoDepthf(1, "Headers: %v", r.Header)
route.l.Debug("Request headers: ", pr.In.Header)
}
} else {
route.Proxy.Rewrite = rewrite
@ -114,7 +153,7 @@ func (p *httpLoadBalancePool) Pick() *HTTPRoute {
}
func (r *HTTPRoute) RemoveFromRoutes() {
routes.HTTPRoutes.Delete(r.Alias)
httpRoutes.Delete(r.Alias)
}
// dummy implementation for Route interface
@ -144,24 +183,30 @@ func redirectToTLS(w http.ResponseWriter, r *http.Request) {
func findHTTPRoute(host string, path string) (*HTTPRoute, error) {
subdomain := strings.Split(host, ".")[0]
routeMap, ok := routes.HTTPRoutes.UnsafeGet(subdomain)
if !ok {
return nil, fmt.Errorf("no matching route for subdomain %s", subdomain)
routeMap, ok := httpRoutes.UnsafeGet(subdomain)
if ok {
return routeMap.FindMatch(path)
}
return routeMap.FindMatch(path)
return nil, fmt.Errorf("no matching route for subdomain %s", subdomain)
}
func httpProxyHandler(w http.ResponseWriter, r *http.Request) {
route, err := findHTTPRoute(r.Host, r.URL.Path)
if err != nil {
err = fmt.Errorf("[Request] failed %s %s%s, error: %v",
err = fmt.Errorf("request failed %s %s%s, error: %v",
r.Method,
r.Host,
r.URL.Path,
err,
)
logrus.Error(err)
http.Error(w, err.Error(), http.StatusNotFound)
return
}
route.Proxy.ServeHTTP(w, r)
}
// alias -> (path -> routes)
type HTTPRoutes = SafeMap[string, *pathPoolMap]
var httpRoutes HTTPRoutes = NewSafeMap[string](newPathPoolMap)

View file

@ -0,0 +1,833 @@
package main
// A small mod on net/http/httputils
// that doubled the performance
import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httptrace"
"net/textproto"
"net/url"
"strings"
"time"
"golang.org/x/net/http/httpguts"
)
// A ProxyRequest contains a request to be rewritten by a [ReverseProxy].
type ProxyRequest struct {
// In is the request received by the proxy.
// The Rewrite function must not modify In.
In *http.Request
// Out is the request which will be sent by the proxy.
// The Rewrite function may modify or replace this request.
// Hop-by-hop headers are removed from this request
// before Rewrite is called.
Out *http.Request
}
// SetURL routes the outbound request to the scheme, host, and base path
// provided in target. If the target's path is "/base" and the incoming
// request was for "/dir", the target request will be for "/base/dir".
//
// SetURL rewrites the outbound Host header to match the target's host.
// To preserve the inbound request's Host header (the default behavior
// of [NewSingleHostReverseProxy]):
//
// rewriteFunc := func(r *httputil.ProxyRequest) {
// r.SetURL(url)
// r.Out.Host = r.In.Host
// }
func (r *ProxyRequest) SetURL(target *url.URL) {
rewriteRequestURL(r.Out, target)
r.Out.Host = ""
}
// SetXForwarded sets the X-Forwarded-For, X-Forwarded-Host, and
// X-Forwarded-Proto headers of the outbound request.
//
// - The X-Forwarded-For header is set to the client IP address.
// - The X-Forwarded-Host header is set to the host name requested
// by the client.
// - The X-Forwarded-Proto header is set to "http" or "https", depending
// on whether the inbound request was made on a TLS-enabled connection.
//
// If the outbound request contains an existing X-Forwarded-For header,
// SetXForwarded appends the client IP address to it. To append to the
// inbound request's X-Forwarded-For header (the default behavior of
// [ReverseProxy] when using a Director function), copy the header
// from the inbound request before calling SetXForwarded:
//
// rewriteFunc := func(r *httputil.ProxyRequest) {
// r.Out.Header["X-Forwarded-For"] = r.In.Header["X-Forwarded-For"]
// r.SetXForwarded()
// }
func (r *ProxyRequest) SetXForwarded() {
clientIP, _, err := net.SplitHostPort(r.In.RemoteAddr)
if err == nil {
prior := r.Out.Header["X-Forwarded-For"]
if len(prior) > 0 {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
r.Out.Header.Set("X-Forwarded-For", clientIP)
} else {
r.Out.Header.Del("X-Forwarded-For")
}
r.Out.Header.Set("X-Forwarded-Host", r.In.Host)
if r.In.TLS == nil {
r.Out.Header.Set("X-Forwarded-Proto", "http")
} else {
r.Out.Header.Set("X-Forwarded-Proto", "https")
}
}
// ReverseProxy is an HTTP Handler that takes an incoming request and
// sends it to another server, proxying the response back to the
// client.
//
// 1xx responses are forwarded to the client if the underlying
// transport supports ClientTrace.Got1xxResponse.
type ReverseProxy struct {
// Rewrite must be a function which modifies
// the request into a new request to be sent
// using Transport. Its response is then copied
// back to the original client unmodified.
// Rewrite must not access the provided ProxyRequest
// or its contents after returning.
//
// The Forwarded, X-Forwarded, X-Forwarded-Host,
// and X-Forwarded-Proto headers are removed from the
// outbound request before Rewrite is called. See also
// the ProxyRequest.SetXForwarded method.
//
// Unparsable query parameters are removed from the
// outbound request before Rewrite is called.
// The Rewrite function may copy the inbound URL's
// RawQuery to the outbound URL to preserve the original
// parameter string. Note that this can lead to security
// issues if the proxy's interpretation of query parameters
// does not match that of the downstream server.
//
// At most one of Rewrite or Director may be set.
Rewrite func(*ProxyRequest)
// The transport used to perform proxy requests.
// If nil, http.DefaultTransport is used.
Transport http.RoundTripper
// FlushInterval specifies the flush interval
// to flush to the client while copying the
// response body.
// If zero, no periodic flushing is done.
// A negative value means to flush immediately
// after each write to the client.
// The FlushInterval is ignored when ReverseProxy
// recognizes a response as a streaming response, or
// if its ContentLength is -1; for such responses, writes
// are flushed to the client immediately.
FlushInterval time.Duration
// ErrorLog specifies an optional logger for errors
// that occur when attempting to proxy the request.
// If nil, logging is done via the log package's standard logger.
ErrorLog *log.Logger
// BufferPool optionally specifies a buffer pool to
// get byte slices for use by io.CopyBuffer when
// copying HTTP response bodies.
BufferPool BufferPool
// ModifyResponse is an optional function that modifies the
// Response from the backend. It is called if the backend
// returns a response at all, with any HTTP status code.
// If the backend is unreachable, the optional ErrorHandler is
// called without any call to ModifyResponse.
//
// If ModifyResponse returns an error, ErrorHandler is called
// with its error value. If ErrorHandler is nil, its default
// implementation is used.
ModifyResponse func(*http.Response) error
// ErrorHandler is an optional function that handles errors
// reaching the backend or errors from ModifyResponse.
//
// If nil, the default is to log the provided error and return
// a 502 Status Bad Gateway response.
ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
// A BufferPool is an interface for getting and returning temporary
// byte slices for use by [io.CopyBuffer].
type BufferPool interface {
Get() []byte
Put([]byte)
}
func singleJoiningSlash(a, b string) string {
aslash := strings.HasSuffix(a, "/")
bslash := strings.HasPrefix(b, "/")
switch {
case aslash && bslash:
return a + b[1:]
case !aslash && !bslash:
return a + "/" + b
}
return a + b
}
func joinURLPath(a, b *url.URL) (path, rawpath string) {
if a.RawPath == "" && b.RawPath == "" {
return singleJoiningSlash(a.Path, b.Path), ""
}
// Same as singleJoiningSlash, but uses EscapedPath to determine
// whether a slash should be added
apath := a.EscapedPath()
bpath := b.EscapedPath()
aslash := strings.HasSuffix(apath, "/")
bslash := strings.HasPrefix(bpath, "/")
switch {
case aslash && bslash:
return a.Path + b.Path[1:], apath + bpath[1:]
case !aslash && !bslash:
return a.Path + "/" + b.Path, apath + "/" + bpath
}
return a.Path + b.Path, apath + bpath
}
// NewSingleHostReverseProxy returns a new [ReverseProxy] that routes
// URLs to the scheme, host, and base path provided in target. If the
// target's path is "/base" and the incoming request was for "/dir",
// the target request will be for /base/dir.
//
// NewSingleHostReverseProxy does not rewrite the Host header.
//
// To customize the ReverseProxy behavior beyond what
// NewSingleHostReverseProxy provides, use ReverseProxy directly
// with a Rewrite function. The ProxyRequest SetURL method
// may be used to route the outbound request. (Note that SetURL,
// unlike NewSingleHostReverseProxy, rewrites the Host header
// of the outbound request by default.)
//
// proxy := &ReverseProxy{
// Rewrite: func(r *ProxyRequest) {
// r.SetURL(target)
// r.Out.Host = r.In.Host // if desired
// },
// }
func NewSingleHostReverseProxy(target *url.URL, transport *http.Transport) *ReverseProxy {
return &ReverseProxy{Rewrite: func(pr *ProxyRequest) {
rewriteRequestURL(pr.Out, target)
}, Transport: transport}
}
func rewriteRequestURL(req *http.Request, target *url.URL) {
targetQuery := target.RawQuery
req.URL.Scheme = target.Scheme
req.URL.Host = target.Host
req.URL.Path, req.URL.RawPath = joinURLPath(target, req.URL)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}
// Hop-by-hop headers. These are removed when sent to the backend.
// As of RFC 7230, hop-by-hop headers are required to appear in the
// Connection header field. These are the headers defined by the
// obsoleted RFC 2616 (section 13.5.1) and are used for backward
// compatibility.
// var hopHeaders = []string{
// "Connection",
// "Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
// "Keep-Alive",
// "Proxy-Authenticate",
// "Proxy-Authorization",
// "Te", // canonicalized version of "TE"
// "Trailer", // not Trailers per URL above; https://www.rfc-editor.org/errata_search.php?eid=4522
// "Transfer-Encoding",
// "Upgrade",
// }
// NOTE: getErrorHandler and DefaultErrorHandler removed
func (p *ReverseProxy) errorHandler(rw http.ResponseWriter, _ *http.Request, err error) {
p.logf("http: proxy error: %v", err)
rw.WriteHeader(http.StatusBadGateway)
}
// modifyResponse conditionally runs the optional ModifyResponse hook
// and reports whether the request should proceed.
func (p *ReverseProxy) modifyResponse(rw http.ResponseWriter, res *http.Response, req *http.Request) bool {
if p.ModifyResponse == nil {
return true
}
if err := p.ModifyResponse(res); err != nil {
res.Body.Close()
p.errorHandler(rw, req, err)
return false
}
return true
}
func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
transport := p.Transport
// Note: removed
// if transport == nil {
// transport = http.DefaultTransport
// }
ctx := req.Context()
if ctx.Done() != nil {
// CloseNotifier predates context.Context, and has been
// entirely superseded by it. If the request contains
// a Context that carries a cancellation signal, don't
// bother spinning up a goroutine to watch the CloseNotify
// channel (if any).
//
// If the request Context has a nil Done channel (which
// means it is either context.Background, or a custom
// Context implementation with no cancellation signal),
// then consult the CloseNotifier if available.
} else if cn, ok := rw.(http.CloseNotifier); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel()
case <-ctx.Done():
}
}()
}
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}
if outreq.Body != nil {
// Reading from the request body after returning from a handler is not
// allowed, and the RoundTrip goroutine that reads the Body can outlive
// this handler. This can lead to a crash if the handler panics (see
// Issue 46866). Although calling Close doesn't guarantee there isn't
// any Read in flight after the handle returns, in practice it's safe to
// read after closing it.
defer outreq.Body.Close()
}
if outreq.Header == nil {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
// NOTE: removed
// if (p.Director != nil) == (p.Rewrite != nil) {
// p.errorHandler(rw, req, errors.New("ReverseProxy must have exactly one of Director or Rewrite set"))
// return
// }
// if p.Director != nil {
// p.Director(outreq)
// if outreq.Form != nil {
// outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
// }
// }
outreq.Close = false
reqUpType := upgradeType(outreq.Header)
if !IsPrint(reqUpType) {
p.errorHandler(rw, req, fmt.Errorf("client tried to switch to invalid protocol %q", reqUpType))
return
}
// NOTE: removed
// removeHopByHopHeaders(outreq.Header)
// Issue 21096: tell backend applications that care about trailer support
// that we support trailers. (We do, but we don't go out of our way to
// advertise that unless the incoming client request thought it was worth
// mentioning.) Note that we look at req.Header, not outreq.Header, since
// the latter has passed through removeHopByHopHeaders.
if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
outreq.Header.Set("Te", "trailers")
}
// After stripping all the hop-by-hop connection headers above, add back any
// necessary for protocol upgrades, such as for websockets.
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
// NOTE: removed
// if p.Rewrite != nil {
// Strip client-provided forwarding headers.
// The Rewrite func may use SetXForwarded to set new values
// for these or copy the previous values from the inbound request.
// outreq.Header.Del("Forwarded")
// outreq.Header.Del("X-Forwarded-For")
// outreq.Header.Del("X-Forwarded-Host")
// outreq.Header.Del("X-Forwarded-Proto")
// NOTE: removed
// Remove unparsable query parameters from the outbound request.
// outreq.URL.RawQuery = cleanQueryParams(outreq.URL.RawQuery)
pr := &ProxyRequest{
In: req,
Out: outreq,
}
pr.SetXForwarded() // NOTE: added
p.Rewrite(pr)
outreq = pr.Out
// NOTE: removed
// } else {
// if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
// // If we aren't the first proxy retain prior
// // X-Forwarded-For information as a comma+space
// // separated list and fold multiple headers into one.
// prior, ok := outreq.Header["X-Forwarded-For"]
// omit := ok && prior == nil // Issue 38079: nil now means don't populate the header
// if len(prior) > 0 {
// clientIP = strings.Join(prior, ", ") + ", " + clientIP
// }
// if !omit {
// outreq.Header.Set("X-Forwarded-For", clientIP)
// }
// }
// }
if _, ok := outreq.Header["User-Agent"]; !ok {
// If the outbound request doesn't have a User-Agent header set,
// don't send the default Go HTTP client User-Agent.
outreq.Header.Set("User-Agent", "")
}
trace := &httptrace.ClientTrace{
Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
h := rw.Header()
// copyHeader(h, http.Header(header))
for k, vv := range header {
for _, v := range vv {
h.Add(k, v)
}
}
rw.WriteHeader(code)
// Clear headers, it's not automatically done by ResponseWriter.WriteHeader() for 1xx responses
clear(h)
return nil
},
}
outreq = outreq.WithContext(httptrace.WithClientTrace(outreq.Context(), trace))
res, err := transport.RoundTrip(outreq)
if err != nil {
p.errorHandler(rw, outreq, err)
return
}
// Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
// NOTE: removed
// removeHopByHopHeaders(res.Header)
if !p.modifyResponse(rw, res, outreq) {
return
}
copyHeader(rw.Header(), res.Header)
// The "Trailer" header isn't included in the Transport's response,
// at least for *http.Transport. Build it up from Trailer.
announcedTrailers := len(res.Trailer)
if announcedTrailers > 0 {
trailerKeys := make([]string, 0, len(res.Trailer))
for k := range res.Trailer {
trailerKeys = append(trailerKeys, k)
}
rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
}
rw.WriteHeader(res.StatusCode)
// NOTE: changing this line extremely improve throughput
// err = p.copyResponse(rw, res.Body, p.flushInterval(res))
_, err = io.Copy(rw, res.Body)
if err != nil {
defer res.Body.Close()
// note: removed
// Since we're streaming the response, if we run into an error all we can do
// is abort the request. Issue 23643: ReverseProxy should use ErrAbortHandler
// on read error while copying body.
// if !shouldPanicOnCopyError(req) {
// p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
// return
// }
panic(http.ErrAbortHandler)
}
res.Body.Close() // close now, instead of defer, to populate res.Trailer
if len(res.Trailer) > 0 {
// Force chunking if we saw a response trailer.
// This prevents net/http from calculating the length for short
// bodies and adding a Content-Length.
http.NewResponseController(rw).Flush()
}
if len(res.Trailer) == announcedTrailers {
copyHeader(rw.Header(), res.Trailer)
return
}
for k, vv := range res.Trailer {
k = http.TrailerPrefix + k
for _, v := range vv {
rw.Header().Add(k, v)
}
}
}
// var inOurTests bool // whether we're in our own tests
// NOTE: removed
// shouldPanicOnCopyError reports whether the reverse proxy should
// panic with http.ErrAbortHandler. This is the right thing to do by
// default, but Go 1.10 and earlier did not, so existing unit tests
// weren't expecting panics. Only panic in our own tests, or when
// running under the HTTP server.
// func shouldPanicOnCopyError(req *http.Request) bool {
// if inOurTests {
// // Our tests know to handle this panic.
// return true
// }
// if req.Context().Value(http.ServerContextKey) != nil {
// // We seem to be running under an HTTP server, so
// // it'll recover the panic.
// return true
// }
// // Otherwise act like Go 1.10 and earlier to not break
// // existing tests.
// return false
// }
// removeHopByHopHeaders removes hop-by-hop headers.
//
// func removeHopByHopHeaders(h http.Header) {
// // RFC 7230, section 6.1: Remove headers listed in the "Connection" header.
// for _, f := range h["Connection"] {
// for _, sf := range strings.Split(f, ",") {
// if sf = textproto.TrimString(sf); sf != "" {
// h.Del(sf)
// }
// }
// }
// // RFC 2616, section 13.5.1: Remove a set of known hop-by-hop headers.
// // This behavior is superseded by the RFC 7230 Connection header, but
// // preserve it for backwards compatibility.
// for _, f := range hopHeaders {
// h.Del(f)
// }
// }
// NOTE: removed
// flushInterval returns the p.FlushInterval value, conditionally
// overriding its value for a specific request/response.
// func (p *ReverseProxy) flushInterval(res *http.Response) time.Duration {
// resCT := res.Header.Get("Content-Type")
// // For Server-Sent Events responses, flush immediately.
// // The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
// if baseCT, _, _ := mime.ParseMediaType(resCT); baseCT == "text/event-stream" {
// return -1 // negative means immediately
// }
// // We might have the case of streaming for which Content-Length might be unset.
// if res.ContentLength == -1 {
// return -1
// }
// return p.FlushInterval
// }
// NOTE: removed
// func (p *ReverseProxy) copyResponse(dst http.ResponseWriter, src io.Reader, flushInterval time.Duration) error {
// var w io.Writer = dst
// if flushInterval != 0 {
// mlw := &maxLatencyWriter{
// dst: dst,
// flush: http.NewResponseController(dst).Flush,
// latency: flushInterval,
// }
// defer mlw.stop()
// // set up initial timer so headers get flushed even if body writes are delayed
// mlw.flushPending = true
// mlw.t = time.AfterFunc(flushInterval, mlw.delayedFlush)
// w = mlw
// }
// var buf []byte
// if p.BufferPool != nil {
// buf = p.BufferPool.Get()
// defer p.BufferPool.Put(buf)
// }
// _, err := p.copyBuffer(w, src, buf)
// return err
// }
// copyBuffer returns any write errors or non-EOF read errors, and the amount
// of bytes written.
// NOTE: removed
// func (p *ReverseProxy) copyBuffer(dst io.Writer, src io.Reader, buf []byte) (int64, error) {
// if len(buf) == 0 {
// buf = make([]byte, 32*1024)
// }
// var written int64
// for {
// nr, rerr := src.Read(buf)
// if rerr != nil && rerr != io.EOF && rerr != context.Canceled {
// p.logf("httputil: ReverseProxy read error during body copy: %v", rerr)
// }
// if nr > 0 {
// nw, werr := dst.Write(buf[:nr])
// if nw > 0 {
// written += int64(nw)
// }
// if werr != nil {
// return written, werr
// }
// if nr != nw {
// return written, io.ErrShortWrite
// }
// }
// if rerr != nil {
// if rerr == io.EOF {
// rerr = nil
// }
// return written, rerr
// }
// }
// }
func (p *ReverseProxy) logf(format string, args ...any) {
if p.ErrorLog != nil {
p.ErrorLog.Printf(format, args...)
} else {
hrlog.Printf(format, args...)
}
}
// NOTE: removed
// type maxLatencyWriter struct {
// dst io.Writer
// flush func() error
// latency time.Duration // non-zero; negative means to flush immediately
// mu sync.Mutex // protects t, flushPending, and dst.Flush
// t *time.Timer
// flushPending bool
// }
// NOTE: removed
// func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
// m.mu.Lock()
// defer m.mu.Unlock()
// n, err = m.dst.Write(p)
// if m.latency < 0 {
// m.flush()
// return
// }
// if m.flushPending {
// return
// }
// if m.t == nil {
// m.t = time.AfterFunc(m.latency, m.delayedFlush)
// } else {
// m.t.Reset(m.latency)
// }
// m.flushPending = true
// return
// }
// func (m *maxLatencyWriter) delayedFlush() {
// m.mu.Lock()
// defer m.mu.Unlock()
// if !m.flushPending { // if stop was called but AfterFunc already started this goroutine
// return
// }
// m.flush()
// m.flushPending = false
// }
// func (m *maxLatencyWriter) stop() {
// m.mu.Lock()
// defer m.mu.Unlock()
// m.flushPending = false
// if m.t != nil {
// m.t.Stop()
// }
// }
func upgradeType(h http.Header) string {
if !httpguts.HeaderValuesContainsToken(h["Connection"], "Upgrade") {
return ""
}
return h.Get("Upgrade")
}
func (p *ReverseProxy) handleUpgradeResponse(rw http.ResponseWriter, req *http.Request, res *http.Response) {
reqUpType := upgradeType(req.Header)
resUpType := upgradeType(res.Header)
if !IsPrint(resUpType) { // We know reqUpType is ASCII, it's checked by the caller.
p.errorHandler(rw, req, fmt.Errorf("backend tried to switch to invalid protocol %q", resUpType))
}
if !strings.EqualFold(reqUpType, resUpType) {
p.errorHandler(rw, req, fmt.Errorf("backend tried to switch protocol %q when %q was requested", resUpType, reqUpType))
return
}
backConn, ok := res.Body.(io.ReadWriteCloser)
if !ok {
p.errorHandler(rw, req, fmt.Errorf("internal error: 101 switching protocols response with non-writable body"))
return
}
rc := http.NewResponseController(rw)
conn, brw, hijackErr := rc.Hijack()
if errors.Is(hijackErr, http.ErrNotSupported) {
p.errorHandler(rw, req, fmt.Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw))
return
}
backConnCloseCh := make(chan bool)
go func() {
// Ensure that the cancellation of a request closes the backend.
// See issue https://golang.org/issue/35559.
select {
case <-req.Context().Done():
case <-backConnCloseCh:
}
backConn.Close()
}()
defer close(backConnCloseCh)
if hijackErr != nil {
p.errorHandler(rw, req, fmt.Errorf("hijack failed on protocol switch: %v", hijackErr))
return
}
defer conn.Close()
copyHeader(rw.Header(), res.Header)
res.Header = rw.Header()
res.Body = nil // so res.Write only writes the headers; we have res.Body in backConn above
if err := res.Write(brw); err != nil {
p.errorHandler(rw, req, fmt.Errorf("response write: %v", err))
return
}
if err := brw.Flush(); err != nil {
p.errorHandler(rw, req, fmt.Errorf("response flush: %v", err))
return
}
errc := make(chan error, 1)
// NOTE: removed
// spc := switchProtocolCopier{user: conn, backend: backConn}
// go spc.copyToBackend(errc)
// go spc.copyFromBackend(errc)
go func() {
_, err := io.Copy(conn, backConn)
errc <- err
}()
go func() {
_, err := io.Copy(backConn, conn)
errc <- err
}()
<-errc
}
// NOTE: removed
// switchProtocolCopier exists so goroutines proxying data back and
// forth have nice names in stacks.
// type switchProtocolCopier struct {
// user, backend io.ReadWriter
// }
// func (c switchProtocolCopier) copyFromBackend(errc chan<- error) {
// _, err := io.Copy(c.user, c.backend)
// errc <- err
// }
// func (c switchProtocolCopier) copyToBackend(errc chan<- error) {
// _, err := io.Copy(c.backend, c.user)
// errc <- err
// }
// NOTE: removed
// func cleanQueryParams(s string) string {
// reencode := func(s string) string {
// v, _ := url.ParseQuery(s)
// return v.Encode()
// }
// for i := 0; i < len(s); {
// switch s[i] {
// case ';':
// return reencode(s)
// case '%':
// if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) {
// return reencode(s)
// }
// i += 3
// default:
// i++
// }
// }
// return s
// }
// func ishex(c byte) bool {
// switch {
// case '0' <= c && c <= '9':
// return true
// case 'a' <= c && c <= 'f':
// return true
// case 'A' <= c && c <= 'F':
// return true
// }
// return false
// }
func IsPrint(s string) bool {
for i := 0; i < len(s); i++ {
if s[i] < ' ' || s[i] > '~' {
return false
}
}
return true
}

10
src/go-proxy/loggers.go Normal file
View file

@ -0,0 +1,10 @@
package main
import "github.com/sirupsen/logrus"
var palog = logrus.WithField("component", "panel")
var prlog = logrus.WithField("component", "provider")
var cfgl = logrus.WithField("component", "config")
var hrlog = logrus.WithField("component", "http_proxy")
var srlog = logrus.WithField("component", "stream")
var wlog = logrus.WithField("component", "watcher")

View file

@ -3,10 +3,12 @@ package main
import (
"flag"
"net/http"
"os"
"os/signal"
"runtime"
"time"
"syscall"
"github.com/golang/glog"
log "github.com/sirupsen/logrus"
)
func main() {
@ -15,59 +17,65 @@ func main() {
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())
go func() {
for range time.Tick(100 * time.Millisecond) {
glog.Flush()
}
}()
log.SetFormatter(&log.TextFormatter{
ForceColors: true,
DisableColors: false,
FullTimestamp: true,
})
if config, err = ReadConfig(); err != nil {
glog.Fatal("unable to read config: ", err)
}
InitFSWatcher()
InitDockerWatcher()
StartAllRoutes()
go ListenConfigChanges()
mux := http.NewServeMux()
mux.HandleFunc("/", httpProxyHandler)
cfg := NewConfig()
cfg.MustLoad()
cfg.StartProviders()
cfg.WatchChanges()
var certAvailable = utils.fileOK(certPath) && utils.fileOK(keyPath)
go func() {
glog.Infoln("starting http server on port 80")
if certAvailable {
log.Info("starting http server on port 80")
if certAvailable && redirectHTTP {
err = http.ListenAndServe(":80", http.HandlerFunc(redirectToTLS))
} else {
err = http.ListenAndServe(":80", mux)
err = http.ListenAndServe(":80", http.HandlerFunc(httpProxyHandler))
}
if err != nil {
glog.Fatal("HTTP server error", err)
log.Fatal("HTTP server error: ", err)
}
}()
go func() {
glog.Infoln("starting http panel on port 8080")
log.Infof("starting http panel on port 8080")
err := http.ListenAndServe(":8080", http.HandlerFunc(panelHandler))
if err != nil {
glog.Fatal("HTTP server error", err)
log.Warning("HTTP panel error: ", err)
}
}()
if certAvailable {
go func() {
glog.Infoln("starting https panel on port 8443")
err := http.ListenAndServeTLS(":8443", certPath, keyPath, http.HandlerFunc(panelHandler))
log.Info("starting https server on port 443")
err = http.ListenAndServeTLS(":443", certPath, keyPath, http.HandlerFunc(httpProxyHandler))
if err != nil {
glog.Fatal("http server error", err)
log.Fatal("https server error: ", err)
}
}()
go func() {
glog.Infoln("starting https server on port 443")
err = http.ListenAndServeTLS(":443", certPath, keyPath, mux)
log.Info("starting https panel on port 8443")
err := http.ListenAndServeTLS(":8443", certPath, keyPath, http.HandlerFunc(panelHandler))
if err != nil {
glog.Fatal("https server error: ", err)
log.Warning("http panel error: ", err)
}
}()
}
<-make(chan struct{})
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT)
signal.Notify(sig, syscall.SIGTERM)
signal.Notify(sig, syscall.SIGHUP)
<-sig
cfg.StopProviders()
close(fsWatcherStop)
close(dockerWatcherStop)
}

View file

@ -6,8 +6,6 @@ import (
"net/http"
"net/url"
"time"
"github.com/golang/glog"
)
var healthCheckHttpClient = &http.Client{
@ -32,6 +30,7 @@ func panelHandler(w http.ResponseWriter, r *http.Request) {
panelCheckTargetHealth(w, r)
return
default:
palog.Errorf("%s not found", r.URL.Path)
http.NotFound(w, r)
return
}
@ -46,12 +45,22 @@ func panelIndex(w http.ResponseWriter, r *http.Request) {
tmpl, err := template.ParseFiles(templateFile)
if err != nil {
palog.Error(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = tmpl.Execute(w, &routes)
type allRoutes struct {
HTTPRoutes HTTPRoutes
StreamRoutes StreamRoutes
}
err = tmpl.Execute(w, allRoutes{
HTTPRoutes: httpRoutes,
StreamRoutes: streamRoutes,
})
if err != nil {
palog.Error(err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
@ -71,7 +80,7 @@ func panelCheckTargetHealth(w http.ResponseWriter, r *http.Request) {
url, err := url.Parse(targetUrl)
if err != nil {
glog.Infof("[Panel] failed to parse %s, error: %v", targetUrl, err)
palog.Infof("failed to parse url %q, error: %v", targetUrl, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

View file

@ -9,8 +9,8 @@ type pathPoolMap struct {
SafeMap[string, *httpLoadBalancePool]
}
func newPathPoolMap() pathPoolMap {
return pathPoolMap{
func newPathPoolMap() *pathPoolMap {
return &pathPoolMap{
NewSafeMap[string](NewHTTPLoadBalancePool),
}
}
@ -21,8 +21,7 @@ func (m pathPoolMap) Add(path string, route *HTTPRoute) {
}
func (m pathPoolMap) FindMatch(pathGot string) (*HTTPRoute, error) {
pool := m.Iterator()
for pathWant, v := range pool {
for pathWant, v := range m.Iterator() {
if strings.HasPrefix(pathGot, pathWant) {
return v.Pick(), nil
}

View file

@ -5,7 +5,7 @@ import (
"sync"
"github.com/docker/docker/client"
"github.com/golang/glog"
"github.com/sirupsen/logrus"
)
type Provider struct {
@ -13,111 +13,75 @@ type Provider struct {
Value string
name string
stopWatching chan struct{}
watcher Watcher
routes map[string]Route // id -> Route
dockerClient *client.Client
mutex sync.Mutex
l logrus.FieldLogger
}
func (p *Provider) GetProxyConfigs() ([]*ProxyConfig, error) {
func (p *Provider) Setup() error {
var cfgs []*ProxyConfig
var err error
p.l = prlog.WithFields(logrus.Fields{"kind": p.Kind, "name": p.name})
switch p.Kind {
case ProviderKind_Docker:
return p.getDockerProxyConfigs()
cfgs, err = p.getDockerProxyConfigs()
p.watcher = NewDockerWatcher(p.dockerClient, p.ReloadRoutes)
case ProviderKind_File:
return p.getFileProxyConfigs()
cfgs, err = p.getFileProxyConfigs()
p.watcher = NewFileWatcher(p.Value, p.ReloadRoutes, p.StopAllRoutes)
default:
// this line should never be reached
return nil, fmt.Errorf("unknown provider kind %q", p.Kind)
}
}
func StartAllRoutes() {
var wg sync.WaitGroup
wg.Add(len(config.Providers))
for _, p := range config.Providers {
go func(p *Provider) {
p.StartAllRoutes()
wg.Done()
}(p)
}
wg.Wait()
}
func (p *Provider) StopAllRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.stopWatching == nil {
return
return fmt.Errorf("unknown provider kind")
}
close(p.stopWatching)
p.stopWatching = nil
if p.dockerClient != nil {
p.dockerClient.Close()
}
var wg sync.WaitGroup
wg.Add(len(p.routes))
for _, route := range p.routes {
go func(r Route) {
r.StopListening()
r.RemoveFromRoutes()
wg.Done()
}(route)
}
wg.Wait()
p.routes = make(map[string]Route)
}
func (p *Provider) StartAllRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.routes = make(map[string]Route)
cfgs, err := p.GetProxyConfigs()
if err != nil {
p.Logf("Build", "unable to get proxy configs: %v", err)
return
return err
}
p.l.Infof("loaded %d proxy configurations", len(cfgs))
for _, cfg := range cfgs {
r, err := NewRoute(cfg)
if err != nil {
p.Logf("Build", "error creating route %q: %v", cfg.Alias, err)
p.l.Errorf("error creating route %s: %v", cfg.Alias, err)
continue
}
r.SetupListen()
r.Listen()
p.routes[cfg.GetID()] = r
}
p.WatchChanges()
p.Logf("Build", "built %d routes", len(p.routes))
p.stopWatching = make(chan struct{})
return nil
}
func (p *Provider) WatchChanges() {
switch p.Kind {
case ProviderKind_Docker:
go p.grWatchDockerChanges()
case ProviderKind_File:
go p.grWatchFileChanges()
default:
// this line should never be reached
p.Errorf("unknown provider kind %q", p.Kind)
func (p *Provider) StartAllRoutes() {
p.routes = make(map[string]Route)
err := p.Setup()
if err != nil {
p.l.Error(err)
return
}
p.watcher.Start()
}
func (p *Provider) Logf(t string, s string, args ...interface{}) {
glog.Infof("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...)
func (p *Provider) StopAllRoutes() {
p.watcher.Stop()
p.dockerClient = nil
ParallelForEachValue(p.routes, func(r Route) {
r.StopListening()
r.RemoveFromRoutes()
})
p.routes = make(map[string]Route)
}
func (p *Provider) Errorf(t string, s string, args ...interface{}) {
glog.Errorf("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...)
}
func (p *Provider) ReloadRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
func (p *Provider) Warningf(t string, s string, args ...interface{}) {
glog.Warningf("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...)
}
p.StopAllRoutes()
p.StartAllRoutes()
}

View file

@ -8,6 +8,7 @@ type ProxyConfig struct {
Host string
Port string
LoadBalance string // docker provider only
NoTLSVerify bool // http proxy only
Path string // http proxy only
PathMode string `yaml:"path_mode"` // http proxy only

View file

@ -2,15 +2,8 @@ package main
import (
"fmt"
"sync"
)
type Routes struct {
HTTPRoutes SafeMap[string, pathPoolMap] // alias -> (path -> routes)
StreamRoutes SafeMap[string, StreamRoute] // id -> target
Mutex sync.Mutex
}
type Route interface {
SetupListen()
Listen()
@ -21,22 +14,22 @@ type Route interface {
func NewRoute(cfg *ProxyConfig) (Route, error) {
if isStreamScheme(cfg.Scheme) {
id := cfg.GetID()
if routes.StreamRoutes.Contains(id) {
if streamRoutes.Contains(id) {
return nil, fmt.Errorf("duplicated %s stream %s, ignoring", cfg.Scheme, id)
}
route, err := NewStreamRoute(cfg)
if err != nil {
return nil, err
}
routes.StreamRoutes.Set(id, route)
streamRoutes.Set(id, route)
return route, nil
} else {
routes.HTTPRoutes.Ensure(cfg.Alias)
httpRoutes.Ensure(cfg.Alias)
route, err := NewHTTPRoute(cfg)
if err != nil {
return nil, err
}
routes.HTTPRoutes.Get(cfg.Alias).Add(cfg.Path, route)
httpRoutes.Get(cfg.Alias).Add(cfg.Path, route)
return route, nil
}
}
@ -59,11 +52,7 @@ func isStreamScheme(s string) bool {
return false
}
func initRoutes() *Routes {
r := Routes{}
r.HTTPRoutes = NewSafeMap[string](newPathPoolMap)
r.StreamRoutes = NewSafeMap[string, StreamRoute]()
return &r
}
// id -> target
type StreamRoutes = SafeMap[string, StreamRoute]
var routes = initRoutes()
var streamRoutes = NewSafeMap[string, StreamRoute]()

View file

@ -8,15 +8,14 @@ import (
"sync"
"time"
"github.com/golang/glog"
"github.com/sirupsen/logrus"
)
type StreamRoute interface {
Route
Logf(string, ...interface{})
PrintError(error)
ListeningUrl() string
TargetUrl() string
Logger() logrus.FieldLogger
closeListeners()
closeChannel()
@ -36,6 +35,7 @@ type StreamRouteBase struct {
id string
wg sync.WaitGroup
stopChann chan struct{}
l logrus.FieldLogger
}
func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
@ -47,8 +47,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
port_split := strings.Split(config.Port, ":")
if len(port_split) != 2 {
glog.Infof(`[Build] %s: Invalid stream port %s, `+
`assuming it's targetPort`, config.Alias, config.Port)
cfgl.Warnf("Invalid port %s, assuming it is target port", config.Port)
srcPort = "0"
dstPort = config.Port
} else {
@ -63,8 +62,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
srcPortInt, err := strconv.Atoi(srcPort)
if err != nil {
return nil, fmt.Errorf(
"[Build] %s: Unrecognized stream source port %s, ignoring",
config.Alias, srcPort,
"invalid stream source port %s, ignoring", srcPort,
)
}
@ -73,8 +71,7 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
dstPortInt, err := strconv.Atoi(dstPort)
if err != nil {
return nil, fmt.Errorf(
"[Build] %s: Unrecognized stream target port %s, ignoring",
config.Alias, dstPort,
"invalid stream target port %s, ignoring", dstPort,
)
}
@ -100,6 +97,11 @@ func newStreamRouteBase(config *ProxyConfig) (*StreamRouteBase, error) {
id: config.GetID(),
wg: sync.WaitGroup{},
stopChann: make(chan struct{}),
l: srlog.WithFields(logrus.Fields{
"alias": config.Alias,
"src": fmt.Sprintf("%s://:%d", srcScheme, srcPortInt),
"dst": fmt.Sprintf("%s://%s:%d", dstScheme, config.Host, dstPortInt),
}),
}, nil
}
@ -114,29 +116,6 @@ func NewStreamRoute(config *ProxyConfig) (StreamRoute, error) {
}
}
func (route *StreamRouteBase) PrintError(err error) {
if err == nil {
return
}
glog.Errorf("[%s -> %s] %s: %v",
route.ListeningScheme,
route.TargetScheme,
route.Alias,
err,
)
}
func (route *StreamRouteBase) Logf(format string, v ...interface{}) {
glog.Infof("[%s -> %s] %s: "+format,
append([]interface{}{
route.ListeningScheme,
route.TargetScheme,
route.Alias},
v...,
)...,
)
}
func (route *StreamRouteBase) ListeningUrl() string {
return fmt.Sprintf("%s:%v", route.ListeningScheme, route.ListeningPort)
}
@ -145,21 +124,25 @@ func (route *StreamRouteBase) TargetUrl() string {
return fmt.Sprintf("%s://%s:%v", route.TargetScheme, route.TargetHost, route.TargetPort)
}
func (route *StreamRouteBase) Logger() logrus.FieldLogger {
return route.l
}
func (route *StreamRouteBase) SetupListen() {
if route.ListeningPort == 0 {
freePort, err := utils.findUseFreePort(20000)
if err != nil {
route.PrintError(err)
route.l.Error(err)
return
}
route.ListeningPort = freePort
route.Logf("Assigned free port %v", route.ListeningPort)
route.l.Info("Assigned free port", route.ListeningPort)
}
route.Logf("Listening on %s", route.ListeningUrl())
route.l.Info("Listening on", route.ListeningUrl())
}
func (route *StreamRouteBase) RemoveFromRoutes() {
routes.StreamRoutes.Delete(route.id)
streamRoutes.Delete(route.id)
}
func (route *StreamRouteBase) wait() {
@ -175,7 +158,8 @@ func (route *StreamRouteBase) unmarkPort() {
}
func stopListening(route StreamRoute) {
route.Logf("Stopping listening")
l := route.Logger()
l.Debug("Stopping listening")
route.closeChannel()
route.closeListeners()
@ -189,10 +173,10 @@ func stopListening(route StreamRoute) {
select {
case <-done:
route.Logf("Stopped listening")
l.Info("Stopped listening")
return
case <-time.After(StreamStopListenTimeout):
route.Logf("timed out waiting for connections")
l.Error("timed out waiting for connections")
return
}
}
}

View file

@ -7,8 +7,6 @@ import (
"net"
"sync"
"time"
"github.com/golang/glog"
)
const tcpDialTimeout = 5 * time.Second
@ -37,7 +35,7 @@ func NewTCPRoute(config *ProxyConfig) (StreamRoute, error) {
func (route *TCPRoute) Listen() {
in, err := net.Listen("tcp", fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.PrintError(err)
route.l.Error(err)
return
}
route.listener = in
@ -68,7 +66,7 @@ func (route *TCPRoute) grAcceptConnections() {
default:
conn, err := route.listener.Accept()
if err != nil {
route.PrintError(err)
route.l.Error(err)
continue
}
route.connChan <- conn
@ -101,7 +99,7 @@ func (route *TCPRoute) grHandleConnection(clientConn net.Conn) {
dialer := &net.Dialer{}
serverConn, err := dialer.DialContext(ctx, route.TargetScheme, serverAddr)
if err != nil {
glog.Infof("[Stream Dial] %v", err)
route.l.WithField("stage", "dial").Infof("%v", err)
return
}
route.tcpPipe(clientConn, serverConn)
@ -118,13 +116,13 @@ func (route *TCPRoute) tcpPipe(src net.Conn, dest net.Conn) {
go func() {
_, err := io.Copy(src, dest)
route.PrintError(err)
route.l.Error(err)
close()
wg.Done()
}()
go func() {
_, err := io.Copy(dest, src)
route.PrintError(err)
route.l.Error(err)
close()
wg.Done()
}()

View file

@ -5,6 +5,8 @@ import (
"io"
"net"
"sync"
"github.com/sirupsen/logrus"
)
type UDPRoute struct {
@ -46,13 +48,13 @@ func NewUDPRoute(config *ProxyConfig) (StreamRoute, error) {
func (route *UDPRoute) Listen() {
source, err := net.ListenPacket(route.ListeningScheme, fmt.Sprintf(":%v", route.ListeningPort))
if err != nil {
route.PrintError(err)
route.l.Error(err)
return
}
target, err := net.Dial(route.TargetScheme, fmt.Sprintf("%s:%v", route.TargetHost, route.TargetPort))
if err != nil {
route.PrintError(err)
route.l.Error(err)
source.Close()
return
}
@ -93,7 +95,7 @@ func (route *UDPRoute) grAcceptConnections() {
default:
conn, err := route.accept()
if err != nil {
route.PrintError(err)
route.l.Error(err)
continue
}
route.connChan <- conn
@ -112,7 +114,7 @@ func (route *UDPRoute) grHandleConnections() {
go func() {
err := route.handleConnection(conn)
if err != nil {
route.PrintError(err)
route.l.Error(err)
}
}()
}
@ -133,8 +135,16 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
route.connMapMutex.Unlock()
}
var forwarder func(*UDPConn, net.Conn) error
if logLevel == logrus.DebugLevel {
forwarder = route.forwardReceivedDebug
} else {
forwarder = route.forwardReceivedReal
}
// initiate connection to target
err = route.forwardReceived(conn, route.targetConn)
err = forwarder(conn, route.targetConn)
if err != nil {
return err
}
@ -150,7 +160,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
return err
}
// forward to source
err = route.forwardReceived(conn, srcConn)
err = forwarder(conn, srcConn)
if err != nil {
return err
}
@ -160,7 +170,7 @@ func (route *UDPRoute) handleConnection(conn *UDPConn) error {
continue
}
// forward to target
err = route.forwardReceived(conn, route.targetConn)
err = forwarder(conn, route.targetConn)
if err != nil {
return err
}
@ -209,13 +219,7 @@ func (route *UDPRoute) readFrom(src net.Conn, buffer []byte) (*UDPConn, error) {
}, nil
}
func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) error {
route.Logf(
"forwarding %d bytes %s -> %s",
receivedConn.nReceived,
receivedConn.remoteAddr.String(),
dest.RemoteAddr().String(),
)
func (route *UDPRoute) forwardReceivedReal(receivedConn *UDPConn, dest net.Conn) error {
nWritten, err := dest.Write(receivedConn.bytesReceived)
if nWritten != receivedConn.nReceived {
@ -224,3 +228,12 @@ func (route *UDPRoute) forwardReceived(receivedConn *UDPConn, dest net.Conn) err
return err
}
func (route *UDPRoute) forwardReceivedDebug(receivedConn *UDPConn, dest net.Conn) error {
route.l.WithField("size", receivedConn.nReceived).Debugf(
"forwarding from %s to %s",
receivedConn.remoteAddr.String(),
dest.RemoteAddr().String(),
)
return route.forwardReceivedReal(receivedConn, dest)
}

View file

@ -14,7 +14,7 @@ import (
"sync"
"time"
"github.com/golang/glog"
"github.com/sirupsen/logrus"
xhtml "golang.org/x/net/html"
)
@ -111,10 +111,9 @@ func tryAppendPathPrefixImpl(pOrig, pAppend string) string {
var tryAppendPathPrefix func(string, string) string
var _ = func() int {
if glog.V(4) {
if logLevel == logrus.DebugLevel {
tryAppendPathPrefix = func(s1, s2 string) string {
replaced := tryAppendPathPrefixImpl(s1, s2)
glog.Infof("[Path sub] %s -> %s", s1, replaced)
return replaced
}
} else {
@ -192,4 +191,4 @@ func (*Utils) respJSSubPath(r *http.Response, p string) error {
func (*Utils) fileOK(path string) bool {
_, err := os.Stat(path)
return err == nil
}
}

195
src/go-proxy/watcher.go Normal file
View file

@ -0,0 +1,195 @@
package main
import (
"path"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
type Watcher interface {
Start()
Stop()
}
type watcherBase struct {
name string // for log / error output
kind string // for log / error output
onChange func()
l logrus.FieldLogger
}
type fileWatcher struct {
*watcherBase
path string
onDelete func()
}
type dockerWatcher struct {
*watcherBase
client *client.Client
stop chan struct{}
wg sync.WaitGroup
}
func newWatcher(kind string, name string, onChange func()) *watcherBase {
return &watcherBase{
kind: kind,
name: name,
onChange: onChange,
l: wlog.WithFields(logrus.Fields{"kind": kind, "name": name}),
}
}
func NewFileWatcher(p string, onChange func(), onDelete func()) Watcher {
return &fileWatcher{
watcherBase: newWatcher("File", path.Base(p), onChange),
path: p,
onDelete: onDelete,
}
}
func NewDockerWatcher(c *client.Client, onChange func()) Watcher {
return &dockerWatcher{
watcherBase: newWatcher("Docker", c.DaemonHost(), onChange),
client: c,
stop: make(chan struct{}, 1),
}
}
func (w *fileWatcher) Start() {
if fsWatcher == nil {
return
}
err := fsWatcher.Add(w.path)
if err != nil {
w.l.Error("failed to start: ", err)
}
fileWatchMap.Set(w.path, w)
}
func (w *fileWatcher) Stop() {
fileWatchMap.Delete(w.path)
err := fsWatcher.Remove(w.path)
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
}
func (w *dockerWatcher) Start() {
dockerWatchMap.Set(w.name, w)
w.wg.Add(1)
go func() {
w.watch()
w.wg.Done()
}()
}
func (w *dockerWatcher) Stop() {
close(w.stop)
w.stop = nil
dockerWatchMap.Delete(w.name)
w.wg.Wait()
}
func InitFSWatcher() {
w, err := fsnotify.NewWatcher()
if err != nil {
wlog.Errorf("unable to create file watcher: %v", err)
return
}
fsWatcher = w
go watchFiles()
}
func InitDockerWatcher() {
// stop all docker client on watcher stop
go func() {
<-dockerWatcherStop
stopAllDockerClients()
}()
}
func stopAllDockerClients() {
ParallelForEachValue(
dockerWatchMap.Iterator(),
func(w *dockerWatcher) {
w.Stop()
err := w.client.Close()
if err != nil {
w.l.WithField("action", "stop").Error(err)
}
w.client = nil
},
)
}
func watchFiles() {
defer fsWatcher.Close()
for {
select {
case event, ok := <-fsWatcher.Events:
if !ok {
wlog.Error("file watcher channel closed")
return
}
w, ok := fileWatchMap.UnsafeGet(event.Name)
if !ok {
wlog.Errorf("watcher for %s not found", event.Name)
}
switch {
case event.Has(fsnotify.Write):
w.l.Info("File change detected")
w.onChange()
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
w.l.Info("File renamed / deleted")
w.onDelete()
}
case err := <-fsWatcher.Errors:
wlog.Error(err)
}
}
}
func (w *dockerWatcher) watch() {
filter := filters.NewArgs(
filters.Arg("type", "container"),
filters.Arg("event", "start"),
filters.Arg("event", "die"), // 'stop' already triggering 'die'
)
listen := func() (<-chan events.Message, <-chan error) {
return w.client.Events(context.Background(), types.EventsOptions{Filters: filter})
}
msgChan, errChan := listen()
for {
select {
case <-w.stop:
return
case msg := <-msgChan:
w.l.Info("container", msg.Actor.Attributes["name"], msg.Action)
w.onChange()
case err := <-errChan:
w.l.Errorf("%s, retrying in 1s", err)
time.Sleep(1 * time.Second)
msgChan, errChan = listen()
}
}
}
var fsWatcher *fsnotify.Watcher
var (
fileWatchMap = NewSafeMap[string, *fileWatcher]()
dockerWatchMap = NewSafeMap[string, *dockerWatcher]()
)
var (
fsWatcherStop = make(chan struct{}, 1)
dockerWatcherStop = make(chan struct{}, 1)
)