// Package pubsub provides a small in-process progress tracker: for each id a
// single publisher reports status strings and any number of subscribers watch
// them.
package pubsub

import (
	"fmt"
	"sync"
	"time"
)

//go:generate stringer -type=Step

// Step is the lifecycle stage of the publisher of one id.
type Step int

const (
	// NotStarted means no publisher has registered for the id yet.
	NotStarted Step = iota
	// Publishing means a publisher has registered and not yet closed its channel.
	Publishing
	// Done means the publisher closed its channel and all subscribers were closed.
	Done
)

// subscriberState is the per-id bookkeeping shared by the publisher goroutine
// and Subscribe.
type subscriberState struct {
	step Step
	m    sync.Mutex // guards step and subChans
	// subChans holds one channel (buffer size 1) per subscriber. Only the
	// forwarding goroutine started by Publish sends on or closes them.
	subChans []chan string
}

// InvalidState is returned by Publish when the id already has, or has had, a
// publisher. Step is the state the id was found in.
type InvalidState struct{ Step Step }

// Error implements the error interface.
func (i InvalidState) Error() string {
	return fmt.Sprintf("Invalid state: %v", i.Step)
}

// ProgressTracker lets one goroutine publish progress for an id while one or
// more goroutines subscribe to watch it.
//
// Subscribers are not guaranteed to see every update; intermediate values may
// be skipped. What they receive when reading their channel is the most recent
// status known at the last forwarding tick, and the last status published
// before the publisher closed its channel is delivered before the subscriber
// channel is closed.
type ProgressTracker interface {
	// Publish registers the single publisher for id and returns the channel it
	// sends updates on. The publisher must close the channel when done; that
	// is the only thing that stops the forwarding goroutine.
	// An InvalidState error is returned if id has or had a publisher.
	Publish(id string) (chan<- string, error)

	// Subscribe returns a channel of updates for id. Subscribing is allowed
	// before any publisher exists. If the publisher has already finished, a
	// nil channel is returned. The channel is closed when the publisher is
	// done.
	Subscribe(id string) <-chan string
}

// progressTracker is the ProgressTracker implementation backed by a map of
// per-id state.
type progressTracker struct {
	subscribers map[string]*subscriberState
	m           sync.Mutex // guards subscribers
}

// NewProgressTracker returns an empty ProgressTracker.
func NewProgressTracker() ProgressTracker {
	return &progressTracker{
		subscribers: map[string]*subscriberState{},
	}
}

// Publish implements ProgressTracker. It moves id from NotStarted to
// Publishing and starts a goroutine that forwards updates to subscribers.
func (pt *progressTracker) Publish(id string) (chan<- string, error) {
	pt.m.Lock()
	state := pt.subscribers[id]
	if state == nil {
		state = &subscriberState{step: NotStarted}
		pt.subscribers[id] = state
	}
	pt.m.Unlock()

	state.m.Lock()
	if state.step != NotStarted {
		err := InvalidState{state.step}
		state.m.Unlock()
		return nil, err
	}
	state.step = Publishing
	state.m.Unlock()

	// Buffered so the publisher rarely blocks between forwarding ticks.
	prodChan := make(chan string, 100)
	go state.forward(prodChan)
	return prodChan, nil
}

// forward runs until updates is closed. On every tick it drains whatever the
// publisher has queued, keeps only the newest value, and hands it to every
// subscriber. When updates is closed it delivers the last real update (if
// any), closes all subscriber channels and marks the state Done.
func (s *subscriberState) forward(updates <-chan string) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	var (
		latest   string
		received bool // true once latest holds a value the publisher actually sent
	)
	for range ticker.C {
		closed := false
	drain:
		for {
			select {
			case v, ok := <-updates:
				if !ok {
					// Do not let the zero value from the closed channel
					// overwrite the last real update.
					closed = true
					break drain
				}
				latest, received = v, true
			default:
				break drain
			}
		}

		if closed {
			s.finish(latest, received)
			return
		}
		if received {
			// The latest value is re-sent every tick so that subscribers who
			// joined since the previous tick also see it.
			s.broadcast(latest)
		}
	}
}

// broadcast offers v to every current subscriber without blocking.
func (s *subscriberState) broadcast(v string) {
	s.m.Lock()
	defer s.m.Unlock()
	for _, c := range s.subChans {
		offerLatest(c, v)
	}
}

// finish delivers the final update (when one was received), closes every
// subscriber channel, releases them and records that publishing is Done.
func (s *subscriberState) finish(latest string, received bool) {
	s.m.Lock()
	defer s.m.Unlock()
	for _, c := range s.subChans {
		if received {
			offerLatest(c, latest)
		}
		close(c)
	}
	s.subChans = nil // closed channels are of no further use; let them be collected
	s.step = Done
}

// offerLatest makes v the value buffered in c, discarding an unread older
// value if there is one. It never blocks. Because each subscriber channel has
// exactly one sender, the slot freed by the discard cannot be refilled by
// anyone else, so the send is expected to succeed; the default case is purely
// defensive.
func offerLatest(c chan string, v string) {
	select {
	case <-c:
	default:
	}
	select {
	case c <- v:
	default:
	}
}

// Subscribe implements ProgressTracker. It registers a new channel with
// buffer size 1 for id, creating the per-id state if needed, and returns nil
// if the id is already Done.
func (pt *progressTracker) Subscribe(id string) <-chan string {
	c := make(chan string, 1)

	pt.m.Lock()
	state := pt.subscribers[id]
	if state == nil {
		// First sight of this id: create its state with c already registered.
		pt.subscribers[id] = &subscriberState{
			step:     NotStarted,
			subChans: []chan string{c},
		}
		pt.m.Unlock()
		return c
	}
	pt.m.Unlock()

	state.m.Lock()
	defer state.m.Unlock()
	if state.step == Done {
		return nil
	}
	state.subChans = append(state.subChans, c)
	return c
}