Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: expose Materialize version in mz_version parameter #17024

Merged
merged 1 commit into from
Jan 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions src/adapter/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::{mpsc, oneshot, watch};
use tracing::error;
use uuid::Uuid;

use mz_build_info::BuildInfo;
use mz_ore::collections::CollectionExt;
use mz_ore::id_gen::IdAllocator;
use mz_ore::task::{AbortOnDropHandle, JoinHandleExt};
Expand All @@ -29,7 +30,7 @@ use crate::catalog::INTROSPECTION_USER;
use crate::command::{Canceled, Command, ExecuteResponse, Response, StartupResponse};
use crate::error::AdapterError;
use crate::metrics::Metrics;
use crate::session::{EndTransactionAction, PreparedStatement, Session, TransactionId};
use crate::session::{EndTransactionAction, PreparedStatement, Session, TransactionId, User};
use crate::PeekResponseUnary;

/// An abstraction allowing us to name different connections.
Expand Down Expand Up @@ -71,14 +72,20 @@ impl Handle {
/// outstanding clients have dropped.
#[derive(Debug, Clone)]
pub struct Client {
build_info: &'static BuildInfo,
inner_cmd_tx: mpsc::UnboundedSender<Command>,
id_alloc: Arc<IdAllocator<ConnectionId>>,
metrics: Metrics,
}

impl Client {
pub(crate) fn new(cmd_tx: mpsc::UnboundedSender<Command>, metrics: Metrics) -> Client {
pub(crate) fn new(
build_info: &'static BuildInfo,
cmd_tx: mpsc::UnboundedSender<Command>,
metrics: Metrics,
) -> Client {
Client {
build_info,
inner_cmd_tx: cmd_tx,
id_alloc: Arc::new(IdAllocator::new(1, 1 << 16)),
metrics,
Expand All @@ -88,6 +95,7 @@ impl Client {
/// Allocates a client for an incoming connection.
pub fn new_conn(&self) -> Result<ConnClient, AdapterError> {
Ok(ConnClient {
build_info: self.build_info,
conn_id: self
.id_alloc
.alloc()
Expand All @@ -101,7 +109,7 @@ impl Client {
pub async fn introspection_execute_one(&self, sql: &str) -> Result<Vec<Row>, anyhow::Error> {
// Connect to the coordinator.
let conn_client = self.new_conn()?;
let session = Session::new(conn_client.conn_id(), INTROSPECTION_USER.clone());
let session = conn_client.new_session(INTROSPECTION_USER.clone());
let (mut session_client, _) = conn_client.startup(session, false).await?;

// Parse the SQL statement.
Expand Down Expand Up @@ -146,11 +154,20 @@ impl Client {
/// See also [`Client`].
#[derive(Debug)]
pub struct ConnClient {
build_info: &'static BuildInfo,
conn_id: ConnectionId,
inner: Client,
}

impl ConnClient {
/// Creates a new session associated with this connection for the given
/// user.
///
/// It is the caller's responsibility to have authenticated the user.
pub fn new_session(&self, user: User) -> Session {
Session::new(self.build_info, self.conn_id, user)
}

/// Returns the ID of the connection associated with this client.
pub fn conn_id(&self) -> ConnectionId {
self.conn_id
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/config/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use mz_repr::Row;
use mz_sql::ast::{Ident, Raw, ShowStatement, ShowVariableStatement, Statement};

use crate::catalog::SYSTEM_USER;
use crate::session::{EndTransactionAction, Session};
use crate::session::EndTransactionAction;
use crate::{AdapterError, Client, ExecuteResponse, PeekResponseUnary, SessionClient};

use super::SynchronizedParameters;
Expand All @@ -27,7 +27,7 @@ pub struct SystemParameterBackend {
impl SystemParameterBackend {
pub async fn new(client: Client) -> Result<Self, AdapterError> {
let conn_client = client.new_conn()?;
let session = Session::new(conn_client.conn_id(), SYSTEM_USER.clone());
let session = conn_client.new_session(SYSTEM_USER.clone());
let (session_client, _) = conn_client.startup(session, true).await?;
Ok(Self { session_client })
}
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ pub async fn serve<S: Append + 'static>(
start_instant,
_thread: thread.join_on_drop(),
};
let client = Client::new(cmd_tx.clone(), metrics_clone);
let client = Client::new(build_info, cmd_tx.clone(), metrics_clone);
Ok((handle, client))
}
Err(e) => Err(e),
Expand Down
23 changes: 16 additions & 7 deletions src/adapter/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::OwnedMutexGuard;
use uuid::Uuid;

use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
use mz_pgrepr::Format;
use mz_repr::{Datum, Diff, GlobalId, Row, ScalarType, TimestampManipulation};
use mz_sql::ast::{Raw, Statement, TransactionAccessMode};
Expand Down Expand Up @@ -95,25 +96,33 @@ pub struct Session<T = mz_repr::Timestamp> {

impl<T: TimestampManipulation> Session<T> {
/// Creates a new session for the specified connection ID.
pub fn new(conn_id: ConnectionId, user: User) -> Session<T> {
pub(crate) fn new(
build_info: &'static BuildInfo,
conn_id: ConnectionId,
user: User,
) -> Session<T> {
assert_ne!(conn_id, DUMMY_CONNECTION_ID);
Self::new_internal(conn_id, user)
Self::new_internal(build_info, conn_id, user)
}

/// Creates a new dummy session.
///
/// Dummy sessions are intended for use when executing queries on behalf of
/// the system itself, rather than on behalf of a user.
pub fn dummy() -> Session<T> {
Self::new_internal(DUMMY_CONNECTION_ID, SYSTEM_USER.clone())
Self::new_internal(&DUMMY_BUILD_INFO, DUMMY_CONNECTION_ID, SYSTEM_USER.clone())
}

fn new_internal(conn_id: ConnectionId, user: User) -> Session<T> {
fn new_internal(
build_info: &'static BuildInfo,
conn_id: ConnectionId,
user: User,
) -> Session<T> {
let (notices_tx, notices_rx) = mpsc::unbounded_channel();
let vars = if INTERNAL_USER_NAMES.contains(&user.name) {
SessionVars::for_cluster(&user.name)
SessionVars::for_cluster(build_info, &user.name)
} else {
SessionVars::default()
SessionVars::new(build_info)
};
Session {
conn_id,
Expand Down Expand Up @@ -593,7 +602,7 @@ impl<T: TimestampManipulation> Session<T> {
pub fn reset(&mut self) {
let _ = self.clear_transaction();
self.prepared_statements.clear();
self.vars = SessionVars::default();
self.vars = SessionVars::new(self.vars.build_info());
}

/// Returns the user who owns this session.
Expand Down
70 changes: 57 additions & 13 deletions src/adapter/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use once_cell::sync::Lazy;
use serde::Serialize;
use uncased::UncasedStr;

use mz_build_info::BuildInfo;
use mz_ore::cast;
use mz_sql::ast::{Ident, SetVariableValue, Value as AstValue};
use mz_sql::DEFAULT_SCHEMA;
Expand Down Expand Up @@ -125,6 +126,8 @@ const INTERVAL_STYLE: ServerVar<str> = ServerVar {
internal: false,
};

const MZ_VERSION_NAME: &UncasedStr = UncasedStr::new("mz_version");

const QGM_OPTIMIZATIONS: ServerVar<bool> = ServerVar {
name: UncasedStr::new("qgm_optimizations_experimental"),
value: &false,
Expand Down Expand Up @@ -399,6 +402,7 @@ static EMIT_TRACE_ID_NOTICE: ServerVar<bool> = ServerVar {
#[derive(Debug)]
pub struct SessionVars {
application_name: SessionVar<str>,
build_info: &'static BuildInfo,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this go on SystemVars instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

System vars can be set with ALTER SYSTEM SET which is not what we want here. It's kinda a funny split (probably system vars and server/session vars should be unified somehow), but for now this is the right spot!

client_encoding: ServerVar<str>,
client_min_messages: SessionVar<ClientSeverity>,
cluster: SessionVar<str>,
Expand All @@ -424,10 +428,12 @@ pub struct SessionVars {
emit_trace_id_notice: SessionVar<bool>,
}

impl Default for SessionVars {
fn default() -> SessionVars {
impl SessionVars {
/// Creates a new [`SessionVars`].
pub fn new(build_info: &'static BuildInfo) -> SessionVars {
SessionVars {
application_name: SessionVar::new(&APPLICATION_NAME),
build_info,
client_encoding: CLIENT_ENCODING,
client_min_messages: SessionVar::new(&CLIENT_MIN_MESSAGES),
cluster: SessionVar::new(&CLUSTER),
Expand Down Expand Up @@ -455,24 +461,20 @@ impl Default for SessionVars {
emit_trace_id_notice: SessionVar::new(&EMIT_TRACE_ID_NOTICE),
}
}
}

impl SessionVars {
/// Returns a new SessionVars with the cluster variable set to `cluster`.
pub fn for_cluster(cluster_name: &str) -> Self {
let mut cluster = SessionVar::new(&CLUSTER);
cluster.session_value = Some(cluster_name.into());
Self {
cluster,
..Default::default()
}
pub fn for_cluster(build_info: &'static BuildInfo, cluster_name: &str) -> Self {
let mut vars = SessionVars::new(build_info);
vars.cluster.session_value = Some(cluster_name.into());
vars
}

/// Returns an iterator over the configuration parameters and their current
/// values for this session.
pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
let vars: [&dyn Var; 24] = [
let vars: [&dyn Var; 25] = [
&self.application_name,
self.build_info,
&self.client_encoding,
&self.client_min_messages,
&self.cluster,
Expand Down Expand Up @@ -504,7 +506,7 @@ impl SessionVars {
/// values for this session) that are expected to be sent to the client when
/// a new connection is established or when their value changes.
pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
let vars: [&dyn Var; 8] = [
let vars: [&dyn Var; 9] = [
&self.application_name,
&self.client_encoding,
&self.date_style,
Expand All @@ -513,6 +515,13 @@ impl SessionVars {
&self.standard_conforming_strings,
&self.timezone,
&self.interval_style,
// Including `mz_version` in the notify set is a Materialize
// extension. Doing so allows applications to detect whether they
// are talking to Materialize or PostgreSQL without an additional
// network roundtrip. This is known to be safe because CockroachDB
// has an analogous extension [0].
// [0]: https://github.com/cockroachdb/cockroach/blob/369c4057a/pkg/sql/pgwire/conn.go#L1840
self.build_info,
];
vars.into_iter()
}
Expand Down Expand Up @@ -550,6 +559,8 @@ impl SessionVars {
Ok(&self.integer_datetimes)
} else if name == INTERVAL_STYLE.name {
Ok(&self.interval_style)
} else if name == MZ_VERSION_NAME {
Ok(self.build_info)
} else if name == QGM_OPTIMIZATIONS.name {
Ok(&self.qgm_optimizations)
} else if name == SEARCH_PATH.name {
Expand Down Expand Up @@ -789,6 +800,7 @@ impl SessionVars {
// call to `end_transaction` below.
let SessionVars {
application_name,
build_info: _,
client_encoding: _,
client_min_messages,
cluster,
Expand Down Expand Up @@ -836,6 +848,11 @@ impl SessionVars {
self.application_name.value()
}

/// Returns the build info.
pub fn build_info(&self) -> &'static BuildInfo {
self.build_info
}

/// Returns the value of the `client_encoding` configuration parameter.
pub fn client_encoding(&self) -> &'static str {
self.client_encoding.value
Expand Down Expand Up @@ -881,6 +898,11 @@ impl SessionVars {
self.interval_style.value
}

/// Returns the value of the `mz_version` configuration parameter.
pub fn mz_version(&self) -> String {
self.build_info.value()
}

/// Returns the value of the `qgm_optimizations` configuration parameter.
pub fn qgm_optimizations(&self) -> bool {
*self.qgm_optimizations.value()
Expand Down Expand Up @@ -1589,6 +1611,28 @@ where
}
}

impl Var for BuildInfo {
fn name(&self) -> &'static str {
"mz_version"
}

fn value(&self) -> String {
self.human_version()
}

fn description(&self) -> &'static str {
"Shows the Materialize server version (Materialize)."
}

fn type_name(&self) -> &'static str {
str::TYPE_NAME
}

fn visible(&self, _: &User) -> bool {
true
}
}

/// A value that can be stored in a session or server variable.
pub trait Value: ToOwned + Send + Sync {
/// The name of the value type.
Expand Down
4 changes: 2 additions & 2 deletions src/environmentd/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tracing::{error, warn};

use mz_adapter::catalog::{HTTP_DEFAULT_USER, SYSTEM_USER};
use mz_adapter::session::{ExternalUserMetadata, Session, User};
use mz_adapter::session::{ExternalUserMetadata, User};
use mz_adapter::{AdapterError, Client, SessionClient};
use mz_frontegg_auth::{FronteggAuthentication, FronteggError};
use mz_ore::metrics::MetricsRegistry;
Expand Down Expand Up @@ -282,7 +282,7 @@ impl AuthedClient {
create_if_not_exists,
} = user;
let adapter_client = adapter_client.new_conn()?;
let session = Session::new(adapter_client.conn_id(), user);
let session = adapter_client.new_session(user);
let (adapter_client, _) = adapter_client
.startup(session, create_if_not_exists)
.await?;
Expand Down
5 changes: 0 additions & 5 deletions src/pgwire/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ where
}
}

/// Returns the ID of this connection.
pub fn id(&self) -> u32 {
self.conn_id
}

/// Reads and decodes one frontend message from the client.
///
/// Blocks until the client sends a complete message. If the client
Expand Down
13 changes: 5 additions & 8 deletions src/pgwire/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use mz_adapter::catalog::INTERNAL_USER_NAMES;
use mz_adapter::session::User;
use mz_adapter::session::{
EndTransactionAction, ExternalUserMetadata, InProgressRows, Portal, PortalState,
RowBatchStream, Session, TransactionStatus,
RowBatchStream, TransactionStatus,
};
use mz_adapter::{ExecuteResponse, PeekResponseUnary, RowsFuture};
use mz_frontegg_auth::FronteggAuthentication;
Expand Down Expand Up @@ -224,13 +224,10 @@ where
};

// Construct session.
let mut session = Session::new(
conn.id(),
User {
name: user,
external_metadata,
},
);
let mut session = adapter_client.new_session(User {
name: user,
external_metadata,
});
for (name, value) in params {
let local = false;
let _ = session.vars_mut().set(&name, &value, local);
Expand Down
Loading