18 Commits

Author SHA1 Message Date
94c737a02b Merge pull request 'Change address syntax to be a URL' (#3) from addr-url into main
Reviewed-on: #3
2024-12-19 10:06:02 -05:00
74efe7c808 update README 2024-12-19 09:46:36 -05:00
06e51af628 do dupe keys check only once 2024-12-18 15:45:36 -05:00
b14072106a Fix golint issues and add test 2024-12-18 15:38:52 -05:00
7cab173c7d fix ParseAddress 2024-12-17 19:36:31 -05:00
47ee5fb4f4 remove channel close 2024-12-17 18:16:38 -05:00
79256b0440 update to new syntax 2024-12-17 13:21:37 -05:00
e789d40258 Add helper function to parse address 2024-12-17 12:07:42 -05:00
917f340870 remove trailing space 2024-05-09 12:10:33 -04:00
0693a56697 add doc for idle server auto shutdown 2023-09-11 21:01:20 -04:00
e95d2d0a96 Merge pull request 'Add global idler and other helpers' (#2) from globalIdler into main
Reviewed-on: #2

Fixes #1
2023-09-11 20:45:51 -04:00
d4304636a2 add basic test 2023-09-11 20:44:52 -04:00
5c43a1a4fc un http Server async and return error via channel 2023-09-11 20:28:04 -04:00
ee28a78eda Add WrapHandler to automatically call Tick on every request 2023-09-11 20:24:10 -04:00
8ca82581fe WIP: Enter/Exit won't work :( 2023-09-08 19:35:58 -04:00
b5588989de Add function to return address type 2023-09-08 19:10:36 -04:00
ea04fe735b Add example for idle 2023-05-09 22:45:52 -04:00
710e8b66e6 Add idle package 2023-05-08 22:37:44 -04:00
6 changed files with 520 additions and 56 deletions

View File

@ -1,4 +1,4 @@
Create http server listening on unix sockets or systemd socket activated fds Create http server listening on unix sockets and systemd socket activated fds
## Quick Usage ## Quick Usage
@ -17,36 +17,52 @@ Just replace `http.ListenAndServe` with `anyhttp.ListenAndServe`.
Syntax Syntax
unix/<path to socket> unix?path=<socket_path>&mode=<socket file mode>&remove_existing=<yes|no>
Examples Examples
unix/relative/path.sock unix?path=relative/path.sock
unix//var/run/app/absolutepath.sock unix?path=/var/run/app/absolutepath.sock
unix?path=/run/app.sock&mode=600&remove_existing=no
| option | description | default |
|-----------------|------------------------------------------------|----------|
| path | path to unix socket | Required |
| mode | socket file mode | 666 |
| remove_existing | Whether to remove existing socket file or fail | true |
### Systemd Socket activated fd: ### Systemd Socket activated fd:
Syntax Syntax
sysd/fdidx/<fd index starting at 0> sysd?idx=<fd index>&name=<fd name>&check_pid=<yes|no>&unset_env=<yes|no>&idle_timeout=<duration>
sysd/fdname/<fd name set using FileDescriptorName socket setting >
Only one of `idx` or `name` has to be set
Examples: Examples:
# First (or only) socket fd passed to app # First (or only) socket fd passed to app
sysd/fdidx/0 sysd?idx=0
# Socket with FileDescriptorName # Socket with FileDescriptorName
sysd/fdname/myapp sysd?name=myapp
# Using default name # Using default name and auto shutdown if no requests received in last 30 minutes
sysd/fdname/myapp.socket sysd?name=myapp.socket&idle_timeout=30m
### TCP port | option | description | default |
|--------------|--------------------------------------------------------------------------------------------|------------------|
| name | Name configured via FileDescriptorName or socket file name | Required |
| idx | FD Index. Actual fd num will be 3 + idx | Required |
| idle_timeout | time to wait before shutdown. [syntax][0] | no auto shutdown |
| check_pid | Check process PID matches LISTEN_PID | true |
| unset_env | Unsets the LISTEN\* environment variables, so they don't get passed to any child processes | true |
If the address is a number less than 65536, it is assumed as a port and passed as `http.ListenAndServe(":<port>",...)` ### TCP
Anything else is directly passed to `http.ListenAndServe` as well. Below examples should work If the address is not one of above, it is assumed to be tcp and passed to `http.ListenAndServe`.
Examples:
:http :http
:8888 :8888
@ -60,3 +76,5 @@ https://pkg.go.dev/go.balki.me/anyhttp
* https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74#file-unixhttpd-go * https://gist.github.com/teknoraver/5ffacb8757330715bcbcc90e6d46ac74#file-unixhttpd-go
* https://github.com/coreos/go-systemd/tree/main/activation * https://github.com/coreos/go-systemd/tree/main/activation
[0]: https://pkg.go.dev/time#ParseDuration

View File

@ -2,16 +2,35 @@
package anyhttp package anyhttp
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io/fs" "io/fs"
"net" "net"
"net/http" "net/http"
"net/url"
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"
"time"
"go.balki.me/anyhttp/idle"
)
// AddressType of the address passed
type AddressType string
var (
// UnixSocket - address is a unix socket, e.g. unix//run/foo.sock
UnixSocket AddressType = "UnixSocket"
// SystemdFD - address is a systemd fd, e.g. sysd/fdname/myapp.socket
SystemdFD AddressType = "SystemdFD"
// TCP - address is a TCP address, e.g. :1234
TCP AddressType = "TCP"
// Unknown - address is not recognized
Unknown AddressType = "Unknown"
) )
// UnixSocketConfig has the configuration for Unix socket // UnixSocketConfig has the configuration for Unix socket
@ -83,6 +102,8 @@ type SysdConfig struct {
CheckPID bool CheckPID bool
// Unsets the LISTEN* environment variables, so they don't get passed to any child processes // Unsets the LISTEN* environment variables, so they don't get passed to any child processes
UnsetEnv bool UnsetEnv bool
// Shutdown http server if no requests received for below timeout
IdleTimeout *time.Duration
} }
// DefaultSysdConfig has the default values for SysdConfig // DefaultSysdConfig has the default values for SysdConfig
@ -182,59 +203,61 @@ func (s *SysdConfig) GetListener() (net.Listener, error) {
return nil, errors.New("neither FDIndex nor FDName set") return nil, errors.New("neither FDIndex nor FDName set")
} }
// UnknownAddress Error is returned when address does not match any known syntax // Serve creates and serve a http server.
type UnknownAddress struct{} func Serve(addr string, h http.Handler) (addrType AddressType, srv *http.Server, idler idle.Idler, done <-chan error, err error) {
addrType, usc, sysc, err := parseAddress(addr)
func (u UnknownAddress) Error() string { if err != nil {
return "unknown address" return
}
// GetListener gets a unix or systemd socket listener
func GetListener(addr string) (net.Listener, error) {
if strings.HasPrefix(addr, "unix/") {
usc := NewUnixSocketConfig(strings.TrimPrefix(addr, "unix/"))
return usc.GetListener()
} }
if strings.HasPrefix(addr, "sysd/fdidx/") { listener, err := func() (net.Listener, error) {
idx, err := strconv.Atoi(strings.TrimPrefix(addr, "sysd/fdidx/")) if usc != nil {
if err != nil { return usc.GetListener()
return nil, fmt.Errorf("invalid fdidx, addr:%q err: %w", addr, err) } else if sysc != nil {
return sysc.GetListener()
} }
sysdc := NewSysDConfigWithFDIdx(idx) if addr == "" {
return sysdc.GetListener() addr = ":http"
}
return net.Listen("tcp", addr)
}()
if err != nil {
return
} }
errChan := make(chan error)
if strings.HasPrefix(addr, "sysd/fdname/") { done = errChan
sysdc := NewSysDConfigWithFDName(strings.TrimPrefix(addr, "sysd/fdname/")) if addrType == SystemdFD && sysc.IdleTimeout != nil {
return sysdc.GetListener() idler = idle.CreateIdler(*sysc.IdleTimeout)
srv = &http.Server{Handler: idle.WrapIdlerHandler(idler, h)}
waitErrChan := make(chan error)
go func() {
waitErrChan <- srv.Serve(listener)
}()
go func() {
select {
case err := <-waitErrChan:
errChan <- err
case <-idler.Chan():
errChan <- srv.Shutdown(context.TODO())
}
}()
} else {
srv = &http.Server{Handler: h}
go func() {
errChan <- srv.Serve(listener)
}()
} }
return
return nil, UnknownAddress{}
} }
// ListenAndServe is the drop-in replacement for `http.ListenAndServe`. // ListenAndServe is the drop-in replacement for `http.ListenAndServe`.
// Supports unix and systemd sockets in addition // Supports unix and systemd sockets in addition
func ListenAndServe(addr string, h http.Handler) error { func ListenAndServe(addr string, h http.Handler) error {
_, _, _, done, err := Serve(addr, h)
listener, err := GetListener(addr) if err != nil {
if _, isUnknown := err.(UnknownAddress); err != nil && !isUnknown {
return err return err
} }
return <-done
if listener != nil {
return http.Serve(listener, h)
}
if port, err := strconv.Atoi(addr); err == nil {
if port > 0 && port < 65536 {
return http.ListenAndServe(fmt.Sprintf(":%v", port), h)
}
return fmt.Errorf("invalid port: %v", port)
}
return http.ListenAndServe(addr, h)
} }
// UnsetSystemdListenVars unsets the LISTEN* environment variables so they are not passed to any child processes // UnsetSystemdListenVars unsets the LISTEN* environment variables so they are not passed to any child processes
@ -243,3 +266,98 @@ func UnsetSystemdListenVars() {
_ = os.Unsetenv("LISTEN_FDS") _ = os.Unsetenv("LISTEN_FDS")
_ = os.Unsetenv("LISTEN_FDNAMES") _ = os.Unsetenv("LISTEN_FDNAMES")
} }
func parseAddress(addr string) (addrType AddressType, usc *UnixSocketConfig, sysc *SysdConfig, err error) {
usc = nil
sysc = nil
err = nil
u, err := url.Parse(addr)
if err != nil {
return TCP, nil, nil, nil
}
if u.Path == "unix" {
duc := DefaultUnixSocketConfig
usc = &duc
addrType = UnixSocket
for key, val := range u.Query() {
if len(val) != 1 {
err = fmt.Errorf("unix socket address error. Multiple %v found: %v", key, val)
return
}
if key == "path" {
usc.SocketPath = val[0]
} else if key == "mode" {
if _, serr := fmt.Sscanf(val[0], "%o", &usc.SocketMode); serr != nil {
err = fmt.Errorf("unix socket address error. Bad mode: %v, err: %w", val, serr)
return
}
} else if key == "remove_existing" {
if removeExisting, berr := strconv.ParseBool(val[0]); berr == nil {
usc.RemoveExisting = removeExisting
} else {
err = fmt.Errorf("unix socket address error. Bad remove_existing: %v, err: %w", val, berr)
return
}
} else {
err = fmt.Errorf("unix socket address error. Bad option; key: %v, val: %v", key, val)
return
}
}
if usc.SocketPath == "" {
err = fmt.Errorf("unix socket address error. Missing path; addr: %v", addr)
return
}
} else if u.Path == "sysd" {
dsc := DefaultSysdConfig
sysc = &dsc
addrType = SystemdFD
for key, val := range u.Query() {
if len(val) != 1 {
err = fmt.Errorf("systemd socket fd address error. Multiple %v found: %v", key, val)
return
}
if key == "name" {
sysc.FDName = &val[0]
} else if key == "idx" {
if idx, ierr := strconv.Atoi(val[0]); ierr == nil {
sysc.FDIndex = &idx
} else {
err = fmt.Errorf("systemd socket fd address error. Bad idx: %v, err: %w", val, ierr)
return
}
} else if key == "check_pid" {
if checkPID, berr := strconv.ParseBool(val[0]); berr == nil {
sysc.CheckPID = checkPID
} else {
err = fmt.Errorf("systemd socket fd address error. Bad check_pid: %v, err: %w", val, berr)
return
}
} else if key == "unset_env" {
if unsetEnv, berr := strconv.ParseBool(val[0]); berr == nil {
sysc.UnsetEnv = unsetEnv
} else {
err = fmt.Errorf("systemd socket fd address error. Bad unset_env: %v, err: %w", val, berr)
return
}
} else if key == "idle_timeout" {
if timeout, terr := time.ParseDuration(val[0]); terr == nil {
sysc.IdleTimeout = &timeout
} else {
err = fmt.Errorf("systemd socket fd address error. Bad idle_timeout: %v, err: %w", val, terr)
return
}
} else {
err = fmt.Errorf("systemd socket fd address error. Bad option; key: %v, val: %v", key, val)
return
}
}
if (sysc.FDIndex == nil) == (sysc.FDName == nil) {
err = fmt.Errorf("systemd socket fd address error. Exactly only one of name and idx has to be set. name: %v, idx: %v", sysc.FDName, sysc.FDIndex)
return
}
} else {
// Just assume as TCP address
return TCP, nil, nil, nil
}
return
}

