all repos

rss-tools @ 50b546d

Get RSS feeds from sources that (I need and) don't provide one

rss-tools/sources/telegram/telegram.go (view raw)

Oleksandr Smirnov Oleksandr Smirnov
olexsmir@gmail.com
add moviefeed source, 1 month ago
1
package telegram
2
3
import (
4
	"bytes"
5
	"context"
6
	"encoding/binary"
7
	"encoding/gob"
8
	"fmt"
9
	"log/slog"
10
	"net/http"
11
	"strings"
12
	"time"
13
14
	"olexsmir.xyz/rss-tools/app"
15
)
16
17
// telegram is the Telegram feed source. It bundles the storage buckets, the
// HTTP helpers and the bot SDK used by the worker and the feed handler.
//
// Fields:
//   - db: bucket for source state; the polling offset is stored under the
//     "offset" key.
//   - messages: bucket of gob-encoded messages keyed by the message ID as an
//     8-byte big-endian integer.
//   - client: HTTP client taken from the app; it is not read directly by the
//     methods visible in this file.
//   - get: HTTP GET helper passed to fetchPageTitle when looking up link titles.
//   - tg: bot SDK used for GetUpdates and SetReaction.
//   - allowedID: the only sender ID whose messages are stored.
//   - logger: structured logger taken from the app.
type telegram struct {
	db        *app.Bucket
	messages  *app.Bucket
	client    *http.Client
	get       func(context.Context, string) (*http.Response, error)
	tg        *TelegramSDK
	allowedID int64
	logger    *slog.Logger
}
26
27
func Register(a *app.App) error {
28
	db, err := a.Bucket("telegram")
29
	if err != nil {
30
		return err
31
	}
32
33
	messages, err := a.Bucket("telegram:messages")
34
	if err != nil {
35
		return err
36
	}
37
38
	t := &telegram{
39
		db:        db,
40
		messages:  messages,
41
		client:    a.Client,
42
		get:       a.Get,
43
		tg:        NewSDK(a.Client, a.Config.TGToken),
44
		allowedID: a.Config.TGUserID,
45
		logger:    a.Logger,
46
	}
47
48
	a.AddWorker(t.worker)
49
	a.Route("GET /telegram", t.handler)
50
	a.Logger.Info("telegram source registered")
51
	return nil
52
}
53
54
// handler serves the feed built from every stored Telegram message.
//
// Before a message is added to the feed its link titles are refreshed with
// enrichMessageWithLinkTitles; if that changed the message, it is written
// back to the bucket so the titles are cached for the next request.
//
// Load, save and render failures are logged with their cause and answered
// with 500 Internal Server Error.
func (t *telegram) handler(w http.ResponseWriter, r *http.Request) {
	// todo: cache feed construction
	// todo: dont include messages older than N days

	ctx := r.Context()

	messages, err := t.loadMessages(ctx)
	if err != nil {
		t.logger.ErrorContext(ctx, "failed to load messages", "err", err)
		http.Error(w, "failed to load messages", http.StatusInternalServerError)
		return
	}

	feed := app.NewFeed("Telegram feed", "telegram-feed")
	for _, m := range messages {
		if changed := t.enrichMessageWithLinkTitles(ctx, m); changed {
			if err := t.saveMessage(m); err != nil {
				t.logger.ErrorContext(ctx, "failed to update cached titles", "message_id", m.MessageID, "err", err)
				http.Error(w, "failed to update cached titles", http.StatusInternalServerError)
				return
			}
		}
		feed.Add(feedEntryFromMessage(m))
	}

	if err := feed.Render(w); err != nil {
		t.logger.ErrorContext(ctx, "failed to render feed", "err", err)
		// NOTE(review): if Render already wrote part of the body, this status
		// line can no longer take effect — confirm how Render writes.
		http.Error(w, "failed to render feed", http.StatusInternalServerError)
		return
	}
}
80
81
func (t *telegram) worker(ctx context.Context) error {
82
	t.logger.Info("starting telegram bot")
83
84
	offset, err := t.loadOffset()
85
	if err != nil {
86
		return err
87
	}
88
89
	for {
90
		updates, err := t.tg.GetUpdates(ctx, offset)
91
		if err != nil {
92
			t.logger.ErrorContext(ctx, "getUpdates failed", "err", err)
93
			select {
94
			case <-ctx.Done():
95
				return nil
96
			case <-time.After(5 * time.Second):
97
				continue
98
			}
99
		}
100
101
		for _, u := range updates {
102
			if u.Message != nil && u.Message.From != nil {
103
				t.logger.InfoContext(ctx, "message from", "user_id", u.Message.From.ID, "username", u.Message.From.Username, "msg", messageText(u.Message))
104
			}
105
106
			if u.Message == nil || u.Message.From == nil || u.Message.From.ID != t.allowedID {
107
				offset = u.UpdateID + 1
108
				continue
109
			}
110
111
			_ = t.enrichMessageWithLinkTitles(ctx, u.Message)
112
113
			if err := t.saveMessage(u.Message); err != nil {
114
				t.logger.ErrorContext(ctx, "failed to save message", "err", err)
115
			}
116
117
			if err := t.tg.SetReaction(ctx, u.Message.From.ID, u.Message.MessageID, "👍"); err != nil {
118
				slog.ErrorContext(ctx, "failed to set reaction", "err", err)
119
			}
120
121
			offset = u.UpdateID + 1
122
		}
123
124
		if err := t.saveOffset(offset); err != nil {
125
			slog.ErrorContext(ctx, "failed to save offset", "err", err)
126
		}
127
128
		select {
129
		case <-ctx.Done():
130
			return nil
131
		case <-time.After(time.Second):
132
		}
133
	}
134
}
135
136
// saveOffset stores offset under the "offset" key of the state bucket as an
// 8-byte big-endian integer.
func (t *telegram) saveOffset(offset int64) error {
	encoded := make([]byte, 8)
	binary.BigEndian.PutUint64(encoded, uint64(offset))
	return t.db.Set([]byte("offset"), encoded)
}
139
140
// loadOffset reads the polling offset written by saveOffset.
//
// It returns 0 with a nil error when no offset has been stored yet, the
// bucket's error when the read fails, and an error when the stored value is
// shorter than the 8 bytes needed to decode it.
func (t *telegram) loadOffset() (int64, error) {
	val, err := t.db.Get([]byte("offset"))
	if err != nil || val == nil {
		return 0, err
	}
	// binary.BigEndian.Uint64 panics on fewer than 8 bytes; report a
	// truncated value as an error instead of crashing the worker.
	if len(val) < 8 {
		return 0, fmt.Errorf("telegram: stored offset has %d bytes, want 8", len(val))
	}
	return int64(binary.BigEndian.Uint64(val)), nil
}
147
148
// saveMessage gob-encodes m and stores it in the messages bucket, keyed by
// the message ID as an 8-byte big-endian integer. An existing entry with the
// same ID is written over via Set.
func (t *telegram) saveMessage(m *Message) error {
	encoded := new(bytes.Buffer)
	if err := gob.NewEncoder(encoded).Encode(m); err != nil {
		return err
	}

	var key [8]byte
	binary.BigEndian.PutUint64(key[:], uint64(m.MessageID))
	return t.messages.Set(key[:], encoded.Bytes())
}
156
157
// loadMessages decodes every entry of the messages bucket.
//
// Entries that fail to gob-decode are logged at warn level and skipped, so a
// single corrupt record does not fail the whole load. The error returned is
// whatever ForEach itself reports.
func (t *telegram) loadMessages(ctx context.Context) ([]*Message, error) {
	var loaded []*Message

	collect := func(key, raw []byte) error {
		msg := new(Message)
		if decodeErr := gob.NewDecoder(bytes.NewReader(raw)).Decode(msg); decodeErr != nil {
			t.logger.WarnContext(ctx, "failed to decode telegram message, skipping", "key", fmt.Sprintf("%x", key), "err", decodeErr)
			return nil
		}
		loaded = append(loaded, msg)
		return nil
	}

	err := t.messages.ForEach(collect)
	return loaded, err
}
170
171
// enrichMessageWithLinkTitles fills m.LinkTitles for messages that consist of
// a single link, and reports whether m.LinkTitles was modified.
//
// A link whose cached title is already meaningful is left alone. A cached
// title that is non-empty after normalization but not meaningful is dropped
// before the page title is fetched again. Fetch failures are logged at warn
// level and do not stop processing.
func (t *telegram) enrichMessageWithLinkTitles(ctx context.Context, m *Message) bool {
	body := messageText(m)
	if !isSingleLinkMessage(body) {
		return false
	}

	urls := normalizeLinks(messageLinks(body))
	if len(urls) == 0 {
		return false
	}

	if m.LinkTitles == nil {
		m.LinkTitles = make(map[string]string, len(urls))
	}

	var dirty bool
	for _, u := range urls {
		stale := normalizePageTitle(m.LinkTitles[u])
		if isMeaningfulPageTitle(stale) {
			continue
		}

		// Drop a cached but unusable title so it is not served again.
		if stale != "" {
			delete(m.LinkTitles, u)
			dirty = true
		}

		fetched, err := fetchPageTitle(ctx, t.get, u)
		if err == nil {
			m.LinkTitles[u] = fetched
			dirty = true
			continue
		}
		t.logger.WarnContext(ctx, "failed to lookup page title", "url", u, "err", err)
	}
	return dirty
}
205
206
func feedEntryFromMessage(m *Message) app.FeedEntry {
207
	updated := time.Unix(m.Date, 0)
208
	text := messageText(m)
209
	normalizedLinks := normalizeLinks(messageLinks(text))
210
	entryID := fmt.Sprintf("telegram-%d", m.MessageID)
211
	if videoID, ok := firstYouTubeVideoID(normalizedLinks); ok {
212
		entryID = "yt:video:" + videoID
213
	}
214
215
	if m.PhotoBase64 == "" {
216
		title := text
217
		if isSingleLinkMessage(text) {
218
			for _, link := range normalizedLinks {
219
				if t := strings.TrimSpace(m.LinkTitles[link]); t != "" {
220
					title = t
221
					break
222
				}
223
			}
224
		}
225
		if len(title) > 64 {
226
			title = title[:64] + "..."
227
		}
228
229
		content := text
230
		contentType := ""
231
		if len(normalizedLinks) > 0 {
232
			content, _ = linkifyMessageText(text)
233
			contentType = "html"
234
		}
235
236
		return app.FeedEntry{
237
			Title:       title,
238
			ID:          entryID,
239
			Links:       feedLinks(normalizedLinks),
240
			Content:     content,
241
			Updated:     updated,
242
			ContentType: contentType,
243
		}
244
	}
245
246
	parts := make([]string, 0, 2)
247
	if t := strings.TrimSpace(text); t != "" {
248
		linkified, _ := linkifyMessageText(t)
249
		parts = append(parts, "<p>"+linkified+"</p>")
250
	}
251
	mimeType := m.PhotoMIMEType
252
	if mimeType == "" {
253
		mimeType = "image/jpeg"
254
	}
255
	parts = append(parts, fmt.Sprintf(`<p><img src="data:%s;base64,%s" alt="telegram image"/></p>`, mimeType, m.PhotoBase64))
256
257
	return app.FeedEntry{
258
		Title:       fmt.Sprintf("🖼️ [%s]", updated.Format("2006-01-02")),
259
		ID:          entryID,
260
		Links:       feedLinks(normalizedLinks),
261
		Content:     strings.Join(parts, ""),
262
		ContentType: "html",
263
		Updated:     updated,
264
	}
265
}
266
267
func isSingleLinkMessage(text string) bool {
268
	links := findLinks(text)
269
	if len(links) != 1 {
270
		return false
271
	}
272
	link := links[0]
273
	if strings.TrimSpace(text[:link.start]) != "" {
274
		return false
275
	}
276
	after := strings.TrimSpace(text[link.end:])
277
	return trailingPunctRe.ReplaceAllString(after, "") == ""
278
}
279
280
// messageText returns the text carried by m: the caption when it contains
// anything other than whitespace, otherwise the plain text. The caption is
// returned as stored, not trimmed. A nil message yields "".
func messageText(m *Message) string {
	switch {
	case m == nil:
		return ""
	case strings.TrimSpace(m.Caption) != "":
		return m.Caption
	default:
		return m.Text
	}
}