2 files changed,
90 insertions(+),
1 deletions(-)
Author:
Oleksandr Smirnov
olexsmir@gmail.com
Committed at:
2026-05-23 15:55:44 +0300
Authored at:
2026-05-23 15:54:47 +0300
Change ID:
qqzuqmwkunsymnxvsovtxpmkplspmuzv
Parent:
8c0c68e
M
sources/telegram/telegram.go
··· 26 26 logger *slog.Logger 27 27 } 28 28 29 +const ( 30 + telegramMessageMaxAge = 48 * time.Hour 31 + telegramPurgeInterval = time.Hour 32 +) 33 + 29 34 func Register(a *app.App) error { 30 35 db, err := a.Bucket("telegram") 31 36 if err != nil { ··· 48 53 } 49 54 50 55 a.AddWorker(t.worker) 56 + a.AddWorker(t.purgeWorker) 51 57 a.Route("GET /telegram", t.handler) 52 58 a.Logger.Info("telegram source registered") 53 59 return nil ··· 55 61 56 62 func (t *telegram) handler(w http.ResponseWriter, r *http.Request) { 57 63 // todo: cache feed contruction 58 - // todo: dont include messages older than N days 59 64 60 65 messages, err := t.loadMessages(r.Context()) 61 66 if err != nil { 62 67 http.Error(w, "failed to load messages", http.StatusInternalServerError) 63 68 return 64 69 } 70 + messages = filterRecentMessages(messages, time.Now().Add(-telegramMessageMaxAge)) 65 71 66 72 feed := atom.NewFeed("Telegram feed", "telegram-feed") 67 73 for _, m := range messages { ··· 138 144 } 139 145 } 140 146 147 +func (t *telegram) purgeWorker(ctx context.Context) error { 148 + t.logger.Info("starting telegram cache purge") 149 + ticker := time.NewTicker(telegramPurgeInterval) 150 + defer ticker.Stop() 151 + 152 + for { 153 + cutoff := time.Now().Add(-telegramMessageMaxAge) 154 + purged, err := t.purgeOldMessages(ctx, cutoff) 155 + if err != nil { 156 + t.logger.ErrorContext(ctx, "failed to purge old telegram messages", "err", err) 157 + } else if purged > 0 { 158 + t.logger.InfoContext(ctx, "purged old telegram messages", "count", purged) 159 + } 160 + 161 + select { 162 + case <-ctx.Done(): 163 + return nil 164 + case <-ticker.C: 165 + } 166 + } 167 +} 168 + 141 169 func (t *telegram) saveOffset(offset int64) error { 142 170 return t.db.Set([]byte("offset"), binary.BigEndian.AppendUint64(nil, uint64(offset))) 143 171 } ··· 171 199 return nil 172 200 }) 173 201 return messages, err 202 +} 203 + 204 +func (t *telegram) purgeOldMessages(ctx context.Context, cutoff time.Time) (int, error) { 205 + var keys [][]byte 206 + err := t.messages.ForEach(func(k, v []byte) error { 207 + var m Message 208 + if err := gob.NewDecoder(bytes.NewReader(v)).Decode(&m); err != nil { 209 + t.logger.WarnContext(ctx, "failed to decode telegram message for purge, skipping", "key", fmt.Sprintf("%x", k), "err", err) 210 + return nil 211 + } 212 + if isMessageExpired(&m, cutoff) { 213 + keyCopy := append([]byte(nil), k...) 214 + keys = append(keys, keyCopy) 215 + } 216 + return nil 217 + }) 218 + if err != nil { 219 + return 0, err 220 + } 221 + 222 + purged := 0 223 + for _, key := range keys { 224 + if err := t.messages.Delete(key); err != nil { 225 + return purged, err 226 + } 227 + purged++ 228 + } 229 + return purged, nil 230 +} 231 + 232 +func filterRecentMessages(messages []*Message, cutoff time.Time) []*Message { 233 + if len(messages) == 0 { 234 + return messages 235 + } 236 + out := make([]*Message, 0, len(messages)) 237 + for _, m := range messages { 238 + if !isMessageExpired(m, cutoff) { 239 + out = append(out, m) 240 + } 241 + } 242 + return out 243 +} 244 + 245 +func isMessageExpired(m *Message, cutoff time.Time) bool { 246 + if m == nil || m.Date == 0 { 247 + return true 248 + } 249 + return time.Unix(m.Date, 0).Before(cutoff) 174 250 } 175 251 176 252 func groupMessages(messages []*Message) []*Message {
M
sources/telegram/telegram_test.go
··· 125 125 entry := feedEntryFromMessage(msg) 126 126 is.Equal(t, "Example Post Title", entry.Title) 127 127 } 128 + 129 +func TestFilterRecentMessages(t *testing.T) { 130 + cutoff := time.Date(2026, 4, 25, 12, 0, 0, 0, time.UTC) 131 + messages := []*Message{ 132 + {MessageID: 1, Date: cutoff.Add(-time.Second).Unix()}, 133 + {MessageID: 2, Date: cutoff.Unix()}, 134 + nil, 135 + } 136 + 137 + filtered := filterRecentMessages(messages, cutoff) 138 + is.Equal(t, 1, len(filtered)) 139 + is.Equal(t, int64(2), filtered[0].MessageID) 140 +}