From 1eac48e899167315e439bec22e0c72946986508a Mon Sep 17 00:00:00 2001 From: yusing Date: Sun, 13 Apr 2025 06:17:41 +0800 Subject: [PATCH] feat: debug api --- agent/pkg/agent/config.go | 12 +++- cmd/main.go | 3 + internal/api/v1/debug/handler.go | 75 +++++++++++++++++++++ internal/api/v1/debug/handler_production.go | 11 +++ internal/docker/client.go | 69 +++++++++++++------ internal/docker/inspect.go | 2 +- internal/task/debug.go | 53 +++++++++++---- internal/utils/serialization.go | 7 +- 8 files changed, 192 insertions(+), 40 deletions(-) create mode 100644 internal/api/v1/debug/handler.go create mode 100644 internal/api/v1/debug/handler_production.go diff --git a/agent/pkg/agent/config.go b/agent/pkg/agent/config.go index 5f50cb1..5a28545 100644 --- a/agent/pkg/agent/config.go +++ b/agent/pkg/agent/config.go @@ -70,6 +70,11 @@ func GetAgentAddrFromDockerHost(dockerHost string) string { return dockerHost[FakeDockerHostPrefixLen:] } +// Key implements pool.Object +func (cfg *AgentConfig) Key() string { + return cfg.Addr +} + func (cfg *AgentConfig) FakeDockerHost() string { return FakeDockerHostPrefix + cfg.Addr } @@ -192,9 +197,10 @@ func (cfg *AgentConfig) String() string { return cfg.name + "@" + cfg.Addr } -func (cfg *AgentConfig) MarshalJSON() ([]byte, error) { - return json.Marshal(map[string]string{ +// MarshalMap implements pool.Object +func (cfg *AgentConfig) MarshalMap() map[string]any { + return map[string]any{ "name": cfg.Name(), "addr": cfg.Addr, - }) + } } diff --git a/cmd/main.go b/cmd/main.go index f1461db..af82af9 100755 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/yusing/go-proxy/internal/api/v1/auth" + debugapi "github.com/yusing/go-proxy/internal/api/v1/debug" "github.com/yusing/go-proxy/internal/api/v1/query" "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/config" @@ -146,6 +147,8 @@ func main() { uptime.Poller.Start() config.WatchChanges() + debugapi.StartServer(cfg) + task.WaitExit(cfg.Value().TimeoutShutdown) } diff --git a/internal/api/v1/debug/handler.go b/internal/api/v1/debug/handler.go new file mode 100644 index 0000000..51f7563 --- /dev/null +++ b/internal/api/v1/debug/handler.go @@ -0,0 +1,75 @@ +//go:build debug + +package debugapi + +import ( + "iter" + "net/http" + "sort" + "time" + + "github.com/yusing/go-proxy/agent/pkg/agent" + config "github.com/yusing/go-proxy/internal/config/types" + "github.com/yusing/go-proxy/internal/docker" + "github.com/yusing/go-proxy/internal/idlewatcher" + "github.com/yusing/go-proxy/internal/net/gphttp/gpwebsocket" + "github.com/yusing/go-proxy/internal/net/gphttp/servemux" + "github.com/yusing/go-proxy/internal/net/gphttp/server" + "github.com/yusing/go-proxy/internal/proxmox" + "github.com/yusing/go-proxy/internal/task" +) + +func StartServer(cfg config.ConfigInstance) { + srv := server.NewServer(server.Options{ + Name: "debug", + HTTPAddr: ":8899", + Handler: newHandler(cfg), + }) + srv.Start(task.RootTask("debug_server", false)) +} + +type debuggable interface { + MarshalMap() map[string]any + Key() string +} + +func toSortedSlice[T debuggable](data iter.Seq2[string, T]) []map[string]any { + s := make([]map[string]any, 0) + for _, v := range data { + m := v.MarshalMap() + m["key"] = v.Key() + s = append(s, m) + } + sort.Slice(s, func(i, j int) bool { + return s[i]["key"].(string) < s[j]["key"].(string) + }) + return s +} + +func jsonHandler[T debuggable](getData iter.Seq2[string, T]) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + gpwebsocket.DynamicJSONHandler(w, r, func() []map[string]any { + return toSortedSlice(getData) + }, 1*time.Second) + } +} + +func iterMap[K comparable, V debuggable](m func() map[K]V) iter.Seq2[K, V] { + return func(yield func(K, V) bool) { + for k, v := range m() { + if !yield(k, v) { + break + } + } + } +} + +func newHandler(cfg config.ConfigInstance) http.Handler { + mux := servemux.NewServeMux(cfg) + mux.HandleFunc("GET", "/tasks", jsonHandler(task.AllTasks())) + mux.HandleFunc("GET", "/idlewatcher", jsonHandler(iterMap(idlewatcher.Watchers))) + mux.HandleFunc("GET", "/agents", jsonHandler(agent.Agents.Iter)) + mux.HandleFunc("GET", "/proxmox", jsonHandler(proxmox.Clients.Iter)) + mux.HandleFunc("GET", "/docker", jsonHandler(iterMap(docker.Clients))) + return mux +} diff --git a/internal/api/v1/debug/handler_production.go b/internal/api/v1/debug/handler_production.go new file mode 100644 index 0000000..b54c86e --- /dev/null +++ b/internal/api/v1/debug/handler_production.go @@ -0,0 +1,11 @@ +//go:build !debug + +package debugapi + +import ( + config "github.com/yusing/go-proxy/internal/config/types" +) + +func StartServer(cfg config.ConfigInstance) { + // do nothing +} diff --git a/internal/docker/client.go b/internal/docker/client.go index 45029c3..99abccc 100644 --- a/internal/docker/client.go +++ b/internal/docker/client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "maps" "net" "net/http" "sync" @@ -16,13 +17,13 @@ import ( "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/logging" "github.com/yusing/go-proxy/internal/task" + "github.com/yusing/go-proxy/internal/utils/strutils" ) type ( SharedClient struct { *client.Client - key string refCount uint32 closedOn int64 @@ -65,7 +66,7 @@ func initClientCleaner() { defer clientMapMu.Unlock() for _, c := range clientMap { - delete(clientMap, c.key) + delete(clientMap, c.Key()) c.Client.Close() } }) @@ -79,30 +80,20 @@ func closeTimedOutClients() { for _, c := range clientMap { if atomic.LoadUint32(&c.refCount) == 0 && now-atomic.LoadInt64(&c.closedOn) > clientTTLSecs { - delete(clientMap, c.key) + delete(clientMap, c.Key()) c.Client.Close() - logging.Debug().Str("host", c.key).Msg("docker client closed") + logging.Debug().Str("host", c.DaemonHost()).Msg("docker client closed") } } } -func (c *SharedClient) Address() string { - return c.addr -} +func Clients() map[string]*SharedClient { + clientMapMu.RLock() + defer clientMapMu.RUnlock() -func (c *SharedClient) CheckConnection(ctx context.Context) error { - conn, err := c.dial(ctx) - if err != nil { - return err - } - conn.Close() - return nil -} - -// if the client is still referenced, this is no-op. -func (c *SharedClient) Close() { - atomic.StoreInt64(&c.closedOn, time.Now().Unix()) - atomic.AddUint32(&c.refCount, ^uint32(0)) + clients := make(map[string]*SharedClient, len(clientMap)) + maps.Copy(clients, clientMap) + return clients } // NewClient creates a new Docker client connection to the specified host. @@ -186,7 +177,6 @@ func NewClient(host string) (*SharedClient, error) { c := &SharedClient{ Client: client, - key: host, refCount: 1, addr: addr, dial: dial, @@ -196,9 +186,44 @@ func NewClient(host string) (*SharedClient, error) { if c.dial == nil { c.dial = client.Dialer() } + if c.addr == "" { + c.addr = c.Client.DaemonHost() + } defer logging.Debug().Str("host", host).Msg("docker client initialized") - clientMap[c.key] = c + clientMap[c.Key()] = c return c, nil } + +func (c *SharedClient) Key() string { + return c.DaemonHost() +} + +func (c *SharedClient) Address() string { + return c.addr +} + +func (c *SharedClient) CheckConnection(ctx context.Context) error { + conn, err := c.dial(ctx) + if err != nil { + return err + } + conn.Close() + return nil +} + +// if the client is still referenced, this is no-op. +func (c *SharedClient) Close() { + atomic.StoreInt64(&c.closedOn, time.Now().Unix()) + atomic.AddUint32(&c.refCount, ^uint32(0)) +} + +func (c *SharedClient) MarshalMap() map[string]any { + return map[string]any{ + "host": c.DaemonHost(), + "addr": c.addr, + "ref_count": c.refCount, + "closed_on": strutils.FormatUnixTime(c.closedOn), + } +} diff --git a/internal/docker/inspect.go b/internal/docker/inspect.go index 8eb1413..6d708c5 100644 --- a/internal/docker/inspect.go +++ b/internal/docker/inspect.go @@ -24,5 +24,5 @@ func (c *SharedClient) Inspect(containerID string) (*Container, error) { if err != nil { return nil, err } - return FromInspectResponse(json, c.key), nil + return FromInspectResponse(json, c.DaemonHost()), nil } diff --git a/internal/task/debug.go b/internal/task/debug.go index bf8a2db..f222582 100644 --- a/internal/task/debug.go +++ b/internal/task/debug.go @@ -1,7 +1,7 @@ package task import ( - "slices" + "iter" "strings" ) @@ -28,16 +28,43 @@ func (t *Task) listCallbacks() []string { return callbacks } -// DebugTaskList returns list of all tasks. -// -// The returned string is suitable for printing to the console. -func DebugTaskList() []string { - l := make([]string, 0, allTasks.Size()) - - allTasks.RangeAll(func(t *Task) { - l = append(l, t.name) - }) - - slices.Sort(l) - return l +func AllTasks() iter.Seq2[string, *Task] { + return func(yield func(k string, v *Task) bool) { + for t := range allTasks.Range { + if !yield(t.name, t) { + return + } + } + } +} + +func (t *Task) Key() string { + return t.name +} + +func toBool(v uint32) bool { + if v > 0 { + return true + } + return false +} + +func (t *Task) callbackList() []map[string]any { + list := make([]map[string]any, 0, len(t.callbacks)) + for cb, _ := range t.callbacks { + list = append(list, map[string]any{ + "about": cb.about, + "wait_children": cb.waitChildren, + }) + } + return list +} + +func (t *Task) MarshalMap() map[string]any { + return map[string]any{ + "name": t.name, + "childrens": t.children, + "callbacks": t.callbackList(), + "finishCalled": toBool(t.finishedCalled), + } } diff --git a/internal/utils/serialization.go b/internal/utils/serialization.go index 9430905..ab5381f 100644 --- a/internal/utils/serialization.go +++ b/internal/utils/serialization.go @@ -18,9 +18,14 @@ import ( type SerializedObject = map[string]any -type MapUnmarshaller interface { +type ( + MapMarshaller interface { + MarshalMap() map[string]any + } + MapUnmarshaller interface { UnmarshalMap(m map[string]any) gperr.Error } +) var ( ErrInvalidType = gperr.New("invalid type")