diff --git a/tsconfig.json b/tsconfig.json index 3e0805139..bed35c1d7 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -11,7 +11,7 @@ // "disableReferencedProjectLoad": true, /* Reduce the number of projects loaded automatically by TypeScript. */ /* Language and Environment */ - "target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ + "target": "es2022", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */ // "lib": [], /* Specify a set of bundled library declaration files that describe the target runtime environment. */ // "jsx": "preserve", /* Specify what JSX code is generated. */ // "experimentalDecorators": true, /* Enable experimental support for legacy experimental decorators. */ @@ -25,9 +25,9 @@ // "moduleDetection": "auto", /* Control what method is used to detect module-format JS files. */ /* Modules */ - "module": "commonjs", /* Specify what module code is generated. */ + "module": "ES2022", /* Specify what module code is generated. */ // "rootDir": "./", /* Specify the root folder within your source files. */ - // "moduleResolution": "node10", /* Specify how TypeScript looks up a file from a given module specifier. */ + "moduleResolution": "nodenext", /* Specify how TypeScript looks up a file from a given module specifier. */ // "baseUrl": "./", /* Specify the base directory to resolve non-relative module names. */ // "paths": {}, /* Specify a set of entries that re-map imports to additional lookup locations. */ // "rootDirs": [], /* Allow multiple folders to be treated as one when resolving modules. */ diff --git a/worker-sandbox/.gitignore b/worker-sandbox/.gitignore new file mode 100644 index 000000000..7310e7361 --- /dev/null +++ b/worker-sandbox/.gitignore @@ -0,0 +1 @@ +.wrangler \ No newline at end of file diff --git a/worker-sandbox/Cargo.toml b/worker-sandbox/Cargo.toml index 29d01d8c1..c5530b27d 100644 --- a/worker-sandbox/Cargo.toml +++ b/worker-sandbox/Cargo.toml @@ -26,7 +26,7 @@ http = "0.2.9" regex = "1.8.4" serde = { version = "1.0.164", features = ["derive"] } serde_json = "1.0.96" -worker = { path = "../worker", version = "0.0.17", features= ["queue"] } +worker = { path = "../worker", version = "0.0.17", features= ["queue", "d1"] } futures-channel = "0.3.28" futures-util = { version = "0.3.28", default-features = false } rand = "0.8.5" diff --git a/worker-sandbox/src/d1.rs b/worker-sandbox/src/d1.rs new file mode 100644 index 000000000..64bd8443d --- /dev/null +++ b/worker-sandbox/src/d1.rs @@ -0,0 +1,106 @@ +use serde::Deserialize; +use worker::*; + +use crate::SomeSharedData; + +#[derive(Deserialize)] +struct Person { + id: u32, + name: String, + age: u32, +} + +pub async fn prepared_statement( + _req: Request, + ctx: RouteContext, +) -> Result { + let db = ctx.env.d1("DB")?; + let stmt = worker::query!(&db, "SELECT * FROM people WHERE name = ?", "Ryan Upton")?; + + // All rows + let results = stmt.all().await?; + let people = results.results::()?; + + assert!(results.success()); + assert_eq!(results.error(), None); + assert_eq!(people.len(), 1); + assert_eq!(people[0].name, "Ryan Upton"); + assert_eq!(people[0].age, 21); + assert_eq!(people[0].id, 6); + + // All columns of the first rows + let person = stmt.first::(None).await?.unwrap(); + assert_eq!(person.name, "Ryan Upton"); + assert_eq!(person.age, 21); + + // The name of the first row + let name = stmt.first::(Some("name")).await?.unwrap(); + assert_eq!(name, "Ryan Upton"); + + // All of the rows as column arrays of raw JSON values. + let rows = stmt.raw::().await?; + assert_eq!(rows.len(), 1); + let columns = &rows[0]; + + assert_eq!(columns[0].as_u64(), Some(6)); + assert_eq!(columns[1].as_str(), Some("Ryan Upton")); + assert_eq!(columns[2].as_u64(), Some(21)); + + Response::ok("ok") +} + +pub async fn batch(_req: Request, ctx: RouteContext) -> Result { + let db = ctx.env.d1("DB")?; + let mut results = db + .batch(vec![ + worker::query!(&db, "SELECT * FROM people WHERE id < 4"), + worker::query!(&db, "SELECT * FROM people WHERE id > 4"), + ]) + .await? + .into_iter(); + + let first_results = results.next().unwrap().results::()?; + assert_eq!(first_results.len(), 3); + assert_eq!(first_results[0].id, 1); + assert_eq!(first_results[1].id, 2); + assert_eq!(first_results[2].id, 3); + + let second_results = results.next().unwrap().results::()?; + assert_eq!(second_results.len(), 2); + assert_eq!(second_results[0].id, 5); + assert_eq!(second_results[1].id, 6); + + Response::ok("ok") +} + +pub async fn exec(mut req: Request, ctx: RouteContext) -> Result { + let db = ctx.env.d1("DB")?; + let result = db + .exec(req.text().await?.as_ref()) + .await + .expect("doesn't exist"); + + Response::ok(result.count().unwrap_or_default().to_string()) +} + +pub async fn dump(_req: Request, ctx: RouteContext) -> Result { + let db = ctx.env.d1("DB")?; + let bytes = db.dump().await?; + Response::from_bytes(bytes) +} + +pub async fn error(_req: Request, ctx: RouteContext) -> Result { + let db = ctx.env.d1("DB")?; + let error = db + .exec("THIS IS NOT VALID SQL") + .await + .expect_err("did not get error"); + + if let Error::D1(error) = error { + assert_eq!(error.cause(), "Error in line 1: THIS IS NOT VALID SQL: ERROR 9009: SQL prepare error: near \"THIS\": syntax error in THIS IS NOT VALID SQL at offset 0") + } else { + panic!("expected D1 error"); + } + + Response::ok("") +} diff --git a/worker-sandbox/src/lib.rs b/worker-sandbox/src/lib.rs index b30c91119..a80ab59a0 100644 --- a/worker-sandbox/src/lib.rs +++ b/worker-sandbox/src/lib.rs @@ -15,6 +15,7 @@ use worker::*; mod alarm; mod counter; +mod d1; mod r2; mod test; mod utils; @@ -497,7 +498,7 @@ pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result { // Ensure that the cancelled future returns an AbortError. match cancelled_fut.await { - Err(e) if e.to_string().starts_with("AbortError") => { /* Yay! It worked, let's do nothing to celebrate */}, + Err(e) if e.to_string().contains("AbortError") => { /* Yay! It worked, let's do nothing to celebrate */}, Err(e) => panic!("Fetch errored with a different error than expected: {:#?}", e), Ok(text) => panic!("Fetch unexpectedly succeeded: {}", text) } @@ -698,19 +699,11 @@ pub async fn main(req: Request, env: Env, _ctx: worker::Context) -> Result = guard.clone(); Response::from_json(&messages) }) - .post_async("/d1/exec", |mut req, ctx| async move { - let d1 = ctx.env.d1("DB")?; - let query = req.text().await?; - let exec_result = d1.exec(&query).await; - match exec_result { - Ok(result) => { - let count = result.count().unwrap_or(u32::MAX); - Response::ok(format!("{}", count)) - }, - Err(err) => Response::error(format!("Exec failed - {}", err), 500) - } - - }) + .get_async("/d1/prepared", d1::prepared_statement) + .get_async("/d1/batch", d1::batch) + .get_async("/d1/dump", d1::dump) + .post_async("/d1/exec", d1::exec) + .get_async("/d1/error", d1::error) .get_async("/r2/list-empty", r2::list_empty) .get_async("/r2/list", r2::list) .get_async("/r2/get-empty", r2::get_empty) diff --git a/worker-sandbox/tests/d1.rs b/worker-sandbox/tests/d1.rs deleted file mode 100644 index 32ac1247b..000000000 --- a/worker-sandbox/tests/d1.rs +++ /dev/null @@ -1,56 +0,0 @@ -use reqwest::blocking::Body; - -use crate::util::{expect_wrangler, post_return_err}; - -mod util; - -fn post_exec_and_expect>(query: T, expected: u32) { - let response = post_return_err(&format!("d1/exec"), |r| r.body(query.into())).unwrap(); - let status = response.status(); - let body = response.text().unwrap(); - if status.is_success() { - let parsed = str::parse::(&body.clone()); - assert!(parsed.is_ok()); - assert_eq!(expected, parsed.unwrap()); - } else { - let err = body.clone(); - eprintln!("Error received: {}", err); - panic!("Error received from request.") - } -} - -// #[test] -// fn d1_exec_version() { -// expect_wrangler(); -// post_exec_and_expect("PRAGMA schema_version", u32::MAX); -// } - -#[test] -fn d1_create_table() { - expect_wrangler(); - post_exec_and_expect( - "CREATE TABLE IF NOT EXISTS people ( \ - person_id INTEGER PRIMARY KEY, \ - name TEXT NOT NULL, \ - age INTEGER NOT NULL)", - 1, - ); -} - -#[test] -fn d1_insert_data() { - expect_wrangler(); - d1_create_table(); - post_exec_and_expect( - "INSERT OR IGNORE INTO people \ - (thing_id, title, description) \ - VALUES \ - (1, 'Freddie Pearce', 26), \ - (2, 'Wynne Ogley', 67), \ - (3, 'Dorian Fischer', 19), \ - (4, 'John Smith', 92), \ - (5, 'Magaret Willamson', 54), \ - (6, 'Ryan Upton', 21),", - 1, - ); -} diff --git a/worker-sandbox/tests/d1.spec.ts b/worker-sandbox/tests/d1.spec.ts new file mode 100644 index 000000000..5e426a523 --- /dev/null +++ b/worker-sandbox/tests/d1.spec.ts @@ -0,0 +1,70 @@ +import { describe, test, expect, beforeAll } from "vitest"; + +const hasLocalDevServer = await fetch("http://localhost:8787/request") + .then((resp) => resp.ok) + .catch(() => false); + +async function exec(query: string): Promise { + const resp = await fetch("http://localhost:8787/d1/exec", { + method: "POST", + body: query.split("\n").join(""), + }); + + const body = await resp.text(); + expect(resp.status).toBe(200); + return Number(body); +} + +describe.skipIf(!hasLocalDevServer)("d1", () => { + test("create table", async () => { + const query = `CREATE TABLE IF NOT EXISTS uniqueTable ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + age INTEGER NOT NULL + );`; + + expect(await exec(query)).toBe(1); + }); + + test("insert data", async () => { + let query = `CREATE TABLE IF NOT EXISTS people ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + age INTEGER NOT NULL + );`; + + expect(await exec(query)).toBe(1); + + query = `INSERT OR IGNORE INTO people + (id, name, age) + VALUES + (1, 'Freddie Pearce', 26), + (2, 'Wynne Ogley', 67), + (3, 'Dorian Fischer', 19), + (4, 'John Smith', 92), + (5, 'Magaret Willamson', 54), + (6, 'Ryan Upton', 21);`; + + expect(await exec(query)).toBe(1); + }); + + test("prepared statement", async () => { + const resp = await fetch("http://localhost:8787/d1/prepared"); + expect(resp.status).toBe(200); + }); + + test("batch", async () => { + const resp = await fetch("http://localhost:8787/d1/batch"); + expect(resp.status).toBe(200); + }); + + test("dump", async () => { + const resp = await fetch("http://localhost:8787/d1/dump"); + expect(resp.status).toBe(200); + }); + + test("dump", async () => { + const resp = await fetch("http://localhost:8787/d1/error"); + expect(resp.status).toBe(200); + }); +}); diff --git a/worker/src/d1/mod.rs b/worker/src/d1/mod.rs index 59606d5a2..4570e34fd 100644 --- a/worker/src/d1/mod.rs +++ b/worker/src/d1/mod.rs @@ -1,5 +1,10 @@ +use std::fmt::Display; +use std::fmt::Formatter; +use std::result::Result as StdResult; + use js_sys::Array; use js_sys::ArrayBuffer; +use js_sys::JsString; use js_sys::Uint8Array; use serde::Deserialize; use wasm_bindgen::{JsCast, JsValue}; @@ -28,12 +33,11 @@ impl D1Database { /// Dump the data in the database to a `Vec`. pub async fn dump(&self) -> Result> { - let array_buffer = JsFuture::from(self.0.dump()).await?; + let result = JsFuture::from(self.0.dump()).await; + let array_buffer = cast_to_d1_error(result)?; let array_buffer = array_buffer.dyn_into::()?; let array = Uint8Array::new(&array_buffer); - let mut vec = Vec::with_capacity(array.length() as usize); - array.copy_to(&mut vec); - Ok(vec) + Ok(array.to_vec()) } /// Batch execute one or more statements against the database. @@ -41,11 +45,12 @@ impl D1Database { /// Returns the results in the same order as the provided statements. pub async fn batch(&self, statements: Vec) -> Result> { let statements = statements.into_iter().map(|s| s.0).collect::(); - let results = JsFuture::from(self.0.batch(statements)).await?; + let results = JsFuture::from(self.0.batch(statements)).await; + let results = cast_to_d1_error(results)?; let results = results.dyn_into::()?; let mut vec = Vec::with_capacity(results.length() as usize); for result in results.iter() { - let result = result.dyn_into::()?; + let result = result.unchecked_into::(); vec.push(D1Result(result)); } Ok(vec) @@ -64,7 +69,8 @@ impl D1Database { /// If an error occurs, an exception is thrown with the query and error /// messages, execution stops and further statements are not executed. pub async fn exec(&self, query: &str) -> Result { - let result = JsFuture::from(self.0.exec(query)).await?; + let result = JsFuture::from(self.0.exec(query)).await; + let result = cast_to_d1_error(result)?; Ok(result.into()) } } @@ -154,14 +160,16 @@ impl D1PreparedStatement { where T: for<'a> Deserialize<'a>, { - let js_value = JsFuture::from(self.0.first(col_name)).await?; + let result = JsFuture::from(self.0.first(col_name)).await; + let js_value = cast_to_d1_error(result)?; let value = serde_wasm_bindgen::from_value(js_value)?; Ok(value) } /// Executes a query against the database but only return metadata. pub async fn run(&self) -> Result { - let result = JsFuture::from(self.0.run()).await?; + let result = JsFuture::from(self.0.run()).await; + let result = cast_to_d1_error(result)?; Ok(D1Result(result.into())) } @@ -176,7 +184,8 @@ impl D1PreparedStatement { where T: for<'a> Deserialize<'a>, { - let result = JsFuture::from(self.0.raw()).await?; + let result = JsFuture::from(self.0.raw()).await; + let result = cast_to_d1_error(result)?; let result = result.dyn_into::()?; let mut vec = Vec::with_capacity(result.length() as usize); for value in result.iter() { @@ -217,7 +226,7 @@ impl D1Result { if let Some(results) = self.0.results() { let mut vec = Vec::with_capacity(results.length() as usize); for result in results.iter() { - let result = serde_wasm_bindgen::from_value(result)?; + let result = serde_wasm_bindgen::from_value(result).unwrap(); vec.push(result); } Ok(vec) @@ -226,3 +235,68 @@ impl D1Result { } } } + +#[derive(Clone)] +pub struct D1Error { + inner: js_sys::Error, +} + +impl D1Error { + /// Gets the cause of the error specific to D1. + pub fn cause(&self) -> String { + if let Ok(cause) = self.inner.cause().dyn_into::() { + cause.message().into() + } else { + "unknown error".into() + } + } +} + +impl std::fmt::Debug for D1Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let cause = self.inner.cause(); + + f.debug_struct("D1Error").field("cause", &cause).finish() + } +} + +impl Display for D1Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let cause = self.inner.cause(); + let cause = JsString::from(cause); + write!(f, "{}", cause) + } +} + +impl AsRef for D1Error { + fn as_ref(&self) -> &js_sys::Error { + &self.inner + } +} + +impl AsRef for D1Error { + fn as_ref(&self) -> &JsValue { + &self.inner + } +} + +fn cast_to_d1_error(result: StdResult) -> StdResult { + let err = match result { + Ok(value) => return Ok(value), + Err(err) => err, + }; + + let err: JsValue = match err.dyn_into::() { + Ok(err) => { + let message: String = err.message().into(); + + if message.starts_with("D1") { + return Err(D1Error { inner: err }.into()); + }; + err.into() + } + Err(err) => err, + }; + + Err(err.into()) +} diff --git a/worker/src/error.rs b/worker/src/error.rs index 90ae39a36..c2c0056d7 100644 --- a/worker/src/error.rs +++ b/worker/src/error.rs @@ -17,6 +17,8 @@ pub enum Error { SerdeJsonError(serde_json::Error), #[cfg(feature = "queue")] SerdeWasmBindgenError(serde_wasm_bindgen::Error), + #[cfg(feature = "d1")] + D1(crate::d1::D1Error), } impl From for Error { @@ -39,6 +41,13 @@ impl From for Error { } } +#[cfg(feature = "d1")] +impl From for Error { + fn from(e: crate::d1::D1Error) -> Self { + Self::D1(e) + } +} + impl std::fmt::Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -56,6 +65,7 @@ impl std::fmt::Display for Error { Error::SerdeJsonError(e) => write!(f, "Serde Error: {e}"), #[cfg(feature = "queue")] Error::SerdeWasmBindgenError(e) => write!(f, "Serde Error: {e}"), + Error::D1(e) => write!(f, "D1: {e:#?}"), } } }