tss/app/app.go
2022-06-17 17:06:45 -04:00

166 lines
4.0 KiB
Go

package app
import (
"io"
"net/http"
"sync"
"time"
"go.balki.me/tss/log"
"go.balki.me/tss/parser"
"go.balki.me/tss/proxy"
"go.balki.me/tss/schedule"
"go.balki.me/tss/telegram"
)
// Run is the application entry point. It loads the configuration found at
// configPath, builds the scheduler and the Telegram sender, and then hands
// every configured feed to ProcessFeed in its own goroutine, blocking until
// all of them have finished.
//
// Any setup failure (config, scheduler, proxy transport) is reported through
// log.Panic. Saving the scheduler state is deferred until Run returns, and a
// failure to save is reported through log.Panic as well.
func Run(configPath string) {
	cfg, err := ParseConfig(configPath)
	if err != nil {
		log.Panic("failed to parse config", "path", configPath, "err", err)
	}

	scheduler, err := schedule.NewScheduler(cfg.LastSuccessPath)
	if err != nil {
		log.Panic("failed to create scheduler", "path", cfg.LastSuccessPath, "error", err)
	}
	defer func() {
		saveErr := scheduler.Save()
		if saveErr != nil {
			log.Panic("failed to save last success info", "path", cfg.LastSuccessPath, "err", saveErr)
		}
	}()

	transport, err := proxy.GetTransport(cfg.TelegramProxy)
	if err != nil {
		log.Panic("failed to get proxy transport", "proxyURL", cfg.TelegramProxy, "error", err)
	}
	sender := telegram.NewTelegramSender(transport, cfg.TelegramAuthToken)

	// One goroutine per feed; the feed pointer is passed explicitly so each
	// goroutine works on its own element of cfg.Feeds.
	var pending sync.WaitGroup
	for idx := range cfg.Feeds {
		current := &cfg.Feeds[idx]
		log.Info("processing feed", "feed", current.Name)
		pending.Add(1)
		go func(f *FeedCfg) {
			ProcessFeed(f, scheduler, cfg.DbDir, sender)
			pending.Done()
		}(current)
	}
	pending.Wait()
}
// ProcessFeed runs one fetch-and-publish cycle for a single feed.
//
// Steps, in order:
//  1. Ask scheduler whether the feed should be downloaded now; stop if not.
//  2. Obtain the feed's DB through NewDB(dbDir, feed.Name).
//  3. Download and parse every URL of the feed. The first download or parse
//     failure aborts the whole cycle.
//  4. Report the feed as good to the scheduler.
//  5. Choose the entries to send. For a feed the DB reports as new, that is
//     the first FirstTimeLimit entries (all of them when the limit is
//     NoLimit or not exceeded); the remainder is recorded with status
//     Filtered and filter name "FirstTime". Otherwise it is whatever
//     fdb.Filter returns.
//  6. Send each chosen entry's link through tgram, recording Sent or Error
//     per entry, and save all records to the DB.
//
// Nothing is returned: every failure is logged. Once step 4 has been reached,
// a summary line with the counters is logged whenever the function returns.
func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgram telegram.TelegramSender) {
	due, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
	switch {
	case err != nil:
		log.Error("shouldDownload failed", "feed", feed.Name, "err", err)
		return
	case !due:
		log.Info("skipping feed due to schedule", "feed", feed.Name)
		return
	}

	fdb, err := NewDB(dbDir, feed.Name)
	if err != nil {
		log.Error("failed to get db", "feed", feed.Name, "db_dir", dbDir, "error", err)
		return
	}

	// Collect the entries of all URLs before touching the DB or Telegram.
	var entries []parser.FeedEntry
	for _, feedURL := range feed.Url {
		raw, dlErr := Download(feedURL, feed.Proxy)
		if dlErr != nil {
			log.Error("download failed", "feed", feed.Name, "url", feedURL, "proxy", feed.Proxy, "error", dlErr)
			return
		}
		parsed, parseErr := parser.ParseFeed(feed.Type, raw)
		if parseErr != nil {
			log.Error("feed parsing failed", "feed", feed.Name, "url", feedURL, "data", raw, "error", parseErr)
			return
		}
		entries = append(entries, parsed...)
	}
	scheduler.Good(feed.Name)

	// Counters for the summary line; the deferred closure reads their final
	// values at return time.
	var total, fresh, filtered, failed int
	defer func() {
		log.Info("done processing feed", "feed", feed.Name, "total", total, "new", fresh, "filtered", filtered, "error", failed)
	}()
	total = len(entries)

	var (
		records []Record
		toSend  []parser.FeedEntry
	)
	if !fdb.IsNewFeed() {
		toSend, err = fdb.Filter(entries)
		if err != nil {
			log.Error("failed to filter entries", "feed", feed.Name, "error", err)
			return
		}
		fresh = len(toSend)
	} else {
		fresh = total
		toSend = entries
		// Apply the first-time cap only when a limit is set and exceeded.
		if limit := int(feed.FirstTimeLimit); feed.FirstTimeLimit != NoLimit && len(entries) > limit {
			toSend = entries[:limit]
			skipped := entries[limit:]
			for i := range skipped {
				records = append(records, Record{
					Time:      time.Now(),
					Status:    Filtered,
					Filter:    "FirstTime",
					FeedEntry: skipped[i],
				})
			}
			filtered = len(skipped)
		}
	}

	for _, item := range toSend {
		// Timestamp is taken before the send attempt; status starts as Sent
		// and is downgraded if the send fails.
		rec := Record{
			Time:      time.Now(),
			FeedEntry: item,
			Status:    Sent,
		}
		if sendErr := tgram.SendLink(item.Link, feed.Channel, feed.Rhash, feed.Title); sendErr != nil {
			log.Error("failed to send to telegram", "feed", feed.Name, "link", item.Link, "channel", feed.Channel, "rhash", feed.Rhash, "error", sendErr)
			rec.Status = Error
			failed++
		}
		records = append(records, rec)
	}

	if saveErr := fdb.Save(records); saveErr != nil {
		log.Error("failed to save sent records", "feed", feed.Name, "num_records", len(records), "error", saveErr)
	}
}
// downloadStatusError is returned by Download when the server answers with a
// non-2xx HTTP status.
type downloadStatusError struct {
	// status is the response status line text, e.g. "404 Not Found".
	status string
}

// Error implements the error interface.
func (e *downloadStatusError) Error() string {
	return "unexpected HTTP status: " + e.status
}

// Download fetches url with an HTTP GET and returns the response body.
//
// The request goes through the transport that proxy.GetTransport returns for
// proxyUrl. An error is returned when the transport cannot be created, the
// request fails or times out, the server responds with a non-2xx status, or
// the body cannot be read in full.
func Download(url string, proxyUrl string) ([]byte, error) {
	// Upper bound for the whole exchange (connect, headers and body). Without
	// it a stalled server would block the calling goroutine indefinitely.
	const downloadTimeout = 2 * time.Minute

	transport, err := proxy.GetTransport(proxyUrl)
	if err != nil {
		return nil, err
	}

	client := &http.Client{
		Transport: transport,
		Timeout:   downloadTimeout,
	}
	res, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	// An error page must not be handed to the feed parser as feed data.
	if res.StatusCode < http.StatusOK || res.StatusCode >= http.StatusMultipleChoices {
		return nil, &downloadStatusError{status: res.Status}
	}
	return io.ReadAll(res.Body)
}