From 22ab043e06c7c32dedb90be856b40c4bb3d5f0bd Mon Sep 17 00:00:00 2001 From: yusing Date: Wed, 4 Jun 2025 23:17:41 +0800 Subject: [PATCH] refactor(route): improve route handling --- internal/route/route.go | 68 ++++++++++++++++++++++++++++++++++------ internal/route/stream.go | 2 +- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/internal/route/route.go b/internal/route/route.go index 9d6b4d6..c57ceb6 100644 --- a/internal/route/route.go +++ b/internal/route/route.go @@ -2,9 +2,10 @@ package route import ( "context" - "errors" "fmt" + "runtime" "strings" + "sync" "time" "github.com/docker/docker/api/types/container" @@ -58,7 +59,8 @@ type ( Metadata struct { /* Docker only */ Container *docker.Container `json:"container,omitempty"` - Provider string `json:"provider,omitempty"` + + Provider string `json:"provider,omitempty"` // for backward compatibility // private fields LisURL *net.URL `json:"lurl,omitempty"` @@ -69,8 +71,10 @@ type ( impl routes.Route isValidated bool lastError gperr.Error + provider routes.Provider started chan struct{} + once sync.Once } Routes map[string]*Route ) @@ -89,6 +93,16 @@ func (r *Route) Validate() gperr.Error { r.isValidated = true r.Finalize() + r.started = make(chan struct{}) + // close the channel when the route is destroyed (if not closed yet). + runtime.AddCleanup(r, func(ch chan struct{}) { + select { + case <-ch: + default: + close(ch) + } + }, r.started) + if r.Idlewatcher != nil && r.Idlewatcher.Proxmox != nil { node := r.Idlewatcher.Proxmox.Node vmid := r.Idlewatcher.Proxmox.VMID @@ -220,26 +234,53 @@ func (r *Route) Validate() gperr.Error { r.impl = impl r.Excluded = r.ShouldExclude() - r.started = make(chan struct{}) return nil } +func (r *Route) Impl() routes.Route { + return r.impl +} + func (r *Route) Task() *task.Task { - if r.impl == nil { // should not happen - panic(errors.New("route not initialized")) - } return r.impl.Task() } func (r *Route) Start(parent task.Parent) (err gperr.Error) { + r.once.Do(func() { + err = r.start(parent) + }) + return +} + +func (r *Route) start(parent task.Parent) gperr.Error { if r.impl == nil { // should not happen return gperr.New("route not initialized") } + defer close(r.started) - return r.impl.Start(parent) + if err := r.impl.Start(parent); err != nil { + return err + } + + if conflict, added := routes.All.AddIfNotExists(r.impl); !added { + err := gperr.Errorf("route %s already exists: from %s and %s", r.Alias, r.ProviderName(), conflict.ProviderName()) + r.impl.Task().FinishAndWait(err) + return err + } else { + // reference here because r.impl will be nil after Finish() is called. + impl := r.impl + impl.Task().OnCancel("remove_routes_from_all", func() { + routes.All.Del(impl) + }) + } + return nil } func (r *Route) Finish(reason any) { + r.FinishAndWait(reason) +} + +func (r *Route) FinishAndWait(reason any) { if r.impl == nil { return } @@ -251,10 +292,17 @@ func (r *Route) Started() <-chan struct{} { return r.started } +func (r *Route) GetProvider() routes.Provider { + return r.provider +} + +func (r *Route) SetProvider(p routes.Provider) { + r.provider = p + r.Provider = p.ShortName() } func (r *Route) ProviderName() string { - return r.Provider + return r.provider.ShortName() } func (r *Route) TargetURL() *net.URL { @@ -372,7 +420,7 @@ func (r *Route) UseLoadBalance() bool { } func (r *Route) UseIdleWatcher() bool { - return r.Idlewatcher != nil && r.Idlewatcher.IdleTimeout != 0 + return r.Idlewatcher != nil && r.Idlewatcher.IdleTimeout > 0 } func (r *Route) UseHealthCheck() bool { @@ -452,7 +500,7 @@ func (r *Route) Finalize() { if isDocker { if r.Scheme == "" { for _, p := range cont.PublicPortMapping { - if p.PrivatePort == uint16(pp) && p.Type == "udp" { + if int(p.PrivatePort) == pp && p.Type == "udp" { r.Scheme = "udp" break } diff --git a/internal/route/stream.go b/internal/route/stream.go index 2fbbb95..1060c1a 100755 --- a/internal/route/stream.go +++ b/internal/route/stream.go @@ -41,7 +41,7 @@ func NewStreamRoute(base *Route) (routes.Route, gperr.Error) { // Start implements task.TaskStarter. func (r *StreamRoute) Start(parent task.Parent) gperr.Error { - r.task = parent.Subtask("stream."+r.Name(), true) + r.task = parent.Subtask("stream."+r.Name(), !r.ShouldExclude()) r.Stream = NewStream(r) switch {