From 04685840fc9c94635db16f1482d5248e3042e414 Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Sat, 23 May 2026 10:07:23 +0000 Subject: [PATCH 1/3] agent: process SMP messages concurrently between different connections --- .../2026-05-18-parallel-message-processing.md | 161 ++++++++++++++++ src/Simplex/Messaging/Agent.hs | 23 ++- src/Simplex/Messaging/Agent/Client.hs | 17 +- src/Simplex/Messaging/Client.hs | 31 ++- src/Simplex/Messaging/Client/Agent.hs | 13 +- src/Simplex/Messaging/Notifications/Server.hs | 177 +++++++++--------- .../Messaging/Notifications/Server/Env.hs | 24 ++- src/Simplex/Messaging/Server/Env/STM.hs | 2 +- 8 files changed, 309 insertions(+), 139 deletions(-) create mode 100644 plans/2026-05-18-parallel-message-processing.md diff --git a/plans/2026-05-18-parallel-message-processing.md b/plans/2026-05-18-parallel-message-processing.md new file mode 100644 index 0000000000..4c9a34ecba --- /dev/null +++ b/plans/2026-05-18-parallel-message-processing.md @@ -0,0 +1,161 @@ +# Parallel Message Processing - Eliminate Single-Thread Bottlenecks + +## Problem + +Message reception flows through two single-thread bottlenecks: + +1. **Agent `msgQ` bottleneck**: Multiple SMP server connections write to one shared `TBQueue` (`AgentClient.msgQ` / `SMPClientAgent.msgQ`). A single `subscriber` thread reads and processes all messages sequentially - DB lookups, double-ratchet decryption, DB writes - regardless of which connection they came from. + +2. **Chat `subQ` bottleneck**: The agent's `subscriber` thread writes processed events to one shared `TBQueue` (`AgentClient.subQ`). A single `agentSubscriber` thread in simplex-chat reads and processes all events sequentially. + +Both bottlenecks serialize work that could run in parallel, since messages from different connections are independent. + +## Solution + +Replace queues with callbacks at both layers. The producer calls a processing function directly in its own thread. + +### Layer 1: SMP client - eliminate `msgQ` + +**Current flow:** +``` +SMP connection thread -> writeTBQueue msgQ -> subscriber thread -> processSMPTransmissions +``` + +**New flow:** +``` +SMP connection thread -> processMsg callback (with per-client MVar lock) +``` + +**Why the MVar lock:** Within one SMP client, two threads produce messages: +- The receive loop (`processMsgs` in `Client.hs:686`) +- `writeSMPMessage` (`Client.hs:874`) - called from `processSUBResponse_` when a SUB response includes an inline MSG + +These two must be serialized within one client. An MVar lock ensures they take turns calling the callback. Across different clients (different server connections), no lock is shared - natural parallelism. + +#### Changes + +**`src/Simplex/Messaging/Client.hs`:** +- In `PClient`: replace `msgQ :: Maybe (TBQueue ...)` with `processServerMsg :: Maybe (ServerTransmissionBatch v err msg -> IO ())` and `processLock :: MVar ()` +- `processMsgs`: acquire `processLock`, call `processServerMsg` with the batch +- `writeSMPMessage`: acquire `processLock`, call `processServerMsg` +- `getProtocolClient`: takes `Maybe (ServerTransmissionBatch v err msg -> IO ())` instead of `Maybe (TBQueue ...)` +- `smpClientStub`: sets `processServerMsg = Nothing` +- `serverTransmission`: unchanged + +**`src/Simplex/Messaging/Agent/Client.hs`:** +- Remove `msgQ` field from `AgentClient` +- `smpConnectClient`: pass `processSMPTransmissions` wrapper as callback instead of `Just msgQ` +- Remove `AgentQueuesInfo` and `getAgentQueuesInfo` entirely (dead with no queues to monitor) +- Add `inflightCallbacks :: TVar Int` for monitoring instead - increment before callback, decrement in bracket + +**`src/Simplex/Messaging/Agent.hs`:** +- Remove `subscriber` function +- Remove `subscriber` from `runAgentThreads` +- `processSMPTransmissions` stays, called directly from SMP client threads +- `agentOperationBracket c AORcvNetwork` moves into the callback wrapper +- Exception handling: wrap callback with `catchOwn` matching current `subscriber`'s error handling + +**`src/Simplex/Messaging/Client/Agent.hs`:** +- `SMPClientAgent`: replace `msgQ` with callback field `processServerMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()` +- `newSMPClientAgent`: takes callback parameter instead of creating `msgQ` +- `connectClient`: passes callback to `getProtocolClient` + +**`src/Simplex/Messaging/Notifications/Server.hs`:** +- `ntfSubscriber`: remove `receiveSMP` loop; the processing logic becomes the callback passed via `SMPClientAgent` +- Processing stays in M (via `UnliftIO` or pre-bound env) + +**Tests (`tests/SMPProxyTests.hs`):** +- 2 sites: change `getProtocolClient ... (Just msgQ) ...` to pass a callback that writes to a local test TBQueue + +### Layer 2: Agent to chat - eliminate `subQ` + +**Current flow:** +``` +agent processSMPTransmissions -> writeTBQueue subQ -> chat agentSubscriber -> process +``` + +**New flow:** +``` +agent processSMPTransmissions -> processEvent callback [events] +``` + +**Key design decisions:** +- Callback takes `[ATransmission]` (list), not single event. All events from one connection batch are passed together to maintain ordering within a connection. +- Error notifications (currently `nonBlockingWriteTBQueue`) use `forkIO $ callback [event]` - fire-and-forget, order doesn't matter for errors. +- The `isFullTBQueue subQ` / pending mechanism disappears - the callback receives the full list directly, no need to buffer/flush. +- `AgentClient` keeps `testQ :: Maybe (TBQueue ATransmission)` for tests only. + +#### Changes + +**`src/Simplex/Messaging/Agent/Client.hs`:** +- Replace `subQ :: TBQueue ATransmission` with: + - `processEvent :: [ATransmission] -> IO ()` - callback, accepts event list + - `testQ :: Maybe (TBQueue ATransmission)` - test-only, `Nothing` in production +- Remove `AgentQueuesInfo` / `getAgentQueuesInfo` +- Add `inflightCallbacks :: TVar Int` with bracket: `withInflight c $ processEvent c events` + +**`src/Simplex/Messaging/Agent.hs`:** +- `processSMPTransmissions`: accumulate events in a local list (currently uses `pendingMsgs` TVar + flush pattern). Call `processEvent` once at end with the full list. +- `runCommandProcessing`: same - call `processEvent` once with all events for the command batch. Remove `isFullTBQueue`/pending logic. +- All `notify`/`notify'` helpers within `processSMPTransmissions` write to a local `TVar [ATransmission]` instead of directly to `subQ`. Flushed at end as single `processEvent` call. +- Error sites (currently `nonBlockingWriteTBQueue`): use `forkIO $ processEvent c [event]` +- Other direct `writeTBQueue subQ` sites (CONNECT/DISCONNECT events, SUSPENDED, etc.): call `processEvent c [event]` directly. +- Remove `subscriber` function entirely. +- Exception safety: `processEvent` call wrapped in bracket that catches "own" exceptions and logs them. + +**`src/Simplex/Messaging/Agent/Client.hs`:** +- `notifySub'` (line 838): change to `forkIO $ processEvent c [event]` (non-blocking error notification) + +**`src/Simplex/Messaging/Agent/NtfSubSupervisor.hs`:** +- 1 site: change `nonBlockingWriteTBQueue subQ event` to `forkIO $ processEvent c [event]` + +**`src/Simplex/FileTransfer/Agent.hs`:** +- 1 site (line 351): `notify` helper changes to `processEvent c [event]` + +**`simplex-chat/src/Simplex/Chat/Library/Commands.hs`:** +- Remove `agentSubscriber` thread +- Pass chat's `process` function (adapted to accept `[ATransmission]`) as `processEvent` callback at agent initialization + +**Tests:** +- `pGet` changes from `readTBQueue (subQ c)` to `readTBQueue (fromJust $ testQ c)` - 1 line +- Agent test setup: `processEvent = mapM_ (atomically . writeTBQueue q)` where `q` is `testQ` +- ~348 test call sites unchanged + +## Concurrency Safety + +- **Per-SMP-connection:** MVar in each SMP client serializes `processMsgs` and `writeSMPMessage` +- **Cross-connection:** Different SMP clients have different MVars, run in different threads - fully parallel +- **Per-connection-id:** `withConnLock connId` in `processSMPTransmissions` handles per-connection locking +- **Chat callback:** Must be safe for concurrent calls from different agent threads. Chat dispatches by entity type and connection ID; individual handlers use their own locks. +- **Exception safety:** Callback wrapped with bracket pattern - catches own exceptions, logs, decrements inflight counter. Exceptions don't kill SMP client threads. + +## Implementation Order + +Both layers change in one PR since they share `Client.hs`. + +### Phase 1: SMP client callback (`Client.hs` + both agent types) + +- [ ] 1.1 `Client.hs`: Replace `msgQ` with `processServerMsg` callback + `processLock` MVar in `PClient` +- [ ] 1.2 `Client.hs`: Update `processMsgs`, `writeSMPMessage`, `getProtocolClient`, `smpClientStub` +- [ ] 1.3 `Client/Agent.hs`: Replace `msgQ` in `SMPClientAgent` with callback field, update `newSMPClientAgent`, `connectClient` +- [ ] 1.4 `Agent/Client.hs`: Remove `msgQ` from `AgentClient`, update `smpConnectClient` to pass `processSMPTransmissions` as callback +- [ ] 1.5 `Agent.hs`: Remove `subscriber` thread from `runAgentThreads`, add exception wrapper to callback +- [ ] 1.6 `Notifications/Server.hs`: Convert `receiveSMP` from loop to callback passed to `SMPClientAgent` +- [ ] 1.7 `SMPProxyTests.hs`: Update 2 call sites to use callback + local test queue + +### Phase 2: Agent event callback (`subQ` -> `processEvent`) + +- [ ] 2.1 `Agent/Client.hs`: Add `processEvent :: [ATransmission] -> IO ()` and `testQ :: Maybe (TBQueue ATransmission)`, remove `subQ`, remove `AgentQueuesInfo` +- [ ] 2.2 `Agent.hs`: Rewrite `processSMPTransmissions` to accumulate events in local list and call `processEvent` once at end +- [ ] 2.3 `Agent.hs`: Update `runCommandProcessing` - remove pending/isFullTBQueue pattern, call `processEvent` with list +- [ ] 2.4 `Agent.hs`, `Agent/Client.hs`, `NtfSubSupervisor.hs`, `FileTransfer/Agent.hs`: Update all `writeTBQueue subQ` / `nonBlockingWriteTBQueue subQ` sites (~32 total) +- [ ] 2.5 `Agent/Client.hs`: Add inflight counter with bracket +- [ ] 2.6 Update `pGet` to read from `testQ` (1 line), update test agent setup +- [ ] 2.7 `simplex-chat`: Pass chat's `process` as callback, remove `agentSubscriber` +- [ ] 2.8 Fix any multi-server test ordering issues + +## Risks + +- **Chat thread safety:** Chat's `process` may not be safe for concurrent calls. Audit needed. +- **Backpressure:** Slow callback blocks SMP client receive thread. Acceptable - the connection that produced the message waits. Cross-connection interference eliminated. +- **Ordering:** Within one SMP connection - preserved (MVar + list callback). Across connections - non-deterministic (same as today, since `msgQ` interleaving was arbitrary). Most tests use 1 server. diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index bd77b892a1..8801a68f03 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -145,6 +145,7 @@ where import Control.Applicative ((<|>)) import Control.Concurrent.STM (retry) +import Data.IORef import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -270,19 +271,25 @@ getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp, netC liftIO $ checkServers "SMP" smp >> checkServers "XFTP" xftp currentTs <- liftIO getCurrentTime notices <- liftIO $ withTransaction store (`getClientNotices` presetServers) `catchAll_` pure [] - c@AgentClient {acThread} <- liftIO . newAgentClient clientId initServers currentTs notices =<< ask + env <- ask + cRef <- liftIO $ newIORef (error "agent client not initialized") + let processMsg t = do + c <- readIORef cRef + agentOperationBracket c AORcvNetwork waitUntilActive (processSMPTransmissions c t) `runReaderT` env + `catchOwn` \e -> atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ CRITICAL True $ "subscriber error: " <> show e) + c@AgentClient {acThread} <- liftIO $ newAgentClient clientId initServers currentTs notices processMsg env + liftIO $ writeIORef cRef c t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c) atomically . writeTVar acThread . Just =<< mkWeakThreadId t pure c checkServers protocol srvs = forM_ (M.assocs srvs) $ \(userId, srvs') -> checkUserServers ("getSMPAgentClient " <> protocol <> " " <> tshow userId) srvs' runAgentThreads c - | backgroundMode = run c "subscriber" $ subscriber c + | backgroundMode = forever $ liftIO $ threadDelay maxBound | otherwise = do restoreServersStats c raceAny_ - [ run c "subscriber" $ subscriber c, - run c "runNtfSupervisor" $ runNtfSupervisor c, + [ run c "runNtfSupervisor" $ runNtfSupervisor c, run c "cleanupManager" $ cleanupManager c, run c "logServersStats" $ logServersStats c ] @@ -2982,14 +2989,6 @@ getNextSMPServer :: AgentClient -> UserId -> [SMPServer] -> AM SMPServerWithAuth getNextSMPServer c userId = getNextServer c userId storageSrvs {-# INLINE getNextSMPServer #-} -subscriber :: AgentClient -> AM' () -subscriber c@AgentClient {msgQ, subQ} = run $ forever $ do - t <- atomically $ readTBQueue msgQ - agentOperationBracket c AORcvNetwork waitUntilActive $ - processSMPTransmissions c t - where - run a = a `catchOwn` \e -> notify $ CRITICAL True $ "Agent subscriber stopped: " <> show e - notify err = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR err) cleanupManager :: AgentClient -> AM' () cleanupManager c@AgentClient {subQ} = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index d33794006b..c2ef42c290 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -338,7 +338,7 @@ data AgentClient = AgentClient { acThread :: TVar (Maybe (Weak ThreadId)), active :: TVar Bool, subQ :: TBQueue ATransmission, - msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), + processServerMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), smpServers :: TMap UserId (UserServers 'PSMP), smpClients :: TMap SMPTransportSession SMPClientVar, useClientServices :: TMap UserId Bool, @@ -505,15 +505,14 @@ data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther deriving (Eq, Show) -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. -newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> Env -> IO AgentClient -newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices agentEnv = do +newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Env -> IO AgentClient +newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices processServerMsg agentEnv = do let cfg = config agentEnv qSize = tbqSize cfg proxySessTs <- newTVarIO =<< getCurrentTime acThread <- newTVarIO Nothing active <- newTVarIO True subQ <- newTBQueueIO qSize - msgQ <- newTBQueueIO qSize smpServers <- newTVarIO $ M.map mkUserServers smp smpClients <- TM.emptyIO useClientServices <- newTVarIO useServices @@ -553,7 +552,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices { acThread, active, subQ, - msgQ, + processServerMsg, smpServers, smpClients, useClientServices, @@ -733,7 +732,7 @@ getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT smpConnectClient :: AgentClient -> NetworkRequestMode -> SMPTransportSession -> TMap SMPServer ProxiedRelayVar -> SMPClientVar -> AM SMPConnectedClient -smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v = +smpConnectClient c@AgentClient {smpClients, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v = newProtocolClient c tSess smpClients connectClient v `catchAllErrors` \e -> lift (resubscribeSMPSession c tSess) >> throwE e where @@ -746,7 +745,7 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm env <- ask smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do ts <- readTVarIO proxySessTs - ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs + ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just $ processServerMsg c) ts $ smpClientDisconnected c tSess env v' prs atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c updateClientService service smp pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs} @@ -2835,8 +2834,8 @@ data ClientInfo deriving (Show) getAgentQueuesInfo :: AgentClient -> IO AgentQueuesInfo -getAgentQueuesInfo AgentClient {msgQ, subQ, smpClients} = do - msgQInfo <- atomically $ getTBQueueInfo msgQ +getAgentQueuesInfo AgentClient {subQ, smpClients} = do + let msgQInfo = TBQueueInfo {qLength = 0, qFull = False} subQInfo <- atomically $ getTBQueueInfo subQ smpClientsMap <- readTVarIO smpClients let smpClientsMap' = M.mapKeys (decodeLatin1 . strEncode) smpClientsMap diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 67b31de186..79ef392c81 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -128,6 +128,7 @@ where import Control.Applicative ((<|>)) import Control.Concurrent (ThreadId, forkFinally, forkIO, killThread, mkWeakThreadId) +import Control.Concurrent.MVar import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (Exception, Handler (..), IOException, SomeAsyncException, SomeException) @@ -199,7 +200,8 @@ data PClient v err msg = PClient sentCommands :: TMap CorrId (Request err msg), sndQ :: TBQueue (Maybe (Request err msg), ByteString), rcvQ :: TBQueue (NonEmpty (Transmission (Either err msg))), - msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg)) + processServerMsg :: Maybe (ServerTransmissionBatch v err msg -> IO ()), + processLock :: MVar () } smpClientStub :: TVar ChaChaDRG -> ByteString -> VersionSMP -> Maybe (THandleAuth 'TClient) -> IO SMPClient @@ -213,6 +215,7 @@ smpClientStub g sessionId thVersion thAuth = do timeoutErrorCount <- newTVarIO 0 sndQ <- newTBQueueIO 100 rcvQ <- newTBQueueIO 100 + processLock <- newMVar () let NetworkConfig {tcpConnectTimeout, tcpTimeout} = defaultNetworkConfig return ProtocolClient @@ -244,7 +247,8 @@ smpClientStub g sessionId thVersion thAuth = do sentCommands, sndQ, rcvQ, - msgQ = Nothing + processServerMsg = Nothing, + processLock } } @@ -562,10 +566,10 @@ type SMPTransportSession = TransportSession BrokerMsg -- | Connects to 'ProtocolServer' using passed client configuration -- and queue for messages and notifications. -- --- A single queue can be used for multiple 'SMPClient' instances, +-- A single callback can be used for multiple 'SMPClient' instances, -- as 'SMPServerTransmission' includes server information. -getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> NetworkRequestMode -> TransportSession msg -> ProtocolClientConfig v -> [HostName] -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) -getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serviceCredentials, serverVRange, agreeSecret, proxyServer, useSNI} presetDomains msgQ proxySessTs disconnected = do +getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> NetworkRequestMode -> TransportSession msg -> ProtocolClientConfig v -> [HostName] -> Maybe (ServerTransmissionBatch v err msg -> IO ()) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) +getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serviceCredentials, serverVRange, agreeSecret, proxyServer, useSNI} presetDomains processServerMsg proxySessTs disconnected = do case chooseTransportHost networkConfig (host srv) of Right useHost -> (getCurrentTime >>= mkProtocolClient useHost >>= runClient useTransport useHost) @@ -583,6 +587,7 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS sentCommands <- TM.emptyIO sndQ <- newTBQueueIO qSize rcvQ <- newTBQueueIO qSize + processLock <- newMVar () return PClient { connected, @@ -597,7 +602,8 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS sentCommands, sndQ, rcvQ, - msgQ + processServerMsg, + processLock } runClient :: (ServiceName, ATransport 'TClient) -> TransportHost -> PClient v err msg -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) @@ -686,8 +692,10 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS processMsgs :: ProtocolClient v err msg -> NonEmpty (Transmission (Either err msg)) -> IO () processMsgs c ts = do ts' <- catMaybes <$> mapM (processMsg c) (L.toList ts) - forM_ msgQ $ \q -> - mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts') + forM_ processServerMsg $ \process -> + forM_ (L.nonEmpty ts') $ \ts'' -> + withMVar (processLock $ client_ c) $ \_ -> + process $ serverTransmission c ts'' processMsg :: ProtocolClient v err msg -> Transmission (Either err msg) -> IO (Maybe (EntityId, ServerTransmission err msg)) processMsg ProtocolClient {client_ = PClient {sentCommands}} (corrId, entId, respOrErr) @@ -714,7 +722,7 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS Just e -> Left $ PCEProtocolError e _ -> Right r sendMsg :: ServerTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg)) - sendMsg t = case msgQ of + sendMsg t = case processServerMsg of Just _ -> pure $ Just (entId, t) Nothing -> Nothing <$ case clientResp of @@ -872,7 +880,10 @@ processSUBResponse_ c rId = \case r' -> pure . Left $ unexpectedResponse r' writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO () -writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c [(rId, STEvent (Right msg))]) (msgQ $ client_ c) +writeSMPMessage c rId msg = + forM_ (processServerMsg $ client_ c) $ \process -> + withMVar (processLock $ client_ c) $ \_ -> + process $ serverTransmission c [(rId, STEvent (Right msg))] serverTransmission :: ProtocolClient v err msg -> NonEmpty (RecipientId, ServerTransmission err msg) -> ServerTransmissionBatch v err msg serverTransmission ProtocolClient {thParams, client_ = PClient {transportSession}} ts = (transportSession, thParams, ts) diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 76b2a7cf93..a1699009d7 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -138,7 +138,7 @@ data SMPClientAgent p = SMPClientAgent dbService :: Maybe DBService, active :: TVar Bool, startedAt :: UTCTime, - msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), + processMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), agentQ :: TBQueue SMPClientAgentEvent, randomDrg :: TVar ChaChaDRG, smpClients :: TMap SMPServer SMPClientVar, @@ -158,11 +158,10 @@ data SMPClientAgent p = SMPClientAgent type OwnServer = Bool -newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p) -newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} dbService randomDrg = do +newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p) +newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {agentQSize} processMsg dbService randomDrg = do active <- newTVarIO True startedAt <- getCurrentTime - msgQ <- newTBQueueIO msgQSize agentQ <- newTBQueueIO agentQSize smpClients <- TM.emptyIO smpSessions <- TM.emptyIO @@ -179,7 +178,7 @@ newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize dbService, active, startedAt, - msgQ, + processMsg, agentQ, randomDrg, smpClients, @@ -257,7 +256,7 @@ isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} = -- | Run an SMP client for SMPClientVar connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient) -connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, msgQ, randomDrg, startedAt} srv v = case dbService of +connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, processMsg, randomDrg, startedAt} srv v = case dbService of Just dbs -> runExceptT $ do creds <- ExceptT $ getCredentials dbs srv smp <- ExceptT $ getClient cfg {serviceCredentials = Just creds} @@ -267,7 +266,7 @@ connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, m Nothing -> getClient cfg where cfg = smpCfg agentCfg - getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just msgQ) startedAt clientDisconnected + getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just processMsg) startedAt clientDisconnected clientDisconnected :: SMPClient -> IO () clientDisconnected smp = do diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 02429e9108..11dcd860a9 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -54,7 +54,7 @@ import GHC.IORef (atomicSwapIORef) import GHC.Stats (getRTSStats) import Network.Socket (ServiceName, Socket, socketToHandle) import Numeric.Natural (Natural) -import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..)) +import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -68,7 +68,7 @@ import Simplex.Messaging.Notifications.Server.Store (NtfSTMStore, TokenNtfMessag import Simplex.Messaging.Notifications.Server.Store.Postgres import Simplex.Messaging.Notifications.Server.Store.Types import Simplex.Messaging.Notifications.Transport -import Simplex.Messaging.Protocol (EntityId (..), ErrorType (..), NotifierId, Party (..), ProtocolServer (host), SMPServer, ServiceSub (..), SignedTransmission, Transmission, pattern NoEntity, pattern SMPServer, encodeTransmission, tGetServer, tPut) +import Simplex.Messaging.Protocol (BrokerMsg, EntityId (..), ErrorType (..), NotifierId, Party (..), ProtocolServer (host), SMPServer, ServiceSub (..), SignedTransmission, Transmission, pattern NoEntity, pattern SMPServer, encodeTransmission, tGetServer, tPut) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server import Simplex.Messaging.Server.Control (CPClientRole (..)) @@ -77,7 +77,7 @@ import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), import Simplex.Messaging.Session import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) -import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams) +import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), SMPVersion, THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams) import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.Server (AddHTTP, runTransportServer, runLocalTCPServer) import Simplex.Messaging.Util @@ -101,7 +101,11 @@ runNtfServer cfg = do runNtfServerBlocking started cfg runNtfServerBlocking :: TMVar Bool -> NtfServerConfig -> IO () -runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg +runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg processMsg + where + processMsg envRef t = do + env <- readIORef envRef + receiveSMPMessage env t type M a = ReaderT NtfEnv IO a @@ -525,97 +529,91 @@ subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st sm void $ updateSubStatus st srvId' nId NSPending subscribeQueuesNtfs ca smpServer' [sub] +receiveSMPMessage :: NtfEnv -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO () +receiveSMPMessage env ((_, srv@(SMPServer (h :| _) _ _), _), THandleParams {sessionId}, ts) = + (`runReaderT` env) $ do + st <- asks store + ps <- asks pushServer + stats <- asks serverStats + let ca = smpAgent $ subscriber env + forM_ ts $ \(ntfId, t) -> case t of + STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen + STResponse {} -> pure () -- it was already reported as timeout error + STEvent msgOrErr -> do + let smpQueue = SMPQueueNtf srv ntfId + case msgOrErr of + Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do + ntfTs <- liftIO getSystemTime + liftIO $ updatePeriodStats (activeSubs stats) ntfId + let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} + srvHost = safeDecodeUtf8 $ strEncode h + isOwn = isOwnServer ca srv + liftIO (addTokenLastNtf st newNtf) >>= \case + Right (tkn, lastNtfs) -> do + pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs + liftIO $ incNtfStat_ stats ntfReceived + when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats) + Left AUTH -> liftIO $ do + incNtfStat_ stats ntfReceivedAuth + when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) + Left _ -> pure () + Right SMP.END -> + whenM (atomically $ activeClientSession' ca sessionId srv) $ + void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd + Right SMP.DELD -> + void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted + Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e + Right _ -> logError "SMP server unexpected response" + Left e -> logError $ "SMP client error: " <> tshow e + ntfSubscriber :: NtfSubscriber -> M () -ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = - race_ receiveSMP receiveAgent +ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {agentQ}} = do + st <- asks store + batchSize <- asks $ subsBatchSize . config + liftIO $ forever $ + atomically (readTBQueue agentQ) >>= \case + CAConnected srv serviceId -> do + let asService = if isJust serviceId then "as service " else "" + logInfo $ "SMP server reconnected " <> asService <> showServer' srv + CADisconnected srv nIds -> do + updated <- batchUpdateSrvSubStatus st srv Nothing nIds NSInactive + logSubStatus srv "disconnected" (L.length nIds) updated + CASubscribed srv serviceId nIds -> do + updated <- batchUpdateSrvSubStatus st srv serviceId nIds NSActive + let asService = if isJust serviceId then " as service" else "" + logSubStatus srv ("subscribed" <> asService) (L.length nIds) updated + CASubError srv errs -> do + forM_ (L.nonEmpty $ mapMaybe (\(nId, err) -> (nId,) <$> queueSubErrorStatus err) $ L.toList errs) $ \subStatuses -> do + updated <- batchUpdateSrvSubErrors st srv subStatuses + logSubErrors srv subStatuses updated + -- TODO [certs rcv] resubscribe queues with statuses NSErr and NSService + CAServiceDisconnected srv serviceSub -> + logNote $ "SMP server service disconnected " <> showService srv serviceSub + CAServiceSubscribed srv serviceSub@(ServiceSub _ n idsHash) (ServiceSub _ n' idsHash') + | n /= n' -> logWarn $ msg <> ", confirmed subs: " <> tshow n' + | idsHash /= idsHash' -> logWarn $ msg <> ", different IDs hash" + | otherwise -> logNote msg + where + msg = "SMP server service subscribed " <> showService srv serviceSub + CAServiceSubError srv serviceSub e -> + -- Errors that require re-subscribing queues directly are reported as CAServiceUnavailable. + -- See smpSubscribeService in Simplex.Messaging.Client.Agent + logError $ "SMP server service subscription error " <> showService srv serviceSub <> ": " <> tshow e + CAServiceUnavailable srv serviceSub -> do + logError $ "SMP server service unavailable: " <> showService srv serviceSub + removeServiceAndAssociations st srv >>= \case + Right (srvId, updated) -> do + logSubStatus srv "removed service association" updated updated + void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing) + Left e -> logError $ "SMP server update and resubscription error " <> tshow e where - receiveSMP = do - st <- asks store - ps <- asks pushServer - stats <- asks serverStats - forever $ do - ((_, srv@(SMPServer (h :| _) _ _), _), THandleParams {sessionId}, ts) <- atomically $ readTBQueue msgQ - forM_ ts $ \(ntfId, t) -> case t of - STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen - STResponse {} -> pure () -- it was already reported as timeout error - STEvent msgOrErr -> do - let smpQueue = SMPQueueNtf srv ntfId - case msgOrErr of - Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do - ntfTs <- liftIO getSystemTime - liftIO $ updatePeriodStats (activeSubs stats) ntfId - let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} - srvHost = safeDecodeUtf8 $ strEncode h - isOwn = isOwnServer ca srv - liftIO (addTokenLastNtf st newNtf) >>= \case - Right (tkn, lastNtfs) -> do - pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs - liftIO $ incNtfStat_ stats ntfReceived - when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats) - Left AUTH -> liftIO $ do - incNtfStat_ stats ntfReceivedAuth - when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) - Left _ -> pure () - Right SMP.END -> - whenM (atomically $ activeClientSession' ca sessionId srv) $ - void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd - Right SMP.DELD -> - void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted - Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e - Right _ -> logError "SMP server unexpected response" - Left e -> logError $ "SMP client error: " <> tshow e - - receiveAgent = do - st <- asks store - batchSize <- asks $ subsBatchSize . config - liftIO $ forever $ - atomically (readTBQueue agentQ) >>= \case - CAConnected srv serviceId -> do - let asService = if isJust serviceId then "as service " else "" - logInfo $ "SMP server reconnected " <> asService <> showServer' srv - CADisconnected srv nIds -> do - updated <- batchUpdateSrvSubStatus st srv Nothing nIds NSInactive - logSubStatus srv "disconnected" (L.length nIds) updated - CASubscribed srv serviceId nIds -> do - updated <- batchUpdateSrvSubStatus st srv serviceId nIds NSActive - let asService = if isJust serviceId then " as service" else "" - logSubStatus srv ("subscribed" <> asService) (L.length nIds) updated - CASubError srv errs -> do - forM_ (L.nonEmpty $ mapMaybe (\(nId, err) -> (nId,) <$> queueSubErrorStatus err) $ L.toList errs) $ \subStatuses -> do - updated <- batchUpdateSrvSubErrors st srv subStatuses - logSubErrors srv subStatuses updated - -- TODO [certs rcv] resubscribe queues with statuses NSErr and NSService - CAServiceDisconnected srv serviceSub -> - logNote $ "SMP server service disconnected " <> showService srv serviceSub - CAServiceSubscribed srv serviceSub@(ServiceSub _ n idsHash) (ServiceSub _ n' idsHash') - | n /= n' -> logWarn $ msg <> ", confirmed subs: " <> tshow n' - | idsHash /= idsHash' -> logWarn $ msg <> ", different IDs hash" - | otherwise -> logNote msg - where - msg = "SMP server service subscribed " <> showService srv serviceSub - CAServiceSubError srv serviceSub e -> - -- Errors that require re-subscribing queues directly are reported as CAServiceUnavailable. - -- See smpSubscribeService in Simplex.Messaging.Client.Agent - logError $ "SMP server service subscription error " <> showService srv serviceSub <> ": " <> tshow e - CAServiceUnavailable srv serviceSub -> do - logError $ "SMP server service unavailable: " <> showService srv serviceSub - removeServiceAndAssociations st srv >>= \case - Right (srvId, updated) -> do - logSubStatus srv "removed service association" updated updated - void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing) - Left e -> logError $ "SMP server update and resubscription error " <> tshow e - where - showService srv (ServiceSub serviceId n _) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs" - + showService srv (ServiceSub serviceId n _) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs" logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int -> IO () - logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> do + logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> logError $ "SMP server subscription errors " <> showServer' srv <> ": " <> tshow (L.head ss) <> " (" <> tshow (length ss) <> " errors, " <> tshow updated <> " subs updated)" - queueSubErrorStatus :: SMPClientError -> Maybe NtfSubStatus queueSubErrorStatus = \case PCEProtocolError AUTH -> Just NSAuth - -- TODO [certs rcv] we could allow making individual subscriptions within service session to handle SERVICE error. - -- This would require full stack changes in SMP server, SMP client and SMP service agent. PCEProtocolError SERVICE -> Just NSService PCEProtocolError e -> updateErr "SMP error " e PCEResponseError e -> updateErr "ResponseError " e @@ -623,12 +621,11 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = PCETransportError e -> updateErr "TransportError " e PCECryptoError e -> updateErr "CryptoError " e PCEIncompatibleHost -> Just $ NSErr "IncompatibleHost" - PCEServiceUnavailable -> Just NSService -- this error should not happen on individual subscriptions + PCEServiceUnavailable -> Just NSService PCEResponseTimeout -> Nothing PCENetworkError _ -> Nothing PCEIOError _ -> Nothing where - -- Note on moving to PostgreSQL: the idea of logging errors without e is removed here updateErr :: Show e => ByteString -> e -> Maybe NtfSubStatus updateErr errType e = Just $ NSErr $ errType <> bshow e diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 365d464c85..54349f6001 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -28,6 +28,7 @@ module Simplex.Messaging.Notifications.Server.Env ) where import Control.Concurrent (ThreadId) +import Data.IORef import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad @@ -45,7 +46,7 @@ import qualified Data.X509.Validation as XV import Network.Socket import qualified Network.TLS as TLS import Numeric.Natural -import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError) +import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmissionBatch) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol @@ -54,14 +55,14 @@ import Simplex.Messaging.Notifications.Server.Stats import Simplex.Messaging.Notifications.Server.Store.Postgres import Simplex.Messaging.Notifications.Server.Store.Types import Simplex.Messaging.Notifications.Transport (NTFVersion, VersionRangeNTF) -import Simplex.Messaging.Protocol (BasicAuth, CorrId, Party (..), SMPServer, SParty (..), ServiceId, Transmission) +import Simplex.Messaging.Protocol (BasicAuth, BrokerMsg, CorrId, ErrorType, Party (..), SMPServer, SParty (..), ServiceId, Transmission) import Simplex.Messaging.Server.Env.STM (StartOptions (..)) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..)) import Simplex.Messaging.Session import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), ServiceCredentials (..), THandleParams, TransportPeer (..)) +import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), SMPVersion, ServiceCredentials (..), THandleParams, TransportPeer (..)) import Simplex.Messaging.Transport.Credentials (genCredentials, tlsCredentials) import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential) import Simplex.Messaging.Util (liftEitherWith, tshow) @@ -119,17 +120,20 @@ data NtfEnv = NtfEnv serverStats :: NtfServerStats } -newNtfServerEnv :: NtfServerConfig -> IO NtfEnv -newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, useServiceCreds} = do +newNtfServerEnv :: NtfServerConfig -> (IORef NtfEnv -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> IO NtfEnv +newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, useServiceCreds} mkProcessMsg = do random <- C.newRandom store <- newNtfDbStore dbStoreConfig tlsServerCreds <- loadServerCredential ntfCredentials XV.Fingerprint fp <- loadFingerprint ntfCredentials let dbService = if useServiceCreds then Just $ mkDbService random store else Nothing - subscriber <- newNtfSubscriber smpAgentCfg dbService random + envRef <- newIORef $ error "NtfEnv not initialized" + subscriber <- newNtfSubscriber smpAgentCfg (mkProcessMsg envRef) dbService random pushServer <- newNtfPushServer pushQSize apnsConfig serverStats <- newNtfServerStats =<< getCurrentTime - pure NtfEnv {config, subscriber, pushServer, store, random, tlsServerCreds, serverIdentity = C.KeyHash fp, serverStats} + let env = NtfEnv {config, subscriber, pushServer, store, random, tlsServerCreds, serverIdentity = C.KeyHash fp, serverStats} + writeIORef envRef env + pure env where mkDbService g st = DBService {getCredentials, updateServiceId} where @@ -158,11 +162,11 @@ data NtfSubscriber = NtfSubscriber type SMPSubscriberVar = SessionVar SMPSubscriber -newNtfSubscriber :: SMPClientAgentConfig -> Maybe DBService -> TVar ChaChaDRG -> IO NtfSubscriber -newNtfSubscriber smpAgentCfg dbService random = do +newNtfSubscriber :: SMPClientAgentConfig -> (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO NtfSubscriber +newNtfSubscriber smpAgentCfg processMsg dbService random = do smpSubscribers <- TM.emptyIO subscriberSeq <- newTVarIO 0 - smpAgent <- newSMPClientAgent SNotifierService smpAgentCfg dbService random + smpAgent <- newSMPClientAgent SNotifierService smpAgentCfg processMsg dbService random pure NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent} data SMPSubscriber = SMPSubscriber diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 574111c15e..3879fd8ee3 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -706,7 +706,7 @@ mkJournalStoreConfig queueStoreCfg storePath msgQueueQuota maxJournalMsgCount ma newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent newSMPProxyAgent smpAgentCfg random = do - smpAgent <- newSMPClientAgent SSender smpAgentCfg Nothing random + smpAgent <- newSMPClientAgent SSender smpAgentCfg (\_ -> pure ()) Nothing random pure ProxyAgent {smpAgent} readWriteQueueStore :: forall q. StoreQueueClass q => Bool -> (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO (StoreLog 'WriteMode) From 3042be3168ce85c5c3a0ce58f0c16b90b171847d Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Mon, 25 May 2026 11:55:20 +0000 Subject: [PATCH 2/3] fix test, restore comments --- src/Simplex/Messaging/Notifications/Server.hs | 5 ++++- tests/SMPProxyTests.hs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 11dcd860a9..a8579c4256 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -614,6 +614,8 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {agentQ}} = do queueSubErrorStatus :: SMPClientError -> Maybe NtfSubStatus queueSubErrorStatus = \case PCEProtocolError AUTH -> Just NSAuth + -- TODO [certs rcv] we could allow making individual subscriptions within service session to handle SERVICE error. + -- This would require full stack changes in SMP server, SMP client and SMP service agent. PCEProtocolError SERVICE -> Just NSService PCEProtocolError e -> updateErr "SMP error " e PCEResponseError e -> updateErr "ResponseError " e @@ -621,11 +623,12 @@ ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {agentQ}} = do PCETransportError e -> updateErr "TransportError " e PCECryptoError e -> updateErr "CryptoError " e PCEIncompatibleHost -> Just $ NSErr "IncompatibleHost" - PCEServiceUnavailable -> Just NSService + PCEServiceUnavailable -> Just NSService -- this error should not happen on individual subscriptions PCEResponseTimeout -> Nothing PCENetworkError _ -> Nothing PCEIOError _ -> Nothing where + -- Note on moving to PostgreSQL: the idea of logging errors without e is removed here updateErr :: Show e => ByteString -> e -> Maybe NtfSubStatus updateErr errType e = Just $ NSErr $ errType <> bshow e diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 0d8ccdf89e..1edc95766f 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -172,7 +172,7 @@ deliverMessagesViaProxy proxyServ relayServ alg unsecuredMsgs securedMsgs = do THAuthClient {} <- maybe (fail "getProtocolClient returned no thAuth") pure $ thAuth $ thParams pc -- set up relay msgQ <- newTBQueueIO 1024 - rc' <- getProtocolClient g NRMInteractive (2, relayServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion currentClientSMPRelayVersion} [] (Just msgQ) ts (\_ -> pure ()) + rc' <- getProtocolClient g NRMInteractive (2, relayServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion currentClientSMPRelayVersion} [] (Just $ atomically . writeTBQueue msgQ) ts (\_ -> pure ()) rc <- either (fail . show) pure rc' -- prepare receiving queue (rPub, rPriv) <- atomically $ C.generateAuthKeyPair alg g From 151feece262b7fe7f460abc69f86c61ab6dc40ef Mon Sep 17 00:00:00 2001 From: "Evgeny @ SimpleX Chat" <259188159+evgeny-simplex@users.noreply.github.com> Date: Mon, 25 May 2026 13:26:29 +0000 Subject: [PATCH 3/3] refactor --- src/Simplex/Messaging/Agent.hs | 6 +- src/Simplex/Messaging/Agent/Client.hs | 6 +- src/Simplex/Messaging/Client/Agent.hs | 8 +- src/Simplex/Messaging/Notifications/Server.hs | 154 +++++++++--------- .../Messaging/Notifications/Server/Env.hs | 15 +- src/Simplex/Messaging/Server/Env/STM.hs | 2 +- 6 files changed, 90 insertions(+), 101 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 8801a68f03..b8adb8315c 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -145,7 +145,6 @@ where import Control.Applicative ((<|>)) import Control.Concurrent.STM (retry) -import Data.IORef import Control.Logger.Simple import Control.Monad import Control.Monad.Except @@ -272,13 +271,10 @@ getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp, netC currentTs <- liftIO getCurrentTime notices <- liftIO $ withTransaction store (`getClientNotices` presetServers) `catchAll_` pure [] env <- ask - cRef <- liftIO $ newIORef (error "agent client not initialized") - let processMsg t = do - c <- readIORef cRef + let processMsg c t = agentOperationBracket c AORcvNetwork waitUntilActive (processSMPTransmissions c t) `runReaderT` env `catchOwn` \e -> atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ CRITICAL True $ "subscriber error: " <> show e) c@AgentClient {acThread} <- liftIO $ newAgentClient clientId initServers currentTs notices processMsg env - liftIO $ writeIORef cRef c t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c) atomically . writeTVar acThread . Just =<< mkWeakThreadId t pure c diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c2ef42c290..e70004a562 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -338,7 +338,7 @@ data AgentClient = AgentClient { acThread :: TVar (Maybe (Weak ThreadId)), active :: TVar Bool, subQ :: TBQueue ATransmission, - processServerMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), + processServerMsg :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), smpServers :: TMap UserId (UserServers 'PSMP), smpClients :: TMap SMPTransportSession SMPClientVar, useClientServices :: TMap UserId Bool, @@ -505,7 +505,7 @@ data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther deriving (Eq, Show) -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. -newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Env -> IO AgentClient +newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> (AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Env -> IO AgentClient newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices processServerMsg agentEnv = do let cfg = config agentEnv qSize = tbqSize cfg @@ -732,7 +732,7 @@ getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT smpConnectClient :: AgentClient -> NetworkRequestMode -> SMPTransportSession -> TMap SMPServer ProxiedRelayVar -> SMPClientVar -> AM SMPConnectedClient -smpConnectClient c@AgentClient {smpClients, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v = +smpConnectClient c@AgentClient {processServerMsg, smpClients, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v = newProtocolClient c tSess smpClients connectClient v `catchAllErrors` \e -> lift (resubscribeSMPSession c tSess) >> throwE e where diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index a1699009d7..f035a800f4 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -138,7 +138,7 @@ data SMPClientAgent p = SMPClientAgent dbService :: Maybe DBService, active :: TVar Bool, startedAt :: UTCTime, - processMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), + processMsg :: SMPClientAgent p -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), agentQ :: TBQueue SMPClientAgentEvent, randomDrg :: TVar ChaChaDRG, smpClients :: TMap SMPServer SMPClientVar, @@ -158,7 +158,7 @@ data SMPClientAgent p = SMPClientAgent type OwnServer = Bool -newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p) +newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> (SMPClientAgent p -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p) newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {agentQSize} processMsg dbService randomDrg = do active <- newTVarIO True startedAt <- getCurrentTime @@ -256,7 +256,7 @@ isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} = -- | Run an SMP client for SMPClientVar connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient) -connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, processMsg, randomDrg, startedAt} srv v = case dbService of +connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, processMsg, randomDrg, startedAt} srv v = case dbService of Just dbs -> runExceptT $ do creds <- ExceptT $ getCredentials dbs srv smp <- ExceptT $ getClient cfg {serviceCredentials = Just creds} @@ -266,7 +266,7 @@ connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, p Nothing -> getClient cfg where cfg = smpCfg agentCfg - getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just processMsg) startedAt clientDisconnected + getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just $ processMsg ca) startedAt clientDisconnected clientDisconnected :: SMPClient -> IO () clientDisconnected smp = do diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index a8579c4256..e4d20acff5 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -101,11 +101,7 @@ runNtfServer cfg = do runNtfServerBlocking started cfg runNtfServerBlocking :: TMVar Bool -> NtfServerConfig -> IO () -runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg processMsg - where - processMsg envRef t = do - env <- readIORef envRef - receiveSMPMessage env t +runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg receiveSMPMessage type M a = ReaderT NtfEnv IO a @@ -529,42 +525,37 @@ subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st sm void $ updateSubStatus st srvId' nId NSPending subscribeQueuesNtfs ca smpServer' [sub] -receiveSMPMessage :: NtfEnv -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO () -receiveSMPMessage env ((_, srv@(SMPServer (h :| _) _ _), _), THandleParams {sessionId}, ts) = - (`runReaderT` env) $ do - st <- asks store - ps <- asks pushServer - stats <- asks serverStats - let ca = smpAgent $ subscriber env - forM_ ts $ \(ntfId, t) -> case t of - STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen - STResponse {} -> pure () -- it was already reported as timeout error - STEvent msgOrErr -> do - let smpQueue = SMPQueueNtf srv ntfId - case msgOrErr of - Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do - ntfTs <- liftIO getSystemTime - liftIO $ updatePeriodStats (activeSubs stats) ntfId - let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} - srvHost = safeDecodeUtf8 $ strEncode h - isOwn = isOwnServer ca srv - liftIO (addTokenLastNtf st newNtf) >>= \case - Right (tkn, lastNtfs) -> do - pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs - liftIO $ incNtfStat_ stats ntfReceived - when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats) - Left AUTH -> liftIO $ do - incNtfStat_ stats ntfReceivedAuth - when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) - Left _ -> pure () - Right SMP.END -> - whenM (atomically $ activeClientSession' ca sessionId srv) $ - void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd - Right SMP.DELD -> - void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted - Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e - Right _ -> logError "SMP server unexpected response" - Left e -> logError $ "SMP client error: " <> tshow e +receiveSMPMessage :: NtfPostgresStore -> NtfPushServer -> NtfServerStats -> SMPClientAgent 'NotifierService -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO () +receiveSMPMessage st ps stats ca ((_, srv@(SMPServer (h :| _) _ _), _), THandleParams {sessionId}, ts) = + forM_ ts $ \(ntfId, t) -> case t of + STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen + STResponse {} -> pure () -- it was already reported as timeout error + STEvent msgOrErr -> do + let smpQueue = SMPQueueNtf srv ntfId + case msgOrErr of + Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do + ntfTs <- getSystemTime + updatePeriodStats (activeSubs stats) ntfId + let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} + srvHost = safeDecodeUtf8 $ strEncode h + isOwn = isOwnServer ca srv + addTokenLastNtf st newNtf >>= \case + Right (tkn, lastNtfs) -> do + pushNotification st stats ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs + incNtfStat_ stats ntfReceived + when isOwn $ incServerStat srvHost (ntfReceivedOwn stats) + Left AUTH -> do + incNtfStat_ stats ntfReceivedAuth + when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) + Left _ -> pure () + Right SMP.END -> + whenM (atomically $ activeClientSession' ca sessionId srv) $ + void $ updateSrvSubStatus st smpQueue NSEnd + Right SMP.DELD -> + void $ updateSrvSubStatus st smpQueue NSDeleted + Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e + Right _ -> logError "SMP server unexpected response" + Left e -> logError $ "SMP client error: " <> tshow e ntfSubscriber :: NtfSubscriber -> M () ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {agentQ}} = do @@ -639,55 +630,53 @@ logSubStatus srv event n updated = showServer' :: SMPServer -> Text showServer' = decodeLatin1 . strEncode . host -pushNotification :: NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> M () -pushNotification s srvHost_ isOwn tkn@NtfTknRec {token = DeviceToken pp _} ntf = do - q <- getOrCreatePushWorker s (srvHost_, pp) isOwn +pushNotification :: NtfPostgresStore -> NtfServerStats -> NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> IO () +pushNotification st stats s srvHost_ isOwn tkn@NtfTknRec {token = DeviceToken pp _} ntf = do + q <- getOrCreatePushWorker st stats s (srvHost_, pp) isOwn atomically $ writeTBQueue q (tkn, ntf) -getOrCreatePushWorker :: NtfPushServer -> (Maybe T.Text, PushProvider) -> OwnServer -> M (TBQueue (NtfTknRec, PushNotification)) -getOrCreatePushWorker s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} key@(srvHost_, _) isOwn = do - ts <- liftIO getCurrentTime +getOrCreatePushWorker :: NtfPostgresStore -> NtfServerStats -> NtfPushServer -> (Maybe T.Text, PushProvider) -> OwnServer -> IO (TBQueue (NtfTknRec, PushNotification)) +getOrCreatePushWorker st stats s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} key@(srvHost_, _) isOwn = do + ts <- getCurrentTime atomically (getSessVar pushWorkerSeq key pushWorkers ts) >>= \case Left v -> do - q <- liftIO $ newTBQueueIO pushQSize - tId <- mkWeakThreadId =<< forkIO (runPushWorker s srvHost_ isOwn q) + q <- newTBQueueIO pushQSize + tId <- mkWeakThreadId =<< forkIO (runPushWorker st stats s srvHost_ isOwn q) atomically $ putTMVar (sessionVar v) PushWorker {workerQ = q, workerThreadId = tId} pure q Right v -> workerQ <$> atomically (readTMVar $ sessionVar v) -runPushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> M () -runPushWorker s srvHost_ isOwn q = forever $ do +runPushWorker :: NtfPostgresStore -> NtfServerStats -> NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> IO () +runPushWorker st stats s srvHost_ isOwn q = forever $ do (tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue q) - liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) - st <- asks store + logDebug $ "sending push notification to " <> T.pack (show pp) case ntf of PNVerification _ -> - liftIO (deliverNotification st pp tkn ntf) >>= \case + deliverNotification st pp tkn ntf >>= \case Right _ -> do - void $ liftIO $ setTknStatusConfirmed st tkn - incNtfStatT t ntfVrfDelivered - Left _ -> incNtfStatT t ntfVrfFailed + void $ setTknStatusConfirmed st tkn + incNtfStatT_ stats t ntfVrfDelivered + Left _ -> incNtfStatT_ stats t ntfVrfFailed PNCheckMessages -> - liftIO (deliverNotification st pp tkn ntf) >>= \case + deliverNotification st pp tkn ntf >>= \case Right _ -> do - void $ liftIO $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime - incNtfStatT t ntfCronDelivered - Left _ -> incNtfStatT t ntfCronFailed + void $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime + incNtfStatT_ stats t ntfCronDelivered + Left _ -> incNtfStatT_ stats t ntfCronFailed PNMessage {} -> checkActiveTkn tknStatus $ do - stats <- asks serverStats - liftIO $ updatePeriodStats (activeTokens stats) ntfTknId - liftIO (deliverNotification st pp tkn ntf) >>= \case + updatePeriodStats (activeTokens stats) ntfTknId + deliverNotification st pp tkn ntf >>= \case Left _ -> do - incNtfStatT t ntfFailed - when isOwn $ liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ + incNtfStatT_ stats t ntfFailed + when isOwn $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_ Right () -> do - incNtfStatT t ntfDelivered - when isOwn $ liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ + incNtfStatT_ stats t ntfDelivered + when isOwn $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_ where - checkActiveTkn :: NtfTknStatus -> M () -> M () + checkActiveTkn :: NtfTknStatus -> IO () -> IO () checkActiveTkn status action | status == NTActive = action - | otherwise = liftIO $ logError "bad notification token status" + | otherwise = logError "bad notification token status" deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ()) deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf' = do (deliver, clientVar) <- getPushClient s pp @@ -730,13 +719,13 @@ pushWorkersQLength workers = do periodicNtfsThread :: NtfPushServer -> M () periodicNtfsThread s = do st <- asks store + stats <- asks serverStats ntfsInterval <- asks $ periodicNtfsInterval . config let interval = 1000000 * ntfsInterval - UnliftIO unlift <- askUnliftIO liftIO $ forever $ do threadDelay interval now <- systemSeconds <$> getSystemTime - cnt <- withPeriodicNtfTokens st now $ \tkn -> unlift $ pushNotification s Nothing False tkn PNCheckMessages + cnt <- withPeriodicNtfTokens st now $ \tkn -> pushNotification st stats s Nothing False tkn PNCheckMessages logNote $ "Scheduled periodic notifications: " <> tshow cnt runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M () @@ -826,14 +815,16 @@ verifyNtfTransmission st thAuth (tAuth, authorized, (corrId, entId, cmd)) = case e -> VRFailed e client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M () -client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = +client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = do + st <- asks store + stats <- asks serverStats forever $ atomically (readTBQueue rcvQ) - >>= mapM processCommand + >>= mapM (processCommand st stats) >>= atomically . writeTBQueue sndQ where - processCommand :: NtfRequest -> M (Transmission NtfResponse) - processCommand = \case + processCommand :: NtfPostgresStore -> NtfServerStats -> NtfRequest -> M (Transmission NtfResponse) + processCommand st stats = \case NtfReqNew corrId (ANE SToken newTkn@(NewNtfTkn token _ dhPubKey)) -> (corrId,NoEntity,) <$> do logDebug "TNEW - new token" (srvDhPubKey, srvDhPrivKey) <- atomically . C.generateKeyPair =<< asks random @@ -843,7 +834,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = ts <- liftIO $ getSystemDate let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts withNtfStore (`addNtfToken` tkn) $ \_ -> do - pushNotification ps Nothing False tkn $ PNVerification regCode + liftIO $ pushNotification st stats ps Nothing False tkn $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknCreated pure $ NRTknId tknId srvDhPubKey @@ -859,7 +850,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = | otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification where sendVerification = do - pushNotification ps Nothing False tkn $ PNVerification tknRegCode + liftIO $ pushNotification st stats ps Nothing False tkn $ PNVerification tknRegCode incNtfStatT token ntfVrfQueued pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey TVFY code -- this allows repeated verification for cases when client connection dropped before server response @@ -877,7 +868,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = regCode <- getRegCode let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode} withNtfStore (`replaceNtfToken` tkn') $ \_ -> do - pushNotification ps Nothing False tkn' $ PNVerification regCode + liftIO $ pushNotification st stats ps Nothing False tkn' $ PNVerification regCode incNtfStatT token ntfVrfQueued incNtfStatT token tknReplaced pure NROk @@ -949,6 +940,11 @@ incNtfStatT (DeviceToken PPApnsNull _) _ = pure () incNtfStatT _ statSel = incNtfStat statSel {-# INLINE incNtfStatT #-} +incNtfStatT_ :: NtfServerStats -> DeviceToken -> (NtfServerStats -> IORef Int) -> IO () +incNtfStatT_ _ (DeviceToken PPApnsNull _) _ = pure () +incNtfStatT_ stats _ statSel = incNtfStat_ stats statSel +{-# INLINE incNtfStatT_ #-} + incNtfStat :: (NtfServerStats -> IORef Int) -> M () incNtfStat statSel = asks serverStats >>= liftIO . (`incNtfStat_` statSel) {-# INLINE incNtfStat #-} diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 54349f6001..7ece78609f 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -28,7 +28,6 @@ module Simplex.Messaging.Notifications.Server.Env ) where import Control.Concurrent (ThreadId) -import Data.IORef import qualified Control.Exception as E import Control.Logger.Simple import Control.Monad @@ -120,20 +119,18 @@ data NtfEnv = NtfEnv serverStats :: NtfServerStats } -newNtfServerEnv :: NtfServerConfig -> (IORef NtfEnv -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> IO NtfEnv +newNtfServerEnv :: NtfServerConfig -> (NtfPostgresStore -> NtfPushServer -> NtfServerStats -> SMPClientAgent 'NotifierService -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> IO NtfEnv newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, useServiceCreds} mkProcessMsg = do random <- C.newRandom store <- newNtfDbStore dbStoreConfig tlsServerCreds <- loadServerCredential ntfCredentials XV.Fingerprint fp <- loadFingerprint ntfCredentials - let dbService = if useServiceCreds then Just $ mkDbService random store else Nothing - envRef <- newIORef $ error "NtfEnv not initialized" - subscriber <- newNtfSubscriber smpAgentCfg (mkProcessMsg envRef) dbService random pushServer <- newNtfPushServer pushQSize apnsConfig serverStats <- newNtfServerStats =<< getCurrentTime - let env = NtfEnv {config, subscriber, pushServer, store, random, tlsServerCreds, serverIdentity = C.KeyHash fp, serverStats} - writeIORef envRef env - pure env + let dbService = if useServiceCreds then Just $ mkDbService random store else Nothing + processMsg = mkProcessMsg store pushServer serverStats + subscriber <- newNtfSubscriber smpAgentCfg processMsg dbService random + pure NtfEnv {config, subscriber, pushServer, store, random, tlsServerCreds, serverIdentity = C.KeyHash fp, serverStats} where mkDbService g st = DBService {getCredentials, updateServiceId} where @@ -162,7 +159,7 @@ data NtfSubscriber = NtfSubscriber type SMPSubscriberVar = SessionVar SMPSubscriber -newNtfSubscriber :: SMPClientAgentConfig -> (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO NtfSubscriber +newNtfSubscriber :: SMPClientAgentConfig -> (SMPClientAgent 'NotifierService -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO NtfSubscriber newNtfSubscriber smpAgentCfg processMsg dbService random = do smpSubscribers <- TM.emptyIO subscriberSeq <- newTVarIO 0 diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 3879fd8ee3..123ccd545c 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -706,7 +706,7 @@ mkJournalStoreConfig queueStoreCfg storePath msgQueueQuota maxJournalMsgCount ma newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent newSMPProxyAgent smpAgentCfg random = do - smpAgent <- newSMPClientAgent SSender smpAgentCfg (\_ -> pure ()) Nothing random + smpAgent <- newSMPClientAgent SSender smpAgentCfg (\_ _ -> pure ()) Nothing random pure ProxyAgent {smpAgent} readWriteQueueStore :: forall q. StoreQueueClass q => Bool -> (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO (StoreLog 'WriteMode)