tss/app/app.go

183 lines
4.0 KiB
Go
Raw Permalink Normal View History

2022-04-26 09:55:59 -04:00
package app
2022-05-01 15:32:41 -04:00
import (
"io"
2022-05-01 23:28:54 -04:00
"net/http"
2022-05-13 12:55:45 -04:00
"sync"
2022-05-01 23:28:54 -04:00
"time"
2022-05-01 15:32:41 -04:00
2022-06-17 17:51:16 -04:00
"github.com/go-logr/logr"
"go.balki.me/tss/db"
2022-05-03 17:14:37 -04:00
"go.balki.me/tss/parser"
2022-05-01 15:32:41 -04:00
"go.balki.me/tss/proxy"
2022-06-12 18:46:16 -04:00
"go.balki.me/tss/schedule"
2022-05-01 23:28:54 -04:00
"go.balki.me/tss/telegram"
2022-05-01 15:32:41 -04:00
)
2022-04-28 19:24:21 -04:00
2022-06-17 17:51:16 -04:00
var log = logr.Discard()
2022-06-20 15:49:55 -04:00
func SetLogger(l logr.Logger) {
log = l
2022-06-17 17:51:16 -04:00
}
func Run(configPath string) error {
2022-04-28 19:24:21 -04:00
cfg, err := ParseConfig(configPath)
if err != nil {
2022-06-17 17:51:16 -04:00
log.Error(err, "failed to parse config", "path", configPath)
return err
2022-04-28 19:24:21 -04:00
}
2022-06-12 18:46:16 -04:00
scheduler, err := schedule.NewScheduler(cfg.LastSuccessPath)
2022-05-01 23:28:54 -04:00
if err != nil {
2022-06-17 17:51:16 -04:00
log.Error(err, "failed to create scheduler", "path", cfg.LastSuccessPath)
return err
2022-05-01 23:28:54 -04:00
}
2022-04-28 19:24:21 -04:00
defer func() {
if err := scheduler.Save(); err != nil {
2022-06-17 17:51:16 -04:00
log.Error(err, "failed to save last success info", "path", cfg.LastSuccessPath)
2022-04-27 00:19:05 -04:00
}
2022-04-28 19:24:21 -04:00
}()
2022-04-26 09:55:59 -04:00
2022-05-01 23:28:54 -04:00
tgramProxy, err := proxy.GetTransport(cfg.TelegramProxy)
if err != nil {
2022-06-17 17:51:16 -04:00
log.Error(err, "failed to get proxy transport", "proxyURL", cfg.TelegramProxy)
return err
2022-05-01 23:28:54 -04:00
}
2022-06-21 21:32:16 -04:00
tgram := telegram.NewSender(tgramProxy, cfg.TelegramAuthToken)
2022-05-01 19:13:27 -04:00
2022-05-13 12:55:45 -04:00
wg := sync.WaitGroup{}
2022-05-13 13:56:34 -04:00
for i := range cfg.Feeds {
feed := &cfg.Feeds[i]
2022-04-28 19:24:21 -04:00
log.Info("processing feed", "feed", feed.Name)
2022-05-13 12:55:45 -04:00
wg.Add(1)
go func() {
ProcessFeed(feed, scheduler, cfg.DbDir, tgram)
wg.Done()
}()
2022-04-28 19:24:21 -04:00
}
2022-05-13 12:55:45 -04:00
wg.Wait()
2022-06-17 17:51:16 -04:00
return nil
2022-04-26 09:55:59 -04:00
}
2022-04-28 19:24:21 -04:00
2022-06-21 21:32:16 -04:00
func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgram telegram.Sender) {
2022-06-17 18:23:06 -04:00
log := log.WithValues("feed", feed.Name)
2022-04-28 19:24:21 -04:00
sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
if err != nil {
2022-06-17 18:23:06 -04:00
log.Error(err, "shouldDownload failed")
2022-04-28 19:24:21 -04:00
return
}
2022-05-01 15:32:41 -04:00
2022-04-28 19:24:21 -04:00
if !sd {
2022-06-17 18:23:06 -04:00
log.Info("skipping feed due to schedule")
2022-04-28 19:24:21 -04:00
return
}
2022-05-01 15:32:41 -04:00
2022-06-17 17:51:16 -04:00
fdb, err := db.NewDB(dbDir, feed.Name)
2022-05-01 15:32:41 -04:00
if err != nil {
2022-06-17 18:23:06 -04:00
log.Error(err, "failed to get db", "db_dir", dbDir)
2022-05-01 15:32:41 -04:00
return
}
2022-05-27 18:26:30 -04:00
var entries []parser.FeedEntry
2022-06-21 21:32:16 -04:00
for _, url := range feed.URL {
2022-05-27 18:26:30 -04:00
data, err := Download(url, feed.Proxy)
if err != nil {
2022-06-17 18:23:06 -04:00
log.Error(err, "download failed", "url", url, "proxy", feed.Proxy)
2022-05-27 18:26:30 -04:00
return
}
2022-05-01 15:32:41 -04:00
2022-05-27 18:26:30 -04:00
currentEntries, err := parser.ParseFeed(feed.Type, data)
if err != nil {
2022-06-17 18:23:06 -04:00
log.Error(err, "feed parsing failed", "url", url, "data", data)
2022-05-27 18:26:30 -04:00
return
}
entries = append(entries, currentEntries...)
2022-04-28 19:24:21 -04:00
}
2022-05-01 15:32:41 -04:00
scheduler.Good(feed.Name)
2022-05-23 13:56:19 -04:00
stat := struct {
Total int
New int
Filtered int
Error int
}{}
defer func() {
2022-06-17 18:23:06 -04:00
log.Info("done processing feed", "total", stat.Total, "new", stat.New, "filtered", stat.Filtered, "error", stat.Error)
2022-05-23 13:56:19 -04:00
}()
stat.Total = len(entries)
2022-06-17 17:51:16 -04:00
var records []db.Record
2022-05-03 17:14:37 -04:00
var newEntries []parser.FeedEntry
2022-06-17 17:06:45 -04:00
if fdb.IsNewFeed() {
2022-05-23 13:56:19 -04:00
stat.New = stat.Total
2022-05-02 23:23:56 -04:00
ftl := int(feed.FirstTimeLimit)
if feed.FirstTimeLimit == NoLimit || len(entries) <= ftl {
newEntries = entries
} else {
2022-05-03 17:14:37 -04:00
var filteredEntries []parser.FeedEntry
2022-05-02 23:23:56 -04:00
newEntries, filteredEntries = entries[:ftl], entries[ftl:]
for _, entry := range filteredEntries {
2022-06-17 17:51:16 -04:00
records = append(records, db.Record{
2022-05-02 23:23:56 -04:00
Time: time.Now(),
2022-06-17 17:51:16 -04:00
Status: db.Filtered,
2022-05-02 23:23:56 -04:00
Filter: "FirstTime",
FeedEntry: entry,
})
}
2022-05-23 13:56:19 -04:00
stat.Filtered = len(filteredEntries)
2022-05-02 23:23:56 -04:00
}
} else {
2022-06-17 17:06:45 -04:00
newEntries, err = fdb.Filter(entries)
2022-05-02 23:23:56 -04:00
if err != nil {
2022-06-17 18:23:06 -04:00
log.Error(err, "failed to filter entries")
2022-05-23 13:56:19 -04:00
return
2022-05-02 23:23:56 -04:00
}
2022-05-23 13:56:19 -04:00
stat.New = len(newEntries)
2022-05-01 15:32:41 -04:00
}
2022-05-01 23:28:54 -04:00
2022-05-02 23:23:56 -04:00
for _, entry := range newEntries {
2022-06-17 17:51:16 -04:00
r := db.Record{
2022-05-02 23:23:56 -04:00
Time: time.Now(),
2022-05-01 23:28:54 -04:00
FeedEntry: entry,
}
2022-05-07 21:01:14 -04:00
err := tgram.SendLink(entry.Link, feed.Channel, feed.Rhash, feed.Title)
2022-05-01 23:28:54 -04:00
if err != nil {
2022-06-17 18:23:06 -04:00
log.Error(err, "failed to send to telegram", "link", entry.Link, "channel", feed.Channel, "rhash", feed.Rhash)
2022-06-17 17:51:16 -04:00
r.Status = db.Error
2022-05-23 13:56:19 -04:00
stat.Error++
2022-05-01 23:28:54 -04:00
} else {
2022-06-17 17:51:16 -04:00
r.Status = db.Sent
2022-05-01 23:28:54 -04:00
}
records = append(records, r)
}
2022-06-17 17:06:45 -04:00
err = fdb.Save(records)
2022-05-01 23:28:54 -04:00
if err != nil {
2022-06-17 18:23:06 -04:00
log.Error(err, "failed to save sent records", "num_records", len(records))
2022-05-01 23:28:54 -04:00
}
2022-05-01 15:32:41 -04:00
}
2022-06-21 21:32:16 -04:00
func Download(url string, proxyURL string) ([]byte, error) {
transport, err := proxy.GetTransport(proxyURL)
2022-05-01 15:32:41 -04:00
if err != nil {
return nil, err
}
2022-05-01 23:28:54 -04:00
client := &http.Client{Transport: transport}
2022-05-01 15:32:41 -04:00
res, err := client.Get(url)
if err != nil {
return nil, err
}
2022-06-21 21:32:16 -04:00
defer func() {
err := res.Body.Close()
if err != nil {
log.Error(err, "Body.Close() failed")
}
}()
2022-05-01 15:32:41 -04:00
return io.ReadAll(res.Body)
2022-04-28 19:24:21 -04:00
}