almost done
This commit is contained in:
parent
af4b7297b0
commit
38e964c522
47
app/app.go
47
app/app.go
@ -1,6 +1,11 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
import "go.balki.me/tss/log"
|
import (
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"go.balki.me/tss/log"
|
||||||
|
"go.balki.me/tss/proxy"
|
||||||
|
)
|
||||||
|
|
||||||
func Run(configPath string) {
|
func Run(configPath string) {
|
||||||
cfg, err := ParseConfig(configPath)
|
cfg, err := ParseConfig(configPath)
|
||||||
@ -19,7 +24,7 @@ func Run(configPath string) {
|
|||||||
|
|
||||||
for _, feed := range cfg.Feeds {
|
for _, feed := range cfg.Feeds {
|
||||||
log.Info("processing feed", "feed", feed.Name)
|
log.Info("processing feed", "feed", feed.Name)
|
||||||
ProcessFeed(feed, scheduler)
|
ProcessFeed(feed, scheduler, cfg.DbDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -53,19 +58,51 @@ func Run(configPath string) {
|
|||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProcessFeed(feed FeedCfg, scheduler *Scheduler) {
|
func ProcessFeed(feed FeedCfg, scheduler *Scheduler, dbDir string) {
|
||||||
sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
|
sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("shouldDownload failed", "feed", feed.Name, "err", err)
|
log.Error("shouldDownload failed", "feed", feed.Name, "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !sd {
|
if !sd {
|
||||||
log.Info("skipping feed due to schedule", "feed", feed.Name)
|
log.Info("skipping feed due to schedule", "feed", feed.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, err = Download(feed.Url, feed.Proxy)
|
|
||||||
|
db, err := NewDB(dbDir, feed.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("download failed", "feed", feed.Name, "url", feed.Url, "proxy", feed.Proxy)
|
log.Error("failed to get db", "feed", feed.Name, "db_dir", dbDir, "error", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data, err := Download(feed.Url, feed.Proxy)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("download failed", "feed", feed.Name, "url", feed.Url, "proxy", feed.Proxy, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
entries, err := ParseFeed(data)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("feed parsing failed", "feed", feed.Name, "data", data, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = db.Filter(entries)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("failed to filter entries", "feed", feed.Name, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Download(url string, proxyUrl string) ([]byte, error) {
|
||||||
|
client, err := proxy.GetClient(proxyUrl)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
res, err := client.Get(url)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
return io.ReadAll(res.Body)
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@ type FeedCfg struct {
|
|||||||
type Config struct {
|
type Config struct {
|
||||||
DataDir string `yaml:"data_dir"`
|
DataDir string `yaml:"data_dir"`
|
||||||
LastSuccessPath string `yaml:"last_loaded_path"`
|
LastSuccessPath string `yaml:"last_loaded_path"`
|
||||||
|
DbDir string `yaml:"db_dir"`
|
||||||
Feeds []FeedCfg `yaml:"feeds"`
|
Feeds []FeedCfg `yaml:"feeds"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -44,5 +45,9 @@ func ParseConfig(configPath string) (*Config, error) {
|
|||||||
c.LastSuccessPath = path.Join(c.DataDir, "last_success.yml")
|
c.LastSuccessPath = path.Join(c.DataDir, "last_success.yml")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.DbDir == "" {
|
||||||
|
c.DbDir = path.Join(c.DataDir, "feed_data")
|
||||||
|
}
|
||||||
|
|
||||||
return &c, nil
|
return &c, nil
|
||||||
}
|
}
|
||||||
|
97
app/db.go
Normal file
97
app/db.go
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/csv"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Status string
|
||||||
|
|
||||||
|
const (
|
||||||
|
Sent Status = "SENT"
|
||||||
|
Filtered = "FILTERED"
|
||||||
|
Error = "ERROR"
|
||||||
|
)
|
||||||
|
|
||||||
|
//default format used by yaml.Marshal
|
||||||
|
const TimeFormat string = "2006-01-02T15:04:05.999999999-07:00"
|
||||||
|
|
||||||
|
type Record struct {
|
||||||
|
Time time.Time
|
||||||
|
Status Status
|
||||||
|
FeedEntry FeedEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
type DB interface {
|
||||||
|
Filter(entries []FeedEntry) ([]FeedEntry, error)
|
||||||
|
Save([]Record) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type db struct {
|
||||||
|
dbPath string
|
||||||
|
seenLinks map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDB(storageDir, feedName string) (DB, error) {
|
||||||
|
dbPath := path.Join(storageDir, fmt.Sprintf("%s.csv", feedName))
|
||||||
|
f, err := os.Open(dbPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
reader := csv.NewReader(f)
|
||||||
|
records, err := reader.ReadAll()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse csv, path:%v, error:%w", dbPath, err)
|
||||||
|
}
|
||||||
|
db := db{dbPath: dbPath}
|
||||||
|
db.seenLinks = map[string]struct{}{}
|
||||||
|
for _, rec := range records {
|
||||||
|
var recStatus Status = Status(rec[2])
|
||||||
|
if recStatus == Sent || recStatus == Filtered {
|
||||||
|
db.seenLinks[rec[1]] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *db) Filter(entries []FeedEntry) ([]FeedEntry, error) {
|
||||||
|
var filteredEntries []FeedEntry
|
||||||
|
for _, entry := range entries {
|
||||||
|
if _, ok := d.seenLinks[entry.Link]; !ok {
|
||||||
|
filteredEntries = append(filteredEntries, entry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return filteredEntries, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *db) Save(records []Record) error {
|
||||||
|
f, err := os.OpenFile(d.dbPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
csvw := csv.NewWriter(f)
|
||||||
|
if len(d.seenLinks) == 0 { //New file, write header
|
||||||
|
csvw.Write([]string{
|
||||||
|
"Date",
|
||||||
|
"Link",
|
||||||
|
"Status",
|
||||||
|
"FilteredBy",
|
||||||
|
"Content",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
for _, r := range records {
|
||||||
|
csvw.Write([]string{
|
||||||
|
r.Time.Format(TimeFormat),
|
||||||
|
r.FeedEntry.Link,
|
||||||
|
string(r.Status),
|
||||||
|
"-",
|
||||||
|
r.FeedEntry.Content,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -1,5 +1 @@
|
|||||||
package app
|
package app
|
||||||
|
|
||||||
func Download(url string, proxy string) ([]byte, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
28
app/parser.go
Normal file
28
app/parser.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package app
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/xml"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FeedEntry struct {
|
||||||
|
Title string `xml:"title"`
|
||||||
|
Link string `xml:"link"`
|
||||||
|
Author string `xml:"author"`
|
||||||
|
Guid string `xml:"guid"`
|
||||||
|
Description string `xml:"description"`
|
||||||
|
Content string `xml:",innerxml"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseFeed(data []byte) ([]FeedEntry, error) {
|
||||||
|
|
||||||
|
v := struct {
|
||||||
|
Items []FeedEntry `xml:"channel>item"`
|
||||||
|
}{}
|
||||||
|
|
||||||
|
err := xml.Unmarshal(data, &v)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return v.Items, nil
|
||||||
|
}
|
0
exp/csv/csv.go
Normal file
0
exp/csv/csv.go
Normal file
41
exp/csv/main.go
Normal file
41
exp/csv/main.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/csv"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gopkg.in/yaml.v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
fmt.Println("vim-go")
|
||||||
|
fb := bytes.NewReader(nil)
|
||||||
|
cr := csv.NewReader(fb)
|
||||||
|
records, err := cr.ReadAll()
|
||||||
|
fmt.Println(records, err)
|
||||||
|
fmt.Println(time.Now().String())
|
||||||
|
yesterday := time.Now().Add(-24 * time.Hour)
|
||||||
|
m := map[string]time.Time{
|
||||||
|
"Bala": time.Now(),
|
||||||
|
"Linus": yesterday,
|
||||||
|
}
|
||||||
|
data, _ := yaml.Marshal(&m)
|
||||||
|
fmt.Printf("%s\n", data)
|
||||||
|
//format := "2022-05-01T15:08:20.593630746-04:00"
|
||||||
|
format := "2006-01-02T15:04:05.999999999-07:00"
|
||||||
|
fmt.Println("============")
|
||||||
|
fmt.Println(yesterday.Format(format))
|
||||||
|
fmt.Println("============")
|
||||||
|
fmt.Println(yesterday.GoString())
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
|
Linus:
|
||||||
|
|
||||||
|
============
|
||||||
|
2022-04-30T15:14:40.302916106-04:00
|
||||||
|
2022-04-30T15:14:40.302916106-04:00
|
||||||
|
*/
|
Loading…
Reference in New Issue
Block a user