From 7f1ffb74daa28c7eff84d3b476dd6a8a65a83e84 Mon Sep 17 00:00:00 2001 From: Akshay Mankar Date: Thu, 12 May 2022 09:26:17 +0200 Subject: [PATCH] Brig: Prepare for TURN Discovery using SRV records (#2376) * Brig: Start turn discovery when the App start Instead of starting it when the `Env` is created. This aligns the service discovery of TURN with that of SFT. In next commits, SRV based discovery for TURN will be implemented. * Brig: Refactor SFT discovery to extract generic SRV discovery code * Brig: Refactor SFT Discovery tests * Simplify sftDiscoveryLoop tests and make them srvDiscoveryLoop tests Introduce a Delay Effect to deal with threadDelay and mock it to get rid of wait loop in tests * CHANGELOG --- .../5-internal/refactor-turn-discovery | 1 + services/brig/brig.cabal | 4 + services/brig/package.yaml | 2 + services/brig/src/Brig/App.hs | 44 +--- services/brig/src/Brig/Calling.hs | 117 ++++++++-- services/brig/src/Brig/Calling/API.hs | 70 ++++-- services/brig/src/Brig/Effects/Delay.hs | 15 ++ services/brig/src/Brig/Run.hs | 3 +- services/brig/test/integration/API/Calling.hs | 9 +- services/brig/test/integration/Util.hs | 3 +- services/brig/test/unit/Test/Brig/Calling.hs | 205 ++++++++++-------- .../brig/test/unit/Test/Brig/Effects/Delay.hs | 28 +++ 12 files changed, 323 insertions(+), 178 deletions(-) create mode 100644 changelog.d/5-internal/refactor-turn-discovery create mode 100644 services/brig/src/Brig/Effects/Delay.hs create mode 100644 services/brig/test/unit/Test/Brig/Effects/Delay.hs diff --git a/changelog.d/5-internal/refactor-turn-discovery b/changelog.d/5-internal/refactor-turn-discovery new file mode 100644 index 00000000000..16dcc016051 --- /dev/null +++ b/changelog.d/5-internal/refactor-turn-discovery @@ -0,0 +1 @@ +Start TURN discovery only when the app starts and not when the Env is created \ No newline at end of file diff --git a/services/brig/brig.cabal b/services/brig/brig.cabal index 2f88f383dde..5785072b6fb 100644 --- 
a/services/brig/brig.cabal +++ b/services/brig/brig.cabal @@ -58,6 +58,7 @@ library Brig.Data.User Brig.Data.UserKey Brig.Data.UserPendingActivation + Brig.Effects.Delay Brig.Effects.SFT Brig.Email Brig.Federation.Client @@ -680,6 +681,7 @@ test-suite brig-tests other-modules: Test.Brig.Calling Test.Brig.Calling.Internal + Test.Brig.Effects.Delay Test.Brig.MLS Test.Brig.Roundtrip Test.Brig.User.Search.Index.Types @@ -735,8 +737,10 @@ test-suite brig-tests , brig-types , bytestring , containers + , data-timeout , dns , dns-util + , exceptions , http-types , imports , lens diff --git a/services/brig/package.yaml b/services/brig/package.yaml index 0a61858bbc7..b187df72ad2 100644 --- a/services/brig/package.yaml +++ b/services/brig/package.yaml @@ -152,8 +152,10 @@ tests: - brig-types - bytestring - containers + - data-timeout - dns - dns-util + - exceptions - http-types - imports - lens diff --git a/services/brig/src/Brig/App.hs b/services/brig/src/Brig/App.hs index 8a6d200c77a..9f2a2204671 100644 --- a/services/brig/src/Brig/App.hs +++ b/services/brig/src/Brig/App.hs @@ -54,12 +54,12 @@ module Brig.App metrics, applog, turnEnv, - turnEnvV2, sftEnv, internalEvents, emailSender, randomPrekeyLocalLock, keyPackageLocalLock, + fsWatcher, -- * App Monad AppT, @@ -100,7 +100,7 @@ import Brig.Sem.PasswordResetStore (PasswordResetStore) import Brig.Sem.PasswordResetStore.CodeStore import Brig.Team.Template import Brig.Template (Localised, TemplateBranding, forLocale, genTemplateBranding) -import Brig.Types (Locale (..), TurnURI) +import Brig.Types (Locale (..)) import Brig.User.Search.Index (IndexEnv (..), MonadIndexIO (..), runIndexIO) import Brig.User.Template import Brig.ZAuth (MonadZAuth (..), runZAuth) @@ -121,7 +121,6 @@ import Data.Domain import qualified Data.GeoIP2 as GeoIp import Data.IP import qualified Data.List.NonEmpty as NE -import Data.List1 (List1, list1) import Data.Metrics (Metrics) import qualified Data.Metrics.Middleware as Metrics import Data.Misc @@ 
-187,8 +186,7 @@ data Env = Env _twilioCreds :: Twilio.Credentials, _geoDb :: Maybe (IORef GeoIp.GeoDB), _fsWatcher :: FS.WatchManager, - _turnEnv :: IORef Calling.Env, - _turnEnvV2 :: IORef Calling.Env, + _turnEnv :: Calling.TurnEnv, _sftEnv :: Maybe Calling.SFTEnv, _currentTime :: IO UTCTime, _zauthEnv :: ZAuth.Env, @@ -223,7 +221,9 @@ newEnv o = do FS.startManagerConf $ FS.defaultConfig {FS.confDebounce = FS.Debounce 0.5, FS.confPollInterval = 10000000} g <- geoSetup lgr w $ Opt.geoDb o - (turn, turnV2) <- turnSetup lgr w sha512 (Opt.turn o) + let turnOpts = Opt.turn o + turnSecret <- Text.encodeUtf8 . Text.strip <$> Text.readFile (Opt.secret turnOpts) + turn <- Calling.mkTurnEnv (Opt.servers turnOpts) (Opt.serversV2 turnOpts) (Opt.tokenTTL turnOpts) (Opt.configTTL turnOpts) turnSecret sha512 let sett = Opt.optSettings o nxm <- initCredentials (Opt.setNexmo sett) twl <- initCredentials (Opt.setTwilio sett) @@ -268,7 +268,6 @@ newEnv o = do _twilioCreds = twl, _geoDb = g, _turnEnv = turn, - _turnEnvV2 = turnV2, _sftEnv = mSFTEnv, _fsWatcher = w, _currentTime = clock, @@ -310,20 +309,6 @@ geoSetup lgr w (Just db) = do startWatching w path (replaceGeoDb lgr geodb) return $ Just geodb -turnSetup :: Logger -> FS.WatchManager -> Digest -> Opt.TurnOpts -> IO (IORef Calling.Env, IORef Calling.Env) -turnSetup lgr w dig o = do - secret <- Text.encodeUtf8 . Text.strip <$> Text.readFile (Opt.secret o) - cfg <- setupTurn secret (Opt.servers o) - cfgV2 <- setupTurn secret (Opt.serversV2 o) - return (cfg, cfgV2) - where - setupTurn secret cfg = do - path <- canonicalizePath cfg - servers <- fromMaybe (error "Empty TURN list, check turn file!") <$> readTurnList path - te <- newIORef =<< Calling.newEnv dig servers (Opt.tokenTTL o) (Opt.configTTL o) secret - startWatching w path (replaceTurnServers lgr te) - return te - startWatching :: FS.WatchManager -> FilePath -> FS.Action -> IO () startWatching w p = void . 
FS.watchDir w (Path.dropFileName p) predicate where @@ -339,17 +324,6 @@ replaceGeoDb g ref e = do GeoIp.openGeoDB (FS.eventPath e) >>= atomicWriteIORef ref Log.info g (msg $ val "New GeoIP database loaded.") -replaceTurnServers :: Logger -> IORef Calling.Env -> FS.Event -> IO () -replaceTurnServers g ref e = do - let logErr x = Log.err g (msg $ val "Error loading turn servers: " +++ show x) - handleAny logErr $ - readTurnList (FS.eventPath e) >>= \case - Just servers -> - readIORef ref >>= \old -> do - atomicWriteIORef ref (old & Calling.turnServers .~ servers) - Log.info g (msg $ val "New turn servers loaded.") - Nothing -> Log.warn g (msg $ val "Empty or malformed turn servers list, ignoring!") - initZAuth :: Opts -> IO ZAuth.Env initZAuth o = do let zOpts = Opt.zauth o @@ -643,12 +617,6 @@ locationOf ip = return $ location (Latitude $ GeoIp.locationLatitude loc) (Longitude $ GeoIp.locationLongitude loc) Nothing -> return Nothing -readTurnList :: FilePath -> IO (Maybe (List1 TurnURI)) -readTurnList = Text.readFile >=> return . fn . mapMaybe (fromByteString . Text.encodeUtf8) . 
Text.lines - where - fn [] = Nothing - fn (x : xs) = Just (list1 x xs) - -------------------------------------------------------------------------------- -- Federation diff --git a/services/brig/src/Brig/Calling.hs b/services/brig/src/Brig/Calling.hs index cce94e7314c..3652e1f989a 100644 --- a/services/brig/src/Brig/Calling.hs +++ b/services/brig/src/Brig/Calling.hs @@ -26,15 +26,20 @@ module Brig.Calling mkSFTServers, SFTEnv (..), Discovery (..), - Env (..), + TurnEnv, mkSFTEnv, - newEnv, + mkTurnEnv, + srvDiscoveryLoop, sftDiscoveryLoop, - discoverSFTServers, + discoverSRVRecords, discoveryToMaybe, randomize, startSFTServiceDiscovery, - turnServers, + startTurnDiscovery, + turnServersV1, + turnServersV1File, + turnServersV2, + turnServersV2File, turnTokenTTL, turnConfigTTL, turnSecret, @@ -43,21 +48,28 @@ module Brig.Calling ) where +import Brig.Effects.Delay import Brig.Options (SFTOptions (..), defSftDiscoveryIntervalSeconds, defSftListLength, defSftServiceName) import qualified Brig.Options as Opts import Brig.Types (TurnURI) +import Control.Exception.Enclosed (handleAny) import Control.Lens import Control.Monad.Random.Class (MonadRandom) +import Data.ByteString.Conversion (fromByteString) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as NonEmpty -import Data.List1 import Data.Range +import qualified Data.Text as Text +import qualified Data.Text.Encoding as Text +import qualified Data.Text.IO as Text import Data.Time.Clock (DiffTime, diffTimeToPicoseconds) import Imports import qualified Network.DNS as DNS import OpenSSL.EVP.Digest (Digest) import Polysemy import Polysemy.TinyLog +import qualified System.FSNotify as FS +import qualified System.FilePath as Path import qualified System.Logger as Log import System.Random.MWC (GenIO, createSystemRandom) import System.Random.Shuffle @@ -129,29 +141,37 @@ discoveryToMaybe = \case NotDiscoveredYet -> Nothing Discovered x -> Just x -discoverSFTServers :: Members [DNSLookup, TinyLog] r 
=> DNS.Domain -> Sem r (Maybe (NonEmpty SrvEntry)) -discoverSFTServers domain = +discoverSRVRecords :: Members [DNSLookup, TinyLog] r => DNS.Domain -> Sem r (Maybe (NonEmpty SrvEntry)) +discoverSRVRecords domain = lookupSRV domain >>= \case SrvAvailable es -> pure $ Just es SrvNotAvailable -> do - warn (Log.msg ("No SFT servers available" :: ByteString)) + warn $ + Log.msg (Log.val "SRV Records not available") + . Log.field "domain" domain pure Nothing SrvResponseError e -> do - err (Log.msg ("DNS Lookup failed for SFT Discovery" :: ByteString) . Log.field "Error" (show e)) + err $ + Log.msg (Log.val "SRV Lookup failed") + . Log.field "Error" (show e) + . Log.field "domain" domain pure Nothing +srvDiscoveryLoop :: Members [DNSLookup, TinyLog, Delay] r => DNS.Domain -> Int -> (NonEmpty SrvEntry -> Sem r ()) -> Sem r () +srvDiscoveryLoop domain discoveryInterval saveAction = forever $ do + servers <- discoverSRVRecords domain + case servers of + Nothing -> pure () + Just es -> saveAction es + delay discoveryInterval + mkSFTDomain :: SFTOptions -> DNS.Domain mkSFTDomain SFTOptions {..} = DNS.normalize $ maybe defSftServiceName ("_" <>) sftSRVServiceName <> "._tcp." <> sftBaseDomain --- FUTUREWORK: Remove Embed IO from here and put threadDelay into another --- effect. This will also make tests for this faster and deterministic -sftDiscoveryLoop :: Members [DNSLookup, TinyLog, Embed IO] r => SFTEnv -> Sem r () -sftDiscoveryLoop SFTEnv {..} = forever $ do - servers <- discoverSFTServers sftDomain - case servers of - Nothing -> pure () - Just es -> atomicWriteIORef sftServers (Discovered (SFTServers es)) - threadDelay sftDiscoveryInterval +sftDiscoveryLoop :: Members [DNSLookup, TinyLog, Delay, Embed IO] r => SFTEnv -> Sem r () +sftDiscoveryLoop SFTEnv {..} = + srvDiscoveryLoop sftDomain sftDiscoveryInterval $ + atomicWriteIORef sftServers . Discovered . 
SFTServers mkSFTEnv :: SFTOptions -> IO SFTEnv mkSFTEnv opts = @@ -163,7 +183,7 @@ mkSFTEnv opts = startSFTServiceDiscovery :: Log.Logger -> SFTEnv -> IO () startSFTServiceDiscovery logger = - runM . loggerToTinyLog logger . runDNSLookupDefault . sftDiscoveryLoop + runM . loggerToTinyLog logger . runDNSLookupDefault . runDelay . sftDiscoveryLoop -- | >>> diffTimeToMicroseconds 1 -- 1000000 @@ -172,8 +192,11 @@ diffTimeToMicroseconds = fromIntegral . (`quot` 1000000) . diffTimeToPicoseconds -- TURN specific -data Env = Env - { _turnServers :: List1 TurnURI, +data TurnEnv = TurnEnv + { _turnServersV1 :: IORef (Discovery (NonEmpty TurnURI)), + _turnServersV2 :: IORef (Discovery (NonEmpty TurnURI)), + _turnServersV1File :: FilePath, + _turnServersV2File :: FilePath, _turnTokenTTL :: Word32, _turnConfigTTL :: Word32, _turnSecret :: ByteString, @@ -181,7 +204,53 @@ data Env = Env _turnPrng :: GenIO } -makeLenses ''Env +makeLenses ''TurnEnv + +mkTurnEnv :: FilePath -> FilePath -> Word32 -> Word32 -> ByteString -> Digest -> IO TurnEnv +mkTurnEnv v1File v2File _turnTokenTTL _turnConfigTTL _turnSecret _turnSHA512 = do + _turnServersV1 <- newIORef NotDiscoveredYet + _turnServersV2 <- newIORef NotDiscoveredYet + _turnPrng <- createSystemRandom + _turnServersV1File <- canonicalizePath v1File + _turnServersV2File <- canonicalizePath v2File + pure $ TurnEnv {..} + +-- | Loads both TURN server lists from their files once, then watches those files for changes +startTurnDiscovery :: Log.Logger -> FS.WatchManager -> TurnEnv -> IO () +startTurnDiscovery l w e = do + atomicWriteIORef (e ^. turnServersV1) + . maybe NotDiscoveredYet Discovered + =<< readTurnList (e ^. turnServersV1File) + atomicWriteIORef (e ^. turnServersV2) + . maybe NotDiscoveredYet Discovered + =<< readTurnList (e ^. turnServersV2File) + Log.warn l $ + Log.msg (Log.val "Waiting for TURN files") + . Log.field "file1" (e ^. turnServersV1File) + . Log.field "file2" (e ^. turnServersV2File) + startWatching w (e ^. 
turnServersV1File) (replaceTurnServers l (e ^. turnServersV1)) + startWatching w (e ^. turnServersV2File) (replaceTurnServers l (e ^. turnServersV2)) + +replaceTurnServers :: Log.Logger -> IORef (Discovery (NonEmpty TurnURI)) -> FS.Event -> IO () +replaceTurnServers g ref e = do + let logErr x = Log.err g (Log.msg $ Log.val "Error loading turn servers: " Log.+++ show x) + handleAny logErr $ + readTurnList (FS.eventPath e) >>= \case + Just servers -> do + atomicWriteIORef ref (Discovered servers) + Log.info g (Log.msg $ Log.val "New turn servers loaded.") + Nothing -> Log.warn g (Log.msg $ Log.val "Empty or malformed turn servers list, ignoring!") + +startWatching :: FS.WatchManager -> FilePath -> FS.Action -> IO () +startWatching w p = void . FS.watchDir w (Path.dropFileName p) predicate + where + predicate (FS.Added f _ _) = Path.equalFilePath f p + predicate (FS.Modified f _ _) = Path.equalFilePath f p + predicate FS.Removed {} = False + predicate FS.Unknown {} = False -newEnv :: Digest -> List1 TurnURI -> Word32 -> Word32 -> ByteString -> IO Env -newEnv sha512 srvs tTTL cTTL secret = Env srvs tTTL cTTL secret sha512 <$> createSystemRandom +readTurnList :: FilePath -> IO (Maybe (NonEmpty TurnURI)) +readTurnList = Text.readFile >=> return . fn . mapMaybe (fromByteString . Text.encodeUtf8) . 
Text.lines + where + fn [] = Nothing + fn (x : xs) = Just (x :| xs) diff --git a/services/brig/src/Brig/Calling/API.hs b/services/brig/src/Brig/Calling/API.hs index 9ec039d218e..2c25b254dc4 100644 --- a/services/brig/src/Brig/Calling/API.hs +++ b/services/brig/src/Brig/Calling/API.hs @@ -23,9 +23,11 @@ module Brig.Calling.API -- * Exposed for testing purposes newConfig, CallsConfigVersion (..), + NoTurnServers, ) where +import Brig.API.Error import Brig.API.Handler import Brig.App import Brig.Calling @@ -34,14 +36,13 @@ import Brig.Calling.Internal import Brig.Effects.SFT import Brig.Options (ListAllSFTServers (..)) import qualified Brig.Options as Opt -import Control.Error (hush) +import Control.Error (hush, throwE) import Control.Lens import Data.ByteString.Conversion import Data.ByteString.Lens import Data.Id import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as NonEmpty -import qualified Data.List1 as List1 import Data.Misc (HttpsUrl) import Data.Range import qualified Data.Swagger.Build.Api as Doc @@ -56,6 +57,8 @@ import Network.Wai.Utilities hiding (code, message) import Network.Wai.Utilities.Swagger (document) import OpenSSL.EVP.Digest (Digest, hmacBS) import Polysemy +import qualified Polysemy.Error as Polysemy +import qualified System.Logger.Class as Log import qualified System.Random.MWC as MWC import Wire.API.Call.Config (SFTServer) import qualified Wire.API.Call.Config as Public @@ -101,17 +104,32 @@ getCallsConfigV2H (_ ::: uid ::: connid ::: limit) = -- | ('UserId', 'ConnId' are required as args here to make sure this is an authenticated end-point.) getCallsConfigV2 :: UserId -> ConnId -> Maybe (Range 1 10 Int) -> (Handler r) Public.RTCConfiguration getCallsConfigV2 _ _ limit = do - env <- liftIO . readIORef =<< view turnEnvV2 + env <- view turnEnv staticUrl <- view $ settings . Opt.sftStaticUrl sftListAllServers <- fromMaybe Opt.HideAllSFTServers <$> view (settings . 
Opt.sftListAllServers) sftEnv' <- view sftEnv logger <- view applog manager <- view httpManager - liftIO - . runM @IO - . loggerToTinyLog logger - . interpretSFT manager - $ newConfig env staticUrl sftEnv' limit sftListAllServers CallsConfigV2 + eitherConfig <- + liftIO + . runM @IO + . loggerToTinyLog logger + . interpretSFT manager + . Polysemy.runError + $ newConfig env turnServersV2 staticUrl sftEnv' limit sftListAllServers CallsConfigV2 + handleNoTurnServers eitherConfig + +-- | Throws '500 Internal Server Error' when no turn servers are found. This is +-- done to keep backwards compatibility: the previous code initialized an 'IORef' +-- with an 'error' so reading the 'IORef' threw a 500. +-- +-- FUTUREWORK: Making this a '404 Not Found' would be more idiomatic, but this +-- should be done after consulting with client teams. +handleNoTurnServers :: Either NoTurnServers a -> (Handler r) a +handleNoTurnServers (Right x) = pure x +handleNoTurnServers (Left NoTurnServers) = do + Log.err $ Log.msg (Log.val "Call config requested before TURN URIs could be discovered.") + throwE $ StdError internalServerError getCallsConfigH :: JSON ::: UserId ::: ConnId -> (Handler r) Response getCallsConfigH (_ ::: uid ::: connid) = @@ -119,15 +137,18 @@ getCallsConfigH (_ ::: uid ::: connid) = getCallsConfig :: UserId -> ConnId -> (Handler r) Public.RTCConfiguration getCallsConfig _ _ = do - env <- liftIO . readIORef =<< view turnEnv + env <- view turnEnv logger <- view applog manager <- view httpManager - fmap dropTransport - . liftIO - . runM @IO - . loggerToTinyLog logger - . interpretSFT manager - $ newConfig env Nothing Nothing Nothing HideAllSFTServers CallsConfigDeprecated + eitherConfig <- + (dropTransport <$$>) + . liftIO + . runM @IO + . loggerToTinyLog logger + . interpretSFT manager + . 
Polysemy.runError + $ newConfig env turnServersV1 Nothing Nothing Nothing HideAllSFTServers CallsConfigDeprecated + handleNoTurnServers eitherConfig where -- In order to avoid being backwards incompatible, remove the `transport` query param from the URIs dropTransport :: Public.RTCConfiguration -> Public.RTCConfiguration @@ -140,24 +161,35 @@ data CallsConfigVersion = CallsConfigDeprecated | CallsConfigV2 +data NoTurnServers = NoTurnServers + deriving (Show) + +instance Exception NoTurnServers + -- | FUTUREWORK: It is not reflected in the function type the part of the -- business logic that says that the SFT static URL parameter cannot be set at -- the same time as the SFT environment parameter. See how to allow either none -- to be set or only one of them (perhaps Data.These combined with error -- handling). newConfig :: - Members [Embed IO, SFT] r => - Calling.Env -> + Members [Embed IO, SFT, Polysemy.Error NoTurnServers] r => + Calling.TurnEnv -> + Getter Calling.TurnEnv (IORef (Discovery (NonEmpty Public.TurnURI))) -> Maybe HttpsUrl -> Maybe SFTEnv -> Maybe (Range 1 10 Int) -> ListAllSFTServers -> CallsConfigVersion -> Sem r Public.RTCConfiguration -newConfig env sftStaticUrl mSftEnv limit listAllServers version = do +newConfig env discoveredServersL sftStaticUrl mSftEnv limit listAllServers version = do let (sha, secret, tTTL, cTTL, prng) = (env ^. turnSHA512, env ^. turnSecret, env ^. turnTokenTTL, env ^. turnConfigTTL, env ^. turnPrng) -- randomize list of servers (before limiting the list, to ensure not always the same servers are chosen if limit is set) - randomizedUris <- liftIO $ randomize (List1.toNonEmpty $ env ^. turnServers) + randomizedUris <- + liftIO + . randomize + =<< Polysemy.note NoTurnServers + . discoveryToMaybe + =<< readIORef (env ^. 
discoveredServersL) let limitedUris = case limit of Nothing -> randomizedUris Just lim -> limitedList randomizedUris lim diff --git a/services/brig/src/Brig/Effects/Delay.hs b/services/brig/src/Brig/Effects/Delay.hs new file mode 100644 index 00000000000..8a3b9dc6e91 --- /dev/null +++ b/services/brig/src/Brig/Effects/Delay.hs @@ -0,0 +1,15 @@ +{-# LANGUAGE TemplateHaskell #-} + +module Brig.Effects.Delay where + +import Imports +import Polysemy + +data Delay m a where + Delay :: Int -> Delay m () + +makeSem ''Delay + +runDelay :: Member (Embed IO) r => Sem (Delay ': r) a -> Sem r a +runDelay = interpret $ \case + Delay i -> threadDelay i diff --git a/services/brig/src/Brig/Run.hs b/services/brig/src/Brig/Run.hs index 74d29b4a10f..dff84122e0f 100644 --- a/services/brig/src/Brig/Run.hs +++ b/services/brig/src/Brig/Run.hs @@ -86,13 +86,14 @@ run o = do Async.async $ runAppT e $ wrapHttpClient $ - Queue.listen (e ^. internalEvents) $ Internal.onEvent + Queue.listen (e ^. internalEvents) Internal.onEvent let throttleMillis = fromMaybe defSqsThrottleMillis $ setSqsThrottleMillis (optSettings o) emailListener <- for (e ^. awsEnv . sesQueue) $ \q -> Async.async $ AWS.execute (e ^. awsEnv) $ AWS.listen throttleMillis q (runAppT e . SesNotification.onEvent) sftDiscovery <- forM (e ^. sftEnv) $ Async.async . Calling.startSFTServiceDiscovery (e ^. applog) + Calling.startTurnDiscovery (e ^. applog) (e ^. fsWatcher) (e ^. 
turnEnv) pendingActivationCleanupAsync <- Async.async (runAppT e pendingActivationCleanup) runSettingsWithShutdown s app 5 `finally` do diff --git a/services/brig/test/integration/API/Calling.hs b/services/brig/test/integration/API/Calling.hs index 85bf7137d7e..1d8aff420ae 100644 --- a/services/brig/test/integration/API/Calling.hs +++ b/services/brig/test/integration/API/Calling.hs @@ -58,8 +58,8 @@ tests m b opts turn turnV2 = do testGroup "sft" [ test m "SFT servers /calls/config/v2 - 200" $ testSFT b opts, - test m "SFT servers /calls/config/v2 - 200 - SFT does not respond as expected" $ testSFTUnavailble b opts "https://example.com", - test m "SFT servers /calls/config/v2 - 200 - SFT DNS does not resolve" $ testSFTUnavailble b opts "https://sft.example.com" + test m "SFT servers /calls/config/v2 - 200 - SFT does not respond as expected" $ testSFTUnavailable b opts "https://example.com", + test m "SFT servers /calls/config/v2 - 200 - SFT DNS does not resolve" $ testSFTUnavailable b opts "https://sft.example.com" ] ] @@ -110,8 +110,8 @@ testSFT b opts = do (Set.fromList [sftServer server1, sftServer server2]) (Set.fromList $ maybe [] NonEmpty.toList $ cfg1 ^. rtcConfSftServers) -testSFTUnavailble :: Brig -> Opts.Opts -> String -> Http () -testSFTUnavailble b opts domain = do +testSFTUnavailable :: Brig -> Opts.Opts -> String -> Http () +testSFTUnavailable b opts domain = do uid <- userId <$> randomUser b withSettingsOverrides (opts {Opts.optSettings = (Opts.optSettings opts) {Opts.setSftStaticUrl = fromByteString (cs domain), Opts.setSftListAllServers = Just Opts.ListAllSFTServers}}) $ do cfg <- getTurnConfigurationV2 uid b @@ -122,6 +122,7 @@ testSFTUnavailble b opts domain = do (cfg ^. 
rtcConfSftServersAll) modifyAndAssert :: + HasCallStack => Brig -> UserId -> (UserId -> Brig -> Http RTCConfiguration) -> diff --git a/services/brig/test/integration/Util.hs b/services/brig/test/integration/Util.hs index 48d824e8117..d70266ae49d 100644 --- a/services/brig/test/integration/Util.hs +++ b/services/brig/test/integration/Util.hs @@ -42,7 +42,7 @@ import Bilge import Bilge.Assert import qualified Brig.AWS as AWS import Brig.AWS.Types -import Brig.App (applog, sftEnv) +import Brig.App (applog, fsWatcher, sftEnv, turnEnv) import Brig.Calling as Calling import qualified Brig.Code as Code import qualified Brig.Options as Opt @@ -1006,6 +1006,7 @@ withSettingsOverrides opts action = liftIO $ do sftDiscovery <- forM (env ^. sftEnv) $ \sftEnv' -> Async.async $ Calling.startSFTServiceDiscovery (env ^. applog) sftEnv' + Calling.startTurnDiscovery (env ^. applog) (env ^. fsWatcher) (env ^. turnEnv) res <- WaiTest.runSession action brigApp mapM_ Async.cancel sftDiscovery pure res diff --git a/services/brig/test/unit/Test/Brig/Calling.hs b/services/brig/test/unit/Test/Brig/Calling.hs index aaaba7e38db..5867ed387af 100644 --- a/services/brig/test/unit/Test/Brig/Calling.hs +++ b/services/brig/test/unit/Test/Brig/Calling.hs @@ -18,15 +18,16 @@ -- You should have received a copy of the GNU Affero General Public License along -- with this program. If not, see . 
-module Test.Brig.Calling where +module Test.Brig.Calling (tests) where import Brig.Calling -import Brig.Calling.API (CallsConfigVersion (..), newConfig) +import Brig.Calling.API (CallsConfigVersion (..), NoTurnServers, newConfig) import Brig.Calling.Internal (sftServerFromSrvTarget) import Brig.Effects.SFT import Brig.Options +import qualified Control.Concurrent.Timeout as System import Control.Lens ((^.)) -import Control.Retry +import Control.Monad.Catch (throwM) import Data.Bifunctor import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as NonEmpty @@ -34,12 +35,14 @@ import qualified Data.Map as Map import Data.Misc import Data.Range import qualified Data.Set as Set -import Data.String.Conversions +import Data.Timeout import Imports import Network.DNS import Polysemy +import Polysemy.Error import Polysemy.TinyLog import qualified System.Logger as Log +import Test.Brig.Effects.Delay import Test.Tasty import Test.Tasty.HUnit import Test.Tasty.QuickCheck (Arbitrary (..), generate) @@ -96,15 +99,16 @@ tests = (mkSFTDomain (SFTOptions "example.com" Nothing Nothing Nothing)) ], testGroup "sftDiscoveryLoop" $ - [ testCase "when service can be discovered" $ void testDiscoveryLoopWhenSuccessful, - testCase "when service can be discovered and the URLs change" testDiscoveryLoopWhenURLsChange, - testCase "when service cannot be discovered" testDiscoveryLoopWhenUnsuccessful, - testCase "when service cannot be discovered after a successful discovery" testDiscoveryLoopWhenUnsuccessfulAfterSuccess + [ testCase "when service can be discovered" $ void testSFTDiscoveryLoopWhenSuccessful ], - testGroup "discoverSFTServers" $ - [ testCase "when service is available" testSFTDiscoverWhenAvailable, - testCase "when service is not available" testSFTDiscoverWhenNotAvailable, - testCase "when dns lookup fails" testSFTDiscoverWhenDNSFails + testGroup "srvDiscoveryLoop" $ + [ testCase "when service can be discovered" $ testSRVDiscoveryLoopWhenSuccessful, + testCase 
"when service cannot be discovered" $ testSRVDiscoveryLoopWhenUnsuccessful + ], + testGroup "discoverSRVRecords" $ + [ testCase "when service is available" testSRVDiscoverWhenAvailable, + testCase "when service is not available" testSRVDiscoverWhenNotAvailable, + testCase "when dns lookup fails" testSRVDiscoverWhenDNSFails ], testGroup "Get Random SFT Servers" $ [ testCase "more servers in SRV than limit" testSFTManyServers, @@ -121,78 +125,89 @@ tests = ] ] -testDiscoveryLoopWhenSuccessful :: IO SFTEnv -testDiscoveryLoopWhenSuccessful = do +testSFTDiscoveryLoopWhenSuccessful :: IO SFTEnv +testSFTDiscoveryLoopWhenSuccessful = do let entry1 = SrvEntry 0 0 (SrvTarget "sft1.foo.example.com." 443) entry2 = SrvEntry 0 0 (SrvTarget "sft2.foo.example.com." 443) entry3 = SrvEntry 0 0 (SrvTarget "sft3.foo.example.com." 443) returnedEntries = entry1 :| [entry2, entry3] fakeDNSEnv <- newFakeDNSEnv (\_ -> SrvAvailable returnedEntries) - sftEnv <- mkSFTEnv $ SFTOptions "foo.example.com" Nothing (Just 0.001) Nothing + let intervalInSeconds = 0.001 + intervalInMicroseconds = 1000 + sftEnv <- mkSFTEnv $ SFTOptions "foo.example.com" Nothing (Just intervalInSeconds) Nothing + + tick <- newEmptyMVar + delayCallsTVar <- newTVarIO [] + discoveryLoop <- + Async.async . runM + . ignoreLogs + . runDelayWithTick tick delayCallsTVar + . runFakeDNSLookup fakeDNSEnv + $ sftDiscoveryLoop sftEnv - discoveryLoop <- Async.async $ runM . ignoreLogs . 
runFakeDNSLookup fakeDNSEnv $ sftDiscoveryLoop sftEnv - void $ retryEvery10MicrosWhileN 2000 (== 0) (length <$> readIORef (fakeLookupSrvCalls fakeDNSEnv)) - -- We don't want to stop the loop before it has written to the sftServers IORef - void $ retryEvery10MicrosWhileN 2000 (== NotDiscoveredYet) (readIORef (sftServers sftEnv)) - Async.cancel discoveryLoop + Async.race_ (Async.wait discoveryLoop) (System.timeout (30 # Second) $ takeMVar tick) actualServers <- readIORef (sftServers sftEnv) + delayCalls <- readTVarIO delayCallsTVar assertEqual "servers should be the ones read from DNS" (Discovered (mkSFTServers returnedEntries)) actualServers + assertEqual "delay should be called with the configured interval" intervalInMicroseconds (head delayCalls) pure sftEnv -testDiscoveryLoopWhenUnsuccessful :: IO () -testDiscoveryLoopWhenUnsuccessful = do - fakeDNSEnv <- newFakeDNSEnv (const SrvNotAvailable) - sftEnv <- mkSFTEnv $ SFTOptions "foo.example.com" Nothing (Just 0.001) Nothing +testSRVDiscoveryLoopWhenSuccessful :: IO () +testSRVDiscoveryLoopWhenSuccessful = do + let entry1 = SrvEntry 0 0 (SrvTarget "sft1.foo.example.com." 443) + entry2 = SrvEntry 0 0 (SrvTarget "sft2.foo.example.com." 443) + entry3 = SrvEntry 0 0 (SrvTarget "sft3.foo.example.com." 443) + returnedEntries = entry1 :| [entry2, entry3] + fakeDNSEnv <- newFakeDNSEnv (\_ -> SrvAvailable returnedEntries) + let intervalInMicroseconds = 1000 + + tick <- newEmptyMVar + delayCallsTVar <- newTVarIO [] + savedSrvRecordsMVar <- newEmptyMVar + discoveryLoop <- + Async.async + . runM + . ignoreLogs + . runDelayWithTick tick delayCallsTVar + . runFakeDNSLookup fakeDNSEnv + $ srvDiscoveryLoop "foo.example.com" intervalInMicroseconds (putMVar savedSrvRecordsMVar) - discoveryLoop <- Async.async $ runM . ignoreLogs . 
runFakeDNSLookup fakeDNSEnv $ sftDiscoveryLoop sftEnv - -- We wait for at least two lookups to be sure that the lookup loop looped at - -- least once - void $ retryEvery10MicrosWhileN 2000 (<= 1) (length <$> readIORef (fakeLookupSrvCalls fakeDNSEnv)) - Async.cancel discoveryLoop + Async.race_ (Async.wait discoveryLoop) (System.timeout (30 # Second) $ takeMVar tick) - actualServers <- readIORef (sftServers sftEnv) - assertEqual "servers should be the ones read from DNS" NotDiscoveredYet actualServers + savedSrvRecords <- tryReadMVar savedSrvRecordsMVar + delayCalls <- readTVarIO delayCallsTVar + assertEqual "servers should be the ones read from DNS" (Just returnedEntries) savedSrvRecords + assertEqual "delay should be called with the configured interval" intervalInMicroseconds (head delayCalls) -testDiscoveryLoopWhenUnsuccessfulAfterSuccess :: IO () -testDiscoveryLoopWhenUnsuccessfulAfterSuccess = do - sftEnv <- testDiscoveryLoopWhenSuccessful - previousEntries <- readIORef (sftServers sftEnv) +testSRVDiscoveryLoopWhenUnsuccessful :: IO () +testSRVDiscoveryLoopWhenUnsuccessful = do + fakeDNSEnv <- newFakeDNSEnv (const SrvNotAvailable) - -- In the following lines we re-use the 'sftEnv' from a successful lookup to - -- replicate what will happen when a dns lookup fails after success - failingFakeDNSEnv <- newFakeDNSEnv (const SrvNotAvailable) - discoveryLoop <- Async.async $ runM . ignoreLogs . runFakeDNSLookup failingFakeDNSEnv $ sftDiscoveryLoop sftEnv + -- Irrelevant for this test, but types make us choose a number + let intervalInMicroseconds = 1000 + tick <- newEmptyMVar + delayCallsTVar <- newTVarIO [] + discoveryLoop <- + Async.async + . runM + . ignoreLogs + . runDelayWithTick tick delayCallsTVar + . runFakeDNSLookup fakeDNSEnv + . 
srvDiscoveryLoop "foo.example.com" intervalInMicroseconds + $ ( \_ -> + liftIO $ assertFailure "shouldn't try to save SRV records when they are unavailable" + ) -- We wait for at least two lookups to be sure that the lookup loop looped at - -- least once - void $ retryEvery10MicrosWhileN 2000 (<= 1) (length <$> readIORef (fakeLookupSrvCalls failingFakeDNSEnv)) - Async.cancel discoveryLoop - - actualServers <- readIORef (sftServers sftEnv) - assertEqual "servers shouldn't get overwriten" previousEntries actualServers - -testDiscoveryLoopWhenURLsChange :: IO () -testDiscoveryLoopWhenURLsChange = do - sftEnv <- testDiscoveryLoopWhenSuccessful + -- least once; this tests that the loop keeps going even when discovery fails + void . System.timeout (30 # Second) $ takeMVar tick + Async.race_ (Async.wait discoveryLoop) (System.timeout (30 # Second) $ takeMVar tick) - -- In the following lines we re-use the 'sftEnv' from a successful lookup to - -- replicate what will happen when a dns lookup returns new URLs - let entry1 = SrvEntry 0 0 (SrvTarget "sft4.foo.example.com." 443) - entry2 = SrvEntry 0 0 (SrvTarget "sft5.foo.example.com." 443) - newEntries = entry1 :| [entry2] + numberOfDealys <- length <$> readTVarIO delayCallsTVar + assertBool "discovery loop should run again even if discovery fails" (numberOfDealys >= 2) - fakeDNSEnv <- newFakeDNSEnv (const $ SrvAvailable newEntries) - discoveryLoop <- Async.async $ runM . ignoreLogs . 
runFakeDNSLookup fakeDNSEnv $ sftDiscoveryLoop sftEnv - void $ retryEvery10MicrosWhileN 2000 (== 0) (length <$> readIORef (fakeLookupSrvCalls fakeDNSEnv)) - -- We don't want to stop the loop before it has written to the sftServers IORef - void $ retryEvery10MicrosWhileN 2000 (== Discovered (mkSFTServers newEntries)) (readIORef (sftServers sftEnv)) - Async.cancel discoveryLoop - - actualServers <- readIORef (sftServers sftEnv) - assertEqual "servers should get overwritten" (Discovered (mkSFTServers newEntries)) actualServers - -testSFTDiscoverWhenAvailable :: IO () -testSFTDiscoverWhenAvailable = do +testSRVDiscoverWhenAvailable :: IO () +testSRVDiscoverWhenAvailable = do logRecorder <- newLogRecorder let entry1 = SrvEntry 0 0 (SrvTarget "sft7.foo.example.com." 443) entry2 = SrvEntry 0 0 (SrvTarget "sft8.foo.example.com." 8843) @@ -201,33 +216,33 @@ testSFTDiscoverWhenAvailable = do assertEqual "discovered servers should be returned" (Just returnedEntries) =<< ( runM . recordLogs logRecorder . runFakeDNSLookup fakeDNSEnv $ - discoverSFTServers "_sft._tcp.foo.example.com" + discoverSRVRecords "_sft._tcp.foo.example.com" ) assertEqual "nothing should be logged" [] =<< readIORef (recordedLogs logRecorder) -testSFTDiscoverWhenNotAvailable :: IO () -testSFTDiscoverWhenNotAvailable = do +testSRVDiscoverWhenNotAvailable :: IO () +testSRVDiscoverWhenNotAvailable = do logRecorder <- newLogRecorder fakeDNSEnv <- newFakeDNSEnv (const SrvNotAvailable) assertEqual "discovered servers should be returned" Nothing =<< ( runM . recordLogs logRecorder . 
runFakeDNSLookup fakeDNSEnv $ - discoverSFTServers "_sft._tcp.foo.example.com" + discoverSRVRecords "_sft._tcp.foo.example.com" ) - assertEqual "should warn about it in the logs" [(Warn, "No SFT servers available\n")] + assertEqual "should warn about it in the logs" [(Warn, "SRV Records not available, domain=_sft._tcp.foo.example.com\n")] =<< readIORef (recordedLogs logRecorder) -testSFTDiscoverWhenDNSFails :: IO () -testSFTDiscoverWhenDNSFails = do +testSRVDiscoverWhenDNSFails :: IO () +testSRVDiscoverWhenDNSFails = do logRecorder <- newLogRecorder fakeDNSEnv <- newFakeDNSEnv (const $ SrvResponseError IllegalDomain) assertEqual "no servers should be returned" Nothing =<< ( runM . recordLogs logRecorder . runFakeDNSLookup fakeDNSEnv $ - discoverSFTServers "_sft._tcp.foo.example.com" + discoverSRVRecords "_sft._tcp.foo.example.com" ) - assertEqual "should warn about it in the logs" [(Error, "DNS Lookup failed for SFT Discovery, Error=IllegalDomain\n")] + assertEqual "should warn about it in the logs" [(Error, "SRV Lookup failed, Error=IllegalDomain, domain=_sft._tcp.foo.example.com\n")] =<< readIORef (recordedLogs logRecorder) testSFTManyServers :: IO () @@ -256,22 +271,19 @@ testSFTFewerServers = do allServers <- getRandomElements (unsafeRange 10) . unSFTServers $ sftServers assertEqual "should return all of them" (Set.fromList $ NonEmpty.toList allServers) (Set.fromList $ NonEmpty.toList entries) -retryEvery10MicrosWhileN :: (MonadIO m) => Int -> (a -> Bool) -> m a -> m a -retryEvery10MicrosWhileN n f m = - retrying - (constantDelay 10 <> limitRetries n) - (const (return . 
f)) - (const m) - -- | Creates a calling environment and an https URL to be used in unit-testing -- the logic of call configuration endpoints -sftStaticEnv :: IO (Env, HttpsUrl) +sftStaticEnv :: IO (TurnEnv, HttpsUrl) sftStaticEnv = do turnUri <- generate arbitrary let tokenTtl = 10 -- seconds configTtl = 10 -- seconds secret = "secret word" - env <- newEnv undefined (pure turnUri) tokenTtl configTtl secret + env <- mkTurnEnv "test/resources/turn/servers.txt" "test/resources/turn/servers-v2.txt" tokenTtl configTtl secret undefined + let serversV1IORef = env ^. turnServersV1 + serversV2IORef = env ^. turnServersV2 + atomicWriteIORef serversV1IORef (Discovered turnUri) + atomicWriteIORef serversV2IORef (Discovered turnUri) let Right staticUrl = mkHttpsUrl =<< first @@ -287,7 +299,8 @@ testSFTStaticDeprecatedEndpoint = do runM @IO . ignoreLogs . interpretSFTInMemory mempty - $ newConfig env Nothing Nothing Nothing HideAllSFTServers CallsConfigDeprecated + . throwErrorInIO @_ @NoTurnServers + $ newConfig env turnServersV1 Nothing Nothing Nothing HideAllSFTServers CallsConfigDeprecated assertEqual "when SFT static URL is disabled, sft_servers should be empty." Set.empty @@ -311,7 +324,8 @@ testSFTStaticV2NoStaticUrl = do runM @IO . ignoreLogs . interpretSFTInMemory mempty - $ newConfig env Nothing (Just sftEnv) (Just . unsafeRange $ 2) ListAllSFTServers CallsConfigV2 + . throwErrorInIO @_ @NoTurnServers + $ newConfig env turnServersV2 Nothing (Just sftEnv) (Just . unsafeRange $ 2) ListAllSFTServers CallsConfigV2 assertEqual "when SFT static URL is disabled, sft_servers_all should be from SFT environment" (Just . fmap (sftServerFromSrvTarget . srvTarget) . toList $ servers) @@ -324,9 +338,9 @@ testSFTStaticV2StaticUrlError = do cfg <- runM @IO . ignoreLogs - . interpretSFTInMemory mempty -- an empty lookup map, meaning there was - -- an error - $ newConfig env (Just staticUrl) Nothing (Just . unsafeRange $ 2) ListAllSFTServers CallsConfigV2 + . 
interpretSFTInMemory mempty -- an empty lookup map, meaning there was an error + . throwErrorInIO @_ @NoTurnServers + $ newConfig env turnServersV2 (Just staticUrl) Nothing (Just . unsafeRange $ 2) ListAllSFTServers CallsConfigV2 assertEqual "when SFT static URL is enabled (and setSftListAllServers is enabled), but returns error, sft_servers_all should be omitted" Nothing @@ -343,7 +357,8 @@ testSFTStaticV2StaticUrlList = do runM @IO . ignoreLogs . interpretSFTInMemory (Map.singleton staticUrl (SFTGetResponse . Right $ servers)) - $ newConfig env (Just staticUrl) Nothing (Just . unsafeRange $ 3) ListAllSFTServers CallsConfigV2 + . throwErrorInIO @_ @NoTurnServers + $ newConfig env turnServersV2 (Just staticUrl) Nothing (Just . unsafeRange $ 3) ListAllSFTServers CallsConfigV2 assertEqual "when SFT static URL and setSftListAllServers are enabled, sft_servers_all should be from /sft_servers_all.json" (Just servers) @@ -359,8 +374,16 @@ testSFTStaticV2ListAllServersDisabled = do runM @IO . ignoreLogs . interpretSFTInMemory (Map.singleton staticUrl (SFTGetResponse . Right $ servers)) - $ newConfig env (Just staticUrl) Nothing (Just . unsafeRange $ 3) HideAllSFTServers CallsConfigV2 + . throwErrorInIO @_ @NoTurnServers + $ newConfig env turnServersV2 (Just staticUrl) Nothing (Just . unsafeRange $ 3) HideAllSFTServers CallsConfigV2 assertEqual "when SFT static URL is enabled and setSftListAllServers is \"disabled\" then sft_servers_all is missing" Nothing (cfg ^. 
rtcConfSftServersAll) + +throwErrorInIO :: (Member (Embed IO) r, Exception e) => Sem (Error e ': r) a -> Sem r a +throwErrorInIO action = do + eitherResult <- runError action + case eitherResult of + Right x -> pure x + Left e -> liftIO $ throwM e diff --git a/services/brig/test/unit/Test/Brig/Effects/Delay.hs b/services/brig/test/unit/Test/Brig/Effects/Delay.hs new file mode 100644 index 00000000000..a8fe84cc19a --- /dev/null +++ b/services/brig/test/unit/Test/Brig/Effects/Delay.hs @@ -0,0 +1,28 @@ +module Test.Brig.Effects.Delay where + +import Brig.Effects.Delay +import Imports +import Polysemy + +-- | Ignores the delay time and only progresses when the 'MVar' is empty using +-- 'putMVar'. This way a test using this interpreter can know when the delay +-- event gets called by just waiting using 'takeMVar'. The test can also start +-- this interpreter with a full 'MVar' and use 'takeMVar' to control when the +-- 'Delay' action returns. +-- +-- In addition, this interpreter also tracks calls to the 'Delay' action in a +-- 'TVar'. +-- +-- Example: +-- > tick <- newEmptyMVar +-- > delayCallsTVar <- newTVarIO [] +-- > async . runDelayWithTick tick delayCallsTVar $ do +-- > doStuff +-- > delay 100 +-- > takeMVar tick -- This blocks until doStuff is done +-- > assertStuffDone +runDelayWithTick :: Member (Embed IO) r => MVar () -> TVar [Int] -> Sem (Delay ': r) a -> Sem r a +runDelayWithTick tick calls = interpret $ \case + Delay i -> do + putMVar tick () + atomically $ modifyTVar calls (<> [i])