From decd2c2dede5532be4c853e974936eed284a445d Mon Sep 17 00:00:00 2001 From: yusing Date: Thu, 13 Feb 2025 15:05:16 +0800 Subject: [PATCH] fix various endpoints --- agent/pkg/agent/config.go | 1 + agent/pkg/agent/requests.go | 1 + internal/api/v1/system_info.go | 45 ++++++++-------------------- internal/api/v1/utils/error.go | 3 +- internal/config/agent_pool.go | 10 +++---- internal/config/types/config.go | 2 +- internal/metrics/period/handler.go | 18 +++++------ internal/net/http/httpheaders/sse.go | 7 +++++ internal/route/reverse_proxy.go | 3 +- 9 files changed, 39 insertions(+), 51 deletions(-) create mode 100644 internal/net/http/httpheaders/sse.go diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index bc6a6ba..ad2824b 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -50,6 +50,7 @@ const ( ) var ( + AgentURL = types.MustParseURL(APIBaseURL) HTTPProxyURL = types.MustParseURL(APIBaseURL + EndpointProxyHTTP) HTTPProxyURLPrefixLen = len(APIEndpointBase + EndpointProxyHTTP) ) diff --git a/agent/pkg/agent/requests.go b/agent/pkg/agent/requests.go index 5da76d1..a50e9f0 100644 --- a/agent/pkg/agent/requests.go +++ b/agent/pkg/agent/requests.go @@ -21,6 +21,7 @@ func (cfg *AgentConfig) Forward(req *http.Request, endpoint string) ([]byte, int req.URL.Host = AgentHost req.URL.Scheme = "https" req.URL.Path = APIEndpointBase + endpoint + req.RequestURI = "" resp, err := cfg.httpClient.Do(req) if err != nil { return nil, 0, err diff --git a/internal/api/v1/system_info.go b/internal/api/v1/system_info.go index 22d9bd4..1c06dad 100644 --- a/internal/api/v1/system_info.go +++ b/internal/api/v1/system_info.go @@ -3,27 +3,27 @@ package v1 import ( "net/http" - "github.com/coder/websocket/wsjson" agentPkg "github.com/yusing/go-proxy/agent/pkg/agent" U "github.com/yusing/go-proxy/internal/api/v1/utils" config "github.com/yusing/go-proxy/internal/config/types" E "github.com/yusing/go-proxy/internal/error" "github.com/yusing/go-proxy/internal/metrics/systeminfo" "github.com/yusing/go-proxy/internal/net/http/httpheaders" + "github.com/yusing/go-proxy/internal/net/http/reverseproxy" ) func SystemInfo(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) { query := r.URL.Query() - agentName := query.Get("agent_name") - query.Del("agent_name") - if agentName == "" { + agentAddr := query.Get("agent_addr") + query.Del("agent_addr") + if agentAddr == "" { systeminfo.Poller.ServeHTTP(w, r) return } - agent, ok := cfg.GetAgent(agentName) + agent, ok := cfg.GetAgent(agentAddr) if !ok { - U.HandleErr(w, r, U.ErrInvalidKey("agent_name"), http.StatusNotFound) + U.HandleErr(w, r, U.ErrInvalidKey("agent_addr"), http.StatusNotFound) return } @@ -40,35 +40,14 @@ func SystemInfo(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Reques } U.WriteBody(w, respData) } else { - r = r.WithContext(r.Context()) - clientConn, err := U.InitiateWS(w, r) + rp := reverseproxy.NewReverseProxy("agent", agentPkg.AgentURL, agent.Transport()) + header := r.Header.Clone() + r, err := http.NewRequestWithContext(r.Context(), r.Method, agentPkg.EndpointSystemInfo+"?"+query.Encode(), nil) if err != nil { - U.HandleErr(w, r, E.Wrap(err, "failed to initiate websocket")) + U.HandleErr(w, r, E.Wrap(err, "failed to create request")) return } - defer clientConn.CloseNow() - agentConn, _, err := agent.Websocket(r.Context(), agentPkg.EndpointSystemInfo+"?"+query.Encode()) - if err != nil { - U.HandleErr(w, r, E.Wrap(err, "failed to connect to agent with websocket")) - return - } - //nolint:errcheck - defer agentConn.CloseNow() - var data []byte - for { - select { - case <-r.Context().Done(): - return - default: - err := wsjson.Read(r.Context(), agentConn, &data) - if err == nil { - err = wsjson.Write(r.Context(), clientConn, data) - } - if err != nil { - U.HandleErr(w, r, E.Wrap(err, "failed to write data to client")) - return - } - } - } + r.Header = header + rp.ServeHTTP(w, r) } } diff --git a/internal/api/v1/utils/error.go b/internal/api/v1/utils/error.go index 613c4d3..31c0669 100644 --- a/internal/api/v1/utils/error.go +++ b/internal/api/v1/utils/error.go @@ -8,6 +8,7 @@ import ( "syscall" E "github.com/yusing/go-proxy/internal/error" + "github.com/yusing/go-proxy/internal/net/http/httpheaders" "github.com/yusing/go-proxy/internal/utils/strutils/ansi" ) @@ -25,7 +26,7 @@ func HandleErr(w http.ResponseWriter, r *http.Request, err error, code ...int) { return } LogError(r).Msg(err.Error()) - if r.Header.Get("Upgrade") == "websocket" { + if httpheaders.IsWebsocket(r.Header) { return } if len(code) == 0 { diff --git a/internal/config/agent_pool.go b/internal/config/agent_pool.go index 7fcd407..9325803 100644 --- a/internal/config/agent_pool.go +++ b/internal/config/agent_pool.go @@ -1,8 +1,6 @@ package config import ( - "errors" - "github.com/yusing/go-proxy/agent/pkg/agent" "github.com/yusing/go-proxy/internal/utils/functional" ) @@ -22,11 +20,11 @@ func GetAgent(addr string) (agent *agent.AgentConfig, ok bool) { return } -func (cfg *Config) GetAgent(agentDockerHost string) (*agent.AgentConfig, bool) { - if !agent.IsDockerHostAgent(agentDockerHost) { - panic(errors.New("invalid use of GetAgent with docker host: " + agentDockerHost)) +func (cfg *Config) GetAgent(agentAddrOrDockerHost string) (*agent.AgentConfig, bool) { + if !agent.IsDockerHostAgent(agentAddrOrDockerHost) { + return GetAgent(agentAddrOrDockerHost) } - return GetAgent(agent.GetAgentAddrFromDockerHost(agentDockerHost)) + return GetAgent(agent.GetAgentAddrFromDockerHost(agentAddrOrDockerHost)) } func (cfg *Config) ListAgents() []*agent.AgentConfig { diff --git a/internal/config/types/config.go b/internal/config/types/config.go index 2e5e4fb..e41dd4b 100644 --- a/internal/config/types/config.go +++ b/internal/config/types/config.go @@ -41,7 +41,7 @@ type ( Statistics() map[string]any RouteProviderList() []string Context() context.Context - GetAgent(agentDockerHost string) (*agent.AgentConfig, bool) + GetAgent(agentAddrOrDockerHost string) (*agent.AgentConfig, bool) ListAgents() []*agent.AgentConfig } ) diff --git a/internal/metrics/period/handler.go b/internal/metrics/period/handler.go index d86f5c4..a5e2892 100644 --- a/internal/metrics/period/handler.go +++ b/internal/metrics/period/handler.go @@ -25,17 +25,17 @@ import ( // If the request is a websocket request, it serves the data for the given period for every interval. func (p *Poller[T, AggregateT]) ServeHTTP(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() - interval := metricsutils.QueryDuration(query, "interval", 0) - - minInterval := 1 * time.Second - if interval == 0 { - interval = p.interval() - } - if interval < minInterval { - interval = minInterval - } if httpheaders.IsWebsocket(r.Header) { + interval := metricsutils.QueryDuration(query, "interval", 0) + + minInterval := 1 * time.Second + if interval == 0 { + interval = p.interval() + } + if interval < minInterval { + interval = minInterval + } utils.PeriodicWS(w, r, interval, func(conn *websocket.Conn) error { data, err := p.getRespData(r) if err != nil { diff --git a/internal/net/http/httpheaders/sse.go b/internal/net/http/httpheaders/sse.go new file mode 100644 index 0000000..e2c7006 --- /dev/null +++ b/internal/net/http/httpheaders/sse.go @@ -0,0 +1,7 @@ +package httpheaders + +import "net/http" + +func IsSSE(h http.Header) bool { + return h.Get("Content-Type") == "text/event-stream" +} diff --git a/internal/route/reverse_proxy.go b/internal/route/reverse_proxy.go index acff99c..8281642 100755 --- a/internal/route/reverse_proxy.go +++ b/internal/route/reverse_proxy.go @@ -45,12 +45,13 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, E.Error) { httpConfig := base.HTTPConfig proxyURL := base.ProxyURL - trans := gphttp.NewTransport() + var trans *http.Transport a := base.Agent() if a != nil { trans = a.Transport() proxyURL = agent.HTTPProxyURL } else { + trans = gphttp.NewTransport() if httpConfig.NoTLSVerify { trans.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} }