all repos

rss-tools @ 70f8cb6

Get RSS feeds from sources that (I need and) don't provide one

rss-tools/sources/telegram/telegram.go (view raw)

Oleksandr Smirnov Oleksandr Smirnov
olexsmir@gmail.com
fix telegram offset thingy, 10 days ago
1
package telegram
2
3
import (
4
	"bytes"
5
	"context"
6
	"encoding/binary"
7
	"encoding/gob"
8
	"fmt"
9
	"html"
10
	"log/slog"
11
	"net/http"
12
	"strings"
13
	"time"
14
15
	"olexsmir.xyz/rss-tools/app"
16
	"olexsmir.xyz/rss-tools/app/atom"
17
)
18
19
type telegram struct {
20
	db        *app.Bucket
21
	messages  *app.Bucket
22
	client    *http.Client
23
	get       func(context.Context, string) (*http.Response, error)
24
	tg        *TelegramSDK
25
	allowedID int64
26
	logger    *slog.Logger
27
}
28
29
const (
	// telegramMessageMaxAge is how old a message may be before it is left
	// out of the feed and becomes eligible for purging.
	telegramMessageMaxAge = 48 * time.Hour

	// telegramPurgeInterval is the period of the purge worker's ticker.
	telegramPurgeInterval = time.Hour
)
33
34
func Register(a *app.App) error {
35
	db, err := a.Bucket("telegram")
36
	if err != nil {
37
		return err
38
	}
39
40
	messages, err := a.Bucket("telegram:messages")
41
	if err != nil {
42
		return err
43
	}
44
45
	t := &telegram{
46
		db:        db,
47
		messages:  messages,
48
		client:    a.Client,
49
		get:       a.Get,
50
		tg:        NewSDK(a.Client, a.Config.TGToken),
51
		allowedID: a.Config.TGUserID,
52
		logger:    a.Logger,
53
	}
54
55
	a.AddWorker(t.worker)
56
	a.AddWorker(t.purgeWorker)
57
	a.Route("GET /telegram", t.handler)
58
	a.Logger.Info("telegram source registered")
59
	return nil
60
}
61
62
// handler serves the Atom feed built from the stored messages.
//
// It loads every stored message, drops those older than
// telegramMessageMaxAge, fills in missing link titles (persisting any
// message whose titles changed), merges media groups and renders the
// result. Any failure is logged with its cause and answered with a
// generic 500 so internal details are not exposed to the client.
func (t *telegram) handler(w http.ResponseWriter, r *http.Request) {
	// todo: cache feed construction
	ctx := r.Context()

	messages, err := t.loadMessages(ctx)
	if err != nil {
		t.logger.ErrorContext(ctx, "failed to load messages", "err", err)
		http.Error(w, "failed to load messages", http.StatusInternalServerError)
		return
	}
	messages = filterRecentMessages(messages, time.Now().Add(-telegramMessageMaxAge))

	feed := atom.NewFeed("Telegram feed", "telegram-feed")
	for _, m := range messages {
		if changed := t.enrichMessageWithLinkTitles(ctx, m); changed {
			if err := t.saveMessage(m); err != nil {
				t.logger.ErrorContext(ctx, "failed to update cached titles", "err", err)
				http.Error(w, "failed to update cached titles", http.StatusInternalServerError)
				return
			}
		}
	}

	for _, m := range groupMessages(messages) {
		feed.Add(feedEntryFromMessage(m))
	}

	if err := feed.Render(w); err != nil {
		t.logger.ErrorContext(ctx, "failed to render feed", "err", err)
		http.Error(w, "failed to render feed", http.StatusInternalServerError)
		return
	}
}
91
92
func (t *telegram) worker(ctx context.Context) error {
93
	t.logger.Info("starting telegram bot")
94
95
	offset, err := t.loadOffset()
96
	if err != nil {
97
		return err
98
	}
99
100
	for {
101
		updates, err := t.tg.GetUpdates(ctx, offset)
102
		if err != nil {
103
			t.logger.ErrorContext(ctx, "getUpdates failed", "err", err)
104
			select {
105
			case <-ctx.Done():
106
				return nil
107
			case <-time.After(5 * time.Second):
108
				continue
109
			}
110
		}
111
112
		for _, u := range updates {
113
			if u.Message != nil && u.Message.From != nil {
114
				t.logger.InfoContext(ctx, "message from", "user_id", u.Message.From.ID, "username", u.Message.From.Username, "msg", messageText(u.Message))
115
			}
116
117
			if u.Message == nil || u.Message.From == nil || u.Message.From.ID != t.allowedID {
118
				offset = u.UpdateID + 1
119
				continue
120
			}
121
122
			key := binary.BigEndian.AppendUint64(nil, uint64(u.Message.MessageID))
123
			if existing, _ := t.messages.Get(key); existing != nil {
124
				offset = u.UpdateID + 1
125
				continue
126
			}
127
128
			_ = t.enrichMessageWithLinkTitles(ctx, u.Message)
129
130
			if err := t.saveMessage(u.Message); err != nil {
131
				t.logger.ErrorContext(ctx, "failed to save message", "err", err)
132
			}
133
134
			if err := t.tg.SetReaction(ctx, u.Message.From.ID, u.Message.MessageID, "👍"); err != nil {
135
				slog.ErrorContext(ctx, "failed to set reaction", "err", err)
136
			}
137
138
			offset = u.UpdateID + 1
139
		}
140
141
		if err := t.saveOffset(offset); err != nil {
142
			slog.ErrorContext(ctx, "failed to save offset", "err", err)
143
		}
144
145
		select {
146
		case <-ctx.Done():
147
			return nil
148
		case <-time.After(time.Second):
149
		}
150
	}
151
}
152
153
func (t *telegram) purgeWorker(ctx context.Context) error {
154
	t.logger.Info("starting telegram cache purge")
155
	ticker := time.NewTicker(telegramPurgeInterval)
156
	defer ticker.Stop()
157
158
	for {
159
		cutoff := time.Now().Add(-telegramMessageMaxAge)
160
		purged, err := t.purgeOldMessages(ctx, cutoff)
161
		if err != nil {
162
			t.logger.ErrorContext(ctx, "failed to purge old telegram messages", "err", err)
163
		} else if purged > 0 {
164
			t.logger.InfoContext(ctx, "purged old telegram messages", "count", purged)
165
		}
166
167
		select {
168
		case <-ctx.Done():
169
			return nil
170
		case <-ticker.C:
171
		}
172
	}
173
}
174
175
// saveOffset persists the update offset under the "offset" key as an
// 8-byte big-endian value.
func (t *telegram) saveOffset(offset int64) error {
	encoded := make([]byte, 8)
	binary.BigEndian.PutUint64(encoded, uint64(offset))
	return t.db.Set([]byte("offset"), encoded)
}
178
179
// loadOffset reads the update offset stored under the "offset" key.
//
// It returns 0 when no offset has been stored yet, the bucket's error
// when the read fails, and an error when the stored value is too short to
// hold the 8-byte big-endian encoding written by saveOffset (decoding
// such a value would otherwise panic).
func (t *telegram) loadOffset() (int64, error) {
	val, err := t.db.Get([]byte("offset"))
	if err != nil {
		return 0, err
	}
	if val == nil {
		return 0, nil
	}
	if len(val) < 8 {
		return 0, fmt.Errorf("stored telegram offset is %d bytes, want 8", len(val))
	}
	return int64(binary.BigEndian.Uint64(val)), nil
}
186
187
// saveMessage gob-encodes m and stores it in the messages bucket under
// the 8-byte big-endian encoding of its message ID, replacing any value
// already stored for that key.
func (t *telegram) saveMessage(m *Message) error {
	encoded := new(bytes.Buffer)
	if err := gob.NewEncoder(encoded).Encode(m); err != nil {
		return err
	}

	id := make([]byte, 8)
	binary.BigEndian.PutUint64(id, uint64(m.MessageID))
	return t.messages.Set(id, encoded.Bytes())
}
195
196
// loadMessages decodes every entry in the messages bucket. Entries that
// fail to decode are logged and skipped rather than failing the whole
// load. The returned error is whatever the bucket iteration reports.
func (t *telegram) loadMessages(ctx context.Context) ([]*Message, error) {
	var loaded []*Message

	decode := func(key, raw []byte) error {
		msg := new(Message)
		if err := gob.NewDecoder(bytes.NewReader(raw)).Decode(msg); err != nil {
			t.logger.WarnContext(ctx, "failed to decode telegram message, skipping", "key", fmt.Sprintf("%x", key), "err", err)
			return nil
		}
		loaded = append(loaded, msg)
		return nil
	}

	err := t.messages.ForEach(decode)
	return loaded, err
}
209
210
// purgeOldMessages deletes every stored message that isMessageExpired
// reports as expired for cutoff and returns how many were deleted.
//
// Keys are collected during iteration and deleted afterwards. Entries
// that fail to decode are logged and left in place. If a delete fails,
// the count of deletions completed so far is returned with the error.
func (t *telegram) purgeOldMessages(ctx context.Context, cutoff time.Time) (int, error) {
	var expired [][]byte

	collect := func(key, raw []byte) error {
		msg := new(Message)
		if err := gob.NewDecoder(bytes.NewReader(raw)).Decode(msg); err != nil {
			t.logger.WarnContext(ctx, "failed to decode telegram message for purge, skipping", "key", fmt.Sprintf("%x", key), "err", err)
			return nil
		}
		if !isMessageExpired(msg, cutoff) {
			return nil
		}
		// Copy the key before keeping it past the callback.
		expired = append(expired, append([]byte(nil), key...))
		return nil
	}
	if err := t.messages.ForEach(collect); err != nil {
		return 0, err
	}

	for done, key := range expired {
		if err := t.messages.Delete(key); err != nil {
			return done, err
		}
	}
	return len(expired), nil
}
237
238
// filterRecentMessages returns the messages that isMessageExpired does
// not consider expired for cutoff, in their original order. An empty
// input is returned unchanged; otherwise a new slice is returned.
func filterRecentMessages(messages []*Message, cutoff time.Time) []*Message {
	if len(messages) == 0 {
		return messages
	}

	kept := make([]*Message, 0, len(messages))
	for i := range messages {
		if isMessageExpired(messages[i], cutoff) {
			continue
		}
		kept = append(kept, messages[i])
	}
	return kept
}
250
251
// isMessageExpired reports whether m should be treated as older than
// cutoff. A nil message or one with a zero Date counts as expired;
// otherwise Date is read as Unix seconds and compared against cutoff.
func isMessageExpired(m *Message, cutoff time.Time) bool {
	hasDate := m != nil && m.Date != 0
	if !hasDate {
		return true
	}
	sent := time.Unix(m.Date, 0)
	return cutoff.After(sent)
}
257
258
func groupMessages(messages []*Message) []*Message {
259
	if len(messages) == 0 {
260
		return messages
261
	}
262
263
	groups := make(map[string]*Message)
264
	out := make([]*Message, 0, len(messages))
265
	for _, m := range messages {
266
		if m == nil || strings.TrimSpace(m.MediaGroupID) == "" {
267
			out = append(out, m)
268
			continue
269
		}
270
271
		group, ok := groups[m.MediaGroupID]
272
		if !ok {
273
			group = &Message{
274
				MessageID:    m.MessageID,
275
				From:         m.From,
276
				Chat:         m.Chat,
277
				Text:         m.Text,
278
				Caption:      m.Caption,
279
				Date:         m.Date,
280
				MediaGroupID: m.MediaGroupID,
281
				LinkTitles:   m.LinkTitles,
282
			}
283
			groups[m.MediaGroupID] = group
284
			out = append(out, group)
285
		}
286
287
		if m.MessageID != 0 && (group.MessageID == 0 || m.MessageID < group.MessageID) {
288
			group.MessageID = m.MessageID
289
		}
290
		if m.Date != 0 && (group.Date == 0 || m.Date < group.Date) {
291
			group.Date = m.Date
292
		}
293
		if strings.TrimSpace(messageText(group)) == "" && strings.TrimSpace(messageText(m)) != "" {
294
			group.Caption = m.Caption
295
			group.Text = m.Text
296
			if len(m.LinkTitles) > 0 {
297
				group.LinkTitles = m.LinkTitles
298
			}
299
		} else if len(group.LinkTitles) == 0 && len(m.LinkTitles) > 0 {
300
			group.LinkTitles = m.LinkTitles
301
		}
302
303
		group.PhotoAttachments = append(group.PhotoAttachments, messagePhotos(m)...)
304
		if group.PhotoBase64 == "" && m.PhotoBase64 != "" {
305
			group.PhotoBase64 = m.PhotoBase64
306
			group.PhotoMIMEType = m.PhotoMIMEType
307
		}
308
	}
309
	return out
310
}
311
312
// enrichMessageWithLinkTitles fills m.LinkTitles with page titles for the
// link in a message whose text is a single link (see isSingleLinkMessage).
//
// A cached title that isMeaningfulPageTitle accepts is kept. A non-empty
// cached title that it rejects is removed, and the title is fetched again
// through fetchPageTitle. Fetch failures are logged and skipped.
//
// It reports whether m.LinkTitles was modified, so the caller knows the
// message needs to be saved again.
func (t *telegram) enrichMessageWithLinkTitles(ctx context.Context, m *Message) bool {
	body := messageText(m)
	if !isSingleLinkMessage(body) {
		return false
	}

	urls := normalizeLinks(messageLinks(body))
	if len(urls) == 0 {
		return false
	}
	if m.LinkTitles == nil {
		m.LinkTitles = make(map[string]string, len(urls))
	}

	var dirty bool
	for _, u := range urls {
		known := normalizePageTitle(m.LinkTitles[u])
		if isMeaningfulPageTitle(known) {
			continue
		}
		if known != "" {
			// Drop the unusable cached title before trying again.
			delete(m.LinkTitles, u)
			dirty = true
		}

		fetched, err := fetchPageTitle(ctx, t.get, u)
		if err != nil {
			t.logger.WarnContext(ctx, "failed to lookup page title", "url", u, "err", err)
			continue
		}
		m.LinkTitles[u] = fetched
		dirty = true
	}
	return dirty
}
346
347
func feedEntryFromMessage(m *Message) *atom.Entry {
348
	updated := time.Unix(m.Date, 0)
349
	text := normalizeMessageText(messageText(m))
350
	normalizedLinks := normalizeLinks(messageLinks(text))
351
	entryID := fmt.Sprintf("telegram-%d", m.MessageID)
352
	if videoID, ok := firstYouTubeVideoID(normalizedLinks); ok {
353
		entryID = "yt:video:" + videoID
354
	}
355
356
	photos := messagePhotos(m)
357
	if len(photos) == 0 {
358
		title := text
359
		if isSingleLinkMessage(text) {
360
			for _, link := range normalizedLinks {
361
				if t := strings.TrimSpace(m.LinkTitles[link]); t != "" {
362
					title = t
363
					break
364
				}
365
			}
366
		}
367
		if len(title) > 64 {
368
			title = title[:64] + "..."
369
		}
370
371
		content := text
372
		contentType := ""
373
		if len(normalizedLinks) > 0 {
374
			content, _ = linkifyMessageText(text)
375
			content = preserveLineBreaks(content)
376
			contentType = "html"
377
		} else if strings.Contains(text, "\n") {
378
			content = preserveLineBreaks(html.EscapeString(text))
379
			contentType = "html"
380
		}
381
382
		return &atom.Entry{
383
			Title:   title,
384
			ID:      entryID,
385
			Link:    feedLinks(normalizedLinks),
386
			Content: atom.NewText(content, contentType),
387
			Updated: atom.Time(updated),
388
		}
389
	}
390
391
	parts := make([]string, 0, 1+len(photos))
392
	if t := strings.TrimSpace(text); t != "" {
393
		linkified, _ := linkifyMessageText(text)
394
		linkified = preserveLineBreaks(linkified)
395
		parts = append(parts, "<p>"+linkified+"</p>")
396
	}
397
	for _, photo := range photos {
398
		if photo.Base64 == "" {
399
			continue
400
		}
401
		mimeType := photo.MIMEType
402
		if mimeType == "" {
403
			mimeType = "image/jpeg"
404
		}
405
		parts = append(parts, fmt.Sprintf(`<p><img src="data:%s;base64,%s" alt="telegram image"/></p>`, mimeType, photo.Base64))
406
	}
407
408
	return &atom.Entry{
409
		Title:   fmt.Sprintf("🖼️ [%s]", updated.Format("2006-01-02")),
410
		ID:      entryID,
411
		Link:    feedLinks(normalizedLinks),
412
		Content: atom.NewText(strings.Join(parts, ""), "html"),
413
		Updated: atom.Time(updated),
414
	}
415
}
416
417
func isSingleLinkMessage(text string) bool {
418
	links := findLinks(text)
419
	if len(links) != 1 {
420
		return false
421
	}
422
	link := links[0]
423
	if strings.TrimSpace(text[:link.start]) != "" {
424
		return false
425
	}
426
	after := strings.TrimSpace(text[link.end:])
427
	return trailingPunctRe.ReplaceAllString(after, "") == ""
428
}
429
430
// messageText returns the text to display for m: the caption when it is
// not blank, otherwise the plain text. The chosen value is returned
// untrimmed. A nil message yields "".
func messageText(m *Message) string {
	switch {
	case m == nil:
		return ""
	case strings.TrimSpace(m.Caption) != "":
		return m.Caption
	default:
		return m.Text
	}
}
439
440
// normalizeMessageText converts every line ending in text to "\n".
func normalizeMessageText(text string) string {
	// "\r\n" must be handled before the bare "\r", otherwise each CRLF
	// would turn into two line feeds.
	for _, eol := range []string{"\r\n", "\r"} {
		text = strings.ReplaceAll(text, eol, "\n")
	}
	return text
}
444
445
// preserveLineBreaks replaces every "\n" in text with "<br/>" so that
// line breaks survive HTML rendering. Text without a line feed is
// returned as is.
func preserveLineBreaks(text string) string {
	// ReplaceAll hands back the input untouched when there is nothing to
	// replace, so no separate check is needed.
	return strings.ReplaceAll(text, "\n", "<br/>")
}
451
452
// messagePhotos returns the photos attached to m.
//
// When m has PhotoAttachments, a copy of that slice is returned.
// Otherwise a non-empty PhotoBase64 is wrapped in a single attachment,
// with the MIME type defaulting to "image/jpeg". A nil message or one
// without any photo data yields nil.
func messagePhotos(m *Message) []PhotoAttachment {
	switch {
	case m == nil:
		return nil
	case len(m.PhotoAttachments) > 0:
		dup := make([]PhotoAttachment, len(m.PhotoAttachments))
		copy(dup, m.PhotoAttachments)
		return dup
	case m.PhotoBase64 == "":
		return nil
	}

	single := PhotoAttachment{Base64: m.PhotoBase64, MIMEType: m.PhotoMIMEType}
	if single.MIMEType == "" {
		single.MIMEType = "image/jpeg"
	}
	return []PhotoAttachment{single}
}