-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* jwk types update * update * update * jwk txn and execution * consensus ensure jwk txns are expected * update * jwk consensus network type defs * update cargo.toml * update * update * update * lint * jwk update quorum certification * jwk observation * the main jwk consensus state machine * jwk consensus epoch manager * update * jwk consensus wired into node * update * update * jwk consensus smoke tests * update
- Loading branch information
Showing
9 changed files
with
625 additions
and
5 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
// Copyright © Aptos Foundation | ||
|
||
use aptos_infallible::RwLock; | ||
use hyper::{ | ||
service::{make_service_fn, service_fn}, | ||
Body, Request, Response, Server, | ||
}; | ||
use request_handler::RequestHandler; | ||
use std::{convert::Infallible, mem, net::SocketAddr, sync::Arc}; | ||
use tokio::{ | ||
sync::{ | ||
oneshot, | ||
oneshot::{Receiver, Sender}, | ||
}, | ||
task::JoinHandle, | ||
}; | ||
|
||
pub(crate) mod request_handler; | ||
|
||
/// A dummy OIDC provider. | ||
pub struct DummyProvider { | ||
close_tx: Sender<()>, | ||
open_id_config_url: String, | ||
handler_holder: Arc<RwLock<Option<Arc<dyn RequestHandler>>>>, | ||
server_join_handle: JoinHandle<()>, | ||
} | ||
|
||
impl DummyProvider { | ||
pub(crate) async fn spawn() -> Self { | ||
let addr = SocketAddr::from(([127, 0, 0, 1], 0)); | ||
let handler_holder = Arc::new(RwLock::new(None)); | ||
let (port_tx, port_rx) = oneshot::channel::<u16>(); | ||
let (close_tx, close_rx) = oneshot::channel::<()>(); | ||
let server_join_handle = tokio::spawn(Self::run_server( | ||
addr, | ||
handler_holder.clone(), | ||
port_tx, | ||
close_rx, | ||
)); | ||
let actual_port = port_rx.await.unwrap(); | ||
let open_id_config_url = format!("http://127.0.0.1:{}", actual_port); | ||
Self { | ||
close_tx, | ||
open_id_config_url, | ||
handler_holder, | ||
server_join_handle, | ||
} | ||
} | ||
|
||
pub fn open_id_config_url(&self) -> String { | ||
self.open_id_config_url.clone() | ||
} | ||
|
||
pub fn update_request_handler( | ||
&self, | ||
handler: Option<Arc<dyn RequestHandler>>, | ||
) -> Option<Arc<dyn RequestHandler>> { | ||
mem::replace(&mut *self.handler_holder.write(), handler) | ||
} | ||
|
||
pub async fn shutdown(self) { | ||
let DummyProvider { | ||
close_tx, | ||
server_join_handle, | ||
.. | ||
} = self; | ||
close_tx.send(()).unwrap(); | ||
server_join_handle.await.unwrap(); | ||
} | ||
} | ||
|
||
// Private functions. | ||
impl DummyProvider { | ||
async fn run_server( | ||
addr: SocketAddr, | ||
handler_holder: Arc<RwLock<Option<Arc<dyn RequestHandler>>>>, | ||
port_tx: Sender<u16>, | ||
close_rx: Receiver<()>, | ||
) { | ||
let make_svc = make_service_fn(move |_| { | ||
let handler_holder_clone = handler_holder.clone(); | ||
async move { | ||
Ok::<_, Infallible>(service_fn(move |req| { | ||
Self::handle_request(req, handler_holder_clone.clone()) | ||
})) | ||
} | ||
}); | ||
|
||
let server = Server::bind(&addr).serve(make_svc); | ||
let actual_addr = server.local_addr(); | ||
port_tx.send(actual_addr.port()).unwrap(); | ||
|
||
// Graceful shutdown | ||
let graceful = server.with_graceful_shutdown(async { | ||
close_rx.await.unwrap(); | ||
}); | ||
|
||
graceful.await.unwrap(); | ||
} | ||
|
||
async fn handle_request( | ||
request: Request<Body>, | ||
handler_holder: Arc<RwLock<Option<Arc<dyn RequestHandler>>>>, | ||
) -> Result<Response<Body>, Infallible> { | ||
let handler = handler_holder.write(); | ||
let raw_response = handler.as_ref().unwrap().handle(request); | ||
Ok(Response::new(Body::from(raw_response))) | ||
} | ||
} |
118 changes: 118 additions & 0 deletions
118
testsuite/smoke-test/src/jwks/dummy_provider/request_handler.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
// Copyright © Aptos Foundation | ||
|
||
use aptos_infallible::Mutex; | ||
use hyper::{Body, Request}; | ||
use move_core_types::account_address::AccountAddress; | ||
use std::{collections::HashSet, str::FromStr}; | ||
|
||
/// A handler that handles JWK requests from a validator, | ||
/// assuming the validator account address is written as the COOKIE. | ||
pub trait RequestHandler: Send + Sync { | ||
fn handle(&self, request: Request<Body>) -> Vec<u8>; | ||
} | ||
|
||
pub struct StaticContentServer { | ||
content: Vec<u8>, | ||
} | ||
|
||
impl StaticContentServer { | ||
pub fn new(content: Vec<u8>) -> Self { | ||
Self { content } | ||
} | ||
|
||
pub fn new_str(content: &str) -> Self { | ||
Self::new(content.as_bytes().to_vec()) | ||
} | ||
} | ||
|
||
impl RequestHandler for StaticContentServer { | ||
fn handle(&self, _origin: Request<Body>) -> Vec<u8> { | ||
self.content.clone() | ||
} | ||
} | ||
|
||
fn origin_from_cookie(request: &Request<Body>) -> AccountAddress { | ||
let cookie = request | ||
.headers() | ||
.get(hyper::header::COOKIE) | ||
.unwrap() | ||
.to_str() | ||
.unwrap(); | ||
AccountAddress::from_str(cookie).unwrap() | ||
} | ||
|
||
/// The first `k` requesters will get content A forever, the rest will get content B forever. | ||
pub struct EquivocatingServer { | ||
content_a: Vec<u8>, | ||
content_b: Vec<u8>, | ||
k: usize, | ||
requesters_observed: Mutex<HashSet<AccountAddress>>, | ||
} | ||
|
||
impl EquivocatingServer { | ||
pub fn new(content_a: Vec<u8>, content_b: Vec<u8>, k: usize) -> Self { | ||
Self { | ||
content_a, | ||
content_b, | ||
k, | ||
requesters_observed: Mutex::new(HashSet::new()), | ||
} | ||
} | ||
} | ||
|
||
impl RequestHandler for EquivocatingServer { | ||
fn handle(&self, request: Request<Body>) -> Vec<u8> { | ||
let mut requesters_observed = self.requesters_observed.lock(); | ||
let origin = origin_from_cookie(&request); | ||
if requesters_observed.len() < self.k { | ||
requesters_observed.insert(origin); | ||
} | ||
|
||
if requesters_observed.contains(&origin) { | ||
self.content_a.clone() | ||
} else { | ||
self.content_b.clone() | ||
} | ||
} | ||
} | ||
|
||
/// This server first replies with `initial_thoughts`. | ||
/// After enough audience receives it for at least once, it switches its reply to `second_thoughts`. | ||
/// | ||
/// This behavior simulates the situation where a provider performs a 2nd key rotation right after the 1st. | ||
pub struct MindChangingServer { | ||
initial_thoughts: Vec<u8>, | ||
second_thoughts: Vec<u8>, | ||
change_mind_threshold: usize, | ||
requesters_observed: Mutex<HashSet<AccountAddress>>, | ||
} | ||
|
||
impl MindChangingServer { | ||
pub fn new( | ||
initial_thoughts: Vec<u8>, | ||
second_thoughts: Vec<u8>, | ||
change_mind_threshold: usize, | ||
) -> Self { | ||
Self { | ||
initial_thoughts, | ||
second_thoughts, | ||
change_mind_threshold, | ||
requesters_observed: Mutex::new(HashSet::new()), | ||
} | ||
} | ||
} | ||
|
||
impl RequestHandler for MindChangingServer { | ||
fn handle(&self, request: Request<Body>) -> Vec<u8> { | ||
let mut requesters_observed = self.requesters_observed.lock(); | ||
let origin = origin_from_cookie(&request); | ||
if requesters_observed.contains(&origin) | ||
|| requesters_observed.len() >= self.change_mind_threshold | ||
{ | ||
self.second_thoughts.clone() | ||
} else { | ||
requesters_observed.insert(origin); | ||
self.initial_thoughts.clone() | ||
} | ||
} | ||
} |
Oops, something went wrong.