Skip to content

Commit

Permalink
Brig: Prepare for TURN Discovery using SRV records (#2376)
Browse files Browse the repository at this point in the history
* Brig: Start turn discovery when the App start

Instead of starting it when the `Env` is created. This aligns the service
discovery of TURN with that of SFT. In next commits, SRV based discovery for
TURN will be implemented.

* Brig: Refactor SFT discovery to extract generic SRV discovery code

* Brig: Refactor SFT Discovery tests

* Simplify sftDiscoveryLoop tests and make them srvDiscoveryLoop tests

Introduce a Delay Effect to deal with threadDelay and mock it to get rid of
wait loop in tests

* CHANGELOG
  • Loading branch information
akshaymankar authored May 12, 2022
1 parent 4a3bea4 commit 7f1ffb7
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 178 deletions.
1 change: 1 addition & 0 deletions changelog.d/5-internal/refactor-turn-discovery
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Start TURN discovery only when the app starts and not when the Env is created
4 changes: 4 additions & 0 deletions services/brig/brig.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ library
Brig.Data.User
Brig.Data.UserKey
Brig.Data.UserPendingActivation
Brig.Effects.Delay
Brig.Effects.SFT
Brig.Email
Brig.Federation.Client
Expand Down Expand Up @@ -680,6 +681,7 @@ test-suite brig-tests
other-modules:
Test.Brig.Calling
Test.Brig.Calling.Internal
Test.Brig.Effects.Delay
Test.Brig.MLS
Test.Brig.Roundtrip
Test.Brig.User.Search.Index.Types
Expand Down Expand Up @@ -735,8 +737,10 @@ test-suite brig-tests
, brig-types
, bytestring
, containers
, data-timeout
, dns
, dns-util
, exceptions
, http-types
, imports
, lens
Expand Down
2 changes: 2 additions & 0 deletions services/brig/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ tests:
- brig-types
- bytestring
- containers
- data-timeout
- dns
- dns-util
- exceptions
- http-types
- imports
- lens
Expand Down
44 changes: 6 additions & 38 deletions services/brig/src/Brig/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ module Brig.App
metrics,
applog,
turnEnv,
turnEnvV2,
sftEnv,
internalEvents,
emailSender,
randomPrekeyLocalLock,
keyPackageLocalLock,
fsWatcher,

-- * App Monad
AppT,
Expand Down Expand Up @@ -100,7 +100,7 @@ import Brig.Sem.PasswordResetStore (PasswordResetStore)
import Brig.Sem.PasswordResetStore.CodeStore
import Brig.Team.Template
import Brig.Template (Localised, TemplateBranding, forLocale, genTemplateBranding)
import Brig.Types (Locale (..), TurnURI)
import Brig.Types (Locale (..))
import Brig.User.Search.Index (IndexEnv (..), MonadIndexIO (..), runIndexIO)
import Brig.User.Template
import Brig.ZAuth (MonadZAuth (..), runZAuth)
Expand All @@ -121,7 +121,6 @@ import Data.Domain
import qualified Data.GeoIP2 as GeoIp
import Data.IP
import qualified Data.List.NonEmpty as NE
import Data.List1 (List1, list1)
import Data.Metrics (Metrics)
import qualified Data.Metrics.Middleware as Metrics
import Data.Misc
Expand Down Expand Up @@ -187,8 +186,7 @@ data Env = Env
_twilioCreds :: Twilio.Credentials,
_geoDb :: Maybe (IORef GeoIp.GeoDB),
_fsWatcher :: FS.WatchManager,
_turnEnv :: IORef Calling.Env,
_turnEnvV2 :: IORef Calling.Env,
_turnEnv :: Calling.TurnEnv,
_sftEnv :: Maybe Calling.SFTEnv,
_currentTime :: IO UTCTime,
_zauthEnv :: ZAuth.Env,
Expand Down Expand Up @@ -223,7 +221,9 @@ newEnv o = do
FS.startManagerConf $
FS.defaultConfig {FS.confDebounce = FS.Debounce 0.5, FS.confPollInterval = 10000000}
g <- geoSetup lgr w $ Opt.geoDb o
(turn, turnV2) <- turnSetup lgr w sha512 (Opt.turn o)
let turnOpts = Opt.turn o
turnSecret <- Text.encodeUtf8 . Text.strip <$> Text.readFile (Opt.secret turnOpts)
turn <- Calling.mkTurnEnv (Opt.servers turnOpts) (Opt.serversV2 turnOpts) (Opt.tokenTTL turnOpts) (Opt.configTTL turnOpts) turnSecret sha512
let sett = Opt.optSettings o
nxm <- initCredentials (Opt.setNexmo sett)
twl <- initCredentials (Opt.setTwilio sett)
Expand Down Expand Up @@ -268,7 +268,6 @@ newEnv o = do
_twilioCreds = twl,
_geoDb = g,
_turnEnv = turn,
_turnEnvV2 = turnV2,
_sftEnv = mSFTEnv,
_fsWatcher = w,
_currentTime = clock,
Expand Down Expand Up @@ -310,20 +309,6 @@ geoSetup lgr w (Just db) = do
startWatching w path (replaceGeoDb lgr geodb)
return $ Just geodb

turnSetup :: Logger -> FS.WatchManager -> Digest -> Opt.TurnOpts -> IO (IORef Calling.Env, IORef Calling.Env)
turnSetup lgr w dig o = do
secret <- Text.encodeUtf8 . Text.strip <$> Text.readFile (Opt.secret o)
cfg <- setupTurn secret (Opt.servers o)
cfgV2 <- setupTurn secret (Opt.serversV2 o)
return (cfg, cfgV2)
where
setupTurn secret cfg = do
path <- canonicalizePath cfg
servers <- fromMaybe (error "Empty TURN list, check turn file!") <$> readTurnList path
te <- newIORef =<< Calling.newEnv dig servers (Opt.tokenTTL o) (Opt.configTTL o) secret
startWatching w path (replaceTurnServers lgr te)
return te

startWatching :: FS.WatchManager -> FilePath -> FS.Action -> IO ()
startWatching w p = void . FS.watchDir w (Path.dropFileName p) predicate
where
Expand All @@ -339,17 +324,6 @@ replaceGeoDb g ref e = do
GeoIp.openGeoDB (FS.eventPath e) >>= atomicWriteIORef ref
Log.info g (msg $ val "New GeoIP database loaded.")

replaceTurnServers :: Logger -> IORef Calling.Env -> FS.Event -> IO ()
replaceTurnServers g ref e = do
let logErr x = Log.err g (msg $ val "Error loading turn servers: " +++ show x)
handleAny logErr $
readTurnList (FS.eventPath e) >>= \case
Just servers ->
readIORef ref >>= \old -> do
atomicWriteIORef ref (old & Calling.turnServers .~ servers)
Log.info g (msg $ val "New turn servers loaded.")
Nothing -> Log.warn g (msg $ val "Empty or malformed turn servers list, ignoring!")

initZAuth :: Opts -> IO ZAuth.Env
initZAuth o = do
let zOpts = Opt.zauth o
Expand Down Expand Up @@ -643,12 +617,6 @@ locationOf ip =
return $ location (Latitude $ GeoIp.locationLatitude loc) (Longitude $ GeoIp.locationLongitude loc)
Nothing -> return Nothing

readTurnList :: FilePath -> IO (Maybe (List1 TurnURI))
readTurnList = Text.readFile >=> return . fn . mapMaybe (fromByteString . Text.encodeUtf8) . Text.lines
where
fn [] = Nothing
fn (x : xs) = Just (list1 x xs)

--------------------------------------------------------------------------------
-- Federation

Expand Down
117 changes: 93 additions & 24 deletions services/brig/src/Brig/Calling.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,20 @@ module Brig.Calling
mkSFTServers,
SFTEnv (..),
Discovery (..),
Env (..),
TurnEnv,
mkSFTEnv,
newEnv,
mkTurnEnv,
srvDiscoveryLoop,
sftDiscoveryLoop,
discoverSFTServers,
discoverSRVRecords,
discoveryToMaybe,
randomize,
startSFTServiceDiscovery,
turnServers,
startTurnDiscovery,
turnServersV1,
turnServersV1File,
turnServersV2,
turnServersV2File,
turnTokenTTL,
turnConfigTTL,
turnSecret,
Expand All @@ -43,21 +48,28 @@ module Brig.Calling
)
where

