Skip to content

perf(llc): Reduce the number of read message per channel from DB when paginating (part 2)#2681

Open
VelikovPetar wants to merge 22 commits into
masterfrom
feature/FLU-485_optimize_read_message_from_db_part2
Open

perf(llc): Reduce the number of read message per channel from DB when paginating (part 2)#2681
VelikovPetar wants to merge 22 commits into
masterfrom
feature/FLU-485_optimize_read_message_from_db_part2

Conversation

@VelikovPetar
Copy link
Copy Markdown
Contributor

@VelikovPetar VelikovPetar commented May 25, 2026

Submit a pull request

Linear: Part two of: FLU-485

Review after: #2679

CLA

  • I have signed the Stream CLA (required).
  • The code changes follow best practices
  • Code changes are tested (add some information if not applicable)

Description of the pull request

Replaces the per-message hydration (_messageFromJoinRow) with a batched hydration (_messagesFromJoinRows).

Testing:

Apply the following patch and run the new parity/benchmark tests to verify the performance improvements and no regression (except where some behaviour was intentionally changed):

Benchmark and parity tests
Subject: [PATCH] refactor(dao): Benchmark
---
Index: packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart b/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart
new file mode 100644
--- /dev/null	(date 1779706335909)
+++ b/packages/stream_chat_persistence/test/src/dao/get_messages_by_cid_parity_test.dart	(date 1779706335909)
@@ -0,0 +1,481 @@
+import 'package:flutter_test/flutter_test.dart';
+import 'package:stream_chat/stream_chat.dart';
+import 'package:stream_chat_persistence/src/dao/dao.dart';
+import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
+
+import '../../stream_chat_persistence_client_test.dart';
+
+void main() {
+  late DriftChatDatabase database;
+  late MessageDao messageDao;
+
+  setUp(() {
+    database = testDatabaseProvider('testUserId');
+    messageDao = database.messageDao;
+  });
+
+  tearDown(() async {
+    await database.disconnect();
+  });
+
+  // Seeds a channel with `count` messages that exercise every hydration path
+  // `_messageFromJoinRow` touches: user attribution, latest/own reactions
+  // (including on quoted messages), quoted messages (including one pointing
+  // at a deleted/missing target), polls *with* mixed votes + an answer,
+  // and a thread draft attached to a real parent message. Monotonic
+  // 1-second offsets on `createdAt` because Drift stores DateTime as
+  // integer Unix seconds — sub-second offsets collapse onto the same tick.
+  Future<void> seedRichMessages(String cid, int count) async {
+    final channels = [ChannelModel(cid: cid)];
+    final dbUser = User(id: 'testUserId'); // matches the DB's _userId
+    final otherUsers = List.generate(count, (i) => User(id: 'otherUser$i'));
+    final allUsers = [dbUser, ...otherUsers];
+    final baseTime = DateTime.now();
+
+    const optionA = PollOption(id: 'opt0', text: 'A');
+    const optionB = PollOption(id: 'opt1', text: 'B');
+    final poll = Poll(
+      id: 'poll0',
+      name: 'Pick one',
+      options: const [optionA, optionB],
+      // `createdById` must reference an existing user — `PollDao._pollFromJoinRow`
+      // uses `readTable(users)` (not `readTableOrNull`) on a LEFT JOIN, which
+      // throws if there's no matching row. Pre-existing PollDao quirk; out of
+      // scope for the pagination-pushdown work.
+      createdById: dbUser.id,
+    );
+
+    final messages = List.generate(
+      count,
+      (i) => Message(
+        id: 'msg$i',
+        type: 'regular',
+        user: allUsers[i % allUsers.length],
+        text: 'Hello $i',
+        createdAt: baseTime.add(Duration(seconds: i)),
+        updatedAt: baseTime.add(Duration(seconds: i)),
+        // Every 3rd message (i ≥ 3) quotes the message 2 positions earlier.
+        // The very last quoting message instead points at a non-existent
+        // id, so we cover the "quote target deleted from cache" path.
+        quotedMessageId: (i >= 3 && i % 3 == 0)
+            ? (i == count - 1 ? 'msg-deleted' : 'msg${i - 2}')
+            : null,
+        // Every 5th message attaches to a poll.
+        pollId: (i % 5 == 0) ? 'poll0' : null,
+      ),
+    );
+
+    // Reactions populate both `latestReactions` (any user) and
+    // `ownReactions` (matching the DB user) on top-level messages AND on the
+    // messages that are themselves quote targets, so the parity check
+    // catches divergence in nested hydration too.
+    final reactions = <Reaction>[
+      for (var i = 0; i < count; i++) ...[
+        if (i.isEven)
+          Reaction(
+            type: 'like',
+            messageId: 'msg$i',
+            user: dbUser,
+            createdAt: baseTime.add(Duration(seconds: i)),
+          ),
+        Reaction(
+          type: 'love',
+          messageId: 'msg$i',
+          user: otherUsers[i % otherUsers.length],
+          createdAt: baseTime.add(Duration(seconds: i)),
+        ),
+      ],
+    ];
+
+    // Mixed-user poll votes: an own vote, two other-user votes (one per
+    // option), and an own free-text answer. Exercises every bucket the
+    // poll hydration splits into (latestVotesByOption, latestAnswers,
+    // ownVotesAndAnswers).
+    final pollVotes = [
+      PollVote(
+        id: 'vote-own-a',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        optionId: optionA.id,
+        createdAt: baseTime,
+      ),
+      PollVote(
+        id: 'vote-other-a',
+        pollId: poll.id,
+        userId: otherUsers[0].id,
+        user: otherUsers[0],
+        optionId: optionA.id,
+        createdAt: baseTime.add(const Duration(seconds: 1)),
+      ),
+      PollVote(
+        id: 'vote-other-b',
+        pollId: poll.id,
+        userId: otherUsers[1 % otherUsers.length].id,
+        user: otherUsers[1 % otherUsers.length],
+        optionId: optionB.id,
+        createdAt: baseTime.add(const Duration(seconds: 2)),
+      ),
+      PollVote(
+        id: 'answer-own',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        answerText: 'because reasons',
+        createdAt: baseTime.add(const Duration(seconds: 3)),
+      ),
+    ];
+
+    // Thread draft attached to `msg1` (a top-level message that lives in
+    // the main `messages` table — required because `DraftMessages.parentId`
+    // FK-references `Messages.id`). This is the only way `fetchDraft=true`
+    // actually attaches a draft to any row.
+    final threadDraft = Draft(
+      channelCid: cid,
+      parentId: 'msg1',
+      createdAt: baseTime,
+      message: DraftMessage(
+        id: 'thread-draft-msg1',
+        text: 'Unsent reply to msg1',
+        parentId: 'msg1',
+      ),
+    );
+
+    await database.userDao.updateUsers(allUsers);
+    await database.channelDao.updateChannels(channels);
+    await database.pollDao.updatePolls([poll]);
+    await messageDao.updateMessages(cid, messages);
+    await database.reactionDao.updateReactions(reactions);
+    await database.pollVoteDao.updatePollVotes(pollVotes);
+    await database.draftMessageDao.updateDraftMessages([threadDraft]);
+  }
+
+  // Builds a structural fingerprint of a Message that captures every field
+  // the two implementations should agree on after hydration. Used in place
+  // of `==` because `Reaction` (and friends) don't extend `Equatable`, so
+  // identity-based equality on nested lists fails for instances built by
+  // separate calls.
+  //
+  // Goes one level into the quoted message so divergence in nested
+  // hydration (reactions/poll on the quote) surfaces too. Also fingerprints
+  // poll vote *counts* per bucket — a regression that, say, leaked
+  // other-user votes into `ownVotesAndAnswers` would change the counts.
+  String fingerprintMessage(Message m) {
+    String reactionFp(Reaction r) => '${r.type}@${r.user?.id ?? "-"}';
+    String reactionListFp(List<Reaction>? rs) =>
+        '[${(rs ?? const []).map(reactionFp).join(",")}]';
+    String pollFp(Poll? p) {
+      if (p == null) return '-';
+      final voteCounts = {
+        for (final entry in p.latestVotesByOption.entries)
+          entry.key: entry.value.length,
+      };
+      return [
+        p.id,
+        'answers=${p.latestAnswers.length}',
+        'own=${p.ownVotesAndAnswers.length}',
+        'votes=$voteCounts',
+      ].join(';');
+    }
+
+    String quotedFp(Message? q) {
+      if (q == null) return '-';
+      return [
+        q.id,
+        'latest=${reactionListFp(q.latestReactions)}',
+        'own=${reactionListFp(q.ownReactions)}',
+        'poll=${pollFp(q.poll)}',
+      ].join(';');
+    }
+
+    String draftFp(Draft? d) {
+      if (d == null) return '-';
+      return '${d.message.id}@parent=${d.parentId ?? "-"}';
+    }
+
+    return [
+      'id=${m.id}',
+      'text=${m.text ?? ""}',
+      'user=${m.user?.id ?? "-"}',
+      'createdAt=${m.createdAt.toUtc().toIso8601String()}',
+      'latest=${reactionListFp(m.latestReactions)}',
+      'own=${reactionListFp(m.ownReactions)}',
+      'quoted=${quotedFp(m.quotedMessage)}',
+      'poll=${pollFp(m.poll)}',
+      'draft=${draftFp(m.draft)}',
+    ].join(' | ');
+  }
+
+  group('getMessagesByCid: full data parity (legacy vs SQL pushdown)', () {
+    const cid = 'test:Cid';
+    const n = 30;
+
+    Future<void> assertParity(String label, PaginationParams? p) async {
+      await seedRichMessages(cid, n);
+
+      final legacy = await messageDao.getMessagesByCidLegacy(
+        cid,
+        messagePagination: p,
+      );
+      final pushdown = await messageDao.getMessagesByCid(
+        cid,
+        messagePagination: p,
+      );
+
+      expect(
+        pushdown.length,
+        legacy.length,
+        reason: 'list lengths differ for "$label"',
+      );
+      expect(
+        pushdown.map(fingerprintMessage).toList(),
+        equals(legacy.map(fingerprintMessage).toList()),
+        reason: 'message data parity broken for "$label"',
+      );
+    }
+
+    test('no pagination', () => assertParity('no pagination', null));
+
+    test(
+      'limit only',
+      () => assertParity('limit: 10', const PaginationParams(limit: 10)),
+    );
+
+    test(
+      'lessThan + limit',
+      () => assertParity(
+        'lessThan: msg25, limit: 10',
+        const PaginationParams(limit: 10, lessThan: 'msg25'),
+      ),
+    );
+
+    // `greaterThan-only + limit` intentionally diverges from the legacy
+    // Dart-side filter: the SQL pushdown treats it as forward pagination
+    // (cursor exclusive, ASC, first N after the cursor) — see the
+    // `isForwardPagination` branch in `MessageDao.getMessagesByCid`. The
+    // legacy reference keeps the cursor and returns the tail of
+    // `[cursor..end]`. Assert each implementation's contract explicitly
+    // rather than comparing them.
+    test('greaterThan + limit (forward pagination, no legacy parity)',
+        () async {
+      await seedRichMessages(cid, n);
+
+      const params = PaginationParams(limit: 10, greaterThan: 'msg5');
+      final legacy =
+          await messageDao.getMessagesByCidLegacy(cid, messagePagination: params);
+      final pushdown =
+          await messageDao.getMessagesByCid(cid, messagePagination: params);
+
+      // Legacy: keeps the cursor, takes the last 10 of [msg5..msg29] → msg20..msg29.
+      expect(
+        legacy.map((m) => m.id).toList(),
+        equals([for (var i = 20; i < 30; i++) 'msg$i']),
+      );
+      // Pushdown: cursor exclusive, first 10 after the cursor → msg6..msg15.
+      expect(
+        pushdown.map((m) => m.id).toList(),
+        equals([for (var i = 6; i < 16; i++) 'msg$i']),
+      );
+    });
+
+    test(
+      'lessThan + greaterThan + limit',
+      () => assertParity(
+        'lessThan: msg25, greaterThan: msg5, limit: 10',
+        const PaginationParams(
+          limit: 10,
+          lessThan: 'msg25',
+          greaterThan: 'msg5',
+        ),
+      ),
+    );
+
+    test('empty channel', () async {
+      // No seed at all — the legacy path returns `[]` early; the batched path
+      // also short-circuits on empty rows. Locks that contract.
+      const emptyCid = 'test:Empty';
+      await database.channelDao.updateChannels([ChannelModel(cid: emptyCid)]);
+
+      final legacy = await messageDao.getMessagesByCidLegacy(emptyCid);
+      final batched = await messageDao.getMessagesByCid(emptyCid);
+
+      expect(legacy, isEmpty);
+      expect(batched, isEmpty);
+    });
+
+    test('single message', () async {
+      const singleCid = 'test:Single';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: singleCid)]);
+      await messageDao.updateMessages(singleCid, [
+        Message(
+          id: 'only',
+          type: 'regular',
+          user: dbUser,
+          text: 'solo',
+          createdAt: DateTime.now(),
+        ),
+      ]);
+
+      final legacy = await messageDao.getMessagesByCidLegacy(singleCid);
+      final batched = await messageDao.getMessagesByCid(singleCid);
+
+      expect(batched.length, legacy.length);
+      expect(
+        batched.map(fingerprintMessage).toList(),
+        equals(legacy.map(fingerprintMessage).toList()),
+      );
+    });
+  });
+
+  // The standard `seedRichMessages` helper exercises every hydration path
+  // separately. These extra scenarios put related entities together on the
+  // *same* message and stack quoting depth — they catch interactions that a
+  // one-feature-per-message seed cannot.
+  group('getMessagesByCid: edge-case parity (legacy vs SQL pushdown)', () {
+    Future<void> assertParityForChannel(String cid) async {
+      final legacy = await messageDao.getMessagesByCidLegacy(cid);
+      final batched = await messageDao.getMessagesByCid(cid);
+
+      expect(batched.length, legacy.length);
+      expect(
+        batched.map(fingerprintMessage).toList(),
+        equals(legacy.map(fingerprintMessage).toList()),
+      );
+    }
+
+    test('message with poll AND quote AND thread draft attached', () async {
+      const cid = 'test:Mixed';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: cid)]);
+      await database.pollDao.updatePolls([
+        Poll(
+          id: 'poll-mixed',
+          name: 'Pick',
+          options: const [
+            PollOption(id: 'a', text: 'A'),
+            PollOption(id: 'b', text: 'B'),
+          ],
+          createdById: dbUser.id,
+        ),
+      ]);
+
+      final baseTime = DateTime.now();
+      await messageDao.updateMessages(cid, [
+        Message(
+          id: 'quoted',
+          type: 'regular',
+          user: dbUser,
+          text: 'first',
+          createdAt: baseTime,
+        ),
+        Message(
+          id: 'mixed',
+          type: 'regular',
+          user: dbUser,
+          text: 'all three',
+          createdAt: baseTime.add(const Duration(seconds: 1)),
+          quotedMessageId: 'quoted',
+          pollId: 'poll-mixed',
+        ),
+      ]);
+      await database.reactionDao.updateReactions([
+        Reaction(
+          type: 'like',
+          messageId: 'quoted',
+          user: dbUser,
+          createdAt: baseTime,
+        ),
+      ]);
+      await database.draftMessageDao.updateDraftMessages([
+        Draft(
+          channelCid: cid,
+          parentId: 'mixed',
+          createdAt: baseTime,
+          message: DraftMessage(
+            id: 'thread-draft',
+            text: 'unsent reply',
+            parentId: 'mixed',
+          ),
+        ),
+      ]);
+
+      await assertParityForChannel(cid);
+    });
+
+    test('depth-2 quote chain (A → B → C)', () async {
+      const cid = 'test:Chain';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: cid)]);
+      final baseTime = DateTime.now();
+      await messageDao.updateMessages(cid, [
+        Message(
+          id: 'C',
+          type: 'regular',
+          user: dbUser,
+          text: 'root',
+          createdAt: baseTime,
+        ),
+        Message(
+          id: 'B',
+          type: 'regular',
+          user: dbUser,
+          text: 'mid',
+          createdAt: baseTime.add(const Duration(seconds: 1)),
+          quotedMessageId: 'C',
+        ),
+        Message(
+          id: 'A',
+          type: 'regular',
+          user: dbUser,
+          text: 'top',
+          createdAt: baseTime.add(const Duration(seconds: 2)),
+          quotedMessageId: 'B',
+        ),
+      ]);
+      await assertParityForChannel(cid);
+    });
+
+    test('three messages quoting the same target', () async {
+      // Deduped quotes: the batched path collects unique quotedIds into one
+      // SELECT-IN. The fingerprint of the quoted message must be identical
+      // across all three quoting rows.
+      const cid = 'test:DedupedQuotes';
+      final dbUser = User(id: 'testUserId');
+      await database.userDao.updateUsers([dbUser]);
+      await database.channelDao.updateChannels([ChannelModel(cid: cid)]);
+      final baseTime = DateTime.now();
+      await messageDao.updateMessages(cid, [
+        Message(
+          id: 'target',
+          type: 'regular',
+          user: dbUser,
+          text: 'quoted everywhere',
+          createdAt: baseTime,
+        ),
+        for (var i = 0; i < 3; i++)
+          Message(
+            id: 'q$i',
+            type: 'regular',
+            user: dbUser,
+            text: 'quoting $i',
+            createdAt: baseTime.add(Duration(seconds: i + 1)),
+            quotedMessageId: 'target',
+          ),
+      ]);
+      await database.reactionDao.updateReactions([
+        Reaction(
+          type: 'like',
+          messageId: 'target',
+          user: dbUser,
+          createdAt: baseTime,
+        ),
+      ]);
+
+      await assertParityForChannel(cid);
+    });
+  });
+}
Index: packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart b/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart
new file mode 100644
--- /dev/null	(date 1779706330758)
+++ b/packages/stream_chat_persistence/test/src/benchmark/get_messages_by_cid_hydration_bench_test.dart	(date 1779706330758)
@@ -0,0 +1,420 @@
+import 'package:drift/drift.dart';
+import 'package:drift/native.dart';
+import 'package:flutter_test/flutter_test.dart';
+import 'package:stream_chat/stream_chat.dart';
+import 'package:stream_chat_persistence/src/dao/dao.dart';
+import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
+
+/// Counts SELECT statements and rows returned through them. Used to
+/// instrument the legacy (pre-pushdown + N+1 hydration) and current
+/// (SQL-pushdown + batched hydration) implementations of `getMessagesByCid`
+/// for head-to-head comparison.
+class _CountingInterceptor extends QueryInterceptor {
+  int selectCount = 0;
+  int rowsReturned = 0;
+
+  void reset() {
+    selectCount = 0;
+    rowsReturned = 0;
+  }
+
+  @override
+  Future<List<Map<String, Object?>>> runSelect(
+    QueryExecutor executor,
+    String statement,
+    List<Object?> args,
+  ) async {
+    final result = await executor.runSelect(statement, args);
+    selectCount++;
+    rowsReturned += result.length;
+    return result;
+  }
+}
+
+typedef _BenchResult = ({
+  List<String> messageIds,
+  List<String> fingerprints,
+  int selectCount,
+  int rowsReturned,
+  int medianMicros,
+});
+
+// Goes one level into the quoted message and into the poll's vote buckets
+// so divergence in nested hydration (reactions/votes/answers on the quote
+// or poll) surfaces — bench parity is otherwise just "same ids in same
+// order" and would miss a regression that left, say, ownReactions empty.
+String _fingerprintMessage(Message m) {
+  String reactionFp(Reaction r) => '${r.type}@${r.user?.id ?? "-"}';
+  String reactionListFp(List<Reaction>? rs) =>
+      '[${(rs ?? const []).map(reactionFp).join(",")}]';
+  String pollFp(Poll? p) {
+    if (p == null) return '-';
+    final voteCounts = {
+      for (final entry in p.latestVotesByOption.entries)
+        entry.key: entry.value.length,
+    };
+    return [
+      p.id,
+      'answers=${p.latestAnswers.length}',
+      'own=${p.ownVotesAndAnswers.length}',
+      'votes=$voteCounts',
+    ].join(';');
+  }
+
+  String quotedFp(Message? q) {
+    if (q == null) return '-';
+    return [
+      q.id,
+      'latest=${reactionListFp(q.latestReactions)}',
+      'own=${reactionListFp(q.ownReactions)}',
+      'poll=${pollFp(q.poll)}',
+    ].join(';');
+  }
+
+  String draftFp(Draft? d) {
+    if (d == null) return '-';
+    return '${d.message.id}@parent=${d.parentId ?? "-"}';
+  }
+
+  return [
+    'id=${m.id}',
+    'user=${m.user?.id ?? "-"}',
+    'latest=${reactionListFp(m.latestReactions)}',
+    'own=${reactionListFp(m.ownReactions)}',
+    'quoted=${quotedFp(m.quotedMessage)}',
+    'poll=${pollFp(m.poll)}',
+    'draft=${draftFp(m.draft)}',
+  ].join(' | ');
+}
+
+void main() {
+  late DriftChatDatabase database;
+  late MessageDao messageDao;
+  late _CountingInterceptor interceptor;
+
+  setUp(() {
+    interceptor = _CountingInterceptor();
+    final executor = NativeDatabase.memory().interceptWith(interceptor);
+    database = DriftChatDatabase('testUserId', executor);
+    messageDao = database.messageDao;
+  });
+
+  tearDown(() async {
+    await database.disconnect();
+  });
+
+  // Seeds messages with reactions (own + other-user) on top-level AND on
+  // quoted-target messages, every-3rd quote chain (with the last quote
+  // pointing at a deleted target), every-5th poll with a mixed-user vote
+  // set + an own answer, and a thread draft attached to a real parent. The
+  // bench needs the same rich shape as the parity test or the SELECT-count
+  // improvement is invisible (and a hydration regression would be invisible
+  // too).
+  Future<void> seedRichMessages(String cid, int count) async {
+    final channels = [ChannelModel(cid: cid)];
+    final dbUser = User(id: 'testUserId');
+    final otherUsers = List.generate(count, (i) => User(id: 'otherUser$i'));
+    final allUsers = [dbUser, ...otherUsers];
+    final baseTime = DateTime.now();
+
+    const optionA = PollOption(id: 'opt0', text: 'A');
+    const optionB = PollOption(id: 'opt1', text: 'B');
+    final poll = Poll(
+      id: 'poll0',
+      name: 'Pick one',
+      options: const [optionA, optionB],
+      createdById: dbUser.id,
+    );
+
+    final messages = List.generate(
+      count,
+      (i) => Message(
+        id: 'msg$i',
+        type: 'regular',
+        user: allUsers[i % allUsers.length],
+        text: 'Hello $i',
+        createdAt: baseTime.add(Duration(seconds: i)),
+        updatedAt: baseTime.add(Duration(seconds: i)),
+        quotedMessageId: (i >= 3 && i % 3 == 0)
+            ? (i == count - 1 ? 'msg-deleted' : 'msg${i - 2}')
+            : null,
+        pollId: (i % 5 == 0) ? 'poll0' : null,
+      ),
+    );
+
+    final reactions = <Reaction>[
+      for (var i = 0; i < count; i++) ...[
+        if (i.isEven)
+          Reaction(
+            type: 'like',
+            messageId: 'msg$i',
+            user: dbUser,
+            createdAt: baseTime.add(Duration(seconds: i)),
+          ),
+        Reaction(
+          type: 'love',
+          messageId: 'msg$i',
+          user: otherUsers[i % otherUsers.length],
+          createdAt: baseTime.add(Duration(seconds: i)),
+        ),
+      ],
+    ];
+
+    final pollVotes = [
+      PollVote(
+        id: 'vote-own-a',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        optionId: optionA.id,
+        createdAt: baseTime,
+      ),
+      PollVote(
+        id: 'vote-other-a',
+        pollId: poll.id,
+        userId: otherUsers[0].id,
+        user: otherUsers[0],
+        optionId: optionA.id,
+        createdAt: baseTime.add(const Duration(seconds: 1)),
+      ),
+      PollVote(
+        id: 'vote-other-b',
+        pollId: poll.id,
+        userId: otherUsers[1 % otherUsers.length].id,
+        user: otherUsers[1 % otherUsers.length],
+        optionId: optionB.id,
+        createdAt: baseTime.add(const Duration(seconds: 2)),
+      ),
+      PollVote(
+        id: 'answer-own',
+        pollId: poll.id,
+        userId: dbUser.id,
+        user: dbUser,
+        answerText: 'because reasons',
+        createdAt: baseTime.add(const Duration(seconds: 3)),
+      ),
+    ];
+
+    final threadDraft = Draft(
+      channelCid: cid,
+      parentId: 'msg1',
+      createdAt: baseTime,
+      message: DraftMessage(
+        id: 'thread-draft-msg1',
+        text: 'Unsent reply to msg1',
+        parentId: 'msg1',
+      ),
+    );
+
+    await database.userDao.updateUsers(allUsers);
+    await database.channelDao.updateChannels(channels);
+    await database.pollDao.updatePolls([poll]);
+    await messageDao.updateMessages(cid, messages);
+    await database.reactionDao.updateReactions(reactions);
+    await database.pollVoteDao.updatePollVotes(pollVotes);
+    await database.draftMessageDao.updateDraftMessages([threadDraft]);
+  }
+
+  Future<_BenchResult> runBench(
+    Future<List<Message>> Function() fn, {
+    int warmups = 2,
+    int iterations = 10,
+  }) async {
+    for (var i = 0; i < warmups; i++) {
+      await fn();
+    }
+
+    final timings = <int>[];
+    List<Message>? lastResult;
+    for (var i = 0; i < iterations; i++) {
+      interceptor.reset();
+      final sw = Stopwatch()..start();
+      lastResult = await fn();
+      sw.stop();
+      timings.add(sw.elapsedMicroseconds);
+    }
+    timings.sort();
+    final median = timings[timings.length ~/ 2];
+
+    return (
+      messageIds: lastResult!.map((m) => m.id).toList(),
+      fingerprints: lastResult.map(_fingerprintMessage).toList(),
+      selectCount: interceptor.selectCount,
+      rowsReturned: interceptor.rowsReturned,
+      medianMicros: median,
+    );
+  }
+
+  void printTable(
+    String scenario,
+    int n,
+    _BenchResult legacy,
+    _BenchResult batched,
+  ) {
+    String improvement(int oldV, int newV) =>
+        newV == 0 ? '—' : '${(oldV / newV).toStringAsFixed(2)}×';
+
+    String pad(Object v, [int width = 10]) => v.toString().padRight(width);
+
+    // Full-data parity, not just id-order: the fingerprint catches a
+    // regression in any hydrated field (reactions, polls, votes, quoted
+    // sub-fields, draft).
+    final parity =
+        batched.fingerprints.toString() == legacy.fingerprints.toString()
+            ? 'OK'
+            : 'MISMATCH';
+
+    // ignore: avoid_print
+    print('''
+
+Scenario: $scenario  (N=$n, P=${batched.messageIds.length})
+                 ${pad('OLD')}${pad('NEW')}Improvement (old/new)
+SELECT calls     ${pad(legacy.selectCount)}${pad(batched.selectCount)}${improvement(legacy.selectCount, batched.selectCount)}
+Rows fetched     ${pad(legacy.rowsReturned)}${pad(batched.rowsReturned)}${improvement(legacy.rowsReturned, batched.rowsReturned)}
+Time (us, med)   ${pad(legacy.medianMicros)}${pad(batched.medianMicros)}${improvement(legacy.medianMicros, batched.medianMicros)}
+Result parity    $parity
+''');
+  }
+
+  group('getMessagesByCid hydration: legacy vs batched', () {
+    const cid = 'test:Cid';
+    const n = 100;
+
+    Future<void> runScenario(String label, PaginationParams? p) async {
+      await seedRichMessages(cid, n);
+
+      final legacy = await runBench(
+        () => messageDao.getMessagesByCidLegacy(cid, messagePagination: p),
+      );
+      final batched = await runBench(
+        () => messageDao.getMessagesByCid(cid, messagePagination: p),
+      );
+
+      expect(
+        batched.messageIds,
+        equals(legacy.messageIds),
+        reason: 'id-order parity broken for "$label"',
+      );
+      expect(
+        batched.fingerprints,
+        equals(legacy.fingerprints),
+        reason: 'full-data parity broken for "$label" — a hydrated field '
+            '(reactions / poll votes / quoted sub-fields / draft) differs',
+      );
+      // Strict less-than: with rich related data the batched path must
+      // collapse the per-row reaction/poll/draft fetches into a handful of
+      // batched ones. Anything ≥ legacy would mean the refactor regressed.
+      expect(
+        batched.selectCount,
+        lessThan(legacy.selectCount),
+        reason: 'batched hydration must issue fewer SELECTs than legacy '
+            'for "$label"',
+      );
+      expect(
+        batched.rowsReturned,
+        lessThanOrEqualTo(legacy.rowsReturned),
+        reason: 'batched hydration must not materialize more rows than '
+            'legacy for "$label"',
+      );
+
+      printTable(label, n, legacy, batched);
+    }
+
+    test('no pagination', () => runScenario('no pagination', null));
+
+    test(
+      'limit: 25',
+      () => runScenario('limit: 25', const PaginationParams(limit: 25)),
+    );
+
+    test(
+      'lessThan + limit (scroll up)',
+      () => runScenario(
+        'lessThan: msg80, limit: 25',
+        const PaginationParams(limit: 25, lessThan: 'msg80'),
+      ),
+    );
+
+    // `greaterThan-only + limit` intentionally diverges from the legacy
+    // Dart-side filter: the SQL pushdown treats it as forward pagination
+    // (cursor exclusive, ASC, first N after the cursor). The batched
+    // hydration SELECT-count win is orthogonal to the pagination-semantics
+    // change, so we still assert it; for ids we compare against the
+    // explicit expected window instead of against legacy.
+    test('greaterThan + limit (scroll down, forward pagination)', () async {
+      await seedRichMessages(cid, n);
+
+      const params = PaginationParams(limit: 25, greaterThan: 'msg10');
+      final legacy = await runBench(
+        () => messageDao.getMessagesByCidLegacy(cid, messagePagination: params),
+      );
+      final batched = await runBench(
+        () => messageDao.getMessagesByCid(cid, messagePagination: params),
+      );
+
+      // Legacy: keeps the cursor, takes the last 25 of [msg10..msg99]
+      // → msg75..msg99.
+      expect(
+        legacy.messageIds,
+        equals([for (var i = 75; i < 100; i++) 'msg$i']),
+        reason: 'legacy must keep the cursor and return the tail of '
+            '[cursor..end]',
+      );
+      // Pushdown: forward pagination, first 25 after the cursor → msg11..msg35.
+      expect(
+        batched.messageIds,
+        equals([for (var i = 11; i < 36; i++) 'msg$i']),
+        reason: 'forward pagination must return the first 25 messages after '
+            'the cursor in ASC order',
+      );
+      expect(batched.selectCount, lessThan(legacy.selectCount));
+      expect(batched.rowsReturned, lessThanOrEqualTo(legacy.rowsReturned));
+
+      printTable(
+          'greaterThan: msg10, limit: 25 (forward)', n, legacy, batched);
+    });
+
+    test(
+      'lessThan + greaterThan + limit',
+      () => runScenario(
+        'lessThan: msg80, greaterThan: msg10, limit: 25',
+        const PaginationParams(
+          limit: 25,
+          lessThan: 'msg80',
+          greaterThan: 'msg10',
+        ),
+      ),
+    );
+  });
+
+  group('getMessagesByCid hydration: stress', () {
+    const cid = 'test:Cid';
+
+    test(
+      '500 messages, limit: 25',
+      () async {
+        await seedRichMessages(cid, 500);
+
+        final legacy = await runBench(
+          () => messageDao.getMessagesByCidLegacy(
+            cid,
+            messagePagination: const PaginationParams(limit: 25),
+          ),
+        );
+        final batched = await runBench(
+          () => messageDao.getMessagesByCid(
+            cid,
+            messagePagination: const PaginationParams(limit: 25),
+          ),
+        );
+
+        expect(batched.messageIds, equals(legacy.messageIds));
+        expect(batched.fingerprints, equals(legacy.fingerprints));
+        expect(batched.selectCount, lessThan(legacy.selectCount));
+        expect(batched.rowsReturned, lessThanOrEqualTo(legacy.rowsReturned));
+
+        printTable('stress: 500 messages, limit 25', 500, legacy, batched);
+      },
+    );
+  });
+}
Index: packages/stream_chat_persistence/lib/src/dao/message_dao.dart
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart
--- a/packages/stream_chat_persistence/lib/src/dao/message_dao.dart	(revision cc443dce0fecb9d52e3749172f4fbc6b02425531)
+++ b/packages/stream_chat_persistence/lib/src/dao/message_dao.dart	(date 1779706338696)
@@ -1,4 +1,7 @@
+import 'dart:math';
+
 import 'package:drift/drift.dart';
