Skip to content

Commit

Permalink
refactor tx-gen signal setup
Browse files Browse the repository at this point in the history
  • Loading branch information
NadiaYvette committed Nov 18, 2024
1 parent 8f4c6bc commit 260bf1e
Showing 1 changed file with 127 additions and 91 deletions.
218 changes: 127 additions & 91 deletions bench/tx-generator/src/Cardano/Benchmarking/Command.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,34 +52,36 @@ import System.Exit
import qualified System.IO as IO (BufferMode (..), hSetBuffering, stdout)

#ifdef UNIX
import Cardano.Logging as Tracer (traceWith)
import Control.Concurrent as Conc (killThread, myThreadId)
import Control.Concurrent as Weak (mkWeakThreadId)
import Control.Concurrent.Async as Async (cancelWith)
import Control.Concurrent.STM as STM (readTVar)
import qualified Cardano.Logging as Tracer (traceWith)
import qualified Control.Concurrent as Conc (ThreadId, killThread, myThreadId)
import qualified Control.Concurrent as Weak (mkWeakThreadId)
import qualified Control.Concurrent.Async as Async (asyncThreadId, cancelWith)
import qualified Control.Concurrent.STM as STM (readTVar)
import Control.Exception (AssertionFailed (..))
import Control.Monad.Catch (MonadThrow (..))
import Control.Monad.Extra (whenJustM)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM as STM (atomically)
import Data.Foldable as Fold (forM_)
import Data.List as List (unwords)
import Data.Time.Format as Time (defaultTimeLocale, formatTime)
import Data.Time.Clock.System as Time (getSystemTime, systemToUTCTime)
import GHC.Weak as Weak (deRefWeak)
import qualified Control.Monad.STM as STM (atomically)
import qualified Data.Foldable as Fold (forM_)
import qualified Data.List as List (intercalate, unwords)
import qualified Data.Time.Format as Time (defaultTimeLocale, formatTime)
import qualified Data.Time.Clock.System as Time (getSystemTime, systemToUTCTime)
import qualified Data.Time.Clock as Time (UTCTime)
import qualified GHC.Weak as Weak (Weak, deRefWeak)

