From 9074d4bfb8aaa8e5c84649b900a23659fcc472f6 Mon Sep 17 00:00:00 2001 From: Dan Sosedoff Date: Fri, 2 Dec 2022 13:36:31 -0600 Subject: [PATCH 1/7] Add internal sessions manager --- pkg/api/api.go | 10 ++-- pkg/api/middleware.go | 2 +- pkg/api/session_cleanup.go | 41 ------------- pkg/api/session_manager.go | 117 +++++++++++++++++++++++++++++++++++++ pkg/cli/cli.go | 22 ++++--- pkg/client/client.go | 10 +++- 6 files changed, 147 insertions(+), 55 deletions(-) delete mode 100644 pkg/api/session_cleanup.go create mode 100644 pkg/api/session_manager.go diff --git a/pkg/api/api.go b/pkg/api/api.go index d1afaed..fafec07 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -26,13 +26,13 @@ var ( DbClient *client.Client // DbSessions represents the mapping for client connections - DbSessions = map[string]*client.Client{} + DbSessions *SessionManager ) // DB returns a database connection from the client context func DB(c *gin.Context) *client.Client { if command.Opts.Sessions { - return DbSessions[getSessionId(c.Request)] + return DbSessions.Get(getSessionId(c.Request)) } return DbClient } @@ -54,7 +54,7 @@ func setClient(c *gin.Context, newClient *client.Client) error { return errSessionRequired } - DbSessions[sid] = newClient + DbSessions.Add(sid, newClient) return nil } @@ -80,10 +80,10 @@ func GetSessions(c *gin.Context) { // In debug mode endpoint will return a lot of sensitive information // like full database connection string and all query history. if command.Opts.Debug { - successResponse(c, DbSessions) + successResponse(c, DbSessions.Sessions()) return } - successResponse(c, gin.H{"sessions": len(DbSessions)}) + successResponse(c, gin.H{"sessions": DbSessions.Len()}) } // ConnectWithBackend creates a new connection based on backend resource diff --git a/pkg/api/middleware.go b/pkg/api/middleware.go index f20dffb..be79ec1 100644 --- a/pkg/api/middleware.go +++ b/pkg/api/middleware.go @@ -39,7 +39,7 @@ func dbCheckMiddleware() gin.HandlerFunc { } // Determine the database connection handle for the session - conn := DbSessions[sid] + conn := DbSessions.Get(sid) if conn == nil { badRequest(c, errNotConnected) return diff --git a/pkg/api/session_cleanup.go b/pkg/api/session_cleanup.go deleted file mode 100644 index 862d121..0000000 --- a/pkg/api/session_cleanup.go +++ /dev/null @@ -1,41 +0,0 @@ -package api - -import ( - "log" - "time" - - "github.com/sosedoff/pgweb/pkg/command" -) - -// StartSessionCleanup starts a goroutine to cleanup idle database sessions -func StartSessionCleanup() { - for range time.Tick(time.Minute) { - if command.Opts.Debug { - log.Println("Triggering idle session deletion") - } - cleanupIdleSessions() - } -} - -func cleanupIdleSessions() { - ids := []string{} - - // Figure out which sessions are idle - for id, client := range DbSessions { - if client.IsIdle() { - ids = append(ids, id) - } - } - if len(ids) == 0 { - return - } - - // Close and delete idle sessions - log.Println("Closing", len(ids), "idle sessions") - for _, id := range ids { - // TODO: concurrent map edit will trigger panic - if err := DbSessions[id].Close(); err == nil { - delete(DbSessions, id) - } - } -} diff --git a/pkg/api/session_manager.go b/pkg/api/session_manager.go new file mode 100644 index 0000000..00478a8 --- /dev/null +++ b/pkg/api/session_manager.go @@ -0,0 +1,117 @@ +package api + +import ( + "sync" + "time" + + "github.com/sirupsen/logrus" + + "github.com/sosedoff/pgweb/pkg/client" +) + +type SessionManager struct { + logger *logrus.Logger + sessions map[string]*client.Client + mu sync.Mutex +} + +func NewSessionManager(logger *logrus.Logger) *SessionManager { + return &SessionManager{ + logger: logger, + sessions: map[string]*client.Client{}, + mu: sync.Mutex{}, + } +} + +func (m *SessionManager) IDs() []string { + m.mu.Lock() + defer m.mu.Unlock() + + ids := []string{} + for k := range m.sessions { + ids = append(ids, k) + } + + return ids +} + +func (m *SessionManager) Sessions() map[string]*client.Client { + m.mu.Lock() + sessions := m.sessions + defer m.mu.Unlock() + + return sessions +} + +func (m *SessionManager) Get(id string) *client.Client { + m.mu.Lock() + c := m.sessions[id] + m.mu.Unlock() + + return c +} + +func (m *SessionManager) Add(id string, conn *client.Client) { + m.mu.Lock() + m.sessions[id] = conn + m.mu.Unlock() +} + +func (m *SessionManager) Remove(id string) bool { + m.mu.Lock() + defer m.mu.Unlock() + + conn, ok := m.sessions[id] + if ok { + conn.Close() + delete(m.sessions, id) + } + + return ok +} + +func (m *SessionManager) Len() int { + m.mu.Lock() + sz := len(m.sessions) + m.mu.Unlock() + + return sz +} + +func (m *SessionManager) Cleanup() int { + removed := 0 + + m.logger.Debug("starting idle sessions cleanup") + defer func() { + m.logger.Debug("removed idle sessions:", removed) + }() + + for _, id := range m.staleSessions() { + m.logger.WithField("id", id).Debug("closing stale session") + if m.Remove(id) { + removed++ + } + } + + return removed +} + +func (m *SessionManager) RunPeriodicCleanup() { + for range time.Tick(time.Minute) { + m.Cleanup() + } +} + +func (m *SessionManager) staleSessions() []string { + m.mu.TryLock() + defer m.mu.Unlock() + + ids := []string{} + for id, conn := range m.sessions { + if conn.IsIdle() { + ids = append(ids, id) + } + } + + return ids +} diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 8338295..7bd07bf 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -23,6 +23,7 @@ import ( ) var ( + logger *logrus.Logger options command.Options readonlyWarning = ` @@ -36,6 +37,10 @@ For proper read-only access please follow postgresql role management documentati regexErrAuthFailed = regexp.MustCompile(`authentication failed`) ) +func init() { + logger = logrus.New() +} + func exitWithMessage(message string) { fmt.Println("Error:", message) os.Exit(1) @@ -152,6 +157,10 @@ func initOptions() { os.Exit(0) } + if options.Debug { + logger.SetLevel(logrus.DebugLevel) + } + if options.ReadOnly { fmt.Println(readonlyWarning) } @@ -184,11 +193,6 @@ func printVersion() { } func startServer() { - logger := logrus.New() - if options.Debug { - logger.SetLevel(logrus.DebugLevel) - } - router := gin.New() router.Use(api.RequestLogger(logger)) router.Use(gin.Recovery()) @@ -258,8 +262,12 @@ func Run() { } // Start session cleanup worker - if options.Sessions && !command.Opts.DisableConnectionIdleTimeout { - go api.StartSessionCleanup() + if options.Sessions { + api.DbSessions = api.NewSessionManager(logger) + + if !command.Opts.DisableConnectionIdleTimeout { + go api.DbSessions.RunPeriodicCleanup() + } } startServer() diff --git a/pkg/client/client.go b/pkg/client/client.go index 865a2e3..d58365d 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -34,7 +34,8 @@ type Client struct { serverVersion string serverType string lastQueryTime time.Time - External bool + closed bool + External bool `json:"external"` History []history.Record `json:"history"` ConnectionString string `json:"connection_string"` } @@ -423,6 +424,13 @@ func (client *Client) query(query string, args ...interface{}) (*Result, error) // Close database connection func (client *Client) Close() error { + if client.closed { + return nil + } + defer func() { + client.closed = true + }() + if client.tunnel != nil { client.tunnel.Close() } From 29a6a0df8c079bbf46ac8221011aa099c9be7dc3 Mon Sep 17 00:00:00 2001 From: Dan Sosedoff Date: Fri, 2 Dec 2022 13:56:49 -0600 Subject: [PATCH 2/7] Add session manager tests --- pkg/api/session_manager_test.go | 79 +++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 pkg/api/session_manager_test.go diff --git a/pkg/api/session_manager_test.go b/pkg/api/session_manager_test.go new file mode 100644 index 0000000..9b5dd1e --- /dev/null +++ b/pkg/api/session_manager_test.go @@ -0,0 +1,79 @@ +package api + +import ( + "sort" + "testing" + + "github.com/sirupsen/logrus" + "github.com/sosedoff/pgweb/pkg/client" + "github.com/sosedoff/pgweb/pkg/command" + "github.com/stretchr/testify/assert" +) + +func TestSessionManager(t *testing.T) { + t.Run("returns ids", func(t *testing.T) { + manager := NewSessionManager(nil) + assert.Equal(t, []string{}, manager.IDs()) + + manager.sessions["foo"] = &client.Client{} + manager.sessions["bar"] = &client.Client{} + + ids := manager.IDs() + sort.Strings(ids) + assert.Equal(t, []string{"bar", "foo"}, ids) + }) + + t.Run("get session", func(t *testing.T) { + manager := NewSessionManager(nil) + assert.Nil(t, manager.Get("foo")) + + manager.sessions["foo"] = &client.Client{} + assert.NotNil(t, manager.Get("foo")) + }) + + t.Run("set session", func(t *testing.T) { + manager := NewSessionManager(nil) + assert.Nil(t, manager.Get("foo")) + + manager.Add("foo", &client.Client{}) + assert.NotNil(t, manager.Get("foo")) + }) + + t.Run("remove session", func(t *testing.T) { + manager := NewSessionManager(nil) + assert.Nil(t, manager.Get("foo")) + + manager.Add("foo", &client.Client{}) + assert.NotNil(t, manager.Get("foo")) + assert.True(t, manager.Remove("foo")) + assert.False(t, manager.Remove("foo")) + assert.Nil(t, manager.Get("foo")) + }) + + t.Run("returns len", func(t *testing.T) { + manager := NewSessionManager(nil) + manager.sessions["foo"] = &client.Client{} + manager.sessions["bar"] = &client.Client{} + + assert.Equal(t, 2, manager.Len()) + }) + + t.Run("cleans up stale sessions", func(t *testing.T) { + defer func() { + command.Opts.ConnectionIdleTimeout = 0 + }() + + manager := NewSessionManager(logrus.New()) + conn := &client.Client{} + manager.Add("foo", conn) + + command.Opts.ConnectionIdleTimeout = 0 + assert.Equal(t, 1, manager.Len()) + assert.Equal(t, 0, manager.Cleanup()) + assert.Equal(t, 1, manager.Len()) + + command.Opts.ConnectionIdleTimeout = 1 + assert.Equal(t, 1, manager.Cleanup()) + assert.Equal(t, 0, manager.Len()) + }) +} From bc54c95140a20e1e27f5c0ada996699be0052340 Mon Sep 17 00:00:00 2001 From: Dan Sosedoff Date: Fri, 2 Dec 2022 13:57:56 -0600 Subject: [PATCH 3/7] Tweak test run names --- pkg/api/session_manager_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/api/session_manager_test.go b/pkg/api/session_manager_test.go index 9b5dd1e..9387592 100644 --- a/pkg/api/session_manager_test.go +++ b/pkg/api/session_manager_test.go @@ -11,7 +11,7 @@ import ( ) func TestSessionManager(t *testing.T) { - t.Run("returns ids", func(t *testing.T) { + t.Run("return ids", func(t *testing.T) { manager := NewSessionManager(nil) assert.Equal(t, []string{}, manager.IDs()) @@ -50,7 +50,7 @@ func TestSessionManager(t *testing.T) { assert.Nil(t, manager.Get("foo")) }) - t.Run("returns len", func(t *testing.T) { + t.Run("return len", func(t *testing.T) { manager := NewSessionManager(nil) manager.sessions["foo"] = &client.Client{} manager.sessions["bar"] = &client.Client{} @@ -58,7 +58,7 @@ func TestSessionManager(t *testing.T) { assert.Equal(t, 2, manager.Len()) }) - t.Run("cleans up stale sessions", func(t *testing.T) { + t.Run("clean up stale sessions", func(t *testing.T) { defer func() { command.Opts.ConnectionIdleTimeout = 0 }() From 73dfcc46c3250fa4f87518d744882cb178ea510c Mon Sep 17 00:00:00 2001 From: Dan Sosedoff Date: Fri, 2 Dec 2022 13:59:22 -0600 Subject: [PATCH 4/7] Verify client is closed --- pkg/api/session_manager_test.go | 1 + pkg/client/client.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/pkg/api/session_manager_test.go b/pkg/api/session_manager_test.go index 9387592..cd7ef55 100644 --- a/pkg/api/session_manager_test.go +++ b/pkg/api/session_manager_test.go @@ -75,5 +75,6 @@ func TestSessionManager(t *testing.T) { command.Opts.ConnectionIdleTimeout = 1 assert.Equal(t, 1, manager.Cleanup()) assert.Equal(t, 0, manager.Len()) + assert.True(t, conn.IsClosed()) }) } diff --git a/pkg/client/client.go b/pkg/client/client.go index d58365d..f24272c 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -442,6 +442,10 @@ func (client *Client) Close() error { return nil } +func (c *Client) IsClosed() bool { + return c.closed +} + func (client *Client) IsIdle() bool { mins := int(time.Since(client.lastQueryTime).Minutes()) From 0a133dc3958c51960993091a0f9fa4b10b7a6329 Mon Sep 17 00:00:00 2001 From: Dan Sosedoff Date: Fri, 2 Dec 2022 14:00:01 -0600 Subject: [PATCH 5/7] Reorder imports --- pkg/api/session_manager_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/api/session_manager_test.go b/pkg/api/session_manager_test.go index cd7ef55..60c0bd5 100644 --- a/pkg/api/session_manager_test.go +++ b/pkg/api/session_manager_test.go @@ -5,9 +5,10 @@ import ( "testing" "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/sosedoff/pgweb/pkg/client" "github.com/sosedoff/pgweb/pkg/command" - "github.com/stretchr/testify/assert" ) func TestSessionManager(t *testing.T) { From 16726e24610bd1c8926c8338824b2a019e435609 Mon Sep 17 00:00:00 2001 From: Dan Sosedoff Date: Fri, 2 Dec 2022 14:20:20 -0600 Subject: [PATCH 6/7] Add idle timeout into session manager --- pkg/api/session_manager.go | 21 +++++++++++++++++---- pkg/api/session_manager_test.go | 10 +++------- pkg/cli/cli.go | 2 ++ pkg/client/client.go | 10 +++++++++- 4 files changed, 31 insertions(+), 12 deletions(-) diff --git a/pkg/api/session_manager.go b/pkg/api/session_manager.go index 00478a8..9166f9f 100644 --- a/pkg/api/session_manager.go +++ b/pkg/api/session_manager.go @@ -10,9 +10,10 @@ import ( ) type SessionManager struct { - logger *logrus.Logger - sessions map[string]*client.Client - mu sync.Mutex + logger *logrus.Logger + sessions map[string]*client.Client + mu sync.Mutex + idleTimeout time.Duration } func NewSessionManager(logger *logrus.Logger) *SessionManager { @@ -23,6 +24,10 @@ func NewSessionManager(logger *logrus.Logger) *SessionManager { } } +func (m *SessionManager) SetIdleTimeout(timeout time.Duration) { + m.idleTimeout = timeout +} + func (m *SessionManager) IDs() []string { m.mu.Lock() defer m.mu.Unlock() @@ -79,6 +84,10 @@ func (m *SessionManager) Len() int { } func (m *SessionManager) Cleanup() int { + if m.idleTimeout == 0 { + return 0 + } + removed := 0 m.logger.Debug("starting idle sessions cleanup") @@ -97,6 +106,8 @@ func (m *SessionManager) Cleanup() int { } func (m *SessionManager) RunPeriodicCleanup() { + m.logger.WithField("timeout", m.idleTimeout).Info("session manager cleanup enabled") + for range time.Tick(time.Minute) { m.Cleanup() } @@ -106,9 +117,11 @@ func (m *SessionManager) staleSessions() []string { m.mu.TryLock() defer m.mu.Unlock() + now := time.Now() ids := []string{} + for id, conn := range m.sessions { - if conn.IsIdle() { + if now.Sub(conn.LastQueryTime()) > m.idleTimeout { ids = append(ids, id) } } diff --git a/pkg/api/session_manager_test.go b/pkg/api/session_manager_test.go index 60c0bd5..6ac8c12 100644 --- a/pkg/api/session_manager_test.go +++ b/pkg/api/session_manager_test.go @@ -3,12 +3,12 @@ package api import ( "sort" "testing" + "time" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/sosedoff/pgweb/pkg/client" - "github.com/sosedoff/pgweb/pkg/command" ) func TestSessionManager(t *testing.T) { @@ -60,20 +60,16 @@ func TestSessionManager(t *testing.T) { }) t.Run("clean up stale sessions", func(t *testing.T) { - defer func() { - command.Opts.ConnectionIdleTimeout = 0 - }() - manager := NewSessionManager(logrus.New()) conn := &client.Client{} manager.Add("foo", conn) - command.Opts.ConnectionIdleTimeout = 0 assert.Equal(t, 1, manager.Len()) assert.Equal(t, 0, manager.Cleanup()) assert.Equal(t, 1, manager.Len()) - command.Opts.ConnectionIdleTimeout = 1 + conn.Query("select 1") + manager.SetIdleTimeout(time.Minute) assert.Equal(t, 1, manager.Cleanup()) assert.Equal(t, 0, manager.Len()) assert.True(t, conn.IsClosed()) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 7bd07bf..bb3e306 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -8,6 +8,7 @@ import ( "regexp" "strings" "syscall" + "time" "github.com/gin-gonic/gin" "github.com/jessevdk/go-flags" @@ -266,6 +267,7 @@ func Run() { api.DbSessions = api.NewSessionManager(logger) if !command.Opts.DisableConnectionIdleTimeout { + api.DbSessions.SetIdleTimeout(time.Minute * time.Duration(command.Opts.ConnectionIdleTimeout)) go api.DbSessions.RunPeriodicCleanup() } } diff --git a/pkg/client/client.go b/pkg/client/client.go index f24272c..3d8bd50 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -335,6 +335,10 @@ func (client *Client) ServerVersion() string { } func (client *Client) query(query string, args ...interface{}) (*Result, error) { + if client.db == nil { + return nil, nil + } + // Update the last usage time defer func() { client.lastQueryTime = time.Now().UTC() @@ -366,7 +370,7 @@ func (client *Client) query(query string, args ...interface{}) (*Result, error) result := Result{ Columns: []string{"Rows Affected"}, Rows: []Row{ - Row{affected}, + {affected}, }, } @@ -446,6 +450,10 @@ func (c *Client) IsClosed() bool { return c.closed } +func (c *Client) LastQueryTime() time.Time { + return c.lastQueryTime +} + func (client *Client) IsIdle() bool { mins := int(time.Since(client.lastQueryTime).Minutes()) From 07d0010750ca23e9168c219990a7ca45bce1a080 Mon Sep 17 00:00:00 2001 From: Dan Sosedoff Date: Fri, 2 Dec 2022 14:22:50 -0600 Subject: [PATCH 7/7] Make linter happy --- pkg/api/session_manager_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/api/session_manager_test.go b/pkg/api/session_manager_test.go index 6ac8c12..594af36 100644 --- a/pkg/api/session_manager_test.go +++ b/pkg/api/session_manager_test.go @@ -68,7 +68,10 @@ func TestSessionManager(t *testing.T) { assert.Equal(t, 0, manager.Cleanup()) assert.Equal(t, 1, manager.Len()) - conn.Query("select 1") + res, err := conn.Query("select 1") + assert.Nil(t, res) + assert.Nil(t, err) + manager.SetIdleTimeout(time.Minute) assert.Equal(t, 1, manager.Cleanup()) assert.Equal(t, 0, manager.Len())