+import 'package:flutter/foundation.dart';
 import 'package:stream_chat/stream_chat.dart';
 import 'package:stream_chat_persistence/src/db/drift_chat_database.dart';
 import 'package:stream_chat_persistence/src/entity/messages.dart';
@@ -360,4 +363,106 @@
         .map((row) => row.read(messages.createdAt))
         .getSingleOrNull();
   }
+
+  Future<Message> _messageFromJoinRow(
+      TypedResult rows, {
+        bool fetchDraft = false,
+      }) async {
+    final userEntity = rows.readTableOrNull(_users);
+    final pinnedByEntity = rows.readTableOrNull(_pinnedByUsers);
+    final msgEntity = rows.readTable(messages);
+    final latestReactions = await _db.reactionDao.getReactions(msgEntity.id);
+    final ownReactions = await _db.reactionDao.getReactionsByUserId(
+      msgEntity.id,
+      _db.userId,
+    );
+
+    final quotedMessage = await switch (msgEntity.quotedMessageId) {
+      final id? => getMessageById(id),
+      _ => null,
+    };
+
+    final poll = await switch (msgEntity.pollId) {
+      final id? => _db.pollDao.getPollById(id),
+      _ => null,
+    };
+
+    final draft = await switch (fetchDraft) {
+      true => _db.draftMessageDao.getDraftMessageByCid(
+        msgEntity.channelCid,
+        parentId: msgEntity.id,
+      ),
+      _ => null,
+    };
+
+    return msgEntity.toMessage(
+      user: userEntity?.toUser(),
+      pinnedBy: pinnedByEntity?.toUser(),
+      latestReactions: latestReactions,
+      ownReactions: ownReactions,
+      quotedMessage: quotedMessage,
+      poll: poll,
+      draft: draft,
+    );
+  }
+
+  /// Pre-SQL-pushdown reference implementation of [getMessagesByCid]. Fetches
+  /// every cached message for the channel, hydrates each row, then trims the
+  /// result in Dart. Kept only as the head-to-head baseline for the
+  /// `get_messages_by_cid_bench_test.dart` benchmark — remove once we no
+  /// longer need behavioral parity proof.
+  @visibleForTesting
+  Future<List<Message>> getMessagesByCidLegacy(
+      String cid, {
+        bool fetchDraft = true,
+        PaginationParams? messagePagination,
+      }) async {
+    final query = select(messages).join([
+      leftOuterJoin(_users, messages.userId.equalsExp(_users.id)),
+      leftOuterJoin(
+        _pinnedByUsers,
+        messages.pinnedByUserId.equalsExp(_pinnedByUsers.id),
+      ),
+    ])
+      ..where(messages.channelCid.equals(cid))
+      ..where(messages.parentId.isNull() | messages.showInChannel.equals(true))
+      ..orderBy([OrderingTerm.asc(messages.createdAt)]);
+
+    final result = await query.get();
+    if (result.isEmpty) return [];
+
+    final msgList = await Future.wait(
+      result.map(
+            (row) => _messageFromJoinRow(
+          row,
+          fetchDraft: fetchDraft,
+        ),
+      ),
+    );
+
+    if (msgList.isNotEmpty) {
+      if (messagePagination?.lessThan != null) {
+        final lessThanIndex = msgList.indexWhere(
+              (m) => m.id == messagePagination!.lessThan,
+        );
+        if (lessThanIndex != -1) {
+          msgList.removeRange(lessThanIndex, msgList.length);
+        }
+      }
+      if (messagePagination?.greaterThan != null) {
+        final greaterThanIndex = msgList.indexWhere(
+              (m) => m.id == messagePagination!.greaterThan,
+        );
+        if (greaterThanIndex != -1) {
+          msgList.removeRange(0, greaterThanIndex);
+        }
+      }
+      if (messagePagination?.limit != null) {
+        return msgList
+            .skip(max(0, msgList.length - messagePagination!.limit))
+            .toList();
+      }
+    }
+    return msgList;
+  }
 }

Summary by CodeRabbit

  • New Features

    • Bulk retrieval for messages, reactions, polls, and thread drafts to improve batch operations and responsiveness.
  • Bug Fixes

    • Fixed pagination cursor semantics (inclusive/exclusive) and forward-pagination boundaries for correct page results.
    • Ensured deterministic ordering to avoid skipped/duplicated messages.
  • Performance

    • Reduced database reads via batched hydration and chunked queries for faster message and reaction loading.

Review Change Stack

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 25, 2026

📝 Walkthrough

Walkthrough

Batches related DB fetches and adds bulk DAO APIs with a chunking utility; replaces per-row message hydration with batched _messagesFromJoinRows; moves cursor pagination into SQL with tuple-accurate (createdAt,id) predicates; and expands tests for hydration, pagination, and bulk behaviors.

Changes

Persistence Layer Query Optimization

Layer / File(s) Summary
Query Utility Foundation
packages/stream_chat_persistence/lib/src/db/query_utils.dart, packages/stream_chat_persistence/CHANGELOG.md
Adds chunked<T> to split large ID lists for SQLite IN queries and updates CHANGELOG with detailed performance and pagination notes.
Bulk Query Methods Across DAOs
packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart, packages/stream_chat_persistence/lib/src/dao/poll_dao.dart, packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart, packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart, packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart
Adds bulk/batch APIs: getDraftMessagesByParentIds, getPollsByIds, getPollVotesForPolls, getReactionsForMessages, and getReactionsForMessagesByUserId. Centralizes reaction selection into _selectReactions helpers and uses chunked for safe batched queries.
Batched Message Hydration Refactoring
packages/stream_chat_persistence/lib/src/dao/message_dao.dart, packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart
Replaces per-row hydration with _messagesFromJoinRows and _buildMessage that pre-collect IDs, batch-fetch reactions/polls/drafts, and recursively hydrate quoted messages; updates getMessage/getThread flows to use batched hydration.
Message Pagination with SQL-Level Cursors
packages/stream_chat_persistence/lib/src/dao/message_dao.dart
Overhauls getMessagesByCid to resolve cursor (createdAt,id) via _lookupCursor, determine forward/backward pagination, apply tuple-accurate SQL predicates and limit, and reverse results when necessary before hydration; removes previous post-fetch trimming and unused dart:math import.
Hydration Test Coverage
packages/stream_chat_persistence/test/src/dao/message_dao_test.dart, packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart, packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart
Adds tests validating latest vs own reactions, per-row reaction isolation, poll hydration with mixed votes/answers, conditional thread-draft attachment via fetchDraft, quoted-message hydration including depth-2 chains, and hydration correctness across pagination boundaries.
Pagination Test Coverage
packages/stream_chat_persistence/test/src/dao/message_dao_test.dart
Sets deterministic timestamps via shared baseTime, switches to exclusive greaterThan cursors, and adds extensive getMessagesByCid scenarios including tied-createdAt handling and first/last ID assertions.
Bulk Query API Test Coverage
packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart, packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart, packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart, packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart, packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart
Adds tests for bulk methods: getDraftMessagesByParentIds (channel-scoped mapping), getReactionsForMessages with >900 IDs, getPollsByIds with null mapping for missing IDs, getPollVotesForPolls, and reaction-by-user grouping/empty-input behavior.

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly Related PRs

Suggested Reviewers

  • xsahil03x
  • renefloor

"🐰 I hopped through rows and chunks today,
Prefetched friends so queries play,
Cursors sorted, batches run—hooray!
Faster reads, then off I sway. 🥕"

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly describes the main change: performance optimization to reduce database message reads during pagination, with 'part 2' indicating continuation of work. It's specific, concise, and directly relates to the primary objective of the changeset.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feature/FLU-485_optimize_read_message_from_db_part2

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@VelikovPetar VelikovPetar changed the title Feature/flu 485 optimize read message from db part2 perf(llc): Reduce the number of read message per channel from DB when paginating (part 2) May 25, 2026
@VelikovPetar VelikovPetar marked this pull request as ready for review May 25, 2026 11:00
@codecov
Copy link
Copy Markdown

codecov Bot commented May 25, 2026

Codecov Report

❌ Patch coverage is 99.38650% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 65.62%. Comparing base (508c019) to head (c04a7cf).

Files with missing lines Patch % Lines
...at_persistence/lib/src/dao/pinned_message_dao.dart 97.64% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2681      +/-   ##
==========================================
+ Coverage   65.33%   65.62%   +0.29%     
==========================================
  Files         423      424       +1     
  Lines       26646    26848     +202     
==========================================
+ Hits        17408    17620     +212     
+ Misses       9238     9228      -10     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart`:
- Around line 306-317: The cursor predicates only filter by messages.createdAt,
which can cause duplicates/misses when createdAt ties exist; update each branch
(lessThanCutoff, lessThanOrEqualCutoff, greaterThanCutoff,
greaterThanOrEqualCutoff) to add a secondary predicate on messages.id matching
the ordering key so the pair (createdAt, id) is used (e.g., for lessThan use
createdAt < t OR (createdAt == t AND id < cursorId)); modify the query.where
calls in the blocks referencing messages.createdAt and messages.id to apply the
combined comparisons for all four operators, and apply the same change to the
analogous blocks around the second occurrence (lines referenced in the comment:
the other block at 350-362).
- Around line 40-43: The recursive quoted-message hydration in
_messagesFromJoinRows lacks a visited-set, so protect against cycles by adding a
visited ID set parameter (e.g., Set<String> visited or Set<String>
visitedMessageIds) defaulting to empty, check the current message's id before
hydrating its quote(s) and skip recursion if already visited, and pass the
updated set when calling the same hydration logic recursively; apply the same
visited-set guard to the other recursive hydration block referenced around lines
88-106 so both recursion entry points use the visited set to prevent infinite
recursion.

In `@packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart`:
- Around line 39-42: The recursive quoted-message hydration in
_messagesFromJoinRows (and the similar logic around lines 88-106) lacks cycle
protection; add a visited-id guard: introduce a Set<String> visited (or Set<int>
depending on message id type) parameter with a default empty set, add the
current message id to visited before recursing, and before resolving a quoted
message check if its id is already in visited; if it is, stop recursion (e.g.,
set quotedMessage to null or a shallow reference) to break the cycle. Pass the
visited set through any recursive calls so cycles are detected across the entire
resolution chain and avoid infinite recursion.

In `@packages/stream_chat_persistence/lib/src/db/query_utils.dart`:
- Around line 14-17: The public function chunked<T>(List<T> input, [int size =
900]) can hang or misbehave if size <= 0; add an upfront argument validation in
chunked to guard against non-positive sizes (e.g. if (size <= 0) throw
ArgumentError.value(size, 'size', 'must be > 0')) so the for-loop using i +=
size cannot loop infinitely or produce invalid sublists; keep the check at the
top of chunked before the for-loop and reference the existing parameters input
and size.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3db24b32-5504-4c67-995b-722956c7187b

📥 Commits

Reviewing files that changed from the base of the PR and between 508c019 and cc443dc.

📒 Files selected for processing (16)
  • packages/stream_chat_persistence/CHANGELOG.md
  • packages/stream_chat_persistence/lib/src/dao/draft_message_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/message_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/pinned_message_reaction_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/poll_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/poll_vote_dao.dart
  • packages/stream_chat_persistence/lib/src/dao/reaction_dao.dart
  • packages/stream_chat_persistence/lib/src/db/query_utils.dart
  • packages/stream_chat_persistence/test/src/dao/draft_message_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/message_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/pinned_message_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/pinned_message_reaction_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/poll_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/poll_vote_dao_test.dart
  • packages/stream_chat_persistence/test/src/dao/reaction_dao_test.dart

Comment on lines +40 to 43
Future<List<Message>> _messagesFromJoinRows(
List<TypedResult> rows, {
bool fetchDraft = false,
}) async {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard recursive quote hydration against cyclic quote graphs.

_messagesFromJoinRows recursively hydrates quoted messages without a visited-set guard. A cycle like A -> B -> A will recurse until stack overflow.

💡 Suggested fix
 Future<List<Message>> _messagesFromJoinRows(
   List<TypedResult> rows, {
   bool fetchDraft = false,
+  Set<String>? _visitedQuoteIds,
 }) async {
   if (rows.isEmpty) return const [];
+  final visited = _visitedQuoteIds ?? <String>{};

   final messageIds = <String>[];
   final quotedIds = <String>[];
@@
   for (final row in rows) {
     final msg = row.readTable(messages);
     messageIds.add(msg.id);
@@
   }
+  visited.addAll(messageIds);
@@
-  if (quotedIds.isNotEmpty) {
+  final nextQuotedIds = quotedIds.where((id) => !visited.contains(id)).toList();
+  if (nextQuotedIds.isNotEmpty) {
     final quoteRows = await (select(messages).join([
@@
-          ..where(messages.id.isIn(quotedIds)))
+          ..where(messages.id.isIn(nextQuotedIds)))
         .get();
     final quotedMessages = await _messagesFromJoinRows(
       quoteRows,
       fetchDraft: true,
+      _visitedQuoteIds: visited,
     );

Also applies to: 88-106

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart` around lines
40 - 43, The recursive quoted-message hydration in _messagesFromJoinRows lacks a
visited-set, so protect against cycles by adding a visited ID set parameter
(e.g., Set<String> visited or Set<String> visitedMessageIds) defaulting to
empty, check the current message's id before hydrating its quote(s) and skip
recursion if already visited, and pass the updated set when calling the same
hydration logic recursively; apply the same visited-set guard to the other
recursive hydration block referenced around lines 88-106 so both recursion entry
points use the visited set to prevent infinite recursion.