132
anyhttp_test.go Normal file
View File

@ -0,0 +1,132 @@
package anyhttp
import (
"encoding/json"
"testing"
"time"
)
func Test_parseAddress(t *testing.T) {
tests := []struct {
name string // description of this test case
// Named input parameters for target function.
addr string
wantAddrType AddressType
wantUsc *UnixSocketConfig
wantSysc *SysdConfig
wantErr bool
}{
{
name: "tcp port",
addr: ":8080",
wantAddrType: TCP,
wantUsc: nil,
wantSysc: nil,
wantErr: false,
},
{
name: "unix address",
addr: "unix?path=/run/foo.sock&mode=660",
wantAddrType: UnixSocket,
wantUsc: &UnixSocketConfig{
SocketPath: "/run/foo.sock",
SocketMode: 0660,
RemoveExisting: true,
},
wantSysc: nil,
wantErr: false,
},
{
name: "systemd address",
addr: "sysd?name=foo.socket",
wantAddrType: SystemdFD,
wantUsc: nil,
wantSysc: &SysdConfig{
FDIndex: nil,
FDName: ptr("foo.socket"),
CheckPID: true,
UnsetEnv: true,
IdleTimeout: nil,
},
wantErr: false,
},
{
name: "systemd address with index",
addr: "sysd?idx=0&idle_timeout=30m",
wantAddrType: SystemdFD,
wantUsc: nil,
wantSysc: &SysdConfig{
FDIndex: ptr(0),
FDName: nil,
CheckPID: true,
UnsetEnv: true,
IdleTimeout: ptr(30 * time.Minute),
},
wantErr: false,
},
{
name: "systemd address. Bad example",
addr: "sysd?idx=0&idle_timeout=30m&name=foo",
wantAddrType: SystemdFD,
wantUsc: nil,
wantSysc: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotAddrType, gotUsc, gotSysc, gotErr := parseAddress(tt.addr)
if gotErr != nil {
if !tt.wantErr {
t.Errorf("parseAddress() failed: %v", gotErr)
}
return
}
if tt.wantErr {
t.Fatal("parseAddress() succeeded unexpectedly")
}
if gotAddrType != tt.wantAddrType {
t.Errorf("parseAddress() addrType = %v, want %v", gotAddrType, tt.wantAddrType)
}
if !check(gotUsc, tt.wantUsc) {
t.Errorf("parseAddress() Usc = %v, want %v", gotUsc, tt.wantUsc)
}
if !check(gotSysc, tt.wantSysc) {
if (gotSysc == nil || tt.wantSysc == nil) ||
!(check(gotSysc.FDIndex, tt.wantSysc.FDIndex) &&
check(gotSysc.FDName, tt.wantSysc.FDName) &&
check(gotSysc.IdleTimeout, tt.wantSysc.IdleTimeout)) {
t.Errorf("parseAddress() Sysc = %v, want %v", asJSON(gotSysc), asJSON(tt.wantSysc))
}
}
})
}
}
// Helpers
// print value instead of pointer
func asJSON[T any](val T) string {
op, err := json.Marshal(val)
if err != nil {
return err.Error()
}
return string(op)
}
func ptr[T any](val T) *T {
return &val
}
// nil safe equal check
func check[T comparable](got, want *T) bool {
if (got == nil) != (want == nil) {
return false
}
if got == nil {
return true
}
return *got == *want
}

