diff --git a/internal/route/common.go b/internal/route/common.go new file mode 100644 index 0000000..01c22b0 --- /dev/null +++ b/internal/route/common.go @@ -0,0 +1,23 @@ +package route + +import ( + "github.com/yusing/go-proxy/internal/gperr" + "github.com/yusing/go-proxy/internal/route/routes" +) + +func checkExists(r routes.Route) gperr.Error { + var ( + existing routes.Route + ok bool + ) + switch r := r.(type) { + case routes.HTTPRoute: + existing, ok = routes.HTTP.Get(r.Key()) + case routes.StreamRoute: + existing, ok = routes.Stream.Get(r.Key()) + } + if ok { + return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName()) + } + return nil +} diff --git a/internal/route/fileserver.go b/internal/route/fileserver.go index 9c388c1..ce37da3 100644 --- a/internal/route/fileserver.go +++ b/internal/route/fileserver.go @@ -96,8 +96,16 @@ func (s *FileServer) Start(parent task.Parent) gperr.Error { } } + if s.ShouldExclude() { + return nil + } + + if err := checkExists(s); err != nil { + return err + } + routes.HTTP.Add(s) - s.task.OnCancel("entrypoint_remove_route", func() { + s.task.OnFinished("remove_route_from_http", func() { routes.HTTP.Del(s) }) return nil diff --git a/internal/route/provider/docker.go b/internal/route/provider/docker.go index 340b628..4a58cdc 100755 --- a/internal/route/provider/docker.go +++ b/internal/route/provider/docker.go @@ -71,9 +71,6 @@ func (p *DockerProvider) loadRoutesImpl() (route.Routes, gperr.Error) { for _, c := range containers { container := docker.FromDocker(&c, p.dockerHost) - if container.IsExcluded { - continue - } if container.IsHostNetworkMode { err := container.UpdatePorts() @@ -89,10 +86,15 @@ func (p *DockerProvider) loadRoutesImpl() (route.Routes, gperr.Error) { } for k, v := range newEntries { if conflict, ok := routes[k]; ok { - errs.Add(gperr.Multiline(). + err := gperr.Multiline(). Addf("route with alias %s already exists", k). Addf("container %s", container.ContainerName). - Addf("conflicting container %s", conflict.Container.ContainerName)) + Addf("conflicting container %s", conflict.Container.ContainerName) + if conflict.ShouldExclude() || v.ShouldExclude() { + gperr.LogWarn("skipping conflicting route", err) + } else { + errs.Add(err) + } } else { routes[k] = v } diff --git a/internal/route/provider/provider.go b/internal/route/provider/provider.go index be22c63..d0b6c54 100644 --- a/internal/route/provider/provider.go +++ b/internal/route/provider/provider.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "path" + "slices" "time" "github.com/rs/zerolog" @@ -11,6 +12,7 @@ import ( "github.com/yusing/go-proxy/internal/gperr" "github.com/yusing/go-proxy/internal/route" "github.com/yusing/go-proxy/internal/route/provider/types" + "github.com/yusing/go-proxy/internal/route/routes" "github.com/yusing/go-proxy/internal/task" W "github.com/yusing/go-proxy/internal/watcher" "github.com/yusing/go-proxy/internal/watcher/events" @@ -90,9 +92,17 @@ func (p *Provider) startRoute(parent task.Parent, r *route.Route) gperr.Error { err := r.Start(parent) if err != nil { delete(p.routes, r.Alias) + routes.All.Del(r) return err.Subject(r.Alias) } - p.routes[r.Alias] = r + if conflict, added := routes.All.AddIfNotExists(r); !added { + delete(p.routes, r.Alias) + return gperr.Errorf("route %s already exists: from %s and %s", r.Alias, r.ProviderName(), conflict.ProviderName()) + } else { + r.Task().OnCancel("remove_routes_from_all", func() { + routes.All.Del(r) + }) + } return nil } @@ -155,10 +165,6 @@ func (p *Provider) loadRoutes() (routes route.Routes, err gperr.Error) { delete(routes, alias) continue } - if r.ShouldExclude() { - delete(routes, alias) - continue - } r.FinalizeHomepageConfig() } return routes, errs.Error() diff --git a/internal/route/reverse_proxy.go b/internal/route/reverse_proxy.go index 1523e27..025ce77 100755 --- a/internal/route/reverse_proxy.go +++ b/internal/route/reverse_proxy.go @@ -50,7 +50,7 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, gperr.Error) { } else { trans = gphttp.NewTransport() if httpConfig.NoTLSVerify { - trans.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + trans.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} //nolint:gosec } if httpConfig.ResponseHeaderTimeout > 0 { trans.ResponseHeaderTimeout = httpConfig.ResponseHeaderTimeout @@ -98,9 +98,6 @@ func (r *ReveseProxyRoute) ReverseProxy() *reverseproxy.ReverseProxy { // Start implements task.TaskStarter. func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error { - if existing, ok := routes.HTTP.Get(r.Key()); ok && !r.UseLoadBalance() { - return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName()) - } r.task = parent.Subtask("http."+r.Name(), false) switch { @@ -139,11 +136,19 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error { } } + if r.ShouldExclude() { + return nil + } + + if err := checkExists(r); err != nil { + return err + } + if r.UseLoadBalance() { r.addToLoadBalancer(parent) } else { routes.HTTP.Add(r) - r.task.OnFinished("entrypoint_remove_route", func() { + r.task.OnCancel("remove_route_from_http", func() { routes.HTTP.Del(r) }) } diff --git a/internal/route/route.go b/internal/route/route.go index f09af57..8840405 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -2,6 +2,7 @@ package route import ( "context" + "errors" "fmt" "strings" "time" @@ -215,6 +216,13 @@ func (r *Route) Validate() gperr.Error { return nil } +func (r *Route) Task() *task.Task { + if r.impl == nil { // should not happen + panic(errors.New("route not initialized")) + } + return r.impl.Task() +} + func (r *Route) Start(parent task.Parent) (err gperr.Error) { if r.impl == nil { // should not happen return gperr.New("route not initialized") @@ -354,10 +362,20 @@ func (r *Route) UseLoadBalance() bool { } func (r *Route) UseIdleWatcher() bool { - return r.Idlewatcher != nil && r.Idlewatcher.IdleTimeout > 0 + return r.Idlewatcher != nil && r.Idlewatcher.IdleTimeout != 0 } func (r *Route) UseHealthCheck() bool { + if r.Container != nil { + switch { + case r.Container.Image.Name == "godoxy-agent": + return false + case !r.Container.Running && !r.UseIdleWatcher(): + return false + case strings.HasPrefix(r.Container.ContainerName, "buildx_"): + return false + } + } return !r.HealthCheck.Disable } @@ -482,6 +500,12 @@ func (r *Route) FinalizeHomepageConfig() { } r.Homepage = r.Homepage.GetOverride(r.Alias) + if r.ShouldExclude() && isDocker { + r.Homepage.Show = false + r.Homepage.Name = r.Container.ContainerName // still show container name in metrics page + return + } + hp := r.Homepage refs := r.References() for _, ref := range refs { diff --git a/internal/route/routes/routes.go b/internal/route/routes/routes.go index 4aa6f59..c3fd007 100644 --- a/internal/route/routes/routes.go +++ b/internal/route/routes/routes.go @@ -7,15 +7,16 @@ import ( var ( HTTP = pool.New[HTTPRoute]("http_routes") Stream = pool.New[StreamRoute]("stream_routes") + // All is a pool of all routes, including HTTP, Stream routes and also excluded routes. + All = pool.New[Route]("all_routes") ) +func init() { + All.DisableLog() +} + func Iter(yield func(r Route) bool) { - for _, r := range HTTP.Iter { - if !yield(r) { - break - } - } - for _, r := range Stream.Iter { + for _, r := range All.Iter { if !yield(r) { break } @@ -23,12 +24,7 @@ func Iter(yield func(r Route) bool) { } func IterKV(yield func(alias string, r Route) bool) { - for k, r := range HTTP.Iter { - if !yield(k, r) { - break - } - } - for k, r := range Stream.Iter { + for k, r := range All.Iter { if !yield(k, r) { break } @@ -36,12 +32,13 @@ func IterKV(yield func(alias string, r Route) bool) { } func NumRoutes() int { - return HTTP.Size() + Stream.Size() + return All.Size() } func Clear() { HTTP.Clear() Stream.Clear() + All.Clear() } func GetHTTPRouteOrExact(alias, host string) (HTTPRoute, bool) { @@ -54,9 +51,5 @@ func GetHTTPRouteOrExact(alias, host string) (HTTPRoute, bool) { } func Get(alias string) (Route, bool) { - r, ok := HTTP.Get(alias) - if ok { - return r, true - } - return Stream.Get(alias) + return All.Get(alias) } diff --git a/internal/route/stream.go b/internal/route/stream.go index b9dbf14..2fbbb95 100755 --- a/internal/route/stream.go +++ b/internal/route/stream.go @@ -41,9 +41,6 @@ func NewStreamRoute(base *Route) (routes.Route, gperr.Error) { // Start implements task.TaskStarter. func (r *StreamRoute) Start(parent task.Parent) gperr.Error { - if existing, ok := routes.Stream.Get(r.Key()); ok { - return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName()) - } r.task = parent.Subtask("stream."+r.Name(), true) r.Stream = NewStream(r) @@ -60,23 +57,32 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error { r.HealthMon = monitor.NewMonitor(r) } - if err := r.Setup(); err != nil { - r.task.Finish(err) - return gperr.Wrap(err) + if !r.ShouldExclude() { + if err := r.Setup(); err != nil { + r.task.Finish(err) + return gperr.Wrap(err) + } + r.l.Info().Int("port", r.Port.Listening).Msg("listening") } - r.l.Info().Int("port", r.Port.Listening).Msg("listening") - if r.HealthMon != nil { if err := r.HealthMon.Start(r.task); err != nil { gperr.LogWarn("health monitor error", err, &r.l) } } + if r.ShouldExclude() { + return nil + } + + if err := checkExists(r); err != nil { + return err + } + go r.acceptConnections() routes.Stream.Add(r) - r.task.OnFinished("entrypoint_remove_route", func() { + r.task.OnCancel("remove_route_from_stream", func() { routes.Stream.Del(r) }) return nil diff --git a/internal/utils/pool/pool.go b/internal/utils/pool/pool.go index 84056e9..2056b70 100644 --- a/internal/utils/pool/pool.go +++ b/internal/utils/pool/pool.go @@ -9,8 +9,9 @@ import ( type ( Pool[T Object] struct { - m *xsync.Map[string, T] - name string + m *xsync.Map[string, T] + name string + disableLog bool } Object interface { Key() string @@ -19,41 +20,54 @@ type ( ) func New[T Object](name string) Pool[T] { - return Pool[T]{xsync.NewMap[string, T](), name} + return Pool[T]{xsync.NewMap[string, T](), name, false} } -func (p Pool[T]) Name() string { +func (p *Pool[T]) DisableLog() { + p.disableLog = true +} + +func (p *Pool[T]) Name() string { return p.name } -func (p Pool[T]) Add(obj T) { +func (p *Pool[T]) Add(obj T) { p.checkExists(obj.Key()) p.m.Store(obj.Key(), obj) - log.Info().Msgf("%s: added %s", p.name, obj.Name()) + if !p.disableLog { + log.Info().Msgf("%s: added %s", p.name, obj.Name()) + } } -func (p Pool[T]) Del(obj T) { +func (p *Pool[T]) AddIfNotExists(obj T) (actual T, added bool) { + actual, loaded := p.m.LoadOrStore(obj.Key(), obj) + return actual, !loaded +} + +func (p *Pool[T]) Del(obj T) { p.m.Delete(obj.Key()) - log.Info().Msgf("%s: removed %s", p.name, obj.Name()) + if !p.disableLog { + log.Info().Msgf("%s: removed %s", p.name, obj.Name()) + } } -func (p Pool[T]) Get(key string) (T, bool) { +func (p *Pool[T]) Get(key string) (T, bool) { return p.m.Load(key) } -func (p Pool[T]) Size() int { +func (p *Pool[T]) Size() int { return p.m.Size() } -func (p Pool[T]) Clear() { +func (p *Pool[T]) Clear() { p.m.Clear() } -func (p Pool[T]) Iter(fn func(k string, v T) bool) { +func (p *Pool[T]) Iter(fn func(k string, v T) bool) { p.m.Range(fn) } -func (p Pool[T]) Slice() []T { +func (p *Pool[T]) Slice() []T { slice := make([]T, 0, p.m.Size()) for _, v := range p.m.Range { slice = append(slice, v)