feat: debug api

This commit is contained in:
yusing 2025-04-13 06:17:41 +08:00
parent fdbf1ad787
commit 1eac48e899
8 changed files with 192 additions and 40 deletions

View file

@ -70,6 +70,11 @@ func GetAgentAddrFromDockerHost(dockerHost string) string {
return dockerHost[FakeDockerHostPrefixLen:]
}
// Key implements pool.Object
func (cfg *AgentConfig) Key() string {
return cfg.Addr
}
func (cfg *AgentConfig) FakeDockerHost() string {
return FakeDockerHostPrefix + cfg.Addr
}
@ -192,9 +197,10 @@ func (cfg *AgentConfig) String() string {
return cfg.name + "@" + cfg.Addr
}
func (cfg *AgentConfig) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{
// MarshalMap implements pool.Object
func (cfg *AgentConfig) MarshalMap() map[string]any {
return map[string]any{
"name": cfg.Name(),
"addr": cfg.Addr,
})
}
}

View file

@ -7,6 +7,7 @@ import (
"sync"
"github.com/yusing/go-proxy/internal/api/v1/auth"
debugapi "github.com/yusing/go-proxy/internal/api/v1/debug"
"github.com/yusing/go-proxy/internal/api/v1/query"
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/config"
@ -146,6 +147,8 @@ func main() {
uptime.Poller.Start()
config.WatchChanges()
debugapi.StartServer(cfg)
task.WaitExit(cfg.Value().TimeoutShutdown)
}

View file

@ -0,0 +1,75 @@
//go:build debug
package debugapi
import (
"iter"
"net/http"
"sort"
"time"
"github.com/yusing/go-proxy/agent/pkg/agent"
config "github.com/yusing/go-proxy/internal/config/types"
"github.com/yusing/go-proxy/internal/docker"
"github.com/yusing/go-proxy/internal/idlewatcher"
"github.com/yusing/go-proxy/internal/net/gphttp/gpwebsocket"
"github.com/yusing/go-proxy/internal/net/gphttp/servemux"
"github.com/yusing/go-proxy/internal/net/gphttp/server"
"github.com/yusing/go-proxy/internal/proxmox"
"github.com/yusing/go-proxy/internal/task"
)
func StartServer(cfg config.ConfigInstance) {
srv := server.NewServer(server.Options{
Name: "debug",
HTTPAddr: ":8899",
Handler: newHandler(cfg),
})
srv.Start(task.RootTask("debug_server", false))
}
type debuggable interface {
MarshalMap() map[string]any
Key() string
}
func toSortedSlice[T debuggable](data iter.Seq2[string, T]) []map[string]any {
s := make([]map[string]any, 0)
for _, v := range data {
m := v.MarshalMap()
m["key"] = v.Key()
s = append(s, m)
}
sort.Slice(s, func(i, j int) bool {
return s[i]["key"].(string) < s[j]["key"].(string)
})
return s
}
func jsonHandler[T debuggable](getData iter.Seq2[string, T]) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
gpwebsocket.DynamicJSONHandler(w, r, func() []map[string]any {
return toSortedSlice(getData)
}, 1*time.Second)
}
}
func iterMap[K comparable, V debuggable](m func() map[K]V) iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
for k, v := range m() {
if !yield(k, v) {
break
}
}
}
}
func newHandler(cfg config.ConfigInstance) http.Handler {
mux := servemux.NewServeMux(cfg)
mux.HandleFunc("GET", "/tasks", jsonHandler(task.AllTasks()))
mux.HandleFunc("GET", "/idlewatcher", jsonHandler(iterMap(idlewatcher.Watchers)))
mux.HandleFunc("GET", "/agents", jsonHandler(agent.Agents.Iter))
mux.HandleFunc("GET", "/proxmox", jsonHandler(proxmox.Clients.Iter))
mux.HandleFunc("GET", "/docker", jsonHandler(iterMap(docker.Clients)))
return mux
}

View file

@ -0,0 +1,11 @@
//go:build !debug
package debugapi
import (
config "github.com/yusing/go-proxy/internal/config/types"
)
func StartServer(cfg config.ConfigInstance) {
// do nothing
}

