diff --git a/pubsub/pt.go b/pubsub/pt.go index 346ac6e..c226647 100644 --- a/pubsub/pt.go +++ b/pubsub/pt.go @@ -3,6 +3,7 @@ package pubsub import ( "fmt" "sync" + "time" ) type Step int @@ -78,16 +79,15 @@ func (pt *progressTracker) Publish(id string) (chan<- string, error) { go func() { var update string prodChanOpen := true - for { - for prodChanOpen { - update, prodChanOpen = <-prodChan - if !prodChanOpen { - break - } + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + done := false + for !done && prodChanOpen { select { case update, prodChanOpen = <-prodChan: default: - break + done = true } } var scs []chan<- string diff --git a/pubsub/pt_test.go b/pubsub/pt_test.go index 26d8623..8a26cc2 100644 --- a/pubsub/pt_test.go +++ b/pubsub/pt_test.go @@ -3,7 +3,6 @@ package pubsub import ( "sync" "testing" - "time" ) func TestDupePublisher(t *testing.T) { @@ -55,19 +54,31 @@ func TestPubSub(t *testing.T) { } wg := sync.WaitGroup{} wg.Add(1) + c := 0 + testc := make(chan int) go func() { for range sc { + if c == 0 { + close(testc) + } + c++ } wg.Done() }() for i := 0; i < 10; i++ { pc <- "blah" - if i == 4 || i == 5 { - time.Sleep(100 * time.Millisecond) + if i == 5 { + // time.Sleep(100 * time.Millisecond) + //time.Sleep(1 * time.Second) + <-testc } } close(pc) wg.Wait() + if c == 0 { + t.Fatal("There should be atleast one update") + } + t.Logf("c is :%d", c) sc2 := pt.Subscribe("foo") if sc2 != nil { t.Fatal("Subscriber after publisher done should return nil")