all repos

smutok @ 725a325

Yet another TUI RSS reader (not abandoned, just paused development)
8 files changed, 623 insertions(+), 35 deletions(-)
feat: sync
Author: Oleksandr Smirnov olexsmir@gmail.com
Committed at: 2025-12-24 01:05:17 +0200
Change ID: oloswqvtwkynlmsosqowunymrsyltsyw
Parent: d021876
M internal/provider/freshrss.go

@@ -6,7 +6,6 @@ "encoding/json"

"errors" "fmt" "io" - "log/slog" "net/http" "net/url" "strconv"

@@ -31,7 +30,7 @@ func NewFreshRSS(host string) *FreshRSS {

return &FreshRSS{ host: host, client: &http.Client{ - Timeout: 10 * time.Second, + Timeout: 20 * time.Second, }, } }

@@ -113,27 +112,27 @@ return resp.Tags, err

} type ContentItem struct { - ID string - Published int64 - Title string - Author string - Canonical []string - Content string - Categories []string - Origin struct { + ID string + Published int64 + Title string + Author string + Canonical []string + Content string + Categories []string + TimestampUsec string + Origin struct { HTMLURL string StreamID string Title string } // CrawlTimeMsec string `json:"crawlTimeMsec"` - // TimestampUsec string `json:"timestampUsec"` } -func (g FreshRSS) StreamContents(ctx context.Context, steamID, excludeTarget string, lastModified, n int) ([]ContentItem, error) { +func (g FreshRSS) StreamContents(ctx context.Context, steamID, excludeTarget string, lastModified int64, n int) ([]ContentItem, error) { params := url.Values{} setOption(&params, "xt", excludeTarget) - setOptionInt(&params, "ot", lastModified) + setOptionInt64(&params, "ot", lastModified) setOptionInt(&params, "n", n) params.Set("r", "n")

@@ -158,6 +157,7 @@ ci.Content = item.Get("summary.content").String()

ci.Origin.StreamID = item.Get("origin.streamId").String() ci.Origin.HTMLURL = item.Get("origin.htmlUrl").String() ci.Origin.Title = item.Get("origin.title").String() + ci.TimestampUsec = item.Get("timestampUsec").String() for _, href := range item.Get("canonical.#.href").Array() { if h := href.String(); h != "" {

@@ -174,7 +174,7 @@

return res, nil } -func (g FreshRSS) StreamIDs(ctx context.Context, excludeTarget, includeTarget string, n int) ([]string, error) { +func (g FreshRSS) StreamIDs(ctx context.Context, includeTarget, excludeTarget string, n int) ([]string, error) { params := url.Values{} setOption(&params, "xt", excludeTarget) setOption(&params, "s", includeTarget)

@@ -254,12 +254,20 @@ b.Set(k, strconv.Itoa(v))

} } +func setOptionInt64(b *url.Values, k string, v int64) { + if v != 0 { + b.Set(k, strconv.FormatInt(v, 10)) + } +} + // request, makes GET request with params passed as url params func (g *FreshRSS) request(ctx context.Context, endpoint string, params url.Values, resp any) error { u, err := url.Parse(g.host + endpoint) if err != nil { return err } + + setOptionInt64(&params, "ck", time.Now().UnixMilli()) u.RawQuery = params.Encode() req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)

@@ -317,7 +325,6 @@ return fmt.Errorf("failed to read response body: %w", err)

} *strPtr = string(body) - slog.Debug("string response", "content", string(body)) return nil }
M internal/store/schema.hcl

@@ -19,3 +19,199 @@ primary_key {

columns = [column.id] } } + +table "folders" { + schema = schema.main + column "id" { + null = false + type = text + } + primary_key { + columns = [column.id] + } +} + +table "feeds" { + schema = schema.main + column "id" { + null = false + type = text + } + column "title" { + null = false + type = text + } + column "url" { + null = false + type = text + } + column "htmlUrl" { + null = false + type = text + } + primary_key { + columns = [column.id] + } +} + +table "feed_folders" { + schema = schema.main + column "feed_id" { + null = false + type = text + } + column "folder_id" { + null = false + type = text + } + primary_key { + columns = [column.feed_id, column.folder_id] + } + foreign_key "0" { + columns = [column.folder_id] + ref_columns = [table.folders.column.id] + on_update = NO_ACTION + on_delete = CASCADE + } + foreign_key "1" { + columns = [column.feed_id] + ref_columns = [table.feeds.column.id] + on_update = NO_ACTION + on_delete = CASCADE + } + index "idx_feed_folders_by_folder" { + columns = [column.folder_id] + } + index "idx_feed_folders_by_feed" { + columns = [column.feed_id] + } +} + +table "articles" { + schema = schema.main + column "id" { // timestamp_usec + null = false + type = text + } + column "feed_id" { + null = false + type = text + } + column "title" { + null = false + type = text + } + column "content" { + null = true + type = text + } + column "author" { + null = true + type = text + } + column "href" { + null = true + type = text + } + column "published_at" { + null = true + type = int + } + primary_key { + columns = [column.id] + } + foreign_key "0" { + columns = [column.feed_id] + ref_columns = [table.feeds.column.id] + on_update = NO_ACTION + on_delete = CASCADE + } + index "idx_articles_feed_id" { + columns = [column.feed_id] + } + index "idx_articles_published" { + on { + desc = true + column = column.published_at + } + } + index "idx_articles_feed_published" { + on { + column = column.feed_id + } + on { + desc = true + column = 
column.published_at + } + } +} + +table "article_statuses" { + schema = schema.main + column "article_id" { + null = false + type = text + } + column "is_read" { + null = false + type = boolean + default = 0 + } + column "is_starred" { + null = false + type = boolean + default = 0 + } + primary_key { + columns = [column.article_id] + } + foreign_key "0" { + columns = [column.article_id] + ref_columns = [table.articles.column.id] + on_update = NO_ACTION + on_delete = CASCADE + } + index "idx_article_statuses_read" { + columns = [column.is_read] + } + index "idx_article_statuses_starred" { + columns = [column.is_starred] + } +} + +table "pending_actions" { + schema = schema.main + column "id" { + null = false + type = integer + auto_increment = true + } + column "article_id" { + null = false + type = text + } + column "action" { + null = false + type = text + } + column "created_at" { + null = false + type = integer + default = sql("strftime('%s', 'now')") + } + primary_key { + columns = [column.id] + } + foreign_key "0" { + columns = [column.article_id] + ref_columns = [table.articles.column.id] + on_update = NO_ACTION + on_delete = CASCADE + } + index "idx_pending_actions_created" { + columns = [column.created_at] + } + index "idx_pending_actions_article" { + columns = [column.article_id] + } +}
A internal/store/sqlite_articles.go