import System.FilePath ((</>))
import System.Posix.Signals as Sig (Handler (CatchInfo),
import qualified System.Posix.Signals as Sig (Handler (CatchInfo),
SignalInfo (..), SignalSpecificInfo (..), installHandler,
sigINT, sigTERM)
#if MIN_VERSION_base(4,18,0)
import Data.Maybe as Maybe (fromMaybe)
import GHC.Conc.Sync as Conc (threadLabel)
import qualified Data.Maybe as Maybe (fromMaybe)
import qualified GHC.Conc.Sync as Conc (listThreads, threadLabel)
#endif
#endif

#ifdef UNIX
deriving instance Show SignalInfo
deriving instance Show SignalSpecificInfo
deriving instance Show Sig.SignalInfo
deriving instance Show Sig.SignalSpecificInfo
#endif

data Command
Expand Down Expand Up @@ -110,7 +112,7 @@ runCommand' iocp = do
let putMsg msg = T.putStrLn $ "runCommand': " <> msg
IO.hSetBuffering IO.stdout IO.NoBuffering
putMsg "entered"
envConsts <- installSignalHandler
envConsts <- installSignalHandler iocp
putMsg "installSignalHandler done"
putMsg "doing customExecParser"
cmd <- customExecParser
Expand Down Expand Up @@ -205,92 +207,126 @@ runCommand' iocp = do
Right script -> runScript emptyEnv script consts' >>= handleError . fst
err -> die $ "tx-generator:Cardano.Command.runCommand JsonHL: " ++ show err
Compile file -> do
putMsg $ "case Compile "
<> T.pack (show file)
o <- parseJSONFile fromJSON file
case compileOptions o of
Right script -> BSL.putStr $ prettyPrint script
Left err -> die $ "tx-generator:Cardano.Command.runCommand Compile: " ++ show err
Selftest doVoting outFile -> runSelftest emptyEnv envConsts doVoting outFile >>= handleError
Selftest doVoting outFile -> do
putMsg $ "case Selftest "
<> T.pack (show doVoting)
<> " " <> T.pack (show outFile)
runSelftest emptyEnv envConsts doVoting outFile >>= handleError
VersionCmd -> runVersionCommand
where
handleError :: Show a => Either a b -> IO ()
handleError = \case
Right _ -> exitSuccess
Left err -> die $ "tx-generator:Cardano.Command.runCommand handleError: " ++ show err
installSignalHandler :: IO EnvConsts
installSignalHandler = do
-- The main thread does not appear in the set of asyncs.
wkMainTID <- Weak.mkWeakThreadId =<< myThreadId
envConsts@EnvConsts { .. } <- STM.atomically $ newEnvConsts iocp Nothing
abc <- STM.atomically $ STM.readTVar envThreads
_ <- pure (abc, wkMainTID)

handleError :: Show a => Either a b -> IO ()
handleError = \case
Right _ -> exitSuccess
Left err -> die $
"tx-generator:Cardano.Command.runCommand handleError: " ++ show err

#ifdef UNIX
let signalHandler = Sig.CatchInfo signalHandler'
signalHandler' sigInfo = do
tid <- Conc.myThreadId
utcTime <- Time.systemToUTCTime <$> Time.getSystemTime
-- It's meant to match Cardano.Tracers.Handlers.Logs.Utils
-- The hope was to avoid the package dependency.
let formatTimeStamp = formatTime' "%Y-%m-%dT%H-%M-%S"
formatTime' = Time.formatTime Time.defaultTimeLocale
timeStamp = formatTimeStamp utcTime
#if MIN_VERSION_base(4,18,0)
maybeLabel <- Conc.threadLabel tid
let labelStr' :: String
labelStr' = fromMaybe "(thread label unset)" maybeLabel
getLabel :: Conc.ThreadId -> IO String
getLabel tid = do
maybeLabel <- Conc.threadLabel tid
pure $ Maybe.fromMaybe "(thread label unset)" maybeLabel

getThreads :: IO (Maybe [Conc.ThreadId])
getThreads = Just <$> Conc.listThreads
#else
labelStr' = "(base version insufficient to read thread label)"
getLabel :: Conc.ThreadId -> IO String
getLabel _ = pure "(base version insufficient to read thread label)"

getThreads :: IO (Maybe [Conc.ThreadId])
getThreads = pure Nothing
#endif
labelStr :: String
labelStr = List.unwords [ timeStamp
, labelStr'
, show tid
, "received signal"
, show sigInfo ]
errorToThrow :: IOError
errorToThrow = userError labelStr
tag = TraceBenchTxSubError . T.pack
traceWith' msg = do
mBenchTracer <- STM.atomically do readTVar benchTracers
case mBenchTracer of
Nothing -> pure ()
Just tracers -> do
let wrappedMsg = tag msg
submittedTracers = btTxSubmit_ tracers
Tracer.traceWith submittedTracers wrappedMsg

Prelude.putStrLn labelStr
traceWith' labelStr
mABC <- STM.atomically $ STM.readTVar envThreads
case mABC of
Nothing -> do
-- Catching a signal at this point makes it a higher than
-- average risk of the tracer not being initialized, so
-- this pursues some alternatives.
let errMsg = "Signal received before AsyncBenchmarkControl creation."
Prelude.putStrLn errMsg
traceWith' errMsg
Just AsyncBenchmarkControl { .. } -> do
abcFeeder `Async.cancelWith` errorToThrow
Fold.forM_ abcWorkers \work -> do
work `Async.cancelWith` errorToThrow
-- The main thread does __NOT__ appear in the above list.
-- In order to kill that off, this, or some equivalent,
-- absolutely /must/ be done separately.
mapM_ Conc.killThread =<< Weak.deRefWeak wkMainTID
Fold.forM_ [Sig.sigINT, Sig.sigTERM] $ \sig ->
Sig.installHandler sig signalHandler Nothing
#endif /* UNIX */

#ifdef UNIX
signalHandler :: Weak.Weak Conc.ThreadId -> EnvConsts -> Sig.SignalInfo -> IO ()
signalHandler wkMainTID EnvConsts {..} sigInfo = do
tid <- Conc.myThreadId
utcTime <- Time.systemToUTCTime <$> Time.getSystemTime
-- It's meant to match Cardano.Tracers.Handlers.Logs.Utils
-- The hope was to avoid the package dependency.
labelStr' <- getLabel tid
let labelStr :: String
labelStr = List.unwords [ formatTimeStamp utcTime
, labelStr'
, show tid
, "received signal"
, show sigInfo ]

Prelude.putStrLn labelStr
traceWith' labelStr
getThreads >>= \case
Nothing -> Prelude.putStrLn "no thread list available"
Just threadList ->
Prelude.putStrLn $ "thread list: "
<> List.intercalate ", " (Prelude.map show threadList)
STM.atomically (STM.readTVar envThreads) >>= \case
Nothing
-- Catching a signal at this point makes it a higher than
-- average risk of the tracer not being initialized, so
-- this pursues some alternatives.
| errMsg <- "Signal received before "
<> "AsyncBenchmarkControl creation."
-> do Prelude.putStrLn errMsg
traceWith' errMsg
Just AsyncBenchmarkControl { .. }
| errorToThrow <- userError labelStr
, errMsg <- "Signal received after "
<> "AsyncBenchmarkControl creation."
-> do Prelude.putStrLn errMsg
traceWith' errMsg
abcFeeder `Async.cancelWith` errorToThrow
Fold.forM_ abcWorkers \work -> do
Prelude.putStrLn . List.unwords $
[ show $ Async.asyncThreadId work
, "(placeholder for add'l info to be retrieved)"]
work `Async.cancelWith` errorToThrow
-- The main thread does __NOT__ appear in the above list.
-- In order to kill that off, this, or some equivalent,
-- absolutely /must/ be done separately.
mapM_ Conc.killThread =<< Weak.deRefWeak wkMainTID
where
formatTimeStamp :: Time.UTCTime -> String
formatTimeStamp = formatTime' "%Y-%m-%dT%H-%M-%S"
formatTime' :: String -> Time.UTCTime -> String
formatTime' = Time.formatTime Time.defaultTimeLocale
traceWith' :: String -> IO ()
traceWith' msg
| wrappedMsg <- TraceBenchTxSubError $ T.pack msg
= whenJustM (STM.atomically $ STM.readTVar benchTracers) \tracers ->
Tracer.traceWith (btTxSubmit_ tracers) wrappedMsg
#endif /* UNIX */

#ifdef UNIX
installSignalHandler :: IOManager -> IO EnvConsts
installSignalHandler iocp = do
-- The main thread does not appear in the set of asyncs.
wkMainTID <- Weak.mkWeakThreadId =<< Conc.myThreadId
envConsts <- STM.atomically $ newEnvConsts iocp Nothing
let handler = Sig.CatchInfo $ signalHandler wkMainTID envConsts
Fold.forM_ [Sig.sigINT, Sig.sigTERM] $ \sig ->
Sig.installHandler sig handler Nothing
pure envConsts
#else
installSignalHandler :: IOManager -> IO EnvConsts
installSignalHandler iocp = STM.atomically $ newEnvConsts iocp Nothing
#endif
pure envConsts

mangleNodeConfig :: Maybe FilePath -> NixServiceOptions -> IO NixServiceOptions
mangleNodeConfig fp opts = case (NixService.getNodeConfigFile opts, fp) of
(_ , Just newFilePath) -> pure $ NixService.setNodeConfigFile opts newFilePath
(Just _ , Nothing) -> return opts
(Nothing, Nothing) -> die "No node-configFile set"
mangleNodeConfig :: Maybe FilePath -> NixServiceOptions -> IO NixServiceOptions
mangleNodeConfig fp opts = case (NixService.getNodeConfigFile opts, fp) of
(_ , Just newFilePath) -> pure $ NixService.setNodeConfigFile opts newFilePath
(Just _ , Nothing) -> return opts
(Nothing, Nothing) -> die "No node-configFile set"

mangleTracerConfig :: Maybe FilePath -> NixServiceOptions -> NixServiceOptions
mangleTracerConfig traceSocket opts
= opts { _nix_cardanoTracerSocket = traceSocket <> _nix_cardanoTracerSocket opts}
mangleTracerConfig :: Maybe FilePath -> NixServiceOptions -> NixServiceOptions
mangleTracerConfig traceSocket opts
= opts { _nix_cardanoTracerSocket = traceSocket <> _nix_cardanoTracerSocket opts}

-- if there's a parsing error wrt. ScriptData, we want to fail early, before the splitting phase
quickTestPlutusDataOrDie :: NixServiceOptions -> IO ()
Expand Down

0 comments on commit 260bf1e

Please sign in to comment.