package app import ( "io" "net/http" "sync" "time" "github.com/go-logr/logr" "go.balki.me/tss/db" "go.balki.me/tss/parser" "go.balki.me/tss/proxy" "go.balki.me/tss/schedule" "go.balki.me/tss/telegram" ) var log = logr.Discard() func SetLogger(l logr.Logger) { log = l } func Run(configPath string) error { cfg, err := ParseConfig(configPath) if err != nil { log.Error(err, "failed to parse config", "path", configPath) return err } scheduler, err := schedule.NewScheduler(cfg.LastSuccessPath) if err != nil { log.Error(err, "failed to create scheduler", "path", cfg.LastSuccessPath) return err } defer func() { if err := scheduler.Save(); err != nil { log.Error(err, "failed to save last success info", "path", cfg.LastSuccessPath) } }() tgramProxy, err := proxy.GetTransport(cfg.TelegramProxy) if err != nil { log.Error(err, "failed to get proxy transport", "proxyURL", cfg.TelegramProxy) return err } tgram := telegram.NewSender(tgramProxy, cfg.TelegramAuthToken) wg := sync.WaitGroup{} for i := range cfg.Feeds { feed := &cfg.Feeds[i] log.Info("processing feed", "feed", feed.Name) wg.Add(1) go func() { ProcessFeed(feed, scheduler, cfg.DbDir, tgram) wg.Done() }() } wg.Wait() return nil } func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgram telegram.Sender) { log := log.WithValues("feed", feed.Name) sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron) if err != nil { log.Error(err, "shouldDownload failed") return } if !sd { log.Info("skipping feed due to schedule") return } fdb, err := db.NewDB(dbDir, feed.Name) if err != nil { log.Error(err, "failed to get db", "db_dir", dbDir) return } var entries []parser.FeedEntry for _, url := range feed.URL { data, err := Download(url, feed.Proxy) if err != nil { log.Error(err, "download failed", "url", url, "proxy", feed.Proxy) return } currentEntries, err := parser.ParseFeed(feed.Type, data) if err != nil { log.Error(err, "feed parsing failed", "url", url, "data", data) return } entries = append(entries, currentEntries...) } scheduler.Good(feed.Name) stat := struct { Total int New int Filtered int Error int }{} defer func() { log.Info("done processing feed", "total", stat.Total, "new", stat.New, "filtered", stat.Filtered, "error", stat.Error) }() stat.Total = len(entries) var records []db.Record var newEntries []parser.FeedEntry if fdb.IsNewFeed() { stat.New = stat.Total ftl := int(feed.FirstTimeLimit) if feed.FirstTimeLimit == NoLimit || len(entries) <= ftl { newEntries = entries } else { var filteredEntries []parser.FeedEntry newEntries, filteredEntries = entries[:ftl], entries[ftl:] for _, entry := range filteredEntries { records = append(records, db.Record{ Time: time.Now(), Status: db.Filtered, Filter: "FirstTime", FeedEntry: entry, }) } stat.Filtered = len(filteredEntries) } } else { newEntries, err = fdb.Filter(entries) if err != nil { log.Error(err, "failed to filter entries") return } stat.New = len(newEntries) } for _, entry := range newEntries { r := db.Record{ Time: time.Now(), FeedEntry: entry, } err := tgram.SendLink(entry.Link, feed.Channel, feed.Rhash, feed.Title) if err != nil { log.Error(err, "failed to send to telegram", "link", entry.Link, "channel", feed.Channel, "rhash", feed.Rhash) r.Status = db.Error stat.Error++ } else { r.Status = db.Sent } records = append(records, r) } err = fdb.Save(records) if err != nil { log.Error(err, "failed to save sent records", "num_records", len(records)) } } func Download(url string, proxyURL string) ([]byte, error) { transport, err := proxy.GetTransport(proxyURL) if err != nil { return nil, err } client := &http.Client{Transport: transport} res, err := client.Get(url) if err != nil { return nil, err } defer func() { err := res.Body.Close() if err != nil { log.Error(err, "Body.Close() failed") } }() return io.ReadAll(res.Body) }