import Brig.Effects.Delay
import Brig.Options (SFTOptions (..), defSftDiscoveryIntervalSeconds, defSftListLength, defSftServiceName)
import qualified Brig.Options as Opts
import Brig.Types (TurnURI)
import Control.Exception.Enclosed (handleAny)
import Control.Lens
import Control.Monad.Random.Class (MonadRandom)
import Data.ByteString.Conversion (fromByteString)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NonEmpty
import Data.List1
import Data.Range
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
import qualified Data.Text.IO as Text
import Data.Time.Clock (DiffTime, diffTimeToPicoseconds)
import Imports
import qualified Network.DNS as DNS
import OpenSSL.EVP.Digest (Digest)
import Polysemy
import Polysemy.TinyLog
import qualified System.FSNotify as FS
import qualified System.FilePath as Path
import qualified System.Logger as Log
import System.Random.MWC (GenIO, createSystemRandom)
import System.Random.Shuffle
Expand Down Expand Up @@ -129,29 +141,37 @@ discoveryToMaybe = \case
NotDiscoveredYet -> Nothing
Discovered x -> Just x

discoverSFTServers :: Members [DNSLookup, TinyLog] r => DNS.Domain -> Sem r (Maybe (NonEmpty SrvEntry))
discoverSFTServers domain =
discoverSRVRecords :: Members [DNSLookup, TinyLog] r => DNS.Domain -> Sem r (Maybe (NonEmpty SrvEntry))
discoverSRVRecords domain =
lookupSRV domain >>= \case
SrvAvailable es -> pure $ Just es
SrvNotAvailable -> do
warn (Log.msg ("No SFT servers available" :: ByteString))
warn $
Log.msg (Log.val "SRV Records not available")
. Log.field "domain" domain
pure Nothing
SrvResponseError e -> do
err (Log.msg ("DNS Lookup failed for SFT Discovery" :: ByteString) . Log.field "Error" (show e))
err $
Log.msg (Log.val "SRV Lookup failed")
. Log.field "Error" (show e)
. Log.field "domain" domain
pure Nothing

