From 1fab1e009572a6a6418f00e9276cedb54d2d067c Mon Sep 17 00:00:00 2001 From: balki <3070606-balki@users.noreply.gitlab.com> Date: Mon, 27 Jun 2022 11:47:54 -0400 Subject: [PATCH] dont send duplicate updates --- pubsub/pt.go | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/pubsub/pt.go b/pubsub/pt.go index 3861cab..6c82cfd 100644 --- a/pubsub/pt.go +++ b/pubsub/pt.go @@ -15,10 +15,15 @@ const ( Done ) +type subChan struct { + c chan<- string + lastUpdateIndex int +} + type subscriberState struct { - step Step - m sync.Mutex - subChans []chan<- string + step Step + m sync.Mutex + sc []subChan } type InvalidState struct{ Step Step } @@ -82,23 +87,25 @@ func (pt *progressTracker) Publish(id string) (chan<- string, error) { prodChanOpen := true ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() + lastUpdateIndex := 0 for range ticker.C { - done := false - for !done && prodChanOpen { + LoopReader: + for prodChanOpen { select { case update, prodChanOpen = <-prodChan: + lastUpdateIndex++ default: - done = true + break LoopReader } } - var scs []chan<- string + var scs []subChan func() { state.m.Lock() defer state.m.Unlock() - scs = state.subChans + scs = state.sc if !prodChanOpen { for _, subChan := range scs { - close(subChan) + close(subChan.c) } state.step = Done } @@ -107,9 +114,12 @@ func (pt *progressTracker) Publish(id string) (chan<- string, error) { return } for _, subChan := range scs { - select { - case subChan <- update: - default: + if subChan.lastUpdateIndex != lastUpdateIndex { + select { + case subChan.c <- update: + subChan.lastUpdateIndex = lastUpdateIndex + default: + } } } } @@ -119,6 +129,7 @@ func (pt *progressTracker) Publish(id string) (chan<- string, error) { func (pt *progressTracker) Subscribe(id string) <-chan string { c := make(chan string, 1) + sc := subChan{c: c} var state *subscriberState func() { pt.m.Lock() @@ -126,8 +137,8 @@ func (pt *progressTracker) Subscribe(id string) <-chan string { state = pt.subscribers[id] if state == nil { pt.subscribers[id] = &subscriberState{ - step: NotStarted, - subChans: []chan<- string{c}, + step: NotStarted, + sc: []subChan{sc}, } } }() @@ -139,6 +150,6 @@ func (pt *progressTracker) Subscribe(id string) <-chan string { if state.step == Done { return nil } - state.subChans = append(state.subChans, c) + state.sc = append(state.sc, sc) return c }