refactor, fix reload error when using agents, and other small improvements

This commit is contained in:
yusing 2025-02-11 12:15:51 +08:00
parent b1f72620dc
commit 429a77de8e
10 changed files with 139 additions and 105 deletions

View file

@ -8,7 +8,6 @@ import (
"net/http" "net/http"
"os" "os"
"strings" "strings"
"sync"
"time" "time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -18,21 +17,18 @@ import (
gphttp "github.com/yusing/go-proxy/internal/net/http" gphttp "github.com/yusing/go-proxy/internal/net/http"
"github.com/yusing/go-proxy/internal/net/types" "github.com/yusing/go-proxy/internal/net/types"
"github.com/yusing/go-proxy/internal/task" "github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/utils/functional"
"github.com/yusing/go-proxy/pkg" "github.com/yusing/go-proxy/pkg"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type ( type AgentConfig struct {
AgentConfig struct {
Addr string Addr string
httpClient *http.Client httpClient *http.Client
tlsConfig *tls.Config tlsConfig *tls.Config
name string name string
l zerolog.Logger l zerolog.Logger
} }
)
const ( const (
EndpointVersion = "/version" EndpointVersion = "/version"
@ -53,43 +49,25 @@ const (
FakeDockerHostPrefixLen = len(FakeDockerHostPrefix) FakeDockerHostPrefixLen = len(FakeDockerHostPrefix)
) )
var (
agents = functional.NewMapOf[string, *AgentConfig]()
agentMapMu sync.RWMutex
)
var ( var (
HTTPProxyURL = types.MustParseURL(APIBaseURL + EndpointProxyHTTP) HTTPProxyURL = types.MustParseURL(APIBaseURL + EndpointProxyHTTP)
HTTPProxyURLStripLen = len(APIEndpointBase + EndpointProxyHTTP) HTTPProxyURLPrefixLen = len(APIEndpointBase + EndpointProxyHTTP)
) )
func IsDockerHostAgent(dockerHost string) bool { func IsDockerHostAgent(dockerHost string) bool {
return strings.HasPrefix(dockerHost, FakeDockerHostPrefix) return strings.HasPrefix(dockerHost, FakeDockerHostPrefix)
} }
func GetAgentFromDockerHost(dockerHost string) (*AgentConfig, bool) { func GetAgentAddrFromDockerHost(dockerHost string) string {
if !IsDockerHostAgent(dockerHost) { return dockerHost[FakeDockerHostPrefixLen:]
return nil, false
}
return agents.Load(dockerHost[FakeDockerHostPrefixLen:])
} }
func (cfg *AgentConfig) FakeDockerHost() string { func (cfg *AgentConfig) FakeDockerHost() string {
return FakeDockerHostPrefix + cfg.Name() return FakeDockerHostPrefix + cfg.Addr
} }
func (cfg *AgentConfig) Parse(addr string) error { func (cfg *AgentConfig) Parse(addr string) error {
cfg.Addr = addr cfg.Addr = addr
return cfg.load()
}
func (cfg *AgentConfig) errIfNameExists() E.Error {
agentMapMu.RLock()
defer agentMapMu.RUnlock()
agent, ok := agents.Load(cfg.Name())
if ok {
return E.Errorf("agent with name %s (%s) already exists", cfg.Name(), agent.Addr)
}
return nil return nil
} }
@ -101,13 +79,7 @@ func checkVersion(a, b string) bool {
return withoutBuildTime(a) == withoutBuildTime(b) return withoutBuildTime(a) == withoutBuildTime(b)
} }
func (cfg *AgentConfig) Remove() { func (cfg *AgentConfig) Start(parent task.Parent) E.Error {
agentMapMu.Lock()
defer agentMapMu.Unlock()
agents.Delete(cfg.Name())
}
func (cfg *AgentConfig) load() E.Error {
certData, err := os.ReadFile(certs.AgentCertsFilename(cfg.Addr)) certData, err := os.ReadFile(certs.AgentCertsFilename(cfg.Addr))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -141,7 +113,7 @@ func (cfg *AgentConfig) load() E.Error {
// create transport and http client // create transport and http client
cfg.httpClient = cfg.NewHTTPClient() cfg.httpClient = cfg.NewHTTPClient()
ctx, cancel := context.WithTimeout(task.RootContext(), 5*time.Second) ctx, cancel := context.WithTimeout(parent.Context(), 5*time.Second)
defer cancel() defer cancel()
// check agent version // check agent version
@ -160,15 +132,10 @@ func (cfg *AgentConfig) load() E.Error {
return E.Wrap(err) return E.Wrap(err)
} }
// check if agent name is already used
cfg.name = string(name) cfg.name = string(name)
if err := cfg.errIfNameExists(); err != nil {
return err
}
cfg.l = logging.With().Str("agent", cfg.name).Logger() cfg.l = logging.With().Str("agent", cfg.name).Logger()
agents.Store(cfg.name, cfg) logging.Info().Msgf("agent %q started", cfg.name)
return nil return nil
} }
@ -195,7 +162,7 @@ func (cfg *AgentConfig) Name() string {
} }
func (cfg *AgentConfig) String() string { func (cfg *AgentConfig) String() string {
return "agent@" + cfg.Name() return "agent@" + cfg.Addr
} }
func (cfg *AgentConfig) MarshalJSON() ([]byte, error) { func (cfg *AgentConfig) MarshalJSON() ([]byte, error) {

View file

@ -5,13 +5,11 @@ import (
"net/http" "net/http"
"github.com/coder/websocket" "github.com/coder/websocket"
"github.com/yusing/go-proxy/internal/logging"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
func (cfg *AgentConfig) Do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) { func (cfg *AgentConfig) Do(ctx context.Context, method, endpoint string, body io.Reader) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body) req, err := http.NewRequestWithContext(ctx, method, APIBaseURL+endpoint, body)
logging.Debug().Msgf("request: %s %s", method, req.URL.String())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -49,7 +49,7 @@ func ProxyHTTP(w http.ResponseWriter, r *http.Request) {
r.URL.Scheme = "" r.URL.Scheme = ""
r.URL.Host = "" r.URL.Host = ""
r.URL.Path = r.URL.Path[agent.HTTPProxyURLStripLen:] // strip the {API_BASE}/proxy/http prefix r.URL.Path = r.URL.Path[agent.HTTPProxyURLPrefixLen:] // strip the {API_BASE}/proxy/http prefix
r.RequestURI = r.URL.String() r.RequestURI = r.URL.String()
r.URL.Host = host r.URL.Host = host
r.URL.Scheme = scheme r.URL.Scheme = scheme

View file

@ -2,16 +2,18 @@ package config
import ( import (
"context" "context"
"errors"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/yusing/go-proxy/agent/pkg/agent"
"github.com/yusing/go-proxy/internal/api" "github.com/yusing/go-proxy/internal/api"
"github.com/yusing/go-proxy/internal/autocert" "github.com/yusing/go-proxy/internal/autocert"
"github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/config/types" config "github.com/yusing/go-proxy/internal/config/types"
"github.com/yusing/go-proxy/internal/entrypoint" "github.com/yusing/go-proxy/internal/entrypoint"
E "github.com/yusing/go-proxy/internal/error" E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/logging"
@ -26,7 +28,7 @@ import (
) )
type Config struct { type Config struct {
value *types.Config value *config.Config
providers F.Map[string, *proxy.Provider] providers F.Map[string, *proxy.Provider]
autocertProvider *autocert.Provider autocertProvider *autocert.Provider
entrypoint *entrypoint.Entrypoint entrypoint *entrypoint.Entrypoint
@ -35,7 +37,6 @@ type Config struct {
} }
var ( var (
instance *Config
cfgWatcher watcher.Watcher cfgWatcher watcher.Watcher
reloadMu sync.Mutex reloadMu sync.Mutex
) )
@ -49,15 +50,15 @@ Make sure you rename it back before next time you start.`
You may run "ls-config" to show or dump the current config.` You may run "ls-config" to show or dump the current config.`
) )
var Validate = types.Validate var Validate = config.Validate
func GetInstance() *Config { func GetInstance() *Config {
return instance return config.GetInstance().(*Config)
} }
func newConfig() *Config { func newConfig() *Config {
return &Config{ return &Config{
value: types.DefaultConfig(), value: config.DefaultConfig(),
providers: F.NewMapOf[string, *proxy.Provider](), providers: F.NewMapOf[string, *proxy.Provider](),
entrypoint: entrypoint.NewEntrypoint(), entrypoint: entrypoint.NewEntrypoint(),
task: task.RootTask("config", false), task: task.RootTask("config", false),
@ -65,16 +66,17 @@ func newConfig() *Config {
} }
func Load() (*Config, E.Error) { func Load() (*Config, E.Error) {
if instance != nil { if config.HasInstance() {
return instance, nil panic(errors.New("config already loaded"))
} }
instance = newConfig() cfg := newConfig()
config.SetInstance(cfg)
cfgWatcher = watcher.NewConfigFileWatcher(common.ConfigFileName) cfgWatcher = watcher.NewConfigFileWatcher(common.ConfigFileName)
return instance, instance.load() return cfg, cfg.load()
} }
func MatchDomains() []string { func MatchDomains() []string {
return instance.value.MatchDomains return GetInstance().Value().MatchDomains
} }
func WatchChanges() { func WatchChanges() {
@ -122,14 +124,14 @@ func Reload() E.Error {
// cancel all current subtasks -> wait // cancel all current subtasks -> wait
// -> replace config -> start new subtasks // -> replace config -> start new subtasks
instance.task.Finish("config changed") GetInstance().Task().Finish("config changed")
instance = newCfg newCfg.Start(StartAllServers)
instance.Start(StartAllServers) config.SetInstance(newCfg)
return nil return nil
} }
func (cfg *Config) Value() *types.Config { func (cfg *Config) Value() *config.Config {
return instance.value return cfg.value
} }
func (cfg *Config) Reload() E.Error { func (cfg *Config) Reload() E.Error {
@ -137,7 +139,7 @@ func (cfg *Config) Reload() E.Error {
} }
func (cfg *Config) AutoCertProvider() *autocert.Provider { func (cfg *Config) AutoCertProvider() *autocert.Provider {
return instance.autocertProvider return cfg.autocertProvider
} }
func (cfg *Config) Task() *task.Task { func (cfg *Config) Task() *task.Task {
@ -217,7 +219,7 @@ func (cfg *Config) load() E.Error {
E.LogFatal(errMsg, err) E.LogFatal(errMsg, err)
} }
model := types.DefaultConfig() model := config.DefaultConfig()
if err := utils.DeserializeYAML(data, model); err != nil { if err := utils.DeserializeYAML(data, model); err != nil {
E.LogFatal(errMsg, err) E.LogFatal(errMsg, err)
} }
@ -260,39 +262,74 @@ func (cfg *Config) initAutoCert(autocertCfg *autocert.AutocertConfig) (err E.Err
return return
} }
func (cfg *Config) loadRouteProviders(providers *types.Providers) E.Error { func (cfg *Config) errIfExists(p *proxy.Provider) E.Error {
if _, ok := cfg.providers.Load(p.String()); ok {
return E.Errorf("provider %s already exists", p.String())
}
return nil
}
func (cfg *Config) storeProvider(p *proxy.Provider) {
cfg.providers.Store(p.String(), p)
}
func (cfg *Config) GetAgent(agentDockerHost string) (*agent.AgentConfig, bool) {
if !agent.IsDockerHostAgent(agentDockerHost) {
panic(errors.New("invalid use of GetAgent with docker host: " + agentDockerHost))
}
key := "agent@" + agent.GetAgentAddrFromDockerHost(agentDockerHost)
p, ok := cfg.providers.Load(key)
if !ok {
return nil, false
}
return p.ProviderImpl.(*proxy.AgentProvider).AgentConfig, true
}
func (cfg *Config) loadRouteProviders(providers *config.Providers) E.Error {
errs := E.NewBuilder("route provider errors") errs := E.NewBuilder("route provider errors")
results := E.NewBuilder("loaded route providers") results := E.NewBuilder("loaded route providers")
lenLongestName := 0 for _, agent := range providers.Agents {
if err := agent.Start(cfg.task); err != nil {
errs.Add(err.Subject(agent.String()))
continue
}
p := proxy.NewAgentProvider(agent)
if err := cfg.errIfExists(p); err != nil {
errs.Add(err.Subject(p.String()))
continue
}
cfg.storeProvider(p)
}
for _, filename := range providers.Files { for _, filename := range providers.Files {
p, err := proxy.NewFileProvider(filename) p, err := proxy.NewFileProvider(filename)
if err == nil {
err = cfg.errIfExists(p)
}
if err != nil { if err != nil {
errs.Add(E.PrependSubject(filename, err)) errs.Add(E.PrependSubject(filename, err))
continue continue
} }
cfg.providers.Store(p.String(), p) cfg.storeProvider(p)
if len(p.String()) > lenLongestName {
lenLongestName = len(p.String())
}
} }
for name, dockerHost := range providers.Docker { for name, dockerHost := range providers.Docker {
p, err := proxy.NewDockerProvider(name, dockerHost) p := proxy.NewDockerProvider(name, dockerHost)
if err != nil { if err := cfg.errIfExists(p); err != nil {
errs.Add(E.PrependSubject(name, err)) errs.Add(err.Subject(p.String()))
continue continue
} }
cfg.providers.Store(p.String(), p) cfg.storeProvider(p)
if len(p.String()) > lenLongestName {
lenLongestName = len(p.String())
}
}
for _, agent := range providers.Agents {
cfg.providers.Store(agent.Name(), proxy.NewAgentProvider(&agent))
} }
if cfg.providers.Size() == 0 { if cfg.providers.Size() == 0 {
return nil return nil
} }
lenLongestName := 0
cfg.providers.RangeAll(func(k string, _ *proxy.Provider) {
if len(k) > lenLongestName {
lenLongestName = len(k)
}
})
cfg.providers.RangeAllParallel(func(_ string, p *proxy.Provider) { cfg.providers.RangeAllParallel(func(_ string, p *proxy.Provider) {
if err := p.LoadRoutes(); err != nil { if err := p.LoadRoutes(); err != nil {
errs.Add(err.Subject(p.String())) errs.Add(err.Subject(p.String()))

View file

@ -3,6 +3,7 @@ package types
import ( import (
"context" "context"
"regexp" "regexp"
"sync"
"github.com/go-playground/validator/v10" "github.com/go-playground/validator/v10"
"github.com/yusing/go-proxy/agent/pkg/agent" "github.com/yusing/go-proxy/agent/pkg/agent"
@ -25,8 +26,8 @@ type (
} }
Providers struct { Providers struct {
Files []string `json:"include" yaml:"include,omitempty" validate:"dive,filepath"` Files []string `json:"include" yaml:"include,omitempty" validate:"dive,filepath"`
Docker map[string]string `json:"docker" yaml:"docker,omitempty" validate:"dive,unix_addr|url"` Docker map[string]string `json:"docker" yaml:"docker,omitempty" validate:"non_empty_docker_keys,dive,unix_addr|url"`
Agents []agent.AgentConfig `json:"agents" yaml:"agents,omitempty"` Agents []*agent.AgentConfig `json:"agents" yaml:"agents,omitempty"`
Notification []notif.NotificationConfig `json:"notification" yaml:"notification,omitempty"` Notification []notif.NotificationConfig `json:"notification" yaml:"notification,omitempty"`
} }
Entrypoint struct { Entrypoint struct {
@ -40,9 +41,15 @@ type (
Statistics() map[string]any Statistics() map[string]any
RouteProviderList() []string RouteProviderList() []string
Context() context.Context Context() context.Context
GetAgent(agentDockerHost string) (*agent.AgentConfig, bool)
} }
) )
var (
instance ConfigInstance
instanceMu sync.RWMutex
)
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
TimeoutShutdown: 3, TimeoutShutdown: 3,
@ -52,6 +59,24 @@ func DefaultConfig() *Config {
} }
} }
func GetInstance() ConfigInstance {
instanceMu.RLock()
defer instanceMu.RUnlock()
return instance
}
func SetInstance(cfg ConfigInstance) {
instanceMu.Lock()
defer instanceMu.Unlock()
instance = cfg
}
func HasInstance() bool {
instanceMu.RLock()
defer instanceMu.RUnlock()
return instance != nil
}
func Validate(data []byte) E.Error { func Validate(data []byte) E.Error {
var model Config var model Config
return utils.DeserializeYAML(data, &model) return utils.DeserializeYAML(data, &model)
@ -70,4 +95,13 @@ func init() {
} }
return true return true
}) })
utils.MustRegisterValidation("non_empty_docker_keys", func(fl validator.FieldLevel) bool {
m := fl.Field().Interface().(map[string]string)
for k := range m {
if k == "" {
return false
}
}
return true
})
} }

View file

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"runtime/debug"
"sync" "sync"
"github.com/docker/cli/cli/connhelper" "github.com/docker/cli/cli/connhelper"
@ -11,6 +12,7 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/yusing/go-proxy/agent/pkg/agent" "github.com/yusing/go-proxy/agent/pkg/agent"
"github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/common"
config "github.com/yusing/go-proxy/internal/config/types"
"github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task" "github.com/yusing/go-proxy/internal/task"
U "github.com/yusing/go-proxy/internal/utils" U "github.com/yusing/go-proxy/internal/utils"
@ -84,9 +86,9 @@ func ConnectClient(host string) (*SharedClient, error) {
var opt []client.Opt var opt []client.Opt
if agent.IsDockerHostAgent(host) { if agent.IsDockerHostAgent(host) {
cfg, ok := agent.GetAgentFromDockerHost(host) cfg, ok := config.GetInstance().GetAgent(host)
if !ok { if !ok {
return nil, fmt.Errorf("agent not found for host: %s", host) return nil, fmt.Errorf("agent %q not found\n%s", host, debug.Stack())
} }
opt = []client.Opt{ opt = []client.Opt{
client.WithHost(agent.DockerHost), client.WithHost(agent.DockerHost),

View file

@ -7,6 +7,7 @@ import (
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/yusing/go-proxy/agent/pkg/agent" "github.com/yusing/go-proxy/agent/pkg/agent"
config "github.com/yusing/go-proxy/internal/config/types"
"github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/logging"
U "github.com/yusing/go-proxy/internal/utils" U "github.com/yusing/go-proxy/internal/utils"
"github.com/yusing/go-proxy/internal/utils/strutils" "github.com/yusing/go-proxy/internal/utils/strutils"
@ -82,7 +83,11 @@ func FromDocker(c *types.Container, dockerHost string) (res *Container) {
} }
if agent.IsDockerHostAgent(dockerHost) { if agent.IsDockerHostAgent(dockerHost) {
res.Agent, _ = agent.GetAgentFromDockerHost(dockerHost) var ok bool
res.Agent, ok = config.GetInstance().GetAgent(dockerHost)
if !ok {
logging.Error().Msgf("agent %q not found", dockerHost)
}
} }
res.setPrivateHostname(helper) res.setPrivateHostname(helper)

View file

@ -14,7 +14,7 @@ type AgentProvider struct {
} }
func (p *AgentProvider) ShortName() string { func (p *AgentProvider) ShortName() string {
return p.Name() return p.AgentConfig.Name()
} }
func (p *AgentProvider) NewWatcher() watcher.Watcher { func (p *AgentProvider) NewWatcher() watcher.Watcher {

View file

@ -39,8 +39,7 @@ func makeRoutes(cont *types.Container, dockerHostIP ...string) route.Routes {
} }
func TestExplicitOnly(t *testing.T) { func TestExplicitOnly(t *testing.T) {
p, err := NewDockerProvider("a!", "") p := NewDockerProvider("a!", "")
ExpectNoError(t, err)
ExpectTrue(t, p.IsExplicitOnly()) ExpectTrue(t, p.IsExplicitOnly())
} }

View file

@ -59,15 +59,11 @@ func NewFileProvider(filename string) (p *Provider, err error) {
return return
} }
func NewDockerProvider(name string, dockerHost string) (p *Provider, err error) { func NewDockerProvider(name string, dockerHost string) *Provider {
if name == "" { p := newProvider(types.ProviderTypeDocker)
return nil, ErrEmptyProviderName
}
p = newProvider(types.ProviderTypeDocker)
p.ProviderImpl = DockerProviderImpl(name, dockerHost) p.ProviderImpl = DockerProviderImpl(name, dockerHost)
p.watcher = p.NewWatcher() p.watcher = p.NewWatcher()
return return p
} }
func NewAgentProvider(cfg *agent.AgentConfig) *Provider { func NewAgentProvider(cfg *agent.AgentConfig) *Provider {
@ -127,10 +123,6 @@ func (p *Provider) Start(parent task.Parent) E.Error {
if err := errs.Error(); err != nil { if err := errs.Error(); err != nil {
return err.Subject(p.String()) return err.Subject(p.String())
} }
if p.t == types.ProviderTypeAgent {
t.OnCancel("remove agent", p.ProviderImpl.(*AgentProvider).AgentConfig.Remove)
}
return nil return nil
} }