srvDiscoveryLoop :: Members [DNSLookup, TinyLog, Delay] r => DNS.Domain -> Int -> (NonEmpty SrvEntry -> Sem r ()) -> Sem r ()
srvDiscoveryLoop domain discoveryInterval saveAction = forever $ do
servers <- discoverSRVRecords domain
case servers of
Nothing -> pure ()
Just es -> saveAction es
delay discoveryInterval

mkSFTDomain :: SFTOptions -> DNS.Domain
mkSFTDomain SFTOptions {..} = DNS.normalize $ maybe defSftServiceName ("_" <>) sftSRVServiceName <> "._tcp." <> sftBaseDomain

-- FUTUREWORK: Remove Embed IO from here and put threadDelay into another
-- effect. This will also make tests for this faster and deterministic
sftDiscoveryLoop :: Members [DNSLookup, TinyLog, Embed IO] r => SFTEnv -> Sem r ()
sftDiscoveryLoop SFTEnv {..} = forever $ do
servers <- discoverSFTServers sftDomain
case servers of
Nothing -> pure ()
Just es -> atomicWriteIORef sftServers (Discovered (SFTServers es))
threadDelay sftDiscoveryInterval
sftDiscoveryLoop :: Members [DNSLookup, TinyLog, Delay, Embed IO] r => SFTEnv -> Sem r ()
sftDiscoveryLoop SFTEnv {..} =
srvDiscoveryLoop sftDomain sftDiscoveryInterval $
atomicWriteIORef sftServers . Discovered . SFTServers

mkSFTEnv :: SFTOptions -> IO SFTEnv
mkSFTEnv opts =
Expand All @@ -163,7 +183,7 @@ mkSFTEnv opts =

