From e14f019210c2715c4f3f76993f6b2fd2dc28fa8c Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Fri, 26 Jun 2026 14:44:25 +0530 Subject: [PATCH 1/6] Add "Continue" to Fold Step type --- .../Streamly/Benchmark/Data/Scanl/Type.hs | 4 +- core/docs/Changelog.md | 30 ++- .../Internal/Data/Fold/Combinators.hs | 2 +- .../Streamly/Internal/Data/Fold/Container.hs | 25 +- core/src/Streamly/Internal/Data/Fold/Type.hs | 46 +++- .../Streamly/Internal/Data/MutArray/Type.hs | 5 +- core/src/Streamly/Internal/Data/RingArray.hs | 4 +- .../Internal/Data/Scanl/Combinators.hs | 46 ++-- .../Streamly/Internal/Data/Scanl/Container.hs | 70 +++-- core/src/Streamly/Internal/Data/Scanl/Step.hs | 106 ++++++++ core/src/Streamly/Internal/Data/Scanl/Type.hs | 128 +++++++--- .../Streamly/Internal/Data/Scanl/Window.hs | 47 ++-- .../Internal/Data/Stream/Transform.hs | 27 +- core/src/Streamly/Internal/Data/Unfold.hs | 26 +- core/streamly-core.cabal | 1 + .../Internal/Data/Fold/Channel/Type.hs | 46 +++- src/Streamly/Internal/Data/Fold/Concurrent.hs | 6 +- .../Internal/Data/Scanl/Concurrent.hs | 69 +++-- test/Streamly/Test/Data/Fold/Combinators.hs | 3 +- test/Streamly/Test/Data/Fold/Type.hs | 110 +++++++- test/Streamly/Test/Data/Scanl/Combinators.hs | 164 +++++++++++- .../Test/Data/Scanl/CommonCombinators.hs | 10 +- test/Streamly/Test/Data/Scanl/CommonType.hs | 13 +- test/Streamly/Test/Data/Scanl/Concurrent.hs | 32 ++- test/Streamly/Test/Data/Scanl/Container.hs | 94 +++++++ test/Streamly/Test/Data/Scanl/Type.hs | 240 ++++++++++++++++-- test/Streamly/Test/Data/Scanl/Window.hs | 30 ++- test/Streamly/Test/Data/Stream/Transform.hs | 37 +++ test/Streamly/Test/Data/Unfold.hs | 78 ++++++ test/lib/Streamly/Test/Data/Scanl/Common.hs | 77 ++++++ test/streamly-tests.cabal | 1 + 31 files changed, 1365 insertions(+), 212 deletions(-) create mode 100644 core/src/Streamly/Internal/Data/Scanl/Step.hs create mode 100644 test/lib/Streamly/Test/Data/Scanl/Common.hs diff --git a/benchmark/Streamly/Benchmark/Data/Scanl/Type.hs b/benchmark/Streamly/Benchmark/Data/Scanl/Type.hs index 6e6216c3ff..6dc9f1f830 100644 --- a/benchmark/Streamly/Benchmark/Data/Scanl/Type.hs +++ b/benchmark/Streamly/Benchmark/Data/Scanl/Type.hs @@ -125,7 +125,7 @@ inspect $ 'scanl1M' `hasNoType` ''SPEC {-# INLINE scant' #-} scant' :: Int -> IO () -scant' n = withPostscanl n (Scanl.scant' (\s a -> Scanl.Partial (s + a)) (Scanl.Partial 0) id) +scant' n = withPostscanl n (Scanl.scant' (\s a -> Scanl.Partial (s + a)) (FL.Partial 0) id) #ifdef INSPECTION inspect $ 'scant' `hasNoType` ''Step @@ -138,7 +138,7 @@ scantM' n = withPostscanl n (Scanl.scantM' (\s a -> return (Scanl.Partial (s + a))) - (return (Scanl.Partial 0)) + (return (FL.Partial 0)) return) #ifdef INSPECTION diff --git a/core/docs/Changelog.md b/core/docs/Changelog.md index d0af7fafcf..0ee0f6cd97 100644 --- a/core/docs/Changelog.md +++ b/core/docs/Changelog.md @@ -2,19 +2,41 @@ ## Unreleased +### Enhancements + +* Filtering support for `Scanl`. +* FileSystem.Path now does not treat paths with leading "./" as rooted paths. + +### Bug Fixes + * Fixed `Stream.postscanl` to omit the output of a scan that terminates without consuming any input (e.g. `Scanl.take 0`). -* Breaking: In `FileSystem.Path` module the default for `eqPath` changed + +### Breaking changes + +* In Scanl module, `filter` and any other filtering operations like + `filterM`, `catMaybes`, `catLefts` etc. used to emit the previous + value of accumulator for filtered-out elements, now they do not emit + any output for those. +* `Scanl.incrScanWith` now provides the ring array reflecting the + state of the window /before/ the incoming element is inserted. +* In `FileSystem.Path` module the default for `eqPath` changed on Windows to case-sensitive comparison. -* Breaking: A leading "." component (e.g. "." or "./x") is no longer +* A leading "." component (e.g. "." or "./x") is no longer treated as a rooted path, making the behavior more in line with intuitive expectation. -* Breaking: In `FileSystem.Path` module the default for `eqPath` changed +* In `FileSystem.Path` module the default for `eqPath` changed on both Posix and Windows so that `allowRelativeEquality` is `True` by default. Literally identical relative paths (e.g. `./x` and `./x`, or `c:` and `c:` on Windows) now compare equal. Pass `allowRelativeEquality False` to restore the previous strict behaviour. -* Internal: Removed deprecated module `Streamly.Internal.Data.Stream.StreamD`. + +### Internal changes + +* `Scanl`'s Step type is now different from Fold's `Step` type + with an additional `Continue` constructor, this change enables proper + filtering in scans. +* Removed deprecated module `Streamly.Internal.Data.Stream.StreamD`. Use `Streamly.Internal.Data.Stream` instead. ## 0.3.1 (May 2026) diff --git a/core/src/Streamly/Internal/Data/Fold/Combinators.hs b/core/src/Streamly/Internal/Data/Fold/Combinators.hs index 2e3af36c0a..3c17c0a749 100644 --- a/core/src/Streamly/Internal/Data/Fold/Combinators.hs +++ b/core/src/Streamly/Internal/Data/Fold/Combinators.hs @@ -1788,7 +1788,7 @@ distributeScan getFolds = Scanl consume initial extract final initial = return $ Partial (Tuple' [] []) - run st [] _ = return $ Partial st + run st [] _ = return $ Scanl.Partial st run (Tuple' ys zs) (Fold step init extr fin : xs) a = do res <- init case res of diff --git a/core/src/Streamly/Internal/Data/Fold/Container.hs b/core/src/Streamly/Internal/Data/Fold/Container.hs index 4be18bee7a..5213ff9463 100644 --- a/core/src/Streamly/Internal/Data/Fold/Container.hs +++ b/core/src/Streamly/Internal/Data/Fold/Container.hs @@ -122,6 +122,7 @@ import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..)) import qualified Data.Set as Set import qualified Streamly.Internal.Data.IsMap as IsMap import qualified Streamly.Internal.Data.Scanl.Container as Scanl +import qualified Streamly.Internal.Data.Scanl.Type as Scanl import Prelude hiding (Foldable(..)) import Streamly.Internal.Data.Fold.Type @@ -395,7 +396,11 @@ demuxScanGeneric :: (Monad m, IsMap f, Traversable f) => -> (Key f -> m (Maybe (Fold m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxScanGeneric getKey getFold = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl + (\s a -> Scanl.Partial <$> step s a) + (Partial <$> initial) + extract + final where @@ -666,7 +671,11 @@ demuxScanGenericIO :: (MonadIO m, IsMap f, Traversable f) => -> (Key f -> m (Maybe (Fold m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxScanGenericIO getKey getFold = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl + (\s a -> Scanl.Partial <$> step s a) + (Partial <$> initial) + extract + final where @@ -1017,7 +1026,11 @@ classifyScanGeneric :: (Monad m, IsMap f, Traversable f, Ord (Key f)) => -- that the downstream consumers can choose to process or discard it. (a -> Key f) -> Fold m a b -> Scanl m a (m (f b), Maybe (Key f, b)) classifyScanGeneric f (Fold step1 initial1 extract1 final1) = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl + (\s a -> Scanl.Partial <$> step s a) + (Partial <$> initial) + extract + final where @@ -1223,7 +1236,11 @@ toContainerIO f (Fold step1 initial1 _ final1) = classifyScanGenericIO :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) => (a -> Key f) -> Fold m a b -> Scanl m a (m (f b), Maybe (Key f, b)) classifyScanGenericIO f (Fold step1 initial1 extract1 final1) = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl + (\s a -> Scanl.Partial <$> step s a) + (Partial <$> initial) + extract + final where diff --git a/core/src/Streamly/Internal/Data/Fold/Type.hs b/core/src/Streamly/Internal/Data/Fold/Type.hs index c7b8a2873a..8ebc0693a3 100644 --- a/core/src/Streamly/Internal/Data/Fold/Type.hs +++ b/core/src/Streamly/Internal/Data/Fold/Type.hs @@ -375,6 +375,7 @@ module Streamly.Internal.Data.Fold.Type -- XXX Do refold ops belong to Scanl or Fold? , fromRefold , fromScanl + , fromFold , drain , toList , toListRev @@ -625,8 +626,26 @@ rmapM f (Fold step initial extract final) = -- | Convert a left scan to a fold. {-# INLINE fromScanl #-} -fromScanl :: Scanl m a b -> Fold m a b -fromScanl (Scanl step initial extract final) = Fold step initial extract final +fromScanl :: Functor m => Scanl m a b -> Fold m a b +fromScanl (Scanl step initial extract final) = + Fold (\s a -> fmap Scanl.toFoldStep (step s a)) + initial + extract + final + +-- XXX This belongs in a Scanl module. It lives here for now only because +-- 'Fold.Type' imports 'Scanl.Type' (Folds are built on top of scans); move it +-- once that dependency is inverted. We should move fromScanl as well to the +-- Scanl module and rename it to "toFold". + +-- | Convert a fold to a left scan. +{-# INLINE fromFold #-} +fromFold :: Functor m => Fold m a b -> Scanl m a b +fromFold (Fold step initial extract final) = + Scanl (\s a -> fmap Scanl.fromFoldStep (step s a)) + initial + extract + final -- | Make a fold from a left fold style pure step function and initial value of -- the accumulator. @@ -793,7 +812,12 @@ foldrM' g = fromScanl . Scanl.mkScanrM g -- {-# INLINE foldt' #-} foldt' :: Monad m => (s -> a -> Step s b) -> Step s b -> (s -> b) -> Fold m a b -foldt' step initial = fromScanl . Scanl.scant' step initial +foldt' step initial extract = + Fold + (\s a -> pure $ step s a) + (pure initial) + (pure . extract) + (pure . extract) -- | Make a terminating fold with an effectful step function and initial state, -- and a state extraction function. @@ -1595,17 +1619,18 @@ postscanl runStep actionL sR = do rL <- actionL case rL of - Done bL -> do + Scanl.Done bL -> do rR <- stepR sR bL case rR of Partial sR1 -> Done <$> finalR sR1 Done bR -> return $ Done bR - Partial sL -> do + Scanl.Partial sL -> do !b <- extractL sL rR <- stepR sR b case rR of Partial sR1 -> return $ Partial (sL, sR1) Done bR -> finalL sL >> return (Done bR) + Scanl.Continue sL -> return $ Partial (sL, sR) initial = do rR <- initialR @@ -1704,11 +1729,13 @@ scanlWith isMany where + initialL_ = Scanl.fromFoldStep <$> initialL + {-# INLINE runStep #-} runStep actionL sR = do rL <- actionL case rL of - Done bL -> do + Scanl.Done bL -> do rR <- stepR sR bL case rR of Partial sR1 -> @@ -1717,20 +1744,21 @@ scanlWith isMany -- will not terminate. In that case we should return -- error in the beginning itself. And we should remove -- this recursion, assuming it won't return Done. - then runStep initialL sR1 + then runStep initialL_ sR1 else Done <$> finalR sR1 Done bR -> return $ Done bR - Partial sL -> do + Scanl.Partial sL -> do !b <- extractL sL rR <- stepR sR b case rR of Partial sR1 -> return $ Partial (sL, sR1) Done bR -> finalL sL >> return (Done bR) + Scanl.Continue sL -> return $ Partial (sL, sR) initial = do r <- initialR case r of - Partial sR -> runStep initialL sR + Partial sR -> runStep initialL_ sR Done b -> return $ Done b step (sL, sR) x = runStep (stepL sL x) sR diff --git a/core/src/Streamly/Internal/Data/MutArray/Type.hs b/core/src/Streamly/Internal/Data/MutArray/Type.hs index 2885c695de..9feb4f8f83 100644 --- a/core/src/Streamly/Internal/Data/MutArray/Type.hs +++ b/core/src/Streamly/Internal/Data/MutArray/Type.hs @@ -550,6 +550,7 @@ import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import Streamly.Internal.System.IO (arrayPayloadSize, defaultChunkSize) import qualified Streamly.Internal.Data.Fold.Type as FL +import qualified Streamly.Internal.Data.Scanl.Type as Scanl import qualified Streamly.Internal.Data.MutByteArray.Type as Unboxed import qualified Streamly.Internal.Data.Parser.Type as Parser -- import qualified Streamly.Internal.Data.Fold.Type as Fold @@ -4074,8 +4075,8 @@ scanCompactMinAs ps minElems = runInner len buf = if len >= minBytes then do - return $ FL.Partial $ CompactMinComplete buf - else return $ FL.Partial $ CompactMinIncomplete buf + return $ Scanl.Partial $ CompactMinComplete buf + else return $ Scanl.Partial $ CompactMinIncomplete buf step CompactMinInit arr = runInner (byteLength arr) arr diff --git a/core/src/Streamly/Internal/Data/RingArray.hs b/core/src/Streamly/Internal/Data/RingArray.hs index dac99a92be..7b7c170b79 100644 --- a/core/src/Streamly/Internal/Data/RingArray.hs +++ b/core/src/Streamly/Internal/Data/RingArray.hs @@ -531,12 +531,12 @@ scanRingsOf n = Scanl step initial extract extract then error "scanRingsOf: window size must be > 0" else do mba <- liftIO $ MutByteArray.new rSize - return $ Partial $ Tuple3Fused' mba 0 0 + return $ Fold.Partial $ Tuple3Fused' mba 0 0 step (Tuple3Fused' mba rh offset) a = do RingArray _ _ rh1 <- replace_ (RingArray mba rSize rh) a let offset1 = offset + SIZE_OF(a) - return $ Partial $ Tuple3Fused' mba rh1 offset1 + return $ Scanl.Partial $ Tuple3Fused' mba rh1 offset1 -- XXX exitify optimization causes a problem here when modular folds are -- used. Sometimes inlining "extract" is helpful. diff --git a/core/src/Streamly/Internal/Data/Scanl/Combinators.hs b/core/src/Streamly/Internal/Data/Scanl/Combinators.hs index 04e9551fde..fee34c1b8f 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Combinators.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Combinators.hs @@ -229,6 +229,7 @@ import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import qualified Prelude import qualified Streamly.Internal.Data.MutArray.Type as MA -- import qualified Streamly.Internal.Data.Array.Type as Array +import qualified Streamly.Internal.Data.Fold.Step as Fold import qualified Streamly.Internal.Data.Scanl.Window as Scanl import qualified Streamly.Internal.Data.Pipe.Type as Pipe -- import qualified Streamly.Internal.Data.RingArray as RingArray @@ -372,7 +373,7 @@ mapMaybeM f = lmapM f . catMaybes -- >>> f x = if even x then Just x else Nothing -- >>> scn = Scanl.mapMaybe f Scanl.toList -- >>> Stream.toList $ Stream.scanl scn (Stream.enumerateFromTo 1 10) --- [[],[],[2],[2],[2,4],[2,4],[2,4,6],[2,4,6],[2,4,6,8],[2,4,6,8],[2,4,6,8,10]] +-- [[],[2],[2,4],[2,4,6],[2,4,6,8],[2,4,6,8,10]] -- {-# INLINE mapMaybe #-} mapMaybe :: Monad m => (a -> Maybe b) -> Scanl m b r -> Scanl m a r @@ -411,6 +412,7 @@ pipe (Pipe consume produce pinitial) (Scanl fstep finitial fextract ffinal) = return $ case acc1 of Partial s -> Partial $ Tuple' cs1 s + Continue s -> Continue $ Tuple' cs1 s Done b1 -> Done b1 -- XXX this case is recursive may cause fusion issues. -- To remove recursion we will need a produce mode in scans which makes @@ -421,6 +423,7 @@ pipe (Pipe consume produce pinitial) (Scanl fstep finitial fextract ffinal) = r <- produce ps1 case acc1 of Partial s -> go s r + Continue s -> go s r Done b1 -> return $ Done b1 go acc (Pipe.SkipC cs1) = return $ Partial $ Tuple' cs1 acc @@ -492,6 +495,8 @@ scanWith isMany where + initialL_ = fromFoldStep <$> initialL + {-# INLINE runStep #-} runStep actionL sR = do rL <- actionL @@ -505,7 +510,11 @@ scanWith isMany -- will not terminate. In that case we should return -- error in the beginning itself. And we should remove -- this recursion, assuming it won't return Done. - then runStep initialL sR1 + then runStep initialL_ sR1 + else Done <$> finalR sR1 + Continue sR1 -> + if isMany + then runStep initialL_ sR1 else Done <$> finalR sR1 Done bR -> return $ Done bR Partial sL -> do @@ -513,13 +522,15 @@ scanWith isMany rR <- stepR sR b case rR of Partial sR1 -> return $ Partial (sL, sR1) + Continue sR1 -> return $ Continue (sL, sR1) Done bR -> finalL sL >> return (Done bR) + Continue sL -> return $ Continue (sL, sR) initial = do r <- initialR case r of - Partial sR -> runStep initialL sR - Done b -> return $ Done b + Fold.Partial sR -> toFoldStep <$> runStep initialL_ sR + Fold.Done b -> return $ Fold.Done b step (sL, sR) x = runStep (stepL sL x) sR @@ -614,7 +625,7 @@ rollingMapM f = Scanl step initial extract extract -- XXX We need just a postscan. We do not need an initial result here. -- Or we can supply a default initial result as an argument to rollingMapM. - initial = return $ Partial (Nothing, error "Empty stream") + initial = return $ Fold.Partial (Nothing, error "Empty stream") step (prev, _) cur = do x <- f prev cur @@ -723,7 +734,7 @@ the = scant' step initial id where - initial = Partial Nothing + initial = Fold.Partial Nothing step Nothing x = Partial (Just x) step old@(Just x0) x = @@ -760,7 +771,7 @@ sum = Scanl.cumulativeScan Scanl.incrSum -- {-# INLINE product #-} product :: (Monad m, Num a, Eq a) => Scanl m a a -product = scant' step (Partial 1) id +product = scant' step (Fold.Partial 1) id where @@ -1311,7 +1322,7 @@ takingEndByM p = Scanl step initial extract extract where - initial = return $ Partial Nothing' + initial = return $ Fold.Partial Nothing' step _ a = do r <- p a @@ -1336,7 +1347,7 @@ takingEndByM_ p = Scanl step initial extract extract where - initial = return $ Partial Nothing' + initial = return $ Fold.Partial Nothing' step _ a = do r <- p a @@ -1361,7 +1372,7 @@ droppingWhileM p = Scanl step initial extract extract where - initial = return $ Partial Nothing' + initial = return $ Fold.Partial Nothing' step Nothing' a = do r <- p a @@ -1848,11 +1859,11 @@ partitionByM f resR <- initialR return $ case resL of - Done bl -> Done bl - Partial sl -> + Fold.Done bl -> Fold.Done bl + Fold.Partial sl -> case resR of - Partial sr -> Partial $ PartLeft sl sr - Done br -> Done br + Fold.Partial sr -> Fold.Partial $ PartLeft sl sr + Fold.Done br -> Fold.Done br runBoth sL sR a = do pRes <- f a @@ -1861,11 +1872,13 @@ partitionByM f resL <- stepL sL b case resL of Partial s -> return $ Partial $ PartLeft s sR + Continue s -> return $ Continue $ PartLeft s sR Done x -> return $ Done x Right c -> do resR <- stepR sR c case resR of Partial s -> return $ Partial $ PartRight sL s + Continue s -> return $ Continue $ PartRight sL s Done x -> return $ Done x step (PartLeft sL sR) = runBoth sL sR @@ -2207,6 +2220,7 @@ unfoldEach (Unfold ustep inject) (Scanl fstep initial extract final) = fres <- fstep fs b case fres of Partial fs1 -> produce fs1 us1 + Continue fs1 -> produce fs1 us1 -- XXX What to do with the remaining stream? Done c -> return $ Done c StreamD.Skip us1 -> produce fs us1 @@ -2234,8 +2248,8 @@ bottomBy cmp n = Scanl step initial extract extract initial = do arr <- MA.emptyOf' n if n <= 0 - then return $ Done arr - else return $ Partial (arr, 0) + then return $ Fold.Done arr + else return $ Fold.Partial (arr, 0) step (arr, i) x = if i < n diff --git a/core/src/Streamly/Internal/Data/Scanl/Container.hs b/core/src/Streamly/Internal/Data/Scanl/Container.hs index 643bd14f34..a635de5362 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Container.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Container.hs @@ -102,6 +102,7 @@ import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3'(..)) import qualified Data.IntSet as IntSet import qualified Data.Set as Set +import qualified Streamly.Internal.Data.Fold.Step as Fold import qualified Streamly.Internal.Data.IsMap as IsMap import Prelude hiding (Foldable(..)) @@ -277,7 +278,7 @@ demuxGeneric :: (Monad m, IsMap f, Traversable f) => -> (Key f -> m (Maybe (Scanl m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxGeneric getKey getFold = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl (\s a -> Partial <$> step s a) (Fold.Partial <$> initial) extract final where @@ -287,19 +288,24 @@ demuxGeneric getKey getFold = runFold kv (Scanl step1 initial1 extract1 final1) (k, a) = do res <- initial1 case res of - Partial s -> do + Fold.Partial s -> do res1 <- step1 s a case res1 of Partial ss -> do b <- extract1 ss - let fld = Scanl step1 (return res1) extract1 final1 + let fld = Scanl step1 (return (Fold.Partial ss)) extract1 final1 return $ Tuple' (IsMap.mapInsert k fld kv) (Just (k, b)) + Continue ss -> + let fld = Scanl step1 (return (Fold.Partial ss)) extract1 final1 + in return + $ Tuple' + (IsMap.mapInsert k fld kv) Nothing Done b -> return $ Tuple' (IsMap.mapDelete k kv) (Just (k, b)) - Done b -> + Fold.Done b -> -- Done in "initial" is possible only for the very first time -- the fold is initialized, and in that case we have not yet -- inserted it in the Map, so we do not need to delete it. @@ -322,7 +328,7 @@ demuxGeneric getKey getFold = f (Scanl _ i e _) = do r <- i case r of - Partial s -> e s + Fold.Partial s -> e s _ -> error "demuxGeneric: unreachable code" final (Tuple' kv x) = return (Prelude.mapM f kv, x) @@ -332,7 +338,7 @@ demuxGeneric getKey getFold = f (Scanl _ i _ fin) = do r <- i case r of - Partial s -> fin s + Fold.Partial s -> fin s _ -> error "demuxGeneric: unreachable code" {-# INLINE demuxUsingMap #-} @@ -385,7 +391,7 @@ demuxGenericIO :: (MonadIO m, IsMap f, Traversable f) => -> (Key f -> m (Maybe (Scanl m a b))) -> Scanl m a (m (f b), Maybe (Key f, b)) demuxGenericIO getKey getFold = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl (\s a -> Partial <$> step s a) (Fold.Partial <$> initial) extract final where @@ -395,37 +401,46 @@ demuxGenericIO getKey getFold = initFold kv (Scanl step1 initial1 extract1 final1) (k, a) = do res <- initial1 case res of - Partial s -> do + Fold.Partial s -> do res1 <- step1 s a case res1 of Partial ss -> do -- XXX Instead of using a Fold type here use a custom -- type with an IORef (possibly unboxed) for the -- accumulator. That will reduce the allocations. - let fld = Scanl step1 (return res1) extract1 final1 + let fld = Scanl step1 (return (Fold.Partial ss)) extract1 final1 ref <- liftIO $ newIORef fld b <- extract1 ss return $ Tuple' (IsMap.mapInsert k ref kv) (Just (k, b)) + Continue ss -> do + let fld = Scanl step1 (return (Fold.Partial ss)) extract1 final1 + ref <- liftIO $ newIORef fld + return + $ Tuple' (IsMap.mapInsert k ref kv) Nothing Done b -> return $ Tuple' kv (Just (k, b)) - Done b -> return $ Tuple' kv (Just (k, b)) + Fold.Done b -> return $ Tuple' kv (Just (k, b)) {-# INLINE runFold #-} runFold kv ref (Scanl step1 initial1 extract1 final1) (k, a) = do res <- initial1 case res of - Partial s -> do + Fold.Partial s -> do res1 <- step1 s a case res1 of Partial ss -> do - let fld = Scanl step1 (return res1) extract1 final1 + let fld = Scanl step1 (return (Fold.Partial ss)) extract1 final1 liftIO $ writeIORef ref fld b <- extract1 ss return $ Tuple' kv (Just (k, b)) + Continue ss -> do + let fld = Scanl step1 (return (Fold.Partial ss)) extract1 final1 + liftIO $ writeIORef ref fld + return $ Tuple' kv Nothing Done b -> let kv1 = IsMap.mapDelete k kv in return $ Tuple' kv1 (Just (k, b)) - Done _ -> error "demuxGenericIO: unreachable" + Fold.Done _ -> error "demuxGenericIO: unreachable" step (Tuple' kv _) a = do let k = getKey a @@ -447,7 +462,7 @@ demuxGenericIO getKey getFold = Scanl _ i e _ <- liftIO $ readIORef ref r <- i case r of - Partial s -> e s + Fold.Partial s -> e s _ -> error "demuxGenericIO: unreachable code" final (Tuple' kv x) = return (Prelude.mapM f kv, x) @@ -458,7 +473,7 @@ demuxGenericIO getKey getFold = Scanl _ i _ fin <- liftIO $ readIORef ref r <- i case r of - Partial s -> fin s + Fold.Partial s -> fin s _ -> error "demuxGenericIO: unreachable code" {-# INLINE demuxUsingMapIO #-} @@ -580,7 +595,7 @@ classifyGeneric :: (Monad m, IsMap f, Traversable f, Ord (Key f)) => -- that the downstream consumers can choose to process or discard it. (a -> Key f) -> Scanl m a b -> Scanl m a (m (f b), Maybe (Key f, b)) classifyGeneric f (Scanl step1 initial1 extract1 final1) = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl (\s a -> Partial <$> step s a) (Fold.Partial <$> initial) extract final where @@ -592,16 +607,19 @@ classifyGeneric f (Scanl step1 initial1 extract1 final1) = initFold kv set k a = do x <- initial1 case x of - Partial s -> do + Fold.Partial s -> do r <- step1 s a case r of Partial s1 -> do b <- extract1 s1 return $ Tuple3' (IsMap.mapInsert k s1 kv) set (Just (k, b)) + Continue s1 -> + return + $ Tuple3' (IsMap.mapInsert k s1 kv) set Nothing Done b -> return $ Tuple3' kv set (Just (k, b)) - Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b))) + Fold.Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b))) step (Tuple3' kv set _) a = do let k = f a @@ -616,6 +634,8 @@ classifyGeneric f (Scanl step1 initial1 extract1 final1) = Partial s1 -> do b <- extract1 s1 return $ Tuple3' (IsMap.mapInsert k s1 kv) set (Just (k,b)) + Continue s1 -> + return $ Tuple3' (IsMap.mapInsert k s1 kv) set Nothing Done b -> let kv1 = IsMap.mapDelete k kv in return $ Tuple3' kv1 (Set.insert k set) (Just (k, b)) @@ -665,7 +685,7 @@ classify getKey = fmap snd . classifyUsingMap getKey classifyGenericIO :: (MonadIO m, IsMap f, Traversable f, Ord (Key f)) => (a -> Key f) -> Scanl m a b -> Scanl m a (m (f b), Maybe (Key f, b)) classifyGenericIO f (Scanl step1 initial1 extract1 final1) = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final + Scanl (\s a -> Partial <$> step s a) (Fold.Partial <$> initial) extract final where @@ -675,7 +695,7 @@ classifyGenericIO f (Scanl step1 initial1 extract1 final1) = initFold kv set k a = do x <- initial1 case x of - Partial s -> do + Fold.Partial s -> do r <- step1 s a case r of Partial s1 -> do @@ -684,9 +704,14 @@ classifyGenericIO f (Scanl step1 initial1 extract1 final1) = return $ Tuple3' (IsMap.mapInsert k ref kv) set (Just (k, b)) + Continue s1 -> do + ref <- liftIO $ newIORef s1 + return + $ Tuple3' + (IsMap.mapInsert k ref kv) set Nothing Done b -> return $ Tuple3' kv set (Just (k, b)) - Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b))) + Fold.Done b -> return (Tuple3' kv (Set.insert k set) (Just (k, b))) step (Tuple3' kv set _) a = do let k = f a @@ -703,6 +728,9 @@ classifyGenericIO f (Scanl step1 initial1 extract1 final1) = liftIO $ writeIORef ref s1 b <- extract1 s1 return $ Tuple3' kv set (Just (k, b)) + Continue s1 -> do + liftIO $ writeIORef ref s1 + return $ Tuple3' kv set Nothing Done b -> let kv1 = IsMap.mapDelete k kv in return diff --git a/core/src/Streamly/Internal/Data/Scanl/Step.hs b/core/src/Streamly/Internal/Data/Scanl/Step.hs new file mode 100644 index 0000000000..6e530a1909 --- /dev/null +++ b/core/src/Streamly/Internal/Data/Scanl/Step.hs @@ -0,0 +1,106 @@ +-- | +-- Module : Streamly.Internal.Data.Scanl.Step +-- Copyright : (c) 2019 Composewell Technologies +-- License : BSD3 +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC +-- +module Streamly.Internal.Data.Scanl.Step + ( + -- * Step Type + Step (..) + + , mapMStep + , chainStepM + , fromFoldStep + , toFoldStep + ) +where + +import Data.Bifunctor (Bifunctor(..)) +import Fusion.Plugin.Types (Fuse(..)) + +import qualified Streamly.Internal.Data.Fold.Step as Fold + +------------------------------------------------------------------------------ +-- Step of a scan +------------------------------------------------------------------------------ + +-- | Represents the result of the @step@ of a 'Scanl'. +-- +{-# ANN type Step Fuse #-} +data Step s b + = Partial !s + -- ^ Returns the next state of the scan accumulator indicating a new + -- result, the scan driver can extract the value of the accumulator and + -- emit it in the output stream, and then it can call the scan step + -- function again. + | Continue !s + -- ^ Returns the next state of the scan, the result should not be emitted + -- in the output stream, the accumulator may or may not have advanced to + -- the next state. If the driver calls "extract" on the state it may get + -- the same old result, but extract will never fail regardless. This is + -- essentially a mechanism to filter the output of a scan. + | Done !b + -- ^ Returns the final result, scan stops and cannot be driven further. + +-- | 'first' maps over the scan state and 'second' maps over the scan result. +-- +instance Bifunctor Step where + {-# INLINE bimap #-} + bimap f _ (Partial a) = Partial (f a) + bimap f _ (Continue a) = Continue (f a) + bimap _ g (Done b) = Done (g b) + + {-# INLINE first #-} + first f (Partial a) = Partial (f a) + first f (Continue a) = Continue (f a) + first _ (Done x) = Done x + + {-# INLINE second #-} + second _ (Partial x) = Partial x + second _ (Continue x) = Continue x + second f (Done a) = Done (f a) + +-- | 'fmap' maps over 'Done'. +-- +-- @ +-- fmap = 'second' +-- @ +-- +instance Functor (Step s) where + {-# INLINE fmap #-} + fmap = second + +-- | Map a monadic function over the result @b@ in @Step s b@. +-- +-- /Internal/ +{-# INLINE mapMStep #-} +mapMStep :: Applicative m => (a -> m b) -> Step s a -> m (Step s b) +mapMStep f res = + case res of + Partial s -> pure $ Partial s + Continue s -> pure $ Continue s + Done b -> Done <$> f b + +-- | If 'Partial' then map the state, if 'Done' then call the next step. +{-# INLINE chainStepM #-} +chainStepM :: Applicative m => + (s1 -> m s2) -> (a -> m (Step s2 b)) -> Step s1 a -> m (Step s2 b) +chainStepM f _ (Partial s) = Partial <$> f s +chainStepM f _ (Continue s) = Continue <$> f s +chainStepM _ g (Done b) = g b + +-- | Convert a fold 'Fold.Step' into a scan 'Step'. +{-# INLINE fromFoldStep #-} +fromFoldStep :: Fold.Step s b -> Step s b +fromFoldStep (Fold.Partial s) = Partial s +fromFoldStep (Fold.Done b) = Done b + +-- | Convert a scan 'Step' into a fold 'Fold.Step'. +{-# INLINE toFoldStep #-} +toFoldStep :: Step s b -> Fold.Step s b +toFoldStep (Partial s) = Fold.Partial s +toFoldStep (Continue s) = Fold.Partial s +toFoldStep (Done b) = Fold.Done b diff --git a/core/src/Streamly/Internal/Data/Scanl/Type.hs b/core/src/Streamly/Internal/Data/Scanl/Type.hs index d53ef2ffa7..54b600ab6f 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Type.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Type.hs @@ -10,11 +10,14 @@ -- -- Scanl vs Pipe: -- --- A scanl is a simpler version of pipes. A scan always produces an output and --- may or may not consume an input. It can consume at most one input on one --- output. Whereas a pipe may consume input even without producing anything or --- it can consume multiple inputs on a single output. Scans are simpler --- abstractions to think about and easier for the compiler to optimize. +-- A scanl is a simpler version of pipes. A scan consumes at most one input to +-- produce at most one output. It usually produces an output on each input, but +-- using the 'Continue' step it may consume an input without producing an +-- output (e.g. 'filter'), so a scan's output stream can be shorter than the +-- input but cannot be empty. Whereas a pipe may consume input even without +-- producing anything or it can consume multiple inputs on a single output. +-- Scans are simpler abstractions to think about and easier for the compiler to +-- optimize. -- -- Returning a stream on "extract": -- @@ -72,7 +75,7 @@ -- module Streamly.Internal.Data.Scanl.Type ( - module Streamly.Internal.Data.Fold.Step + module Streamly.Internal.Data.Scanl.Step -- * Scanl Type , Scanl (..) @@ -209,13 +212,15 @@ import Streamly.Internal.Data.Refold.Type (Refold(..)) -- import Streamly.Internal.Data.Scan (Scan(..)) import Streamly.Internal.Data.Tuple.Strict (Tuple'(..)) +import qualified Streamly.Internal.Data.Fold.Step as Fold + --import qualified Streamly.Internal.Data.Stream.Step as Stream import qualified Streamly.Internal.Data.StreamK.Type as K import Prelude hiding (Foldable(..), concatMap, filter, map, take, const) -- Entire module is exported, do not import selectively -import Streamly.Internal.Data.Fold.Step +import Streamly.Internal.Data.Scanl.Step #include "DocTestDataScanl.hs" @@ -284,7 +289,11 @@ import Streamly.Internal.Data.Fold.Step -- data Scanl m a b = -- | @Scanl@ @step@ @initial@ @extract@ @final@ - forall s. Scanl (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b) (s -> m b) + forall s. Scanl + (s -> a -> m (Step s b)) -- step + (m (Fold.Step s b)) -- initial + (s -> m b) -- extract + (s -> m b) -- cleanup {- -- XXX Change the type to as follows. This takes care of the unfoldMany case @@ -317,7 +326,7 @@ rmapM f (Scanl step initial extract final) = where - initial1 = initial >>= mapMStep f + initial1 = initial >>= Fold.mapMStep f step1 s a = step s a >>= mapMStep f ------------------------------------------------------------------------------ @@ -335,7 +344,7 @@ scanl', mkScanl :: Monad m => (b -> a -> b) -> b -> Scanl m a b scanl' step initial = Scanl (\s a -> return $ Partial $ step s a) - (return (Partial initial)) + (return (Fold.Partial initial)) return return @@ -345,7 +354,7 @@ scanl' step initial = {-# INLINE scanlM' #-} scanlM', mkScanlM :: Monad m => (b -> a -> m b) -> m b -> Scanl m a b scanlM' step initial = - Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) return return + Scanl (\s a -> Partial <$> step s a) (Fold.Partial <$> initial) return return -- | Maps a function on the output of the scan (the type @b@). instance Functor m => Functor (Scanl m a) where @@ -488,7 +497,7 @@ mkScanrM g z = -- /Pre-release/ -- {-# INLINE scant' #-} -scant', mkScant :: Monad m => (s -> a -> Step s b) -> Step s b -> (s -> b) -> Scanl m a b +scant', mkScant :: Monad m => (s -> a -> Step s b) -> Fold.Step s b -> (s -> b) -> Scanl m a b scant' step initial extract = Scanl (\s a -> return $ step s a) @@ -506,7 +515,7 @@ scant' step initial extract = -- /Pre-release/ -- {-# INLINE scantM' #-} -scantM', mkScantM :: (s -> a -> m (Step s b)) -> m (Step s b) -> (s -> m b) -> Scanl m a b +scantM', mkScantM :: (s -> a -> m (Step s b)) -> m (Fold.Step s b) -> (s -> m b) -> Scanl m a b scantM' step initial extract = Scanl step initial extract extract ------------------------------------------------------------------------------ @@ -519,9 +528,13 @@ scantM' step initial extract = Scanl step initial extract extract -- | Make a scan from a consumer. -- -- /Internal/ -fromRefold :: Refold m c a b -> c -> Scanl m a b +fromRefold :: Functor m => Refold m c a b -> c -> Scanl m a b fromRefold (Refold step inject extract) c = - Scanl step (inject c) extract extract + Scanl + (\s a -> fmap fromFoldStep (step s a)) + (inject c) + extract + extract ------------------------------------------------------------------------------ -- Basic Scans @@ -552,7 +565,7 @@ functionM f = Scanl step initial return return where - initial = return $ Partial Nothing + initial = return $ Fold.Partial Nothing step _ x = f x <&> Partial @@ -745,7 +758,7 @@ fromPure b = Scanl undefined (pure $ Done b) pure pure -- {-# INLINE const #-} const :: Applicative m => b -> Scanl m a b -const b = Scanl (\s _ -> pure $ Partial s) (pure $ Partial b) pure pure +const b = Scanl (\s _ -> pure $ Partial s) (pure $ Fold.Partial b) pure pure {- -- | Make a scan that yields the result of the supplied effectful action @@ -765,7 +778,7 @@ fromEffect b = Scanl undefined (Done <$> b) pure pure -- {-# INLINE constM #-} constM :: Applicative m => m b -> Scanl m a b -constM b = Scanl (\s _ -> pure $ Partial s) (Partial <$> b) pure pure +constM b = Scanl (\s _ -> pure $ Partial s) (Fold.Partial <$> b) pure pure {- {-# ANN type SeqFoldState Fuse #-} @@ -1008,7 +1021,8 @@ teeWithFst f -- | @teeWith k f1 f2@ distributes its input to both @f1@ and @f2@ until any -- one of them terminates. The outputs of the two scans are combined using the --- function @k@. +-- function @k@. If on an input the output of any side is filtered, no output +-- is produced for that input. -- -- Definition: -- @@ -1034,23 +1048,41 @@ teeWith f where + initial = do + resL <- initialL + resR <- initialR + case resL of + Fold.Done bl -> + Fold.Done . f bl <$> + case resR of + Fold.Partial sr -> finalR sr + Fold.Done br -> return br + Fold.Partial sl -> + case resR of + Fold.Done br -> Fold.Done . (`f` br) <$> finalL sl + Fold.Partial sr -> return $ Fold.Partial $ Tuple' sl sr + {-# INLINE runBoth #-} runBoth actionL actionR = do resL <- actionL resR <- actionR case resL of - Partial sl -> do - case resR of - Partial sr -> return $ Partial $ Tuple' sl sr - Done br -> Done . (`f` br) <$> finalL sl - - Done bl -> do + Done bl -> Done . f bl <$> case resR of Partial sr -> finalR sr + Continue sr -> finalR sr Done br -> return br - - initial = runBoth initialL initialR + Partial sl -> + case resR of + Done br -> Done . (`f` br) <$> finalL sl + Partial sr -> return $ Partial $ Tuple' sl sr + Continue sr -> return $ Continue $ Tuple' sl sr + Continue sl -> + case resR of + Done br -> Done . (`f` br) <$> finalL sl + Partial sr -> return $ Continue $ Tuple' sl sr + Continue sr -> return $ Continue $ Tuple' sl sr step (Tuple' sL sR) a = runBoth (stepL sL a) (stepR sR a) @@ -1277,8 +1309,6 @@ lmapM f (Scanl step begin done final) = Scanl step' begin done final -- | Postscan the input of a 'Scanl' to change it in a stateful manner using -- another 'Scanl'. -- --- This is basically an append operation. --- -- /Pre-release/ {-# INLINE postscanl #-} postscanl :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c @@ -1297,23 +1327,26 @@ postscanl rR <- stepR sR bL case rR of Partial sR1 -> Done <$> finalR sR1 + Continue sR1 -> Done <$> finalR sR1 Done bR -> return $ Done bR Partial sL -> do !b <- extractL sL rR <- stepR sR b case rR of Partial sR1 -> return $ Partial (sL, sR1) + Continue sR1 -> return $ Continue (sL, sR1) Done bR -> finalL sL >> return (Done bR) + Continue sL -> return $ Continue (sL, sR) initial = do rR <- initialR case rR of - Partial sR -> do + Fold.Partial sR -> do rL <- initialL case rL of - Done _ -> Done <$> finalR sR - Partial sL -> return $ Partial (sL, sR) - Done b -> return $ Done b + Fold.Done _ -> Fold.Done <$> finalR sR + Fold.Partial sL -> return $ Fold.Partial (sL, sR) + Fold.Done b -> return $ Fold.Done b -- XXX should use Tuple' step (sL, sR) x = runStep (stepL sL x) sR @@ -1341,7 +1374,7 @@ catMaybes (Scanl step initial extract final) = step1 s a = case a of - Nothing -> return $ Partial s + Nothing -> return $ Continue s Just x -> step s x -- | Scan using a 'Maybe' returning scan, filter out 'Nothing' values. @@ -1363,10 +1396,11 @@ filtering f = scanl' step Nothing step _ a = if f a then Just a else Nothing --- | Include only those elements that pass a predicate. +-- | Process input and emit scan output only for those input elements that pass +-- a predicate. -- -- >>> Stream.toList $ Stream.scanl (Scanl.filter (> 5) Scanl.sum) $ Stream.fromList [1..10] --- [0,0,0,0,0,0,6,13,21,30,40] +-- [0,6,13,21,30,40] -- -- >>> filter p = Scanl.postscanlMaybe (Scanl.filtering p) -- >>> filter p = Scanl.filterM (return . p) @@ -1377,7 +1411,7 @@ filter :: Monad m => (a -> Bool) -> Scanl m a r -> Scanl m a r -- filter p = postscanlMaybe (filtering p) filter f (Scanl step begin extract final) = Scanl step' begin extract final where - step' x a = if f a then step x a else return $ Partial x + step' x a = if f a then step x a else return $ Continue x -- | Like 'filter' but with a monadic predicate. -- @@ -1390,7 +1424,7 @@ filterM f (Scanl step begin extract final) = Scanl step' begin extract final where step' x a = do use <- f a - if use then step x a else return $ Partial x + if use then step x a else return $ Continue x ------------------------------------------------------------------------------ -- Either streams @@ -1441,8 +1475,8 @@ taking n = scant' step initial extract initial = if n <= 0 - then Done Nothing - else Partial (Tuple'Fused n Nothing) + then Fold.Done Nothing + else Fold.Partial (Tuple'Fused n Nothing) step (Tuple'Fused i _) a = if i > 1 @@ -1457,7 +1491,7 @@ dropping n = scant' step initial extract where - initial = Partial (Tuple'Fused n Nothing) + initial = Fold.Partial (Tuple'Fused n Nothing) step (Tuple'Fused i _) a = if i > 0 @@ -1488,9 +1522,18 @@ take n (Scanl fstep finitial fextract ffinal) = Scanl step initial extract final if i1 < n then return $ Partial s1 else Done <$> ffinal s + Continue s -> + return $ Continue (Tuple'Fused i s) Done b -> return $ Done b - initial = finitial >>= next (-1) + initial = do + res <- finitial + case res of + Fold.Partial s -> + if n > (0 :: Int) + then return $ Fold.Partial (Tuple'Fused 0 s) + else Fold.Done <$> ffinal s + Fold.Done b -> return $ Fold.Done b step (Tuple'Fused i r) a = fstep r a >>= next i @@ -1554,6 +1597,7 @@ takeEndBy predicate (Scanl fstep finitial fextract ffinal) = else do case res of Partial s1 -> Done <$> ffinal s1 + Continue s1 -> Done <$> ffinal s1 Done b -> return $ Done b ------------------------------------------------------------------------------ diff --git a/core/src/Streamly/Internal/Data/Scanl/Window.hs b/core/src/Streamly/Internal/Data/Scanl/Window.hs index 2d0e28a37e..d45cfa98d2 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Window.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Window.hs @@ -78,6 +78,7 @@ import Streamly.Internal.Data.Tuple.Strict (Tuple'(..), Tuple3Fused' (Tuple3Fused')) import Streamly.Internal.Data.Unbox (Unbox(..)) +import qualified Streamly.Internal.Data.Fold.Step as Fold import qualified Streamly.Internal.Data.MutArray.Type as MutArray import qualified Streamly.Internal.Data.RingArray as RingArray import qualified Streamly.Internal.Data.Scanl.Type as Scanl @@ -125,7 +126,8 @@ data SlidingWindow a r s = SWArray !a !Int !s | SWRing !r !s -- data SlidingWindow a s = SWArray !a !Int !s !Int | SWRing !a !Int !s -- | Like 'incrScan' but also provides the ring array to the scan. The ring --- array reflects the state of the ring after inserting the incoming element. +-- array reflects the state of the window /before/ the incoming element is +-- inserted. -- -- IMPORTANT NOTE: The ring is mutable, therefore, references to it should not -- be stored and used later, the state would have changed by then. If you need @@ -146,28 +148,31 @@ incrScanWith n (Scanl step1 initial1 extract1 final1) = arr <- liftIO $ MutArray.emptyOf n return $ case r of - Partial s -> Partial $ SWArray arr (0 :: Int) s - Done b -> Done b + Fold.Partial s -> Fold.Partial $ SWArray arr (0 :: Int) s + Fold.Done b -> Fold.Done b step (SWArray arr i st) a = do - -- XXX compare this with the slidingWindow impl - arr1 <- liftIO $ MutArray.unsafeSnoc arr a - r <- step1 st (Insert a, RingArray.unsafeCastMutArray arr1) - return $ case r of - Partial s -> + r <- step1 st (Insert a, RingArray.unsafeCastMutArray arr) + case r of + Partial s -> do + arr1 <- liftIO $ MutArray.unsafeSnoc arr a let i1 = i + 1 - in if i1 < n - then Partial $ SWArray arr1 i1 s - else Partial $ SWRing (RingArray.unsafeCastMutArray arr1) s - Done b -> Done b + return + $ if i1 < n + then Partial $ SWArray arr1 i1 s + else Partial $ SWRing (RingArray.unsafeCastMutArray arr1) s + Continue s -> return $ Continue $ SWArray arr i s + Done b -> return $ Done b step (SWRing rb st) a = do - (rb1, old) <- RingArray.replace rb a - r <- step1 st (Replace old a, rb1) - return $ - case r of - Partial s -> Partial $ SWRing rb1 s - Done b -> Done b + old <- RingArray.unsafeGetIndex 0 rb + r <- step1 st (Replace old a, rb) + case r of + Partial s -> do + (rb1, _) <- RingArray.replace rb a + return $ Partial $ SWRing rb1 s + Continue s -> return $ Continue $ SWRing rb s + Done b -> return $ Done b extract (SWArray _ _ st) = extract1 st extract (SWRing _ st) = extract1 st @@ -291,7 +296,7 @@ incrSumInt = Scanl step initial extract extract where - initial = return $ Partial (0 :: a) + initial = return $ Fold.Partial (0 :: a) step s (Insert a) = return $ Partial (s + a) -- step s (Delete a) = return $ Partial (s - a) @@ -324,7 +329,7 @@ incrSum = Scanl step initial extract extract initial = return - $ Partial + $ Fold.Partial $ Tuple' (0 :: a) -- running sum (0 :: a) -- accumulated rounding error @@ -447,7 +452,7 @@ windowRange n = Scanl step initial extract extract then error "ringsOf: window size must be > 0" else do arr :: MutArray.MutArray a <- liftIO $ MutArray.emptyOf n - return $ Partial $ Tuple3Fused' (MutArray.arrContents arr) 0 0 + return $ Fold.Partial $ Tuple3Fused' (MutArray.arrContents arr) 0 0 step (Tuple3Fused' mba rh i) a = do RingArray _ _ rh1 <- RingArray.replace_ (RingArray mba (n * SIZE_OF(a)) rh) a diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index d465fee0f6..b1489231b2 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -233,6 +233,7 @@ import Streamly.Internal.System.IO (defaultChunkSize) -- import qualified Data.List as List import qualified Streamly.Internal.Data.Array.Type as A import qualified Streamly.Internal.Data.Fold.Type as FL +import qualified Streamly.Internal.Data.Scanl.Type as Scanl import qualified Streamly.Internal.Data.Pipe.Type as Pipe import qualified Streamly.Internal.Data.StreamK.Type as K import qualified Streamly.Internal.Data.Producer as Producer @@ -594,10 +595,11 @@ postscanl (Scanl fstep initial extract final) (Stream sstep state) = Yield x s -> do r <- fstep fs x case r of - FL.Partial fs1 -> do + Scanl.Partial fs1 -> do !b <- extract fs1 return $ Yield b $ ScanDo s fs1 - FL.Done b -> return $ Yield b ScanDone + Scanl.Continue fs1 -> return $ Skip $ ScanDo s fs1 + Scanl.Done b -> return $ Yield b ScanDone Skip s -> return $ Skip $ ScanDo s fs Stop -> final fs >> return Stop step _ ScanDone = return Stop @@ -605,8 +607,7 @@ postscanl (Scanl fstep initial extract final) (Stream sstep state) = {-# DEPRECATED postscan "Please use postscanl instead" #-} {-# INLINE_NORMAL postscan #-} postscan :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b -postscan (FL.Fold fstep initial extract final) = - postscanl (Scanl fstep initial extract final) +postscan = postscanl . FL.fromFold {-# INLINE scanlWith #-} scanlWith :: Monad m @@ -620,15 +621,24 @@ scanlWith restart (Scanl fstep initial extract final) (Stream sstep state) = runStep st action = do res <- action case res of - FL.Partial fs -> do + Scanl.Partial fs -> do !b <- extract fs return $ Yield b $ ScanDo st fs - FL.Done b -> + Scanl.Continue fs -> return $ Skip $ ScanDo st fs + Scanl.Done b -> let next = if restart then ScanInit st else ScanDone in return $ Yield b next {-# INLINE_LATE step #-} - step _ (ScanInit st) = runStep st initial + step _ (ScanInit st) = do + res <- initial + case res of + FL.Partial fs -> do + !b <- extract fs + return $ Yield b $ ScanDo st fs + FL.Done b -> + let next = if restart then ScanInit st else ScanDone + in return $ Yield b next step gst (ScanDo st fs) = do res <- sstep (adaptState gst) st case res of @@ -641,8 +651,7 @@ scanlWith restart (Scanl fstep initial extract final) (Stream sstep state) = {-# INLINE scanWith #-} scanWith :: Monad m => Bool -> Fold m a b -> Stream m a -> Stream m b -scanWith restart (Fold fstep initial extract final) = - scanlWith restart (Scanl fstep initial extract final) +scanWith restart = scanlWith restart . FL.fromFold -- XXX It may be useful to have a version of scan where we can keep the -- accumulator independent of the value emitted. So that we do not necessarily diff --git a/core/src/Streamly/Internal/Data/Unfold.hs b/core/src/Streamly/Internal/Data/Unfold.hs index 2ebcadc562..8621bac187 100644 --- a/core/src/Streamly/Internal/Data/Unfold.hs +++ b/core/src/Streamly/Internal/Data/Unfold.hs @@ -353,10 +353,11 @@ postscanl (Scanl stepF initial extract final) (Unfold stepU injectU) = Yield x s -> do rf <- stepF fs x case rf of - FL.Done v -> return $ Yield v Nothing - FL.Partial fs1 -> do + Scanl.Done v -> return $ Yield v Nothing + Scanl.Partial fs1 -> do v <- extract fs1 return $ Yield v (Just (fs1, s)) + Scanl.Continue fs1 -> return $ Skip (Just (fs1, s)) Skip s -> return $ Skip (Just (fs, s)) Stop -> final fs >> return Stop @@ -366,7 +367,7 @@ postscanl (Scanl stepF initial extract final) (Unfold stepU injectU) = {-# DEPRECATED postscan "Please use postscanl instead" #-} {-# INLINE_NORMAL postscan #-} postscan :: Monad m => Fold m b c -> Unfold m a b -> Unfold m a c -postscan (Fold s i e f) = postscanl (Scanl s i e f) +postscan = postscanl . FL.fromFold data ScanState s f = ScanInit s | ScanDo s !f | ScanDone @@ -383,15 +384,24 @@ scanWith restart (Scanl fstep initial extract final) (Unfold stepU injectU) = runStep us action = do r <- action case r of - FL.Partial fs -> do + Scanl.Partial fs -> do !b <- extract fs return $ Yield b (ScanDo us fs) - FL.Done b -> + Scanl.Continue fs -> return $ Skip (ScanDo us fs) + Scanl.Done b -> let next = if restart then ScanInit us else ScanDone in return $ Yield b next {-# INLINE_LATE step #-} - step (ScanInit us) = runStep us initial + step (ScanInit us) = do + r <- initial + case r of + FL.Partial fs -> do + !b <- extract fs + return $ Yield b (ScanDo us fs) + FL.Done b -> + let next = if restart then ScanInit us else ScanDone + in return $ Yield b next step (ScanDo us fs) = do res <- stepU us case res of @@ -416,7 +426,7 @@ scanlMany = scanWith True {-# DEPRECATED scanMany "Please use scanlMany instead" #-} {-# INLINE_NORMAL scanMany #-} scanMany :: Monad m => Fold m b c -> Unfold m a b -> Unfold m a c -scanMany (Fold s i e f) = scanWith True (Scanl s i e f) +scanMany = scanWith True . FL.fromFold -- scan2 :: Monad m => Refold m a b c -> Unfold m a b -> Unfold m a c @@ -436,7 +446,7 @@ scanl = scanWith False {-# DEPRECATED scan "Please use scanl instead" #-} {-# INLINE_NORMAL scan #-} scan :: Monad m => Fold m b c -> Unfold m a b -> Unfold m a c -scan (Fold s i e f) = scanWith False (Scanl s i e f) +scan = scanWith False . FL.fromFold {-# DEPRECATED postscanlM' "Please use \"postscanl (Scanl.scanlM' f z)\" instead" #-} {-# INLINE_NORMAL postscanlM' #-} diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index 7f803e5558..40828f85c2 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -497,6 +497,7 @@ library , Streamly.Internal.Data.Fold.Tee , Streamly.Internal.Data.Fold.Window + , Streamly.Internal.Data.Scanl.Step , Streamly.Internal.Data.Scanl.Type , Streamly.Internal.Data.Scanl.Window , Streamly.Internal.Data.Scanl.Combinators diff --git a/src/Streamly/Internal/Data/Fold/Channel/Type.hs b/src/Streamly/Internal/Data/Fold/Channel/Type.hs index 19d1dea2ba..9c9d62b406 100644 --- a/src/Streamly/Internal/Data/Fold/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Fold/Channel/Type.hs @@ -54,6 +54,7 @@ import Streamly.Internal.Data.Channel.Worker (sendEvent) import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime) import qualified Streamly.Internal.Data.Fold as Fold +import qualified Streamly.Internal.Data.Scanl as Scanl import qualified Streamly.Internal.Data.Stream as D import Streamly.Internal.Data.Channel.Types @@ -64,11 +65,20 @@ import Streamly.Internal.Data.Channel.Types -- queue the accumulator and it will be picked by the next worker to accumulate -- the next value. +-- XXX We can separate the Fold and Scan events. + +-- | Events used by folds and scans both. data OutEvent b = FoldException ThreadId SomeException | FoldPartial b + | FoldContinue + -- ^ scan filtered output, never occurs for folds. | FoldDone ThreadId b + -- ^ the fold or scan terminated on its own. Also, sent by folds when their + -- input stream ends. | FoldEOF ThreadId + -- ^ scan got finalized after the input was over. See 'sendEOFToDriver' for + -- details. This can never occur for folds. -- | The fold driver thread queues the input of the fold in the 'inputQueue' -- The driver rings the doorbell when the queue transitions from empty to @@ -253,8 +263,8 @@ dumpChannel sv = do -- $concurrentFolds -- -- To run folds concurrently, we need to decouple the fold execution from the --- stream production. We use the SVar to do that, we have a single worker --- pushing the stream elements to the SVar and on the consumer side a fold +-- stream production. We use a Channel to do that, we have a single worker +-- pushing the stream elements to the Channel and on the consumer side a fold -- driver pulls the values and folds them. -- -- @ @@ -295,6 +305,7 @@ sendToDriver sv msg = do sendEvent (outputQueue sv) (outputDoorBell sv) msg +-- XXX should be called sendDoneToDriver sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m () sendYieldToDriver sv res = liftIO $ do tid <- myThreadId @@ -304,6 +315,20 @@ sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m () sendPartialToDriver sv res = liftIO $ do void $ sendToDriver sv (FoldPartial res) +sendContinueToDriver :: MonadIO m => Channel m a b -> m () +sendContinueToDriver sv = liftIO $ do + void $ sendToDriver sv FoldContinue + +-- | Folds never send an ouput value until the very end and they always have a +-- final value to send when their input stream terminates and the fold is +-- finalized. +-- +-- However, in case of scans we may have sent the last output already on the +-- last input or if that got filtered then even before that, when the input +-- stream stops we need to call the scan finalizer and there is nothing to +-- send, so when the finalizer is done we send a FoldEOF event to tell the +-- driver we are done now and it cannot expect any more output. +-- sendEOFToDriver :: MonadIO m => Channel m a b -> m () sendEOFToDriver sv = liftIO $ do tid <- myThreadId @@ -344,7 +369,8 @@ fromInputQueue svar = D.Stream step (FromSVarRead svar) -- event. ChildYield a -> return $ D.Yield a (FromSVarLoop sv es) ChildStopChannel -> return D.Stop - _ -> undefined + ChildStop _ _ -> + error "Bug: fromInputQueue: unexpected ChildStop event" {-# INLINE readInputQChan #-} readInputQChan :: Channel m a b -> IO ([ChildEvent a], Int) @@ -455,11 +481,19 @@ scanToChannel chan (Scanl step initial extract final) = step1 st x = do r <- step st x case r of - Fold.Partial s -> do + Scanl.Partial s -> do b <- extract s void $ sendPartialToDriver chan b return $ Fold.Partial s - Fold.Done b -> do + -- XXX Continue is not normally required to be sent. But drivers + -- like parTeeWith decide whether to zip the outputs or skip based + -- on whether any of the scans returned Continue. We can possibly + -- use an option to send it or not. parTeeWith can opt-in and + -- others can opt-out. + Scanl.Continue s -> do + void $ sendContinueToDriver chan + return $ Fold.Partial s + Scanl.Done b -> do sendYieldToDriver chan b return $ Fold.Done True @@ -559,6 +593,8 @@ checkFoldStatus sv = do FoldDone _ b -> return (Just b) FoldPartial _ -> error "checkFoldStatus: FoldPartial can occur only for scans" + FoldContinue -> + error "checkFoldStatus: FoldContinue can occur only for scans" FoldEOF _ -> error "checkFoldStatus: FoldEOF can occur only for scans" diff --git a/src/Streamly/Internal/Data/Fold/Concurrent.hs b/src/Streamly/Internal/Data/Fold/Concurrent.hs index 337c2bea2f..6bf21d0960 100644 --- a/src/Streamly/Internal/Data/Fold/Concurrent.hs +++ b/src/Streamly/Internal/Data/Fold/Concurrent.hs @@ -387,7 +387,9 @@ parDistributeScan cfg getFolds (Stream sstep state) = let ch = filter (\(_, t) -> t /= tid) chans in processOutputs ch xs (b:done) FoldPartial _ -> - error "parDistributeScan: cannot occur for folds" + error "parDistributeScan: FoldPartial cannot occur for folds" + FoldContinue -> + error "parDistributeScan: FoldContinue cannot occur for folds" FoldEOF _ -> error "parDistributeScan: FoldEOF cannot occur for folds" @@ -516,6 +518,8 @@ parDemuxScan cfg getKey getFold (Stream sstep state) = in processOutputs ch xs (o:done) FoldPartial _ -> error "parDemuxScan: cannot occur for folds" + FoldContinue -> + error "parDemuxScan: FoldContinue cannot occur for folds" FoldEOF _ -> error "parDemuxScan: FoldEOF cannot occur for folds" diff --git a/src/Streamly/Internal/Data/Scanl/Concurrent.hs b/src/Streamly/Internal/Data/Scanl/Concurrent.hs index fcf9b925d8..3b225e20c4 100644 --- a/src/Streamly/Internal/Data/Scanl/Concurrent.hs +++ b/src/Streamly/Internal/Data/Scanl/Concurrent.hs @@ -26,13 +26,12 @@ import Data.IORef (newIORef, readIORef, atomicModifyIORef) import Fusion.Plugin.Types (Fuse(..)) import Streamly.Internal.Control.Concurrent (MonadAsync) import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS) -import Streamly.Internal.Data.Fold (Step (..)) -import Streamly.Internal.Data.Scanl (Scanl(..)) +import Streamly.Internal.Data.Scanl (Scanl(..), Step (..)) import Streamly.Internal.Data.Stream (Stream(..), Step(..)) import Streamly.Internal.Data.SVar.Type (adaptState) import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..)) - import qualified Data.Map.Strict as Map +import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Stream as Stream import Streamly.Internal.Data.Fold.Channel.Type @@ -44,6 +43,8 @@ import Streamly.Internal.Data.Channel.Types -- Concurrent scans ------------------------------------------------------------------------------- +data Response b = RespDone b | RespPartial b | RespContinue + -- | Execute both the scans in a tee concurrently. -- -- Example: @@ -67,6 +68,7 @@ parTeeWith cfg f c1 c2 = Scanl step initial extract final where + -- Get response from ch1; ch2 is passed only for cleanup on exception. getResponse ch1 ch2 = do -- NOTE: We do not need a queue and doorbell mechanism for this, a single -- MVar should be enough. Also, there is only one writer and it writes @@ -86,42 +88,65 @@ parTeeWith cfg f c1 c2 = Scanl step initial extract final cleanup ch1 cleanup ch2 liftIO $ throwM ex - FoldDone _tid b -> return (Left b) - FoldPartial b -> return (Right b) + FoldDone _tid b -> return (RespDone b) + FoldPartial b -> return (RespPartial b) + FoldContinue -> return RespContinue FoldEOF _ -> error "parTeeWith: FoldEOF cannot occur here" _ -> error "parTeeWith: not expecting more than one msg in q" - processResponses ch1 ch2 r1 r2 = - return $ case r1 of - Left b1 -> do - case r2 of - Left b2 -> Done (f b1 b2) - Right b2 -> Done (f b1 b2) - Right b1 -> do - case r2 of - Left b2 -> Done (f b1 b2) - Right b2 -> Partial $ Tuple3' ch1 ch2 (f b1 b2) + -- Zip only when both channels produce output on the same input. + {-# INLINE processResponses #-} + processResponses ch1 ch2 prev r1 r2 = + return $ + case r1 of + RespPartial b1 -> + case r2 of + RespPartial b2 -> Partial $ Tuple3' ch1 ch2 (f b1 b2) + RespContinue -> Continue $ Tuple3' ch1 ch2 prev + RespDone b2 -> Done (f b1 b2) + RespContinue -> + case r2 of + RespPartial _ -> Continue $ Tuple3' ch1 ch2 prev + RespContinue -> Continue $ Tuple3' ch1 ch2 prev + RespDone _ -> Done prev + RespDone b1 -> + case r2 of + RespDone b2 -> Done (f b1 b2) + RespPartial b2 -> Done (f b1 b2) + RespContinue -> Done prev initial = do ch1 <- newScanChannel cfg c1 ch2 <- newScanChannel cfg c2 r1 <- getResponse ch1 ch2 r2 <- getResponse ch2 ch1 - processResponses ch1 ch2 r1 r2 - - step (Tuple3' ch1 ch2 _) x = do + return $ + case r1 of + RespPartial b1 -> + case r2 of + RespPartial b2 -> Fold.Partial $ Tuple3' ch1 ch2 (f b1 b2) + RespDone b2 -> Fold.Done (f b1 b2) + _ -> error "parTeeWith initial: unexpected Continue response" + + RespDone b1 -> + case r2 of + RespDone b2 -> Fold.Done (f b1 b2) + RespPartial b2 -> Fold.Done (f b1 b2) + _ -> error "parTeeWith initial: unexpected Continue response" + _ -> error "parTeeWith initial: unexpected Continue response" + + step (Tuple3' ch1 ch2 prev) x = do sendToWorker_ ch1 x sendToWorker_ ch2 x r1 <- getResponse ch1 ch2 r2 <- getResponse ch2 ch1 - processResponses ch1 ch2 r1 r2 + processResponses ch1 ch2 prev r1 r2 extract (Tuple3' _ _ x) = return x final (Tuple3' ch1 ch2 x) = do finalize ch1 finalize ch2 - -- XXX generate the final value? return x -- There are two ways to implement a concurrent scan. @@ -201,6 +226,8 @@ parDistributeScanM cfg getFolds (Stream sstep state) = in processOutputs ch xs done FoldPartial b -> processOutputs chans xs (b:done) + FoldContinue -> + processOutputs chans xs done collectOutputs qref chans = do (_, n) <- liftIO $ readIORef qref @@ -350,6 +377,8 @@ parDemuxScanM cfg getKey getFold (Stream sstep state) = in processOutputs (Map.fromList ch) xs done FoldPartial b -> processOutputs keyToChan xs (b:done) + FoldContinue -> + processOutputs keyToChan xs done collectOutputs qref keyToChan = do (_, n) <- liftIO $ readIORef qref diff --git a/test/Streamly/Test/Data/Fold/Combinators.hs b/test/Streamly/Test/Data/Fold/Combinators.hs index 8527179aa9..ba7e112d48 100644 --- a/test/Streamly/Test/Data/Fold/Combinators.hs +++ b/test/Streamly/Test/Data/Fold/Combinators.hs @@ -26,7 +26,8 @@ import Prelude hiding , maybe ) import Streamly.Test.Common (chooseInt, withNumTests) -import Streamly.Test.Data.Fold.Type (check, checkApprox, checkPostscanl) +import Streamly.Test.Data.Fold.Type + (check, checkApprox, checkPostscanl, checkNoLaw) import Test.Hspec import Test.Hspec.QuickCheck import Test.QuickCheck diff --git a/test/Streamly/Test/Data/Fold/Type.hs b/test/Streamly/Test/Data/Fold/Type.hs index 632dde24b5..695b7aea1c 100644 --- a/test/Streamly/Test/Data/Fold/Type.hs +++ b/test/Streamly/Test/Data/Fold/Type.hs @@ -8,9 +8,11 @@ -- Portability : GHC module Streamly.Test.Data.Fold.Type - (main, check, checkApprox, checkPostscanl) where + (main, check, checkApprox, checkPostscanl, checkNoLaw) where +import Control.Exception (SomeException, evaluate, try) import Data.Functor.Identity (Identity(..), runIdentity) +import Data.IORef (newIORef, readIORef, modifyIORef') import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Fold as F import qualified Streamly.Internal.Data.Scanl as Scanl @@ -34,9 +36,54 @@ import Test.QuickCheck.Monadic (monadicIO, assert, run) -- element. type Op = F.Fold -check :: (Eq b, Show b) => Op IO a b -> [a] -> [b] -> Expectation -check cons xs expected = +check :: (Eq b, Show b, Show a) => Op IO a b -> [a] -> [b] -> Expectation +check cons xs expected = do Stream.fold cons (Stream.fromList xs) `shouldReturn` Prelude.last expected + filterLaw cons xs + +-- | Same as 'check' but does NOT apply the filter law. Use this only for the +-- few tests that pass bottom (e.g. 'error') input elements to verify early +-- termination -- the law would force those elements. +checkNoLaw :: (Eq b, Show b) => Op IO a b -> [a] -> [b] -> Expectation +checkNoLaw cons xs expected = + Stream.fold cons (Stream.fromList xs) `shouldReturn` Prelude.last expected + +-- | The filter law (an independent, black-box oracle): wrapping a fold in +-- 'Fold.filter' must produce the same result as folding the pre-filtered input: +-- +-- fold (Fold.filter p s) xs === fold s (filter p xs) +-- +-- This holds for EVERY fold because both sides feed the fold the identical +-- accepted subsequence. (A 'Fold' has no 'Continue'; 'Fold.filter' keeps the +-- accumulator unchanged for rejected inputs, which is the Fold counterpart of a +-- scan's 'Continue'.) Folded into 'check'/'checkPostscanl' so it applies to +-- every shared and Fold-specific test. +filterLawPred :: Show a => a -> Bool +filterLawPred x = even (Prelude.length (Prelude.show x)) + +-- Some scans driven as folds (the postscan-only ones, e.g. 'rollingMap', +-- 'uniq') are partial: their 'extract' on the initial state errors ("Empty +-- stream"). The filter law can reduce the input to empty, so we compare the two +-- sides exception-tolerantly: both sides throwing means the law still holds +-- (they fail identically); only one side throwing is a genuine violation. +-- | Run an action, returning 'Left' if it (or its result) throws. Fixes the +-- exception type so the law below needs no inline type annotations. +tryEval :: IO c -> IO (Either SomeException c) +tryEval act = try (act >>= evaluate) + +filterLaw :: (Eq b, Show b, Show a) => Op IO a b -> [a] -> Expectation +filterLaw cons xs = do + lhs <- tryEval + (Stream.fold (F.filter filterLawPred cons) (Stream.fromList xs)) + rhs <- tryEval + (Stream.fold cons (Stream.fromList (Prelude.filter filterLawPred xs))) + case (lhs, rhs) of + (Right l, Right r) -> l `shouldBe` r + (Left _, Left _) -> return () + _ -> + expectationFailure + $ "filter law violated (one side failed): " + ++ show lhs ++ " vs " ++ show rhs -- | Epsilon-equality counterpart of 'check' for Fractional results whose -- floating-point output is only approximately equal to the reference (e.g. @@ -50,9 +97,10 @@ checkApprox cons xs expected = do -- | Postscan-only counterpart of 'check' (for combinators whose scanl initial -- is undefined). The fold result equals the last postscanl output. -checkPostscanl :: (Eq b, Show b) => Op IO a b -> [a] -> [b] -> Expectation -checkPostscanl cons xs expected = +checkPostscanl :: (Eq b, Show b, Show a) => Op IO a b -> [a] -> [b] -> Expectation +checkPostscanl cons xs expected = do Stream.fold cons (Stream.fromList xs) `shouldReturn` Prelude.last expected + filterLaw cons xs #include "Streamly/Test/Data/Scanl/CommonType.hs" @@ -142,6 +190,54 @@ fromScanl ls = Stream.fold (Fold.fromScanl (Scanl.scanl' (+) 0)) (Stream.fromList ls) `shouldReturn` Prelude.sum ls +-- | A scanner that emits its input only when it is even, returning 'Continue' +-- (no output) for odd inputs. Per the 'Scanl' contract, the driver must not +-- call @extract@ on a 'Continue' step (extract is effectful and reserved for +-- output steps) and must not feed any value to the collector. We track every +-- @extract@ call to assert it happens only on emitting steps. +-- +-- This catches a bug where @postscanl@ extracted and emitted on 'Continue', +-- which both leaked filtered elements and called @extract@ spuriously. +postscanlFilter :: Expectation +postscanlFilter = do + ref <- newIORef [] + let scanner = Scanl.Scanl step initial extract final + + initial = return (Fold.Partial (0 :: Int)) + step s a = + return $ if even a then Scanl.Partial a else Scanl.Continue s + extract s = modifyIORef' ref (s :) >> return s + final = return + + result <- + Stream.fold + (Fold.postscanl scanner Fold.toList) + (Stream.fromList [1 .. 6 :: Int]) + extractCalls <- Prelude.reverse <$> readIORef ref + + -- Only the even inputs are emitted to the collector. + result `shouldBe` [2, 4, 6] + -- 'extract' is called only on the emitting (even) steps, never on 'Continue'. + extractCalls `shouldBe` [2, 4, 6] + +-- | The Continue filter law for Fold compositions: wrapping the scanner in +-- 'Scanl.filter' (which emits Continue for rejected inputs) must give the same +-- final fold result as running the fold on the pre-filtered input: +-- +-- fold (f (Scanl.filter p scanner) collector) xs +-- === fold (f scanner collector) (filter p xs) +-- +-- Parameterised by a context 'ctx' that combines a scanner with a collector. +foldFilterLaw :: + Eq c + => (Scanl.Scanl IO Int Int -> F.Fold IO Int c) + -> [Int] + -> Property +foldFilterLaw ctx xs = monadicIO $ do + v1 <- run $ Stream.fold (ctx (Scanl.filter even Scanl.sum)) (Stream.fromList xs) + v2 <- run $ Stream.fold (ctx Scanl.sum) (Stream.fromList (Prelude.filter even xs)) + assert (v1 == v2) + toStreamK :: [Int] -> Expectation toStreamK ls = do sk <- Stream.fold Fold.toStreamK (Stream.fromList ls) @@ -566,6 +662,10 @@ main = hspec $ do it "fromPure" fromPure it "fromEffect" fromEffect prop "fromScanl" fromScanl + it "postscanl filter" postscanlFilter + prop "filter law: Fold.postscanl" $ foldFilterLaw (\h -> Fold.postscanl h Fold.toList) + prop "filter law: Fold.scanl" $ foldFilterLaw (\h -> Fold.scanl h Fold.toList) + prop "filter law: Fold.scanlMany" $ foldFilterLaw (\h -> Fold.scanlMany h Fold.toList) prop "toStreamK" toStreamK prop "toStreamKRev" toStreamKRev prop "last" last diff --git a/test/Streamly/Test/Data/Scanl/Combinators.hs b/test/Streamly/Test/Data/Scanl/Combinators.hs index fe7954b588..7201a6d543 100644 --- a/test/Streamly/Test/Data/Scanl/Combinators.hs +++ b/test/Streamly/Test/Data/Scanl/Combinators.hs @@ -10,18 +10,21 @@ module Streamly.Test.Data.Scanl.Combinators (main) where import Data.Int (Int64) +import Data.IORef (modifyIORef', newIORef, readIORef) import Data.Semigroup (Sum(..)) import qualified Streamly.Internal.Data.MutArray as MArray import qualified Streamly.Internal.Data.Pipe as Pipe import qualified Streamly.Internal.Data.Scanl as F +import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Stream as Stream -import qualified Streamly.Internal.Data.Unfold as Unfold import qualified Prelude import Prelude hiding (maximum, minimum, product, sum, mconcat, foldMap, maybe) import Streamly.Test.Common (withNumTests) -import Streamly.Test.Data.Scanl.Type (check, checkApprox, checkPostscanl) +import Streamly.Test.Data.Scanl.Common (evenScanl, filterLawScanModifier) +import Streamly.Test.Data.Scanl.Type + (check, checkApprox, checkPostscanl, checkNoLaw) import Test.Hspec import Test.Hspec.QuickCheck (prop) import Test.QuickCheck (Gen, Property, choose, forAll, listOf1) @@ -48,13 +51,123 @@ composeManyS = check (F.composeMany (F.take 2 F.sum) F.sum) ([1, 2, 3, 4, 5] :: [Int]) [0, 1, 4, 7, 14, 19] +composeUpstreamFilter :: Expectation +composeUpstreamFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.compose (evenScanl ref) F.sum) + (Stream.fromList [1 .. 6 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + -- The downstream 'sum' only sees the even inputs: 2, 2+4, 2+4+6. + out `shouldBe` [2, 6, 12] + -- extract runs on the initial step and on each even input, never on odd. + calls `shouldBe` [0, 2, 4, 6] + +composeDownstreamFilter :: Expectation +composeDownstreamFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.compose (F.scanl' (\_ x -> x) 0) (evenScanl ref)) + (Stream.fromList [1 .. 6 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 4, 6] + calls `shouldBe` [2, 4, 6] + +-- 'partitionBy' routes even inputs to the left scan and odd to the right. The +-- left scan emits only multiples of 4 ('Continue' otherwise); the combined scan +-- must emit nothing and not extract the left scan on those 'Continue' steps. +partitionByFilter :: Expectation +partitionByFilter = + check + (F.partitionBy part leftScan rightScan) + ([1 .. 8] :: [Int]) + [0, 1, 3, 4, 5, 7, 8] + where + leftScan = F.filter (\x -> x `mod` 4 == 0) (F.scanl' (\_ x -> x) 0) + rightScan = F.scanl' (\_ x -> x) 0 + part n = if even n then Left n else Right n + +-- Verifies 'partitionBy': the left scan's 'extract' fires only on its emitting +-- (multiple-of-4) steps, not on the intervening 'Continue' steps. +partitionByExtractS :: Expectation +partitionByExtractS = do + ref <- newIORef [] + let leftScan = + F.Scanl + (\s a -> + return + $ if a `mod` 4 == 0 + then F.Partial a + else F.Continue s) + (return (Fold.Partial 0)) + (\s -> modifyIORef' ref (s :) >> return s) + return + rightScan = F.scanl' (\_ x -> x) 0 + part n = if even n then Left n else Right n + out <- + Stream.toList + $ Stream.postscanl + (F.partitionBy part leftScan rightScan) + (Stream.fromList [1 .. 8 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [1, 3, 4, 5, 7, 8] + -- The left scan's extract runs only on its emitting (multiple-of-4) steps. + calls `shouldBe` [4, 8] + +-- Verifies 'pipe': 'extract' is called only on emitting (even) steps, never on +-- 'Continue' steps. +pipeFilter :: Expectation +pipeFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.pipe (Pipe.map id) (evenScanl ref)) + (Stream.fromList [1 .. 6 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 4, 6] + calls `shouldBe` [2, 4, 6] + +-- 'composeMany' upstream: 'extract' is called only on emitting steps, never on +-- 'Continue'. The left scan never terminates here so behaviour matches compose. +composeManyUpstreamFilter :: Expectation +composeManyUpstreamFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.composeMany (evenScanl ref) F.sum) + (Stream.fromList [1 .. 6 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 6, 12] + calls `shouldBe` [0, 2, 4, 6] + +-- 'composeMany' downstream: 'extract' is called only on emitting steps. +composeManyDownstreamFilter :: Expectation +composeManyDownstreamFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.composeMany (F.scanl' (\_ x -> x) 0) (evenScanl ref)) + (Stream.fromList [1 .. 6 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 4, 6] + calls `shouldBe` [2, 4, 6] + -- 'with' adapts a stateful combinator (here 'indexed') so that the supplied -- predicate also sees the state (the index). This keeps elements at even --- indices. +-- indices. Note: 'indexed' is 'postscanlMaybe', a scan composition that treats +-- a filtered ('Continue') step like 'Partial', so a filtered element re-emits +-- the previous accumulator rather than emitting nothing. withS :: Expectation withS = check (F.with F.indexed F.filter (even . fst) F.toList) "abcde" - ["", "a", "a", "ac", "ac", "ace"] + ["", "a", "ac", "ace"] pipeS :: Expectation pipeS = @@ -95,7 +208,7 @@ mapMaybeMS = check (F.mapMaybeM (\x -> return (if even x then Just x else Nothing)) F.toList) ([1, 2, 3, 4] :: [Int]) - [[], [], [2], [2], [2, 4]] + [[], [2], [2, 4]] -- An Unfold that streams the elements of an input list. unfoldList :: Monad m => Unfold.Unfold m [a] a @@ -170,23 +283,54 @@ main = hspec $ -- Before adding any tests here consider if it can be added to the -- common tests above. + + -- compose it "compose" composeS + it "compose: filter upstream" composeUpstreamFilter + it "compose: filter downstream" composeDownstreamFilter + prop "compose: filterLaw" + $ filterLawScanModifier (\h -> F.compose h F.toList) + + -- composeMany it "composeMany" composeManyS - it "with" withS + it "composeMany: filter upstream" composeManyUpstreamFilter + it "composeMany: filter downstream" composeManyDownstreamFilter + prop "composeMany: filterLaw" + $ filterLawScanModifier (\h -> F.composeMany (F.take 2 h) F.sum) + + -- pipe it "pipe" pipeS + it "pipe: filter" pipeFilter + prop "pipe: filterLaw" $ filterLawScanModifier (F.pipe (Pipe.map id)) + + it "tee" teeS + + -- partitionBy + it "partitionBy" partitionByS + it "partitionByM" partitionByMS + it "partitionBy: filter" partitionByFilter + it "partitionBy: filter extract" partitionByExtractS + prop "partitionBy: filterLaw left" + $ filterLawScanModifier (\h -> F.partitionBy Left h F.sum) + prop "partitionBy: filterLaw right" + $ filterLawScanModifier (\h -> F.partitionBy Right F.sum h) + + it "partition" partitionS + it "with" withS it "topBy" topByS it "bottomBy" bottomByS + + -- indexing it "indexingWith" indexingWithS it "indexing" indexingS it "indexingRev" indexingRevS + it "takingEndBy_" takingEndByUS it "mapMaybeM" mapMaybeMS it "unfoldEach" unfoldEachS it "unfoldMany" unfoldManyS it "defaultSalt" defaultSaltS - it "tee" teeS - it "partitionBy" partitionByS - it "partitionByM" partitionByMS - it "partition" partitionS + + -- deprecated combinators it "scanl" scanlS it "scanlMany" scanlManyS diff --git a/test/Streamly/Test/Data/Scanl/CommonCombinators.hs b/test/Streamly/Test/Data/Scanl/CommonCombinators.hs index 929a39e69e..6f7d116db8 100644 --- a/test/Streamly/Test/Data/Scanl/CommonCombinators.hs +++ b/test/Streamly/Test/Data/Scanl/CommonCombinators.hs @@ -17,7 +17,8 @@ productS = do check F.product ([] :: [Int]) [1] -- Short-circuit: the element after the first 0 is bottom, so this throws -- (and fails) if 'product' consumes past the 0. - check F.product ([2, 0, error "product consumed past 0"] :: [Int]) [1, 2, 0] + checkNoLaw + F.product ([2, 0, error "product consumed past 0"] :: [Int]) [1, 2, 0] sumS :: [Int] -> Expectation sumS ls = check F.sum ls (Prelude.scanl (+) 0 ls) @@ -46,7 +47,7 @@ theS = do check F.the ([] :: [Int]) [Nothing] -- Short-circuit: the element after the first mismatch is bottom, so this -- throws (and fails) if 'the' consumes past the mismatch. - check F.the ([3, 3, 4, error "the consumed past mismatch"] :: [Int]) + checkNoLaw F.the ([3, 3, 4, error "the consumed past mismatch"] :: [Int]) [Nothing, Just 3, Just 3, Nothing] -- Polynomial rolling hash: @@ -126,12 +127,13 @@ meanS = -- Filtering / mapping ------------------------------------------------------------------------------- +-- A filtered-out (Nothing) element emits no output. mapMaybeS :: [Int] -> Expectation mapMaybeS ls = check (F.mapMaybe (\x -> if even x then Just x else Nothing) F.toList) ls - (Prelude.scanl (\acc x -> if even x then acc ++ [x] else acc) [] ls) + (Prelude.scanl (\acc x -> acc ++ [x]) [] (Prelude.filter even ls)) drainMapMS :: [Int] -> Expectation drainMapMS ls = check (F.drainMapM return) ls (Prelude.scanl (\_ _ -> ()) () ls) @@ -167,7 +169,7 @@ indexedS = sampleFromthenS :: Expectation sampleFromthenS = check (F.sampleFromthen 0 2 F.toList) ([1 .. 6] :: [Int]) - [[], [1], [1], [1, 3], [1, 3], [1, 3, 5], [1, 3, 5]] + [[], [1], [1, 3], [1, 3, 5]] sconcatS :: Expectation sconcatS = diff --git a/test/Streamly/Test/Data/Scanl/CommonType.hs b/test/Streamly/Test/Data/Scanl/CommonType.hs index 57ad81570a..53776a2007 100644 --- a/test/Streamly/Test/Data/Scanl/CommonType.hs +++ b/test/Streamly/Test/Data/Scanl/CommonType.hs @@ -83,41 +83,44 @@ rmapMS ls = -- Filtering ------------------------------------------------------------------------------- +-- A filtered-out element emits no output, so the scan emits the initial value +-- and then one output per element that passes the predicate. filterS :: [Int] -> Expectation filterS ls = check (F.filter even F.toList) ls - (Prelude.scanl (\acc x -> if even x then acc ++ [x] else acc) [] ls) + (Prelude.scanl (\acc x -> acc ++ [x]) [] (Prelude.filter even ls)) filterMS :: [Int] -> Expectation filterMS ls = check (F.filterM (return . even) F.toList) ls - (Prelude.scanl (\acc x -> if even x then acc ++ [x] else acc) [] ls) + (Prelude.scanl (\acc x -> acc ++ [x]) [] (Prelude.filter even ls)) filteringS :: [Int] -> Expectation filteringS ls = check (F.filtering even) ls (Prelude.scanl (\_ x -> if even x then Just x else Nothing) Nothing ls) +-- A 'Nothing' (or filtered-out) element emits no output. catMaybesS :: Expectation catMaybesS = check (F.catMaybes F.toList) ([Just 1, Nothing, Just 3, Nothing, Just 5] :: [Maybe Int]) - [[], [1], [1], [1, 3], [1, 3], [1, 3, 5]] + [[], [1], [1, 3], [1, 3, 5]] catLeftsS :: Expectation catLeftsS = check (F.catLefts F.toList) ([Left 1, Right "a", Left 3, Right "b"] :: [Either Int String]) - [[], [1], [1], [1, 3], [1, 3]] + [[], [1], [1, 3]] catRightsS :: Expectation catRightsS = check (F.catRights F.toList) ([Left "a", Right 2, Left "b", Right 4] :: [Either String Int]) - [[], [], [2], [2], [2, 4]] + [[], [2], [2, 4]] catEithersS :: Expectation catEithersS = diff --git a/test/Streamly/Test/Data/Scanl/Concurrent.hs b/test/Streamly/Test/Data/Scanl/Concurrent.hs index 9746b98fc0..7a1124965f 100644 --- a/test/Streamly/Test/Data/Scanl/Concurrent.hs +++ b/test/Streamly/Test/Data/Scanl/Concurrent.hs @@ -12,10 +12,12 @@ module Streamly.Test.Data.Scanl.Concurrent (main) where import Control.Concurrent (threadDelay) import Control.Exception (ErrorCall(..), try) import Data.Function ( (&) ) -import Data.IORef (newIORef, atomicModifyIORef') +import Data.IORef (atomicModifyIORef', newIORef, readIORef) import Data.List (sort) import Streamly.Data.Scanl (Scanl) +import Streamly.Test.Data.Scanl.Common (evenScanl, filterLawScan) import Test.Hspec as H +import Test.Hspec.QuickCheck (prop) import qualified Streamly.Data.Fold as Fold import qualified Streamly.Data.Stream as Stream @@ -135,6 +137,30 @@ parDemuxScan_WorkerException concOpts = do Right _ -> expectationFailure "Expected ErrorCall exception but stream completed successfully" +parTeeWithFilter :: IO () +parTeeWithFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (Scanl.parTeeWith id (,) (evenScanl ref) (Scanl.scanl' (\_ x -> x) 0)) + (Stream.fromList [1 .. 4 :: Int]) + calls <- reverse <$> readIORef ref + out `shouldBe` [(2, 2), (4, 4)] + calls `shouldBe` [0, 2, 4] + +parTeeWithRightFilter :: IO () +parTeeWithRightFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (Scanl.parTeeWith id (,) (Scanl.scanl' (\_ x -> x) 0) (evenScanl ref)) + (Stream.fromList [1 .. 4 :: Int]) + calls <- reverse <$> readIORef ref + out `shouldBe` [(2, 2), (4, 4)] + calls `shouldBe` [0, 2, 4] + main :: IO () main = hspec $ H.parallel @@ -152,3 +178,7 @@ main = hspec $ parDemuxScan_ScanEnd (Scanl.maxBuffer 1) it "parDemuxScanM (worker exception) (maxBuffer 1)" $ parDemuxScan_WorkerException (Scanl.maxBuffer 1) + it "parTeeWith filter" parTeeWithFilter + it "parTeeWith right filter" parTeeWithRightFilter + prop "parTeeWith filter law" + $ filterLawScan (Scanl.parTeeWith id (,) Scanl.sum Scanl.sum) diff --git a/test/Streamly/Test/Data/Scanl/Container.hs b/test/Streamly/Test/Data/Scanl/Container.hs index d41c5c7b90..14074f5035 100644 --- a/test/Streamly/Test/Data/Scanl/Container.hs +++ b/test/Streamly/Test/Data/Scanl/Container.hs @@ -60,6 +60,88 @@ demuxGenericIOS = (IO (Map.Map String Int), Maybe (String, Int)))) demuxInput demuxExpected +------------------------------------------------------------------------------- +-- 'Continue' handling. +-- +-- The inner scan below filters out odd values and sums the even ones. A +-- filtered-out (odd) input makes the inner scan emit 'Continue': the scan +-- state must advance but NO output must be emitted for that input. The +-- demux/classify driver must translate this 'Continue' into 'Nothing' rather +-- than re-emitting the last valid value. +-- +-- The earlier tests used @take 2 sum@ as the inner scan, which only ever emits +-- 'Partial'/'Done' and never 'Continue', so the 'Continue' branch of the +-- driver was never exercised. With the buggy driver, the odd inputs below +-- would wrongly produce @Just (key, lastSum)@ (e.g. @Just ("A", 2)@ and +-- @Just ("B", 0)@) instead of 'Nothing'. +------------------------------------------------------------------------------- + +filterInput :: [(String, Int)] +filterInput = [("A", 2), ("A", 1), ("A", 4), ("B", 3), ("B", 6)] + +filterExpected :: [Maybe (String, Int)] +filterExpected = + [Just ("A", 2), Nothing, Just ("A", 6), Nothing, Just ("B", 6)] + +filterInner :: F.Scanl IO (String, Int) Int +filterInner = F.lmap snd (F.filter even F.sum) + +demuxFilterGetScanl :: String -> IO (Maybe (F.Scanl IO (String, Int) Int)) +demuxFilterGetScanl _ = return (Just filterInner) + +demuxFilterS :: Expectation +demuxFilterS = + checkPostscanl (F.demux fst demuxFilterGetScanl) filterInput filterExpected + +demuxFilterIOS :: Expectation +demuxFilterIOS = + checkPostscanl + (F.demuxIO fst demuxFilterGetScanl) filterInput filterExpected + +demuxGenericFilterS :: Expectation +demuxGenericFilterS = + checkPostscanl + (fmap snd + (F.demuxGeneric fst demuxFilterGetScanl + :: F.Scanl IO (String, Int) + (IO (Map.Map String Int), Maybe (String, Int)))) + filterInput filterExpected + +demuxGenericFilterIOS :: Expectation +demuxGenericFilterIOS = + checkPostscanl + (fmap snd + (F.demuxGenericIO fst demuxFilterGetScanl + :: F.Scanl IO (String, Int) + (IO (Map.Map String Int), Maybe (String, Int)))) + filterInput filterExpected + +classifyFilterS :: Expectation +classifyFilterS = + checkPostscanl (F.classify fst filterInner) filterInput filterExpected + +classifyFilterIOS :: Expectation +classifyFilterIOS = + checkPostscanl (F.classifyIO fst filterInner) filterInput filterExpected + +classifyGenericFilterS :: Expectation +classifyGenericFilterS = + checkPostscanl + (fmap snd + (F.classifyGeneric fst filterInner + :: F.Scanl IO (String, Int) + (IO (Map.Map String Int), Maybe (String, Int)))) + filterInput filterExpected + +classifyGenericFilterIOS :: Expectation +classifyGenericFilterIOS = + checkPostscanl + (fmap snd + (F.classifyGenericIO fst filterInner + :: F.Scanl IO (String, Int) + (IO (Map.Map String Int), Maybe (String, Int)))) + filterInput filterExpected + classifyInput :: [(String, Int)] classifyInput = [("ONE", 1), ("TWO", 2), ("ONE", 3), ("TWO", 4), ("ONE", 5)] @@ -106,11 +188,23 @@ main = hspec $ -- Before adding any tests here consider if it can be added to the -- common tests above. + + -- demux it "demux" demuxS + it "demux: filter" demuxFilterS it "demuxIO" demuxIOS + it "demuxIO: filter" demuxFilterIOS it "demuxGeneric" demuxGenericS + it "demuxGeneric: filter" demuxGenericFilterS it "demuxGenericIO" demuxGenericIOS + it "demuxGenericIO: filter" demuxGenericFilterIOS + + -- classify it "classify" classifyS + it "classify: filter" classifyFilterS it "classifyIO" classifyIOS + it "classifyIO: filter" classifyFilterIOS it "classifyGeneric" classifyGenericS + it "classifyGeneric: filter" classifyGenericFilterS it "classifyGenericIO" classifyGenericIOS + it "classifyGenericIO: filter" classifyGenericFilterIOS diff --git a/test/Streamly/Test/Data/Scanl/Type.hs b/test/Streamly/Test/Data/Scanl/Type.hs index 1a55ca7e4f..6f48d6ec9a 100644 --- a/test/Streamly/Test/Data/Scanl/Type.hs +++ b/test/Streamly/Test/Data/Scanl/Type.hs @@ -8,9 +8,12 @@ -- Portability : GHC module Streamly.Test.Data.Scanl.Type - (main, check, checkApprox, checkPostscanl) where + ( main, check, checkApprox, checkPostscanl, checkNoLaw + ) where import Data.Functor.Identity (Identity, runIdentity) +import Data.IORef (newIORef, readIORef) +import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.Refold.Type as Refold import qualified Streamly.Internal.Data.Scanl as F import qualified Streamly.Internal.Data.Stream as Stream @@ -19,6 +22,7 @@ import Prelude hiding (const, last, length, take, filter, scanl, foldl', concatM import qualified Prelude import Streamly.Test.Common (chooseInt) +import Streamly.Test.Data.Scanl.Common (evenScanl, filterLawScanModifier, filterLawScan) import Test.Hspec import Test.Hspec.QuickCheck (prop) import Test.QuickCheck (Property, forAll) @@ -44,9 +48,10 @@ import Test.QuickCheck (Property, forAll) -- -- In every shared test the @expected@ value is the *inclusive prescan list* -- (Prelude.scanl f z), i.e. exactly what Stream.scanl emits: the leading --- initial value followed by one output per input (filtering scans emit the --- unchanged accumulator for filtered elements; terminating scans truncate at, --- and including, the terminating step). From this list: +-- initial value followed by one output per input (filtering scans emit no +-- output for filtered elements, so their @expected@ list is built from the +-- elements that pass; terminating scans truncate at, and including, the +-- terminating step). From this list: -- * the Fold includer checks the final value (Prelude.last expected) -- * the Scanl includer checks the full Stream.scanl output (== expected) and -- the Stream.postscanl output (== drop 1 expected, the initial omitted) @@ -61,12 +66,64 @@ import Test.QuickCheck (Property, forAll) -- the leading initial value dropped. type Op = F.Scanl -check :: (Eq b, Show b) => Op IO a b -> [a] -> [b] -> Expectation +check :: (Eq b, Show b, Show a) => Op IO a b -> [a] -> [b] -> Expectation check cons xs expected = do Stream.toList (Stream.scanl cons (Stream.fromList xs)) `shouldReturn` expected Stream.toList (Stream.postscanl cons (Stream.fromList xs)) `shouldReturn` drop 1 expected + filterLaw cons xs + +-- | Same as 'check' but does NOT apply the 'Continue' filter law. Use this only +-- for the few tests that pass bottom (e.g. 'error') input elements to verify +-- early termination -- the law would force those elements. +checkNoLaw :: (Eq b, Show b) => Op IO a b -> [a] -> [b] -> Expectation +checkNoLaw cons xs expected = do + Stream.toList (Stream.scanl cons (Stream.fromList xs)) + `shouldReturn` expected + Stream.toList (Stream.postscanl cons (Stream.fromList xs)) + `shouldReturn` drop 1 expected + +-- | The 'Continue' filter law (an independent, black-box oracle that needs no +-- knowledge of how the scan is implemented): wrapping a scan in 'Scanl.filter' +-- -- which emits 'Continue' for rejected inputs -- must produce the same output +-- as running the scan on the pre-filtered input: +-- +-- scanl/postscanl (Scanl.filter p s) xs === scanl/postscanl s (filter p xs) +-- +-- This holds for EVERY scan because both sides feed the scan the identical +-- accepted subsequence; it verifies that the driver suppresses output (and does +-- not extract) on every 'Continue' step. 'filterLawPred' rejects a portion of +-- the inputs so the law actually exercises 'Continue'. Folded into 'check' and +-- 'checkPostscanl' so it applies to every shared and Scanl-specific test. +filterLawPred :: Show a => a -> Bool +filterLawPred x = even (Prelude.length (Prelude.show x)) + +filterLaw :: (Eq b, Show b, Show a) => Op IO a b -> [a] -> Expectation +filterLaw cons xs = do + -- We run the filter twice, once with the original predicate and again with + -- inverted predicate, this is to ensure that in one of them we filter the + -- first element and in one keep it, and same for the last element. These + -- two are important cases to cover. + mapM_ runScanl [filterLawPred, not . filterLawPred] + filterLawPost cons xs + where + runScanl p = do + lhs <- Stream.toList + (Stream.scanl (F.filter p cons) (Stream.fromList xs)) + rhs <- Stream.toList + (Stream.scanl cons (Stream.fromList (Prelude.filter p xs))) + lhs `shouldBe` rhs + +filterLawPost :: (Eq b, Show b, Show a) => Op IO a b -> [a] -> Expectation +filterLawPost cons xs = mapM_ run [filterLawPred, not . filterLawPred] + where + run p = do + lhs <- Stream.toList + (Stream.postscanl (F.filter p cons) (Stream.fromList xs)) + rhs <- Stream.toList + (Stream.postscanl cons (Stream.fromList (Prelude.filter p xs))) + lhs `shouldBe` rhs -- | Epsilon-equality counterpart of 'check' for Fractional results whose -- floating-point output is only approximately equal to the reference (e.g. @@ -88,10 +145,13 @@ checkApprox cons xs expected = do -- | For combinators that only support postscan (their scanl initial value is -- undefined, e.g. rollingMap), so 'check' cannot be used. @expected@ is the -- Stream.postscanl output (one value per input, no leading initial). -checkPostscanl :: (Eq b, Show b) => Op IO a b -> [a] -> [b] -> Expectation -checkPostscanl cons xs expected = +checkPostscanl :: (Eq b, Show b, Show a) => Op IO a b -> [a] -> [b] -> Expectation +checkPostscanl cons xs expected = do Stream.toList (Stream.postscanl cons (Stream.fromList xs)) `shouldReturn` expected + -- Only the postscanl half of the law: these scans have an undefined 'scanl' + -- initial value. + filterLawPost cons xs #include "Streamly/Test/Data/Scanl/CommonType.hs" @@ -130,11 +190,127 @@ postscanlMaybeCompose :: Expectation postscanlMaybeCompose = do Stream.toList (Stream.postscanl (F.postscanlMaybe (F.filtering even) F.length) (Stream.fromList [1 .. 6 :: Int])) - `shouldReturn` [0, 1, 1, 2, 2, 3] + `shouldReturn` [1, 2, 3] Stream.toList (Stream.postscanl (F.postscanlMaybe (fmap Just (F.take 0 F.sum)) F.length) (Stream.fromList [1, 2, 3 :: Int])) `shouldReturn` ([] :: [Int]) +-- postscanl must propagate Continue from the right scan (bug at line 1337 of +-- Scanl/Type.hs returns Partial instead, emitting spurious output). +-- +-- postscanl sum (filter even sum) on [1..4]: +-- running sums fed to the right: 1, 3, 6, 10 +-- 1, 3 are odd -> right returns Continue -> no output (BUG: emits 0) +-- 6, 10 are even -> right returns Partial -> output 6, then 16 +postscanlRightFilter :: Expectation +postscanlRightFilter = + Stream.toList + (Stream.postscanl + (F.postscanl (F.scanl' (+) 0) (F.filter even (F.scanl' (+) 0))) + (Stream.fromList [1 .. 4 :: Int])) + `shouldReturn` [6, 16] + +-- postscanl must propagate Continue when left returns Continue and right +-- returns Partial (bug at line 1343 of Scanl/Type.hs returns Partial instead +-- of Continue; right is spuriously advanced with a stale left value). +-- +-- postscanl (filter even sum) sum on [1..4]: +-- odd inputs -> left returns Continue (stale extract = even-sum so far) +-- right (sum) would return Partial if fed that stale value -- BUG +-- even inputs -> left returns Partial; feed new left value to right +-- Correct: only Partial when BOTH left and right return Partial +-- input 1 (odd, left=Continue) -> Continue; right NOT advanced +-- input 2 (even, left=Partial 2) -> right += 2 -> 2; output 2 +-- input 3 (odd, left=Continue) -> Continue; right NOT advanced +-- input 4 (even, left=Partial 6) -> right += 6 -> 8; output 8 +postscanlLeftFilter :: Expectation +postscanlLeftFilter = + Stream.toList + (Stream.postscanl + (F.postscanl (F.filter even (F.scanl' (+) 0)) (F.scanl' (+) 0)) + (Stream.fromList [1 .. 4 :: Int])) + `shouldReturn` [2, 8] + +-- postscanl must propagate Continue when BOTH left and right return Continue +-- (bug at line 1344 of Scanl/Type.hs returns Partial instead). +-- +-- postscanl (filter even sum) (filter odd sum) on [1..4]: +-- odd inputs -> left returns Continue; extractL gives the even-running-sum (always even) +-- even inputs -> left returns Partial; extractL gives the new even-running-sum (always even) +-- right (filter odd) always sees an even value -> always returns Continue +-- so no input should ever produce output +postscanlBothFilter :: Expectation +postscanlBothFilter = + Stream.toList + (Stream.postscanl + (F.postscanl (F.filter even (F.scanl' (+) 0)) (F.filter odd (F.scanl' (+) 0))) + (Stream.fromList [1 .. 4 :: Int])) + `shouldReturn` ([] :: [Int]) + +------------------------------------------------------------------------------- +-- Filter tests belonging to Scanl.Type +------------------------------------------------------------------------------- + +teeWithFilter :: Expectation +teeWithFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.teeWith (,) (evenScanl ref) (F.scanl' (\_ x -> x) 0)) + (Stream.fromList [1 .. 4 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [(2, 2), (4, 4)] + calls `shouldBe` [2, 4] + +teeWithRightFilter :: Expectation +teeWithRightFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.teeWith (,) (F.scanl' (\_ x -> x) 0) (evenScanl ref)) + (Stream.fromList [1 .. 4 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [(2, 2), (4, 4)] + calls `shouldBe` [2, 4] + +takeFilter :: Expectation +takeFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.take 100 (evenScanl ref)) + (Stream.fromList [1 .. 6 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 4, 6] + calls `shouldBe` [2, 4, 6] + +takeFilterCount :: Expectation +takeFilterCount = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.take 2 (evenScanl ref)) + (Stream.fromList [1 .. 6 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 4] + calls `shouldBe` [2] + +takeEndByFilter :: Expectation +takeEndByFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl + (F.takeEndBy (== 3) (evenScanl ref)) + (Stream.fromList [1, 2, 3, 4 :: Int]) + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 2] + calls `shouldBe` [2] + ------------------------------------------------------------------------------- -- Constructors ------------------------------------------------------------------------------- @@ -164,12 +340,12 @@ scantStep s a = if a == 3 then F.Done s else F.Partial (s + a) scantS :: Expectation scantS = - check (F.scant' scantStep (F.Partial 0) id) ([1, 2, 3, 4] :: [Int]) + check (F.scant' scantStep (Fold.Partial 0) id) ([1, 2, 3, 4] :: [Int]) [0, 1, 3, 3] scantMS :: Expectation scantMS = - check (F.scantM' (\s a -> return (scantStep s a)) (return (F.Partial 0)) return) + check (F.scantM' (\s a -> return (scantStep s a)) (return (Fold.Partial 0)) return) ([1, 2, 3, 4] :: [Int]) [0, 1, 3, 3] mkScanrS :: Expectation @@ -198,8 +374,8 @@ functionMS = sumRefold :: Monad m => Refold.Refold m Int Int Int sumRefold = Refold.Refold - (\s a -> return (F.Partial (s + a))) - (\c -> return (F.Partial c)) + (\s a -> return (Fold.Partial (s + a))) + (\c -> return (Fold.Partial c)) return fromRefoldS :: Expectation @@ -238,12 +414,12 @@ mkScanl1MS = mkScantS :: Expectation mkScantS = - check (F.mkScant scantStep (F.Partial 0) id) ([1, 2, 3, 4] :: [Int]) + check (F.mkScant scantStep (Fold.Partial 0) id) ([1, 2, 3, 4] :: [Int]) [0, 1, 3, 3] mkScantMS :: Expectation mkScantMS = - check (F.mkScantM (\s a -> return (scantStep s a)) (return (F.Partial 0)) return) + check (F.mkScantM (\s a -> return (scantStep s a)) (return (Fold.Partial 0)) return) ([1, 2, 3, 4] :: [Int]) [0, 1, 3, 3] moduleName :: String @@ -256,10 +432,37 @@ main = hspec $ -- Before adding any tests here consider if it can be added to the -- common tests above. - it "scanl emits initial, postscanl omits it" scanlVsPostscanl - it "postscanl (compose)" postscanlCompose - it "postscanlMaybe (compose)" postscanlMaybeCompose - + it "scanl: emits initial value, postscanl omits it" scanlVsPostscanl + prop "sum: filterLaw" $ filterLawScan F.sum + + -- postscanl + it "postscanl: compose" postscanlCompose + it "postscanl: filter right" postscanlRightFilter + it "postscanl: filter left" postscanlLeftFilter + it "postscanl: filter both" postscanlBothFilter + prop "postscanl: filterLaw compose" + $ filterLawScanModifier (\h -> F.postscanl h F.sum) + + -- postscanlMaybe + it "postscanlMaybe: compose" postscanlMaybeCompose + prop "postscanlMaybe: filterLaw" + $ filterLawScan (F.postscanlMaybe (F.filtering even) F.length) + + -- teeWith + it "teeWith: filter left" teeWithFilter + it "teeWith: filter right" teeWithRightFilter + prop "teeWith: filterLaw" + $ filterLawScan (F.teeWith (,) F.sum F.sum) + + -- take + it "take: filter" takeFilter + it "take: filter count limit" takeFilterCount + prop "take: filterLaw" $ filterLawScanModifier (F.take 3) + + -- takeEndBy + it "takeEndBy: filter" takeEndByFilter + + -- constructors prop "scanl'" scanlS prop "scanlM'" scanlMS it "scanl1'" scanl1S @@ -275,6 +478,7 @@ main = hspec $ prop "toStreamK" toStreamKS prop "toStreamKRev" toStreamKRevS + -- deprecated constructors prop "mkScanl" mkScanlS prop "mkScanlM" mkScanlMS it "mkScanl1" mkScanl1S diff --git a/test/Streamly/Test/Data/Scanl/Window.hs b/test/Streamly/Test/Data/Scanl/Window.hs index 5c2a190548..aa49f18892 100644 --- a/test/Streamly/Test/Data/Scanl/Window.hs +++ b/test/Streamly/Test/Data/Scanl/Window.hs @@ -1,6 +1,6 @@ module Streamly.Test.Data.Scanl.Window (main) where -import Test.Hspec (hspec, describe, it, runIO, shouldReturn) +import Test.Hspec (Expectation, hspec, describe, it, runIO, shouldReturn, shouldBe) import Streamly.Internal.Data.Scanl (Incr(..)) import qualified Streamly.Internal.Data.Fold as Fold import qualified Streamly.Internal.Data.RingArray as RingArray @@ -9,6 +9,33 @@ import qualified Streamly.Internal.Data.Stream as S import Prelude hiding (sum, maximum, minimum) +-- Inner scan returns Continue for odd inputs and Partial (window sum) for even. +-- Verifies that incrScanWith propagates Continue without emitting output. +-- Even inputs: 2->{} =0, 4->{2}=2, 6->{2,4}=6, 8->{4,6}=10. +incrScanWithFilterS :: Expectation +incrScanWithFilterS = do + out <- + S.toList + $ S.postscanl + (Scanl.incrScanWith 2 inner) + (S.fromList [1, 2, 3, 4, 5, 6, 7, 8 :: Int]) + out `shouldBe` [0, 2, 6, 10] + where + inner = + Scanl.Scanl + (\s (incr, ring) -> do + let v = case incr of + Insert x -> x + Replace _ x -> x + if even v + then do + xs <- RingArray.toList ring + return (Scanl.Partial (foldr (+) 0 xs)) + else return (Scanl.Continue s)) + (return (Fold.Partial 0)) + return + return + moduleName :: String moduleName = "Window" @@ -104,6 +131,7 @@ main = hspec $ do scl (Scanl.incrScanWith 3 (Scanl.lmap fst Scanl.incrSum)) ([1, 2, 3, 4, 5] :: [Double]) `shouldReturn` [0, 1, 3, 6, 9, 12] + it "incrScanWith: filter" incrScanWithFilterS it "incrRollingMap" $ scl (Scanl.incrRollingMap (\p c -> Just (maybe 0 (c -) p))) ([Insert 1, Replace 1 3, Replace 3 6] :: [Incr Int]) diff --git a/test/Streamly/Test/Data/Stream/Transform.hs b/test/Streamly/Test/Data/Stream/Transform.hs index 07baafe7af..916749d4d8 100644 --- a/test/Streamly/Test/Data/Stream/Transform.hs +++ b/test/Streamly/Test/Data/Stream/Transform.hs @@ -20,7 +20,9 @@ import qualified Streamly.Internal.Data.Scan as Scan import qualified Streamly.Internal.Data.Scanl as Scanl import qualified Streamly.Internal.Data.Stream as Stream +import Streamly.Test.Data.Scanl.Common (evenScanl, filterLawScanModifier, filterLawScan) import Test.Hspec as H +import Test.Hspec.QuickCheck (prop) #ifdef DEVBUILD #endif @@ -175,6 +177,36 @@ testScanlMany = toList (Stream.scanlMany Scanl.sum (Stream.fromList [1, 2, 3 :: Int])) `shouldReturn` [0, 1, 3, 6] +streamPostscanlFilter :: Expectation +streamPostscanlFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.postscanl (evenScanl ref) (Stream.fromList [1 .. 6 :: Int]) + calls <- reverse <$> readIORef ref + out `shouldBe` [2, 4, 6] + calls `shouldBe` [2, 4, 6] + +streamScanlFilter :: Expectation +streamScanlFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.scanl (evenScanl ref) (Stream.fromList [1 .. 6 :: Int]) + calls <- reverse <$> readIORef ref + out `shouldBe` [0, 2, 4, 6] + calls `shouldBe` [0, 2, 4, 6] + +scanlManyFilter :: Expectation +scanlManyFilter = do + ref <- newIORef [] + out <- + Stream.toList + $ Stream.scanlMany (evenScanl ref) (Stream.fromList [1 .. 6 :: Int]) + calls <- reverse <$> readIORef ref + out `shouldBe` [0, 2, 4, 6] + calls `shouldBe` [0, 2, 4, 6] + testScanr :: Expectation testScanr = toList (Stream.scanr Scan.sum (Stream.fromList [1, 2, 3 :: Int])) @@ -587,10 +619,15 @@ main = hspec it "scanl (Scanl) empty" testScanlScanlEmpty it "postscanl" testPostscanl it "postscanl empty" testPostscanlEmpty + it "postscanl filter" streamPostscanlFilter + it "scanl filter" streamScanlFilter it "scanl (Scanl) done-at-init" testScanlDoneAtInit it "postscanl done-at-init" testPostscanlDoneAtInit it "postscanlMaybe done-at-init" testPostscanlMaybeDoneAtInit it "scanlMany" testScanlMany + it "scanlMany filter" scanlManyFilter + prop "filter law: id (driver)" $ filterLawScanModifier id + prop "filter law: sum" $ filterLawScan Scanl.sum it "scanr" testScanr it "pipe map" testPipe it "pipe filter" testPipeFilter diff --git a/test/Streamly/Test/Data/Unfold.hs b/test/Streamly/Test/Data/Unfold.hs index d3d3e17eae..0c4d171f60 100644 --- a/test/Streamly/Test/Data/Unfold.hs +++ b/test/Streamly/Test/Data/Unfold.hs @@ -22,6 +22,7 @@ import qualified Streamly.Internal.Data.Stream as S import qualified Streamly.Internal.Data.Stream as D import qualified Streamly.Internal.Data.StreamK as K +import Streamly.Test.Data.Scanl.Common (evenScanl) import Control.Exception (Exception, SomeException, try) import Control.Monad.Catch (throwM) import Control.Monad.Trans.State.Strict @@ -517,6 +518,77 @@ scanlMany = let unf = UF.scanlMany (Scanl.take 2 Scanl.sum) UF.fromList in testUnfold unf ([1,2,3,4,5] :: [Int]) [0,1,3,0,3,7,0,5] +-- 'UF.postscanl' must skip (no output, no extract) on a 'Continue' step. +unfoldPostscanlFilter :: Expectation +unfoldPostscanlFilter = do + ref <- newIORef [] + out <- + S.toList + $ S.unfold (UF.postscanl (evenScanl ref) UF.fromList) [1 .. 6 :: Int] + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [2, 4, 6] + calls `shouldBe` [2, 4, 6] + +-- 'UF.scanl' is the same but also emits the initial output. +unfoldScanlFilter :: Expectation +unfoldScanlFilter = do + ref <- newIORef [] + out <- + S.toList + $ S.unfold (UF.scanl (evenScanl ref) UF.fromList) [1 .. 6 :: Int] + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [0, 2, 4, 6] + calls `shouldBe` [0, 2, 4, 6] + +-- 'UF.scanlMany' restarts the scan on 'Done'; a filtered ('Continue') step +-- must not be confused with termination -- the scan must NOT restart on +-- 'Continue', only on 'Done'. +unfoldScanlManyFilter :: Expectation +unfoldScanlManyFilter = do + ref <- newIORef [] + out <- + S.toList + $ S.unfold (UF.scanlMany (evenScanl ref) UF.fromList) [1 .. 6 :: Int] + calls <- Prelude.reverse <$> readIORef ref + out `shouldBe` [0, 2, 4, 6] + calls `shouldBe` [0, 2, 4, 6] + +-- Filter law for 'UF.postscanl': filtering the scanner's input via +-- 'Scanl.filter' is equivalent to filtering the unfold's output before the +-- scanner sees it: +-- +-- unfold (postscanl (Scanl.filter p s) uf) seed +-- === postscanl s (filter p (unfold uf seed)) +unfoldPostscanlFilterLaw :: [Int] -> Property +unfoldPostscanlFilterLaw seed = ioProperty $ do + outL <- S.toList + $ S.unfold (UF.postscanl (Scanl.filter even Scanl.sum) UF.fromList) seed + outR <- S.toList + $ S.postscanl Scanl.sum + $ S.filter even + $ S.unfold UF.fromList seed + return $ counterexample (show outL ++ " /= " ++ show outR) (outL == outR) + +unfoldScanlFilterLaw :: [Int] -> Property +unfoldScanlFilterLaw seed = ioProperty $ do + outL <- S.toList + $ S.unfold (UF.scanl (Scanl.filter even Scanl.sum) UF.fromList) seed + outR <- S.toList + $ S.scanl Scanl.sum + $ S.filter even + $ S.unfold UF.fromList seed + return $ counterexample (show outL ++ " /= " ++ show outR) (outL == outR) + +unfoldScanlManyFilterLaw :: [Int] -> Property +unfoldScanlManyFilterLaw seed = ioProperty $ do + outL <- S.toList + $ S.unfold (UF.scanlMany (Scanl.filter even Scanl.sum) UF.fromList) seed + outR <- S.toList + $ S.scanlMany Scanl.sum + $ S.filter even + $ S.unfold UF.fromList seed + return $ counterexample (show outL ++ " /= " ++ show outR) (outL == outR) + foldMany :: Bool foldMany = let unf = UF.foldMany (Fold.take 2 Fold.toList) UF.fromList @@ -1014,6 +1086,12 @@ testTransformation = prop "scanl" scanl prop "scanl done-at-init" scanlDoneAtInit prop "scanlMany" scanlMany + it "postscanl filter" unfoldPostscanlFilter + it "scanl filter" unfoldScanlFilter + it "scanlMany filter" unfoldScanlManyFilter + prop "filter law: postscanl" unfoldPostscanlFilterLaw + prop "filter law: scanl" unfoldScanlFilterLaw + prop "filter law: scanlMany" unfoldScanlManyFilterLaw prop "foldMany" foldMany prop "either" either prop "mapM" mapM diff --git a/test/lib/Streamly/Test/Data/Scanl/Common.hs b/test/lib/Streamly/Test/Data/Scanl/Common.hs new file mode 100644 index 0000000000..0103dd84a9 --- /dev/null +++ b/test/lib/Streamly/Test/Data/Scanl/Common.hs @@ -0,0 +1,77 @@ +-- | +-- Module : Streamly.Test.Data.Scanl.Common +-- Copyright : (c) 2024 Composewell Technologies +-- License : BSD-3-Clause +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC +-- +-- Shared test helpers for Scanl Continue-contract testing. Used by +-- Scanl, Stream and Unfold test modules. + +module Streamly.Test.Data.Scanl.Common + ( evenScanl + , filterLawScanModifier + , filterLawScan + ) where + +import Data.IORef (IORef, modifyIORef') +import Test.QuickCheck (Property, counterexample, ioProperty) + +import qualified Streamly.Internal.Data.Scanl as Scanl +import qualified Streamly.Internal.Data.Stream as Stream + +-- | A scan that emits even inputs and returns 'Continue' (no output) for odd +-- ones. Its 'extract' records every call so tests can assert it never fires +-- on a 'Continue' step. +evenScanl :: IORef [Int] -> Scanl.Scanl IO Int Int +evenScanl ref = Scanl.Scanl step initial extract final + where + initial = return (Scanl.toFoldStep (Scanl.Partial 0)) + step s a = return $ if even a then Scanl.Partial a else Scanl.Continue s + extract s = modifyIORef' ref (s :) >> return s + final = return + +-- | Property: wrapping an inner scan in 'Scanl.filter' must give the same +-- output as pre-filtering the input stream before the scan sees it: +-- +-- > postscanl\/scanl (ctx (Scanl.filter even inner)) xs +-- > === postscanl\/scanl (ctx inner) (filter even xs) +-- +-- Tests that combinators that wrap an inner scan honour the Continue contract +-- (no spurious output or extract on Continue steps). +filterLawScanModifier + :: (Eq b, Show b) + => (Scanl.Scanl IO Int Int -> Scanl.Scanl IO Int b) + -> [Int] + -> Property +filterLawScanModifier ctx xs = ioProperty $ do + let inner = Scanl.sum + run scn = Stream.toList . scn . Stream.fromList + postL <- run (Stream.postscanl (ctx (Scanl.filter even inner))) xs + postR <- run (Stream.postscanl (ctx inner)) (filter even xs) + scanL <- run (Stream.scanl (ctx (Scanl.filter even inner))) xs + scanR <- run (Stream.scanl (ctx inner)) (filter even xs) + return + $ counterexample + ("postscanl: " ++ show postL ++ " /= " ++ show postR + ++ "\nscanl: " ++ show scanL ++ " /= " ++ show scanR) + (postL == postR && scanL == scanR) + +-- | Property: wrapping a scan in 'Scanl.filter' must give the same output as +-- pre-filtering the input stream: +-- +-- > postscanl\/scanl (Scanl.filter even s) xs +-- > === postscanl\/scanl s (filter even xs) +filterLawScan :: (Eq b, Show b) => Scanl.Scanl IO Int b -> [Int] -> Property +filterLawScan s xs = ioProperty $ do + let run scn = Stream.toList . scn . Stream.fromList + postL <- run (Stream.postscanl (Scanl.filter even s)) xs + postR <- run (Stream.postscanl s) (filter even xs) + scanL <- run (Stream.scanl (Scanl.filter even s)) xs + scanR <- run (Stream.scanl s) (filter even xs) + return + $ counterexample + ("postscanl: " ++ show postL ++ " /= " ++ show postR + ++ "\nscanl: " ++ show scanL ++ " /= " ++ show scanR) + (postL == postR && scanL == scanR) diff --git a/test/streamly-tests.cabal b/test/streamly-tests.cabal index bca6a4d7de..3a4f6d38cd 100644 --- a/test/streamly-tests.cabal +++ b/test/streamly-tests.cabal @@ -193,6 +193,7 @@ library hs-source-dirs: lib exposed-modules: Streamly.Test.Common + Streamly.Test.Data.Scanl.Common Streamly.Test.Data.Parser.CommonUtilities Streamly.Test.Data.Parser.CommonTestDriver Streamly.Test.Data.Parser.CommonTypeTests From 93a3dc5696241cebf021dee39409c51f62394503 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 29 Jun 2026 08:17:41 +0530 Subject: [PATCH 2/6] Update hie.yaml for Scanl tests --- hie.yaml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/hie.yaml b/hie.yaml index 20482b0c28..fb701763e2 100644 --- a/hie.yaml +++ b/hie.yaml @@ -136,6 +136,16 @@ cradle: component: "test:Data.ParserK" - path: "./test/Streamly/Test/Data/RingArray.hs" component: "test:Data.RingArray" + - path: "./test/Streamly/Test/Data/Scanl.hs" + component: "test:Data.Scanl" + - path: "./test/Streamly/Test/Data/Scanl/Combinators.hs" + component: "test:Data.Scanl" + - path: "./test/Streamly/Test/Data/Scanl/Container.hs" + component: "test:Data.Scanl" + - path: "./test/Streamly/Test/Data/Scanl/Type.hs" + component: "test:Data.Scanl" + - path: "./test/Streamly/Test/Data/Scanl/Window.hs" + component: "test:Data.Scanl" - path: "./test/Streamly/Test/Data/Scanl/Concurrent.hs" component: "test:Data.Scanl.Concurrent" - path: "./test/Streamly/Test/Data/Serialize.hs" From 9ddb8c38f9cc02433091cc790eeb35b3c94cf1e7 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sat, 27 Jun 2026 12:20:34 +0530 Subject: [PATCH 3/6] Remove unfoldEach and unfoldMany scans The implementation does not make sense and they cannot be implemented correctly with the existing type. --- .../Benchmark/Data/Scanl/Combinators.hs | 13 ------ .../Internal/Data/Scanl/Combinators.hs | 42 ------------------- test/Streamly/Test/Data/Scanl/Combinators.hs | 18 -------- 3 files changed, 73 deletions(-) diff --git a/benchmark/Streamly/Benchmark/Data/Scanl/Combinators.hs b/benchmark/Streamly/Benchmark/Data/Scanl/Combinators.hs index dd19feb419..f08bb86051 100644 --- a/benchmark/Streamly/Benchmark/Data/Scanl/Combinators.hs +++ b/benchmark/Streamly/Benchmark/Data/Scanl/Combinators.hs @@ -35,7 +35,6 @@ import qualified Streamly.Internal.Data.Fold as FL import qualified Streamly.Internal.Data.Pipe as Pipe import qualified Streamly.Internal.Data.Scanl as Scanl import qualified Streamly.Internal.Data.Stream as Stream -import qualified Streamly.Internal.Data.Unfold as Unfold import Streamly.Benchmark.Common import Test.Tasty.Bench @@ -562,17 +561,6 @@ inspect $ 'partition `hasNoType` ''FL.Step inspect $ 'partition `hasNoType` ''SPEC #endif -------------------------------------------------------------------------------- --- Nesting -------------------------------------------------------------------------------- - -{-# INLINE unfoldEach #-} -unfoldEach :: Int -> IO () -unfoldEach n = - Stream.fold FL.drain - $ Stream.postscanl (Scanl.unfoldEach Unfold.replicateM Scanl.drain) - $ Stream.fromPure (n, randomRIO (1, 1 :: Int)) - ------------------------------------------------------------------------------- -- O(n) heap: building structures ------------------------------------------------------------------------------- @@ -662,7 +650,6 @@ benchmarks value = , benchIO "partitionByM (sum, length)" partitionByM value , benchIO "partitionBy (sum, length)" partitionBy value , benchIO "partition (sum, length)" partition value - , benchIO "unfoldEach" unfoldEach value ] ++ fmap (HeapO_n,) [ benchIO "toListRev" toListRev value diff --git a/core/src/Streamly/Internal/Data/Scanl/Combinators.hs b/core/src/Streamly/Internal/Data/Scanl/Combinators.hs index fee34c1b8f..c27e9949b1 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Combinators.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Combinators.hs @@ -195,11 +195,9 @@ module Streamly.Internal.Data.Scanl.Combinators -- , intersperseWithQuotes -- ** Nesting - , unfoldEach -- , concatSequence -- * Deprecated - , unfoldMany , scanl , scanlMany ) @@ -224,7 +222,6 @@ import Streamly.Internal.Data.Pipe.Type (Pipe (..)) -- import Streamly.Internal.Data.Scan (Scan (..)) import Streamly.Internal.Data.Stream.Type (Stream) import Streamly.Internal.Data.Tuple.Strict (Tuple'(..)) -import Streamly.Internal.Data.Unfold.Type (Unfold(..)) import qualified Prelude import qualified Streamly.Internal.Data.MutArray.Type as MA @@ -2195,45 +2192,6 @@ toStream = fmap StreamD.fromList toList toStreamRev :: (Monad m, Monad n) => Scanl m a (Stream n a) toStreamRev = fmap StreamD.fromList toListRev --- XXX This does not fuse. It contains a recursive step function. We will need --- a Skip input constructor in the fold type to make it fuse. - --- | Unfold and flatten the input stream of a scan. --- --- @ --- Stream.scanl (unfoldEach u f) == Stream.scanl f . Stream.unfoldEach u --- @ --- --- /Pre-release/ -{-# INLINE unfoldEach #-} -unfoldEach :: Monad m => Unfold m a b -> Scanl m b c -> Scanl m a c -unfoldEach (Unfold ustep inject) (Scanl fstep initial extract final) = - Scanl consume initial extract final - - where - - {-# INLINE produce #-} - produce fs us = do - ures <- ustep us - case ures of - StreamD.Yield b us1 -> do - fres <- fstep fs b - case fres of - Partial fs1 -> produce fs1 us1 - Continue fs1 -> produce fs1 us1 - -- XXX What to do with the remaining stream? - Done c -> return $ Done c - StreamD.Skip us1 -> produce fs us1 - StreamD.Stop -> return $ Partial fs - - {-# INLINE_LATE consume #-} - consume s a = inject a >>= produce s - --- {-# DEPRECATED unfoldMany "Use unfoldEach instead." #-} -{-# INLINE unfoldMany #-} -unfoldMany :: Monad m => Unfold m a b -> Scanl m b c -> Scanl m a c -unfoldMany = unfoldEach - -- | Get the bottom most @n@ elements using the supplied comparison function. -- {-# INLINE bottomBy #-} diff --git a/test/Streamly/Test/Data/Scanl/Combinators.hs b/test/Streamly/Test/Data/Scanl/Combinators.hs index 7201a6d543..4c4916f294 100644 --- a/test/Streamly/Test/Data/Scanl/Combinators.hs +++ b/test/Streamly/Test/Data/Scanl/Combinators.hs @@ -210,22 +210,6 @@ mapMaybeMS = ([1, 2, 3, 4] :: [Int]) [[], [2], [2, 4]] --- An Unfold that streams the elements of an input list. -unfoldList :: Monad m => Unfold.Unfold m [a] a -unfoldList = - Unfold.unfoldrM - (\xs -> return (case xs of { [] -> Nothing; (y:ys) -> Just (y, ys) })) - -unfoldEachS :: Expectation -unfoldEachS = - check (F.unfoldEach unfoldList F.toList) ([[1, 2], [3], [4, 5]] :: [[Int]]) - [[], [1, 2], [1, 2, 3], [1, 2, 3, 4, 5]] - -unfoldManyS :: Expectation -unfoldManyS = - check (F.unfoldMany unfoldList F.toList) ([[1, 2], [3], [4, 5]] :: [[Int]]) - [[], [1, 2], [1, 2, 3], [1, 2, 3, 4, 5]] - -- 'defaultSalt' is the default salt used by 'rollingHash'. It is part of the -- output contract, so the test duplicates the constant rather than importing it. defaultSaltS :: Expectation @@ -327,8 +311,6 @@ main = hspec $ it "takingEndBy_" takingEndByUS it "mapMaybeM" mapMaybeMS - it "unfoldEach" unfoldEachS - it "unfoldMany" unfoldManyS it "defaultSalt" defaultSaltS -- deprecated combinators From 1652e9621ad0fb17da1539e801e29667e648e6b5 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Tue, 30 Jun 2026 16:10:15 +0530 Subject: [PATCH 4/6] Simplify getScanl in Scanl benchmarks --- .../Streamly/Benchmark/Data/Scanl/Container.hs | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/benchmark/Streamly/Benchmark/Data/Scanl/Container.hs b/benchmark/Streamly/Benchmark/Data/Scanl/Container.hs index ab3600674e..68c4094795 100644 --- a/benchmark/Streamly/Benchmark/Data/Scanl/Container.hs +++ b/benchmark/Streamly/Benchmark/Data/Scanl/Container.hs @@ -73,16 +73,6 @@ getKey buckets = (`mod` buckets) limitedSum :: Int -> Scanl IO Int Int limitedSum n = Scanl.take n Scanl.sum -{-# INLINE afterDone #-} -afterDone :: IO () -> Scanl IO a b -> Scanl IO a b -afterDone action (Scanl step i e f) = Scanl step1 i e f - where - step1 x a = do - res <- step x a - case res of - Scanl.Partial s1 -> pure $ Scanl.Partial s1 - Scanl.Done b -> action >> pure (Scanl.Done b) - {-# NOINLINE ref #-} ref :: IORef (Set.Set Int) ref = unsafePerformIO $ newIORef Set.empty @@ -93,9 +83,9 @@ getScanl k = do set <- readIORef ref if Set.member k set then pure Nothing - else pure - $ Just - $ afterDone (modifyIORef ref (Set.insert k)) (limitedSum 100) + else do + modifyIORef ref (Set.insert k) + pure $ Just (limitedSum 100) ------------------------------------------------------------------------------- -- Set operations From 11b7aba118f0216666d161c5c14455728ceac47f Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 29 Jun 2026 18:46:40 +0530 Subject: [PATCH 5/6] Remove recursion in scanlWith in Fold module --- core/src/Streamly/Internal/Data/Fold/Type.hs | 29 ++++++++++++++----- .../Internal/Data/Stream/Transform.hs | 1 + 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Fold/Type.hs b/core/src/Streamly/Internal/Data/Fold/Type.hs index 8ebc0693a3..9e7b2e3023 100644 --- a/core/src/Streamly/Internal/Data/Fold/Type.hs +++ b/core/src/Streamly/Internal/Data/Fold/Type.hs @@ -1729,8 +1729,6 @@ scanlWith isMany where - initialL_ = Scanl.fromFoldStep <$> initialL - {-# INLINE runStep #-} runStep actionL sR = do rL <- actionL @@ -1740,11 +1738,7 @@ scanlWith isMany case rR of Partial sR1 -> if isMany - -- XXX recursive call. If initialL returns Done then it - -- will not terminate. In that case we should return - -- error in the beginning itself. And we should remove - -- this recursion, assuming it won't return Done. - then runStep initialL_ sR1 + then runInitialL sR1 else Done <$> finalR sR1 Done bR -> return $ Done bR Scanl.Partial sL -> do @@ -1755,10 +1749,29 @@ scanlWith isMany Done bR -> finalL sL >> return (Done bR) Scanl.Continue sL -> return $ Partial (sL, sR) + -- Recursive when isMany is true, cannot be inlined + runInitialL sR = do + rL <- initialL + case rL of + Done bL -> do + rR <- stepR sR bL + case rR of + Partial sR1 -> + if isMany + then runInitialL sR1 + else Done <$> finalR sR1 + Done bR -> return $ Done bR + Partial sL -> do + !b <- extractL sL + rR <- stepR sR b + case rR of + Partial sR1 -> return $ Partial (sL, sR1) + Done bR -> finalL sL >> return (Done bR) + initial = do r <- initialR case r of - Partial sR -> runStep initialL_ sR + Partial sR -> runInitialL sR Done b -> return $ Done b step (sL, sR) x = runStep (stepL sL x) sR diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index b1489231b2..f8c9dbc2f4 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -642,6 +642,7 @@ scanlWith restart (Scanl fstep initial extract final) (Stream sstep state) = step gst (ScanDo st fs) = do res <- sstep (adaptState gst) st case res of + -- XXX should we skip to another state here for better fusion? Yield x s -> runStep s (fstep fs x) Skip s -> return $ Skip $ ScanDo s fs Stop -> final fs >> return Stop From 7a10ed4106d5ffd40e2c110848222c5b08992961 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Tue, 30 Jun 2026 16:11:09 +0530 Subject: [PATCH 6/6] Remove recursion in step function of composeMany Add postscanlMany --- .../Internal/Data/Scanl/Combinators.hs | 85 ---------- core/src/Streamly/Internal/Data/Scanl/Type.hs | 150 ++++++++++++++++-- 2 files changed, 135 insertions(+), 100 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Scanl/Combinators.hs b/core/src/Streamly/Internal/Data/Scanl/Combinators.hs index c27e9949b1..8843b4b09c 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Combinators.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Combinators.hs @@ -126,8 +126,6 @@ module Streamly.Internal.Data.Scanl.Combinators -- , slide2 -- ** Scanning Input - , compose - , composeMany -- , runScan , pipe , indexed @@ -196,10 +194,6 @@ module Streamly.Internal.Data.Scanl.Combinators -- ** Nesting -- , concatSequence - - -- * Deprecated - , scanl - , scanlMany ) where @@ -483,78 +477,6 @@ runScan :: Monad m => Scan m a b -> Fold m b c -> Scanl m a c runScan = runScanWith False -} -{-# INLINE scanWith #-} -scanWith :: Monad m => Bool -> Scanl m a b -> Scanl m b c -> Scanl m a c -scanWith isMany - (Scanl stepL initialL extractL finalL) - (Scanl stepR initialR extractR finalR) = - Scanl step initial extract final - - where - - initialL_ = fromFoldStep <$> initialL - - {-# INLINE runStep #-} - runStep actionL sR = do - rL <- actionL - case rL of - Done bL -> do - rR <- stepR sR bL - case rR of - Partial sR1 -> - if isMany - -- XXX recursive call. If initialL returns Done then it - -- will not terminate. In that case we should return - -- error in the beginning itself. And we should remove - -- this recursion, assuming it won't return Done. - then runStep initialL_ sR1 - else Done <$> finalR sR1 - Continue sR1 -> - if isMany - then runStep initialL_ sR1 - else Done <$> finalR sR1 - Done bR -> return $ Done bR - Partial sL -> do - !b <- extractL sL - rR <- stepR sR b - case rR of - Partial sR1 -> return $ Partial (sL, sR1) - Continue sR1 -> return $ Continue (sL, sR1) - Done bR -> finalL sL >> return (Done bR) - Continue sL -> return $ Continue (sL, sR) - - initial = do - r <- initialR - case r of - Fold.Partial sR -> toFoldStep <$> runStep initialL_ sR - Fold.Done b -> return $ Fold.Done b - - step (sL, sR) x = runStep (stepL sL x) sR - - extract = extractR . snd - - final (sL, sR) = finalL sL *> finalR sR - --- | Scan the input of a 'Scanl' to change it in a stateful manner using --- another 'Scanl'. The scan stops as soon as any of the scans terminates. --- --- This is basically an append operation. --- --- /Pre-release/ -{-# INLINE compose #-} -compose, scanl :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c -compose = scanWith False - --- XXX This does not fuse beacuse of the recursive step. Need to investigate. - --- | Scan the input of a 'Scanl' to change it in a stateful manner using --- another 'Scanl'. The scan restarts with a fresh state if it terminates. --- --- /Pre-release/ -{-# INLINE composeMany #-} -composeMany, scanlMany :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c -composeMany = scanWith True - ------------------------------------------------------------------------------ -- Filters ------------------------------------------------------------------------------ @@ -2374,10 +2296,3 @@ intersperseWithQuotes _ <- finalL sL error "intersperseWithQuotes: finished inside quote, at escape char" -} - ------------------------------------------------------------------------------- --- Deprecated ------------------------------------------------------------------------------- - -RENAME(scanl,compose) -RENAME(scanlMany,composeMany) diff --git a/core/src/Streamly/Internal/Data/Scanl/Type.hs b/core/src/Streamly/Internal/Data/Scanl/Type.hs index 54b600ab6f..75c1750688 100644 --- a/core/src/Streamly/Internal/Data/Scanl/Type.hs +++ b/core/src/Streamly/Internal/Data/Scanl/Type.hs @@ -121,7 +121,10 @@ module Streamly.Internal.Data.Scanl.Type -- ** Mapping Input , lmap , lmapM + , compose + , composeMany , postscanl + , postscanlMany -- ** Filtering , catMaybes @@ -192,6 +195,8 @@ module Streamly.Internal.Data.Scanl.Type , mkScanl1M , mkScant , mkScantM + , scanl + , scanlMany ) where @@ -217,7 +222,8 @@ import qualified Streamly.Internal.Data.Fold.Step as Fold --import qualified Streamly.Internal.Data.Stream.Step as Stream import qualified Streamly.Internal.Data.StreamK.Type as K -import Prelude hiding (Foldable(..), concatMap, filter, map, take, const) +import Prelude + hiding (Foldable(..), concatMap, filter, map, take, const, scanl) -- Entire module is exported, do not import selectively import Streamly.Internal.Data.Scanl.Step @@ -1306,13 +1312,9 @@ lmapM f (Scanl step begin done final) = Scanl step' begin done final where step' x a = f a >>= step x --- | Postscan the input of a 'Scanl' to change it in a stateful manner using --- another 'Scanl'. --- --- /Pre-release/ -{-# INLINE postscanl #-} -postscanl :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c -postscanl +{-# INLINE scanWith #-} +scanWith :: Monad m => Bool -> Scanl m a b -> Scanl m b c -> Scanl m a c +scanWith isMany (Scanl stepL initialL extractL finalL) (Scanl stepR initialR extractR finalR) = Scanl step initial extract final @@ -1326,8 +1328,14 @@ postscanl Done bL -> do rR <- stepR sR bL case rR of - Partial sR1 -> Done <$> finalR sR1 - Continue sR1 -> Done <$> finalR sR1 + Partial sR1 -> + if isMany + then fromFoldStep <$> runInitialL sR1 + else Done <$> finalR sR1 + Continue sR1 -> + if isMany + then fromFoldStep <$> runInitialL sR1 + else Done <$> finalR sR1 Done bR -> return $ Done bR Partial sL -> do !b <- extractL sL @@ -1338,14 +1346,94 @@ postscanl Done bR -> finalL sL >> return (Done bR) Continue sL -> return $ Continue (sL, sR) + -- Recursive when isMany is true, cannot be inlined + -- XXX We need to fix the two problematic cases in this after we change the + -- initial return type to Scanl.Step. + runInitialL sR = do + rL <- initialL + case rL of + Fold.Done bL -> do + rR <- stepR sR bL + case rR of + Partial sR1 -> + if isMany + -- NOTE: this may lead to infinite loop unless at some + -- point initialL stops returning Done. + then runInitialL sR1 + else Fold.Done <$> finalR sR1 + Continue sR1 -> + if isMany + then runInitialL sR1 + -- XXX this should terminate without result + else Fold.Done <$> finalR sR1 + Done bR -> return $ Fold.Done bR + Fold.Partial sL -> do + !b <- extractL sL + rR <- stepR sR b + case rR of + Partial sR1 -> return $ Fold.Partial (sL, sR1) + Continue sR1 -> + -- XXX this should be Continue + return $ Fold.Partial (sL, sR1) + Done bR -> finalL sL >> return (Fold.Done bR) + + initial = do + r <- initialR + case r of + Fold.Partial sR -> runInitialL sR + Fold.Done b -> return $ Fold.Done b + + step (sL, sR) x = runStep (stepL sL x) sR + + extract = extractR . snd + + final (sL, sR) = finalL sL *> finalR sR + +{-# INLINE postscanlWith #-} +postscanlWith :: Monad m => Bool -> Scanl m a b -> Scanl m b c -> Scanl m a c +postscanlWith + isMany + (Scanl stepL initialL extractL finalL) + (Scanl stepR initialR extractR finalR) = + Scanl step initial extract final + + where + + {-# INLINE runStep #-} + runStep actionL sR = do + rL <- actionL + case rL of + Done bL -> do + rR <- stepR sR bL + case rR of + Partial sR1 -> + if isMany + then fromFoldStep <$> runInitialL sR1 + else Done <$> finalR sR1 + Continue sR1 -> + if isMany + then fromFoldStep <$> runInitialL sR1 + else Done <$> finalR sR1 + Done bR -> return $ Done bR + Partial sL -> do + !b <- extractL sL + rR <- stepR sR b + case rR of + Partial sR1 -> return $ Partial (sL, sR1) + Continue sR1 -> return $ Continue (sL, sR1) + Done bR -> finalL sL >> return (Done bR) + Continue sL -> return $ Continue (sL, sR) + + runInitialL sR = do + rL <- initialL + case rL of + Fold.Done _ -> Fold.Done <$> finalR sR + Fold.Partial sL -> return $ Fold.Partial (sL, sR) + initial = do rR <- initialR case rR of - Fold.Partial sR -> do - rL <- initialL - case rL of - Fold.Done _ -> Fold.Done <$> finalR sR - Fold.Partial sL -> return $ Fold.Partial (sL, sR) + Fold.Partial sR -> runInitialL sR Fold.Done b -> return $ Fold.Done b -- XXX should use Tuple' @@ -1355,6 +1443,38 @@ postscanl final (sL, sR) = finalL sL *> finalR sR +-- | Postscan the input of a 'Scanl' to change it in a stateful manner using +-- another 'Scanl'. +-- +-- /Pre-release/ +{-# INLINE postscanl #-} +postscanl :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c +postscanl = postscanlWith False + +{-# INLINE postscanlMany #-} +postscanlMany :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c +postscanlMany = postscanlWith True + +-- | Scan the input of a 'Scanl' to change it in a stateful manner using +-- another 'Scanl'. The scan stops as soon as any of the scans terminates. +-- +-- The composition of the scans works in postscan style i.e. the initial value +-- of the accumulator of the first scan does not drive the second scan. +-- +{-# INLINE compose #-} +compose, scanl :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c +compose = scanWith False + +-- | Like 'compose' but the second scan restarts if it terminates. +-- +-- /Pre-release/ +{-# INLINE composeMany #-} +composeMany, scanlMany :: Monad m => Scanl m a b -> Scanl m b c -> Scanl m a c +composeMany = scanWith True + +RENAME(scanl,compose) +RENAME(scanlMany,composeMany) + ------------------------------------------------------------------------------ -- Filtering ------------------------------------------------------------------------------