From 6d6fe2c2ca21ff6c41443fcf5f4b769dae4c4f98 Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Mon, 1 Jun 2026 09:33:21 +0000 Subject: [PATCH 1/4] agent: fix rare race conditions in async API --- src/Simplex/Messaging/Agent.hs | 64 +++++++++++--------------- tests/AgentTests/FunctionalAPITests.hs | 30 ++++++++---- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index bd77b892a..421789472 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -49,6 +49,7 @@ module Simplex.Messaging.Agent deleteUser, setUserService, connRequestPQSupport, + prepareConnectionToCreate, createConnectionAsync, setConnShortLinkAsync, getConnShortLinkAsync, @@ -357,8 +358,14 @@ setUserService c = withAgentEnv c .: setUserService' c {-# INLINE setUserService #-} -- | Create SMP agent connection (NEW command) asynchronously, synchronous response is new connection id -createConnectionAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ConnId -createConnectionAsync c userId aCorrId enableNtfs = withAgentEnv c .:. newConnAsync c userId aCorrId enableNtfs +-- | Create SMP agent connection without queue (to be used with createConnectionAsync). +prepareConnectionToCreate :: ConnectionModeI c => AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AE ConnId +prepareConnectionToCreate c userId enableNtfs cMode pqSup = withAgentEnv c $ newConnNoQueues c userId enableNtfs cMode pqSup +{-# INLINE prepareConnectionToCreate #-} + +-- | Enqueue NEW command for a prepared connection. +createConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE () +createConnectionAsync c aCorrId connId enableNtfs = withAgentEnv c .:. newConnAsync c aCorrId connId enableNtfs {-# INLINE createConnectionAsync #-} -- | Create or update user's contact connection short link (LSET command) asynchronously, no synchronous response @@ -371,10 +378,10 @@ getConnShortLinkAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Con getConnShortLinkAsync c = withAgentEnv c .:: getConnShortLinkAsync' c {-# INLINE getConnShortLinkAsync #-} --- | Join SMP agent connection (JOIN command) asynchronously, synchronous response is new connection id. --- If connId is provided (for contact URIs), it updates the existing connection record created by getConnShortLinkAsync. -joinConnectionAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId -joinConnectionAsync c userId aCorrId connId_ enableNtfs = withAgentEnv c .:: joinConnAsync c userId aCorrId connId_ enableNtfs +-- | Enqueue JOIN command for a prepared connection. +joinConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE () +joinConnectionAsync c aCorrId connId enableNtfs cReqUri cInfo pqSup subMode = + withAgentEnv c $ joinConnAsync c aCorrId connId enableNtfs cReqUri cInfo pqSup subMode {-# INLINE joinConnectionAsync #-} -- | Allow connection to continue after CONF notification (LET command), no synchronous response @@ -837,11 +844,10 @@ setUserService' c userId enable = do unless ok $ throwE $ CMD PROHIBITED "setUserService" when (changed && not enable) $ withStore' c (`deleteClientServices` userId) -newConnAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ConnId -newConnAsync c userId corrId enableNtfs cMode pqInitKeys subMode = do - connId <- newConnNoQueues c userId enableNtfs cMode (CR.connPQEncryption pqInitKeys) +newConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM () +newConnAsync c corrId connId enableNtfs cMode pqInitKeys subMode = enqueueCommand c corrId connId Nothing $ AClientCommand $ NEW enableNtfs (ACM cMode) pqInitKeys subMode - pure connId +{-# INLINE newConnAsync #-} newConnNoQueues :: AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AM ConnId newConnNoQueues c userId enableNtfs cMode pqSupport = do @@ -850,36 +856,18 @@ newConnNoQueues c userId enableNtfs cMode pqSupport = do let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport} withStore c $ \db -> createNewConn db g cData cMode --- TODO [short links] TBC, but probably we will need async join for contact addresses as the contact will be created after user confirming the connection, --- and join should retry, the same as 1-time invitation joins. -joinConnAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId -joinConnAsync c userId corrId connId_ enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = do - when (isJust connId_) $ throwE $ CMD PROHIBITED "joinConnAsync: connId not allowed for invitation URI" - withInvLock c (strEncode cReqUri) "joinConnAsync" $ do - lift (compatibleInvitationUri cReqUri) >>= \case - Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do - g <- asks random - let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v) - cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport} - connId <- withStore c $ \db -> createNewConn db g cData SCMInvitation - enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo - pure connId - Nothing -> throwE $ AGENT A_VERSION -joinConnAsync c userId corrId connId_ enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = do +joinConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM () +joinConnAsync c corrId connId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = + lift (compatibleInvitationUri cReqUri) >>= \case + Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do + let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v) + enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo + Nothing -> throwE $ AGENT A_VERSION +joinConnAsync c corrId connId enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = lift (compatibleContactUri cReqUri) >>= \case Just (_, Compatible connAgentVersion) -> do let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion Nothing - connId <- case connId_ of - Just cId -> do - -- update connection record created by getConnShortLinkAsync - withStore' c $ \db -> updateNewConnJoin db cId connAgentVersion pqSupport enableNtfs - pure cId - Nothing -> do - g <- asks random - let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport} - withStore c $ \db -> createNewConn db g cData SCMInvitation enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo - pure connId Nothing -> throwE $ AGENT A_VERSION allowConnectionAsync' :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> ConnInfo -> AM () @@ -898,10 +886,12 @@ allowConnectionAsync' c corrId connId confId ownConnInfo = acceptContactAsync' :: AgentClient -> UserId -> ACorrId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId acceptContactAsync' c userId corrId enableNtfs invId ownConnInfo pqSupport subMode = do Invitation {connReq} <- withStore c $ \db -> getInvitation db "acceptContactAsync'" invId + connId <- newConnToJoin c userId "" enableNtfs connReq pqSupport withStore' c $ \db -> acceptInvitation db invId ownConnInfo - joinConnAsync c userId corrId Nothing enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do + joinConnAsync c corrId connId enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do withStore' c (`unacceptInvitation` invId) throwE err + pure connId ackMessageAsync' :: AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> AM () ackMessageAsync' c corrId connId msgId rcptInfo_ = do diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index fba0eac4a..fd62eebbc 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -1425,7 +1425,8 @@ testInvitationShortLinkAsync viaProxy a b = do connReq' `shouldBe` connReq linkUserData connData' `shouldBe` userData runRight $ do - aId <- A.joinConnectionAsync b 1 "123" Nothing True connReq "bob's connInfo" PQSupportOn SMSubscribe + aId <- A.prepareConnectionToJoin b 1 True connReq PQSupportOn + A.joinConnectionAsync b "123" aId True connReq "bob's connInfo" PQSupportOn SMSubscribe get b =##> \case ("123", c, JOINED sndSecure) -> c == aId && sndSecure; _ -> False ("", _, CONF confId _ "bob's connInfo") <- get a allowConnection a bId confId "alice's connInfo" @@ -2685,10 +2686,12 @@ receiveMsg c cId msgId msg = do testAsyncCommands :: SndQueueSecured -> AgentClient -> AgentClient -> AgentMsgId -> IO () testAsyncCommands sqSecured alice bob baseId = runRight_ $ do - bobId <- createConnectionAsync alice 1 "1" True SCMInvitation IKPQOn SMSubscribe + bobId <- prepareConnectionToCreate alice 1 True SCMInvitation PQSupportOn + createConnectionAsync alice "1" bobId True SCMInvitation IKPQOn SMSubscribe ("1", bobId', INV (ACR _ qInfo)) <- get alice liftIO $ bobId' `shouldBe` bobId - aliceId <- joinConnectionAsync bob 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe + aliceId <- prepareConnectionToJoin bob 1 True qInfo PQSupportOn + joinConnectionAsync bob "2" aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe ("2", aliceId', JOINED sqSecured') <- get bob liftIO $ do aliceId' `shouldBe` aliceId @@ -2779,8 +2782,8 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob -> liftIO $ qInfo' `shouldBe` qInfo liftIO $ userCtData' `shouldBe` userCtData -- join connection async using connId from getConnShortLinkAsync - aliceId <- joinConnectionAsync bob 1 "2" (Just newId) True qInfo' "bob's connInfo" PQSupportOn SMSubscribe - liftIO $ aliceId `shouldBe` newId + joinConnectionAsync bob "2" newId True qInfo' "bob's connInfo" PQSupportOn SMSubscribe + let aliceId = newId ("2", aliceId', JOINED False) <- get bob liftIO $ aliceId' `shouldBe` aliceId -- complete connection @@ -2796,7 +2799,10 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob -> testAsyncCommandsRestore :: (ASrvTransport, AStoreType) -> IO () testAsyncCommandsRestore ps = do alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB - bobId <- runRight $ createConnectionAsync alice 1 "1" True SCMInvitation IKPQOn SMSubscribe + bobId <- runRight $ do + connId <- prepareConnectionToCreate alice 1 True SCMInvitation PQSupportOn + createConnectionAsync alice "1" connId True SCMInvitation IKPQOn SMSubscribe + pure connId liftIO $ noMessages alice "alice doesn't receive INV because server is down" disposeAgentClient alice withAgent 2 agentCfg initAgentServers testDB $ \alice' -> @@ -3083,10 +3089,12 @@ testJoinConnectionAsyncReplyErrorV8 ps@(t, ASType qsType _) = do withAgent 1 cfg' initAgentServers testDB $ \a -> withAgent 2 cfg' initAgentServersSrv2 testDB2 $ \b -> do (aId, bId) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do - bId <- createConnectionAsync a 1 "1" True SCMInvitation IKPQOn SMSubscribe + bId <- prepareConnectionToCreate a 1 True SCMInvitation PQSupportOn + createConnectionAsync a "1" bId True SCMInvitation IKPQOn SMSubscribe ("1", bId', INV (ACR _ qInfo)) <- get a liftIO $ bId' `shouldBe` bId - aId <- joinConnectionAsync b 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe + aId <- prepareConnectionToJoin b 1 True qInfo PQSupportOn + joinConnectionAsync b "2" aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ threadDelay 500000 ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId pure (aId, bId) @@ -3128,10 +3136,12 @@ testJoinConnectionAsyncReplyError ps@(t, ASType qsType _) = do withAgent 1 agentCfg initAgentServers testDB $ \a -> withAgent 2 agentCfg initAgentServersSrv2 testDB2 $ \b -> do (aId, bId) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do - bId <- createConnectionAsync a 1 "1" True SCMInvitation IKPQOn SMSubscribe + bId <- prepareConnectionToCreate a 1 True SCMInvitation PQSupportOn + createConnectionAsync a "1" bId True SCMInvitation IKPQOn SMSubscribe ("1", bId', INV (ACR _ qInfo)) <- get a liftIO $ bId' `shouldBe` bId - aId <- joinConnectionAsync b 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe + aId <- prepareConnectionToJoin b 1 True qInfo PQSupportOn + joinConnectionAsync b "2" aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ threadDelay 500000 ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId pure (aId, bId) From 3175ae39610f819a4ebcf1c645ca91d765042a6d Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Mon, 1 Jun 2026 09:50:41 +0000 Subject: [PATCH 2/4] split async accept too --- src/Simplex/Messaging/Agent.hs | 29 +++++++++++++------------- tests/AgentTests/FunctionalAPITests.hs | 13 ++++++------ 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 421789472..31a94e8d0 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -379,9 +379,9 @@ getConnShortLinkAsync c = withAgentEnv c .:: getConnShortLinkAsync' c {-# INLINE getConnShortLinkAsync #-} -- | Enqueue JOIN command for a prepared connection. -joinConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE () -joinConnectionAsync c aCorrId connId enableNtfs cReqUri cInfo pqSup subMode = - withAgentEnv c $ joinConnAsync c aCorrId connId enableNtfs cReqUri cInfo pqSup subMode +joinConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE () +joinConnectionAsync c aCorrId updateConn connId enableNtfs cReqUri cInfo pqSup subMode = + withAgentEnv c $ joinConnAsync c aCorrId updateConn connId enableNtfs cReqUri cInfo pqSup subMode {-# INLINE joinConnectionAsync #-} -- | Allow connection to continue after CONF notification (LET command), no synchronous response @@ -389,9 +389,10 @@ allowConnectionAsync :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> Co allowConnectionAsync c = withAgentEnv c .:: allowConnectionAsync' c {-# INLINE allowConnectionAsync #-} --- | Accept contact after REQ notification (ACPT command) asynchronously, synchronous response is new connection id -acceptContactAsync :: AgentClient -> UserId -> ACorrId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId -acceptContactAsync c userId aCorrId enableNtfs = withAgentEnv c .:: acceptContactAsync' c userId aCorrId enableNtfs +-- | Accept contact after REQ notification (ACPT command) asynchronously, for a prepared connection. +acceptContactAsync :: AgentClient -> ACorrId -> ConnId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE () +acceptContactAsync c aCorrId connId enableNtfs invId ownConnInfo pqSupport subMode = + withAgentEnv c $ acceptContactAsync' c aCorrId connId enableNtfs invId ownConnInfo pqSupport subMode {-# INLINE acceptContactAsync #-} -- | Acknowledge message (ACK command) asynchronously, no synchronous response @@ -856,17 +857,19 @@ newConnNoQueues c userId enableNtfs cMode pqSupport = do let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport} withStore c $ \db -> createNewConn db g cData cMode -joinConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM () -joinConnAsync c corrId connId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = +joinConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM () +joinConnAsync c corrId updateConn connId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = lift (compatibleInvitationUri cReqUri) >>= \case Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v) + when updateConn $ withStore' c $ \db -> updateNewConnJoin db connId connAgentVersion pqSupport enableNtfs enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo Nothing -> throwE $ AGENT A_VERSION -joinConnAsync c corrId connId enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = +joinConnAsync c corrId updateConn connId enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = lift (compatibleContactUri cReqUri) >>= \case Just (_, Compatible connAgentVersion) -> do let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion Nothing + when updateConn $ withStore' c $ \db -> updateNewConnJoin db connId connAgentVersion pqSupport enableNtfs enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo Nothing -> throwE $ AGENT A_VERSION @@ -883,15 +886,13 @@ allowConnectionAsync' c corrId connId confId ownConnInfo = -- and also it can't be triggered by user concurrently several times in a row. It could be improved similarly to -- `acceptContact` by creating a new map for invitation locks and taking lock here, and removing `unacceptInvitation` -- while marking invitation as accepted inside "lock level transaction" after successful `joinConnAsync`. -acceptContactAsync' :: AgentClient -> UserId -> ACorrId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId -acceptContactAsync' c userId corrId enableNtfs invId ownConnInfo pqSupport subMode = do +acceptContactAsync' :: AgentClient -> ACorrId -> ConnId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM () +acceptContactAsync' c corrId connId enableNtfs invId ownConnInfo pqSupport subMode = do Invitation {connReq} <- withStore c $ \db -> getInvitation db "acceptContactAsync'" invId - connId <- newConnToJoin c userId "" enableNtfs connReq pqSupport withStore' c $ \db -> acceptInvitation db invId ownConnInfo - joinConnAsync c corrId connId enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do + joinConnAsync c corrId False connId enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do withStore' c (`unacceptInvitation` invId) throwE err - pure connId ackMessageAsync' :: AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> AM () ackMessageAsync' c corrId connId msgId rcptInfo_ = do diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index fd62eebbc..ef9f35d11 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -1426,7 +1426,7 @@ testInvitationShortLinkAsync viaProxy a b = do linkUserData connData' `shouldBe` userData runRight $ do aId <- A.prepareConnectionToJoin b 1 True connReq PQSupportOn - A.joinConnectionAsync b "123" aId True connReq "bob's connInfo" PQSupportOn SMSubscribe + A.joinConnectionAsync b "123" False aId True connReq "bob's connInfo" PQSupportOn SMSubscribe get b =##> \case ("123", c, JOINED sndSecure) -> c == aId && sndSecure; _ -> False ("", _, CONF confId _ "bob's connInfo") <- get a allowConnection a bId confId "alice's connInfo" @@ -2691,7 +2691,7 @@ testAsyncCommands sqSecured alice bob baseId = ("1", bobId', INV (ACR _ qInfo)) <- get alice liftIO $ bobId' `shouldBe` bobId aliceId <- prepareConnectionToJoin bob 1 True qInfo PQSupportOn - joinConnectionAsync bob "2" aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe + joinConnectionAsync bob "2" False aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe ("2", aliceId', JOINED sqSecured') <- get bob liftIO $ do aliceId' `shouldBe` aliceId @@ -2782,7 +2782,7 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob -> liftIO $ qInfo' `shouldBe` qInfo liftIO $ userCtData' `shouldBe` userCtData -- join connection async using connId from getConnShortLinkAsync - joinConnectionAsync bob "2" newId True qInfo' "bob's connInfo" PQSupportOn SMSubscribe + joinConnectionAsync bob "2" True newId True qInfo' "bob's connInfo" PQSupportOn SMSubscribe let aliceId = newId ("2", aliceId', JOINED False) <- get bob liftIO $ aliceId' `shouldBe` aliceId @@ -2818,7 +2818,8 @@ testAcceptContactAsync sqSecured alice bob baseId = (aliceId, sqSecuredJoin) <- joinConnection bob 1 True qInfo "bob's connInfo" SMSubscribe liftIO $ sqSecuredJoin `shouldBe` False -- joining via contact address connection ("", _, REQ invId _ "bob's connInfo") <- get alice - bobId <- acceptContactAsync alice 1 "1" True invId "alice's connInfo" PQSupportOn SMSubscribe + bobId <- prepareConnectionToAccept alice 1 True invId PQSupportOn + acceptContactAsync alice "1" bobId True invId "alice's connInfo" PQSupportOn SMSubscribe get alice =##> \case ("1", c, JOINED sqSecured') -> c == bobId && sqSecured' == sqSecured; _ -> False ("", _, CONF confId _ "alice's connInfo") <- get bob allowConnection bob aliceId confId "bob's connInfo" @@ -3094,7 +3095,7 @@ testJoinConnectionAsyncReplyErrorV8 ps@(t, ASType qsType _) = do ("1", bId', INV (ACR _ qInfo)) <- get a liftIO $ bId' `shouldBe` bId aId <- prepareConnectionToJoin b 1 True qInfo PQSupportOn - joinConnectionAsync b "2" aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe + joinConnectionAsync b "2" False aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ threadDelay 500000 ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId pure (aId, bId) @@ -3141,7 +3142,7 @@ testJoinConnectionAsyncReplyError ps@(t, ASType qsType _) = do ("1", bId', INV (ACR _ qInfo)) <- get a liftIO $ bId' `shouldBe` bId aId <- prepareConnectionToJoin b 1 True qInfo PQSupportOn - joinConnectionAsync b "2" aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe + joinConnectionAsync b "2" False aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe liftIO $ threadDelay 500000 ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId pure (aId, bId) From e4c2011edb34933907292408c8da5cf98b1b2ab4 Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Mon, 1 Jun 2026 20:26:15 +0000 Subject: [PATCH 3/4] fix, reduce diff --- src/Simplex/Messaging/Agent.hs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 31a94e8d0..bb6103dfb 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -357,9 +357,8 @@ setUserService :: AgentClient -> UserId -> Bool -> AE () setUserService c = withAgentEnv c .: setUserService' c {-# INLINE setUserService #-} --- | Create SMP agent connection (NEW command) asynchronously, synchronous response is new connection id -- | Create SMP agent connection without queue (to be used with createConnectionAsync). -prepareConnectionToCreate :: ConnectionModeI c => AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AE ConnId +prepareConnectionToCreate :: AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AE ConnId prepareConnectionToCreate c userId enableNtfs cMode pqSup = withAgentEnv c $ newConnNoQueues c userId enableNtfs cMode pqSup {-# INLINE prepareConnectionToCreate #-} @@ -379,9 +378,8 @@ getConnShortLinkAsync c = withAgentEnv c .:: getConnShortLinkAsync' c {-# INLINE getConnShortLinkAsync #-} -- | Enqueue JOIN command for a prepared connection. -joinConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE () -joinConnectionAsync c aCorrId updateConn connId enableNtfs cReqUri cInfo pqSup subMode = - withAgentEnv c $ joinConnAsync c aCorrId updateConn connId enableNtfs cReqUri cInfo pqSup subMode +joinConnectionAsync :: AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE () +joinConnectionAsync c aCorrId updateConn connId enableNtfs = withAgentEnv c .:: joinConnAsync c aCorrId updateConn connId enableNtfs {-# INLINE joinConnectionAsync #-} -- | Allow connection to continue after CONF notification (LET command), no synchronous response @@ -391,8 +389,7 @@ allowConnectionAsync c = withAgentEnv c .:: allowConnectionAsync' c -- | Accept contact after REQ notification (ACPT command) asynchronously, for a prepared connection. acceptContactAsync :: AgentClient -> ACorrId -> ConnId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE () -acceptContactAsync c aCorrId connId enableNtfs invId ownConnInfo pqSupport subMode = - withAgentEnv c $ acceptContactAsync' c aCorrId connId enableNtfs invId ownConnInfo pqSupport subMode +acceptContactAsync c aCorrId connId enableNtfs = withAgentEnv c .:: acceptContactAsync' c aCorrId connId enableNtfs {-# INLINE acceptContactAsync #-} -- | Acknowledge message (ACK command) asynchronously, no synchronous response @@ -857,14 +854,17 @@ newConnNoQueues c userId enableNtfs cMode pqSupport = do let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport} withStore c $ \db -> createNewConn db g cData cMode -joinConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM () -joinConnAsync c corrId updateConn connId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = - lift (compatibleInvitationUri cReqUri) >>= \case - Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do - let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v) - when updateConn $ withStore' c $ \db -> updateNewConnJoin db connId connAgentVersion pqSupport enableNtfs - enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo - Nothing -> throwE $ AGENT A_VERSION +-- TODO [short links] TBC, but probably we will need async join for contact addresses as the contact will be created after user confirming the connection, +-- and join should retry, the same as 1-time invitation joins. +joinConnAsync :: AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM () +joinConnAsync c corrId updateConn connId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = do + when updateConn $ throwE $ CMD PROHIBITED "joinConnAsync: updateConn not allowed for invitation URI" + withInvLock c (strEncode cReqUri) "joinConnAsync" $ + lift (compatibleInvitationUri cReqUri) >>= \case + Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do + let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v) + enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo + Nothing -> throwE $ AGENT A_VERSION joinConnAsync c corrId updateConn connId enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = lift (compatibleContactUri cReqUri) >>= \case Just (_, Compatible connAgentVersion) -> do From 70938604e9e93b2dda8a0f095edd648fa877e68e Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Mon, 1 Jun 2026 20:29:36 +0000 Subject: [PATCH 4/4] composition --- src/Simplex/Messaging/Agent.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index bb6103dfb..2ed784039 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -359,7 +359,7 @@ setUserService c = withAgentEnv c .: setUserService' c -- | Create SMP agent connection without queue (to be used with createConnectionAsync). prepareConnectionToCreate :: AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AE ConnId -prepareConnectionToCreate c userId enableNtfs cMode pqSup = withAgentEnv c $ newConnNoQueues c userId enableNtfs cMode pqSup +prepareConnectionToCreate c userId enableNtfs = withAgentEnv c .: newConnNoQueues c userId enableNtfs {-# INLINE prepareConnectionToCreate #-} -- | Enqueue NEW command for a prepared connection.