startSFTServiceDiscovery :: Log.Logger -> SFTEnv -> IO ()
startSFTServiceDiscovery logger =
runM . loggerToTinyLog logger . runDNSLookupDefault . sftDiscoveryLoop
runM . loggerToTinyLog logger . runDNSLookupDefault . runDelay . sftDiscoveryLoop

-- | >>> diffTimeToMicroseconds 1
-- 1000000
Expand All @@ -172,16 +192,65 @@ diffTimeToMicroseconds = fromIntegral . (`quot` 1000000) . diffTimeToPicoseconds

-- TURN specific

data Env = Env
{ _turnServers :: List1 TurnURI,
data TurnEnv = TurnEnv
{ _turnServersV1 :: IORef (Discovery (NonEmpty TurnURI)),
_turnServersV2 :: IORef (Discovery (NonEmpty TurnURI)),
_turnServersV1File :: FilePath,
_turnServersV2File :: FilePath,
_turnTokenTTL :: Word32,
_turnConfigTTL :: Word32,
_turnSecret :: ByteString,
_turnSHA512 :: Digest,
_turnPrng :: GenIO
}

makeLenses ''Env
makeLenses ''TurnEnv

mkTurnEnv :: FilePath -> FilePath -> Word32 -> Word32 -> ByteString -> Digest -> IO TurnEnv
mkTurnEnv v1File v2File _turnTokenTTL _turnConfigTTL _turnSecret _turnSHA512 = do
_turnServersV1 <- newIORef NotDiscoveredYet
_turnServersV2 <- newIORef NotDiscoveredYet
_turnPrng <- createSystemRandom
_turnServersV1File <- canonicalizePath v1File
_turnServersV2File <- canonicalizePath v2File
pure $ TurnEnv {..}

-- | Returns an action which can be executed to stop this
startTurnDiscovery :: Log.Logger -> FS.WatchManager -> TurnEnv -> IO ()
startTurnDiscovery l w e = do
atomicWriteIORef (e ^. turnServersV1)
. maybe NotDiscoveredYet Discovered
=<< readTurnList (e ^. turnServersV1File)
atomicWriteIORef (e ^. turnServersV2)
. maybe NotDiscoveredYet Discovered
=<< readTurnList (e ^. turnServersV2File)
Log.warn l $
Log.msg (Log.val "Waiting for TURN files")
. Log.field "file1" (e ^. turnServersV1File)
. Log.field "file2" (e ^. turnServersV2File)
startWatching w (e ^. turnServersV1File) (replaceTurnServers l (e ^. turnServersV1))
startWatching w (e ^. turnServersV2File) (replaceTurnServers l (e ^. turnServersV2))

replaceTurnServers :: Log.Logger -> IORef (Discovery (NonEmpty TurnURI)) -> FS.Event -> IO ()
replaceTurnServers g ref e = do
let logErr x = Log.err g (Log.msg $ Log.val "Error loading turn servers: " Log.+++ show x)
handleAny logErr $
readTurnList (FS.eventPath e) >>= \case
Just servers -> do
atomicWriteIORef ref (Discovered servers)
Log.info g (Log.msg $ Log.val "New turn servers loaded.")
Nothing -> Log.warn g (Log.msg $ Log.val "Empty or malformed turn servers list, ignoring!")

startWatching :: FS.WatchManager -> FilePath -> FS.Action -> IO ()
startWatching w p = void . FS.watchDir w (Path.dropFileName p) predicate
where
predicate (FS.Added f _ _) = Path.equalFilePath f p
predicate (FS.Modified f _ _) = Path.equalFilePath f p
predicate FS.Removed {} = False
predicate FS.Unknown {} = False

newEnv :: Digest -> List1 TurnURI -> Word32 -> Word32 -> ByteString -> IO Env
newEnv sha512 srvs tTTL cTTL secret = Env srvs tTTL cTTL secret sha512 <$> createSystemRandom
readTurnList :: FilePath -> IO (Maybe (NonEmpty TurnURI))
readTurnList = Text.readFile >=> return . fn . mapMaybe (fromByteString . Text.encodeUtf8) . Text.lines
where
fn [] = Nothing
fn (x : xs) = Just (x :| xs)
Loading

0 comments on commit 7f1ffb7

Please sign in to comment.