Comment thread packages/stream_chat_persistence/lib/src/dao/message_dao.dart Outdated
Comment on lines +39 to 42
Future<List<Message>> _messagesFromJoinRows(
List<TypedResult> rows, {
bool fetchDraft = false,
}) async {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Add cycle protection to recursive quoted-message hydration.

_messagesFromJoinRows recursively resolves quotes with no visited guard. Cyclic quote references can lead to infinite recursion and crash.

💡 Suggested fix
 Future<List<Message>> _messagesFromJoinRows(
   List<TypedResult> rows, {
   bool fetchDraft = false,
+  Set<String>? _visitedQuoteIds,
 }) async {
   if (rows.isEmpty) return const [];
+  final visited = _visitedQuoteIds ?? <String>{};
@@
   for (final row in rows) {
     final msg = row.readTable(pinnedMessages);
     messageIds.add(msg.id);
@@
   }
+  visited.addAll(messageIds);
@@
-  if (quotedIds.isNotEmpty) {
+  final nextQuotedIds = quotedIds.where((id) => !visited.contains(id)).toList();
+  if (nextQuotedIds.isNotEmpty) {
@@
-          ..where(pinnedMessages.id.isIn(quotedIds)))
+          ..where(pinnedMessages.id.isIn(nextQuotedIds)))
         .get();
     final quotedMessages = await _messagesFromJoinRows(
       quoteRows,
       fetchDraft: true,
+      _visitedQuoteIds: visited,
     );

Also applies to: 88-106

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/pinned_message_dao.dart` around
lines 39 - 42, The recursive quoted-message hydration in _messagesFromJoinRows
(and the similar logic around lines 88-106) lacks cycle protection; add a
visited-id guard: introduce a Set<String> visited (or Set<int> depending on
message id type) parameter with a default empty set, add the current message id
to visited before recursing, and before resolving a quoted message check if its
id is already in visited; if it is, stop recursion (e.g., set quotedMessage to
null or a shallow reference) to break the cycle. Pass the visited set through
any recursive calls so cycles are detected across the entire resolution chain
and avoid infinite recursion.

Comment on lines +14 to +17
Iterable<List<T>> chunked<T>(List<T> input, [int size = 900]) sync* {
for (var i = 0; i < input.length; i += size) {
yield input.sublist(i, math.min(i + size, input.length));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard size to prevent infinite loops on invalid input.

chunked is public, and size <= 0 makes the loop at Line 15 non-terminating or invalid. Add an argument check up front.

Suggested fix
 Iterable<List<T>> chunked<T>(List<T> input, [int size = 900]) sync* {
+  if (size <= 0) {
+    throw ArgumentError.value(size, 'size', 'must be greater than 0');
+  }
   for (var i = 0; i < input.length; i += size) {
     yield input.sublist(i, math.min(i + size, input.length));
   }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/db/query_utils.dart` around lines 14
- 17, The public function chunked<T>(List<T> input, [int size = 900]) can hang
or misbehave if size <= 0; add an upfront argument validation in chunked to
guard against non-positive sizes (e.g. if (size <= 0) throw
ArgumentError.value(size, 'size', 'must be > 0')) so the for-loop using i +=
size cannot loop infinitely or produce invalid sublists; keep the check at the
top of chunked before the for-loop and reference the existing parameters input
and size.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart`:
- Around line 257-260: _lookupCursor is currently returning cursor messages from
any channel (including top-level showInChannel=true messages), causing
getMessagesByCid to paginate a channel using boundaries from another channel;
change _lookupCursor to accept the requested channel id (cid) or add a new
helper like _lookupCursorForCid and only return a cursor when the candidate
message's cid equals the requested cid (otherwise return null/no-op), update all
call sites that pass messagePagination?.lessThan / lessThanOrEqual / greaterThan
/ greaterThanOrEqual to pass the cid and apply the same cid-scoping change to
the other cursor lookups referenced elsewhere (the second block noted in the
comment).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 125ee85c-7b0e-4302-91c7-5040febbb7c8

📥 Commits

Reviewing files that changed from the base of the PR and between cc443dc and c04a7cf.

📒 Files selected for processing (3)
  • packages/stream_chat_persistence/CHANGELOG.md
  • packages/stream_chat_persistence/lib/src/dao/message_dao.dart
  • packages/stream_chat_persistence/test/src/dao/message_dao_test.dart
✅ Files skipped from review due to trivial changes (1)
  • packages/stream_chat_persistence/CHANGELOG.md

Comment on lines +257 to +260
_lookupCursor(messagePagination?.lessThan),
_lookupCursor(messagePagination?.lessThanOrEqual),
_lookupCursor(messagePagination?.greaterThan),
_lookupCursor(messagePagination?.greaterThanOrEqual),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Scope cursor lookup to the requested channel.

_lookupCursor currently accepts any top-level / showInChannel=true message as a valid cursor, even if that message belongs to a different channel. In that case getMessagesByCid paginates channel cid using a boundary from another channel instead of falling back to a no-op cursor.

Suggested fix
     final (
       lessThanCursor,
       lessThanOrEqualCursor,
       greaterThanCursor,
       greaterThanOrEqualCursor,
     ) = await (
-      _lookupCursor(messagePagination?.lessThan),
-      _lookupCursor(messagePagination?.lessThanOrEqual),
-      _lookupCursor(messagePagination?.greaterThan),
-      _lookupCursor(messagePagination?.greaterThanOrEqual),
+      _lookupCursor(cid, messagePagination?.lessThan),
+      _lookupCursor(cid, messagePagination?.lessThanOrEqual),
+      _lookupCursor(cid, messagePagination?.greaterThan),
+      _lookupCursor(cid, messagePagination?.greaterThanOrEqual),
     ).wait;
@@
-  Future<({DateTime createdAt, String id})?> _lookupCursor(String? id) async {
+  Future<({DateTime createdAt, String id})?> _lookupCursor(
+    String cid,
+    String? id,
+  ) async {
     if (id == null) return null;
     final createdAt = await (selectOnly(messages)
           ..addColumns([messages.createdAt])
           ..where(messages.id.equals(id))
+          ..where(messages.channelCid.equals(cid))
           ..where(
             messages.parentId.isNull() | messages.showInChannel.equals(true),
           ))
         .map((row) => row.read(messages.createdAt))
         .getSingleOrNull();

Also applies to: 358-373

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/stream_chat_persistence/lib/src/dao/message_dao.dart` around lines
257 - 260, _lookupCursor is currently returning cursor messages from any channel
(including top-level showInChannel=true messages), causing getMessagesByCid to
paginate a channel using boundaries from another channel; change _lookupCursor
to accept the requested channel id (cid) or add a new helper like
_lookupCursorForCid and only return a cursor when the candidate message's cid
equals the requested cid (otherwise return null/no-op), update all call sites
that pass messagePagination?.lessThan / lessThanOrEqual / greaterThan /
greaterThanOrEqual to pass the cid and apply the same cid-scoping change to the
other cursor lookups referenced elsewhere (the second block noted in the
comment).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant