Skip to content
This repository has been archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
Add plutus-streaming package (#476)
Browse files Browse the repository at this point in the history
* Add plutus-streaming package

* Do some incantation to make hydra-ci pass.
  • Loading branch information
andreabedini authored May 30, 2022
1 parent 1d89b1e commit 3f76a97
Show file tree
Hide file tree
Showing 10 changed files with 517 additions and 0 deletions.
1 change: 1 addition & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ packages: doc
plutus-playground-server
plutus-script-utils
plutus-use-cases
plutus-streaming
quickcheck-dynamic
web-ghc

Expand Down
1 change: 1 addition & 0 deletions nix/pkgs/haskell/haskell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ let
plutus-pab-executables.package.buildable = false;
plutus-playground-server.package.buildable = false; # Would also require libpq
plutus-script-utils.package.buildable = false;
plutus-streaming.package.buildable = false;
plutus-tx-plugin.package.buildable = false;
plutus-use-cases.package.buildable = false;
plutus-example.package.buildable = false;
Expand Down
42 changes: 42 additions & 0 deletions plutus-streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Plutus Streaming

plutus-streaming is a simple library that wraps cardano-api giving a
streaming interface to the chain-sync protocol. This means you can use this
library to stream blocks from a locally running node.

The blocks are presented as received from the chain-sync protocol so you
will still need to use function from `Cardano.Api` (in cardano-node) or
`Plutus.Ledger`(in plutus-ledger) to extract information from blocks.

# Example applications

## Example 1

This application simply streams to stdout events in JSON format.

## Example 2

This application uses plutus-ledger to extract transactions (transaction
id, ins and outs) and datums. Then it prints them to stdout in JSON format.

## Example 3

This application uses folds the stream into a UTxoState as defined in
plutus-chain-index-core.

# Running the example applications

You can run the example applications with cabal. Remember you need to have
a local node running and reachable through the provided socket.

From Genesis

```
$ cabal run -- plutus-streaming-example-1 --socket-path /tmp/node.socket --mainnet
```

Passing a starting point

```
$ cabal run -- plutus-streaming-example-1 --socket-path /tmp/node.socket --slot-no 53427524 --block-hash 5e2bde4e504a9888a4f218dafc79a7619083f97d48684fcdba9dc78190df8f99
```
77 changes: 77 additions & 0 deletions plutus-streaming/examples/Common.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}
module Common where

import Cardano.Api qualified
import Data.Aeson qualified
import Data.ByteString.Lazy.Char8 qualified
import Options.Applicative (Alternative ((<|>)), Parser, auto, execParser, flag', help, helper, info, long, metavar,
option, str, strOption, (<**>))
import Orphans ()
import Streaming.Prelude qualified as S

--
-- Options parsing
--

data Options = Options
{ optionsSocketPath :: String,
optionsNetworkId :: Cardano.Api.NetworkId,
optionsChainPoint :: Cardano.Api.ChainPoint
}
deriving (Show)

optionsParser :: Parser Options
optionsParser =
Options
<$> strOption (long "socket-path" <> help "Node socket path")
<*> networkIdParser
<*> chainPointParser

networkIdParser :: Parser Cardano.Api.NetworkId
networkIdParser =
pMainnet' <|> fmap Cardano.Api.Testnet testnetMagicParser
where
pMainnet' :: Parser Cardano.Api.NetworkId
pMainnet' =
flag'
Cardano.Api.Mainnet
( long "mainnet"
<> help "Use the mainnet magic id."
)

testnetMagicParser :: Parser Cardano.Api.NetworkMagic
testnetMagicParser =
Cardano.Api.NetworkMagic
<$> option
auto
( long "testnet-magic"
<> metavar "NATURAL"
<> help "Specify a testnet magic id."
)

chainPointParser :: Parser Cardano.Api.ChainPoint
chainPointParser =
pure Cardano.Api.ChainPointAtGenesis
<|> ( Cardano.Api.ChainPoint
<$> option (Cardano.Api.SlotNo <$> auto) (long "slot-no" <> metavar "SLOT-NO")
<*> option str (long "block-hash" <> metavar "BLOCK-HASH")
)

