fixing idlewatcher

This commit is contained in:
yusing 2024-09-21 09:42:40 +08:00
parent e48b9bbb0a
commit d7eab2ebcd
16 changed files with 352 additions and 242 deletions

View file

@ -42,3 +42,6 @@ rapid-crash:
sudo docker run --restart=always --name test_crash debian:bookworm-slim /bin/cat &&\
sleep 3 &&\
sudo docker rm -f test_crash
debug-list-containers:
bash -c 'echo -e "GET /containers/json HTTP/1.0\r\n" | sudo netcat -U /var/run/docker.sock | tail -n +9 | jq'

View file

@ -16,6 +16,8 @@ type Client struct {
key string
refCount *atomic.Int32
*client.Client
l logrus.FieldLogger
}
func (c Client) DaemonHostname() string {
@ -23,10 +25,13 @@ func (c Client) DaemonHostname() string {
return url.Hostname()
}
func (c Client) Connected() bool {
return c.Client != nil
}
// if the client is still referenced, this is no-op
func (c Client) Close() error {
if c.refCount.Load() > 0 {
c.refCount.Add(-1)
func (c *Client) Close() error {
if c.refCount.Add(-1) > 0 {
return nil
}
@ -34,7 +39,15 @@ func (c Client) Close() error {
defer clientMapMu.Unlock()
delete(clientMap, c.key)
return c.Client.Close()
client := c.Client
c.Client = nil
c.l.Debugf("client closed")
if client != nil {
return client.Close()
}
return nil
}
// ConnectClient creates a new Docker client connection to the specified host.
@ -94,12 +107,16 @@ func ConnectClient(host string) (Client, E.NestedError) {
return Client{}, err
}
clientMap[host] = Client{
c := Client{
Client: client,
key: host,
refCount: &atomic.Int32{},
l: logger.WithField("docker_client", client.DaemonHost()),
}
clientMap[host].refCount.Add(1)
c.refCount.Add(1)
c.l.Debugf("client connected")
clientMap[host] = c
return clientMap[host], nil
}

View file

@ -10,17 +10,18 @@ import (
)
type ProxyProperties struct {
DockerHost string `yaml:"docker_host" json:"docker_host"`
ContainerName string `yaml:"container_name" json:"container_name"`
ImageName string `yaml:"image_name" json:"image_name"`
Aliases []string `yaml:"aliases" json:"aliases"`
IsExcluded bool `yaml:"is_excluded" json:"is_excluded"`
FirstPort string `yaml:"first_port" json:"first_port"`
IdleTimeout string `yaml:"idle_timeout" json:"idle_timeout"`
WakeTimeout string `yaml:"wake_timeout" json:"wake_timeout"`
StopMethod string `yaml:"stop_method" json:"stop_method"`
StopTimeout string `yaml:"stop_timeout" json:"stop_timeout"` // stop_method = "stop" only
StopSignal string `yaml:"stop_signal" json:"stop_signal"` // stop_method = "stop" | "kill" only
DockerHost string `yaml:"-" json:"docker_host"`
ContainerName string `yaml:"-" json:"container_name"`
ImageName string `yaml:"-" json:"image_name"`
Aliases []string `yaml:"-" json:"aliases"`
IsExcluded bool `yaml:"-" json:"is_excluded"`
FirstPort string `yaml:"-" json:"first_port"`
IdleTimeout string `yaml:"-" json:"idle_timeout"`
WakeTimeout string `yaml:"-" json:"wake_timeout"`
StopMethod string `yaml:"-" json:"stop_method"`
StopTimeout string `yaml:"-" json:"stop_timeout"` // stop_method = "stop" only
StopSignal string `yaml:"-" json:"stop_signal"` // stop_method = "stop" | "kill" only
Running bool `yaml:"-" json:"running"`
}
type Container struct {
@ -42,6 +43,7 @@ func FromDocker(c *types.Container, dockerHost string) (res Container) {
StopMethod: res.getDeleteLabel(LabelStopMethod),
StopTimeout: res.getDeleteLabel(LabelStopTimeout),
StopSignal: res.getDeleteLabel(LabelStopSignal),
Running: c.Status == "running",
}
return
}

View file

@ -15,10 +15,13 @@ import (
E "github.com/yusing/go-proxy/error"
P "github.com/yusing/go-proxy/proxy"
PT "github.com/yusing/go-proxy/proxy/fields"
W "github.com/yusing/go-proxy/watcher"
event "github.com/yusing/go-proxy/watcher/events"
)
type watcher struct {
*P.ReverseProxyEntry
client D.Client
refCount atomic.Int32
@ -26,6 +29,7 @@ type watcher struct {
stopByMethod StopCallback
wakeCh chan struct{}
wakeDone chan E.NestedError
running atomic.Bool
ctx context.Context
cancel context.CancelFunc
@ -36,7 +40,7 @@ type watcher struct {
type (
WakeDone <-chan error
WakeFunc func() WakeDone
StopCallback func() (bool, E.NestedError)
StopCallback func() E.NestedError
)
func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
@ -51,6 +55,7 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
if w, ok := watcherMap[entry.ContainerName]; ok {
w.refCount.Add(1)
w.ReverseProxyEntry = entry
return w, nil
}
@ -67,8 +72,9 @@ func Register(entry *P.ReverseProxyEntry) (*watcher, E.NestedError) {
l: logger.WithField("container", entry.ContainerName),
}
w.refCount.Add(1)
w.running.Store(entry.ContainerRunning)
w.stopByMethod = w.getStopCallback()
watcherMap[w.ContainerName] = w
go func() {
@ -84,13 +90,14 @@ func Unregister(containerName string) {
defer watcherMapMu.Unlock()
if w, ok := watcherMap[containerName]; ok {
if w.refCount.Load() == 0 {
w.cancel()
close(w.wakeCh)
delete(watcherMap, containerName)
} else {
w.refCount.Add(-1)
if w.refCount.Add(-1) > 0 {
return
}
if w.cancel != nil {
w.cancel()
}
w.client.Close()
delete(watcherMap, containerName)
}
}
@ -131,19 +138,26 @@ func (w *watcher) PatchRoundTripper(rtp http.RoundTripper) roundTripper {
}
func (w *watcher) roundTrip(origRoundTrip roundTripFunc, req *http.Request) (*http.Response, error) {
timeout := time.After(w.WakeTimeout)
w.wakeCh <- struct{}{}
if w.running.Load() {
return origRoundTrip(req)
}
timeout := time.After(w.WakeTimeout)
for {
if w.running.Load() {
return origRoundTrip(req)
}
select {
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-w.wakeDone:
if err != nil {
return nil, err.Error()
}
return origRoundTrip(req)
case <-timeout:
resp := loadingResponse
resp.TLS = req.TLS
return &resp, nil
return getLoadingResponse(), nil
}
}
}
@ -178,36 +192,23 @@ func (w *watcher) containerStatus() (string, E.NestedError) {
return json.State.Status, nil
}
func (w *watcher) wakeIfStopped() (bool, E.NestedError) {
failure := E.Failure("wake")
func (w *watcher) wakeIfStopped() E.NestedError {
status, err := w.containerStatus()
if err.HasError() {
return false, failure.With(err)
return err
}
// "created", "running", "paused", "restarting", "removing", "exited", or "dead"
switch status {
case "exited", "dead":
err = E.From(w.containerStart())
return E.From(w.containerStart())
case "paused":
err = E.From(w.containerUnpause())
return E.From(w.containerUnpause())
case "running":
return false, nil
w.running.Store(true)
return nil
default:
return false, failure.With(E.Unexpected("container state", status))
}
if err.HasError() {
return false, failure.With(err)
}
status, err = w.containerStatus()
if err.HasError() {
return false, failure.With(err)
} else if status != "running" {
return false, failure.With(E.Unexpected("container state", status))
} else {
return true, nil
return E.Unexpected("container state", status)
}
}
@ -223,19 +224,15 @@ func (w *watcher) getStopCallback() StopCallback {
default:
panic("should not reach here")
}
return func() (bool, E.NestedError) {
return func() E.NestedError {
status, err := w.containerStatus()
if err.HasError() {
return false, E.FailWith("stop", err)
return err
}
if status != "running" {
return false, nil
return nil
}
err = E.From(cb())
if err.HasError() {
return false, E.FailWith("stop", err)
}
return true, nil
return E.From(cb())
}
}
@ -244,42 +241,83 @@ func (w *watcher) watch() {
w.ctx = watcherCtx
w.cancel = watcherCancel
dockerWatcher := W.NewDockerWatcherWithClient(w.client)
defer close(w.wakeCh)
dockerEventCh, dockerEventErrCh := dockerWatcher.EventsWithOptions(w.ctx, W.DockerListOptions{
Filters: W.NewDockerFilter(
W.DockerFilterContainer,
W.DockerrFilterContainerName(w.ContainerName),
W.DockerFilterStart,
W.DockerFilterStop,
W.DockerFilterDie,
W.DockerFilterKill,
W.DockerFilterPause,
W.DockerFilterUnpause,
),
})
ticker := time.NewTicker(w.IdleTimeout)
defer ticker.Stop()
for {
select {
case <-mainLoopCtx.Done():
watcherCancel()
w.cancel()
case <-watcherCtx.Done():
w.l.Debug("stopped")
return
case err := <-dockerEventErrCh:
if err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("docker watcher", err))
}
case e := <-dockerEventCh:
switch e.Action {
case event.ActionDockerStartUnpause:
w.running.Store(true)
w.l.Infof("%s %s", e.ActorName, e.Action)
case event.ActionDockerStopPause:
w.running.Store(false)
w.l.Infof("%s %s", e.ActorName, e.Action)
}
case <-ticker.C:
w.l.Debug("timeout")
stopped, err := w.stopByMethod()
if err.HasError() {
w.l.Error(err.Extraf("stop method: %s", w.StopMethod))
} else if stopped {
w.l.Infof("%s: ok", w.StopMethod)
} else {
ticker.Stop()
ticker.Stop()
if err := w.stopByMethod(); err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("stop", err).Extraf("stop method: %s", w.StopMethod))
}
case <-w.wakeCh:
w.l.Debug("wake received")
go func() {
started, err := w.wakeIfStopped()
if err != nil {
w.l.Error(err)
} else if started {
w.l.Infof("awaken")
ticker.Reset(w.IdleTimeout)
}
w.wakeDone <- err // this is passed to roundtrip
}()
w.l.Debug("wake signal received")
ticker.Reset(w.IdleTimeout)
err := w.wakeIfStopped()
if err != nil && err.IsNot(context.Canceled) {
w.l.Error(E.FailWith("wake", err))
}
select {
case w.wakeDone <- err: // this is passed to roundtrip
default:
}
}
}
}
func getLoadingResponse() *http.Response {
return &http.Response{
StatusCode: http.StatusAccepted,
Header: http.Header{
"Content-Type": {"text/html"},
"Cache-Control": {
"no-cache",
"no-store",
"must-revalidate",
},
},
Body: io.NopCloser(bytes.NewReader((loadingPage))),
ContentLength: int64(len(loadingPage)),
}
}
var (
mainLoopCtx context.Context
mainLoopCancel context.CancelFunc
@ -292,20 +330,6 @@ var (
logger = logrus.WithField("module", "idle_watcher")
loadingResponse = http.Response{
StatusCode: http.StatusAccepted,
Header: http.Header{
"Content-Type": {"text/html"},
"Cache-Control": {
"no-cache",
"no-store",
"must-revalidate",
},
},
Body: io.NopCloser(bytes.NewReader((loadingPage))),
ContentLength: int64(len(loadingPage)),
}
loadingPage = []byte(`
<!DOCTYPE html>
<html>
@ -317,12 +341,16 @@ var (
<body>
<script>
window.onload = function() {
setTimeout(function() {
location.reload();
}, 1000); // 1000 milliseconds = 1 second
setTimeout(function() {
window.location.reload()
}, 1000)
// fetch(window.location.href)
// .then(resp => resp.text())
// .then(data => { document.body.innerHTML = data; })
// .catch(err => { document.body.innerHTML = 'Error: ' + err; });
};
</script>
<p>Container is starting... Please wait</p>
<h1>Container is starting... Please wait</h1>
</body>
</html>
`[1:])

View file

@ -3,8 +3,8 @@ module github.com/yusing/go-proxy
go 1.22.0
require (
github.com/docker/cli v27.2.1+incompatible
github.com/docker/docker v27.2.1+incompatible
github.com/docker/cli v27.3.1+incompatible
github.com/docker/docker v27.3.1+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/go-acme/lego/v4 v4.18.0
github.com/puzpuzpuz/xsync/v3 v3.4.0

View file

@ -13,10 +13,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/cli v27.2.1+incompatible h1:U5BPtiD0viUzjGAjV1p0MGB8eVA3L3cbIrnyWmSJI70=
github.com/docker/cli v27.2.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v27.2.1+incompatible h1:fQdiLfW7VLscyoeYEBz7/J8soYFDZV1u6VW6gJEjNMI=
github.com/docker/docker v27.2.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/cli v27.3.1+incompatible h1:qEGdFBF3Xu6SCvCYhc7CzaQTlBmqDuzxPDpigSyeKQQ=
github.com/docker/cli v27.3.1+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
github.com/docker/docker v27.3.1+incompatible h1:KttF0XoteNTicmUtBO0L2tP+J7FGRFTjaEF4k6WdhfI=
github.com/docker/docker v27.3.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=

View file

@ -22,13 +22,14 @@ type (
HideHeaders []string
/* Docker only */
IdleTimeout time.Duration
WakeTimeout time.Duration
StopMethod T.StopMethod
StopTimeout int
StopSignal T.Signal
DockerHost string
ContainerName string
IdleTimeout time.Duration
WakeTimeout time.Duration
StopMethod T.StopMethod
StopTimeout int
StopSignal T.Signal
DockerHost string
ContainerName string
ContainerRunning bool
}
StreamEntry struct {
Alias T.Alias `json:"alias"`
@ -102,20 +103,21 @@ func validateRPEntry(m *M.ProxyEntry, s T.Scheme, b E.Builder) *ReverseProxyEntr
}
return &ReverseProxyEntry{
Alias: T.NewAlias(m.Alias),
Scheme: s,
URL: url,
NoTLSVerify: m.NoTLSVerify,
PathPatterns: pathPatterns,
SetHeaders: setHeaders,
HideHeaders: m.HideHeaders,
IdleTimeout: idleTimeout,
WakeTimeout: wakeTimeout,
StopMethod: stopMethod,
StopTimeout: int(stopTimeOut.Seconds()), // docker api takes integer seconds for timeout argument
StopSignal: stopSignal,
DockerHost: m.DockerHost,
ContainerName: m.ContainerName,
Alias: T.NewAlias(m.Alias),
Scheme: s,
URL: url,
NoTLSVerify: m.NoTLSVerify,
PathPatterns: pathPatterns,
SetHeaders: setHeaders,
HideHeaders: m.HideHeaders,
IdleTimeout: idleTimeout,
WakeTimeout: wakeTimeout,
StopMethod: stopMethod,
StopTimeout: int(stopTimeOut.Seconds()), // docker api takes integer seconds for timeout argument
StopSignal: stopSignal,
DockerHost: m.DockerHost,
ContainerName: m.ContainerName,
ContainerRunning: m.Running,
}
}

View file

@ -1,12 +1,13 @@
package provider
import (
"strconv"
D "github.com/yusing/go-proxy/docker"
E "github.com/yusing/go-proxy/error"
M "github.com/yusing/go-proxy/models"
R "github.com/yusing/go-proxy/route"
W "github.com/yusing/go-proxy/watcher"
. "github.com/yusing/go-proxy/watcher/event"
)
type DockerProvider struct {
@ -60,7 +61,7 @@ func (p *DockerProvider) LoadRoutesImpl() (routes R.Routes, err E.NestedError) {
return routes, errors.Build()
}
func (p *DockerProvider) OnEvent(event Event, routes R.Routes) (res EventResult) {
func (p *DockerProvider) OnEvent(event W.Event, routes R.Routes) (res EventResult) {
b := E.NewBuilder("event %s error", event)
defer b.To(&res.err)
@ -72,36 +73,33 @@ func (p *DockerProvider) OnEvent(event Event, routes R.Routes) (res EventResult)
}
})
switch event.Action {
case ActionStarted, ActionCreated, ActionModified:
client, err := D.ConnectClient(p.dockerHost)
if err.HasError() {
b.Add(E.FailWith("connect to docker", err))
return
}
defer client.Close()
cont, err := client.Inspect(event.ActorID)
if err.HasError() {
b.Add(E.FailWith("inspect container", err))
return
}
entries, err := p.entriesFromContainerLabels(cont)
b.Add(err)
entries.RangeAll(func(alias string, entry *M.ProxyEntry) {
if routes.Has(alias) {
b.Add(E.AlreadyExist("alias", alias))
} else {
if route, err := R.NewRoute(entry); err.HasError() {
b.Add(err)
} else {
routes.Store(alias, route)
b.Add(route.Start())
res.nAdded++
}
}
})
client, err := D.ConnectClient(p.dockerHost)
if err.HasError() {
b.Add(E.FailWith("connect to docker", err))
return
}
defer client.Close()
cont, err := client.Inspect(event.ActorID)
if err.HasError() {
b.Add(E.FailWith("inspect container", err))
return
}
entries, err := p.entriesFromContainerLabels(cont)
b.Add(err)
entries.RangeAll(func(alias string, entry *M.ProxyEntry) {
if routes.Has(alias) {
b.Add(E.AlreadyExist("alias", alias))
} else {
if route, err := R.NewRoute(entry); err.HasError() {
b.Add(err)
} else {
routes.Store(alias, route)
b.Add(route.Start())
res.nAdded++
}
}
})
return
}
@ -125,6 +123,22 @@ func (p *DockerProvider) entriesFromContainerLabels(container D.Container) (M.Pr
errors.Add(p.applyLabel(entries, key, val))
}
// selecting correct host port
if container.HostConfig.NetworkMode != "host" {
for _, a := range container.Aliases {
entry, ok := entries.Load(a)
if !ok {
continue
}
for _, p := range container.Ports {
containerPort := strconv.Itoa(int(p.PrivatePort))
if containerPort == entry.Port {
entry.Port = strconv.Itoa(int(p.PublicPort))
}
}
}
}
return entries, errors.Build().Subject(container.ContainerName)
}

View file

@ -10,7 +10,6 @@ import (
R "github.com/yusing/go-proxy/route"
U "github.com/yusing/go-proxy/utils"
W "github.com/yusing/go-proxy/watcher"
. "github.com/yusing/go-proxy/watcher/event"
)
type FileProvider struct {
@ -29,7 +28,7 @@ func Validate(data []byte) E.NestedError {
return U.ValidateYaml(U.GetSchema(common.ProvidersSchemaPath), data)
}
func (p FileProvider) OnEvent(event Event, routes R.Routes) (res EventResult) {
func (p FileProvider) OnEvent(event W.Event, routes R.Routes) (res EventResult) {
b := E.NewBuilder("event %s error", event)
defer b.To(&res.err)

View file

@ -9,7 +9,6 @@ import (
E "github.com/yusing/go-proxy/error"
R "github.com/yusing/go-proxy/route"
W "github.com/yusing/go-proxy/watcher"
. "github.com/yusing/go-proxy/watcher/event"
)
type (
@ -29,7 +28,7 @@ type (
ProviderImpl interface {
NewWatcher() W.Watcher
LoadRoutesImpl() (R.Routes, E.NestedError)
OnEvent(event Event, routes R.Routes) EventResult
OnEvent(event W.Event, routes R.Routes) EventResult
}
ProviderType string
EventResult struct {

View file

@ -4,72 +4,101 @@ import (
"context"
"time"
"github.com/docker/docker/api/types/events"
docker_events "github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
"github.com/sirupsen/logrus"
D "github.com/yusing/go-proxy/docker"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
"github.com/yusing/go-proxy/watcher/events"
)
type DockerWatcher struct {
host string
type (
DockerWatcher struct {
host string
client D.Client
logrus.FieldLogger
}
DockerListOptions = docker_events.ListOptions
)
// https://docs.docker.com/reference/api/engine/version/v1.47/#tag/System/operation/SystemPingHead
var (
DockerFilterContainer = filters.Arg("type", string(docker_events.ContainerEventType))
DockerFilterStart = filters.Arg("event", string(docker_events.ActionStart))
DockerFilterStop = filters.Arg("event", string(docker_events.ActionStop))
DockerFilterDie = filters.Arg("event", string(docker_events.ActionDie))
DockerFilterKill = filters.Arg("event", string(docker_events.ActionKill))
DockerFilterPause = filters.Arg("event", string(docker_events.ActionPause))
DockerFilterUnpause = filters.Arg("event", string(docker_events.ActionUnPause))
NewDockerFilter = filters.NewArgs
)
func DockerrFilterContainerName(name string) filters.KeyValuePair {
return filters.Arg("container", name)
}
func NewDockerWatcher(host string) *DockerWatcher {
return &DockerWatcher{host: host}
func NewDockerWatcher(host string) DockerWatcher {
return DockerWatcher{host: host, FieldLogger: logrus.WithField("module", "docker_watcher")}
}
func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) {
func NewDockerWatcherWithClient(client D.Client) DockerWatcher {
return DockerWatcher{client: client, FieldLogger: logrus.WithField("module", "docker_watcher")}
}
func (w DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.NestedError) {
return w.EventsWithOptions(ctx, optionsWatchAll)
}
func (w DockerWatcher) EventsWithOptions(ctx context.Context, options DockerListOptions) (<-chan Event, <-chan E.NestedError) {
eventCh := make(chan Event)
errCh := make(chan E.NestedError)
started := make(chan struct{})
go func() {
defer close(eventCh)
defer close(errCh)
var cl D.Client
var err E.NestedError
for range 3 {
cl, err = D.ConnectClient(w.host)
if err.NoError() {
break
if !w.client.Connected() {
var err E.NestedError
for range 3 {
w.client, err = D.ConnectClient(w.host)
if err != nil {
defer w.client.Close()
break
}
time.Sleep(1 * time.Second)
}
if err.HasError() {
errCh <- E.FailWith("docker connection", err)
return
}
errCh <- err
time.Sleep(1 * time.Second)
}
if err.HasError() {
errCh <- E.Failure("connecting to docker")
return
}
defer cl.Close()
cEventCh, cErrCh := cl.Events(ctx, dwOptions)
cEventCh, cErrCh := w.client.Events(ctx, options)
started <- struct{}{}
for {
select {
case <-ctx.Done():
if err := <-cErrCh; err != nil {
errCh <- E.From(err)
if err := E.From(ctx.Err()); err != nil && err.IsNot(context.Canceled) {
errCh <- err
}
return
case msg := <-cEventCh:
var Action Action
switch msg.Action {
case events.ActionStart:
Action = ActionCreated
case events.ActionDie:
Action = ActionStopped
default: // NOTE: should not happen
Action = ActionModified
action, ok := events.DockerEventMap[msg.Action]
if !ok {
w.Debugf("ignored unknown docker event: %s for container %s", msg.Action, msg.Actor.Attributes["name"])
continue
}
eventCh <- Event{
Type: EventTypeDocker,
event := Event{
Type: events.EventTypeDocker,
ActorID: msg.Actor.ID,
ActorAttributes: msg.Actor.Attributes, // labels
ActorName: msg.Actor.Attributes["name"],
Action: Action,
Action: action,
}
eventCh <- event
case err := <-cErrCh:
if err == nil {
continue
@ -81,7 +110,7 @@ func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Nest
default:
if D.IsErrConnectionFailed(err) {
time.Sleep(100 * time.Millisecond)
cEventCh, cErrCh = cl.Events(ctx, dwOptions)
cEventCh, cErrCh = w.client.Events(ctx, options)
}
}
}
@ -92,8 +121,9 @@ func (w *DockerWatcher) Events(ctx context.Context) (<-chan Event, <-chan E.Nest
return eventCh, errCh
}
var dwOptions = events.ListOptions{Filters: filters.NewArgs(
filters.Arg("type", string(events.ContainerEventType)),
filters.Arg("event", string(events.ActionStart)),
filters.Arg("event", string(events.ActionDie)), // 'stop' already triggering 'die'
var optionsWatchAll = DockerListOptions{Filters: NewDockerFilter(
DockerFilterContainer,
DockerFilterStart,
DockerFilterStop,
DockerFilterDie,
)}

View file

@ -1,34 +0,0 @@
package event
import "fmt"
type (
Event struct {
Type EventType
ActorName string
ActorID string
ActorAttributes map[string]string
Action Action
}
Action string
EventType string
)
const (
ActionModified Action = "modified"
ActionCreated Action = "created"
ActionStarted Action = "started"
ActionDeleted Action = "deleted"
ActionStopped Action = "stopped"
EventTypeDocker EventType = "docker"
EventTypeFile EventType = "file"
)
func (e Event) String() string {
return fmt.Sprintf("%s %s", e.ActorName, e.Action)
}
func (a Action) IsDelete() bool {
return a == ActionDeleted
}

View file

@ -0,0 +1,49 @@
package events
import (
"fmt"
dockerEvents "github.com/docker/docker/api/types/events"
)
type (
Event struct {
Type EventType
ActorName string
ActorID string
ActorAttributes map[string]string
Action Action
}
Action string
EventType string
)
const (
ActionFileModified Action = "modified"
ActionFileCreated Action = "created"
ActionFileDeleted Action = "deleted"
ActionDockerStartUnpause Action = "start"
ActionDockerStopPause Action = "stop"
EventTypeDocker EventType = "docker"
EventTypeFile EventType = "file"
)
var DockerEventMap = map[dockerEvents.Action]Action{
dockerEvents.ActionCreate: ActionDockerStartUnpause,
dockerEvents.ActionStart: ActionDockerStartUnpause,
dockerEvents.ActionPause: ActionDockerStartUnpause,
dockerEvents.ActionDie: ActionDockerStopPause,
dockerEvents.ActionStop: ActionDockerStopPause,
dockerEvents.ActionUnPause: ActionDockerStopPause,
dockerEvents.ActionKill: ActionDockerStopPause,
}
func (e Event) String() string {
return fmt.Sprintf("%s %s", e.ActorName, e.Action)
}
func (a Action) IsDelete() bool {
return a == ActionFileDeleted
}

View file

@ -6,7 +6,6 @@ import (
"github.com/yusing/go-proxy/common"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
)
type fileWatcher struct {

View file

@ -9,7 +9,7 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
"github.com/yusing/go-proxy/watcher/events"
)
type fileWatcherHelper struct {
@ -81,30 +81,30 @@ func (h *fileWatcherHelper) start() {
for {
select {
case event, ok := <-h.w.Events:
case fsEvent, ok := <-h.w.Events:
if !ok {
// closed manually?
fsLogger.Error("channel closed")
return
}
// retrieve the watcher
w, ok := h.m[path.Base(event.Name)]
w, ok := h.m[path.Base(fsEvent.Name)]
if !ok {
// watcher for this file does not exist
continue
}
msg := Event{
Type: EventTypeFile,
Type: events.EventTypeFile,
ActorName: w.filename,
}
switch {
case event.Has(fsnotify.Create):
msg.Action = ActionCreated
case event.Has(fsnotify.Write):
msg.Action = ActionModified
case event.Has(fsnotify.Remove), event.Has(fsnotify.Rename):
msg.Action = ActionDeleted
case fsEvent.Has(fsnotify.Create):
msg.Action = events.ActionFileCreated
case fsEvent.Has(fsnotify.Write):
msg.Action = events.ActionFileModified
case fsEvent.Has(fsnotify.Remove), fsEvent.Has(fsnotify.Rename):
msg.Action = events.ActionFileDeleted
default: // ignore other events
continue
}

View file

@ -4,9 +4,11 @@ import (
"context"
E "github.com/yusing/go-proxy/error"
. "github.com/yusing/go-proxy/watcher/event"
"github.com/yusing/go-proxy/watcher/events"
)
type Event = events.Event
type Watcher interface {
Events(ctx context.Context) (<-chan Event, <-chan E.NestedError)
}