Skip to content

Commit

Permalink
exception handle to introspect voting cluster runs
Browse files Browse the repository at this point in the history
  • Loading branch information
NadiaYvette committed Nov 18, 2024
1 parent 260bf1e commit c9e9e39
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 41 deletions.
18 changes: 15 additions & 3 deletions bench/tx-generator/src/Cardano/Benchmarking/Command.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ import Data.Text.IO as T
import Options.Applicative as Opt
import Ouroboros.Network.NodeToClient (IOManager, withIOManager)

import GHC.Stack (currentCallStack)

import System.Exit
import qualified System.IO as IO (BufferMode (..), hSetBuffering, stdout)
import qualified System.IO as IO (BufferMode (..), hPutStrLn, hSetBuffering, stderr, stdout)

#ifdef UNIX
import qualified Cardano.Logging as Tracer (traceWith)
Expand All @@ -58,7 +60,7 @@ import qualified Control.Concurrent as Weak (mkWeakThreadId)
import qualified Control.Concurrent.Async as Async (asyncThreadId, cancelWith)
import qualified Control.Concurrent.STM as STM (readTVar)
import Control.Exception (AssertionFailed (..))
import Control.Monad.Catch (MonadThrow (..))
import Control.Monad.Catch (MonadThrow (..), handleAll)
import Control.Monad.Extra (whenJustM)
import Control.Monad.IO.Class (liftIO)
import qualified Control.Monad.STM as STM (atomically)
Expand Down Expand Up @@ -90,9 +92,18 @@ data Command
| Compile FilePath
| Selftest Bool (Maybe FilePath) -- True for selftesting the voting workload; specifying an optional file for dumping txns via Show
| VersionCmd
deriving (Eq, Read, Show)

runCommand :: IO ()
runCommand = withIOManager runCommand'
runCommand = handleAll fallbackHandler $ withIOManager runCommand' where
fallbackHandler exception = do
IO.hPutStrLn IO.stderr $
List.unwords [ "runCommand: caught exception"
, show exception ]
mapM_ (IO.hPutStrLn IO.stderr) =<< currentCallStack
whenJustM getThreads \threadList -> do
Fold.forM_ threadList $ IO.hPutStrLn IO.stderr . show
throwM exception

