more refactor

This commit is contained in:
Balakrishnan Balasubramanian 2022-06-26 23:13:57 -04:00
parent 2ff289becf
commit ded32cd3e6
2 changed files with 21 additions and 10 deletions

View File

@ -3,6 +3,7 @@ package pubsub
import ( import (
"fmt" "fmt"
"sync" "sync"
"time"
) )
type Step int type Step int
@ -78,16 +79,15 @@ func (pt *progressTracker) Publish(id string) (chan<- string, error) {
go func() { go func() {
var update string var update string
prodChanOpen := true prodChanOpen := true
for { ticker := time.NewTicker(100 * time.Millisecond)
for prodChanOpen { defer ticker.Stop()
update, prodChanOpen = <-prodChan for range ticker.C {
if !prodChanOpen { done := false
break for !done && prodChanOpen {
}
select { select {
case update, prodChanOpen = <-prodChan: case update, prodChanOpen = <-prodChan:
default: default:
break done = true
} }
} }
var scs []chan<- string var scs []chan<- string

View File

@ -3,7 +3,6 @@ package pubsub
import ( import (
"sync" "sync"
"testing" "testing"
"time"
) )
func TestDupePublisher(t *testing.T) { func TestDupePublisher(t *testing.T) {
@ -55,19 +54,31 @@ func TestPubSub(t *testing.T) {
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
c := 0
testc := make(chan int)
go func() { go func() {
for range sc { for range sc {
if c == 0 {
close(testc)
}
c++
} }
wg.Done() wg.Done()
}() }()
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
pc <- "blah" pc <- "blah"
if i == 4 || i == 5 { if i == 5 {
time.Sleep(100 * time.Millisecond) // time.Sleep(100 * time.Millisecond)
//time.Sleep(1 * time.Second)
<-testc
} }
} }
close(pc) close(pc)
wg.Wait() wg.Wait()
if c == 0 {
t.Fatal("There should be atleast one update")
}
t.Logf("c is :%d", c)
sc2 := pt.Subscribe("foo") sc2 := pt.Subscribe("foo")
if sc2 != nil { if sc2 != nil {
t.Fatal("Subscriber after publisher done should return nil") t.Fatal("Subscriber after publisher done should return nil")