provider update race fix attempt

This commit is contained in:
yusing 2024-03-12 07:18:47 +00:00
parent 76d2cc2871
commit a2ada5c7ea
2 changed files with 36 additions and 14 deletions

Binary file not shown.

View file

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/golang/glog" "github.com/golang/glog"
@ -14,8 +15,10 @@ type Provider struct {
name string name string
stopWatching chan struct{} stopWatching chan struct{}
routes SafeMap[string, Route] // id -> Route routes map[string]Route // id -> Route
dockerClient *client.Client dockerClient *client.Client
mutex sync.Mutex
lastUpdate time.Time
} }
func (p *Provider) GetProxyConfigs() ([]*ProxyConfig, error) { func (p *Provider) GetProxyConfigs() ([]*ProxyConfig, error) {
@ -30,16 +33,27 @@ func (p *Provider) GetProxyConfigs() ([]*ProxyConfig, error) {
} }
} }
func (p *Provider) needUpdate() bool {
return p.lastUpdate.Add(1 * time.Second).Before(time.Now())
}
func (p *Provider) StopAllRoutes() { func (p *Provider) StopAllRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.needUpdate() {
return
}
close(p.stopWatching) close(p.stopWatching)
if p.dockerClient != nil { if p.dockerClient != nil {
p.dockerClient.Close() p.dockerClient.Close()
} }
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(p.routes.Size()) wg.Add(len(p.routes))
for _, route := range p.routes.Iterator() { for _, route := range p.routes {
go func(r Route) { go func(r Route) {
r.StopListening() r.StopListening()
r.RemoveFromRoutes() r.RemoveFromRoutes()
@ -47,12 +61,20 @@ func (p *Provider) StopAllRoutes() {
}(route) }(route)
} }
wg.Wait() wg.Wait()
p.routes = NewSafeMap[string, Route]() p.routes = make(map[string]Route)
} }
func (p *Provider) BuildStartRoutes() { func (p *Provider) BuildStartRoutes() {
p.mutex.Lock()
defer p.mutex.Unlock()
if !p.needUpdate() {
return
}
p.lastUpdate = time.Now()
p.stopWatching = make(chan struct{}) p.stopWatching = make(chan struct{})
p.routes = NewSafeMap[string, Route]() p.routes = make(map[string]Route)
cfgs, err := p.GetProxyConfigs() cfgs, err := p.GetProxyConfigs()
if err != nil { if err != nil {
@ -68,10 +90,10 @@ func (p *Provider) BuildStartRoutes() {
} }
r.SetupListen() r.SetupListen()
r.Listen() r.Listen()
p.routes.Set(cfg.GetID(), r) p.routes[cfg.GetID()] = r
} }
p.WatchChanges() p.WatchChanges()
p.Logf("Build", "built %d routes", p.routes.Size()) p.Logf("Build", "built %d routes", len(p.routes))
} }
func (p *Provider) WatchChanges() { func (p *Provider) WatchChanges() {
@ -86,14 +108,14 @@ func (p *Provider) WatchChanges() {
} }
} }
func (p* Provider) Logf(t string, s string, args ...interface{}) { func (p *Provider) Logf(t string, s string, args ...interface{}) {
glog.Infof("[%s] %s provider %q: " + s, append([]interface{}{t, p.Kind, p.name}, args...)...) glog.Infof("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...)
} }
func (p* Provider) Errorf(t string, s string, args ...interface{}) { func (p *Provider) Errorf(t string, s string, args ...interface{}) {
glog.Errorf("[%s] %s provider %q: " + s, append([]interface{}{t, p.Kind, p.name}, args...)...) glog.Errorf("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...)
} }
func (p* Provider) Warningf(t string, s string, args ...interface{}) { func (p *Provider) Warningf(t string, s string, args ...interface{}) {
glog.Warningf("[%s] %s provider %q: " + s, append([]interface{}{t, p.Kind, p.name}, args...)...) glog.Warningf("[%s] %s provider %q: "+s, append([]interface{}{t, p.Kind, p.name}, args...)...)
} }