mirror of
https://github.com/yusing/godoxy.git
synced 2025-05-19 20:32:35 +02:00
refactor: unify route handling by consolidating route query methods with Pool
- Replaced direct calls to routequery with a new routes package for better organization and maintainability. - Updated various components to utilize the new routes methods for fetching health information, homepage configurations, and route aliases. - Enhanced the overall structure of the routing logic to improve clarity and reduce redundancy.
This commit is contained in:
parent
7f4b04efb7
commit
1fe21b84eb
24 changed files with 211 additions and 238 deletions
|
@ -10,7 +10,7 @@ var Agents = agents{pool.New[*AgentConfig]("agents")}
|
|||
|
||||
func (agents agents) Get(agentAddrOrDockerHost string) (*AgentConfig, bool) {
|
||||
if !IsDockerHostAgent(agentAddrOrDockerHost) {
|
||||
return agents.Base().Load(agentAddrOrDockerHost)
|
||||
return agents.Get(agentAddrOrDockerHost)
|
||||
}
|
||||
return agents.Base().Load(GetAgentAddrFromDockerHost(agentAddrOrDockerHost))
|
||||
return agents.Get(GetAgentAddrFromDockerHost(agentAddrOrDockerHost))
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/metrics/systeminfo"
|
||||
"github.com/yusing/go-proxy/internal/metrics/uptime"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/middleware"
|
||||
"github.com/yusing/go-proxy/internal/route/routes/routequery"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
"github.com/yusing/go-proxy/migrations"
|
||||
"github.com/yusing/go-proxy/pkg"
|
||||
|
@ -124,7 +124,7 @@ func main() {
|
|||
switch args.Command {
|
||||
case common.CommandListRoutes:
|
||||
cfg.StartProxyProviders()
|
||||
printJSON(routequery.RoutesByAlias())
|
||||
printJSON(routes.ByAlias())
|
||||
return
|
||||
case common.CommandListConfigs:
|
||||
printJSON(cfg.Value())
|
||||
|
|
|
@ -47,7 +47,7 @@ func GetFavIcon(w http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
|
||||
// try with route.Icon
|
||||
r, ok := routes.GetHTTPRoute(alias)
|
||||
r, ok := routes.HTTP.Get(alias)
|
||||
if !ok {
|
||||
gphttp.ClientError(w, errors.New("no such route"), http.StatusNotFound)
|
||||
return
|
||||
|
|
|
@ -5,9 +5,9 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/gpwebsocket"
|
||||
"github.com/yusing/go-proxy/internal/route/routes/routequery"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
)
|
||||
|
||||
func Health(w http.ResponseWriter, r *http.Request) {
|
||||
gpwebsocket.DynamicJSONHandler(w, r, routequery.HealthMap, 1*time.Second)
|
||||
gpwebsocket.DynamicJSONHandler(w, r, routes.HealthMap, 1*time.Second)
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/homepage"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/middleware"
|
||||
"github.com/yusing/go-proxy/internal/route/routes/routequery"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
"github.com/yusing/go-proxy/internal/utils"
|
||||
)
|
||||
|
@ -45,7 +45,7 @@ func List(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) {
|
|||
gphttp.RespondJSON(w, r, route)
|
||||
}
|
||||
case ListRoutes:
|
||||
gphttp.RespondJSON(w, r, routequery.RoutesByAlias(route.RouteType(r.FormValue("type"))))
|
||||
gphttp.RespondJSON(w, r, routes.ByAlias(route.RouteType(r.FormValue("type"))))
|
||||
case ListFiles:
|
||||
listFiles(w, r)
|
||||
case ListMiddlewares:
|
||||
|
@ -55,11 +55,11 @@ func List(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) {
|
|||
case ListMatchDomains:
|
||||
gphttp.RespondJSON(w, r, cfg.Value().MatchDomains)
|
||||
case ListHomepageConfig:
|
||||
gphttp.RespondJSON(w, r, routequery.HomepageConfig(r.FormValue("category"), r.FormValue("provider")))
|
||||
gphttp.RespondJSON(w, r, routes.HomepageConfig(r.FormValue("category"), r.FormValue("provider")))
|
||||
case ListRouteProviders:
|
||||
gphttp.RespondJSON(w, r, cfg.RouteProviderList())
|
||||
case ListHomepageCategories:
|
||||
gphttp.RespondJSON(w, r, routequery.HomepageCategories())
|
||||
gphttp.RespondJSON(w, r, routes.HomepageCategories())
|
||||
case ListIcons:
|
||||
limit, err := strconv.Atoi(r.FormValue("limit"))
|
||||
if err != nil {
|
||||
|
@ -83,9 +83,9 @@ func List(cfg config.ConfigInstance, w http.ResponseWriter, r *http.Request) {
|
|||
// otherwise, return a single Route with alias which or nil if not found.
|
||||
func listRoute(which string) any {
|
||||
if which == "" || which == "all" {
|
||||
return routequery.RoutesByAlias()
|
||||
return routes.ByAlias()
|
||||
}
|
||||
routes := routequery.RoutesByAlias()
|
||||
routes := routes.ByAlias()
|
||||
route, ok := routes[which]
|
||||
if !ok {
|
||||
return nil
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/net/gphttp/middleware"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/middleware/errorpage"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
"github.com/yusing/go-proxy/internal/utils/strutils"
|
||||
)
|
||||
|
@ -20,7 +19,7 @@ import (
|
|||
type Entrypoint struct {
|
||||
middleware *middleware.Middleware
|
||||
accessLogger *accesslog.AccessLogger
|
||||
findRouteFunc func(host string) (route.HTTPRoute, error)
|
||||
findRouteFunc func(host string) (routes.HTTPRoute, error)
|
||||
}
|
||||
|
||||
var ErrNoSuchRoute = errors.New("no such route")
|
||||
|
@ -108,7 +107,7 @@ func (ep *Entrypoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func findRouteAnyDomain(host string) (route.HTTPRoute, error) {
|
||||
func findRouteAnyDomain(host string) (routes.HTTPRoute, error) {
|
||||
hostSplit := strutils.SplitRune(host, '.')
|
||||
target := hostSplit[0]
|
||||
|
||||
|
@ -118,19 +117,19 @@ func findRouteAnyDomain(host string) (route.HTTPRoute, error) {
|
|||
return nil, fmt.Errorf("%w: %s", ErrNoSuchRoute, target)
|
||||
}
|
||||
|
||||
func findRouteByDomains(domains []string) func(host string) (route.HTTPRoute, error) {
|
||||
return func(host string) (route.HTTPRoute, error) {
|
||||
func findRouteByDomains(domains []string) func(host string) (routes.HTTPRoute, error) {
|
||||
return func(host string) (routes.HTTPRoute, error) {
|
||||
for _, domain := range domains {
|
||||
if strings.HasSuffix(host, domain) {
|
||||
target := strings.TrimSuffix(host, domain)
|
||||
if r, ok := routes.GetHTTPRoute(target); ok {
|
||||
if r, ok := routes.HTTP.Get(target); ok {
|
||||
return r, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fallback to exact match
|
||||
if r, ok := routes.GetHTTPRoute(host); ok {
|
||||
if r, ok := routes.HTTP.Get(host); ok {
|
||||
return r, nil
|
||||
}
|
||||
return nil, fmt.Errorf("%w: %s", ErrNoSuchRoute, host)
|
||||
|
|
|
@ -8,21 +8,29 @@ import (
|
|||
. "github.com/yusing/go-proxy/internal/utils/testing"
|
||||
)
|
||||
|
||||
var (
|
||||
r route.ReveseProxyRoute
|
||||
ep = NewEntrypoint()
|
||||
)
|
||||
var ep = NewEntrypoint()
|
||||
|
||||
func addRoute(alias string) *route.ReveseProxyRoute {
|
||||
r := &route.ReveseProxyRoute{
|
||||
Route: &route.Route{
|
||||
Alias: alias,
|
||||
},
|
||||
}
|
||||
routes.HTTP.Add(r)
|
||||
return r
|
||||
}
|
||||
|
||||
func run(t *testing.T, match []string, noMatch []string) {
|
||||
t.Helper()
|
||||
t.Cleanup(routes.TestClear)
|
||||
t.Cleanup(routes.Clear)
|
||||
t.Cleanup(func() { ep.SetFindRouteDomains(nil) })
|
||||
|
||||
for _, test := range match {
|
||||
t.Run(test, func(t *testing.T) {
|
||||
r := addRoute(test)
|
||||
found, err := ep.findRouteFunc(test)
|
||||
ExpectNoError(t, err)
|
||||
ExpectTrue(t, found == &r)
|
||||
ExpectTrue(t, found == r)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -35,7 +43,7 @@ func run(t *testing.T, match []string, noMatch []string) {
|
|||
}
|
||||
|
||||
func TestFindRouteAnyDomain(t *testing.T) {
|
||||
routes.SetHTTPRoute("app1", &r)
|
||||
addRoute("app1")
|
||||
|
||||
tests := []string{
|
||||
"app1.com",
|
||||
|
@ -66,7 +74,7 @@ func TestFindRouteExactHostMatch(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, test := range tests {
|
||||
routes.SetHTTPRoute(test, &r)
|
||||
addRoute(test)
|
||||
}
|
||||
|
||||
run(t, tests, testsNoMatch)
|
||||
|
@ -78,7 +86,7 @@ func TestFindRouteByDomains(t *testing.T) {
|
|||
".sub.domain.com",
|
||||
})
|
||||
|
||||
routes.SetHTTPRoute("app1", &r)
|
||||
addRoute("app1")
|
||||
|
||||
tests := []string{
|
||||
"app1.domain.com",
|
||||
|
@ -103,7 +111,7 @@ func TestFindRouteByDomainsExactMatch(t *testing.T) {
|
|||
".sub.domain.com",
|
||||
})
|
||||
|
||||
routes.SetHTTPRoute("app1.foo.bar", &r)
|
||||
addRoute("app1")
|
||||
|
||||
tests := []string{
|
||||
"app1.foo.bar", // exact match
|
||||
|
|
|
@ -123,8 +123,7 @@ func fetchIcon(ctx context.Context, filetype, filename string) *FetchResult {
|
|||
}
|
||||
|
||||
func FindIcon(ctx context.Context, r route, uri string) *FetchResult {
|
||||
key := routeKey(r)
|
||||
if result := loadIconCache(key); result != nil {
|
||||
if result := loadIconCache(r.Key()); result != nil {
|
||||
return result
|
||||
}
|
||||
|
||||
|
@ -136,7 +135,7 @@ func FindIcon(ctx context.Context, r route, uri string) *FetchResult {
|
|||
}
|
||||
}
|
||||
if result.OK() {
|
||||
storeIconCache(key, result.Icon)
|
||||
storeIconCache(r.Key(), result)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
|
@ -101,14 +101,10 @@ func pruneExpiredIconCache() {
|
|||
}
|
||||
}
|
||||
|
||||
func routeKey(r route) string {
|
||||
return r.ProviderName() + ":" + r.TargetName()
|
||||
}
|
||||
|
||||
func PruneRouteIconCache(route route) {
|
||||
iconCacheMu.Lock()
|
||||
defer iconCacheMu.Unlock()
|
||||
delete(iconCache, routeKey(route))
|
||||
delete(iconCache, route.Key())
|
||||
}
|
||||
|
||||
func loadIconCache(key string) *FetchResult {
|
||||
|
@ -150,8 +146,8 @@ func (e *cacheEntry) UnmarshalJSON(data []byte) error {
|
|||
err := json.Unmarshal(data, &e)
|
||||
// return only if unmarshal is successful
|
||||
// otherwise fallback to base64
|
||||
if err == nil {
|
||||
return nil
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// fallback to base64
|
||||
|
|
|
@ -3,10 +3,12 @@ package homepage
|
|||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/yusing/go-proxy/internal/utils/pool"
|
||||
)
|
||||
|
||||
type route interface {
|
||||
TargetName() string
|
||||
pool.Object
|
||||
ProviderName() string
|
||||
Reference() string
|
||||
TargetURL() *url.URL
|
||||
|
|
|
@ -13,7 +13,7 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/logging"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/reverseproxy"
|
||||
net "github.com/yusing/go-proxy/internal/net/types"
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
U "github.com/yusing/go-proxy/internal/utils"
|
||||
"github.com/yusing/go-proxy/internal/utils/atomic"
|
||||
|
@ -80,7 +80,7 @@ var (
|
|||
const reqTimeout = 3 * time.Second
|
||||
|
||||
// TODO: fix stream type
|
||||
func NewWatcher(parent task.Parent, r route.Route) (*Watcher, error) {
|
||||
func NewWatcher(parent task.Parent, r routes.Route) (*Watcher, error) {
|
||||
cfg := r.IdlewatcherConfig()
|
||||
key := cfg.Key()
|
||||
|
||||
|
@ -126,9 +126,9 @@ func NewWatcher(parent task.Parent, r route.Route) (*Watcher, error) {
|
|||
Logger()
|
||||
|
||||
switch r := r.(type) {
|
||||
case route.ReverseProxyRoute:
|
||||
case routes.ReverseProxyRoute:
|
||||
w.rp = r.ReverseProxy()
|
||||
case route.StreamRoute:
|
||||
case routes.StreamRoute:
|
||||
w.stream = r
|
||||
default:
|
||||
return nil, gperr.New("unexpected route type")
|
||||
|
@ -153,7 +153,7 @@ func NewWatcher(parent task.Parent, r route.Route) (*Watcher, error) {
|
|||
|
||||
w.state.Store(&containerState{status: status})
|
||||
|
||||
w.task = parent.Subtask("idlewatcher."+r.TargetName(), true)
|
||||
w.task = parent.Subtask("idlewatcher."+r.Name(), true)
|
||||
|
||||
watcherMapMu.Lock()
|
||||
defer watcherMapMu.Unlock()
|
||||
|
|
|
@ -10,15 +10,14 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/metrics/period"
|
||||
metricsutils "github.com/yusing/go-proxy/internal/metrics/utils"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
"github.com/yusing/go-proxy/internal/route/routes/routequery"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
"github.com/yusing/go-proxy/pkg/json"
|
||||
)
|
||||
|
||||
type (
|
||||
StatusByAlias struct {
|
||||
Map json.Map[*routequery.HealthInfoRaw] `json:"statuses"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Map json.Map[*routes.HealthInfoRaw] `json:"statuses"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
}
|
||||
Aggregated = json.MapSlice[any]
|
||||
)
|
||||
|
@ -27,7 +26,7 @@ var Poller = period.NewPoller("uptime", getStatuses, aggregateStatuses)
|
|||
|
||||
func getStatuses(ctx context.Context, _ *StatusByAlias) (*StatusByAlias, error) {
|
||||
return &StatusByAlias{
|
||||
Map: routequery.HealthInfo(),
|
||||
Map: routes.HealthInfo(),
|
||||
Timestamp: time.Now().Unix(),
|
||||
}, nil
|
||||
}
|
||||
|
@ -111,7 +110,7 @@ func (rs RouteStatuses) aggregate(limit int, offset int) Aggregated {
|
|||
"avg_latency": latency,
|
||||
"statuses": statuses,
|
||||
}
|
||||
r, ok := routes.GetRoute(alias)
|
||||
r, ok := routes.HTTP.Get(alias)
|
||||
if ok {
|
||||
result[i]["display_name"] = r.HomepageConfig().Name
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package loadbalancer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -10,8 +11,8 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/logging"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/httpheaders"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/loadbalancer/types"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
"github.com/yusing/go-proxy/internal/utils/pool"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
)
|
||||
|
||||
|
@ -30,7 +31,7 @@ type (
|
|||
|
||||
task *task.Task
|
||||
|
||||
pool Pool
|
||||
pool pool.Pool[Server]
|
||||
poolMu sync.Mutex
|
||||
|
||||
sumWeight Weight
|
||||
|
@ -45,7 +46,7 @@ const maxWeight Weight = 100
|
|||
func New(cfg *Config) *LoadBalancer {
|
||||
lb := &LoadBalancer{
|
||||
Config: new(Config),
|
||||
pool: types.NewServerPool(),
|
||||
pool: pool.New[Server]("loadbalancer." + cfg.Link),
|
||||
l: logging.With().Str("name", cfg.Link).Logger(),
|
||||
}
|
||||
lb.UpdateConfigIfNeeded(cfg)
|
||||
|
@ -55,16 +56,14 @@ func New(cfg *Config) *LoadBalancer {
|
|||
// Start implements task.TaskStarter.
|
||||
func (lb *LoadBalancer) Start(parent task.Parent) gperr.Error {
|
||||
lb.startTime = time.Now()
|
||||
lb.task = parent.Subtask("loadbalancer."+lb.Link, false)
|
||||
parent.OnCancel("lb_remove_route", func() {
|
||||
routes.DeleteHTTPRoute(lb.Link)
|
||||
})
|
||||
lb.task.OnFinished("cleanup", func() {
|
||||
lb.task = parent.Subtask("loadbalancer."+lb.Link, true)
|
||||
lb.task.OnCancel("cleanup", func() {
|
||||
if lb.impl != nil {
|
||||
lb.pool.RangeAll(func(k string, v Server) {
|
||||
lb.impl.OnRemoveServer(v)
|
||||
})
|
||||
for _, srv := range lb.pool.Iter {
|
||||
lb.impl.OnRemoveServer(srv)
|
||||
}
|
||||
}
|
||||
lb.task.Finish(nil)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
@ -90,9 +89,9 @@ func (lb *LoadBalancer) updateImpl() {
|
|||
default: // should happen in test only
|
||||
lb.impl = lb.newRoundRobin()
|
||||
}
|
||||
lb.pool.RangeAll(func(_ string, srv Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
lb.impl.OnAddServer(srv)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) UpdateConfigIfNeeded(cfg *Config) {
|
||||
|
@ -124,12 +123,12 @@ func (lb *LoadBalancer) AddServer(srv Server) {
|
|||
lb.poolMu.Lock()
|
||||
defer lb.poolMu.Unlock()
|
||||
|
||||
if lb.pool.Has(srv.Key()) { // FIXME: this should be a warning
|
||||
old, _ := lb.pool.Load(srv.Key())
|
||||
if old, ok := lb.pool.Get(srv.Key()); ok { // FIXME: this should be a warning
|
||||
lb.sumWeight -= old.Weight()
|
||||
lb.impl.OnRemoveServer(old)
|
||||
lb.pool.Del(old)
|
||||
}
|
||||
lb.pool.Store(srv.Key(), srv)
|
||||
lb.pool.Add(srv)
|
||||
lb.sumWeight += srv.Weight()
|
||||
|
||||
lb.rebalance()
|
||||
|
@ -145,11 +144,11 @@ func (lb *LoadBalancer) RemoveServer(srv Server) {
|
|||
lb.poolMu.Lock()
|
||||
defer lb.poolMu.Unlock()
|
||||
|
||||
if !lb.pool.Has(srv.Key()) {
|
||||
if _, ok := lb.pool.Get(srv.Key()); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
lb.pool.Delete(srv.Key())
|
||||
lb.pool.Del(srv)
|
||||
|
||||
lb.sumWeight -= srv.Weight()
|
||||
lb.rebalance()
|
||||
|
@ -178,15 +177,15 @@ func (lb *LoadBalancer) rebalance() {
|
|||
if lb.sumWeight == 0 { // distribute evenly
|
||||
weightEach := maxWeight / Weight(poolSize)
|
||||
remainder := maxWeight % Weight(poolSize)
|
||||
lb.pool.RangeAll(func(_ string, s Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
w := weightEach
|
||||
lb.sumWeight += weightEach
|
||||
if remainder > 0 {
|
||||
w++
|
||||
remainder--
|
||||
}
|
||||
s.SetWeight(w)
|
||||
})
|
||||
srv.SetWeight(w)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -194,30 +193,29 @@ func (lb *LoadBalancer) rebalance() {
|
|||
scaleFactor := float64(maxWeight) / float64(lb.sumWeight)
|
||||
lb.sumWeight = 0
|
||||
|
||||
lb.pool.RangeAll(func(_ string, s Server) {
|
||||
s.SetWeight(Weight(float64(s.Weight()) * scaleFactor))
|
||||
lb.sumWeight += s.Weight()
|
||||
})
|
||||
for _, srv := range lb.pool.Iter {
|
||||
srv.SetWeight(Weight(float64(srv.Weight()) * scaleFactor))
|
||||
lb.sumWeight += srv.Weight()
|
||||
}
|
||||
|
||||
delta := maxWeight - lb.sumWeight
|
||||
if delta == 0 {
|
||||
return
|
||||
}
|
||||
lb.pool.Range(func(_ string, s Server) bool {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
if delta == 0 {
|
||||
return false
|
||||
break
|
||||
}
|
||||
if delta > 0 {
|
||||
s.SetWeight(s.Weight() + 1)
|
||||
srv.SetWeight(srv.Weight() + 1)
|
||||
lb.sumWeight++
|
||||
delta--
|
||||
} else {
|
||||
s.SetWeight(s.Weight() - 1)
|
||||
srv.SetWeight(srv.Weight() - 1)
|
||||
lb.sumWeight--
|
||||
delta++
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
||||
|
@ -242,13 +240,16 @@ func (lb *LoadBalancer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
|||
// MarshalMap implements health.HealthMonitor.
|
||||
func (lb *LoadBalancer) MarshalMap() map[string]any {
|
||||
extra := make(map[string]any)
|
||||
lb.pool.RangeAll(func(k string, v Server) {
|
||||
extra[v.Key()] = v
|
||||
})
|
||||
for _, srv := range lb.pool.Iter {
|
||||
extra[srv.Key()] = srv
|
||||
}
|
||||
|
||||
status, numHealthy := lb.status()
|
||||
|
||||
return (&health.JSONRepresentation{
|
||||
Name: lb.Name(),
|
||||
Status: lb.Status(),
|
||||
Status: status,
|
||||
Detail: fmt.Sprintf("%d/%d servers are healthy", numHealthy, lb.pool.Size()),
|
||||
Started: lb.startTime,
|
||||
Uptime: lb.Uptime(),
|
||||
Extra: map[string]any{
|
||||
|
@ -265,22 +266,26 @@ func (lb *LoadBalancer) Name() string {
|
|||
|
||||
// Status implements health.HealthMonitor.
|
||||
func (lb *LoadBalancer) Status() health.Status {
|
||||
status, _ := lb.status()
|
||||
return status
|
||||
}
|
||||
|
||||
func (lb *LoadBalancer) status() (status health.Status, numHealthy int) {
|
||||
if lb.pool.Size() == 0 {
|
||||
return health.StatusUnknown
|
||||
return health.StatusUnknown, 0
|
||||
}
|
||||
|
||||
isHealthy := true
|
||||
lb.pool.Range(func(_ string, srv Server) bool {
|
||||
if srv.Status().Bad() {
|
||||
isHealthy = false
|
||||
return false
|
||||
// should be healthy if at least one server is healthy
|
||||
numHealthy = 0
|
||||
for _, srv := range lb.pool.Iter {
|
||||
if srv.Status().Good() {
|
||||
numHealthy++
|
||||
}
|
||||
return true
|
||||
})
|
||||
if !isHealthy {
|
||||
return health.StatusUnhealthy
|
||||
}
|
||||
return health.StatusHealthy
|
||||
if numHealthy == 0 {
|
||||
return health.StatusUnhealthy, numHealthy
|
||||
}
|
||||
return health.StatusHealthy, numHealthy
|
||||
}
|
||||
|
||||
// Uptime implements health.HealthMonitor.
|
||||
|
@ -291,9 +296,9 @@ func (lb *LoadBalancer) Uptime() time.Duration {
|
|||
// Latency implements health.HealthMonitor.
|
||||
func (lb *LoadBalancer) Latency() time.Duration {
|
||||
var sum time.Duration
|
||||
lb.pool.RangeAll(func(_ string, srv Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
sum += srv.Latency()
|
||||
})
|
||||
}
|
||||
return sum
|
||||
}
|
||||
|
||||
|
@ -304,10 +309,10 @@ func (lb *LoadBalancer) String() string {
|
|||
|
||||
func (lb *LoadBalancer) availServers() []Server {
|
||||
avail := make([]Server, 0, lb.pool.Size())
|
||||
lb.pool.RangeAll(func(_ string, srv Server) {
|
||||
for _, srv := range lb.pool.Iter {
|
||||
if srv.Status().Good() {
|
||||
avail = append(avail, srv)
|
||||
}
|
||||
})
|
||||
}
|
||||
return avail
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
type (
|
||||
Server = types.Server
|
||||
Servers = []types.Server
|
||||
Pool = types.Pool
|
||||
Weight = types.Weight
|
||||
Config = types.Config
|
||||
Mode = types.Mode
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
|
||||
idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types"
|
||||
U "github.com/yusing/go-proxy/internal/utils"
|
||||
F "github.com/yusing/go-proxy/internal/utils/functional"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
)
|
||||
|
||||
|
@ -32,12 +31,8 @@ type (
|
|||
SetWeight(weight Weight)
|
||||
TryWake() error
|
||||
}
|
||||
|
||||
Pool = F.Map[string, Server]
|
||||
)
|
||||
|
||||
var NewServerPool = F.NewMap[Pool]
|
||||
|
||||
func NewServer(name string, url *url.URL, weight Weight, handler http.Handler, healthMon health.HealthMonitor) Server {
|
||||
srv := &server{
|
||||
name: name,
|
||||
|
|
|
@ -1,13 +1,11 @@
|
|||
package gpnet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
type (
|
||||
Stream interface {
|
||||
fmt.Stringer
|
||||
StreamListener
|
||||
Setup() error
|
||||
Handle(conn StreamConn) error
|
||||
|
|
|
@ -57,7 +57,7 @@ func NewFileServer(base *Route) (*FileServer, gperr.Error) {
|
|||
|
||||
// Start implements task.TaskStarter.
|
||||
func (s *FileServer) Start(parent task.Parent) gperr.Error {
|
||||
s.task = parent.Subtask("fileserver."+s.TargetName(), false)
|
||||
s.task = parent.Subtask("fileserver."+s.Name(), false)
|
||||
|
||||
pathPatterns := s.PathPatterns
|
||||
switch {
|
||||
|
@ -92,7 +92,7 @@ func (s *FileServer) Start(parent task.Parent) gperr.Error {
|
|||
}
|
||||
|
||||
if common.PrometheusEnabled {
|
||||
metricsLogger := metricslogger.NewMetricsLogger(s.TargetName())
|
||||
metricsLogger := metricslogger.NewMetricsLogger(s.Name())
|
||||
s.handler = metricsLogger.GetHandler(s.handler)
|
||||
s.task.OnCancel("reset_metrics", metricsLogger.ResetMetrics)
|
||||
}
|
||||
|
@ -104,9 +104,9 @@ func (s *FileServer) Start(parent task.Parent) gperr.Error {
|
|||
}
|
||||
}
|
||||
|
||||
routes.SetHTTPRoute(s.TargetName(), s)
|
||||
routes.HTTP.Add(s)
|
||||
s.task.OnCancel("entrypoint_remove_route", func() {
|
||||
routes.DeleteHTTPRoute(s.TargetName())
|
||||
routes.HTTP.Del(s)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, gperr.Error) {
|
|||
}
|
||||
}
|
||||
|
||||
service := base.TargetName()
|
||||
service := base.Name()
|
||||
rp := reverseproxy.NewReverseProxy(service, proxyURL, trans)
|
||||
|
||||
if len(base.Middlewares) > 0 {
|
||||
|
@ -90,16 +90,12 @@ func NewReverseProxyRoute(base *Route) (*ReveseProxyRoute, gperr.Error) {
|
|||
return r, nil
|
||||
}
|
||||
|
||||
func (r *ReveseProxyRoute) String() string {
|
||||
return r.TargetName()
|
||||
}
|
||||
|
||||
// Start implements task.TaskStarter.
|
||||
func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
|
||||
if existing, ok := routes.GetHTTPRoute(r.TargetName()); ok && !r.UseLoadBalance() {
|
||||
if existing, ok := routes.HTTP.Get(r.Key()); ok && !r.UseLoadBalance() {
|
||||
return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName())
|
||||
}
|
||||
r.task = parent.Subtask("http."+r.TargetName(), false)
|
||||
r.task = parent.Subtask("http."+r.Name(), false)
|
||||
|
||||
switch {
|
||||
case r.UseIdleWatcher():
|
||||
|
@ -132,7 +128,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
|
|||
r.handler = r.rp
|
||||
default:
|
||||
logging.Warn().
|
||||
Str("route", r.TargetName()).
|
||||
Str("route", r.Name()).
|
||||
Msg("`path_patterns` for reverse proxy is deprecated. Use `rules` instead.")
|
||||
mux := gphttp.NewServeMux()
|
||||
patErrs := gperr.NewBuilder("invalid path pattern(s)")
|
||||
|
@ -148,7 +144,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
|
|||
}
|
||||
|
||||
if len(r.Rules) > 0 {
|
||||
r.handler = r.Rules.BuildHandler(r.TargetName(), r.handler)
|
||||
r.handler = r.Rules.BuildHandler(r.Name(), r.handler)
|
||||
}
|
||||
|
||||
if r.HealthMon != nil {
|
||||
|
@ -158,7 +154,7 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
|
|||
}
|
||||
|
||||
if common.PrometheusEnabled {
|
||||
metricsLogger := metricslogger.NewMetricsLogger(r.TargetName())
|
||||
metricsLogger := metricslogger.NewMetricsLogger(r.Name())
|
||||
r.handler = metricsLogger.GetHandler(r.handler)
|
||||
r.task.OnCancel("reset_metrics", metricsLogger.ResetMetrics)
|
||||
}
|
||||
|
@ -166,9 +162,9 @@ func (r *ReveseProxyRoute) Start(parent task.Parent) gperr.Error {
|
|||
if r.UseLoadBalance() {
|
||||
r.addToLoadBalancer(parent)
|
||||
} else {
|
||||
routes.SetHTTPRoute(r.TargetName(), r)
|
||||
routes.HTTP.Add(r)
|
||||
r.task.OnFinished("entrypoint_remove_route", func() {
|
||||
routes.DeleteHTTPRoute(r.TargetName())
|
||||
routes.HTTP.Del(r)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -201,7 +197,7 @@ func (r *ReveseProxyRoute) HealthMonitor() health.HealthMonitor {
|
|||
func (r *ReveseProxyRoute) addToLoadBalancer(parent task.Parent) {
|
||||
var lb *loadbalancer.LoadBalancer
|
||||
cfg := r.LoadBalance
|
||||
l, ok := routes.GetHTTPRoute(cfg.Link)
|
||||
l, ok := routes.HTTP.Get(cfg.Link)
|
||||
var linked *ReveseProxyRoute
|
||||
if ok {
|
||||
linked = l.(*ReveseProxyRoute)
|
||||
|
@ -222,7 +218,10 @@ func (r *ReveseProxyRoute) addToLoadBalancer(parent task.Parent) {
|
|||
loadBalancer: lb,
|
||||
handler: lb,
|
||||
}
|
||||
routes.SetHTTPRoute(cfg.Link, linked)
|
||||
routes.HTTP.Add(linked)
|
||||
r.task.OnFinished("entrypoint_remove_route", func() {
|
||||
routes.HTTP.Del(linked)
|
||||
})
|
||||
}
|
||||
r.loadBalancer = lb
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
config "github.com/yusing/go-proxy/internal/config/types"
|
||||
"github.com/yusing/go-proxy/internal/net/gphttp/accesslog"
|
||||
loadbalance "github.com/yusing/go-proxy/internal/net/gphttp/loadbalancer/types"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
"github.com/yusing/go-proxy/internal/route/rules"
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
"github.com/yusing/go-proxy/internal/utils"
|
||||
|
@ -62,7 +63,7 @@ type (
|
|||
LisURL *url.URL `json:"lurl,omitempty"`
|
||||
ProxyURL *url.URL `json:"purl,omitempty"`
|
||||
|
||||
impl route.Route
|
||||
impl routes.Route
|
||||
}
|
||||
Routes map[string]*Route
|
||||
)
|
||||
|
@ -77,17 +78,6 @@ func (r Routes) Contains(alias string) bool {
|
|||
func (r *Route) Validate() (err gperr.Error) {
|
||||
r.Finalize()
|
||||
|
||||
// return error if route is localhost:<godoxy_port>
|
||||
switch r.Host {
|
||||
case "localhost", "127.0.0.1":
|
||||
switch r.Port.Proxy {
|
||||
case common.ProxyHTTPPort, common.ProxyHTTPSPort, common.APIHTTPPort:
|
||||
if r.Scheme.IsReverseProxy() || r.Scheme == route.SchemeTCP {
|
||||
return gperr.Errorf("localhost:%d is reserved for godoxy", r.Port.Proxy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if r.Idlewatcher != nil && r.Idlewatcher.Proxmox != nil {
|
||||
node := r.Idlewatcher.Proxmox.Node
|
||||
vmid := r.Idlewatcher.Proxmox.VMID
|
||||
|
@ -152,6 +142,17 @@ func (r *Route) Validate() (err gperr.Error) {
|
|||
}
|
||||
}
|
||||
|
||||
// return error if route is localhost:<godoxy_port>
|
||||
switch r.Host {
|
||||
case "localhost", "127.0.0.1":
|
||||
switch r.Port.Proxy {
|
||||
case common.ProxyHTTPPort, common.ProxyHTTPSPort, common.APIHTTPPort:
|
||||
if r.Scheme.IsReverseProxy() || r.Scheme == route.SchemeTCP {
|
||||
return gperr.Errorf("localhost:%d is reserved for godoxy", r.Port.Proxy)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
errs := gperr.NewBuilder("entry validation failed")
|
||||
|
||||
if r.Scheme == route.SchemeFileServer {
|
||||
|
@ -227,7 +228,17 @@ func (r *Route) ProviderName() string {
|
|||
return r.Provider
|
||||
}
|
||||
|
||||
func (r *Route) TargetName() string {
|
||||
// Name implements pool.Object.
|
||||
func (r *Route) Name() string {
|
||||
return r.Alias
|
||||
}
|
||||
|
||||
// Key implements pool.Object.
|
||||
func (r *Route) Key() string {
|
||||
return r.Alias
|
||||
}
|
||||
|
||||
func (r *Route) String() string {
|
||||
return r.Alias
|
||||
}
|
||||
|
||||
|
|
|
@ -1,15 +1,14 @@
|
|||
package routequery
|
||||
package routes
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/yusing/go-proxy/internal/homepage"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
)
|
||||
|
||||
func getHealthInfo(r route.Route) map[string]string {
|
||||
func getHealthInfo(r Route) map[string]string {
|
||||
mon := r.HealthMonitor()
|
||||
if mon == nil {
|
||||
return map[string]string{
|
||||
|
@ -30,7 +29,7 @@ type HealthInfoRaw struct {
|
|||
Latency time.Duration `json:"latency"`
|
||||
}
|
||||
|
||||
func getHealthInfoRaw(r route.Route) *HealthInfoRaw {
|
||||
func getHealthInfoRaw(r Route) *HealthInfoRaw {
|
||||
mon := r.HealthMonitor()
|
||||
if mon == nil {
|
||||
return &HealthInfoRaw{
|
||||
|
@ -45,69 +44,69 @@ func getHealthInfoRaw(r route.Route) *HealthInfoRaw {
|
|||
}
|
||||
|
||||
func HealthMap() map[string]map[string]string {
|
||||
healthMap := make(map[string]map[string]string, routes.NumRoutes())
|
||||
routes.RangeRoutes(func(alias string, r route.Route) {
|
||||
healthMap := make(map[string]map[string]string, NumRoutes())
|
||||
for alias, r := range Iter {
|
||||
healthMap[alias] = getHealthInfo(r)
|
||||
})
|
||||
}
|
||||
return healthMap
|
||||
}
|
||||
|
||||
func HealthInfo() map[string]*HealthInfoRaw {
|
||||
healthMap := make(map[string]*HealthInfoRaw, routes.NumRoutes())
|
||||
routes.RangeRoutes(func(alias string, r route.Route) {
|
||||
healthMap := make(map[string]*HealthInfoRaw, NumRoutes())
|
||||
for alias, r := range Iter {
|
||||
healthMap[alias] = getHealthInfoRaw(r)
|
||||
})
|
||||
}
|
||||
return healthMap
|
||||
}
|
||||
|
||||
func HomepageCategories() []string {
|
||||
check := make(map[string]struct{})
|
||||
categories := make([]string, 0)
|
||||
routes.GetHTTPRoutes().RangeAll(func(alias string, r route.HTTPRoute) {
|
||||
for _, r := range HTTP.Iter {
|
||||
item := r.HomepageConfig()
|
||||
if item == nil || item.Category == "" {
|
||||
return
|
||||
continue
|
||||
}
|
||||
if _, ok := check[item.Category]; ok {
|
||||
return
|
||||
continue
|
||||
}
|
||||
check[item.Category] = struct{}{}
|
||||
categories = append(categories, item.Category)
|
||||
})
|
||||
}
|
||||
return categories
|
||||
}
|
||||
|
||||
func HomepageConfig(categoryFilter, providerFilter string) homepage.Homepage {
|
||||
hp := make(homepage.Homepage)
|
||||
|
||||
routes.GetHTTPRoutes().RangeAll(func(alias string, r route.HTTPRoute) {
|
||||
for _, r := range HTTP.Iter {
|
||||
if providerFilter != "" && r.ProviderName() != providerFilter {
|
||||
return
|
||||
continue
|
||||
}
|
||||
item := r.HomepageItem()
|
||||
if categoryFilter != "" && item.Category != categoryFilter {
|
||||
return
|
||||
continue
|
||||
}
|
||||
hp.Add(item)
|
||||
})
|
||||
}
|
||||
return hp
|
||||
}
|
||||
|
||||
func RoutesByAlias(typeFilter ...route.RouteType) map[string]route.Route {
|
||||
rts := make(map[string]route.Route)
|
||||
func ByAlias(typeFilter ...route.RouteType) map[string]Route {
|
||||
rts := make(map[string]Route)
|
||||
if len(typeFilter) == 0 || typeFilter[0] == "" {
|
||||
typeFilter = []route.RouteType{route.RouteTypeHTTP, route.RouteTypeStream}
|
||||
}
|
||||
for _, t := range typeFilter {
|
||||
switch t {
|
||||
case route.RouteTypeHTTP:
|
||||
routes.GetHTTPRoutes().RangeAll(func(alias string, r route.HTTPRoute) {
|
||||
for alias, r := range HTTP.Iter {
|
||||
rts[alias] = r
|
||||
})
|
||||
}
|
||||
case route.RouteTypeStream:
|
||||
routes.GetStreamRoutes().RangeAll(func(alias string, r route.StreamRoute) {
|
||||
for alias, r := range Stream.Iter {
|
||||
rts[alias] = r
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return rts
|
|
@ -1,4 +1,4 @@
|
|||
package route
|
||||
package routes
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
@ -10,6 +10,7 @@ import (
|
|||
idlewatcher "github.com/yusing/go-proxy/internal/idlewatcher/types"
|
||||
net "github.com/yusing/go-proxy/internal/net/types"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
"github.com/yusing/go-proxy/internal/utils/pool"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
|
||||
loadbalance "github.com/yusing/go-proxy/internal/net/gphttp/loadbalancer/types"
|
||||
|
@ -21,8 +22,8 @@ type (
|
|||
Route interface {
|
||||
task.TaskStarter
|
||||
task.TaskFinisher
|
||||
pool.Object
|
||||
ProviderName() string
|
||||
TargetName() string
|
||||
TargetURL() *url.URL
|
||||
HealthMonitor() health.HealthMonitor
|
||||
Reference() string
|
|
@ -1,78 +1,49 @@
|
|||
package routes
|
||||
|
||||
import (
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
F "github.com/yusing/go-proxy/internal/utils/functional"
|
||||
"github.com/yusing/go-proxy/internal/utils/pool"
|
||||
)
|
||||
|
||||
var (
|
||||
httpRoutes = F.NewMapOf[string, route.HTTPRoute]()
|
||||
streamRoutes = F.NewMapOf[string, route.StreamRoute]()
|
||||
HTTP = pool.New[HTTPRoute]("http_routes")
|
||||
Stream = pool.New[StreamRoute]("stream_routes")
|
||||
)
|
||||
|
||||
func RangeRoutes(callback func(alias string, r route.Route)) {
|
||||
httpRoutes.RangeAll(func(alias string, r route.HTTPRoute) {
|
||||
callback(alias, r)
|
||||
})
|
||||
streamRoutes.RangeAll(func(alias string, r route.StreamRoute) {
|
||||
callback(alias, r)
|
||||
})
|
||||
func Iter(yield func(alias string, r Route) bool) {
|
||||
for k, r := range HTTP.Iter {
|
||||
if !yield(k, r) {
|
||||
break
|
||||
}
|
||||
}
|
||||
for k, r := range Stream.Iter {
|
||||
if !yield(k, r) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NumRoutes() int {
|
||||
return httpRoutes.Size() + streamRoutes.Size()
|
||||
return HTTP.Size() + Stream.Size()
|
||||
}
|
||||
|
||||
func GetHTTPRoutes() F.Map[string, route.HTTPRoute] {
|
||||
return httpRoutes
|
||||
func Clear() {
|
||||
HTTP.Clear()
|
||||
Stream.Clear()
|
||||
}
|
||||
|
||||
func GetStreamRoutes() F.Map[string, route.StreamRoute] {
|
||||
return streamRoutes
|
||||
}
|
||||
|
||||
func GetHTTPRouteOrExact(alias, host string) (route.HTTPRoute, bool) {
|
||||
r, ok := httpRoutes.Load(alias)
|
||||
func GetHTTPRouteOrExact(alias, host string) (HTTPRoute, bool) {
|
||||
r, ok := HTTP.Get(alias)
|
||||
if ok {
|
||||
return r, true
|
||||
}
|
||||
// try find with exact match
|
||||
return httpRoutes.Load(host)
|
||||
return HTTP.Get(host)
|
||||
}
|
||||
|
||||
func GetHTTPRoute(alias string) (route.HTTPRoute, bool) {
|
||||
return httpRoutes.Load(alias)
|
||||
}
|
||||
|
||||
func GetStreamRoute(alias string) (route.StreamRoute, bool) {
|
||||
return streamRoutes.Load(alias)
|
||||
}
|
||||
|
||||
func GetRoute(alias string) (route.Route, bool) {
|
||||
r, ok := httpRoutes.Load(alias)
|
||||
func Get(alias string) (Route, bool) {
|
||||
r, ok := HTTP.Get(alias)
|
||||
if ok {
|
||||
return r, true
|
||||
}
|
||||
return streamRoutes.Load(alias)
|
||||
}
|
||||
|
||||
func SetHTTPRoute(alias string, r route.HTTPRoute) {
|
||||
httpRoutes.Store(alias, r)
|
||||
}
|
||||
|
||||
func SetStreamRoute(alias string, r route.StreamRoute) {
|
||||
streamRoutes.Store(alias, r)
|
||||
}
|
||||
|
||||
func DeleteHTTPRoute(alias string) {
|
||||
httpRoutes.Delete(alias)
|
||||
}
|
||||
|
||||
func DeleteStreamRoute(alias string) {
|
||||
streamRoutes.Delete(alias)
|
||||
}
|
||||
|
||||
func TestClear() {
|
||||
httpRoutes = F.NewMapOf[string, route.HTTPRoute]()
|
||||
streamRoutes = F.NewMapOf[string, route.StreamRoute]()
|
||||
return Stream.Get(alias)
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/logging"
|
||||
net "github.com/yusing/go-proxy/internal/net/types"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health"
|
||||
"github.com/yusing/go-proxy/internal/watcher/health/monitor"
|
||||
|
@ -29,31 +28,24 @@ type StreamRoute struct {
|
|||
l zerolog.Logger
|
||||
}
|
||||
|
||||
func NewStreamRoute(base *Route) (route.Route, gperr.Error) {
|
||||
func NewStreamRoute(base *Route) (routes.Route, gperr.Error) {
|
||||
// TODO: support non-coherent scheme
|
||||
return &StreamRoute{
|
||||
Route: base,
|
||||
l: logging.With().
|
||||
Str("type", string(base.Scheme)).
|
||||
Str("name", base.TargetName()).
|
||||
Str("name", base.Name()).
|
||||
Logger(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (r *StreamRoute) String() string {
|
||||
return "stream " + r.TargetName()
|
||||
}
|
||||
|
||||
// Start implements task.TaskStarter.
|
||||
func (r *StreamRoute) Start(parent task.Parent) gperr.Error {
|
||||
if existing, ok := routes.GetStreamRoute(r.TargetName()); ok {
|
||||
if existing, ok := routes.Stream.Get(r.Key()); ok {
|
||||
return gperr.Errorf("route already exists: from provider %s and %s", existing.ProviderName(), r.ProviderName())
|
||||
}
|
||||
r.task = parent.Subtask("stream." + r.TargetName())
|
||||
r.task = parent.Subtask("stream."+r.Name(), true)
|
||||
r.Stream = NewStream(r)
|
||||
parent.OnCancel("finish", func() {
|
||||
r.task.Finish(nil)
|
||||
})
|
||||
|
||||
switch {
|
||||
case r.UseIdleWatcher():
|
||||
|
@ -83,9 +75,9 @@ func (r *StreamRoute) Start(parent task.Parent) gperr.Error {
|
|||
|
||||
go r.acceptConnections()
|
||||
|
||||
routes.SetStreamRoute(r.TargetName(), r)
|
||||
routes.Stream.Add(r)
|
||||
r.task.OnFinished("entrypoint_remove_route", func() {
|
||||
routes.DeleteStreamRoute(r.TargetName())
|
||||
routes.Stream.Del(r)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"github.com/yusing/go-proxy/internal/gperr"
|
||||
"github.com/yusing/go-proxy/internal/logging"
|
||||
"github.com/yusing/go-proxy/internal/notif"
|
||||
route "github.com/yusing/go-proxy/internal/route/types"
|
||||
"github.com/yusing/go-proxy/internal/route/routes"
|
||||
"github.com/yusing/go-proxy/internal/task"
|
||||
"github.com/yusing/go-proxy/internal/utils/atomic"
|
||||
"github.com/yusing/go-proxy/internal/utils/strutils"
|
||||
|
@ -37,15 +37,15 @@ type (
|
|||
|
||||
var ErrNegativeInterval = errors.New("negative interval")
|
||||
|
||||
func NewMonitor(r route.Route) health.HealthMonCheck {
|
||||
func NewMonitor(r routes.Route) health.HealthMonCheck {
|
||||
var mon health.HealthMonCheck
|
||||
if r.IsAgent() {
|
||||
mon = NewAgentProxiedMonitor(r.Agent(), r.HealthCheckConfig(), AgentTargetFromURL(r.TargetURL()))
|
||||
} else {
|
||||
switch r := r.(type) {
|
||||
case route.HTTPRoute:
|
||||
case routes.HTTPRoute:
|
||||
mon = NewHTTPHealthMonitor(r.TargetURL(), r.HealthCheckConfig())
|
||||
case route.StreamRoute:
|
||||
case routes.StreamRoute:
|
||||
mon = NewRawHealthMonitor(r.TargetURL(), r.HealthCheckConfig())
|
||||
default:
|
||||
logging.Panic().Msgf("unexpected route type: %T", r)
|
||||
|
@ -58,7 +58,7 @@ func NewMonitor(r route.Route) health.HealthMonCheck {
|
|||
return mon
|
||||
}
|
||||
r.Task().OnCancel("close_docker_client", client.Close)
|
||||
return NewDockerHealthMonitor(client, cont.ContainerID, r.TargetName(), r.HealthCheckConfig(), mon)
|
||||
return NewDockerHealthMonitor(client, cont.ContainerID, r.Name(), r.HealthCheckConfig(), mon)
|
||||
}
|
||||
return mon
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue