9 files changed,
305 insertions(+),
23 deletions(-)
Author:
Oleksandr Smirnov
olexsmir@gmail.com
Committed at:
2025-12-26 17:22:37 +0200
Change ID:
kuqvqkntkywxwuytmnoqsxnwtuonozzp
Parent:
99b3ff9
M
app.go
@@ -3,6 +3,7 @@
import ( "context" "errors" + "fmt" "log/slog" "olexsmir.xyz/smutok/internal/config"@@ -41,7 +42,13 @@ }
fr.SetAuthToken(token) fs := freshrss.NewSyncer(fr, store) - fw := freshrss.NewWorker() + + writeToken, err := getWriteToken(ctx, fr, store) + if err != nil { + return nil, err + } + + fw := freshrss.NewWorker(fr, store, writeToken) return &app{ cfg: cfg,@@ -75,3 +82,27 @@ }
return token, nil } + +func getWriteToken(ctx context.Context, fr *freshrss.Client, db *store.Sqlite) (string, error) { + token, err := db.GetWriteToken(ctx) + if err == nil { + return token, nil + } + + if !errors.Is(err, store.ErrNotFound) { + return "", err + } + + slog.Info("requesting write token") + + token, err = fr.GetWriteToken(ctx) + if err != nil { + return "", err + } + + if serr := db.SetWriteToken(ctx, token); serr != nil { + return "", serr + } + + return token, nil +}
M
internal/freshrss/client.go
@@ -18,6 +18,7 @@
const ( StateRead = "user/-/state/com.google/read" StateReadingList = "user/-/state/com.google/reading-list" + StateKeptUnread = "user/-/state/com.google/kept-unread" StateStarred = "user/-/state/com.google/starred" )@@ -67,7 +68,7 @@ }
func (g Client) GetWriteToken(ctx context.Context) (string, error) { var resp string - err := g.request(ctx, "/reader/api/0/token", nil, &resp) + err := g.request(ctx, "/reader/api/0/token", url.Values{}, &resp) return resp, err }@@ -227,7 +228,8 @@ for _, tag := range opts.ItemID {
body.Add("i", tag) } - err := g.postRequest(ctx, "/reader/api/0/edit-tag", body, nil) + var resp string + err := g.postRequest(ctx, "/reader/api/0/edit-tag", body, &resp) return err }
M
internal/freshrss/sync.go
@@ -33,12 +33,7 @@
f.ot = ot newOt := time.Now().Unix() - // note: sync all articles once if it's initial sync - - // todo: sync pending `mark_read`, /edit-tag takes multiple &i=<id>&i=<ca> - // todo: sync pending `mark_unread` - // todo: sync pending `star` - // todo: sync pending `unstar` + // TODO: sync all articles once if it's initial sync if err := f.syncTags(ctx); err != nil { return err
M
internal/freshrss/worker.go
@@ -1,7 +1,112 @@
package freshrss -type Worker struct{} +import ( + "context" + "log/slog" + "sync" + "time" + + "olexsmir.xyz/smutok/internal/store" +) + +type Worker struct { + api *Client + store *store.Sqlite + + writeToken string +} + +func NewWorker(api *Client, store *store.Sqlite, writeToken string) *Worker { + return &Worker{ + api: api, + store: store, + writeToken: writeToken, + } +} + +func (w *Worker) Run(ctx context.Context) { + // TODO: get tick time from config ??? + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + var wg sync.WaitGroup + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !w.isNetworkAvailable(ctx) { + slog.Info("worker: no internet connection") + continue + } + + wg.Go(func() { + if err := w.pendingReads(ctx); err != nil { + slog.Error("worker: read", "err", err) + } + }) + wg.Go(func() { + if err := w.pendingUnreads(ctx); err != nil { + slog.Error("worker: unread", "err", err) + } + }) + wg.Go(func() { + if err := w.pendingStar(ctx); err != nil { + slog.Error("worker: star", "err", err) + } + }) + wg.Go(func() { + if err := w.pendingUnstar(ctx); err != nil { + slog.Error("worker: unread", "err", err) + } + }) + wg.Wait() + } + } +} + +// TODO: implement me +func (Worker) isNetworkAvailable(_ context.Context) bool { + return true +} + +func (w *Worker) pendingReads(ctx context.Context) error { + slog.Debug("worker: pending read") + return w.handle(ctx, store.Read, StateRead, "") +} + +func (w *Worker) pendingUnreads(ctx context.Context) error { + slog.Debug("worker: pending unread") + return w.handle(ctx, store.Unread, StateKeptUnread, StateRead) +} + +func (w *Worker) pendingStar(ctx context.Context) error { + slog.Debug("worker: pending star") + return w.handle(ctx, store.Star, StateStarred, "") +} + +func (w *Worker) pendingUnstar(ctx context.Context) error { + slog.Debug("worker: pending unstar") + return w.handle(ctx, store.Unstar, "", StateStarred) +} + +func (w *Worker) handle(ctx context.Context, action store.Action, addState, rmState string) error { + articleIDs, err := w.store.GetPendingActions(ctx, action) + if err != nil { + return err + } + + if len(articleIDs) == 0 { + return nil + } + + if err := w.api.EditTag(ctx, w.writeToken, EditTag{ + ItemID: articleIDs, + TagToAdd: addState, + TagToRemove: rmState, + }); err != nil { + return err + } -func NewWorker() *Worker { - return &Worker{} + return w.store.DeletePendingActions(ctx, action, articleIDs) }
M
internal/store/schema.hcl
@@ -11,6 +11,10 @@ column "token" {
null = true type = text } + column "write_token" { + null = true + type = text + } column "last_sync" { null = true type = date@@ -208,10 +212,13 @@ ref_columns = [table.articles.column.id]
on_update = NO_ACTION on_delete = CASCADE } - index "idx_pending_actions_created" { + index "idx_pending_actions_created_at" { columns = [column.created_at] } - index "idx_pending_actions_article" { + index "idx_pending_actions_article_id" { columns = [column.article_id] + } + check { + expr = "(action IN ('read', 'unread', 'star', 'unstar'))" } }
M
internal/store/sqlite_feeds.go
@@ -33,7 +33,7 @@ values = values[:len(values)-1] // trim trailing comma
query := fmt.Sprintf(`--sql DELETE FROM feeds - WHERE id NOT IN (VALUES %s) + WHERE id NOT IN (%s) `, values) args := make([]any, len(currentFeedIDs))
M
internal/store/sqlite_pendin_actions.go
@@ -1,8 +1,118 @@
package store -// actions: -// - 'read', 'starred' -// - 'unread', 'unstar' +import ( + "context" + "fmt" + "strings" +) + +type Action int -// SELECT article_id FROM pending_actions WHERE action = 'mark_read' ORDER BY created_at; -// DELETE FROM pending_actions WHERE action = 'mark_read' AND article_id IN ('', '?', '?'); +const ( + Read Action = iota + Unread + Star + Unstar +) + +func (a Action) String() string { + switch a { + case Read: + return "read" + case Unread: + return "unread" + case Star: + return "star" + case Unstar: + return "unstar" + default: + return "unsupported" + } +} + +func (s *Sqlite) ChangeArticleStatus(ctx context.Context, articleID string, action Action) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + // update article status + var query string + switch action { + case Read: + query = `update article_statuses set is_read = 1 where article_id = ?` + case Unread: + query = `update article_statuses set is_read = 0 where article_id = ?` + case Star: + query = `update article_statuses set is_starred = 1 where article_id = ?` + case Unstar: + query = `update article_statuses set is_starred = 0 where article_id = ?` + } + + e, err := tx.ExecContext(ctx, query, articleID) + if err != nil { + return err + } + if n, _ := e.RowsAffected(); n == 0 { + return ErrNotFound + } + + // enqueue action + if _, err := tx.ExecContext(ctx, `insert into pending_actions (article_id, action) values (?, ?)`, + articleID, action.String()); err != nil { + return err + } + + return tx.Commit() +} + +func (s *Sqlite) GetPendingActions(ctx context.Context, action Action) ([]string, error) { + query := `--sql + select article_id + from pending_actions + where action = ? + order by created_at desc + limit 10` + + rows, err := s.db.QueryContext(ctx, query, action.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + var res []string + for rows.Next() { + var pa string + if serr := rows.Scan(&pa); serr != nil { + return res, serr + } + res = append(res, pa) + } + + if err = rows.Err(); err != nil { + return res, err + } + + return res, nil +} + +func (s *Sqlite) DeletePendingActions(ctx context.Context, action Action, articleIDs []string) error { + placeholders := strings.Repeat("(?),", len(articleIDs)) + placeholders = placeholders[:len(placeholders)-1] + + args := make([]any, len(articleIDs)+1) + args[0] = action.String() + for i, v := range articleIDs { + args[i+1] = v + } + + query := fmt.Sprintf(`--sql + delete from pending_actions + where action = ? + and article_id in (%s) + `, placeholders) + + _, err := s.db.ExecContext(ctx, query, args...) + return err +}
M
internal/store/sqlite_reader.go
@@ -34,8 +34,25 @@ }
func (s *Sqlite) SetToken(ctx context.Context, token string) error { _, err := s.db.ExecContext(ctx, - `insert into reader (id, token) values (1, ?) + `insert into reader (id, write_token) values (1, ?) on conflict(id) do update set token = excluded.token`, token) return err } + +func (s *Sqlite) GetWriteToken(ctx context.Context) (string, error) { + var tok string + err := s.db.QueryRowContext(ctx, "select write_token from reader where id = 1 and write_token is not null").Scan(&tok) + if errors.Is(err, sql.ErrNoRows) { + return "", ErrNotFound + } + return tok, err +} + +func (s *Sqlite) SetWriteToken(ctx context.Context, token string) error { + _, err := s.db.ExecContext(ctx, + `insert into reader (id, write_token) values (1, ?) + on conflict(id) do update set write_token = excluded.write_token`, + token) + return err +}
M
main.go
@@ -3,10 +3,10 @@
import ( "context" _ "embed" - "errors" "fmt" "log/slog" "os" + "os/signal" "strings" "github.com/urfave/cli/v3"@@ -37,7 +37,22 @@ }
} func runTui(ctx context.Context, c *cli.Command) error { - return errors.New("there's no tui, i lied") + app, err := bootstrap(ctx) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(ctx) + + go func() { app.freshrssWorker.Run(ctx) }() + + quitCh := make(chan os.Signal, 1) + signal.Notify(quitCh, os.Interrupt) + <-quitCh + + cancel() + + return nil } // sync