View File

@ -0,0 +1,35 @@
package main
import (
"context"
"net/http"
"time"
"go.balki.me/anyhttp/idle"
)
func main() {
idler := idle.CreateIdler(10 * time.Second)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
idler.Tick()
w.Write([]byte("Hey there!\n"))
})
http.HandleFunc("/job", func(w http.ResponseWriter, r *http.Request) {
go func() {
idler.Enter()
defer idler.Exit()
time.Sleep(15 * time.Second)
}()
w.Write([]byte("Job scheduled\n"))
})
server := http.Server{
Addr: ":8888",
}
go server.ListenAndServe()
idler.Wait()
server.Shutdown(context.TODO())
}

127
idle/idle.go Normal file
View File

@ -0,0 +1,127 @@
// Package idle helps to gracefully shutdown idle (typically http) servers
package idle
import (
"fmt"
"net/http"
"sync/atomic"
"time"
)
var (
// For simple servers without backgroud jobs, global singleton for simpler API
// Enter/Exit worn't work for global idler as Enter may be called before Wait, use CreateIdler in those cases
gIdler atomic.Pointer[idler]
)
// Wait waits till the server is idle and returns. i.e. no Ticks in last <timeout> duration
func Wait(timeout time.Duration) error {
i := CreateIdler(timeout).(*idler)
ok := gIdler.CompareAndSwap(nil, i)
if !ok {
return fmt.Errorf("idler already waiting")
}
i.Wait()
return nil
}
// Tick records the current time. This will make the server not idle until next Tick or timeout
func Tick() {
i := gIdler.Load()
if i != nil {
i.Tick()
}
}
// WrapHandler calls Tick() before processing passing request to http.Handler
func WrapHandler(h http.Handler) http.Handler {
if h == nil {
h = http.DefaultServeMux
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Tick()
h.ServeHTTP(w, r)
})
}
// WrapIdlerHandler calls idler.Tick() before processing passing request to http.Handler
func WrapIdlerHandler(i Idler, h http.Handler) http.Handler {
if h == nil {
h = http.DefaultServeMux
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
i.Tick()
h.ServeHTTP(w, r)
})
}
// Idler helps manage idle servers
type Idler interface {
// Tick records the current time. This will make the server not idle until next Tick or timeout
Tick()
// Wait waits till the server is idle and returns. i.e. no Ticks in last <timeout> duration
Wait()
// For long running background jobs, use Enter to record start time. Wait will not return while there are active jobs running
Enter()
// Exit records end of a background job
Exit()
// Get the channel to wait yourself
Chan() <-chan struct{}
}
type idler struct {
lastTick atomic.Pointer[time.Time]
c chan struct{}
active atomic.Int64
}
func (i *idler) Enter() {
i.active.Add(1)
}
func (i *idler) Exit() {
i.Tick()
i.active.Add(-1)
}
// CreateIdler creates an Idler with given timeout
func CreateIdler(timeout time.Duration) Idler {
i := &idler{}
i.c = make(chan struct{})
i.Tick()
go func() {
for {
if i.active.Load() != 0 {
time.Sleep(timeout)
continue
}
t := *i.lastTick.Load()
now := time.Now()
dur := t.Add(timeout).Sub(now)
if dur == dur.Abs() {
time.Sleep(dur)
continue
}
break
}
close(i.c)
}()
return i
}
func (i *idler) Tick() {
now := time.Now()
i.lastTick.Store(&now)
}
func (i *idler) Wait() {
<-i.c
}
func (i *idler) Chan() <-chan struct{} {
return i.c
}

34
idle/idle_test.go Normal file
View File

@ -0,0 +1,34 @@
package idle
import (
"testing"
"time"
)
func TestIdlerChan(_ *testing.T) {
i := CreateIdler(10 * time.Millisecond)
<-i.Chan()
}
func TestGlobalIdler(t *testing.T) {
err := Wait(10 * time.Millisecond)
if err != nil {
t.Fatalf("idle.Wait failed, %v", err)
}
err = Wait(10 * time.Millisecond)
if err == nil {
t.Fatal("idle.Wait should fail when called second time")
}
}
func TestIdlerEnterExit(t *testing.T) {
i := CreateIdler(10 * time.Millisecond).(*idler)
i.Enter()
if i.active.Load() != 1 {
t.FailNow()
}
i.Exit()
if i.active.Load() != 0 {
t.FailNow()
}
}