diff --git a/cmd/main.go b/cmd/main.go index 8da9f36..7a0be76 100755 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,6 +18,7 @@ import ( "github.com/yusing/go-proxy/internal/config" E "github.com/yusing/go-proxy/internal/error" "github.com/yusing/go-proxy/internal/net/http/middleware" + "github.com/yusing/go-proxy/internal/notif" R "github.com/yusing/go-proxy/internal/route" "github.com/yusing/go-proxy/internal/server" "github.com/yusing/go-proxy/internal/task" @@ -54,6 +55,7 @@ func main() { TimestampFormat: timeFmt, }) logrus.Infof("go-proxy version %s", pkg.GetVersion()) + logrus.AddHook(notif.GetDispatcher()) } if args.Command == common.CommandReload { diff --git a/go.mod b/go.mod index 1868514..5612f4d 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/docker/docker v27.3.1+incompatible github.com/fsnotify/fsnotify v1.7.0 github.com/go-acme/lego/v4 v4.19.2 + github.com/gotify/server/v2 v2.5.0 github.com/puzpuzpuz/xsync/v3 v3.4.0 github.com/santhosh-tekuri/jsonschema v1.2.4 github.com/sirupsen/logrus v1.9.3 @@ -40,7 +41,7 @@ require ( github.com/opencontainers/image-spec v1.1.0 // indirect github.com/ovh/go-ovh v1.6.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.31.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.30.0 // indirect diff --git a/go.sum b/go.sum index 958c6d3..f0d259b 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gotify/server/v2 v2.5.0 h1:tJd+a5bb17X52f0EV2KxqLuyjQFKmVK1+t/iNUkP16Y= +github.com/gotify/server/v2 v2.5.0/go.mod h1:DKPMQI/FZ69iKbZvrOL6VWwRaoB9O+HDvJWVd/kiGbc= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys= github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I= github.com/jarcoal/httpmock v1.3.0 h1:2RJ8GP0IIaWwcC9Fp2BmVi8Kog3v2Hn7VXM3fTd+nuc= @@ -84,8 +86,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/santhosh-tekuri/jsonschema v1.2.4 h1:hNhW8e7t+H1vgY+1QeEQpveR6D4+OwKPXCfD2aieJis= github.com/santhosh-tekuri/jsonschema v1.2.4/go.mod h1:TEAUOeZSmIxTTuHatJzrvARHiuO9LYd+cIxzgEHCQI4= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= diff --git a/internal/config/config.go b/internal/config/config.go index 0820c95..51a42cf 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -10,6 +10,7 @@ import ( "github.com/yusing/go-proxy/internal/common" "github.com/yusing/go-proxy/internal/config/types" E "github.com/yusing/go-proxy/internal/error" + "github.com/yusing/go-proxy/internal/notif" "github.com/yusing/go-proxy/internal/route" proxy "github.com/yusing/go-proxy/internal/route/provider" "github.com/yusing/go-proxy/internal/task" @@ -148,48 +149,59 @@ func (cfg *Config) StartProxyProviders() { } func (cfg *Config) load() (res E.Error) { - b := E.NewBuilder("errors loading config") - defer b.To(&res) + errs := E.NewBuilder("errors loading config") + defer errs.To(&res) logger.Debug("loading config") defer logger.Debug("loaded config") data, err := E.Check(os.ReadFile(common.ConfigPath)) if err != nil { - b.Add(E.FailWith("read config", err)) - logrus.Fatal(b.Build()) + errs.Add(E.FailWith("read config", err)) + logrus.Fatal(errs.Build()) } if !common.NoSchemaValidation { if err = Validate(data); err != nil { - b.Add(E.FailWith("schema validation", err)) - logrus.Fatal(b.Build()) + errs.Add(E.FailWith("schema validation", err)) + logrus.Fatal(errs.Build()) } } model := types.DefaultConfig() if err := E.From(yaml.Unmarshal(data, model)); err != nil { - b.Add(E.FailWith("parse config", err)) - logrus.Fatal(b.Build()) + errs.Add(E.FailWith("parse config", err)) + logrus.Fatal(errs.Build()) } // errors are non fatal below - b.Add(cfg.initAutoCert(&model.AutoCert)) - b.Add(cfg.loadProviders(&model.Providers)) + errs.Add(cfg.initNotification(model.Providers.Notification)) + errs.Add(cfg.initAutoCert(&model.AutoCert)) + errs.Add(cfg.loadRouteProviders(&model.Providers)) cfg.value = model route.SetFindMuxDomains(model.MatchDomains) return } +func (cfg *Config) initNotification(notifCfgMap types.NotificationConfigMap) (err E.Error) { + if len(notifCfgMap) == 0 { + return + } + errs := E.NewBuilder("errors initializing notification providers") + + for name, notifCfg := range notifCfgMap { + _, err := notif.RegisterProvider(cfg.task.Subtask(name), notifCfg) + errs.Add(err) + } + return errs.Build() +} + func (cfg *Config) initAutoCert(autocertCfg *types.AutoCertConfig) (err E.Error) { if cfg.autocertProvider != nil { return } - logger.Debug("initializing autocert") - defer logger.Debug("initialized autocert") - cfg.autocertProvider, err = autocert.NewConfig(autocertCfg).GetProvider() if err != nil { err = E.FailWith("autocert provider", err) @@ -197,11 +209,11 @@ func (cfg *Config) initAutoCert(autocertCfg *types.AutoCertConfig) (err E.Error) return } -func (cfg *Config) loadProviders(providers *types.ProxyProviders) (outErr E.Error) { - subtask := cfg.task.Subtask("load providers") +func (cfg *Config) loadRouteProviders(providers *types.Providers) (outErr E.Error) { + subtask := cfg.task.Subtask("load route providers") defer subtask.Finish("done") - errs := E.NewBuilder("errors loading providers") + errs := E.NewBuilder("errors loading route providers") results := E.NewBuilder("loaded providers") defer errs.To(&outErr) diff --git a/internal/config/types/config.go b/internal/config/types/config.go index ed0e638..5eef941 100644 --- a/internal/config/types/config.go +++ b/internal/config/types/config.go @@ -2,22 +2,23 @@ package types type ( Config struct { - Providers ProxyProviders `json:"providers" yaml:",flow"` + Providers Providers `json:"providers" yaml:",flow"` AutoCert AutoCertConfig `json:"autocert" yaml:",flow"` ExplicitOnly bool `json:"explicit_only" yaml:"explicit_only"` MatchDomains []string `json:"match_domains" yaml:"match_domains"` TimeoutShutdown int `json:"timeout_shutdown" yaml:"timeout_shutdown"` RedirectToHTTPS bool `json:"redirect_to_https" yaml:"redirect_to_https"` } - ProxyProviders struct { - Files []string `json:"include" yaml:"include"` // docker, file - Docker map[string]string `json:"docker" yaml:"docker"` + Providers struct { + Files []string `json:"include" yaml:"include"` + Docker map[string]string `json:"docker" yaml:"docker"` + Notification NotificationConfigMap `json:"notification" yaml:"notification"` } ) func DefaultConfig() *Config { return &Config{ - Providers: ProxyProviders{}, + Providers: Providers{}, TimeoutShutdown: 3, RedirectToHTTPS: false, } diff --git a/internal/config/types/notif_config.go b/internal/config/types/notif_config.go new file mode 100644 index 0000000..e9214c7 --- /dev/null +++ b/internal/config/types/notif_config.go @@ -0,0 +1,5 @@ +package types + +import "github.com/yusing/go-proxy/internal/notif" + +type NotificationConfigMap map[string]notif.ProviderConfig diff --git a/internal/error/error.go b/internal/error/error.go index f4baa27..39e7d60 100644 --- a/internal/error/error.go +++ b/internal/error/error.go @@ -51,6 +51,16 @@ func FromJSON(data []byte) (Error, bool) { }, true } +func TryUnwrap(err error) error { + if err == nil { + return nil + } + if unwrapped := errors.Unwrap(err); unwrapped != nil { + return unwrapped + } + return err +} + // Check is a helper function that // convert (T, error) to (T, NestedError). func Check[T any](obj T, err error) (T, Error) { @@ -140,7 +150,8 @@ func (ne Error) With(s any) Error { } return ne.withError(ss) case error: - return ne.withError(From(ss)) + // unwrap only once + return ne.withError(From(TryUnwrap(ss))) case string: msg = ss case fmt.Stringer: @@ -215,6 +226,13 @@ func (ne Error) HasError() bool { } func errorf(format string, args ...any) Error { + for i, arg := range args { + if err, ok := arg.(error); ok { + if unwrapped := errors.Unwrap(err); unwrapped != nil { + args[i] = unwrapped + } + } + } return From(fmt.Errorf(format, args...)) } diff --git a/internal/notif/dispatcher.go b/internal/notif/dispatcher.go new file mode 100644 index 0000000..7d9d0e4 --- /dev/null +++ b/internal/notif/dispatcher.go @@ -0,0 +1,100 @@ +package notif + +import ( + "github.com/sirupsen/logrus" + E "github.com/yusing/go-proxy/internal/error" + "github.com/yusing/go-proxy/internal/task" + F "github.com/yusing/go-proxy/internal/utils/functional" +) + +type ( + Dispatcher struct { + task task.Task + logCh chan *logrus.Entry + providers F.Set[Provider] + } +) + +var dispatcher *Dispatcher + +func init() { + dispatcher = newNotifDispatcher() + go dispatcher.start() +} + +func newNotifDispatcher() *Dispatcher { + return &Dispatcher{ + task: task.GlobalTask("notif dispatcher"), + logCh: make(chan *logrus.Entry), + providers: F.NewSet[Provider](), + } +} + +func GetDispatcher() *Dispatcher { + return dispatcher +} + +func RegisterProvider(configSubTask task.Task, cfg ProviderConfig) (Provider, E.Error) { + name := configSubTask.Name() + createFunc, ok := Providers[name] + if !ok { + return nil, E.NotExist("provider", name) + } + if provider, err := createFunc(cfg); err != nil { + return nil, err + } else { + dispatcher.providers.Add(provider) + configSubTask.OnCancel("remove provider", func() { + dispatcher.providers.Remove(provider) + }) + return provider, nil + } +} + +func (disp *Dispatcher) start() { + defer dispatcher.task.Finish("dispatcher stopped") + defer close(dispatcher.logCh) + + for { + select { + case <-disp.task.Context().Done(): + return + case entry := <-disp.logCh: + go disp.dispatch(entry) + } + } +} + +func (disp *Dispatcher) dispatch(entry *logrus.Entry) { + task := disp.task.Subtask("dispatch notif") + defer task.Finish("notifs dispatched") + + errs := E.NewBuilder("errors sending notif") + disp.providers.RangeAllParallel(func(p Provider) { + if err := p.Send(task.Context(), entry); err != nil { + errs.Addf("%s: %s", p.Name(), err) + } + }) + if err := errs.Build(); err != nil { + logrus.Error("notif dispatcher failure: ", err) + } +} + +// Levels implements logrus.Hook. +func (disp *Dispatcher) Levels() []logrus.Level { + return []logrus.Level{ + logrus.WarnLevel, + logrus.ErrorLevel, + logrus.FatalLevel, + logrus.PanicLevel, + } +} + +// Fire implements logrus.Hook. +func (disp *Dispatcher) Fire(entry *logrus.Entry) error { + if disp.providers.Size() == 0 { + return nil + } + disp.logCh <- entry + return nil +} diff --git a/internal/notif/gotify.go b/internal/notif/gotify.go new file mode 100644 index 0000000..ff6be73 --- /dev/null +++ b/internal/notif/gotify.go @@ -0,0 +1,111 @@ +package notif + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + + "github.com/gotify/server/v2/model" + "github.com/sirupsen/logrus" + E "github.com/yusing/go-proxy/internal/error" + U "github.com/yusing/go-proxy/internal/utils" +) + +type ( + GotifyClient struct { + GotifyConfig + + url *url.URL + http http.Client + } + GotifyConfig struct { + URL string `json:"url" yaml:"url"` + Token string `json:"token" yaml:"token"` + } + GotifyMessage model.Message +) + +const gotifyMsgEndpoint = "/message" + +func newGotifyClient(cfg map[string]any) (Provider, E.Error) { + client := new(GotifyClient) + err := U.Deserialize(cfg, &client.GotifyConfig) + if err != nil { + return nil, err + } + + url, uErr := url.Parse(client.URL) + if uErr != nil { + return nil, E.FailWith("parse url", uErr) + } + + client.url = url + return client, err +} + +// Name implements NotifProvider. +func (client *GotifyClient) Name() string { + return "gotify" +} + +// Send implements NotifProvider. +func (client *GotifyClient) Send(ctx context.Context, entry *logrus.Entry) error { + var priority int + var title string + + switch entry.Level { + case logrus.WarnLevel: + priority = 2 + title = "Warning" + case logrus.ErrorLevel: + priority = 5 + title = "Error" + case logrus.FatalLevel, logrus.PanicLevel: + priority = 8 + title = "Critical" + default: + return nil + } + if subjects := FieldsAsTitle(entry); subjects != "" { + title = subjects + " " + title + } + + msg := &GotifyMessage{ + Title: title, + Message: entry.Message, + Priority: priority, + } + + data, err := json.Marshal(msg) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, client.url.String()+gotifyMsgEndpoint, bytes.NewReader(data)) + if err != nil { + return fmt.Errorf("error creating request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+client.Token) + + resp, err := client.http.Do(req) + if err != nil { + return fmt.Errorf("failed to send gotify message: %w", err) + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + var errm model.Error + err = json.NewDecoder(resp.Body).Decode(&errm) + if err != nil { + return fmt.Errorf("gotify status %d, but failed to decode err response: %w", resp.StatusCode, err) + } + return fmt.Errorf("gotify status %d %s: %s", resp.StatusCode, errm.Error, errm.ErrorDescription) + } + return nil +} diff --git a/internal/notif/logrus.go b/internal/notif/logrus.go new file mode 100644 index 0000000..6ca0bf1 --- /dev/null +++ b/internal/notif/logrus.go @@ -0,0 +1,21 @@ +package notif + +import ( + "fmt" + "strings" + + "github.com/sirupsen/logrus" + U "github.com/yusing/go-proxy/internal/utils" +) + +func FieldsAsTitle(entry *logrus.Entry) string { + if len(entry.Data) == 0 { + return "" + } + var parts []string + for k, v := range entry.Data { + parts = append(parts, fmt.Sprintf("%s: %s", k, v)) + } + parts[0] = U.Title(parts[0]) + return strings.Join(parts, ", ") +} diff --git a/internal/notif/providers.go b/internal/notif/providers.go new file mode 100644 index 0000000..33b3dc5 --- /dev/null +++ b/internal/notif/providers.go @@ -0,0 +1,21 @@ +package notif + +import ( + "context" + + "github.com/sirupsen/logrus" + E "github.com/yusing/go-proxy/internal/error" +) + +type ( + Provider interface { + Name() string + Send(ctx context.Context, entry *logrus.Entry) error + } + ProviderCreateFunc func(map[string]any) (Provider, E.Error) + ProviderConfig map[string]any +) + +var Providers = map[string]ProviderCreateFunc{ + "gotify": newGotifyClient, +} diff --git a/internal/task/task.go b/internal/task/task.go index c245f84..1ff7400 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -9,10 +9,10 @@ import ( "sync" "time" - "github.com/puzpuzpuz/xsync/v3" "github.com/sirupsen/logrus" "github.com/yusing/go-proxy/internal/common" E "github.com/yusing/go-proxy/internal/error" + F "github.com/yusing/go-proxy/internal/utils/functional" ) var globalTask = createGlobalTask() @@ -21,7 +21,7 @@ func createGlobalTask() (t *task) { t = new(task) t.name = "root" t.ctx, t.cancel = context.WithCancelCause(context.Background()) - t.subtasks = xsync.NewMapOf[*task, struct{}]() + t.subtasks = F.NewSet[*task]() return } @@ -97,7 +97,7 @@ type ( cancel context.CancelCauseFunc parent *task - subtasks *xsync.MapOf[*task, struct{}] + subtasks F.Set[*task] subTasksWg sync.WaitGroup name, line string @@ -209,7 +209,7 @@ func (t *task) OnFinished(about string, fn func()) { defer t.OnFinishedMu.Unlock() if t.OnFinishedFuncs == nil { - onCompTask := GlobalTask(t.name + " > OnFinished") + onCompTask := GlobalTask(t.name + " > OnFinished > " + about) go t.runAllOnFinished(onCompTask) } var file string @@ -252,8 +252,8 @@ func (t *task) Finish(reason any) { } t.finishOnce.Do(func() { t.cancel(fmt.Errorf("%w: %s, reason: "+format, ErrTaskCanceled, t.name, reason)) - t.Wait() }) + t.Wait() } func (t *task) Subtask(name string) Task { @@ -271,10 +271,10 @@ func (t *task) newSubTask(ctx context.Context, cancel context.CancelCauseFunc, n cancel: cancel, name: name, parent: parent, - subtasks: xsync.NewMapOf[*task, struct{}](), + subtasks: F.NewSet[*task](), } parent.subTasksWg.Add(1) - parent.subtasks.Store(subtask, struct{}{}) + parent.subtasks.Add(subtask) if common.IsTrace { _, file, line, ok := runtime.Caller(3) if ok { @@ -288,7 +288,7 @@ func (t *task) newSubTask(ctx context.Context, cancel context.CancelCauseFunc, n } go func() { subtask.Wait() - parent.subtasks.Delete(subtask) + parent.subtasks.Remove(subtask) parent.subTasksWg.Done() }() return subtask @@ -331,9 +331,8 @@ func (t *task) tree(prefix ...string) string { } } sb.WriteString(t.Name() + "\n") - t.subtasks.Range(func(subtask *task, _ struct{}) bool { + t.subtasks.RangeAll(func(subtask *task) { sb.WriteString(subtask.tree(pre + " ")) - return true }) return sb.String() } @@ -362,9 +361,8 @@ func (t *task) serialize() map[string]any { } if t.subtasks.Size() > 0 { m["subtasks"] = make([]map[string]any, 0, t.subtasks.Size()) - t.subtasks.Range(func(subtask *task, _ struct{}) bool { + t.subtasks.RangeAll(func(subtask *task) { m["subtasks"] = append(m["subtasks"].([]map[string]any), subtask.serialize()) - return true }) } return m diff --git a/internal/utils/functional/map.go b/internal/utils/functional/map.go index 657f405..56669d1 100644 --- a/internal/utils/functional/map.go +++ b/internal/utils/functional/map.go @@ -114,9 +114,9 @@ func (m Map[KT, VT]) RangeAll(do func(k KT, v VT)) { // nothing func (m Map[KT, VT]) RangeAllParallel(do func(k KT, v VT)) { var wg sync.WaitGroup - wg.Add(m.Size()) m.Range(func(k KT, v VT) bool { + wg.Add(1) go func() { do(k, v) wg.Done() diff --git a/internal/utils/functional/set.go b/internal/utils/functional/set.go new file mode 100644 index 0000000..6a6d8bf --- /dev/null +++ b/internal/utils/functional/set.go @@ -0,0 +1,59 @@ +package functional + +import ( + "sync" + + "github.com/puzpuzpuz/xsync/v3" +) + +type Set[T comparable] struct { + m *xsync.MapOf[T, struct{}] +} + +func NewSet[T comparable]() Set[T] { + return Set[T]{m: xsync.NewMapOf[T, struct{}]()} +} + +func (set Set[T]) Add(v T) { + set.m.Store(v, struct{}{}) +} + +func (set Set[T]) Remove(v T) { + set.m.Delete(v) +} + +func (set Set[T]) Contains(v T) bool { + _, ok := set.m.Load(v) + return ok +} + +func (set Set[T]) Range(f func(T) bool) { + set.m.Range(func(k T, _ struct{}) bool { + return f(k) + }) +} + +func (set Set[T]) RangeAll(f func(T)) { + set.m.Range(func(k T, _ struct{}) bool { + f(k) + return true + }) +} + +func (set Set[T]) RangeAllParallel(f func(T)) { + var wg sync.WaitGroup + + set.Range(func(k T) bool { + wg.Add(1) + go func() { + f(k) + wg.Done() + }() + return true + }) + wg.Wait() +} + +func (set Set[T]) Size() int { + return set.m.Size() +} diff --git a/schema/config.schema.json b/schema/config.schema.json index 5af8576..dd489dd 100644 --- a/schema/config.schema.json +++ b/schema/config.schema.json @@ -294,6 +294,32 @@ ] } } + }, + "notification": { + "description": "Notification provider configuration", + "type": "object", + "additionalProperties": false, + "properties": { + "gotify": { + "description": "Gotify configuration", + "type": "object", + "additionalProperties": false, + "properties": { + "url": { + "description": "Gotify URL", + "type": "string" + }, + "token": { + "description": "Gotify token", + "type": "string" + } + }, + "required": [ + "url", + "token" + ] + } + } } } },