@@ -0,0 +1,121 @@

+package store + +import ( + "context" + "fmt" + "strings" +) + +func (s *Sqlite) UpsertArticle(ctx context.Context, timestampUsec, feedID, title, content, author, href string, publishedAt int) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + if _, err = tx.ExecContext(ctx, + `insert or ignore into articles (id, feed_id, title, content, author, href, published_at) values (?, ?, ?, ?, ?, ?, ?)`, + timestampUsec, feedID, title, content, author, href, publishedAt); err != nil { + return err + } + + if _, err = tx.ExecContext(ctx, `insert or ignore into article_statuses (article_id) values (?)`, timestampUsec); err != nil { + return err + } + + return tx.Commit() +} + +// can be done like this? +// --sql +// update article_statuses +// set is_starred = case +// when article_id in (%s) then 1 +// else 0 +// end + +func (s *Sqlite) SyncReadStatus(ctx context.Context, ids []string) error { + if len(ids) == 0 { + _, err := s.db.ExecContext(ctx, "update article_statuses set is_read = 1") + return err + } + + values := strings.Repeat("(?),", len(ids)) + values = values[:len(values)-1] + + args := make([]any, len(ids)) + for i, v := range ids { + args[i] = v + } + + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + // make read those that are not in list + readQuery := fmt.Sprintf(`--sql + update article_statuses + set is_read = true + where article_id not in (%s)`, values) + + if _, err = tx.ExecContext(ctx, readQuery, args...); err != nil { + return err + } + + // make unread those that are in list + unreadQuery := fmt.Sprintf(`--sql + update article_statuses + set is_read = false + where article_id in (%s)`, values) + + if _, err = tx.ExecContext(ctx, unreadQuery, args...); err != nil { + return err + } + + return tx.Commit() +} + +func (s *Sqlite) SyncStarredStatus(ctx context.Context, ids []string) error { + if len(ids) == 0 { + _, err := s.db.ExecContext(ctx, "update article_statuses set 
is_starred = 0") + return err + } + + values := strings.Repeat("(?),", len(ids)) + values = values[:len(values)-1] + + args := make([]any, len(ids)) + for i, v := range ids { + args[i] = v + } + + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + // make read those that are not in list + readQuery := fmt.Sprintf(`--sql + update article_statuses + set is_starred = false + where article_id not in (%s)`, values) + + if _, err = tx.ExecContext(ctx, readQuery, args...); err != nil { + return err + } + + // make unread those that are in list + unreadQuery := fmt.Sprintf(`--sql + update article_statuses + set is_starred = true + where article_id in (%s)`, values) + + if _, err = tx.ExecContext(ctx, unreadQuery, args...); err != nil { + return err + } + + return tx.Commit() +}
A internal/store/sqlite_feeds.go

@@ -0,0 +1,46 @@