throwStr :: forall m t . MonadThrow m => String -> m t
throwStr = throwM . AssertionFailed
Expand All @@ -110,6 +121,7 @@ eitherErr errMsg = maybeErr errMsg . eitherToMaybe
runCommand' :: IOManager -> IO ()
runCommand' iocp = do
let putMsg msg = T.putStrLn $ "runCommand': " <> msg
IO.hSetBuffering IO.stderr IO.NoBuffering
IO.hSetBuffering IO.stdout IO.NoBuffering
putMsg "entered"
envConsts <- installSignalHandler iocp
Expand Down
26 changes: 19 additions & 7 deletions bench/tx-generator/src/Cardano/Benchmarking/GeneratorTx.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,18 @@ import Cardano.TxGenerator.Types (NumberOfTxs, TPSRate, TxGenError (..

import Prelude (String)

import qualified Control.Concurrent.Async as Async (async)
import qualified Control.Exception as Except (handle)
import qualified Control.Concurrent.STM.TMVar as STM (newEmptyTMVar)
import qualified Control.Monad.STM as STM (atomically)
import qualified Data.List as List (unwords)
import qualified Data.List.NonEmpty as NE
import Data.Text (pack)
import qualified Data.Time.Clock as Clock
import Data.Tuple.Extra (secondM)
import GHC.Conc as Conc (labelThread)
import qualified GHC.Conc as Conc (labelThread)
import qualified GHC.Stack as Stack (currentCallStack)
import qualified System.IO as IO (hPutStrLn, stderr)

-- For some reason, stylish-haskell wants to delete this.
#if MIN_VERSION_base(4,18,0)
Expand Down Expand Up @@ -100,7 +104,8 @@ handleTxSubmissionClientError
[ "Thread"
, show tid
, labelStr
, "Exception while talking to peer "
, "Exception caught by handleTxSubmissionClientError"
, "while talking to peer "
, remoteName
, "(" ++ show (addrAddress remoteAddr) ++ "):"
, show err ]
Expand Down Expand Up @@ -162,19 +167,26 @@ walletBenchmark
(txStreamSource txStreamRef tpsThrottle)
(submitSubmissionThreadStats reportRef)
remoteAddrString = show $ addrAddress remoteAddrInfo
asyncThread <- async do handle errorHandler $ connectClient remoteAddrInfo client
mkLabel tid = "txSubmissionClient " ++ show tid ++
" servicing " ++ remoteName ++ " (" ++
remoteAddrString ++ ")"
asyncThread <- Async.async do
tid <- myThreadId
IO.hPutStrLn IO.stderr $
"spawned thread " <> show tid <> " " <> mkLabel tid <> " from:"
mapM_ (IO.hPutStrLn IO.stderr . show) =<< Stack.currentCallStack
Except.handle errorHandler $ connectClient remoteAddrInfo client
let tid = asyncThreadId asyncThread
Conc.labelThread tid $ "txSubmissionClient " ++ show tid ++
" servicing " ++ remoteName ++ " (" ++ remoteAddrString ++ ")"
Conc.labelThread tid $ mkLabel tid
pure asyncThread

abcFeeder <- async $ do
abcFeeder <- Async.async $ do
startSending tpsThrottle
traceWith traceSubmit $ TraceBenchTxSubDebug "tpsLimitedFeeder : transmitting done"
STM.atomically $ sendStop tpsThrottle
traceWith traceSubmit $ TraceBenchTxSubDebug "tpsLimitedFeeder : shutdown done"
let tid = asyncThreadId abcFeeder
labelThread tid $ "tpsThrottleThread " ++ show tid
Conc.labelThread tid $ "tpsThrottleThread " ++ show tid

let abcShutdown = do
cancel abcFeeder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import qualified Data.Foldable as Foldable
import Data.Ratio
import Data.Time (defaultTimeLocale, formatTime)
import Data.Time.Clock.System (getSystemTime, systemToUTCTime)
import qualified GHC.Conc as Conc (myThreadId)
import Lens.Micro ((^.))


Expand Down Expand Up @@ -173,7 +174,7 @@ debugDumpProposalsPeriodically NixServiceOptions{..} =
BSL.writeFile (fileNameProposals tStamp) (prettyPrintOrdered props)

void $ forkIO $ forever $ do
!_ <- threadBody `catch` \SomeException{} -> pure ()
!_ <- threadBody `catch` \SomeException{} -> handleQueryError
threadDelay 60_000_000 -- 1 minute

-- an ExceptT for the masses
Expand All @@ -188,3 +189,8 @@ debugDumpProposalsPeriodically NixServiceOptions{..} =

timeStampFormat :: String
timeStampFormat = "%H-%M-%S"

handleQueryError :: IO ()
handleQueryError = do
myTID <- Conc.myThreadId
putStrLn $ "(" <> show myTID <> ") debugDumpProposalsPeriodically:"
75 changes: 45 additions & 30 deletions bench/tx-generator/src/Cardano/Benchmarking/TpsThrottle.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
{-# LANGUAGE CPP #-}

module Cardano.Benchmarking.TpsThrottle
where
( Step (..)
, TpsThrottle (..)
, consumeTxsBlocking
, consumeTxsNonBlocking
, newTpsThrottle) where

import Cardano.Benchmarking.Types
import Cardano.Benchmarking.Types (Req)
import Cardano.TxGenerator.Types (TPSRate)

import Prelude

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM as STM
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM (STM, TMVar)
import qualified Control.Concurrent.STM as STM (atomically, newEmptyTMVarIO, orElse, putTMVar, retry, takeTMVar, tryTakeTMVar)
import qualified Data.Time.Clock as Clock (diffUTCTime, getCurrentTime)

#ifdef TPS_THROTTLE_TEST
import qualified Control.Concurrent as Conc (forkIO)
import qualified GHC.Conc as Conc (labelThread, myThreadId)
import Control.Monad
import qualified Data.Time.Clock as Clock
import GHC.Conc (labelThread, myThreadId)
#endif

data Step = Next | Stop
deriving (Eq, Show)
Expand All @@ -30,24 +41,24 @@ data TpsThrottle = TpsThrottle {

newTpsThrottle :: Int -> Int -> TPSRate -> IO TpsThrottle
newTpsThrottle buffersize count tpsRate = do
var <- newEmptyTMVarIO
var <- STM.newEmptyTMVarIO
return $ TpsThrottle {
startSending = sendNTicks tpsRate buffersize count var
, sendStop = putTMVar var Nothing
, receiveBlocking = takeTMVar var >>= receiveAction var
, sendStop = STM.putTMVar var Nothing
, receiveBlocking = STM.takeTMVar var >>= receiveAction var
, receiveNonBlocking =
(Just <$> (takeTMVar var >>= receiveAction var )) `orElse` return Nothing
(Just <$> (STM.takeTMVar var >>= receiveAction var )) `STM.orElse` return Nothing
}

receiveAction :: TMVar (Maybe Int) -> Maybe Int -> STM Step
receiveAction var state = case state of
Nothing -> do
putTMVar var Nothing
STM.putTMVar var Nothing
return Stop
Just 1 -> return Next -- leave var empty, i.e. block submission until sendNTicks unblocks
Just n -> do
-- decrease counter and let other threads transmit
putTMVar var $ Just $ pred n
STM.putTMVar var $ Just $ pred n
return Next

sendNTicks :: TPSRate -> Int -> Int -> TMVar (Maybe Int) -> IO ()
Expand All @@ -65,14 +76,16 @@ sendNTicks rate buffersize count var = do
threadDelay . ceiling $ (realToFrac delay * 1000000.0 :: Double)
worker (pred n) now delay
-- increaseWatermark can retry/block if there are already buffersize ticks in the "queue"
increaseWatermark = atomically $ do
s <- tryTakeTMVar var
increaseWatermark = STM.atomically $ do
s <- STM.tryTakeTMVar var
case s of
Nothing -> putTMVar var $ Just 1
Just Nothing -> putTMVar var Nothing -- error "startTicks unreachable state : Just Nothing"
Nothing -> STM.putTMVar var $ Just 1
Just Nothing ->
-- error "startTicks unreachable state : Just Nothing"
STM.putTMVar var Nothing
Just (Just n) -> if n == buffersize
then retry -- block if buffer is full
else putTMVar var $ Just $ succ n
then STM.retry -- block if buffer is full
else STM.putTMVar var $ Just $ succ n

consumeTxsBlocking :: TpsThrottle -> Req -> IO (Step, Int)
consumeTxsBlocking tpsThrottle req = go req 0
Expand All @@ -93,33 +106,34 @@ consumeTxsNonBlocking tpsThrottle req
Just Stop -> pure (Stop, 0)
Just Next -> pure (Next, 1)

test :: IO ()
test = do
#ifdef TPS_THROTTLE_TEST
testThrottle :: IO ()
testThrottle = do
t <- newTpsThrottle 10 50 2
_threadId <- startThrottle t
threadDelay 5000000
forM_ [1 .. 5] $ \i -> forkIO $ consumer t i
forM_ [6 .. 7] $ \i -> forkIO $ consumer2 t i
forM_ [1 .. 5] $ \i -> Conc.forkIO $ consumer t i
forM_ [6 .. 7] $ \i -> Conc.forkIO $ consumer2 t i
putStrLn "done"
where
startThrottle t = forkIO $ do
startThrottle t = Conc.forkIO $ do
startSending t
putStrLn "startThrottle done"
atomically $ sendStop t
STM.atomically $ sendStop t

consumer :: TpsThrottle -> Int -> IO ()
consumer t n = do
tid <- myThreadId
labelThread tid $ "TpsThrottle consumer " ++ show n ++ " ThreadId = " ++ show tid
s <- atomically $ receiveBlocking t
tid <- Conc.myThreadId
Conc.labelThread tid $ "TpsThrottle consumer " ++ show n ++ " ThreadId = " ++ show tid
s <- STM.atomically $ receiveBlocking t
print (n, s)
if s == Next then consumer t n else putStrLn $ "Done " ++ show n

consumer2 :: TpsThrottle -> Int -> IO ()
consumer2 t n = do
tid <- myThreadId
labelThread tid $ "TpsThrottle consumer2 " ++ show n ++ " ThreadId = " ++ show tid
r <- atomically $ receiveNonBlocking t
tid <- Conc.myThreadId
Conc.labelThread tid $ "TpsThrottle consumer2 " ++ show n ++ " ThreadId = " ++ show tid
r <- STM.atomically $ receiveNonBlocking t
case r of
Just s -> do
print (n, s)
Expand All @@ -128,3 +142,4 @@ test = do
putStrLn $ "wait " ++ show n
threadDelay 100000
consumer2 t n
#endif

0 comments on commit c9e9e39

Please sign in to comment.