From d1c9e18c97e416c32c30a170065a0b5b31192c4d Mon Sep 17 00:00:00 2001 From: yusing Date: Mon, 7 Oct 2024 18:50:51 +0800 Subject: [PATCH] improved idlewatcher support for API-like services, fixed idlewaker proxying to zero port --- internal/docker/idlewatcher/waker.go | 34 ++++++++++++++------- internal/docker/idlewatcher/watcher.go | 15 +++++++--- internal/net/http/reverse_proxy_mod.go | 41 +++----------------------- 3 files changed, 38 insertions(+), 52 deletions(-) diff --git a/internal/docker/idlewatcher/waker.go b/internal/docker/idlewatcher/waker.go index 0acc395..3fde40f 100644 --- a/internal/docker/idlewatcher/waker.go +++ b/internal/docker/idlewatcher/waker.go @@ -3,6 +3,7 @@ package idlewatcher import ( "context" "crypto/tls" + "fmt" "net/http" "strconv" "time" @@ -22,6 +23,20 @@ func NewWaker(w *watcher, rp *gphttp.ReverseProxy) *Waker { if w.NoTLSVerify { tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} } + orig := rp.ServeHTTP + // workaround for stopped containers port become zero + rp.ServeHTTP = func(rw http.ResponseWriter, r *http.Request) { + if rp.TargetURL.Port() == "0" { + port, ok := portHistoryMap.Load(w.Alias) + if !ok { + w.l.Errorf("port history not found for %s", w.Alias) + http.Error(rw, "internal server error", http.StatusInternalServerError) + return + } + rp.TargetURL.Host = fmt.Sprintf("%s:%v", rp.TargetURL.Hostname(), port) + } + orig(rw, r) + } return &Waker{ watcher: w, client: &http.Client{ @@ -37,9 +52,10 @@ func (w *Waker) ServeHTTP(rw http.ResponseWriter, r *http.Request) { } func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Request) { + w.resetIdleTimer() + // pass through if container is ready if w.ready.Load() { - w.resetIdleTimer() next(rw, r) return } @@ -48,14 +64,10 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ defer cancel() accept := gphttp.GetAccept(r.Header) - acceptHTML := accept.AcceptHTML() || accept.IsEmpty() + acceptHTML := r.Method == http.MethodGet && accept.AcceptHTML() - if !acceptHTML { - w.l.Debugf("Accept %v", accept) - } - - isCheckRedirect := r.Header.Get(headerCheckRedirect) != "" && acceptHTML - if !isCheckRedirect { + isCheckRedirect := r.Header.Get(headerCheckRedirect) != "" + if !isCheckRedirect && acceptHTML { // Send a loading response to the client body := w.makeRespBody("%s waking up...", w.ContainerName) rw.Header().Set("Content-Type", "text/html; charset=utf-8") @@ -106,10 +118,10 @@ func (w *Waker) wake(next http.HandlerFunc, rw http.ResponseWriter, r *http.Requ return } - // we don't care about the response - _, err = w.client.Do(wakeReq) - if err == nil { + wakeResp, err := w.client.Do(wakeReq) + if err == nil && wakeResp.StatusCode != http.StatusServiceUnavailable { w.ready.Store(true) + w.l.Debug("awaken") if isCheckRedirect { rw.WriteHeader(http.StatusOK) } else { diff --git a/internal/docker/idlewatcher/watcher.go b/internal/docker/idlewatcher/watcher.go index 34c7d0d..c27cb0e 100644 --- a/internal/docker/idlewatcher/watcher.go +++ b/internal/docker/idlewatcher/watcher.go @@ -12,6 +12,7 @@ import ( E "github.com/yusing/go-proxy/internal/error" P "github.com/yusing/go-proxy/internal/proxy" PT "github.com/yusing/go-proxy/internal/proxy/fields" + F "github.com/yusing/go-proxy/internal/utils/functional" W "github.com/yusing/go-proxy/internal/watcher" ) @@ -45,9 +46,11 @@ var ( mainLoopCancel context.CancelFunc mainLoopWg sync.WaitGroup - watcherMap = make(map[string]*watcher) + watcherMap = F.NewMapOf[string, *watcher]() watcherMapMu sync.Mutex + portHistoryMap = F.NewMapOf[PT.Alias, string]() + newWatcherCh = make(chan *watcher) logger = logrus.WithField("module", "idle_watcher") @@ -65,7 +68,11 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) { key := entry.ContainerID - if w, ok := watcherMap[key]; ok { + if entry.URL.Port() != "0" { + portHistoryMap.Store(entry.Alias, entry.URL.Port()) + } + + if w, ok := watcherMap.Load(key); ok { w.refCount.Add(1) w.ReverseProxyEntry = entry return w, nil @@ -88,7 +95,7 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) { w.refCount.Add(1) w.stopByMethod = w.getStopCallback() - watcherMap[key] = w + watcherMap.Store(key, w) go func() { newWatcherCh <- w @@ -118,7 +125,7 @@ func Start() { w.watchUntilCancel() w.refCount.Wait() // wait for 0 ref count - delete(watcherMap, w.ContainerID) + watcherMap.Delete(w.ContainerID) w.l.Debug("unregistered") mainLoopWg.Done() }() diff --git a/internal/net/http/reverse_proxy_mod.go b/internal/net/http/reverse_proxy_mod.go index 9099da4..cca5837 100644 --- a/internal/net/http/reverse_proxy_mod.go +++ b/internal/net/http/reverse_proxy_mod.go @@ -69,36 +69,6 @@ type ProxyRequest struct { // 1xx responses are forwarded to the client if the underlying // transport supports ClientTrace.Got1xxResponse. type ReverseProxy struct { - // Director is a function which modifies - // the request into a new request to be sent - // using Transport. Its response is then copied - // back to the original client unmodified. - // Director must not access the provided Request - // after returning. - // - // By default, the X-Forwarded-For header is set to the - // value of the client IP address. If an X-Forwarded-For - // header already exists, the client IP is appended to the - // existing values. As a special case, if the header - // exists in the Request.Header map but has a nil value - // (such as when set by the Director func), the X-Forwarded-For - // header is not modified. - // - // To prevent IP spoofing, be sure to delete any pre-existing - // X-Forwarded-For header coming from the client or - // an untrusted proxy. - // - // Hop-by-hop headers are removed from the request after - // Director returns, which can remove headers added by - // Director. Use a Rewrite function instead to ensure - // modifications to the request are preserved. - // - // Unparsable query parameters are removed from the outbound - // request if Request.Form is set after Director returns. - // - // At most one of Rewrite or Director may be set. - Director func(*http.Request) - // The transport used to perform proxy requests. // If nil, http.DefaultTransport is used. Transport http.RoundTripper @@ -115,6 +85,8 @@ type ReverseProxy struct { ModifyResponse func(*http.Response) error ServeHTTP http.HandlerFunc + + TargetURL *url.URL } func singleJoiningSlash(a, b string) string { @@ -176,12 +148,7 @@ func NewReverseProxy(target *url.URL, transport http.RoundTripper) *ReverseProxy if transport == nil { panic("nil transport") } - rp := &ReverseProxy{ - Director: func(req *http.Request) { - rewriteRequestURL(req, target) - }, - Transport: transport, - } + rp := &ReverseProxy{Transport: transport, TargetURL: target} rp.ServeHTTP = rp.serveHTTP return rp } @@ -296,7 +263,7 @@ func (p *ReverseProxy) serveHTTP(rw http.ResponseWriter, req *http.Request) { outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate } - p.Director(outreq) + rewriteRequestURL(outreq, p.TargetURL) outreq.Close = false reqUpType := UpgradeType(outreq.Header)