all repos

rss-tools @ 65589de

Get RSS feeds from sources that (I need and) don't provide one

rss-tools/sources/telegram/telegram.go (view raw)

Oleksandr Smirnov Oleksandr Smirnov
olexsmir@gmail.com
telegram: dont return messages older than 48 hours; purge db, 14 days ago
1
package telegram
2
3
import (
4
	"bytes"
5
	"context"
6
	"encoding/binary"
7
	"encoding/gob"
8
	"fmt"
9
	"html"
10
	"log/slog"
11
	"net/http"
12
	"strings"
13
	"time"
14
15
	"olexsmir.xyz/rss-tools/app"
16
	"olexsmir.xyz/rss-tools/app/atom"
17
)
18
19
type telegram struct {
20
	db        *app.Bucket
21
	messages  *app.Bucket
22
	client    *http.Client
23
	get       func(context.Context, string) (*http.Response, error)
24
	tg        *TelegramSDK
25
	allowedID int64
26
	logger    *slog.Logger
27
}
28
29
// Retention settings for stored Telegram messages.
const (
	// telegramMessageMaxAge is the cutoff age used both when filtering the
	// feed and when purging stored messages.
	telegramMessageMaxAge = 2 * 24 * time.Hour

	// telegramPurgeInterval is the pause between two purge passes.
	telegramPurgeInterval = 60 * time.Minute
)
33
34
func Register(a *app.App) error {
35
	db, err := a.Bucket("telegram")
36
	if err != nil {
37
		return err
38
	}
39
40
	messages, err := a.Bucket("telegram:messages")
41
	if err != nil {
42
		return err
43
	}
44
45
	t := &telegram{
46
		db:        db,
47
		messages:  messages,
48
		client:    a.Client,
49
		get:       a.Get,
50
		tg:        NewSDK(a.Client, a.Config.TGToken),
51
		allowedID: a.Config.TGUserID,
52
		logger:    a.Logger,
53
	}
54
55
	a.AddWorker(t.worker)
56
	a.AddWorker(t.purgeWorker)
57
	a.Route("GET /telegram", t.handler)
58
	a.Logger.Info("telegram source registered")
59
	return nil
60
}
61
62
// handler serves the Atom feed built from the stored Telegram messages.
//
// It loads every stored message, drops those older than
// telegramMessageMaxAge, fills in missing link titles (persisting any message
// whose titles changed), groups media-group messages and renders the feed.
// Every failure is answered with 500 and, unlike before, also logged with the
// underlying error so it can be diagnosed.
func (t *telegram) handler(w http.ResponseWriter, r *http.Request) {
	// TODO: cache feed construction

	ctx := r.Context()

	messages, err := t.loadMessages(ctx)
	if err != nil {
		t.logger.ErrorContext(ctx, "failed to load telegram messages", "err", err)
		http.Error(w, "failed to load messages", http.StatusInternalServerError)
		return
	}
	messages = filterRecentMessages(messages, time.Now().Add(-telegramMessageMaxAge))

	feed := atom.NewFeed("Telegram feed", "telegram-feed")
	for _, m := range messages {
		if !t.enrichMessageWithLinkTitles(ctx, m) {
			continue
		}
		// Titles changed: write the message back so the lookup is not
		// repeated on the next request.
		if err := t.saveMessage(m); err != nil {
			t.logger.ErrorContext(ctx, "failed to update cached link titles", "err", err)
			http.Error(w, "failed to update cached titles", http.StatusInternalServerError)
			return
		}
	}

	for _, m := range groupMessages(messages) {
		feed.Add(feedEntryFromMessage(m))
	}

	if err := feed.Render(w); err != nil {
		t.logger.ErrorContext(ctx, "failed to render telegram feed", "err", err)
		// NOTE(review): part of the body may already have been written by
		// Render, in which case this status cannot take effect — confirm.
		http.Error(w, "failed to render feed", http.StatusInternalServerError)
		return
	}
}
91
92
func (t *telegram) worker(ctx context.Context) error {
93
	t.logger.Info("starting telegram bot")
94
95
	offset, err := t.loadOffset()
96
	if err != nil {
97
		return err
98
	}
99
100
	for {
101
		updates, err := t.tg.GetUpdates(ctx, offset)
102
		if err != nil {
103
			t.logger.ErrorContext(ctx, "getUpdates failed", "err", err)
104
			select {
105
			case <-ctx.Done():
106
				return nil
107
			case <-time.After(5 * time.Second):
108
				continue
109
			}
110
		}
111
112
		for _, u := range updates {
113
			if u.Message != nil && u.Message.From != nil {
114
				t.logger.InfoContext(ctx, "message from", "user_id", u.Message.From.ID, "username", u.Message.From.Username, "msg", messageText(u.Message))
115
			}
116
117
			if u.Message == nil || u.Message.From == nil || u.Message.From.ID != t.allowedID {
118
				offset = u.UpdateID + 1
119
				continue
120
			}
121
122
			_ = t.enrichMessageWithLinkTitles(ctx, u.Message)
123
124
			if err := t.saveMessage(u.Message); err != nil {
125
				t.logger.ErrorContext(ctx, "failed to save message", "err", err)
126
			}
127
128
			if err := t.tg.SetReaction(ctx, u.Message.From.ID, u.Message.MessageID, "👍"); err != nil {
129
				slog.ErrorContext(ctx, "failed to set reaction", "err", err)
130
			}
131
132
			offset = u.UpdateID + 1
133
		}
134
135
		if err := t.saveOffset(offset); err != nil {
136
			slog.ErrorContext(ctx, "failed to save offset", "err", err)
137
		}
138
139
		select {
140
		case <-ctx.Done():
141
			return nil
142
		case <-time.After(time.Second):
143
		}
144
	}
145
}
146
147
func (t *telegram) purgeWorker(ctx context.Context) error {
148
	t.logger.Info("starting telegram cache purge")
149
	ticker := time.NewTicker(telegramPurgeInterval)
150
	defer ticker.Stop()
151
152
	for {
153
		cutoff := time.Now().Add(-telegramMessageMaxAge)
154
		purged, err := t.purgeOldMessages(ctx, cutoff)
155
		if err != nil {
156
			t.logger.ErrorContext(ctx, "failed to purge old telegram messages", "err", err)
157
		} else if purged > 0 {
158
			t.logger.InfoContext(ctx, "purged old telegram messages", "count", purged)
159
		}
160
161
		select {
162
		case <-ctx.Done():
163
			return nil
164
		case <-ticker.C:
165
		}
166
	}
167
}
168
169
// saveOffset persists the update offset under the "offset" key as an 8-byte
// big-endian integer.
func (t *telegram) saveOffset(offset int64) error {
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], uint64(offset))
	return t.db.Set([]byte("offset"), encoded[:])
}
172
173
// loadOffset reads the update offset stored under the "offset" key.
//
// It returns 0 when nothing has been stored yet. A stored value that is not
// exactly 8 bytes long is reported as an error instead of being decoded,
// because binary.BigEndian.Uint64 panics on a slice shorter than 8 bytes.
func (t *telegram) loadOffset() (int64, error) {
	val, err := t.db.Get([]byte("offset"))
	if err != nil {
		return 0, err
	}
	if len(val) == 0 {
		return 0, nil
	}
	if len(val) != 8 {
		return 0, fmt.Errorf("stored telegram offset is %d bytes long, want 8", len(val))
	}
	return int64(binary.BigEndian.Uint64(val)), nil
}
180
181
// saveMessage gob-encodes m and stores it in the messages bucket under its
// message ID, written as an 8-byte big-endian key. An existing entry with the
// same ID is presumably overwritten — depends on Bucket.Set.
func (t *telegram) saveMessage(m *Message) error {
	encoded := new(bytes.Buffer)
	if err := gob.NewEncoder(encoded).Encode(m); err != nil {
		return err
	}

	var key [8]byte
	binary.BigEndian.PutUint64(key[:], uint64(m.MessageID))
	return t.messages.Set(key[:], encoded.Bytes())
}
189
190
// loadMessages decodes every entry of the messages bucket. Entries that fail
// to decode are logged and skipped rather than failing the whole load. The
// messages collected so far are returned together with any iteration error.
func (t *telegram) loadMessages(ctx context.Context) ([]*Message, error) {
	var loaded []*Message

	decodeEntry := func(key, raw []byte) error {
		msg := new(Message)
		if decErr := gob.NewDecoder(bytes.NewReader(raw)).Decode(msg); decErr != nil {
			t.logger.WarnContext(ctx, "failed to decode telegram message, skipping", "key", fmt.Sprintf("%x", key), "err", decErr)
			return nil
		}
		loaded = append(loaded, msg)
		return nil
	}

	iterErr := t.messages.ForEach(decodeEntry)
	return loaded, iterErr
}
203
204
// purgeOldMessages deletes every stored message that isMessageExpired reports
// as expired for cutoff and returns how many were deleted.
//
// Expired keys are collected during iteration and deleted afterwards. Entries
// that cannot be decoded are logged and left in place. If a delete fails, the
// count of deletions completed so far is returned with the error.
func (t *telegram) purgeOldMessages(ctx context.Context, cutoff time.Time) (int, error) {
	var expired [][]byte

	collect := func(key, raw []byte) error {
		msg := new(Message)
		if decErr := gob.NewDecoder(bytes.NewReader(raw)).Decode(msg); decErr != nil {
			t.logger.WarnContext(ctx, "failed to decode telegram message for purge, skipping", "key", fmt.Sprintf("%x", key), "err", decErr)
			return nil
		}
		if !isMessageExpired(msg, cutoff) {
			return nil
		}
		// Keep a private copy of the key for use after the iteration.
		expired = append(expired, append([]byte(nil), key...))
		return nil
	}
	if iterErr := t.messages.ForEach(collect); iterErr != nil {
		return 0, iterErr
	}

	var removed int
	for i := range expired {
		if delErr := t.messages.Delete(expired[i]); delErr != nil {
			return removed, delErr
		}
		removed++
	}
	return removed, nil
}
231
232
// filterRecentMessages returns the messages that are not expired with respect
// to cutoff, in their original order. An empty input is returned as is;
// otherwise a new slice is allocated and the input is left untouched.
func filterRecentMessages(messages []*Message, cutoff time.Time) []*Message {
	total := len(messages)
	if total == 0 {
		return messages
	}

	recent := make([]*Message, 0, total)
	for i := 0; i < total; i++ {
		if isMessageExpired(messages[i], cutoff) {
			continue
		}
		recent = append(recent, messages[i])
	}
	return recent
}
244
245
// isMessageExpired reports whether m should be treated as too old: it is nil,
// carries no date (Date == 0), or its Unix-seconds date lies before cutoff.
func isMessageExpired(m *Message, cutoff time.Time) bool {
	switch {
	case m == nil:
		return true
	case m.Date == 0:
		return true
	default:
		return cutoff.After(time.Unix(m.Date, 0))
	}
}
251
252
func groupMessages(messages []*Message) []*Message {
253
	if len(messages) == 0 {
254
		return messages
255
	}
256
257
	groups := make(map[string]*Message)
258
	out := make([]*Message, 0, len(messages))
259
	for _, m := range messages {
260
		if m == nil || strings.TrimSpace(m.MediaGroupID) == "" {
261
			out = append(out, m)
262
			continue
263
		}
264
265
		group, ok := groups[m.MediaGroupID]
266
		if !ok {
267
			group = &Message{
268
				MessageID:    m.MessageID,
269
				From:         m.From,
270
				Chat:         m.Chat,
271
				Text:         m.Text,
272
				Caption:      m.Caption,
273
				Date:         m.Date,
274
				MediaGroupID: m.MediaGroupID,
275
				LinkTitles:   m.LinkTitles,
276
			}
277
			groups[m.MediaGroupID] = group
278
			out = append(out, group)
279
		}
280
281
		if m.MessageID != 0 && (group.MessageID == 0 || m.MessageID < group.MessageID) {
282
			group.MessageID = m.MessageID
283
		}
284
		if m.Date != 0 && (group.Date == 0 || m.Date < group.Date) {
285
			group.Date = m.Date
286
		}
287
		if strings.TrimSpace(messageText(group)) == "" && strings.TrimSpace(messageText(m)) != "" {
288
			group.Caption = m.Caption
289
			group.Text = m.Text
290
			if len(m.LinkTitles) > 0 {
291
				group.LinkTitles = m.LinkTitles
292
			}
293
		} else if len(group.LinkTitles) == 0 && len(m.LinkTitles) > 0 {
294
			group.LinkTitles = m.LinkTitles
295
		}
296
297
		group.PhotoAttachments = append(group.PhotoAttachments, messagePhotos(m)...)
298
		if group.PhotoBase64 == "" && m.PhotoBase64 != "" {
299
			group.PhotoBase64 = m.PhotoBase64
300
			group.PhotoMIMEType = m.PhotoMIMEType
301
		}
302
	}
303
	return out
304
}
305
306
// enrichMessageWithLinkTitles looks up page titles for a message that consists
// of a single link and caches them in m.LinkTitles. It reports whether
// m.LinkTitles was modified, i.e. whether the message needs to be saved again.
//
// Links that already have a meaningful cached title are left alone. A cached
// title that is not meaningful is removed before the lookup is retried.
// Lookup failures are logged and skipped.
func (t *telegram) enrichMessageWithLinkTitles(ctx context.Context, m *Message) bool {
	body := messageText(m)
	if !isSingleLinkMessage(body) {
		return false
	}

	urls := normalizeLinks(messageLinks(body))
	if len(urls) == 0 {
		return false
	}
	if m.LinkTitles == nil {
		m.LinkTitles = make(map[string]string, len(urls))
	}

	var dirty bool
	for _, u := range urls {
		if known := normalizePageTitle(m.LinkTitles[u]); isMeaningfulPageTitle(known) {
			continue
		} else if known != "" {
			delete(m.LinkTitles, u)
			dirty = true
		}

		fetched, err := fetchPageTitle(ctx, t.get, u)
		if err != nil {
			t.logger.WarnContext(ctx, "failed to lookup page title", "url", u, "err", err)
			continue
		}
		m.LinkTitles[u] = fetched
		dirty = true
	}
	return dirty
}
340
341
func feedEntryFromMessage(m *Message) *atom.Entry {
342
	updated := time.Unix(m.Date, 0)
343
	text := normalizeMessageText(messageText(m))
344
	normalizedLinks := normalizeLinks(messageLinks(text))
345
	entryID := fmt.Sprintf("telegram-%d", m.MessageID)
346
	if videoID, ok := firstYouTubeVideoID(normalizedLinks); ok {
347
		entryID = "yt:video:" + videoID
348
	}
349
350
	photos := messagePhotos(m)
351
	if len(photos) == 0 {
352
		title := text
353
		if isSingleLinkMessage(text) {
354
			for _, link := range normalizedLinks {
355
				if t := strings.TrimSpace(m.LinkTitles[link]); t != "" {
356
					title = t
357
					break
358
				}
359
			}
360
		}
361
		if len(title) > 64 {
362
			title = title[:64] + "..."
363
		}
364
365
		content := text
366
		contentType := ""
367
		if len(normalizedLinks) > 0 {
368
			content, _ = linkifyMessageText(text)
369
			content = preserveLineBreaks(content)
370
			contentType = "html"
371
		} else if strings.Contains(text, "\n") {
372
			content = preserveLineBreaks(html.EscapeString(text))
373
			contentType = "html"
374
		}
375
376
		return &atom.Entry{
377
			Title:   title,
378
			ID:      entryID,
379
			Link:    feedLinks(normalizedLinks),
380
			Content: atom.NewText(content, contentType),
381
			Updated: atom.Time(updated),
382
		}
383
	}
384
385
	parts := make([]string, 0, 1+len(photos))
386
	if t := strings.TrimSpace(text); t != "" {
387
		linkified, _ := linkifyMessageText(text)
388
		linkified = preserveLineBreaks(linkified)
389
		parts = append(parts, "<p>"+linkified+"</p>")
390
	}
391
	for _, photo := range photos {
392
		if photo.Base64 == "" {
393
			continue
394
		}
395
		mimeType := photo.MIMEType
396
		if mimeType == "" {
397
			mimeType = "image/jpeg"
398
		}
399
		parts = append(parts, fmt.Sprintf(`<p><img src="data:%s;base64,%s" alt="telegram image"/></p>`, mimeType, photo.Base64))
400
	}
401
402
	return &atom.Entry{
403
		Title:   fmt.Sprintf("🖼️ [%s]", updated.Format("2006-01-02")),
404
		ID:      entryID,
405
		Link:    feedLinks(normalizedLinks),
406
		Content: atom.NewText(strings.Join(parts, ""), "html"),
407
		Updated: atom.Time(updated),
408
	}
409
}
410
411
func isSingleLinkMessage(text string) bool {
412
	links := findLinks(text)
413
	if len(links) != 1 {
414
		return false
415
	}
416
	link := links[0]
417
	if strings.TrimSpace(text[:link.start]) != "" {
418
		return false
419
	}
420
	after := strings.TrimSpace(text[link.end:])
421
	return trailingPunctRe.ReplaceAllString(after, "") == ""
422
}
423
424
// messageText returns the text of m: its caption when the caption is not
// blank, otherwise its plain text. The chosen value is returned untrimmed.
// A nil message yields "".
func messageText(m *Message) string {
	switch {
	case m == nil:
		return ""
	case strings.TrimSpace(m.Caption) != "":
		return m.Caption
	default:
		return m.Text
	}
}
433
434
// normalizeMessageText converts all line endings in text to "\n". CRLF pairs
// are handled first so they become one newline rather than two.
func normalizeMessageText(text string) string {
	unified := strings.ReplaceAll(text, "\r\n", "\n")
	unified = strings.ReplaceAll(unified, "\r", "\n")
	return unified
}
438
439
// preserveLineBreaks replaces every "\n" in text with an HTML "<br/>" tag.
// Text without newlines comes back unchanged.
func preserveLineBreaks(text string) string {
	const lineBreak = "<br/>"
	return strings.ReplaceAll(text, "\n", lineBreak)
}
445
446
// messagePhotos returns the photos attached to m. When m.PhotoAttachments is
// populated, a copy of it is returned. Otherwise the single
// PhotoBase64/PhotoMIMEType pair is wrapped in a one-element slice, with the
// MIME type defaulting to "image/jpeg". A nil message, or one without any
// photo data, yields nil.
func messagePhotos(m *Message) []PhotoAttachment {
	if m == nil {
		return nil
	}

	if n := len(m.PhotoAttachments); n > 0 {
		return append(make([]PhotoAttachment, 0, n), m.PhotoAttachments...)
	}

	if m.PhotoBase64 == "" {
		return nil
	}
	single := PhotoAttachment{Base64: m.PhotoBase64, MIMEType: m.PhotoMIMEType}
	if single.MIMEType == "" {
		single.MIMEType = "image/jpeg"
	}
	return []PhotoAttachment{single}
}