parseOptions :: IO Options
parseOptions = execParser $ info (optionsParser <**> helper) mempty

printJson :: Data.Aeson.ToJSON a => S.Stream (S.Of a) IO r -> IO r
printJson = S.mapM_ Data.ByteString.Lazy.Char8.putStrLn . S.map Data.Aeson.encode

-- https://github.com/input-output-hk/cardano-node/pull/3665
workaround ::
(Cardano.Api.IsCardanoEra era => Cardano.Api.EraInMode era Cardano.Api.CardanoMode -> a) ->
Cardano.Api.EraInMode era Cardano.Api.CardanoMode ->
a
workaround k Cardano.Api.ByronEraInCardanoMode = k Cardano.Api.ByronEraInCardanoMode
workaround k Cardano.Api.ShelleyEraInCardanoMode = k Cardano.Api.ShelleyEraInCardanoMode
workaround k Cardano.Api.AllegraEraInCardanoMode = k Cardano.Api.AllegraEraInCardanoMode
workaround k Cardano.Api.MaryEraInCardanoMode = k Cardano.Api.MaryEraInCardanoMode
workaround k Cardano.Api.AlonzoEraInCardanoMode = k Cardano.Api.AlonzoEraInCardanoMode

27 changes: 27 additions & 0 deletions plutus-streaming/examples/Example1.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module Main where

import Cardano.Api qualified
import Common (Options (Options, optionsChainPoint, optionsNetworkId, optionsSocketPath), parseOptions)
import Data.Aeson.Text qualified as Aeson
import Data.Text.Lazy qualified as TL
import Orphans ()
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), withChainSyncEventStream)
import Streaming.Prelude qualified as S

--
-- Main
--

main :: IO ()
main = do
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <- parseOptions

withChainSyncEventStream optionsSocketPath optionsNetworkId optionsChainPoint $
S.stdoutLn
. S.map
( \case
RollForward (Cardano.Api.BlockInMode (Cardano.Api.Block header _txs) _era) _ct ->
"RollForward, header: " <> TL.unpack (Aeson.encodeToLazyText header)
RollBackward cp _ct ->
"RollBackward, point: " <> TL.unpack (Aeson.encodeToLazyText cp)
)
56 changes: 56 additions & 0 deletions plutus-streaming/examples/Example2.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
module Main where

import Cardano.Api qualified
import Common (Options (Options, optionsChainPoint, optionsNetworkId, optionsSocketPath), parseOptions, printJson,
workaround)
import Data.Foldable (toList)
import Ledger qualified
import Orphans ()
import Plutus.Script.Utils.V1.Scripts qualified
import Plutus.Streaming (withChainSyncEventStream)
import Streaming.Prelude qualified as S

--
-- Accessory functions
--

transactions ::
Cardano.Api.BlockInMode Cardano.Api.CardanoMode ->
[Ledger.CardanoTx]
transactions (Cardano.Api.BlockInMode (Cardano.Api.Block _ txs) eim) =
map (\tx -> Ledger.CardanoApiTx (workaround (Ledger.SomeTx tx) eim)) txs

txInsAndOuts ::
Ledger.CardanoTx ->
(Ledger.TxId, [Ledger.TxOutRef], [Ledger.TxOutRef])
txInsAndOuts tx = (txId, inRefs, outRefs)
where
txId = Ledger.getCardanoTxId tx
inRefs = map Ledger.txInRef $ toList $ Ledger.getCardanoTxInputs tx
outRefs = map snd $ Ledger.getCardanoTxOutRefs tx

datums ::
Ledger.CardanoTx ->
[(Plutus.Script.Utils.V1.Scripts.DatumHash, Plutus.Script.Utils.V1.Scripts.Datum)]
datums tx = do
let txIns = toList $ Ledger.getCardanoTxInputs tx
(Ledger.TxIn _ (Just (Ledger.ConsumeScriptAddress _validator _redeemer datum))) <- txIns
pure (Plutus.Script.Utils.V1.Scripts.datumHash datum, datum)

--
-- Main
--

main :: IO ()
main = do
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <- parseOptions

