This commit is contained in:
yusing 2025-01-03 18:55:44 +08:00
parent 65afc73f25
commit c506db1ef4
5 changed files with 19 additions and 16 deletions

View file

@ -16,7 +16,6 @@ import (
) )
type ( type (
Client = *SharedClient
SharedClient struct { SharedClient struct {
*client.Client *client.Client
@ -28,7 +27,7 @@ type (
) )
var ( var (
clientMap F.Map[string, Client] = F.NewMapOf[string, Client]() clientMap F.Map[string, *SharedClient] = F.NewMapOf[string, *SharedClient]()
clientMapMu sync.Mutex clientMapMu sync.Mutex
clientOptEnvHost = []client.Opt{ clientOptEnvHost = []client.Opt{
@ -39,7 +38,7 @@ var (
func init() { func init() {
task.OnProgramExit("docker_clients_cleanup", func() { task.OnProgramExit("docker_clients_cleanup", func() {
clientMap.RangeAllParallel(func(_ string, c Client) { clientMap.RangeAllParallel(func(_ string, c *SharedClient) {
if c.Connected() { if c.Connected() {
c.Client.Close() c.Client.Close()
} }
@ -68,7 +67,7 @@ func (c *SharedClient) Close() {
// Returns: // Returns:
// - Client: the Docker client connection. // - Client: the Docker client connection.
// - error: an error if the connection failed. // - error: an error if the connection failed.
func ConnectClient(host string) (Client, error) { func ConnectClient(host string) (*SharedClient, error) {
clientMapMu.Lock() clientMapMu.Lock()
defer clientMapMu.Unlock() defer clientMapMu.Unlock()

View file

@ -29,7 +29,7 @@ type (
*idlewatcher.Config *idlewatcher.Config
*waker *waker
client D.Client client *D.SharedClient
stopByMethod StopCallback // send a docker command w.r.t. `stop_method` stopByMethod StopCallback // send a docker command w.r.t. `stop_method`
ticker *time.Ticker ticker *time.Ticker
task *task.Task task *task.Task

View file

@ -16,7 +16,7 @@ func Inspect(dockerHost string, containerID string) (*Container, error) {
return client.Inspect(containerID) return client.Inspect(containerID)
} }
func (c Client) Inspect(containerID string) (*Container, error) { func (c *SharedClient) Inspect(containerID string) (*Container, error) {
ctx, cancel := context.WithTimeoutCause(context.Background(), 3*time.Second, errors.New("docker container inspect timeout")) ctx, cancel := context.WithTimeoutCause(context.Background(), 3*time.Second, errors.New("docker container inspect timeout"))
defer cancel() defer cancel()

View file

@ -10,7 +10,6 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
E "github.com/yusing/go-proxy/internal/error" E "github.com/yusing/go-proxy/internal/error"
"github.com/yusing/go-proxy/internal/task" "github.com/yusing/go-proxy/internal/task"
F "github.com/yusing/go-proxy/internal/utils/functional"
"github.com/yusing/go-proxy/internal/watcher/events" "github.com/yusing/go-proxy/internal/watcher/events"
) )
@ -20,7 +19,7 @@ type DirWatcher struct {
dir string dir string
w *fsnotify.Watcher w *fsnotify.Watcher
fwMap F.Map[string, *fileWatcher] fwMap map[string]*fileWatcher
mu sync.Mutex mu sync.Mutex
eventCh chan Event eventCh chan Event
@ -53,7 +52,7 @@ func NewDirectoryWatcher(parent task.Parent, dirPath string) *DirWatcher {
Logger(), Logger(),
dir: dirPath, dir: dirPath,
w: w, w: w,
fwMap: F.NewMapOf[string, *fileWatcher](), fwMap: make(map[string]*fileWatcher),
eventCh: make(chan Event), eventCh: make(chan Event),
errCh: make(chan E.Error), errCh: make(chan E.Error),
task: parent.Subtask("dir_watcher(" + dirPath + ")"), task: parent.Subtask("dir_watcher(" + dirPath + ")"),
@ -71,7 +70,7 @@ func (h *DirWatcher) Add(relPath string) Watcher {
defer h.mu.Unlock() defer h.mu.Unlock()
// check if the watcher already exists // check if the watcher already exists
s, ok := h.fwMap.Load(relPath) s, ok := h.fwMap[relPath]
if ok { if ok {
return s return s
} }
@ -80,18 +79,21 @@ func (h *DirWatcher) Add(relPath string) Watcher {
eventCh: make(chan Event), eventCh: make(chan Event),
errCh: make(chan E.Error), errCh: make(chan E.Error),
} }
h.fwMap.Store(relPath, s) h.fwMap[relPath] = s
return s return s
} }
func (h *DirWatcher) cleanup() { func (h *DirWatcher) cleanup() {
h.mu.Lock()
defer h.mu.Unlock()
h.w.Close() h.w.Close()
close(h.eventCh) close(h.eventCh)
close(h.errCh) close(h.errCh)
h.fwMap.RangeAll(func(key string, fw *fileWatcher) { for _, fw := range h.fwMap {
close(fw.eventCh) close(fw.eventCh)
close(fw.errCh) close(fw.errCh)
}) }
h.task.Finish(nil) h.task.Finish(nil)
} }
@ -136,7 +138,9 @@ func (h *DirWatcher) start() {
} }
// send event to file watcher too // send event to file watcher too
w, ok := h.fwMap.Load(relPath) h.mu.Lock()
w, ok := h.fwMap[relPath]
h.mu.Unlock()
if ok { if ok {
select { select {
case w.eventCh <- msg: case w.eventCh <- msg:

View file

@ -17,7 +17,7 @@ type (
zerolog.Logger zerolog.Logger
host string host string
client D.Client client *D.SharedClient
clientOwned bool clientOwned bool
} }
DockerListOptions = docker_events.ListOptions DockerListOptions = docker_events.ListOptions
@ -62,7 +62,7 @@ func NewDockerWatcher(host string) DockerWatcher {
} }
} }
func NewDockerWatcherWithClient(client D.Client) DockerWatcher { func NewDockerWatcherWithClient(client *D.SharedClient) DockerWatcher {
return DockerWatcher{ return DockerWatcher{
client: client, client: client,
Logger: logger.With(). Logger: logger.With().