From 2364e5624dd319edf6b108f49ba55dbd251583ab Mon Sep 17 00:00:00 2001 From: onlyyu1996 <1158673577@qq.com> Date: Mon, 18 May 2026 15:14:54 +0800 Subject: [PATCH] fix(share): requeue sync updates on flush failure --- src/cortex-share/src/sync.rs | 67 ++++++++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/src/cortex-share/src/sync.rs b/src/cortex-share/src/sync.rs index 86df82ffe..17508bfbd 100644 --- a/src/cortex-share/src/sync.rs +++ b/src/cortex-share/src/sync.rs @@ -123,14 +123,33 @@ impl ShareSync { let count = updates.len(); - for update in updates { - self.send_update(update).await?; + let mut sent = 0; + let mut remaining = updates.into_iter(); + while let Some(update) = remaining.next() { + if let Err(error) = self.send_update(update.clone()).await { + let mut unsent = Vec::with_capacity(count - sent); + unsent.push(update); + unsent.extend(remaining); + self.requeue_front(unsent).await; + return Err(error); + } + sent += 1; } debug!("Flushed {} sync updates", count); Ok(count) } + async fn requeue_front(&self, mut updates: Vec) { + if updates.is_empty() { + return; + } + + let mut pending = self.pending.lock().await; + updates.extend(std::mem::take(&mut *pending)); + *pending = updates; + } + /// Send a single update. async fn send_update(&self, update: SyncUpdate) -> Result<()> { let response = self @@ -192,6 +211,7 @@ impl Default for ShareSync { #[cfg(test)] mod tests { use super::*; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[tokio::test] async fn test_queue_update() { @@ -208,4 +228,47 @@ mod tests { let pending = sync.pending.lock().await; assert_eq!(pending.len(), 1); } + + #[tokio::test] + async fn test_flush_requeues_failed_and_unsent_updates() { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let server = tokio::spawn(async move { + let (mut stream, _) = listener.accept().await.unwrap(); + let mut buf = [0; 2048]; + let _ = stream.read(&mut buf).await.unwrap(); + stream + .write_all(b"HTTP/1.1 200 OK\r\ncontent-length: 2\r\nconnection: close\r\n\r\nOK") + .await + .unwrap(); + + let (mut stream, _) = listener.accept().await.unwrap(); + let mut buf = [0; 2048]; + let _ = stream.read(&mut buf).await.unwrap(); + }); + + let sync = ShareSync::new().with_api_url(format!("http://{}", addr)); + let share = SharedSession::new( + "test-session".to_string(), + "https://example.com/share/123".to_string(), + "secret123".to_string(), + ); + + sync.queue_update(&share, "first", serde_json::json!({"order": 1})) + .await; + sync.queue_update(&share, "second", serde_json::json!({"order": 2})) + .await; + sync.queue_update(&share, "third", serde_json::json!({"order": 3})) + .await; + + let result = sync.flush().await; + assert!(result.is_err(), "flush should report the failed update"); + + let pending = sync.pending.lock().await; + let keys: Vec<_> = pending.iter().map(|update| update.key.as_str()).collect(); + assert_eq!(keys, vec!["second", "third"]); + + server.await.unwrap(); + } }