126 lines
2.3 KiB
Go
126 lines
2.3 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
//go:generate stringer -type=Step

// Step identifies where a progress tracker is in its lifecycle.
type Step int

// Lifecycle steps, in the order a tracker moves through them.
const (
	// NotStarted means no publisher has been registered yet.
	NotStarted = Step(iota)
	// Publishing means a publisher has been registered and has not finished.
	Publishing
	// Done means the publisher has closed its channel.
	Done
)
|
|
|
|
// InvalidState is the error reported when an operation is rejected because of
// the step the tracker is currently in.
type InvalidState struct {
	// Step is the step the tracker was in when the operation was rejected.
	Step Step
}
|
|
|
|
func (i InvalidState) Error() string {
|
|
return fmt.Sprintf("Invalid state: %v", i.Step)
|
|
}
|
|
|
|
// ProgressTracker lets one goroutine publish progress updates while any number
// of other goroutines subscribe to watch them.
//
// Subscribers are not guaranteed to see every update; a subscriber waiting on
// its channel is meant to receive the most recent status.
type ProgressTracker interface {
	// Publish returns the channel on which the single publisher sends its
	// updates. The publisher should close that channel when it is done.
	//
	// An error is returned if a publisher is, or ever was, registered.
	Publish() (chan<- string, error)

	// Subscribe returns a channel of updates. Subscribing is allowed even
	// when there is no publisher yet.
	//
	// The channel is closed when the publisher is done. If the publisher is
	// already done, a nil channel is returned instead.
	Subscribe() <-chan string
}
|
|
|
|
// subChan is the publisher-side record kept for one subscriber.
type subChan struct {
	// c is the send-only end of the subscriber's channel.
	c chan<- string
	// lastUpdateIndex is compared against the publisher's running update
	// counter to decide whether this subscriber still needs the latest update.
	lastUpdateIndex int
}
|
|
|
|
type progressTracker struct {
|
|
step Step
|
|
m sync.Mutex
|
|
sc []subChan
|
|
}
|
|
|
|
func NewProgressTracker() ProgressTracker {
|
|
return &progressTracker{
|
|
step: NotStarted,
|
|
}
|
|
}
|
|
|
|
func (pt *progressTracker) Publish() (chan<- string, error) {
|
|
err := func() error {
|
|
pt.m.Lock()
|
|
defer pt.m.Unlock()
|
|
if pt.step != NotStarted {
|
|
return InvalidState{pt.step}
|
|
}
|
|
pt.step = Publishing
|
|
return nil
|
|
}()
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
prodChan := make(chan string, 100)
|
|
go func() {
|
|
var update string
|
|
prodChanOpen := true
|
|
ticker := time.NewTicker(100 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
lastUpdateIndex := 0
|
|
for range ticker.C {
|
|
LoopReader:
|
|
for prodChanOpen {
|
|
select {
|
|
case update, prodChanOpen = <-prodChan:
|
|
lastUpdateIndex++
|
|
default:
|
|
break LoopReader
|
|
}
|
|
}
|
|
var scs []subChan
|
|
func() {
|
|
pt.m.Lock()
|
|
defer pt.m.Unlock()
|
|
scs = pt.sc
|
|
if !prodChanOpen {
|
|
pt.step = Done
|
|
}
|
|
}()
|
|
if !prodChanOpen {
|
|
for _, subChan := range scs {
|
|
close(subChan.c)
|
|
}
|
|
return
|
|
}
|
|
for _, subChan := range scs {
|
|
if subChan.lastUpdateIndex != lastUpdateIndex {
|
|
select {
|
|
case subChan.c <- update:
|
|
subChan.lastUpdateIndex = lastUpdateIndex
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return prodChan, nil
|
|
}
|
|
|
|
func (pt *progressTracker) Subscribe() <-chan string {
|
|
c := make(chan string, 1)
|
|
sc := subChan{c: c}
|
|
pt.m.Lock()
|
|
defer pt.m.Unlock()
|
|
if pt.step == Done {
|
|
return nil
|
|
}
|
|
pt.sc = append(pt.sc, sc)
|
|
return c
|
|
}
|