From 90214ff752c987f7f8f3a27d3e5246f5b7f6f034 Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 13 Apr 2025 06:11:06 +0800 Subject: [PATCH] refactor: introduce Pool interface, move agent_pool to agent module --- agent/pkg/agent/agents.go | 16 ++++++++ agent/pkg/agent/config.go | 7 ++-- internal/api/v1/dockerapi/logs.go | 2 +- internal/api/v1/dockerapi/utils.go | 9 ++-- internal/api/v1/new_agent.go | 2 +- internal/api/v1/system_info.go | 6 +-- internal/config/agent_pool.go | 66 ------------------------------ internal/config/config.go | 16 ++++---- internal/config/query.go | 30 ++++++++++++++ internal/config/types/config.go | 2 - internal/docker/client.go | 3 +- internal/docker/container.go | 2 +- internal/utils/pool/pool.go | 62 ++++++++++++++++++++++++++++ 13 files changed, 132 insertions(+), 91 deletions(-) create mode 100644 agent/pkg/agent/agents.go delete mode 100644 internal/config/agent_pool.go create mode 100644 internal/utils/pool/pool.go diff --git a/agent/pkg/agent/agents.go b/agent/pkg/agent/agents.go new file mode 100644 index 0000000..a0469b9 --- /dev/null +++ b/agent/pkg/agent/agents.go @@ -0,0 +1,16 @@ +package agent + +import ( + "github.com/yusing/go-proxy/internal/utils/pool" +) + +type agents struct{ pool.Pool[*AgentConfig] } + +var Agents = agents{pool.New[*AgentConfig]()} + +func (agents agents) Get(agentAddrOrDockerHost string) (*AgentConfig, bool) { + if !IsDockerHostAgent(agentAddrOrDockerHost) { + return agents.Base().Load(agentAddrOrDockerHost) + } + return agents.Base().Load(GetAgentAddrFromDockerHost(agentAddrOrDockerHost)) +} diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index 371c684..5f50cb1 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -4,9 +4,9 @@ import ( "context" "crypto/tls" "crypto/x509" - "encoding/json" "net" "net/http" + "net/url" "os" "strings" "time" @@ -16,7 +16,6 @@ import ( "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/net/gphttp" - "github.com/yusing/go-proxy/internal/net/types" "github.com/yusing/go-proxy/pkg" ) @@ -49,8 +48,8 @@ const ( ) var ( - AgentURL = types.MustParseURL(APIBaseURL) - HTTPProxyURL = types.MustParseURL(APIBaseURL + EndpointProxyHTTP) + AgentURL, _ = url.Parse(APIBaseURL) + HTTPProxyURL, _ = url.Parse(APIBaseURL + EndpointProxyHTTP) HTTPProxyURLPrefixLen = len(APIEndpointBase + EndpointProxyHTTP) ) diff --git a/internal/api/v1/dockerapi/logs.go b/internal/api/v1/dockerapi/logs.go index 0bec4c6..29c700b 100644 --- a/internal/api/v1/dockerapi/logs.go +++ b/internal/api/v1/dockerapi/logs.go @@ -22,7 +22,7 @@ func Logs(w http.ResponseWriter, r *http.Request) { until := query.Get("to") levels := query.Get("levels") // TODO: implement levels - dockerClient, found, err := getDockerClient(w, server) + dockerClient, found, err := getDockerClient(server) if err != nil { gphttp.BadRequest(w, err.Error()) return diff --git a/internal/api/v1/dockerapi/utils.go b/internal/api/v1/dockerapi/utils.go index 2345b00..b25b59a 100644 --- a/internal/api/v1/dockerapi/utils.go +++ b/internal/api/v1/dockerapi/utils.go @@ -8,6 +8,7 @@ import ( "github.com/coder/websocket" "github.com/coder/websocket/wsjson" + "github.com/yusing/go-proxy/agent/pkg/agent" config "github.com/yusing/go-proxy/internal/config/types" "github.com/yusing/go-proxy/internal/docker" "github.com/yusing/go-proxy/internal/gperr" @@ -44,7 +45,7 @@ func getDockerClients() (DockerClients, gperr.Error) { dockerClients[name] = dockerClient } - for _, agent := range cfg.ListAgents() { + for _, agent := range agent.Agents.Iter { dockerClient, err := docker.NewClient(agent.FakeDockerHost()) if err != nil { connErrs.Add(err) @@ -56,7 +57,7 @@ func getDockerClients() (DockerClients, gperr.Error) { return dockerClients, connErrs.Error() } -func getDockerClient(w http.ResponseWriter, server string) (*docker.SharedClient, bool, error) { +func getDockerClient(server string) (*docker.SharedClient, bool, error) { cfg := config.GetInstance() var host string for name, h := range cfg.Value().Providers.Docker { @@ -65,7 +66,7 @@ func getDockerClient(w http.ResponseWriter, server string) (*docker.SharedClient break } } - for _, agent := range cfg.ListAgents() { + for _, agent := range agent.Agents.Iter { if agent.Name() == server { host = agent.FakeDockerHost() break @@ -119,6 +120,6 @@ func serveHTTP[V any, T ResultType[V]](w http.ResponseWriter, r *http.Request, g }) } else { result, err := getResult(r.Context(), dockerClients) - handleResult[V, T](w, err, result) + handleResult[V](w, err, result) } } diff --git a/internal/api/v1/new_agent.go b/internal/api/v1/new_agent.go index e4d67e8..9abc627 100644 --- a/internal/api/v1/new_agent.go +++ b/internal/api/v1/new_agent.go @@ -40,7 +40,7 @@ func NewAgent(w http.ResponseWriter, r *http.Request) { return } hostport := fmt.Sprintf("%s:%d", host, port) - if _, ok := config.GetInstance().GetAgent(hostport); ok { + if _, ok := agent.Agents.Get(hostport); ok { gphttp.ClientError(w, gphttp.ErrAlreadyExists("agent", hostport), http.StatusConflict) return } diff --git a/internal/api/v1/system_info.go b/internal/api/v1/system_info.go index 31f6829..cf1c7ac 100644 --- a/internal/api/v1/system_info.go +++ b/internal/api/v1/system_info.go @@ -3,8 +3,8 @@ package v1 import ( "net/http" + "github.com/yusing/go-proxy/agent/pkg/agent" agentPkg "github.com/yusing/go-proxy/agent/pkg/agent" - config "github.com/yusing/go-proxy/internal/config/types" "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/metrics/systeminfo" "github.com/yusing/go-proxy/internal/net/gphttp" @@ -12,7 +12,7 @@ import ( "github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy" ) -func SystemInfo(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) { +func SystemInfo(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() agentAddr := query.Get("agent_addr") query.Del("agent_addr") @@ -21,7 +21,7 @@ func SystemInfo(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Reques return } - agent, ok := cfg.GetAgent(agentAddr) + agent, ok := agent.Agents.Get(agentAddr) if !ok { gphttp.NotFound(w, "agent_addr") return diff --git a/internal/config/agent_pool.go b/internal/config/agent_pool.go deleted file mode 100644 index d3b7610..0000000 --- a/internal/config/agent_pool.go +++ /dev/null @@ -1,66 +0,0 @@ -package config - -import ( - "slices" - - "github.com/yusing/go-proxy/agent/pkg/agent" - "github.com/yusing/go-proxy/internal/gperr" - "github.com/yusing/go-proxy/internal/route/provider" - "github.com/yusing/go-proxy/internal/utils/functional" -) - -var agentPool = functional.NewMapOf[string, *agent.AgentConfig]() - -func addAgent(agent *agent.AgentConfig) { - agentPool.Store(agent.Addr, agent) -} - -func removeAllAgents() { - agentPool.Clear() -} - -func GetAgent(addr string) (agent *agent.AgentConfig, ok bool) { - agent, ok = agentPool.Load(addr) - return -} - -func (cfg *Config) GetAgent(agentAddrOrDockerHost string) (*agent.AgentConfig, bool) { - if !agent.IsDockerHostAgent(agentAddrOrDockerHost) { - return GetAgent(agentAddrOrDockerHost) - } - return GetAgent(agent.GetAgentAddrFromDockerHost(agentAddrOrDockerHost)) -} - -func (cfg *Config) VerifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair) (int, gperr.Error) { - if slices.ContainsFunc(cfg.value.Providers.Agents, func(a *agent.AgentConfig) bool { - return a.Addr == host - }) { - return 0, gperr.New("agent already exists") - } - - var agentCfg agent.AgentConfig - agentCfg.Addr = host - err := agentCfg.InitWithCerts(cfg.task.Context(), ca.Cert, client.Cert, client.Key) - if err != nil { - return 0, gperr.Wrap(err, "failed to start agent") - } - addAgent(&agentCfg) - - provider := provider.NewAgentProvider(&agentCfg) - if err := cfg.errIfExists(provider); err != nil { - return 0, err - } - err = provider.LoadRoutes() - if err != nil { - return 0, gperr.Wrap(err, "failed to load routes") - } - return provider.NumRoutes(), nil -} - -func (cfg *Config) ListAgents() []*agent.AgentConfig { - agents := make([]*agent.AgentConfig, 0, agentPool.Size()) - agentPool.RangeAll(func(key string, value *agent.AgentConfig) { - agents = append(agents, value) - }) - return agents -} diff --git a/internal/config/config.go b/internal/config/config.go index 2dacc66..84ebe7c 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -277,8 +277,8 @@ func (cfg *Config) errIfExists(p *proxy.Provider) gperr.Error { func (cfg *Config) initAgents(agentCfgs []*agent.AgentConfig) gperr.Error { var wg sync.WaitGroup - var errs gperr.Builder + errs := gperr.NewBuilderWithConcurrency() wg.Add(len(agentCfgs)) for _, agentCfg := range agentCfgs { go func(agentCfg *agent.AgentConfig) { @@ -286,7 +286,7 @@ func (cfg *Config) initAgents(agentCfgs []*agent.AgentConfig) gperr.Error { if err := agentCfg.Init(cfg.task.Context()); err != nil { errs.Add(err.Subject(agentCfg.String())) } else { - addAgent(agentCfg) + agent.Agents.Add(agentCfg) } }(agentCfg) } @@ -298,7 +298,7 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { errs := gperr.NewBuilder("route provider errors") results := gperr.NewBuilder("loaded route providers") - removeAllAgents() + agent.Agents.Clear() n := len(providers.Agents) + len(providers.Docker) + len(providers.Files) if n == 0 { @@ -309,12 +309,12 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { errs.Add(cfg.initAgents(providers.Agents)) - for _, agent := range providers.Agents { - if !agent.IsInitialized() { // failed to initialize + for _, a := range providers.Agents { + if !a.IsInitialized() { // failed to initialize continue } - addAgent(agent) - routeProviders = append(routeProviders, proxy.NewAgentProvider(agent)) + agent.Agents.Add(a) + routeProviders = append(routeProviders, proxy.NewAgentProvider(a)) } for _, filename := range providers.Files { routeProviders = append(routeProviders, proxy.NewFileProvider(filename)) @@ -338,6 +338,8 @@ func (cfg *Config) loadRouteProviders(providers *config.Providers) gperr.Error { lenLongestName = len(k) } }) + errs.EnableConcurrency() + results.EnableConcurrency() cfg.providers.RangeAllParallel(func(_ string, p *proxy.Provider) { if err := p.LoadRoutes(); err != nil { errs.Add(err.Subject(p.String())) diff --git a/internal/config/query.go b/internal/config/query.go index 70b101b..bdc2668 100644 --- a/internal/config/query.go +++ b/internal/config/query.go @@ -1,6 +1,10 @@ package config import ( + "slices" + + "github.com/yusing/go-proxy/agent/pkg/agent" + "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/route" "github.com/yusing/go-proxy/internal/route/provider" ) @@ -51,3 +55,29 @@ func (cfg *Config) Statistics() map[string]any { "providers": providerStats, } } + +func (cfg *Config) VerifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair) (int, gperr.Error) { + if slices.ContainsFunc(cfg.value.Providers.Agents, func(a *agent.AgentConfig) bool { + return a.Addr == host + }) { + return 0, gperr.New("agent already exists") + } + + var agentCfg agent.AgentConfig + agentCfg.Addr = host + err := agentCfg.InitWithCerts(cfg.task.Context(), ca.Cert, client.Cert, client.Key) + if err != nil { + return 0, gperr.Wrap(err, "failed to start agent") + } + agent.Agents.Add(&agentCfg) + + provider := provider.NewAgentProvider(&agentCfg) + if err := cfg.errIfExists(provider); err != nil { + return 0, err + } + err = provider.LoadRoutes() + if err != nil { + return 0, gperr.Wrap(err, "failed to load routes") + } + return provider.NumRoutes(), nil +} diff --git a/internal/config/types/config.go b/internal/config/types/config.go index 7c1b9d3..c396c28 100644 --- a/internal/config/types/config.go +++ b/internal/config/types/config.go @@ -45,9 +45,7 @@ type ( Statistics() map[string]any RouteProviderList() []string Context() context.Context - GetAgent(agentAddrOrDockerHost string) (*agent.AgentConfig, bool) VerifyNewAgent(host string, ca agent.PEMPair, client agent.PEMPair) (int, gperr.Error) - ListAgents() []*agent.AgentConfig AutoCertProvider() *autocert.Provider } ) diff --git a/internal/docker/client.go b/internal/docker/client.go index f502ec5..45029c3 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -14,7 +14,6 @@ import ( "github.com/docker/docker/client" "github.com/yusing/go-proxy/agent/pkg/agent" "github.com/yusing/go-proxy/internal/common" - config "github.com/yusing/go-proxy/internal/config/types" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/task" ) @@ -134,7 +133,7 @@ func NewClient(host string) (*SharedClient, error) { var dial func(ctx context.Context) (net.Conn, error) if agent.IsDockerHostAgent(host) { - cfg, ok := config.GetInstance().GetAgent(host) + cfg, ok := agent.Agents.Get(host) if !ok { panic(fmt.Errorf("agent %q not found", host)) } diff --git a/internal/docker/container.go b/internal/docker/container.go index 43e0288..4bcec72 100644 --- a/internal/docker/container.go +++ b/internal/docker/container.go @@ -83,7 +83,7 @@ func FromDocker(c *container.Summary, dockerHost string) (res *Container) { if agent.IsDockerHostAgent(dockerHost) { var ok bool - res.Agent, ok = config.GetInstance().GetAgent(dockerHost) + res.Agent, ok = agent.Agents.Get(dockerHost) if !ok { logging.Error().Msgf("agent %q not found", dockerHost) } diff --git a/internal/utils/pool/pool.go b/internal/utils/pool/pool.go new file mode 100644 index 0000000..9421ac4 --- /dev/null +++ b/internal/utils/pool/pool.go @@ -0,0 +1,62 @@ +package pool + +import ( + "sort" + + "github.com/yusing/go-proxy/internal/utils" + "github.com/yusing/go-proxy/internal/utils/functional" +) + +type ( + Pool[T Object] struct { + m functional.Map[string, T] + } + Object interface { + Key() string + Name() string + utils.MapMarshaller + } +) + +func New[T Object]() Pool[T] { + return Pool[T]{functional.NewMapOf[string, T]()} +} + +func (p Pool[T]) Add(obj T) { + p.m.Store(obj.Key(), obj) +} + +func (p Pool[T]) Get(key string) (T, bool) { + return p.m.Load(key) +} + +func (p Pool[T]) Size() int { + return p.m.Size() +} + +func (p Pool[T]) Clear() { + p.m.Clear() +} + +func (p Pool[T]) Base() functional.Map[string, T] { + return p.m +} + +func (p Pool[T]) Slice() []T { + slice := make([]T, 0, p.m.Size()) + for _, v := range p.m.Range { + slice = append(slice, v) + } + sort.Slice(slice, func(i, j int) bool { + return slice[i].Name() < slice[j].Name() + }) + return slice +} + +func (p Pool[T]) Iter(fn func(k string, v T) bool) { + p.m.Range(fn) +} + +func (p Pool[T]) IterAll(fn func(k string, v T)) { + p.m.RangeAll(fn) +}