apply go tools cleanup
This commit is contained in:
parent
a61be99d03
commit
f1accd3006
17
app/app.go
17
app/app.go
@ -46,7 +46,7 @@ func Run(configPath string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
tgram := telegram.NewTelegramSender(tgramProxy, cfg.TelegramAuthToken)
|
tgram := telegram.NewSender(tgramProxy, cfg.TelegramAuthToken)
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
for i := range cfg.Feeds {
|
for i := range cfg.Feeds {
|
||||||
@ -62,7 +62,7 @@ func Run(configPath string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgram telegram.TelegramSender) {
|
func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgram telegram.Sender) {
|
||||||
log := log.WithValues("feed", feed.Name)
|
log := log.WithValues("feed", feed.Name)
|
||||||
sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
|
sd, err := scheduler.ShouldDownload(feed.Name, feed.Cron)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -82,7 +82,7 @@ func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgra
|
|||||||
}
|
}
|
||||||
|
|
||||||
var entries []parser.FeedEntry
|
var entries []parser.FeedEntry
|
||||||
for _, url := range feed.Url {
|
for _, url := range feed.URL {
|
||||||
data, err := Download(url, feed.Proxy)
|
data, err := Download(url, feed.Proxy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err, "download failed", "url", url, "proxy", feed.Proxy)
|
log.Error(err, "download failed", "url", url, "proxy", feed.Proxy)
|
||||||
@ -162,8 +162,8 @@ func ProcessFeed(feed *FeedCfg, scheduler schedule.Scheduler, dbDir string, tgra
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Download(url string, proxyUrl string) ([]byte, error) {
|
func Download(url string, proxyURL string) ([]byte, error) {
|
||||||
transport, err := proxy.GetTransport(proxyUrl)
|
transport, err := proxy.GetTransport(proxyURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -172,6 +172,11 @@ func Download(url string, proxyUrl string) ([]byte, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer func() {
|
||||||
|
err := res.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "Body.Close() failed")
|
||||||
|
}
|
||||||
|
}()
|
||||||
return io.ReadAll(res.Body)
|
return io.ReadAll(res.Body)
|
||||||
}
|
}
|
||||||
|
@ -25,9 +25,9 @@ type FeedCfg struct {
|
|||||||
// Telegram instant view rhash id, see https://instantview.telegram.org/#publishing-templates
|
// Telegram instant view rhash id, see https://instantview.telegram.org/#publishing-templates
|
||||||
Rhash string `yaml:"rhash" jsonschema:"default="`
|
Rhash string `yaml:"rhash" jsonschema:"default="`
|
||||||
|
|
||||||
// Feed Url, string or array of strings
|
// Feed URL, string or array of strings
|
||||||
// If array, all entries are merged, e.g ["https://example.com/blog/tag/foo/feed", "https://example.com/blog/tag/bar/feed" ]
|
// If array, all entries are merged, e.g ["https://example.com/blog/tag/foo/feed", "https://example.com/blog/tag/bar/feed" ]
|
||||||
Url FeedURL `yaml:"url" jsonschema:"required,oneof_type=string;array"`
|
URL FeedURL `yaml:"url" jsonschema:"required,oneof_type=string;array"`
|
||||||
|
|
||||||
// Cron expression, see https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format
|
// Cron expression, see https://pkg.go.dev/github.com/robfig/cron#hdr-CRON_Expression_Format
|
||||||
Cron string `yaml:"cron" jsonschema:"required"`
|
Cron string `yaml:"cron" jsonschema:"required"`
|
||||||
@ -118,7 +118,7 @@ func ParseConfig(configPath string) (*Config, error) {
|
|||||||
if c.TelegramProxy == "" {
|
if c.TelegramProxy == "" {
|
||||||
c.TelegramProxy = c.Proxy
|
c.TelegramProxy = c.Proxy
|
||||||
}
|
}
|
||||||
for i, _ := range c.Feeds {
|
for i := range c.Feeds {
|
||||||
feedCfg := &c.Feeds[i]
|
feedCfg := &c.Feeds[i]
|
||||||
if feedCfg.Proxy == "" {
|
if feedCfg.Proxy == "" {
|
||||||
feedCfg.Proxy = c.Proxy
|
feedCfg.Proxy = c.Proxy
|
||||||
@ -130,7 +130,7 @@ func ParseConfig(configPath string) (*Config, error) {
|
|||||||
c.TelegramProxy = ""
|
c.TelegramProxy = ""
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, _ := range c.Feeds {
|
for i := range c.Feeds {
|
||||||
feedCfg := &c.Feeds[i]
|
feedCfg := &c.Feeds[i]
|
||||||
if feedCfg.Proxy == "NONE" {
|
if feedCfg.Proxy == "NONE" {
|
||||||
feedCfg.Proxy = ""
|
feedCfg.Proxy = ""
|
||||||
|
34
db/db.go
34
db/db.go
@ -23,11 +23,11 @@ type Status string
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
Sent Status = "SENT"
|
Sent Status = "SENT"
|
||||||
Filtered = "FILTERED"
|
Filtered Status = "FILTERED"
|
||||||
Error = "ERROR"
|
Error Status = "ERROR"
|
||||||
)
|
)
|
||||||
|
|
||||||
//default format used by yaml.Marshal
|
// TimeFormat used by yaml.Marshal
|
||||||
const TimeFormat string = "2006-01-02T15:04:05.999999999-07:00"
|
const TimeFormat string = "2006-01-02T15:04:05.999999999-07:00"
|
||||||
|
|
||||||
type Record struct {
|
type Record struct {
|
||||||
@ -66,7 +66,13 @@ func NewDB(storageDir, feedName string) (DB, error) {
|
|||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer func() {
|
||||||
|
err := f.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "f.Close() failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
reader := csv.NewReader(f)
|
reader := csv.NewReader(f)
|
||||||
records, err := reader.ReadAll()
|
records, err := reader.ReadAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -74,7 +80,7 @@ func NewDB(storageDir, feedName string) (DB, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for _, rec := range records {
|
for _, rec := range records {
|
||||||
var recStatus Status = Status(rec[2])
|
recStatus := Status(rec[2])
|
||||||
if recStatus == Sent || recStatus == Filtered {
|
if recStatus == Sent || recStatus == Filtered {
|
||||||
db.seenLinks[rec[1]] = struct{}{}
|
db.seenLinks[rec[1]] = struct{}{}
|
||||||
}
|
}
|
||||||
@ -102,28 +108,40 @@ func (d *db) Save(records []Record) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer func() {
|
||||||
|
err := f.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "f.Close() failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
csvw := csv.NewWriter(f)
|
csvw := csv.NewWriter(f)
|
||||||
defer csvw.Flush()
|
defer csvw.Flush()
|
||||||
|
|
||||||
if d.IsNewFeed() {
|
if d.IsNewFeed() {
|
||||||
csvw.Write([]string{
|
err := csvw.Write([]string{
|
||||||
/* 1 */ "Date",
|
/* 1 */ "Date",
|
||||||
/* 2 */ "Link",
|
/* 2 */ "Link",
|
||||||
/* 3 */ "Status",
|
/* 3 */ "Status",
|
||||||
/* 4 */ "Filter",
|
/* 4 */ "Filter",
|
||||||
/* 5 */ "Content",
|
/* 5 */ "Content",
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for _, r := range records {
|
for _, r := range records {
|
||||||
csvw.Write([]string{
|
err := csvw.Write([]string{
|
||||||
/* 1 */ r.Time.Format(TimeFormat),
|
/* 1 */ r.Time.Format(TimeFormat),
|
||||||
/* 2 */ r.FeedEntry.Link,
|
/* 2 */ r.FeedEntry.Link,
|
||||||
/* 3 */ string(r.Status),
|
/* 3 */ string(r.Status),
|
||||||
/* 4 */ r.Filter,
|
/* 4 */ r.Filter,
|
||||||
/* 5 */ fmt.Sprintf("<item>%s</item>", strings.ReplaceAll(r.FeedEntry.Content, "\n", " ")),
|
/* 5 */ fmt.Sprintf("<item>%s</item>", strings.ReplaceAll(r.FeedEntry.Content, "\n", " ")),
|
||||||
})
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,7 @@ func ParseFeed(feedType FeedType, data []byte) ([]FeedEntry, error) {
|
|||||||
case Atom:
|
case Atom:
|
||||||
return parseAtom(data)
|
return parseAtom(data)
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("Unknown feed type: %s", feedType)
|
return nil, fmt.Errorf("unknown feed type: %s", feedType)
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseAtom(data []byte) ([]FeedEntry, error) {
|
func parseAtom(data []byte) ([]FeedEntry, error) {
|
||||||
|
@ -14,15 +14,15 @@ func GetTransport(proxy string) (http.RoundTripper, error) {
|
|||||||
if proxy == "" {
|
if proxy == "" {
|
||||||
return http.DefaultTransport, nil
|
return http.DefaultTransport, nil
|
||||||
}
|
}
|
||||||
proxyUrl, err := url.Parse(proxy)
|
proxyURL, err := url.Parse(proxy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to parse proxyUrl, url:%s, err: %w", proxy, err)
|
return nil, fmt.Errorf("failed to parse proxyUrl, url:%s, err: %w", proxy, err)
|
||||||
}
|
}
|
||||||
if proxyUrl.Host == "unix" && proxyUrl.Scheme == "socks5" &&
|
if proxyURL.Host == "unix" && proxyURL.Scheme == "socks5" &&
|
||||||
len(proxyUrl.Path) > 1 /* Path cannot be empty or just / */ {
|
len(proxyURL.Path) > 1 /* Path cannot be empty or just / */ {
|
||||||
return unixSocks5Proxy(proxyUrl)
|
return unixSocks5Proxy(proxyURL)
|
||||||
}
|
}
|
||||||
return proxyTcp(proxyUrl)
|
return proxyTCP(proxyURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
type forwardDialer func(ctx context.Context, network, address string) (net.Conn, error)
|
type forwardDialer func(ctx context.Context, network, address string) (net.Conn, error)
|
||||||
@ -34,14 +34,14 @@ func (d forwardDialer) Dial(network, address string) (net.Conn, error) {
|
|||||||
panic("Dial should not be called")
|
panic("Dial should not be called")
|
||||||
}
|
}
|
||||||
|
|
||||||
func unixSocks5Proxy(proxyUrl *url.URL) (http.RoundTripper, error) {
|
func unixSocks5Proxy(proxyURL *url.URL) (http.RoundTripper, error) {
|
||||||
trans := defaultTransport().Clone()
|
trans := defaultTransport().Clone()
|
||||||
if trans.DialContext == nil {
|
if trans.DialContext == nil {
|
||||||
panic("DefaultTransport has nil DialContext")
|
panic("DefaultTransport has nil DialContext")
|
||||||
}
|
}
|
||||||
var auth *proxy.Auth
|
var auth *proxy.Auth
|
||||||
username := proxyUrl.User.Username()
|
username := proxyURL.User.Username()
|
||||||
password, _ := proxyUrl.User.Password()
|
password, _ := proxyURL.User.Password()
|
||||||
// Both username and password should have atleast one char
|
// Both username and password should have atleast one char
|
||||||
if username != "" && password != "" {
|
if username != "" && password != "" {
|
||||||
auth = &proxy.Auth{
|
auth = &proxy.Auth{
|
||||||
@ -49,9 +49,9 @@ func unixSocks5Proxy(proxyUrl *url.URL) (http.RoundTripper, error) {
|
|||||||
Password: password,
|
Password: password,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dialer, err := proxy.SOCKS5("unix", proxyUrl.Path, auth, forwardDialer(trans.DialContext))
|
dialer, err := proxy.SOCKS5("unix", proxyURL.Path, auth, forwardDialer(trans.DialContext))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to make socks proxy, url: %s, err: %w", proxyUrl, err)
|
return nil, fmt.Errorf("failed to make socks proxy, url: %s, err: %w", proxyURL, err)
|
||||||
}
|
}
|
||||||
ctxDialer, ok := dialer.(proxy.ContextDialer)
|
ctxDialer, ok := dialer.(proxy.ContextDialer)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -62,9 +62,9 @@ func unixSocks5Proxy(proxyUrl *url.URL) (http.RoundTripper, error) {
|
|||||||
return trans, nil
|
return trans, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func proxyTcp(proxyUrl *url.URL) (http.RoundTripper, error) {
|
func proxyTCP(proxyURL *url.URL) (http.RoundTripper, error) {
|
||||||
trans := defaultTransport().Clone()
|
trans := defaultTransport().Clone()
|
||||||
trans.Proxy = http.ProxyURL(proxyUrl)
|
trans.Proxy = http.ProxyURL(proxyURL)
|
||||||
return trans, nil
|
return trans, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +41,10 @@ func NewScheduler(filePath string) (Scheduler, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("path:%v does not exist and unable to create: err: %w", filePath, err)
|
return nil, fmt.Errorf("path:%v does not exist and unable to create: err: %w", filePath, err)
|
||||||
}
|
}
|
||||||
f.Close()
|
err = f.Close()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
log.Info("scheduler file does not exist, will be created", "path", filePath)
|
log.Info("scheduler file does not exist, will be created", "path", filePath)
|
||||||
} else {
|
} else {
|
||||||
err = yaml.Unmarshal(data, &s.lastSuccessTime)
|
err = yaml.Unmarshal(data, &s.lastSuccessTime)
|
||||||
|
@ -20,7 +20,7 @@ func SetLogger(l logr.Logger) {
|
|||||||
log = l
|
log = l
|
||||||
}
|
}
|
||||||
|
|
||||||
type TelegramSender interface {
|
type Sender interface {
|
||||||
SendLink(link, channel, rhash, title string) error
|
SendLink(link, channel, rhash, title string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,16 +51,22 @@ func (ts *telegramSender) SendLink(link, channel, rhash, title string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
apiUrl := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", ts.authToken)
|
apiURL := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", ts.authToken)
|
||||||
|
|
||||||
ts.rateLimiterPerMin.Wait()
|
ts.rateLimiterPerMin.Wait()
|
||||||
ts.rateLimiterPerSec.Wait()
|
ts.rateLimiterPerSec.Wait()
|
||||||
|
|
||||||
res, err := ts.client.Post(apiUrl, "application/json", bytes.NewReader(data))
|
res, err := ts.client.Post(apiURL, "application/json", bytes.NewReader(data))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer func() {
|
||||||
|
err := res.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err, "res.Body.Close() failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
responseText, err := io.ReadAll(res.Body)
|
responseText, err := io.ReadAll(res.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -73,7 +79,7 @@ func (ts *telegramSender) SendLink(link, channel, rhash, title string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTelegramSender(transport http.RoundTripper, authToken string) TelegramSender {
|
func NewSender(transport http.RoundTripper, authToken string) Sender {
|
||||||
return &telegramSender{
|
return &telegramSender{
|
||||||
client: &http.Client{Transport: transport},
|
client: &http.Client{Transport: transport},
|
||||||
authToken: authToken,
|
authToken: authToken,
|
||||||
|
Loading…
Reference in New Issue
Block a user