Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Load Balancer App #1696

Merged
merged 11 commits into from
Jun 11, 2021
Merged
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 6 additions & 29 deletions src/core/packet.h
Original file line number Diff line number Diff line change
@@ -549,34 +549,12 @@ QuicPacketHash(
const uint8_t* const RemoteCid
)
{
uint32_t Key, Offset;

if (QuicAddrGetFamily(RemoteAddress) == QUIC_ADDRESS_FAMILY_INET) {
Key =
CxPlatToeplitzHashCompute(
&MsQuicLib.ToeplitzHash,
((uint8_t*)RemoteAddress) + QUIC_ADDR_V4_PORT_OFFSET,
2, 0);
Key ^=
CxPlatToeplitzHashCompute(
&MsQuicLib.ToeplitzHash,
((uint8_t*)RemoteAddress) + QUIC_ADDR_V4_IP_OFFSET,
4, 2);
Offset = 2 + 4;
} else {
Key =
CxPlatToeplitzHashCompute(
&MsQuicLib.ToeplitzHash,
((uint8_t*)RemoteAddress) + QUIC_ADDR_V6_PORT_OFFSET,
2, 0);
Key ^=
CxPlatToeplitzHashCompute(
&MsQuicLib.ToeplitzHash,
((uint8_t*)RemoteAddress) + QUIC_ADDR_V6_IP_OFFSET,
16, 2);
Offset = 2 + 16;
}

uint32_t Key = 0, Offset;
CxPlatToeplitzHashComputeAddr(
&MsQuicLib.ToeplitzHash,
RemoteAddress,
&Key,
&Offset);
if (RemoteCidLength != 0) {
Key ^=
CxPlatToeplitzHashCompute(
@@ -585,7 +563,6 @@ QuicPacketHash(
min(RemoteCidLength, QUIC_MAX_CONNECTION_ID_LENGTH_V1),
Offset);
}

return Key;
}

34 changes: 17 additions & 17 deletions src/inc/msquichelper.h
Original file line number Diff line number Diff line change
@@ -73,13 +73,13 @@ QuicStatusToString(
inline
uint32_t
GetConnRtt(
_In_ const QUIC_API_TABLE* MsQuic,
_In_ const QUIC_API_TABLE* MsQuicTable,
_In_ HQUIC Handle
)
{
QUIC_STATISTICS Value;
uint32_t ValueSize = sizeof(Value);
MsQuic->GetParam(
MsQuicTable->GetParam(
Handle,
QUIC_PARAM_LEVEL_CONNECTION,
QUIC_PARAM_CONN_STATISTICS,
@@ -94,13 +94,13 @@ GetConnRtt(
inline
uint64_t
GetStreamID(
_In_ const QUIC_API_TABLE* MsQuic,
_In_ const QUIC_API_TABLE* MsQuicTable,
_In_ HQUIC Handle
)
{
uint64_t ID = (uint32_t)(-1);
uint32_t IDLen = sizeof(ID);
MsQuic->GetParam(
MsQuicTable->GetParam(
Handle,
QUIC_PARAM_LEVEL_STREAM,
QUIC_PARAM_STREAM_ID,
@@ -116,15 +116,15 @@ GetStreamID(
inline
QUIC_ADDR_STR
GetRemoteAddr(
_In_ const QUIC_API_TABLE* MsQuic,
_In_ const QUIC_API_TABLE* MsQuicTable,
_In_ HQUIC Handle
)
{
QUIC_ADDR addr;
uint32_t addrLen = sizeof(addr);
QUIC_ADDR_STR addrStr = { 0 };
QUIC_STATUS status =
MsQuic->GetParam(
MsQuicTable->GetParam(
Handle,
QUIC_PARAM_LEVEL_CONNECTION,
QUIC_PARAM_CONN_REMOTE_ADDRESS,
@@ -139,13 +139,13 @@ GetRemoteAddr(
inline
QUIC_STATUS
QuicForceRetry(
_In_ const QUIC_API_TABLE* MsQuic,
_In_ const QUIC_API_TABLE* MsQuicTable,
_In_ BOOLEAN Enabled
)
{
uint16_t value = Enabled ? 0 : 65;
return
MsQuic->SetParam(
MsQuicTable->SetParam(
NULL,
QUIC_PARAM_LEVEL_GLOBAL,
QUIC_PARAM_GLOBAL_RETRY_MEMORY_PERCENT,
@@ -156,12 +156,12 @@ QuicForceRetry(
inline
void
DumpMsQuicPerfCounters(
_In_ const QUIC_API_TABLE* MsQuic
_In_ const QUIC_API_TABLE* MsQuicTable
)
{
uint64_t Counters[QUIC_PERF_COUNTER_MAX] = {0};
uint32_t Lenth = sizeof(Counters);
MsQuic->GetParam(
MsQuicTable->GetParam(
NULL,
QUIC_PARAM_LEVEL_GLOBAL,
QUIC_PARAM_GLOBAL_PERF_COUNTERS,
@@ -445,7 +445,7 @@ HQUIC
GetServerConfigurationFromArgs(
_In_ int argc,
_In_reads_(argc) _Null_terminated_ char* argv[],
_In_ const QUIC_API_TABLE* MsQuic,
_In_ const QUIC_API_TABLE* MsQuicTable,
_In_ HQUIC Registration,
_In_reads_(AlpnBufferCount) _Pre_defensive_
const QUIC_BUFFER* const AlpnBuffers,
@@ -512,7 +512,7 @@ GetServerConfigurationFromArgs(

HQUIC Configuration = nullptr;
if (QUIC_SUCCEEDED(
MsQuic->ConfigurationOpen(
MsQuicTable->ConfigurationOpen(
Registration,
AlpnBuffers,
AlpnBufferCount,
@@ -521,10 +521,10 @@ GetServerConfigurationFromArgs(
Context,
&Configuration)) &&
QUIC_FAILED(
MsQuic->ConfigurationLoadCredential(
MsQuicTable->ConfigurationLoadCredential(
Configuration,
Config))) {
MsQuic->ConfigurationClose(Configuration);
MsQuicTable->ConfigurationClose(Configuration);
Configuration = nullptr;
}

@@ -540,17 +540,17 @@ GetServerConfigurationFromArgs(
inline
void
FreeServerConfiguration(
_In_ const QUIC_API_TABLE* MsQuic,
_In_ const QUIC_API_TABLE* MsQuicTable,
_In_ HQUIC Configuration
)
{
#ifdef QUIC_TEST_APIS
auto SelfSignedConfig = (const QUIC_CREDENTIAL_CONFIG*)MsQuic->GetContext(Configuration);
auto SelfSignedConfig = (const QUIC_CREDENTIAL_CONFIG*)MsQuicTable->GetContext(Configuration);
if (SelfSignedConfig) {
CxPlatFreeSelfSignedCert(SelfSignedConfig);
}
#endif
MsQuic->ConfigurationClose(Configuration);
MsQuicTable->ConfigurationClose(Configuration);
}

inline
49 changes: 49 additions & 0 deletions src/inc/quic_toeplitz.h
Original file line number Diff line number Diff line change
@@ -7,6 +7,10 @@

#pragma once

#if defined(__cplusplus)
extern "C" {
#endif

#define NIBBLES_PER_BYTE 2
#define BITS_PER_NIBBLE 4

@@ -58,6 +62,8 @@ CxPlatToeplitzHashInitialize(

//
// Computes a Toeplitz hash.
// TODO - Update SAL to ensure:
// HashInputLength + HashInputOffset <= CXPLAT_TOEPLITZ_INPUT_SIZE
//
uint32_t
CxPlatToeplitzHashCompute(
@@ -67,3 +73,46 @@ CxPlatToeplitzHashCompute(
_In_ uint32_t HashInputLength,
_In_ uint32_t HashInputOffset
);

//
// Computes the Toeplitz hash of a QUIC address.
//
inline
void
CxPlatToeplitzHashComputeAddr(
_In_ const CXPLAT_TOEPLITZ_HASH* Toeplitz,
_In_ const QUIC_ADDR* Addr,
_Inout_ uint32_t* Key,
_Out_ uint32_t* Offset
)
{
if (QuicAddrGetFamily(Addr) == QUIC_ADDRESS_FAMILY_INET) {
*Key ^=
CxPlatToeplitzHashCompute(
Toeplitz,
((uint8_t*)Addr) + QUIC_ADDR_V4_PORT_OFFSET,
2, 0);
*Key ^=
CxPlatToeplitzHashCompute(
Toeplitz,
((uint8_t*)Addr) + QUIC_ADDR_V4_IP_OFFSET,
4, 2);
*Offset = 2 + 4;
} else {
*Key ^=
CxPlatToeplitzHashCompute(
Toeplitz,
((uint8_t*)Addr) + QUIC_ADDR_V6_PORT_OFFSET,
2, 0);
*Key ^=
CxPlatToeplitzHashCompute(
Toeplitz,
((uint8_t*)Addr) + QUIC_ADDR_V6_IP_OFFSET,
16, 2);
*Offset = 2 + 16;
}
}

#if defined(__cplusplus)
}
#endif
11 changes: 10 additions & 1 deletion src/perf/lib/RpsClient.cpp
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ PrintHelp(
" -target:<####> The target server to connect to.\n"
" -runtime:<####> The total runtime (in ms). (def:%u)\n"
" -port:<####> The UDP port of the server. (def:%u)\n"
" -ip:<0/4/6> A hint for the resolving the hostname to an IP address. (def:0)\n"
" -conns:<####> The number of connections to use. (def:%u)\n"
" -requests:<####> The number of requests to send at a time. (def:2*conns)\n"
" -request:<####> The length of request payloads. (def:%u)\n"
@@ -78,6 +79,14 @@ RpsClient::Init(
TryGetValue(argc, argv, "request", &RequestLength);
TryGetValue(argc, argv, "response", &ResponseLength);

uint16_t Ip;
if (TryGetValue(argc, argv, "ip", &Ip)) {
switch (Ip) {
case 4: RemoteFamily = QUIC_ADDRESS_FAMILY_INET; break;
case 6: RemoteFamily = QUIC_ADDRESS_FAMILY_INET6; break;
}
}

uint32_t Affinitize;
if (TryGetValue(argc, argv, "affinitize", &Affinitize)) {
AffinitizeWorkers = Affinitize != 0;
@@ -240,7 +249,7 @@ RpsClient::Start(
MsQuic->ConnectionStart(
Connections[i],
Configuration,
QUIC_ADDRESS_FAMILY_UNSPEC,
RemoteFamily,
Target.get(),
Port);
if (QUIC_FAILED(Status)) {
1 change: 1 addition & 0 deletions src/perf/lib/RpsClient.h
Original file line number Diff line number Diff line change
@@ -165,6 +165,7 @@ class RpsClient : public PerfBase {
QUIC_CREDENTIAL_FLAG_NO_CERTIFICATE_VALIDATION)};
uint32_t WorkerCount;
uint16_t Port {PERF_DEFAULT_PORT};
QUIC_ADDRESS_FAMILY RemoteFamily {QUIC_ADDRESS_FAMILY_UNSPEC};
UniquePtr<char[]> Target;
uint32_t RunTime {RPS_DEFAULT_RUN_TIME};
uint32_t ConnectionCount {RPS_DEFAULT_CONNECTION_COUNT};
8 changes: 8 additions & 0 deletions src/platform/inline.c
Original file line number Diff line number Diff line change
@@ -341,3 +341,11 @@ CxPlatInternalEventWaitWithTimeout(
_Inout_ CXPLAT_EVENT* Event,
_In_ uint32_t TimeoutMs
);

void
CxPlatToeplitzHashComputeAddr(
_In_ const CXPLAT_TOEPLITZ_HASH* Toeplitz,
_In_ const QUIC_ADDR* Addr,
_Inout_ uint32_t* Key,
_Out_ uint32_t* Offset
);
1 change: 1 addition & 0 deletions src/tools/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@ add_subdirectory(interop)
add_subdirectory(interopserver)
add_subdirectory(ip/client)
add_subdirectory(ip/server)
add_subdirectory(lb)
add_subdirectory(pcp)
add_subdirectory(post)
add_subdirectory(reach)
4 changes: 4 additions & 0 deletions src/tools/lb/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

add_quic_tool(quiclb loadbalancer.cpp)
217 changes: 217 additions & 0 deletions src/tools/lb/loadbalancer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*++
Copyright (c) Microsoft Corporation.
Licensed under the MIT License.
Abstract:
Load balances QUIC traffic from a public address to a set of private
addresses. Requires the use of NAT'ing. Don't use in production.
--*/

#include <vector>
#include <unordered_map>
#include <mutex>

#include <quic_datapath.h>
#include <quic_toeplitz.h>
#include <msquichelper.h>

bool Verbose = false;
CXPLAT_DATAPATH* Datapath;
struct LbInterface* PublicInterface;
std::vector<QUIC_ADDR> PrivateAddrs;

struct LbInterface {
bool IsPublic;
CXPLAT_SOCKET* Socket {nullptr};
QUIC_ADDR LocalAddress;

LbInterface(_In_ const QUIC_ADDR* Address, bool IsPublic) : IsPublic(IsPublic) {
if (IsPublic) {
CxPlatSocketCreateUdp(Datapath, Address, nullptr, this, 0, &Socket);
} else {
CxPlatSocketCreateUdp(Datapath, nullptr, Address, this, 0, &Socket);
}
if (!Socket) {
printf("CxPlatSocketCreateUdp failed.\n");
exit(1);
}
CxPlatSocketGetLocalAddress(Socket, &LocalAddress);
}

virtual ~LbInterface() {
CxPlatSocketDelete(Socket);
}

virtual void Receive(_In_ CXPLAT_RECV_DATA* RecvDataChain) = 0;

void Send(_In_ CXPLAT_RECV_DATA* RecvDataChain, _In_opt_ const QUIC_ADDR* PeerAddress = nullptr) {
QUIC_ADDR RemoteAddress;
if (PeerAddress == nullptr) {
CxPlatSocketGetRemoteAddress(Socket, &RemoteAddress);
PeerAddress = &RemoteAddress;
}
CXPLAT_SEND_DATA* Send = nullptr;
while (RecvDataChain) {
if (!Send) {
Send = CxPlatSendDataAlloc(Socket, CXPLAT_ECN_NON_ECT, MAX_UDP_PAYLOAD_LENGTH);
}
if (Send) {
auto Buffer = CxPlatSendDataAllocBuffer(Send, MAX_UDP_PAYLOAD_LENGTH);
if (!Buffer) {
(void)CxPlatSocketSend(Socket, &LocalAddress, PeerAddress, Send, 0);
Send = CxPlatSendDataAlloc(Socket, CXPLAT_ECN_NON_ECT, MAX_UDP_PAYLOAD_LENGTH);
if (Send) {
Buffer = CxPlatSendDataAllocBuffer(Send, MAX_UDP_PAYLOAD_LENGTH);
}
}
if (Buffer) {
Buffer->Length = RecvDataChain->BufferLength;
CxPlatCopyMemory(Buffer->Buffer, RecvDataChain->Buffer, RecvDataChain->BufferLength);
}
}
RecvDataChain = RecvDataChain->Next;
}
if (Send) {
(void)CxPlatSocketSend(Socket, &LocalAddress, PeerAddress, Send, 0);
}
}
};

//
// Represents a NAT'ed socket from the load balancer back to a single private
// server address.
//
struct LbPrivateInterface : public LbInterface {
const QUIC_ADDR PeerAddress {0};

LbPrivateInterface(_In_ const QUIC_ADDR* PrivateAddress, _In_ const QUIC_ADDR* PeerAddress)
: LbInterface(PrivateAddress, false), PeerAddress(*PeerAddress) {
if (Verbose) {
QUIC_ADDR_STR PeerStr, PrivateStr;
QuicAddrToString(PeerAddress, &PeerStr);
QuicAddrToString(PrivateAddress, &PrivateStr);
printf("New private interface, %s => %s\n", PeerStr.Address, PrivateStr.Address);
}
}

void Receive(_In_ CXPLAT_RECV_DATA* RecvDataChain) {
PublicInterface->Send(RecvDataChain, &PeerAddress);
}
};

//
// Represents the public listening socket that load balances (and NATs) UDP
// packets between public clients and back end (private) server addresses.
//
struct LbPublicInterface : public LbInterface {
struct Hasher {
CXPLAT_TOEPLITZ_HASH Toeplitz;
Hasher() {
CxPlatRandom(CXPLAT_TOEPLITZ_KEY_SIZE, &Toeplitz.HashKey);
CxPlatToeplitzHashInitialize(&Toeplitz);
}
size_t operator() (const std::pair<QUIC_ADDR, QUIC_ADDR> key) const {
uint32_t Key = 0, Offset;
CxPlatToeplitzHashComputeAddr(&Toeplitz, &key.first, &Key, &Offset);
CxPlatToeplitzHashComputeAddr(&Toeplitz, &key.second, &Key, &Offset);
return Key;
}
};

struct EqualFn {
bool operator() (const std::pair<QUIC_ADDR, QUIC_ADDR>& t1, const std::pair<QUIC_ADDR, QUIC_ADDR>& t2) const {
return QuicAddrCompare(&t1.second, &t2.second);
}
};

std::unordered_map<std::pair<QUIC_ADDR, QUIC_ADDR>, LbPrivateInterface*, Hasher, EqualFn> PrivateInterfaces;
std::mutex Lock;
uint32_t NextInterface = 0;

LbPublicInterface(_In_ const QUIC_ADDR* PublicAddress) : LbInterface(PublicAddress, true) { }

~LbPublicInterface() {
// TODO - Iterate over private interfaces and delete
}

void Receive(_In_ CXPLAT_RECV_DATA* RecvDataChain) {
auto PrivateInterface =
GetPrivateInterface(
&RecvDataChain->Tuple->LocalAddress,
&RecvDataChain->Tuple->RemoteAddress);
PrivateInterface->Send(RecvDataChain);
}

LbInterface* GetPrivateInterface(_In_ const QUIC_ADDR* Local, _In_ const QUIC_ADDR* Remote) {
std::lock_guard<std::mutex> Scope(Lock);
auto& Entry = PrivateInterfaces[std::pair{*Local, *Remote}];
if (!Entry) {
Entry = new LbPrivateInterface(&PrivateAddrs[NextInterface++ % PrivateAddrs.size()], Remote);
}
return Entry;
}
};

void LbReceive(_In_ CXPLAT_SOCKET*, _In_ void* Context, _In_ CXPLAT_RECV_DATA* RecvDataChain) {
((LbInterface*)(Context))->Receive(RecvDataChain);
CxPlatRecvDataReturn(RecvDataChain);
}

void NoOpUnreachable(_In_ CXPLAT_SOCKET*,_In_ void*, _In_ const QUIC_ADDR*) { }

int
QUIC_MAIN_EXPORT
main(int argc, char **argv)
{
const char* PublicAddress;
const char* PrivateAddresses;
if (!TryGetValue(argc, argv, "pub", &PublicAddress) ||
!TryGetValue(argc, argv, "priv", &PrivateAddresses)) {
printf("Usage: quiclb -pub:<address> -priv:<address>,<address>\n");
exit(1);
}
Verbose = GetFlag(argc, argv, "v") || GetFlag(argc, argv, "verbose");

QUIC_ADDR PublicAddr;
if (!QuicAddrFromString(PublicAddress, 0, &PublicAddr) ||
!QuicAddrGetPort(&PublicAddr)) {
printf("Failed to decode -pub address: %s.\n", PublicAddress);
exit(1);
}

while (true) {
char* End = (char*)strchr(PrivateAddresses, ',');
if (End) { *End = 0; }

QUIC_ADDR PrivateAddr;
if (!QuicAddrFromString(PrivateAddresses, 0, &PrivateAddr) ||
!QuicAddrGetPort(&PrivateAddr)) {
printf("Failed to decode -priv address: %s.\n", PrivateAddresses);
exit(1);
}
PrivateAddrs.push_back(PrivateAddr);

if (!End) { break; }
PrivateAddresses = End + 1;
}

CxPlatSystemLoad();
CxPlatInitialize();

CXPLAT_UDP_DATAPATH_CALLBACKS LbUdpCallbacks { LbReceive, NoOpUnreachable };
CxPlatDataPathInitialize(0, &LbUdpCallbacks, nullptr, &Datapath);
PublicInterface = new LbPublicInterface(&PublicAddr);

printf("Press Enter to exit.\n\n");
getchar();

delete PublicInterface;
CxPlatDataPathUninitialize(Datapath);
CxPlatUninitialize();
CxPlatSystemUnload();

return 0;
}