concurrent download

This commit is contained in:
Balakrishnan Balasubramanian 2022-05-13 12:55:45 -04:00
parent 7d8f7b58a8
commit 4270b9ae24
3 changed files with 18 additions and 3 deletions

View File

@ -1,4 +1,4 @@
## MVP ## MVP DONE
* ✓ Write TODO * ✓ Write TODO
* ✓ Cron * ✓ Cron
@ -14,7 +14,7 @@
* ✓ Wrap content inside item ⚠️ atom calls it entry * ✓ Wrap content inside item ⚠️ atom calls it entry
* ✓ Implement atom * ✓ Implement atom
* ✓ Cleanup * ✓ Cleanup
* Show Feed name instead of Link * Show Feed name instead of Link
* ✓ Make Rhash optional * ✓ Make Rhash optional
## Issues ## Issues

View File

@ -3,6 +3,7 @@ package app
import ( import (
"io" "io"
"net/http" "net/http"
"sync"
"time" "time"
"go.balki.me/tss/log" "go.balki.me/tss/log"
@ -36,10 +37,16 @@ func Run(configPath string) {
tgram := telegram.NewTelegramSender(tgramProxy, cfg.TelegramAuthToken) tgram := telegram.NewTelegramSender(tgramProxy, cfg.TelegramAuthToken)
wg := sync.WaitGroup{}
for _, feed := range cfg.Feeds { for _, feed := range cfg.Feeds {
log.Info("processing feed", "feed", feed.Name) log.Info("processing feed", "feed", feed.Name)
wg.Add(1)
go func() {
ProcessFeed(feed, scheduler, cfg.DbDir, tgram) ProcessFeed(feed, scheduler, cfg.DbDir, tgram)
wg.Done()
}()
} }
wg.Wait()
} }
func ProcessFeed(feed FeedCfg, scheduler Scheduler, dbDir string, tgram telegram.TelegramSender) { func ProcessFeed(feed FeedCfg, scheduler Scheduler, dbDir string, tgram telegram.TelegramSender) {

View File

@ -4,6 +4,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"os" "os"
"sync"
"time" "time"
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
@ -19,6 +20,7 @@ type Scheduler interface {
type scheduler struct { type scheduler struct {
filePath string filePath string
mutex sync.Mutex
lastSuccessTime map[string]time.Time lastSuccessTime map[string]time.Time
} }
@ -45,6 +47,8 @@ func NewScheduler(filePath string) (Scheduler, error) {
} }
func (s *scheduler) Save() error { func (s *scheduler) Save() error {
s.mutex.Lock()
defer s.mutex.Unlock()
data, err := yaml.Marshal(&s.lastSuccessTime) data, err := yaml.Marshal(&s.lastSuccessTime)
if err != nil { if err != nil {
return err return err
@ -53,10 +57,14 @@ func (s *scheduler) Save() error {
} }
func (s *scheduler) Good(name string) { func (s *scheduler) Good(name string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.lastSuccessTime[name] = time.Now() s.lastSuccessTime[name] = time.Now()
} }
func (s *scheduler) ShouldDownload(name string, scheduleSpec string) (bool, error) { func (s *scheduler) ShouldDownload(name string, scheduleSpec string) (bool, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
lastSuccess, ok := s.lastSuccessTime[name] lastSuccess, ok := s.lastSuccessTime[name]
if !ok { if !ok {
return true, nil return true, nil