183 lines
4.0 KiB
Go
183 lines
4.0 KiB
Go
package app
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"go.balki.me/tss/db"
|
|
"go.balki.me/tss/parser"
|
|
"go.balki.me/tss/proxy"
|
|
"go.balki.me/tss/schedule"
|
|
"go.balki.me/tss/telegram"
|
|
)
|
|
|
|
var log = logr.Discard()
|
|
|
|
func SetLogger(l logr.Logger) {
|
|
log = l
|
|
}
|
|
|
|
func Run(configPath string) error {
|
|
cfg, err := ParseConfig(configPath)
|
|
|
|
if err != nil {
|
|
log.Error(err, "failed to parse config", "path", configPath)
|
|
return err
|
|
}
|
|
|
|
scheduler, err := schedule.NewScheduler(cfg.LastSuccessPath)
|
|
if err != nil {
|
|
log.Error(err, "failed to create scheduler", "path", cfg.LastSuccessPath)
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
if err := scheduler.Save(); err != nil {
|
|
log.Error(err, "failed to save last success info", "path", cfg.LastSuccessPath)
|
|
}
|
|
}()
|
|
|
|
tgramProxy, err := proxy.GetTransport(cfg.TelegramProxy)
|
|
if err != nil {
|
|
log.Error(err, "failed to get proxy transport", "proxyURL", cfg.TelegramProxy)
|
|
return err
|
|
}
|
|
|
|
tgram := telegram.NewSender(tgramProxy, cfg.TelegramAuthToken)
|
|
|
|
wg := sync.WaitGroup{}
|
|
for i := range cfg.Feeds {
|
|
feed := &cfg.Feeds[i]
|
|
log.Info("processing feed", "feed", feed.Name)
|
|
wg.Add(1)
|
|
go func() {
|
|
ProcessFeed(feed, scheduler, cfg.DbDir, tgram)
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgram telegram.Sender) {
|
|
log := log.WithValues("feed", feed.Name)
|
|
sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
|
|
if err != nil {
|
|
log.Error(err, "shouldDownload failed")
|
|
return
|
|
}
|
|
|
|
if !sd {
|
|
log.Info("skipping feed due to schedule")
|
|
return
|
|
}
|
|
|
|
fdb, err := db.NewDB(dbDir, feed.Name)
|
|
if err != nil {
|
|
log.Error(err, "failed to get db", "db_dir", dbDir)
|
|
return
|
|
}
|
|
|
|
var entries []parser.FeedEntry
|
|
for _, url := range feed.URL {
|
|
data, err := Download(url, feed.Proxy)
|
|
if err != nil {
|
|
log.Error(err, "download failed", "url", url, "proxy", feed.Proxy)
|
|
return
|
|
}
|
|
|
|
currentEntries, err := parser.ParseFeed(feed.Type, data)
|
|
if err != nil {
|
|
log.Error(err, "feed parsing failed", "url", url, "data", data)
|
|
return
|
|
}
|
|
entries = append(entries, currentEntries...)
|
|
}
|
|
|
|
scheduler.Good(feed.Name)
|
|
|
|
stat := struct {
|
|
Total int
|
|
New int
|
|
Filtered int
|
|
Error int
|
|
}{}
|
|
|
|
defer func() {
|
|
log.Info("done processing feed", "total", stat.Total, "new", stat.New, "filtered", stat.Filtered, "error", stat.Error)
|
|
}()
|
|
|
|
stat.Total = len(entries)
|
|
|
|
var records []db.Record
|
|
var newEntries []parser.FeedEntry
|
|
if fdb.IsNewFeed() {
|
|
stat.New = stat.Total
|
|
ftl := int(feed.FirstTimeLimit)
|
|
if feed.FirstTimeLimit == NoLimit || len(entries) <= ftl {
|
|
newEntries = entries
|
|
} else {
|
|
var filteredEntries []parser.FeedEntry
|
|
newEntries, filteredEntries = entries[:ftl], entries[ftl:]
|
|
for _, entry := range filteredEntries {
|
|
records = append(records, db.Record{
|
|
Time: time.Now(),
|
|
Status: db.Filtered,
|
|
Filter: "FirstTime",
|
|
FeedEntry: entry,
|
|
})
|
|
}
|
|
stat.Filtered = len(filteredEntries)
|
|
}
|
|
} else {
|
|
newEntries, err = fdb.Filter(entries)
|
|
if err != nil {
|
|
log.Error(err, "failed to filter entries")
|
|
return
|
|
}
|
|
stat.New = len(newEntries)
|
|
}
|
|
|
|
for _, entry := range newEntries {
|
|
r := db.Record{
|
|
Time: time.Now(),
|
|
FeedEntry: entry,
|
|
}
|
|
err := tgram.SendLink(entry.Link, feed.Channel, feed.Rhash, feed.Title)
|
|
if err != nil {
|
|
log.Error(err, "failed to send to telegram", "link", entry.Link, "channel", feed.Channel, "rhash", feed.Rhash)
|
|
r.Status = db.Error
|
|
stat.Error++
|
|
} else {
|
|
r.Status = db.Sent
|
|
}
|
|
records = append(records, r)
|
|
}
|
|
err = fdb.Save(records)
|
|
if err != nil {
|
|
log.Error(err, "failed to save sent records", "num_records", len(records))
|
|
}
|
|
}
|
|
|
|
func Download(url string, proxyURL string) ([]byte, error) {
|
|
transport, err := proxy.GetTransport(proxyURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
client := &http.Client{Transport: transport}
|
|
res, err := client.Get(url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
err := res.Body.Close()
|
|
if err != nil {
|
|
log.Error(err, "Body.Close() failed")
|
|
}
|
|
}()
|
|
return io.ReadAll(res.Body)
|
|
}
|