Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 27 additions & 36 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ module Simplex.Messaging.Agent
deleteUser,
setUserService,
connRequestPQSupport,
prepareConnectionToCreate,
createConnectionAsync,
setConnShortLinkAsync,
getConnShortLinkAsync,
Expand Down Expand Up @@ -356,9 +357,14 @@ setUserService :: AgentClient -> UserId -> Bool -> AE ()
setUserService c = withAgentEnv c .: setUserService' c
{-# INLINE setUserService #-}

-- | Create SMP agent connection (NEW command) asynchronously, synchronous response is new connection id
createConnectionAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ConnId
createConnectionAsync c userId aCorrId enableNtfs = withAgentEnv c .:. newConnAsync c userId aCorrId enableNtfs
-- | Create SMP agent connection without queue (to be used with createConnectionAsync).
prepareConnectionToCreate :: AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AE ConnId
prepareConnectionToCreate c userId enableNtfs = withAgentEnv c .: newConnNoQueues c userId enableNtfs
{-# INLINE prepareConnectionToCreate #-}

-- | Enqueue NEW command for a prepared connection.
createConnectionAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ()
createConnectionAsync c aCorrId connId enableNtfs = withAgentEnv c .:. newConnAsync c aCorrId connId enableNtfs
{-# INLINE createConnectionAsync #-}

-- | Create or update user's contact connection short link (LSET command) asynchronously, no synchronous response
Expand All @@ -371,20 +377,19 @@ getConnShortLinkAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Con
getConnShortLinkAsync c = withAgentEnv c .:: getConnShortLinkAsync' c
{-# INLINE getConnShortLinkAsync #-}

-- | Join SMP agent connection (JOIN command) asynchronously, synchronous response is new connection id.
-- If connId is provided (for contact URIs), it updates the existing connection record created by getConnShortLinkAsync.
joinConnectionAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
joinConnectionAsync c userId aCorrId connId_ enableNtfs = withAgentEnv c .:: joinConnAsync c userId aCorrId connId_ enableNtfs
-- | Enqueue JOIN command for a prepared connection.
joinConnectionAsync :: AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ()
joinConnectionAsync c aCorrId updateConn connId enableNtfs = withAgentEnv c .:: joinConnAsync c aCorrId updateConn connId enableNtfs
{-# INLINE joinConnectionAsync #-}

-- | Allow connection to continue after CONF notification (LET command), no synchronous response
allowConnectionAsync :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> ConnInfo -> AE ()
allowConnectionAsync c = withAgentEnv c .:: allowConnectionAsync' c
{-# INLINE allowConnectionAsync #-}

-- | Accept contact after REQ notification (ACPT command) asynchronously, synchronous response is new connection id
acceptContactAsync :: AgentClient -> UserId -> ACorrId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
acceptContactAsync c userId aCorrId enableNtfs = withAgentEnv c .:: acceptContactAsync' c userId aCorrId enableNtfs
-- | Accept contact after REQ notification (ACPT command) asynchronously, for a prepared connection.
acceptContactAsync :: AgentClient -> ACorrId -> ConnId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ()
acceptContactAsync c aCorrId connId enableNtfs = withAgentEnv c .:: acceptContactAsync' c aCorrId connId enableNtfs
{-# INLINE acceptContactAsync #-}

-- | Acknowledge message (ACK command) asynchronously, no synchronous response
Expand Down Expand Up @@ -837,11 +842,10 @@ setUserService' c userId enable = do
unless ok $ throwE $ CMD PROHIBITED "setUserService"
when (changed && not enable) $ withStore' c (`deleteClientServices` userId)

newConnAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ConnId
newConnAsync c userId corrId enableNtfs cMode pqInitKeys subMode = do
connId <- newConnNoQueues c userId enableNtfs cMode (CR.connPQEncryption pqInitKeys)
newConnAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ()
newConnAsync c corrId connId enableNtfs cMode pqInitKeys subMode =
enqueueCommand c corrId connId Nothing $ AClientCommand $ NEW enableNtfs (ACM cMode) pqInitKeys subMode
pure connId
{-# INLINE newConnAsync #-}

newConnNoQueues :: AgentClient -> UserId -> Bool -> SConnectionMode c -> PQSupport -> AM ConnId
newConnNoQueues c userId enableNtfs cMode pqSupport = do
Expand All @@ -852,34 +856,21 @@ newConnNoQueues c userId enableNtfs cMode pqSupport = do

-- TODO [short links] TBC, but probably we will need async join for contact addresses as the contact will be created after user confirming the connection,
-- and join should retry, the same as 1-time invitation joins.
joinConnAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId
joinConnAsync c userId corrId connId_ enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = do
when (isJust connId_) $ throwE $ CMD PROHIBITED "joinConnAsync: connId not allowed for invitation URI"
withInvLock c (strEncode cReqUri) "joinConnAsync" $ do
joinConnAsync :: AgentClient -> ACorrId -> Bool -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ()
joinConnAsync c corrId updateConn connId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = do
when updateConn $ throwE $ CMD PROHIBITED "joinConnAsync: updateConn not allowed for invitation URI"
withInvLock c (strEncode cReqUri) "joinConnAsync" $
lift (compatibleInvitationUri cReqUri) >>= \case
Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do
g <- asks random
let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion (Just v)
cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
connId <- withStore c $ \db -> createNewConn db g cData SCMInvitation
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo
pure connId
Nothing -> throwE $ AGENT A_VERSION
joinConnAsync c userId corrId connId_ enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = do
joinConnAsync c corrId updateConn connId enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode =
lift (compatibleContactUri cReqUri) >>= \case
Just (_, Compatible connAgentVersion) -> do
let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion Nothing
connId <- case connId_ of
Just cId -> do
-- update connection record created by getConnShortLinkAsync
withStore' c $ \db -> updateNewConnJoin db cId connAgentVersion pqSupport enableNtfs
pure cId
Nothing -> do
g <- asks random
let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
withStore c $ \db -> createNewConn db g cData SCMInvitation
when updateConn $ withStore' c $ \db -> updateNewConnJoin db connId connAgentVersion pqSupport enableNtfs
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo
pure connId
Nothing -> throwE $ AGENT A_VERSION

allowConnectionAsync' :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> ConnInfo -> AM ()
Expand All @@ -895,11 +886,11 @@ allowConnectionAsync' c corrId connId confId ownConnInfo =
-- and also it can't be triggered by user concurrently several times in a row. It could be improved similarly to
-- `acceptContact` by creating a new map for invitation locks and taking lock here, and removing `unacceptInvitation`
-- while marking invitation as accepted inside "lock level transaction" after successful `joinConnAsync`.
acceptContactAsync' :: AgentClient -> UserId -> ACorrId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId
acceptContactAsync' c userId corrId enableNtfs invId ownConnInfo pqSupport subMode = do
acceptContactAsync' :: AgentClient -> ACorrId -> ConnId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ()
acceptContactAsync' c corrId connId enableNtfs invId ownConnInfo pqSupport subMode = do
Invitation {connReq} <- withStore c $ \db -> getInvitation db "acceptContactAsync'" invId
withStore' c $ \db -> acceptInvitation db invId ownConnInfo
joinConnAsync c userId corrId Nothing enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do
joinConnAsync c corrId False connId enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do
withStore' c (`unacceptInvitation` invId)
throwE err

Expand Down
33 changes: 22 additions & 11 deletions tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,8 @@ testInvitationShortLinkAsync viaProxy a b = do
connReq' `shouldBe` connReq
linkUserData connData' `shouldBe` userData
runRight $ do
aId <- A.joinConnectionAsync b 1 "123" Nothing True connReq "bob's connInfo" PQSupportOn SMSubscribe
aId <- A.prepareConnectionToJoin b 1 True connReq PQSupportOn
A.joinConnectionAsync b "123" False aId True connReq "bob's connInfo" PQSupportOn SMSubscribe
get b =##> \case ("123", c, JOINED sndSecure) -> c == aId && sndSecure; _ -> False
("", _, CONF confId _ "bob's connInfo") <- get a
allowConnection a bId confId "alice's connInfo"
Expand Down Expand Up @@ -2685,10 +2686,12 @@ receiveMsg c cId msgId msg = do
testAsyncCommands :: SndQueueSecured -> AgentClient -> AgentClient -> AgentMsgId -> IO ()
testAsyncCommands sqSecured alice bob baseId =
runRight_ $ do
bobId <- createConnectionAsync alice 1 "1" True SCMInvitation IKPQOn SMSubscribe
bobId <- prepareConnectionToCreate alice 1 True SCMInvitation PQSupportOn
createConnectionAsync alice "1" bobId True SCMInvitation IKPQOn SMSubscribe
("1", bobId', INV (ACR _ qInfo)) <- get alice
liftIO $ bobId' `shouldBe` bobId
aliceId <- joinConnectionAsync bob 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe
aliceId <- prepareConnectionToJoin bob 1 True qInfo PQSupportOn
joinConnectionAsync bob "2" False aliceId True qInfo "bob's connInfo" PQSupportOn SMSubscribe
("2", aliceId', JOINED sqSecured') <- get bob
liftIO $ do
aliceId' `shouldBe` aliceId
Expand Down Expand Up @@ -2779,8 +2782,8 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob ->
liftIO $ qInfo' `shouldBe` qInfo
liftIO $ userCtData' `shouldBe` userCtData
-- join connection async using connId from getConnShortLinkAsync
aliceId <- joinConnectionAsync bob 1 "2" (Just newId) True qInfo' "bob's connInfo" PQSupportOn SMSubscribe
liftIO $ aliceId `shouldBe` newId
joinConnectionAsync bob "2" True newId True qInfo' "bob's connInfo" PQSupportOn SMSubscribe
let aliceId = newId
("2", aliceId', JOINED False) <- get bob
liftIO $ aliceId' `shouldBe` aliceId
-- complete connection
Expand All @@ -2796,7 +2799,10 @@ testGetConnShortLinkAsync ps = withAgentClients2 $ \alice bob ->
testAsyncCommandsRestore :: (ASrvTransport, AStoreType) -> IO ()
testAsyncCommandsRestore ps = do
alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
bobId <- runRight $ createConnectionAsync alice 1 "1" True SCMInvitation IKPQOn SMSubscribe
bobId <- runRight $ do
connId <- prepareConnectionToCreate alice 1 True SCMInvitation PQSupportOn
createConnectionAsync alice "1" connId True SCMInvitation IKPQOn SMSubscribe
pure connId
liftIO $ noMessages alice "alice doesn't receive INV because server is down"
disposeAgentClient alice
withAgent 2 agentCfg initAgentServers testDB $ \alice' ->
Expand All @@ -2812,7 +2818,8 @@ testAcceptContactAsync sqSecured alice bob baseId =
(aliceId, sqSecuredJoin) <- joinConnection bob 1 True qInfo "bob's connInfo" SMSubscribe
liftIO $ sqSecuredJoin `shouldBe` False -- joining via contact address connection
("", _, REQ invId _ "bob's connInfo") <- get alice
bobId <- acceptContactAsync alice 1 "1" True invId "alice's connInfo" PQSupportOn SMSubscribe
bobId <- prepareConnectionToAccept alice 1 True invId PQSupportOn
acceptContactAsync alice "1" bobId True invId "alice's connInfo" PQSupportOn SMSubscribe
get alice =##> \case ("1", c, JOINED sqSecured') -> c == bobId && sqSecured' == sqSecured; _ -> False
("", _, CONF confId _ "alice's connInfo") <- get bob
allowConnection bob aliceId confId "bob's connInfo"
Expand Down Expand Up @@ -3083,10 +3090,12 @@ testJoinConnectionAsyncReplyErrorV8 ps@(t, ASType qsType _) = do
withAgent 1 cfg' initAgentServers testDB $ \a ->
withAgent 2 cfg' initAgentServersSrv2 testDB2 $ \b -> do
(aId, bId) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
bId <- createConnectionAsync a 1 "1" True SCMInvitation IKPQOn SMSubscribe
bId <- prepareConnectionToCreate a 1 True SCMInvitation PQSupportOn
createConnectionAsync a "1" bId True SCMInvitation IKPQOn SMSubscribe
("1", bId', INV (ACR _ qInfo)) <- get a
liftIO $ bId' `shouldBe` bId
aId <- joinConnectionAsync b 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe
aId <- prepareConnectionToJoin b 1 True qInfo PQSupportOn
joinConnectionAsync b "2" False aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe
liftIO $ threadDelay 500000
ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId
pure (aId, bId)
Expand Down Expand Up @@ -3128,10 +3137,12 @@ testJoinConnectionAsyncReplyError ps@(t, ASType qsType _) = do
withAgent 1 agentCfg initAgentServers testDB $ \a ->
withAgent 2 agentCfg initAgentServersSrv2 testDB2 $ \b -> do
(aId, bId) <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
bId <- createConnectionAsync a 1 "1" True SCMInvitation IKPQOn SMSubscribe
bId <- prepareConnectionToCreate a 1 True SCMInvitation PQSupportOn
createConnectionAsync a "1" bId True SCMInvitation IKPQOn SMSubscribe
("1", bId', INV (ACR _ qInfo)) <- get a
liftIO $ bId' `shouldBe` bId
aId <- joinConnectionAsync b 1 "2" Nothing True qInfo "bob's connInfo" PQSupportOn SMSubscribe
aId <- prepareConnectionToJoin b 1 True qInfo PQSupportOn
joinConnectionAsync b "2" False aId True qInfo "bob's connInfo" PQSupportOn SMSubscribe
liftIO $ threadDelay 500000
ConnectionStats {rcvQueuesInfo = [], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId
pure (aId, bId)
Expand Down
Loading