diff --git a/server/svix-server/src/v1/endpoints/message.rs b/server/svix-server/src/v1/endpoints/message.rs index 7177f44b7..0e7e2b178 100644 --- a/server/svix-server/src/v1/endpoints/message.rs +++ b/server/svix-server/src/v1/endpoints/message.rs @@ -30,7 +30,7 @@ use crate::{ }, }, db::models::{application, message, messagecontent}, - error::{Error, HttpError, Result}, + error::{http_error_on_conflict, Error, HttpError, Result}, queue::{MessageTaskBatch, TaskQueueProducer}, v1::utils::{ filter_and_paginate_time_limited, openapi_tag, validation_error, ApplicationMsgPath, @@ -360,7 +360,7 @@ pub(crate) async fn create_message_inner( let (msg, msg_content) = db .transaction(|txn| { async move { - let msg = msg.insert(txn).await?; + let msg = msg.insert(txn).await.map_err(http_error_on_conflict)?; let msg_content = messagecontent::ActiveModel::new(msg.id.clone(), payload); let msg_content = msg_content.insert(txn).await?; Ok((msg, msg_content)) diff --git a/server/svix-server/tests/it/e2e_message.rs b/server/svix-server/tests/it/e2e_message.rs index 3fc93c18d..929157330 100644 --- a/server/svix-server/tests/it/e2e_message.rs +++ b/server/svix-server/tests/it/e2e_message.rs @@ -4,13 +4,15 @@ use chrono::{Duration, Utc}; use reqwest::StatusCode; use sea_orm::{sea_query::Expr, ColumnTrait, EntityTrait, QueryFilter}; +use serde::de::IgnoredAny; use svix_server::{ + core::types::{EventTypeName, MessageUid}, db::models::messagecontent, expired_message_cleaner, v1::{ endpoints::{ attempt::MessageAttemptOut, - message::{MessageOut, RawPayload}, + message::{MessageIn, MessageOut, RawPayload}, }, utils::ListResponse, }, @@ -472,3 +474,44 @@ async fn test_expunge_message_payload() { assert_eq!(msg.payload.0.get(), r#"{"expired":true}"#); } + +#[tokio::test] +async fn test_message_conflict() { + let (client, _jh) = start_svix_server().await; + + let app_id = create_test_app(&client, "v1MessageCRTestApp") + .await + .unwrap() + .id; + + let _endp_id = create_test_endpoint(&client, &app_id, "http://localhost:2/bad/url/") + .await + .unwrap() + .id; + + let msg_in = MessageIn { + event_type: EventTypeName("user.signup".to_owned()), + payload: RawPayload::from_string(serde_json::json!({"test": "value"}).to_string()).unwrap(), + payload_retention_period: 5, + channels: None, + uid: Some(MessageUid("test1".to_owned())), + }; + + let _: MessageOut = client + .post( + &format!("api/v1/app/{}/msg/", &app_id), + msg_in.clone(), + StatusCode::ACCEPTED, + ) + .await + .unwrap(); + + let _: IgnoredAny = client + .post( + &format!("api/v1/app/{}/msg/", &app_id), + msg_in, + StatusCode::CONFLICT, + ) + .await + .unwrap(); +}