refactor(route): improve route handling

This commit is contained in:
yusing 2025-06-04 23:17:41 +08:00
parent b670cdbd49
commit 22ab043e06
2 changed files with 59 additions and 11 deletions

View file

@ -2,9 +2,10 @@ package route
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"runtime"
"strings" "strings"
"sync"
"time" "time"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
@ -58,7 +59,8 @@ type (
Metadata struct { Metadata struct {
/* Docker only */ /* Docker only */
Container *docker.Container `json:"container,omitempty"` Container *docker.Container `json:"container,omitempty"`
Provider string `json:"provider,omitempty"`
Provider string `json:"provider,omitempty"` // for backward compatibility
// private fields // private fields
LisURL *net.URL `json:"lurl,omitempty"` LisURL *net.URL `json:"lurl,omitempty"`
@ -69,8 +71,10 @@ type (
impl routes.Route impl routes.Route
isValidated bool isValidated bool
lastError gperr.Error lastError gperr.Error
provider routes.Provider
started chan struct{} started chan struct{}
once sync.Once
} }
Routes map[string]*Route Routes map[string]*Route
) )
@ -89,6 +93,16 @@ func (r *Route) Validate() gperr.Error {
r.isValidated = true r.isValidated = true
r.Finalize() r.Finalize()
r.started = make(chan struct{})
// close the channel when the route is destroyed (if not closed yet).
runtime.AddCleanup(r, func(ch chan struct{}) {
select {
case <-ch:
default:
close(ch)
}
}, r.started)
if r.Idlewatcher != nil && r.Idlewatcher.Proxmox != nil { if r.Idlewatcher != nil && r.Idlewatcher.Proxmox != nil {
node := r.Idlewatcher.Proxmox.Node node := r.Idlewatcher.Proxmox.Node
vmid := r.Idlewatcher.Proxmox.VMID vmid := r.Idlewatcher.Proxmox.VMID
@ -220,26 +234,53 @@ func (r *Route) Validate() gperr.Error {
r.impl = impl r.impl = impl
r.Excluded = r.ShouldExclude() r.Excluded = r.ShouldExclude()
r.started = make(chan struct{})
return nil return nil
} }
func (r *Route) Impl() routes.Route {
return r.impl
}
func (r *Route) Task() *task.Task { func (r *Route) Task() *task.Task {
if r.impl == nil { // should not happen
panic(errors.New("route not initialized"))
}
return r.impl.Task() return r.impl.Task()
} }
func (r *Route) Start(parent task.Parent) (err gperr.Error) { func (r *Route) Start(parent task.Parent) (err gperr.Error) {
r.once.Do(func() {
err = r.start(parent)
})
return
}
func (r *Route) start(parent task.Parent) gperr.Error {
if r.impl == nil { // should not happen if r.impl == nil { // should not happen
return gperr.New("route not initialized") return gperr.New("route not initialized")
} }
defer close(r.started)
return r.impl.Start(parent) if err := r.impl.Start(parent); err != nil {
return err
}
if conflict, added := routes.All.AddIfNotExists(r.impl); !added {
err := gperr.Errorf("route %s already exists: from %s and %s", r.Alias, r.ProviderName(), conflict.ProviderName())
r.impl.Task().FinishAndWait(err)
return err
} else {
// reference here because r.impl will be nil after Finish() is called.
impl := r.impl
impl.Task().OnCancel("remove_routes_from_all", func() {
routes.All.Del(impl)
})
}
return nil
} }
func (r *Route) Finish(reason any) { func (r *Route) Finish(reason any) {
r.FinishAndWait(reason)
}
func (r *Route) FinishAndWait(reason any) {
if r.impl == nil { if r.impl == nil {
return return
} }
@ -251,10 +292,17 @@ func (r *Route) Started() <-chan struct{} {
return r.started return r.started
} }
func (r *Route) GetProvider() routes.Provider {
return r.provider
}
func (r *Route) SetProvider(p routes.Provider) {
r.provider = p
r.Provider = p.ShortName()
} }
func (r *Route) ProviderName() string { func (r *Route) ProviderName() string {
return r.Provider return r.provider.ShortName()
} }
func (r *Route) TargetURL() *net.URL { func (r *Route) TargetURL() *net.URL {
@ -372,7 +420,7 @@ func (r *Route) UseLoadBalance() bool {
} }
func (r *Route) UseIdleWatcher() bool { func (r *Route) UseIdleWatcher() bool {
return r.Idlewatcher != nil && r.Idlewatcher.IdleTimeout != 0 return r.Idlewatcher != nil && r.Idlewatcher.IdleTimeout > 0
} }
func (r *Route) UseHealthCheck() bool { func (r *Route) UseHealthCheck() bool {
@ -452,7 +500,7 @@ func (r *Route) Finalize() {
if isDocker { if isDocker {
if r.Scheme == "" { if r.Scheme == "" {
for _, p := range cont.PublicPortMapping { for _, p := range cont.PublicPortMapping {
if p.PrivatePort == uint16(pp) && p.Type == "udp" { if int(p.PrivatePort) == pp && p.Type == "udp" {
r.Scheme = "udp" r.Scheme = "udp"
break break
} }

View file

@ -41,7 +41,7 @@ func NewStreamRoute(base *Route) (routes.Route, gperr.Error) {
// Start implements task.TaskStarter. // Start implements task.TaskStarter.
func (r *StreamRoute) Start(parent task.Parent) gperr.Error { func (r *StreamRoute) Start(parent task.Parent) gperr.Error {
r.task = parent.Subtask("stream."+r.Name(), true) r.task = parent.Subtask("stream."+r.Name(), !r.ShouldExclude())
r.Stream = NewStream(r) r.Stream = NewStream(r)
switch { switch {