withChainSyncEventStream optionsSocketPath optionsNetworkId optionsChainPoint $
printJson
. S.map -- Each ChainSyncEvent
( fmap -- Inside the payload of RollForward events
( fmap -- Each transaction
(\tx -> (txInsAndOuts tx, datums tx))
. transactions
)
)
53 changes: 53 additions & 0 deletions plutus-streaming/examples/Example3.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module Main where

import Cardano.Api qualified
import Common (Options (Options, optionsChainPoint, optionsNetworkId, optionsSocketPath), parseOptions)
import Plutus.ChainIndex (TxUtxoBalance)
import Plutus.ChainIndex.Compatibility qualified as CI
import Plutus.ChainIndex.TxUtxoBalance qualified as TxUtxoBalance
import Plutus.ChainIndex.UtxoState (UtxoIndex, UtxoState)
import Plutus.ChainIndex.UtxoState qualified as UtxoState
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), withChainSyncEventStream)
import Streaming (Of, Stream)
import Streaming.Prelude qualified as S

utxoState ::
Monad m =>
Stream (Of (ChainSyncEvent (Cardano.Api.BlockInMode Cardano.Api.CardanoMode))) m r ->
Stream (Of (ChainSyncEvent (Cardano.Api.BlockInMode Cardano.Api.CardanoMode), UtxoState TxUtxoBalance)) m r
utxoState =
S.scanned step initial projection
where
step index (RollForward block _) =
case CI.fromCardanoBlock block of
Left err -> error ("FromCardanoError: " <> show err)
Right txs ->
let tip = CI.tipFromCardanoBlock block
balance = TxUtxoBalance.fromBlock tip txs
in case UtxoState.insert balance index of
Left err ->
error (show err)
Right (UtxoState.InsertUtxoSuccess newIndex _insertPosition) ->
newIndex
step index (RollBackward cardanoPoint _) =
let point = CI.fromCardanoPoint cardanoPoint
in case TxUtxoBalance.rollback point index of
Left err -> error (show err)
Right (UtxoState.RollbackResult _newTip rolledBackIndex) ->
rolledBackIndex

initial :: UtxoIndex TxUtxoBalance
initial = mempty

projection = UtxoState.utxoState

--
-- Main
--

main :: IO ()
main = do
Options {optionsSocketPath, optionsNetworkId, optionsChainPoint} <- parseOptions

withChainSyncEventStream optionsSocketPath optionsNetworkId optionsChainPoint $
S.print . utxoState
38 changes: 38 additions & 0 deletions plutus-streaming/examples/Orphans.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{-# LANGUAGE FlexibleInstances #-}
{-# OPTIONS_GHC -Wno-orphans #-}

module Orphans where

import Cardano.Api (BlockHeader (BlockHeader), BlockNo, ChainPoint (ChainPoint, ChainPointAtGenesis),
HasTypeProxy (proxyToAsType), Hash, SerialiseAsRawBytes (deserialiseFromRawBytes), ToJSON)
import Data.ByteString.Base16 qualified as Base16
import Data.ByteString.Char8 qualified as C8
import Data.Proxy (Proxy (Proxy))
import Data.String (IsString (fromString))
import GHC.Generics (Generic)
import Plutus.Streaming (ChainSyncEvent)

-- https://github.com/input-output-hk/cardano-node/pull/3608
instance IsString (Hash BlockHeader) where
fromString = either error id . deserialiseFromRawBytesBase16 . C8.pack
where
deserialiseFromRawBytesBase16 str =
case Base16.decode str of
Right raw -> case deserialiseFromRawBytes ttoken raw of
Just x -> Right x
Nothing -> Left ("cannot deserialise " ++ show str)
Left msg -> Left ("invalid hex " ++ show str ++ ", " ++ msg)
where
ttoken = proxyToAsType (Proxy :: Proxy a)

deriving instance Generic ChainPoint

instance ToJSON ChainPoint

instance ToJSON BlockNo

deriving instance Generic BlockHeader

instance ToJSON BlockHeader

instance ToJSON a => ToJSON (ChainSyncEvent a)
Loading

0 comments on commit 3f76a97

Please sign in to comment.