+package store + +import ( + "context" + "fmt" + "strings" +) + +func (s *Sqlite) UpsertSubscription(ctx context.Context, id, title, url, htmlURL string) error { + _, err := s.db.ExecContext(ctx, + `insert or ignore into feeds (id, title, url, htmlUrl) + values (?, ?, ?, ?)`, + id, title, url, htmlURL) + return err +} + +func (s *Sqlite) LinkFeedWithFolder(ctx context.Context, feedID, folderID string) error { + _, err := s.db.ExecContext(ctx, + `insert or ignore into feed_folders (feed_id, folder_id) + values (?, ?)`, + feedID, folderID) + return err +} + +func (s *Sqlite) RemoveNonExistentFeeds(ctx context.Context, currentFeedIDs []string) error { + if len(currentFeedIDs) == 0 { + _, err := s.db.ExecContext(ctx, "delete from feeds") + return err + } + + values := strings.Repeat("(?),", len(currentFeedIDs)) + values = values[:len(values)-1] // trim trailing comma + + query := fmt.Sprintf(`--sql + DELETE FROM feeds + WHERE id NOT IN (VALUES %s) + `, values) + + args := make([]any, len(currentFeedIDs)) + for i, v := range currentFeedIDs { + args[i] = v + } + + _, err := s.db.ExecContext(ctx, query, args...) + return err +}
A internal/store/sqlite_folders.go

@@ -0,0 +1,10 @@

+package store + +import "context" + +func (s *Sqlite) UpsertTag(ctx context.Context, id string) error { + _, err := s.db.ExecContext(ctx, + `insert or replace into folders (id) values (?)`, + id) + return err +}
A internal/store/sqlite_pendin_actions.go

@@ -0,0 +1,8 @@

+package store + +// actions: +// - 'read', 'starred' +// - 'unread', 'unstar' + +// SELECT article_id FROM pending_actions WHERE action = 'mark_read' ORDER BY created_at; +// DELETE FROM pending_actions WHERE action = 'mark_read' AND article_id IN ('', '?', '?');
M internal/store/sqlite_reader.go

@@ -6,8 +6,8 @@ "database/sql"

"errors" ) -func (s *Sqlite) GetLastSyncTime(ctx context.Context) (int, error) { - var lut int +func (s *Sqlite) GetLastSyncTime(ctx context.Context) (int64, error) { + var lut int64 err := s.db.QueryRowContext(ctx, "select last_sync from reader where id = 1 and last_sync is not null").Scan(&lut) if errors.Is(err, sql.ErrNoRows) { return 0, ErrNotFound

@@ -15,7 +15,7 @@ }

return lut, err } -func (s *Sqlite) SetLastSyncTime(ctx context.Context, lastSync int) error { +func (s *Sqlite) SetLastSyncTime(ctx context.Context, lastSync int64) error { _, err := s.db.ExecContext(ctx, `insert into reader (id, last_sync) values (1, ?) on conflict(id) do update set last_sync = excluded.last_sync`,
M internal/sync/freshrss.go

@@ -2,37 +2,237 @@ package sync

import ( "context" + "errors" + "log/slog" + "strings" + "time" "olexsmir.xyz/smutok/internal/provider" "olexsmir.xyz/smutok/internal/store" ) type FreshRSS struct { - store store.Store + store *store.Sqlite api *provider.FreshRSS + + ot int64 } -func NewFreshRSS(store store.Store, api *provider.FreshRSS) *FreshRSS { +func NewFreshRSS(store *store.Sqlite, api *provider.FreshRSS) *FreshRSS { return &FreshRSS{ store: store, api: api, } } -func (g *FreshRSS) Sync(ctx context.Context) error { - // tags, err := g.api.TagList(ctx) - // subscriptions, err := g.api.SubscriptionList(ctx) - // unreadItems, err := g.api.StreamContents( - // ctx, - // "user/-/state/com.google/reading-list", - // "user/-/state/com.google/read", - // 0, - // 1000) - // ids, err := g.api.GetItemsIDs(ctx, - // "user/-/state/com.google/read", - // "user/-/state/com.google/reading-list", - // 1000, - // ) +func (f *FreshRSS) Sync(ctx context.Context) error { + ot, err := f.getLastSyncTime(ctx) + if err != nil { + return err + } + + f.ot = ot + newOt := time.Now().Unix() + + // note: sync all articles once if it's initial sync + + // todo: sync pending `mark_read`, /edit-tag takes multiple &i=<id>&i=<ca> + // todo: sync pending `mark_unread` + // todo: sync pending `star` + // todo: sync pending `unstar` + + if err := f.syncTags(ctx); err != nil { + return err + } + + if err := f.syncSubscriptions(ctx); err != nil { + return err + } + + if err := f.syncUnreadItems(ctx); err != nil { + return err + } + + if err := f.syncUnreadItemsStatuses(ctx); err != nil { + return err + } + + if err := f.syncStarredItems(ctx); err != nil { + return err + } + + if err := f.syncStarredItemStatuses(ctx); err != nil { + return err + } + + return f.store.SetLastSyncTime(ctx, newOt) +} + +func (f *FreshRSS) getLastSyncTime(ctx context.Context) (int64, error) { + ot, err := f.store.GetLastSyncTime(ctx) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + slog.Info("got last sync time, returning 0") + return 0, nil 
+ } else { + return 0, err + } + } + + slog.Info("got last sync time", "ot", ot) + return ot, nil +} + +func (f *FreshRSS) syncTags(ctx context.Context) error { + slog.Info("syncing tags") - return nil + tags, err := f.api.TagList(ctx) + if err != nil { + return err + } + + var errs []error + for _, tag := range tags { + if strings.HasPrefix(tag.ID, "user/-/state/com.google/") && + !strings.HasSuffix(tag.ID, "/com.google/starred") { + continue + } + + if err := f.store.UpsertTag(ctx, tag.ID); err != nil { + errs = append(errs, err) + } + } + + slog.Info("finished tag sync", "errs", errs) + return errors.Join(errs...) +} + +func (f *FreshRSS) syncSubscriptions(ctx context.Context) error { + slog.Info("syncing subscriptions") + + subs, err := f.api.SubscriptionList(ctx) + if err != nil { + return err + } + + var errs []error + for _, sub := range subs { + if err := f.store.UpsertSubscription(ctx, sub.ID, sub.Title, sub.URL, sub.HTMLURL); err != nil { + errs = append(errs, err) + } + + for _, cat := range sub.Categories { + if !strings.Contains(cat.ID, "user/-/label") { + continue + } + + // NOTE: probably redundant + // if err := g.store.UpsertTag(ctx, cat.ID); err != nil { + // errs = append(errs, err) + // } + + if err := f.store.LinkFeedWithFolder(ctx, sub.ID, cat.ID); err != nil { + errs = append(errs, err) + } + } + } + + // delete local feeds that are no longer available on the server + ids := make([]string, len(subs)) + for i, s := range subs { + ids[i] = s.ID + } + + if err := f.store.RemoveNonExistentFeeds(ctx, ids); err != nil { + errs = append(errs, err) + } + + slog.Info("finished subscriptions sync", "errs", errs) + return errors.Join(errs...) 
+} + +func (f *FreshRSS) syncUnreadItems(ctx context.Context) error { + slog.Info("syncing unread items") + + items, err := f.api.StreamContents(ctx, + "user/-/state/com.google/reading-list", + "user/-/state/com.google/read", + f.ot, + 1000) + if err != nil { + return err + } + + slog.Debug("got unread items", "len", len(items)) + + var errs []error + for _, item := range items { + if err := f.store.UpsertArticle(ctx, item.TimestampUsec, item.Origin.StreamID, item.Title, item.Content, item.Author, item.Origin.HTMLURL, int(item.Published)); err != nil { + errs = append(errs, err) + } + } + + slog.Info("finished syncing unread items", "errs", errs) + return errors.Join(errs...) +} + +func (f *FreshRSS) syncUnreadItemsStatuses(ctx context.Context) error { + slog.Info("syncing unread items ids") + + ids, err := f.api.StreamIDs(ctx, + "user/-/state/com.google/reading-list", + "user/-/state/com.google/read", + 1000) + if err != nil { + return err + } + + slog.Debug("got unread ids", "len", len(ids), "ids", ids) + merr := f.store.SyncReadStatus(ctx, ids) + + slog.Info("finished syncing unread items", "err", merr) + return merr +} + +func (f *FreshRSS) syncStarredItems(ctx context.Context) error { + slog.Info("sync stared items") + + items, err := f.api.StreamContents(ctx, + "user/-/state/com.google/starred", + "", + f.ot, + 1000) + if err != nil { + return err + } + + slog.Debug("got starred items", "len", len(items)) + + var errs []error + for _, item := range items { + if err := f.store.UpsertArticle(ctx, item.TimestampUsec, item.Origin.StreamID, item.Title, item.Content, item.Author, item.Origin.HTMLURL, int(item.Published)); err != nil { + errs = append(errs, err) + } + } + + slog.Info("finished syncing unstarred items", "errs", errs) + return errors.Join(errs...) 
+} + +func (f *FreshRSS) syncStarredItemStatuses(ctx context.Context) error { + slog.Info("syncing starred items ids") + + ids, err := f.api.StreamIDs(ctx, + "user/-/state/com.google/starred", + "", + 1000) + if err != nil { + return err + } + + slog.Debug("got starred ids", "len", len(ids), "ids", ids) + merr := f.store.SyncStarredStatus(ctx, ids) + + slog.Info("finished syncing unread items", "err", merr) + return merr }