View file

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"maps"
"net"
"net/http"
"sync"
@ -16,13 +17,13 @@ import (
"github.com/yusing/go-proxy/internal/common"
"github.com/yusing/go-proxy/internal/logging"
"github.com/yusing/go-proxy/internal/task"
"github.com/yusing/go-proxy/internal/utils/strutils"
)
type (
SharedClient struct {
*client.Client
key string
refCount uint32
closedOn int64
@ -65,7 +66,7 @@ func initClientCleaner() {
defer clientMapMu.Unlock()
for _, c := range clientMap {
delete(clientMap, c.key)
delete(clientMap, c.Key())
c.Client.Close()
}
})
@ -79,30 +80,20 @@ func closeTimedOutClients() {
for _, c := range clientMap {
if atomic.LoadUint32(&c.refCount) == 0 && now-atomic.LoadInt64(&c.closedOn) > clientTTLSecs {
delete(clientMap, c.key)
delete(clientMap, c.Key())
c.Client.Close()
logging.Debug().Str("host", c.key).Msg("docker client closed")
logging.Debug().Str("host", c.DaemonHost()).Msg("docker client closed")
}
}
}
func (c *SharedClient) Address() string {
return c.addr
}
func Clients() map[string]*SharedClient {
clientMapMu.RLock()
defer clientMapMu.RUnlock()
func (c *SharedClient) CheckConnection(ctx context.Context) error {
conn, err := c.dial(ctx)
if err != nil {
return err
}
conn.Close()
return nil
}
// if the client is still referenced, this is no-op.
func (c *SharedClient) Close() {
atomic.StoreInt64(&c.closedOn, time.Now().Unix())
atomic.AddUint32(&c.refCount, ^uint32(0))
clients := make(map[string]*SharedClient, len(clientMap))
maps.Copy(clients, clientMap)
return clients
}
// NewClient creates a new Docker client connection to the specified host.
@ -186,7 +177,6 @@ func NewClient(host string) (*SharedClient, error) {
c := &SharedClient{
Client: client,
key: host,
refCount: 1,
addr: addr,
dial: dial,
@ -196,9 +186,44 @@ func NewClient(host string) (*SharedClient, error) {
if c.dial == nil {
c.dial = client.Dialer()
}
if c.addr == "" {
c.addr = c.Client.DaemonHost()
}
defer logging.Debug().Str("host", host).Msg("docker client initialized")
clientMap[c.key] = c
clientMap[c.Key()] = c
return c, nil
}
func (c *SharedClient) Key() string {
return c.DaemonHost()
}
func (c *SharedClient) Address() string {
return c.addr
}
func (c *SharedClient) CheckConnection(ctx context.Context) error {
conn, err := c.dial(ctx)
if err != nil {
return err
}
conn.Close()
return nil
}
// if the client is still referenced, this is no-op.
func (c *SharedClient) Close() {
atomic.StoreInt64(&c.closedOn, time.Now().Unix())
atomic.AddUint32(&c.refCount, ^uint32(0))
}
func (c *SharedClient) MarshalMap() map[string]any {
return map[string]any{
"host": c.DaemonHost(),
"addr": c.addr,
"ref_count": c.refCount,
"closed_on": strutils.FormatUnixTime(c.closedOn),
}
}

View file

@ -24,5 +24,5 @@ func (c *SharedClient) Inspect(containerID string) (*Container, error) {
if err != nil {
return nil, err
}
return FromInspectResponse(json, c.key), nil
return FromInspectResponse(json, c.DaemonHost()), nil
}

View file

@ -1,7 +1,7 @@
package task
import (
"slices"
"iter"
"strings"
)
@ -28,16 +28,43 @@ func (t *Task) listCallbacks() []string {
return callbacks
}
// DebugTaskList returns list of all tasks.
//
// The returned string is suitable for printing to the console.
func DebugTaskList() []string {
l := make([]string, 0, allTasks.Size())
allTasks.RangeAll(func(t *Task) {
l = append(l, t.name)
})
slices.Sort(l)
return l
func AllTasks() iter.Seq2[string, *Task] {
return func(yield func(k string, v *Task) bool) {
for t := range allTasks.Range {
if !yield(t.name, t) {
return
}
}
}
}
func (t *Task) Key() string {
return t.name
}
func toBool(v uint32) bool {
if v > 0 {
return true
}
return false
}
func (t *Task) callbackList() []map[string]any {
list := make([]map[string]any, 0, len(t.callbacks))
for cb, _ := range t.callbacks {
list = append(list, map[string]any{
"about": cb.about,
"wait_children": cb.waitChildren,
})
}
return list
}
func (t *Task) MarshalMap() map[string]any {
return map[string]any{
"name": t.name,
"childrens": t.children,
"callbacks": t.callbackList(),
"finishCalled": toBool(t.finishedCalled),
}
}

View file

@ -18,9 +18,14 @@ import (
type SerializedObject = map[string]any
type MapUnmarshaller interface {
type (
MapMarshaller interface {
MarshalMap() map[string]any
}
MapUnmarshaller interface {
UnmarshalMap(m map[string]any) gperr.Error
}
)
var (
ErrInvalidType = gperr.New("invalid type")