15 files changed,
666 insertions(+),
0 deletions(-)
Author:
Oleksandr Smirnov
olexsmir@gmail.com
Committed at:
2026-04-22 20:13:38 +0300
Authored at:
2026-04-13 22:27:51 +0300
Change ID:
vkyppnxmoqwvtyzzoxmzqsozyptnpqmo
A
app/app.go
··· 1 +package app 2 + 3 +import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "net/http" 8 + "sync" 9 + "time" 10 + 11 + "go.etcd.io/bbolt" 12 +) 13 + 14 +type App struct { 15 + mux *http.ServeMux 16 + workers []func(context.Context) error 17 + shutdowns []func(context.Context) 18 + wg *sync.WaitGroup 19 + db *bbolt.DB 20 + scraperClient *http.Client 21 + 22 + // TODO: cacher, each scrapper should be able to get it's own cacher 23 + Config *Config 24 + Client *http.Client 25 + Logger *slog.Logger 26 +} 27 + 28 +func New(cfg *Config, db *bbolt.DB) *App { 29 + return &App{ 30 + mux: http.NewServeMux(), 31 + workers: nil, 32 + wg: &sync.WaitGroup{}, 33 + db: db, 34 + scraperClient: &http.Client{Timeout: 10 * time.Second}, 35 + 36 + Logger: slog.Default(), 37 + Client: &http.Client{Timeout: 31 * time.Second}, 38 + Config: cfg, 39 + } 40 +} 41 + 42 +// Route registers a global route. pattern syntax is the same as in [http.ServeMux].HandlerFunc 43 +func (a App) Route(pattern string, handler http.HandlerFunc) { 44 + a.mux.HandleFunc(pattern, handler) 45 +} 46 + 47 +// AddWorker adds background worker 48 +func (a *App) AddWorker(worker func(ctx context.Context) error) { 49 + a.workers = append(a.workers, worker) 50 +} 51 + 52 +// AddShutdown registers a shutdown hook that will be called when the app stops. 53 +// Shutdown hooks are called in reverse order of registration. 54 +func (a *App) AddShutdown(fn func(ctx context.Context)) { 55 + a.shutdowns = append(a.shutdowns, fn) 56 +} 57 + 58 +const ( 59 + defaultScraperUserAgent = "rss-tools/1.0)" 60 + defaultScraperAccept = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8" 61 +) 62 + 63 +// Get is intended for scraping sources; API SDK calls should use [App.Client] directly. 64 +func (a *App) Get(ctx context.Context, url string) (*http.Response, error) { 65 + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) 66 + if err != nil { 67 + return nil, err 68 + } 69 + req.Header.Set("User-Agent", defaultScraperUserAgent) 70 + req.Header.Set("Accept", defaultScraperAccept) 71 + return a.scraperClient.Do(req) 72 +} 73 + 74 +// Start starts an app and with all it's registered sources 75 +func (a *App) Start(ctx context.Context) error { 76 + // workers 77 + for _, worker := range a.workers { 78 + a.wg.Add(1) 79 + go func(w func(context.Context) error) { 80 + defer a.wg.Done() 81 + if err := w(ctx); err != nil { 82 + slog.ErrorContext(ctx, "worker exited with an error", "err", err) 83 + } 84 + }(worker) 85 + } 86 + 87 + // http server 88 + // TODO: opt in auth middleware 89 + handler := a.recoverMiddleware(a.mux) 90 + handler = a.loggingMiddleware(handler) 91 + httpSrv := &http.Server{ 92 + Addr: fmt.Sprintf(":%d", a.Config.Port), // fixme 93 + Handler: handler, 94 + } 95 + 96 + go func() { 97 + go func() { 98 + <-ctx.Done() 99 + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 100 + defer cancel() 101 + httpSrv.Shutdown(shutdownCtx) 102 + }() 103 + }() 104 + 105 + slog.Info("starting http server", "port", a.Config.Port) 106 + if err := httpSrv.ListenAndServe(); err != http.ErrServerClosed { 107 + return err 108 + } 109 + 110 + a.wg.Wait() 111 + 112 + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 113 + defer cancel() 114 + for _, fn := range a.shutdowns { 115 + fn(shutdownCtx) 116 + } 117 + 118 + return nil 119 +}
A
app/atom.go
··· 1 +package app 2 + 3 +import ( 4 + "encoding/xml" 5 + "net/http" 6 + "time" 7 +) 8 + 9 +type AtomFeed struct { 10 + XMLName xml.Name `xml:"feed"` 11 + XMLNS string `xml:"xmlns,attr"` 12 + Title string `xml:"title"` 13 + ID string `xml:"id"` 14 + Updated string `xml:"updated"` 15 + Entries []AtomEntry `xml:"entry"` 16 +} 17 + 18 +type AtomEntry struct { 19 + Title string `xml:"title"` 20 + ID string `xml:"id"` 21 + Updated string `xml:"updated"` 22 + Content AtomContent `xml:"content"` 23 +} 24 + 25 +type AtomContent struct { 26 + XMLName xml.Name `xml:"content"` 27 + Type string `xml:"type,attr,omitempty"` 28 + Value string `xml:",chardata"` 29 +} 30 + 31 +type FeedBuilder struct { 32 + feed AtomFeed 33 +} 34 + 35 +func NewFeed(title, id string) *FeedBuilder { 36 + return &FeedBuilder{feed: AtomFeed{ 37 + XMLNS: "http://www.w3.org/2005/Atom", 38 + Title: title, 39 + ID: id, 40 + Updated: time.Now().Format(time.RFC3339), 41 + }} 42 +} 43 + 44 +func (f *FeedBuilder) Add(title, id, content string, date time.Time) *FeedBuilder { 45 + f.feed.Entries = append(f.feed.Entries, AtomEntry{ 46 + Title: title, 47 + ID: id, 48 + Updated: date.Format(time.RFC3339), 49 + Content: content, 50 + }) 51 + return f 52 +} 53 + 54 +func (f *FeedBuilder) Render(w http.ResponseWriter) error { 55 + w.Header().Set("Content-Type", "application/atom+xml") 56 + enc := xml.NewEncoder(w) 57 + enc.Indent("", " ") 58 + return enc.Encode(f.feed) 59 +}
A
app/bucket.go
··· 1 +package app 2 + 3 +import "go.etcd.io/bbolt" 4 + 5 +type Bucket struct { 6 + db *bbolt.DB 7 + name []byte 8 +} 9 + 10 +func (a *App) Bucket(name string) (*Bucket, error) { 11 + b := &Bucket{db: a.db, name: []byte(name)} 12 + return b, a.db.Update(func(tx *bbolt.Tx) error { 13 + _, err := tx.CreateBucketIfNotExists(b.name) 14 + return err 15 + }) 16 +} 17 + 18 +func (b *Bucket) Get(key []byte) ([]byte, error) { 19 + var v []byte 20 + err := b.db.View(func(tx *bbolt.Tx) error { 21 + v = tx.Bucket(b.name).Get(key) 22 + return nil 23 + }) 24 + return v, err 25 +} 26 + 27 +func (b *Bucket) Set(key, val []byte) error { 28 + return b.db.Update(func(tx *bbolt.Tx) error { 29 + return tx.Bucket(b.name).Put(key, val) 30 + }) 31 +} 32 + 33 +func (b *Bucket) Delete(key []byte) error { 34 + return b.db.Update(func(tx *bbolt.Tx) error { 35 + return tx.Bucket(b.name).Delete(key) 36 + }) 37 +} 38 + 39 +func (b *Bucket) ForEach(fn func(k, v []byte) error) error { 40 + return b.db.View(func(tx *bbolt.Tx) error { 41 + return tx.Bucket(b.name).ForEach(fn) 42 + }) 43 +}
A
app/config.go
··· 1 +package app 2 + 3 +import ( 4 + "encoding/json" 5 + "os" 6 +) 7 + 8 +type Config struct { 9 + Port int `json:"port"` 10 + TGUserID int64 `json:"tg_userid"` 11 + TGToken string `json:"tg_token"` 12 +} 13 + 14 +func NewConfig(fpath string) (*Config, error) { 15 + // TODO per source config 16 + 17 + configFile, err := os.ReadFile(fpath) 18 + if err != nil { 19 + return nil, err 20 + } 21 + 22 + var config Config 23 + err = json.Unmarshal(configFile, &config) 24 + return &config, err 25 +}
A
app/http.go
··· 1 +package app 2 + 3 +import ( 4 + "log/slog" 5 + "net/http" 6 + "time" 7 +) 8 + 9 +func (a *App) recoverMiddleware(next http.Handler) http.Handler { 10 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 11 + defer func() { 12 + if err := recover(); err != nil { 13 + a.Logger.Error("recover middleware", "err", err) 14 + w.WriteHeader(http.StatusInternalServerError) 15 + } 16 + }() 17 + next.ServeHTTP(w, r) 18 + }) 19 +} 20 + 21 +func (a *App) loggingMiddleware(next http.Handler) http.Handler { 22 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 23 + start := time.Now() 24 + wrapped := wrapResponseWriter(w) 25 + next.ServeHTTP(wrapped, r) 26 + slog.Info("http request", 27 + "method", r.Method, 28 + "status", wrapped.status, 29 + "path", r.URL.Path, 30 + "latency", time.Since(start).String(), 31 + "ua", r.UserAgent(), 32 + ) 33 + }) 34 +} 35 + 36 +type responseWriter struct { 37 + http.ResponseWriter 38 + status int 39 + wroteHeader bool 40 +} 41 + 42 +func wrapResponseWriter(w http.ResponseWriter) *responseWriter { 43 + return &responseWriter{ResponseWriter: w} 44 +} 45 + 46 +func (rw *responseWriter) Status() int { 47 + return rw.status 48 +} 49 + 50 +func (rw *responseWriter) WriteHeader(code int) { 51 + if rw.wroteHeader { 52 + return 53 + } 54 + 55 + rw.status = code 56 + rw.ResponseWriter.WriteHeader(code) 57 + rw.wroteHeader = true 58 +} 59 + 60 +func (rw *responseWriter) Flush() { 61 + if f, ok := rw.ResponseWriter.(http.Flusher); ok { 62 + f.Flush() 63 + } 64 +}
A
go.sum
··· 1 +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 2 +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 3 +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 4 +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 5 +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= 6 +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 7 +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= 8 +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= 9 +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= 10 +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= 11 +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= 12 +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 13 +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 14 +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
A
main.go
··· 1 +package main 2 + 3 +import ( 4 + "context" 5 + "flag" 6 + 7 + "go.etcd.io/bbolt" 8 + "olexsmir.xyz/rss-tools/app" 9 + "olexsmir.xyz/rss-tools/sources/telegram" 10 + "olexsmir.xyz/rss-tools/sources/ztoe" 11 +) 12 + 13 +func main() { 14 + var cfgPath, dbPath string 15 + flag.StringVar(&cfgPath, "config", "./config.json", "Path to config file") 16 + flag.StringVar(&dbPath, "db", "./db", "Path to local database") 17 + flag.Parse() 18 + 19 + if err := run(context.Background(), cfgPath, dbPath); err != nil { 20 + panic(err) 21 + } 22 +} 23 + 24 +func run(ctx context.Context, cfgPath, dbPath string) error { 25 + db, err := bbolt.Open(dbPath, 0o600, nil) 26 + if err != nil { 27 + return err 28 + } 29 + defer db.Close() 30 + 31 + cfg, err := app.NewConfig(cfgPath) 32 + if err != nil { 33 + return err 34 + } 35 + 36 + app := app.New(cfg, db) 37 + _ = ztoe.Register(app) 38 + _ = telegram.Register(app) 39 + 40 + return app.Start(ctx) 41 +}
A
sources/telegram/sdk.go
··· 1 +package telegram 2 + 3 +import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "net/http" 9 + "net/url" 10 + "strconv" 11 +) 12 + 13 +const apiBase = "https://api.telegram.org" 14 + 15 +type TelegramSDK struct { 16 + client *http.Client 17 + token string 18 +} 19 + 20 +func NewSDK(client *http.Client, token string) *TelegramSDK { 21 + return &TelegramSDK{ 22 + token: token, 23 + client: client, 24 + } 25 +} 26 + 27 +type Response[T any] struct { 28 + OK bool `json:"ok"` 29 + Result T `json:"result"` 30 + Description string `json:"description"` 31 +} 32 + 33 +type Update struct { 34 + UpdateID int64 `json:"update_id"` 35 + Message *Message `json:"message"` 36 +} 37 + 38 +type User struct { 39 + ID int64 `json:"id"` 40 + FirstName string `json:"first_name"` 41 + Username string `json:"username"` 42 +} 43 + 44 +type Chat struct { 45 + ID int64 `json:"id"` 46 +} 47 + 48 +type Message struct { 49 + MessageID int64 `json:"message_id"` 50 + From *User `json:"from"` 51 + Chat *Chat `json:"chat"` 52 + Text string `json:"text"` 53 + Date int64 `json:"date"` 54 +} 55 + 56 +func (t *TelegramSDK) GetUpdates(ctx context.Context, offset int64) ([]Update, error) { 57 + params := url.Values{} 58 + params.Set("offset", strconv.FormatInt(offset, 10)) 59 + params.Set("timeout", "30") 60 + 61 + var resp Response[[]Update] 62 + if err := t.req(ctx, "getUpdates", params, nil, &resp); err != nil { 63 + return nil, err 64 + } 65 + return resp.Result, nil 66 +} 67 + 68 +type messageReactionReq struct { 69 + Type string `json:"type"` 70 + Emoji string `json:"emoji"` 71 +} 72 + 73 +type setReactionReq struct { 74 + ChatID int64 `json:"chat_id"` 75 + MessageID int64 `json:"message_id"` 76 + Reaction []messageReactionReq `json:"reaction"` 77 +} 78 + 79 +func (t *TelegramSDK) SetReaction(ctx context.Context, chatID, messageID int64, emoji string) error { 80 + var resp Response[bool] 81 + return t.req(ctx, "setMessageReaction", nil, setReactionReq{ 82 + ChatID: chatID, 83 + MessageID: messageID, 84 + Reaction: []messageReactionReq{{Type: "emoji", Emoji: emoji}}, 85 + }, &resp) 86 +} 87 + 88 +func (t *TelegramSDK) req(ctx context.Context, method string, params url.Values, body any, out any) error { 89 + u := fmt.Sprintf("%s/bot%s/%s", apiBase, t.token, method) 90 + if params != nil { 91 + u += "?" + params.Encode() 92 + } 93 + 94 + var req *http.Request 95 + var err error 96 + if body != nil { 97 + var data []byte 98 + data, err = json.Marshal(body) 99 + if err != nil { 100 + return err 101 + } 102 + req, err = http.NewRequestWithContext(ctx, http.MethodPost, u, bytes.NewReader(data)) 103 + if err != nil { 104 + return err 105 + } 106 + req.Header.Set("Content-Type", "application/json") 107 + } else { 108 + req, err = http.NewRequestWithContext(ctx, http.MethodGet, u, nil) 109 + if err != nil { 110 + return err 111 + } 112 + } 113 + 114 + res, err := t.client.Do(req) 115 + if err != nil { 116 + return err 117 + } 118 + defer res.Body.Close() 119 + return json.NewDecoder(res.Body).Decode(out) 120 +}
A
sources/telegram/telegram.go
··· 1 +package telegram 2 + 3 +import ( 4 + "bytes" 5 + "context" 6 + "encoding/binary" 7 + "encoding/gob" 8 + "fmt" 9 + "log/slog" 10 + "net/http" 11 + "time" 12 + 13 + "olexsmir.xyz/rss-tools/app" 14 +) 15 + 16 +type telegram struct { 17 + db *app.Bucket 18 + messages *app.Bucket 19 + client *http.Client 20 + tg *TelegramSDK 21 + allowedID int64 22 +} 23 + 24 +func Register(a *app.App) error { 25 + db, err := a.Bucket("telegram") 26 + if err != nil { 27 + return err 28 + } 29 + 30 + messages, err := a.Bucket("telegram:messages") 31 + if err != nil { 32 + return err 33 + } 34 + 35 + t := &telegram{ 36 + db: db, 37 + messages: messages, 38 + client: a.Client, 39 + tg: NewSDK(a.Client, a.Config.TGToken), 40 + allowedID: a.Config.TGUserID, 41 + } 42 + 43 + a.AddWorker(t.worker) 44 + a.Route("GET /telegram", t.handler) 45 + return nil 46 +} 47 + 48 +func (t *telegram) handler(w http.ResponseWriter, r *http.Request) { 49 + // todo: cache feed contruction 50 + // todo: dont include messages older than N days 51 + 52 + messages, err := t.loadMessages() 53 + if err != nil { 54 + http.Error(w, "failed to load messages", http.StatusInternalServerError) 55 + return 56 + } 57 + 58 + feed := app.NewFeed("Telegram feed", "telegram-feed") 59 + for _, m := range messages { 60 + title := m.Text 61 + if len(title) > 64 { 62 + title = title[:64] + "..." 63 + } 64 + feed.AddEntry(app.FeedEntry{ 65 + Title: title, 66 + ID: fmt.Sprintf("telegram-%d", m.MessageID), 67 + Content: m.Text, 68 + Updated: time.Unix(m.Date, 0), 69 + }) 70 + } 71 + 72 + w.WriteHeader(http.StatusOK) 73 + feed.Render(w) 74 +} 75 + 76 +func (t *telegram) worker(ctx context.Context) error { 77 + offset, err := t.loadOffset() 78 + if err != nil { 79 + return err 80 + } 81 + 82 + for { 83 + updates, err := t.tg.GetUpdates(ctx, offset) 84 + if err != nil { 85 + slog.ErrorContext(ctx, "getUpdates failed", "err", err) 86 + select { 87 + case <-ctx.Done(): 88 + return nil 89 + case <-time.After(5 * time.Second): 90 + continue 91 + } 92 + } 93 + 94 + for _, u := range updates { 95 + if u.Message != nil && u.Message.From != nil { 96 + slog.InfoContext(ctx, "message from", "user_id", u.Message.From.ID, "username", u.Message.From.Username, "msg", u.Message.Text) 97 + } 98 + 99 + if u.Message == nil && u.Message.From == nil || u.Message.From.ID != t.allowedID { 100 + offset = u.UpdateID + 1 101 + continue 102 + } 103 + 104 + if err := t.saveMessage(u.Message); err != nil { 105 + slog.ErrorContext(ctx, "failed to save message", "err", err) 106 + } 107 + 108 + if err := t.tg.SetReaction(ctx, u.Message.From.ID, u.Message.MessageID, "👍"); err != nil { 109 + slog.ErrorContext(ctx, "failed to set reaction", "err", err) 110 + } 111 + 112 + offset = u.UpdateID + 1 113 + } 114 + 115 + if err := t.saveOffset(offset); err != nil { 116 + slog.ErrorContext(ctx, "failed to save offset", "err", err) 117 + } 118 + 119 + select { 120 + case <-ctx.Done(): 121 + return nil 122 + case <-time.After(time.Second): 123 + } 124 + } 125 +} 126 + 127 +func (t *telegram) saveOffset(offset int64) error { 128 + return t.db.Set([]byte("offset"), binary.BigEndian.AppendUint64(nil, uint64(offset))) 129 +} 130 + 131 +func (t *telegram) loadOffset() (int64, error) { 132 + val, err := t.db.Get([]byte("offset")) 133 + if err != nil || val == nil { 134 + return 0, err 135 + } 136 + return int64(binary.BigEndian.Uint64(val)), nil 137 +} 138 + 139 +func (t *telegram) saveMessage(m *Message) error { 140 + var buf bytes.Buffer 141 + if err := gob.NewEncoder(&buf).Encode(m); err != nil { 142 + return err 143 + } 144 + key := binary.BigEndian.AppendUint64(nil, uint64(m.MessageID)) 145 + return t.messages.Set(key, buf.Bytes()) 146 +} 147 + 148 +func (t *telegram) loadMessages() ([]*Message, error) { 149 + var messages []*Message 150 + err := t.messages.ForEach(func(k, v []byte) error { 151 + var m Message 152 + if err := gob.NewDecoder(bytes.NewReader(v)).Decode(&m); err != nil { 153 + return err 154 + } 155 + messages = append(messages, &m) 156 + return nil 157 + }) 158 + return messages, err 159 +}