refactor db out
app/app.go (51 changed lines)
@@ -6,34 +6,44 @@ import (
     "sync"
     "time"

-    "go.balki.me/tss/log"
+    "github.com/go-logr/logr"
+    "go.balki.me/tss/db"
     "go.balki.me/tss/parser"
     "go.balki.me/tss/proxy"
     "go.balki.me/tss/schedule"
     "go.balki.me/tss/telegram"
 )

-func Run(configPath string) {
+var log = logr.Discard()
+
+func SetLogger(log logr.Logger) {
+    log = log
+}
+
+func Run(configPath string) error {
     cfg, err := ParseConfig(configPath)

     if err != nil {
-        log.Panic("failed to parse config", "path", configPath, "err", err)
+        log.Error(err, "failed to parse config", "path", configPath)
+        return err
     }

     scheduler, err := schedule.NewScheduler(cfg.LastSuccessPath)
     if err != nil {
-        log.Panic("failed to create scheduler", "path", cfg.LastSuccessPath, "error", err)
+        log.Error(err, "failed to create scheduler", "path", cfg.LastSuccessPath)
+        return err
     }

     defer func() {
         if err := scheduler.Save(); err != nil {
-            log.Panic("failed to save last success info", "path", cfg.LastSuccessPath, "err", err)
+            log.Error(err, "failed to save last success info", "path", cfg.LastSuccessPath)
         }
     }()

     tgramProxy, err := proxy.GetTransport(cfg.TelegramProxy)
     if err != nil {
-        log.Panic("failed to get proxy transport", "proxyURL", cfg.TelegramProxy, "error", err)
+        log.Error(err, "failed to get proxy transport", "proxyURL", cfg.TelegramProxy)
+        return err
     }

     tgram := telegram.NewTelegramSender(tgramProxy, cfg.TelegramAuthToken)
@@ -49,12 +59,13 @@ func Run(configPath string) {
         }()
     }
     wg.Wait()
+    return nil
 }

 func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgram telegram.TelegramSender) {
     sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
     if err != nil {
-        log.Error("shouldDownload failed", "feed", feed.Name, "err", err)
+        log.Error(err, "shouldDownload failed", "feed", feed.Name)
         return
     }

@@ -63,9 +74,9 @@ func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgra
         return
     }

-    fdb, err := NewDB(dbDir, feed.Name)
+    fdb, err := db.NewDB(dbDir, feed.Name)
     if err != nil {
-        log.Error("failed to get db", "feed", feed.Name, "db_dir", dbDir, "error", err)
+        log.Error(err, "failed to get db", "feed", feed.Name, "db_dir", dbDir)
         return
     }

@@ -73,13 +84,13 @@ func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgra
     for _, url := range feed.Url {
         data, err := Download(url, feed.Proxy)
         if err != nil {
-            log.Error("download failed", "feed", feed.Name, "url", url, "proxy", feed.Proxy, "error", err)
+            log.Error(err, "download failed", "feed", feed.Name, "url", url, "proxy", feed.Proxy)
             return
         }

         currentEntries, err := parser.ParseFeed(feed.Type, data)
         if err != nil {
-            log.Error("feed parsing failed", "feed", feed.Name, "url", url, "data", data, "error", err)
+            log.Error(err, "feed parsing failed", "feed", feed.Name, "url", url, "data", data)
             return
         }
         entries = append(entries, currentEntries...)
@@ -100,7 +111,7 @@ func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgra

     stat.Total = len(entries)

-    var records []Record
+    var records []db.Record
     var newEntries []parser.FeedEntry
     if fdb.IsNewFeed() {
         stat.New = stat.Total
@@ -111,9 +122,9 @@ func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgra
         var filteredEntries []parser.FeedEntry
         newEntries, filteredEntries = entries[:ftl], entries[ftl:]
         for _, entry := range filteredEntries {
-            records = append(records, Record{
+            records = append(records, db.Record{
                 Time:      time.Now(),
-                Status:    Filtered,
+                Status:    db.Filtered,
                 Filter:    "FirstTime",
                 FeedEntry: entry,
             })
@@ -123,30 +134,30 @@ func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgra
     } else {
         newEntries, err = fdb.Filter(entries)
         if err != nil {
-            log.Error("failed to filter entries", "feed", feed.Name, "error", err)
+            log.Error(err, "failed to filter entries", "feed", feed.Name)
             return
         }
         stat.New = len(newEntries)
     }

     for _, entry := range newEntries {
-        r := Record{
+        r := db.Record{
             Time:      time.Now(),
             FeedEntry: entry,
         }
         err := tgram.SendLink(entry.Link, feed.Channel, feed.Rhash, feed.Title)
         if err != nil {
-            log.Error("failed to send to telegram", "feed", feed.Name, "link", entry.Link, "channel", feed.Channel, "rhash", feed.Rhash, "error", err)
-            r.Status = Error
+            log.Error(err, "failed to send to telegram", "feed", feed.Name, "link", entry.Link, "channel", feed.Channel, "rhash", feed.Rhash)
+            r.Status = db.Error
             stat.Error++
         } else {
-            r.Status = Sent
+            r.Status = db.Sent
         }
         records = append(records, r)
     }
     err = fdb.Save(records)
     if err != nil {
-        log.Error("failed to save sent records", "feed", feed.Name, "num_records", len(records), "error", err)
+        log.Error(err, "failed to save sent records", "feed", feed.Name, "num_records", len(records))
     }
 }
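For context, a minimal caller sketch for the refactored API: Run now returns an error instead of panicking, and a logr.Logger can be injected through the new SetLogger (note that, as committed, the SetLogger parameter shadows the package-level log variable, so the assignment is a no-op). The funcr adapter, the go.balki.me/tss/app import path, and the config file name are illustrative assumptions, not part of this commit.

package main

import (
    "fmt"
    "os"

    "github.com/go-logr/logr/funcr"

    "go.balki.me/tss/app"
)

func main() {
    // Build a basic logr.Logger via the funcr adapter (any logr sink would do).
    logger := funcr.New(func(prefix, args string) {
        fmt.Fprintln(os.Stderr, prefix, args)
    }, funcr.Options{})

    app.SetLogger(logger)

    // Run now reports failures to the caller instead of panicking.
    if err := app.Run("config.yaml"); err != nil { // config path is a placeholder
        os.Exit(1)
    }
}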
app/db.go (122 lines deleted)
@@ -1,122 +0,0 @@
-package app
-
-import (
-    "encoding/csv"
-    "errors"
-    "fmt"
-    "os"
-    "path"
-    "strings"
-    "time"
-
-    "go.balki.me/tss/log"
-    "go.balki.me/tss/parser"
-)
-
-type Status string
-
-const (
-    Sent     Status = "SENT"
-    Filtered        = "FILTERED"
-    Error           = "ERROR"
-)
-
-//default format used by yaml.Marshal
-const TimeFormat string = "2006-01-02T15:04:05.999999999-07:00"
-
-type Record struct {
-    Time      time.Time
-    Status    Status
-    Filter    string
-    FeedEntry parser.FeedEntry
-}
-
-type DB interface {
-    IsNewFeed() bool
-    Filter(entries []parser.FeedEntry) ([]parser.FeedEntry, error)
-    Save([]Record) error
-}
-
-type db struct {
-    dbPath    string
-    seenLinks map[string]struct{}
-}
-
-func (d *db) IsNewFeed() bool {
-    return len(d.seenLinks) == 0
-}
-
-func NewDB(storageDir, feedName string) (DB, error) {
-    dbPath := path.Join(storageDir, fmt.Sprintf("%s.csv", feedName))
-    db := db{
-        dbPath:    dbPath,
-        seenLinks: map[string]struct{}{},
-    }
-    f, err := os.Open(dbPath)
-    if err != nil {
-        if errors.Is(err, os.ErrNotExist) {
-            log.Info("db file does not exist, will be created", "feed", feedName, "path", dbPath)
-            return &db, nil
-        }
-        return nil, err
-    }
-    defer f.Close()
-    reader := csv.NewReader(f)
-    records, err := reader.ReadAll()
-    if err != nil {
-        return nil, fmt.Errorf("failed to parse csv, path:%v, error:%w", dbPath, err)
-    }
-    for _, rec := range records {
-        var recStatus Status = Status(rec[2])
-        if recStatus == Sent || recStatus == Filtered {
-            db.seenLinks[rec[1]] = struct{}{}
-        }
-    }
-    return &db, nil
-}
-
-func (d *db) Filter(entries []parser.FeedEntry) ([]parser.FeedEntry, error) {
-    var filteredEntries []parser.FeedEntry
-    for _, entry := range entries {
-        if _, ok := d.seenLinks[entry.Link]; !ok {
-            filteredEntries = append(filteredEntries, entry)
-            d.seenLinks[entry.Link] = struct{}{}
-        }
-    }
-    return filteredEntries, nil
-}
-
-func (d *db) Save(records []Record) error {
-    if len(records) == 0 {
-        return nil
-    }
-
-    f, err := os.OpenFile(d.dbPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
-    if err != nil {
-        return err
-    }
-    defer f.Close()
-
-    csvw := csv.NewWriter(f)
-    defer csvw.Flush()
-
-    if d.IsNewFeed() {
-        csvw.Write([]string{
-            /* 1 */ "Date",
-            /* 2 */ "Link",
-            /* 3 */ "Status",
-            /* 4 */ "Filter",
-            /* 5 */ "Content",
-        })
-    }
-    for _, r := range records {
-        csvw.Write([]string{
-            /* 1 */ r.Time.Format(TimeFormat),
-            /* 2 */ r.FeedEntry.Link,
-            /* 3 */ string(r.Status),
-            /* 4 */ r.Filter,
-            /* 5 */ fmt.Sprintf("<item>%s</item>", strings.ReplaceAll(r.FeedEntry.Content, "\n", " ")),
        })
-    }
-    return nil
-}
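The new home of this code is not part of the diff. Judging from the identifiers now referenced in app/app.go (db.NewDB, db.Record, db.Sent, db.Filtered, db.Error), the go.balki.me/tss/db package presumably keeps roughly the same exported surface as the deleted file; the sketch below is an inference, not the committed source.

package db

import (
    "time"

    "go.balki.me/tss/parser"
)

// Status of one feed entry as recorded in the per-feed CSV file.
type Status string

const (
    Sent     Status = "SENT"
    Filtered Status = "FILTERED"
    Error    Status = "ERROR"
)

// Record mirrors one CSV row: time, link (via FeedEntry), status, filter, content.
type Record struct {
    Time      time.Time
    Status    Status
    Filter    string
    FeedEntry parser.FeedEntry
}

// DB is the per-feed store backed by <storageDir>/<feedName>.csv.
type DB interface {
    IsNewFeed() bool
    Filter(entries []parser.FeedEntry) ([]parser.FeedEntry, error)
    Save([]Record) error
}

// The constructor, as called from app/app.go:
//
//	func NewDB(storageDir, feedName string) (DB, error)

If the move is mechanical, the CSV layout and the yaml.Marshal time format from app/db.go would carry over unchanged.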