Add conduit interface #23

Draft
wants to merge 8 commits into base: master
24 changes: 21 additions & 3 deletions benchmarks/Main.hs
@@ -6,12 +6,16 @@ import Codec.Xlsx.Parser.Stream
import Codec.Xlsx.Writer.Stream
import Control.DeepSeq
import Control.Lens
import Control.Monad (void)
import Control.Monad (unless, void)
import Control.Monad.IO.Class (liftIO)
import Criterion.Main
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LB
import Data.Conduit ((.|))
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import Data.Foldable (for_)
import Data.IORef
import Data.Maybe

main :: IO ()
@@ -33,16 +37,30 @@ main = do
, bench "with stream (counting)" $ nfIO $ runXlsxM filename $ countRowsInSheet idx
, bench "with stream (reading)" $ nfIO $ runXlsxM filename $ readSheet idx (pure . rwhnf)
]
, bgroup
"read partial data (100 rows)"
[ bench "with readSheet" $ nfIO $ runXlsxM filename $ do
rowsRef <- liftIO $ newIORef []
readSheet idx $ \ sheetItem -> do
existing <- readIORef rowsRef
unless (length existing >= 100) $
writeIORef rowsRef $ sheetItem:existing
liftIO $ readIORef rowsRef
, bench "with stream (conduit)" $ nfIO $ runXlsxM filename $ do
mConduit <- getSheetConduit idx
for_ mConduit $ \conduit ->
liftIO $ C.runConduitRes $ conduit .| C.take 100 .| C.sinkList
]
, bgroup
"writeFile"
[ bench "with xlsx" $ nf (fromXlsx 0) parsed
, bench "with stream (no sst)" $
nfIO $ C.runConduit $
void (writeXlsxWithSharedStrings defaultSettings mempty $ C.yieldMany $ view si_row <$> items)
C..| C.fold
.| C.fold
, bench "with stream (sst)" $
nfIO $ C.runConduit $
void (writeXlsx defaultSettings $ C.yieldMany $ view si_row <$> items)
C..| C.fold
.| C.fold
]
]
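Aside (not part of the diff): the new benchmark above exercises the conduit interface for partial reads. Below is a minimal standalone sketch of the same pattern, assuming only the names introduced in this PR (runXlsxM, getSheetConduit, makeIndex) plus standard conduit combinators; the helper name firstHundredRows is made up for illustration.

import Codec.Xlsx.Parser.Stream
import Conduit ((.|))
import qualified Conduit as C
import Control.Monad.IO.Class (liftIO)

-- Read at most the first 100 rows of the first sheet without
-- materialising the rest of the file; Nothing if the sheet is missing.
firstHundredRows :: FilePath -> IO (Maybe [SheetItem])
firstHundredRows path = runXlsxM path $ do
  mConduit <- getSheetConduit (makeIndex 1)
  traverse
    (\rows -> liftIO $ C.runConduitRes $ rows .| C.takeC 100 .| C.sinkList)
    mConduit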
136 changes: 105 additions & 31 deletions src/Codec/Xlsx/Parser/Stream.hs
@@ -43,7 +43,9 @@ module Codec.Xlsx.Parser.Stream
, wiSheets
, getWorkbookInfo
, CellRow
-- * using a sheet
, readSheet
, getSheetConduit
, countRowsInSheet
, collectItems
-- ** Index
@@ -74,6 +76,7 @@ import Codec.Xlsx.Types.Internal.Relationships (Relationship (..),
Relationships (..))
import Conduit (PrimMonad, (.|))
import qualified Conduit as C
import qualified Data.Conduit.Combinators as CC
import qualified Data.Vector as V
#ifdef USE_MICROLENS
import Lens.Micro
@@ -85,6 +88,7 @@ import Lens.Micro.TH
import Control.Lens
#endif
import Codec.Xlsx.Parser.Internal
import Control.Exception (throwIO)
import Control.Monad.Catch
import Control.Monad.Except
import Control.Monad.Reader
@@ -107,6 +111,7 @@ import qualified Data.Text.Read as Read
import Data.Traversable (for)
import Data.XML.Types
import GHC.Generics
import GHC.Stack
import Control.DeepSeq
import Codec.Xlsx.Parser.Internal.Memoize

@@ -281,7 +286,7 @@ parseSharedStringss = do
else do
let state0 = initialSharedStrings
byteSrc <- Zip.getEntrySource sharedStrsSel
st <- liftIO $ runExpat state0 byteSrc $ \evs -> forM_ evs $ \ev -> do
st <- liftIO $ runCallbackExpat state0 byteSrc $ \evs -> forM_ evs $ \ev -> do
mTxt <- parseSharedStrings ev
for_ mTxt $ \txt ->
ss_list %= (`DL.snoc` txt)
@@ -295,7 +300,7 @@ readWorkbookInfo :: Zip.ZipArchive WorkbookInfo
readWorkbookInfo = do
sel <- Zip.mkEntrySelector "xl/workbook.xml"
src <- Zip.getEntrySource sel
sheets <- liftIO $ runExpat [] src $ \evs -> forM_ evs $ \case
sheets <- liftIO $ runCallbackExpat [] src $ \evs -> forM_ evs $ \case
StartElement ("sheet" :: ByteString) attrs -> do
nm <- lookupBy "name" attrs
sheetId <- lookupBy "sheetId" attrs
@@ -318,7 +323,7 @@ readWorkbookRelationships :: Zip.ZipArchive Relationships
readWorkbookRelationships = do
sel <- Zip.mkEntrySelector "xl/_rels/workbook.xml.rels"
src <- Zip.getEntrySource sel
liftIO $ fmap Relationships $ runExpat mempty src $ \evs -> forM_ evs $ \case
liftIO $ fmap Relationships $ runCallbackExpat mempty src $ \evs -> forM_ evs $ \case
StartElement ("Relationship" :: ByteString) attrs -> do
rId <- lookupBy "Id" attrs
rTarget <- lookupBy "Target" attrs
@@ -378,57 +383,122 @@ getSheetXmlSource sheetId = do
Just <$> liftZip (Zip.getEntrySource sheetSel)
_ -> pure Nothing

{-# SCC runExpat #-}
runExpat :: forall state tag text.
getSheetConduit :: (MonadIO m, PrimMonad m, MonadThrow m, C.MonadResource m)
=> SheetIndex ->
XlsxM (Maybe (ConduitT () SheetItem m ()))
getSheetConduit (MkSheetIndex sheetId) = do
msource <- getSheetXmlSource sheetId
initState <- makeInitialSheetState (MkSheetIndex sheetId)
pure $ msource <&> \source -> do
(parseChunk, _getLoc) <- liftIO $ Hexpat.hexpatNewParser Nothing Nothing False
stateRef <- liftIO $ newIORef initState
source
.| expatConduit parseChunk
.| saxRowConduit stateRef
.| CC.map (MkSheetItem sheetId)

expatConduit ::
forall tag text m .
(GenericXMLString tag, GenericXMLString text, MonadIO m, HasCallStack) =>
HParser ->
ConduitT ByteString (SAXEvent tag text) m ()
expatConduit parseChunk = do
mUpstream <- C.await
case mUpstream of
Just upstreamBs -> do
traverse_ C.yield =<< liftIO (processChunk @tag @text parseChunk False upstreamBs)
expatConduit parseChunk
Nothing -> do
traverse_ C.yield =<< liftIO (processChunk @tag @text parseChunk True BS.empty)

saxRowConduit ::
(MonadIO m) =>
IORef SheetState ->
ConduitT (SAXEvent ByteString Text) Row m ()
saxRowConduit sheetStateRef =
CC.concatMapM $ \sax -> do
curState <- liftIO $ readIORef sheetStateRef
(row, nextState) <- flip runStateT curState $ saxToRow sax
liftIO $ writeIORef sheetStateRef nextState
case row of
RowError err -> liftIO $ throwIO err -- rethrow the parse error
RowInProgress -> pure Nothing -- row not finished yet, yield nothing
RowCompleted completed -> pure $ Just completed -- yield the completed row

{-# SCC runCallbackExpat #-}
runCallbackExpat :: forall state tag text.
(GenericXMLString tag, GenericXMLString text) =>
state ->
ConduitT () ByteString (C.ResourceT IO) () ->
([SAXEvent tag text] -> StateT state IO ()) ->
IO state
runExpat initialState byteSource handler = do
runCallbackExpat initialState byteSource handler = do
-- Set up state
ref <- newIORef initialState
-- Set up parser and callbacks
(parseChunk, _getLoc) <- Hexpat.hexpatNewParser Nothing Nothing False
let noExtra _ offset = pure ((), offset)
{-# SCC processChunk #-}
{-# INLINE processChunk #-}
processChunk isFinalChunk chunk = do
(buf, len, mError) <- parseChunk chunk isFinalChunk
saxen <- HexpatInternal.parseBuf buf len noExtra
case mError of
Just err -> error $ "expat error: " <> show err
Nothing -> do
state0 <- liftIO $ readIORef ref
state1 <-
{-# SCC "runExpat_runStateT_call" #-}
execStateT (handler $ map fst saxen) state0
let callHandlerState hexpat = do
-- you'd think you could factor this out completely:
-- dealing with state shouldn't be part of this function at all
state0 <- readIORef ref
state1 <- execStateT (handler hexpat) state0
writeIORef ref state1

C.runConduitRes $
byteSource .|
C.awaitForever (liftIO . processChunk False)
processChunk True BS.empty
C.awaitForever (\x -> liftIO $
callHandlerState =<< processChunk @tag @text parseChunk False x

)
callHandlerState =<< processChunk @tag @text parseChunk True BS.empty
readIORef ref

{-# SCC processChunk #-}
{-# INLINE processChunk #-}
processChunk :: forall tag text.
(GenericXMLString tag, GenericXMLString text, HasCallStack) =>
HParser -> Bool -> ByteString -> IO [SAXEvent tag text]
processChunk parseChunk isFinalChunk chunk = do
(buf, len, mError) <- parseChunk chunk isFinalChunk
saxen <- HexpatInternal.parseBuf buf len noExtra
case mError of
Just err -> error $ "expat error: " <> show err
Nothing -> do
pure $ map fst saxen
where
noExtra _ offset = pure ((), offset)
runExpatForSheet ::
SheetState ->
ConduitT () ByteString (C.ResourceT IO) () ->
(SheetItem -> IO ()) ->
XlsxM ()
runExpatForSheet initState byteSource inner =
void $ liftIO $ runExpat initState byteSource handler
void $ liftIO $ runCallbackExpat initState byteSource handler
where
sheetName = _ps_sheet_index initState
sheetIndex = _ps_sheet_index initState
handler evs = forM_ evs $ \ev -> do
si <- saxToRow ev
liftIO $ case si of
RowError err -> throwIO err
RowInProgress -> pure ()
RowCompleted completed -> inner $ MkSheetItem sheetIndex $ completed

data SaxToRowResult = RowError SheetErrors -- ^ something went wrong
| RowInProgress -- ^ hasn't finished a row
| RowCompleted Row

saxToRow ::
(HasSheetState m) =>
SAXEvent ByteString Text -> m SaxToRowResult
saxToRow ev = do
parseRes <- runExceptT $ matchHexpatEvent ev
case parseRes of
Left err -> throwM err
Left err -> pure $ RowError err
Right (Just cellRow)
| not (IntMap.null cellRow) -> do
rowNum <- use ps_cell_row_index
liftIO $ inner $ MkSheetItem sheetName $ MkRow rowNum cellRow
_ -> pure ()

pure $ RowCompleted $ MkRow rowNum cellRow
_ -> pure RowInProgress
-- | this will collect the sheetitems in a list.
-- useful for cases where memory is of no concern but a sheetitem
-- type in a list is needed.
@@ -476,12 +546,16 @@ readSheet (MkSheetIndex sheetId) inner = do
case mSrc of
Nothing -> pure False
Just sourceSheetXml -> do
sheetState0 <- makeInitialSheetState (MkSheetIndex sheetId)
runExpatForSheet sheetState0 sourceSheetXml inner
pure True

makeInitialSheetState :: SheetIndex -> XlsxM SheetState
makeInitialSheetState (MkSheetIndex sheetId) = do
sharedStrs <- getOrParseSharedStringss
let sheetState0 = initialSheetState
pure $ initialSheetState
& ps_shared_strings .~ sharedStrs
& ps_sheet_index .~ sheetId
runExpatForSheet sheetState0 sourceSheetXml inner
pure True

-- | Returns number of rows in the given sheet (identified by the
-- sheet's ID, AKA the sheetId attribute, AKA 'sheetInfoSheetId'), or Nothing
@@ -493,7 +567,7 @@ countRowsInSheet (MkSheetIndex sheetId) = do
mSrc :: Maybe (ConduitT () ByteString (C.ResourceT IO) ()) <-
getSheetXmlSource sheetId
for mSrc $ \sourceSheetXml -> do
liftIO $ runExpat @Int @ByteString @ByteString 0 sourceSheetXml $ \evs ->
liftIO $ runCallbackExpat @Int @ByteString @ByteString 0 sourceSheetXml $ \evs ->
forM_ evs $ \case
StartElement "row" _ -> modify' (+1)
_ -> pure ()
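Aside (not part of the diff): because getSheetConduit exposes rows as an ordinary conduit source, it composes directly with the streaming writer in Codec.Xlsx.Writer.Stream, which is what the tests below do. A hedged sketch of that composition as an end-to-end helper: copyFirstSheet is a made-up name, and view is taken here from Control.Lens (the package also builds against microlens).

import Codec.Xlsx.Parser.Stream
import qualified Codec.Xlsx.Writer.Stream as SW
import Conduit ((.|))
import qualified Conduit as C
import Control.Lens (view)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.Foldable (for_)

-- Stream the first sheet of one workbook straight into a new xlsx file,
-- without holding the sheet in memory; does nothing if the sheet is missing.
copyFirstSheet :: FilePath -> FilePath -> IO ()
copyFirstSheet src dst = runXlsxM src $ do
  mRows <- getSheetConduit (makeIndex 1)
  for_ mRows $ \rows ->
    liftIO $ C.runConduitRes $
      void (SW.writeXlsx SW.defaultSettings (rows .| C.mapC (view si_row)))
        .| C.sinkFile dst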
48 changes: 48 additions & 0 deletions test/StreamTests.hs
@@ -25,6 +25,8 @@ import Codec.Xlsx
import Codec.Xlsx.Parser.Stream
import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Conduit.Combinators as CC
import Control.Exception (bracket)
import Control.Lens hiding (indexed)
import Data.Set.Lens
import qualified Data.ByteString.Lazy as LB
@@ -76,6 +78,17 @@ tests =
$ readWrite bigWorkbook
-- , testCase "Write as stream, see if memory based implementation can read it" $ readWrite testXlsx
-- TODO forall SheetItem write that can be read

, testGroup "Conduit" [
testCase "Write as stream, using conduit parser (simpleWorkbook)" $ readWriteConduit simpleWorkbook
, testCase "Write as stream, using conduit parser (simpleWorkbookRow)" $ readWriteConduit simpleWorkbookRow
, testCase "Write as stream, using conduit parser (bigWorkbook)" $ readWriteConduit bigWorkbook
, testGroup "No sst" [
testCase "Write as stream, using conduit parser (simpleWorkbook)" $ readWriteConduitNoSst simpleWorkbook
, testCase "Write as stream, using conduit parser (simpleWorkbookRow)" $ readWriteConduitNoSst simpleWorkbookRow
, testCase "Write as stream, using conduit parser (bigWorkbook)" $ readWriteConduitNoSst bigWorkbook
]
]
],

testGroup "Reader/inline strings"
@@ -98,6 +111,41 @@ readWrite input = do
Left x -> do
throwIO x

readWriteConduit :: Xlsx -> IO ()
readWriteConduit input = do
BS.writeFile "testinput.xlsx" (toBs input)
bs <- runXlsxM "testinput.xlsx" $ do
mConduit <- getSheetConduit $ makeIndex 1
case mConduit of
Nothing -> error "sheet should exist"
Just conduit -> liftIO $ runConduitRes $ void (SW.writeXlsx SW.defaultSettings (conduit .| CC.map (view si_row))) .| C.foldC

case toXlsxEither $ LB.fromStrict bs of
Right result ->
input @==? result
Left x -> do
throwIO x

-- No sst behaves differently from the normal writeXlsx because
-- the sst table isn't constructed first.
-- This results in a single pass instead of a double pass.
-- It turns out that in certain cases this test would pass
-- but the writeXlsx one wouldn't, which indicates brittleness within
-- the stateful hexpat parser.
readWriteConduitNoSst :: Xlsx -> IO ()
readWriteConduitNoSst input = do
BS.writeFile "testinput.xlsx" (toBs input)
bs <- runXlsxM "testinput.xlsx" $ do
mConduit <- getSheetConduit $ makeIndex 1
case mConduit of
Nothing -> error "sheet should exist"
Just conduit -> liftIO $ runConduitRes $ void (SW.writeXlsxWithSharedStrings SW.defaultSettings mempty (conduit .| CC.map (view si_row))) .| C.foldC

case toXlsxEither $ LB.fromStrict bs of
Right result ->
input @==? result
Left x -> do
throwIO x
-- test if the input text is also the result (a property we use for convenience)
sharedStringInputSameAsOutput :: Text -> Either String String
sharedStringInputSameAsOutput someText =