diff --git a/docs/en/connector-v2/source/Persistiq.md b/docs/en/connector-v2/source/Persistiq.md
new file mode 100644
index 00000000000..8f462c4fe8a
--- /dev/null
+++ b/docs/en/connector-v2/source/Persistiq.md
@@ -0,0 +1,296 @@
+# Persistiq
+
+> Persistiq source connector
+
+## Description
+
+Used to read data from Persistiq.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+| --------------------------- | ------ | -------- | ------------- |
+| url | String | Yes | - |
+| password | String | Yes | - |
+| method | String | No | get |
+| schema | Config | No | - |
+| schema.fields | Config | No | - |
+| format | String | No | json |
+| params | Map | No | - |
+| body | String | No | - |
+| json_field | Config | No | - |
+| content_json | String | No | - |
+| poll_interval_ms | int | No | - |
+| retry | int | No | - |
+| retry_backoff_multiplier_ms | int | No | 100 |
+| retry_backoff_max_ms | int | No | 10000 |
+| common-options | config | No | - |
+
+### url [String]
+
+http request url
+
+### password [String]
+
+API key for login, you can get it at Persistiq website
+
+### method [String]
+
+http request method, only supports GET, POST method
+
+### params [Map]
+
+http params
+
+### body [String]
+
+http body
+
+### poll_interval_ms [int]
+
+request http api interval(millis) in stream mode
+
+### retry [int]
+
+The max retry times if request http return to `IOException`
+
+### retry_backoff_multiplier_ms [int]
+
+The retry-backoff times(millis) multiplier if request http failed
+
+### retry_backoff_max_ms [int]
+
+The maximum retry-backoff times(millis) if request http failed
+
+### format [String]
+
+the format of upstream data, now only support `json` `text`, default `json`.
+
+when you assign format is `json`, you should also assign schema option, for example:
+
+upstream data is the following:
+
+```json
+{
+ "code": 200,
+ "data": "get success",
+ "success": true
+}
+```
+
+you should assign schema as the following:
+
+```hocon
+
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
+
+connector will generate data as the following:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+when you assign format is `text`, connector will do nothing for upstream data, for example:
+
+upstream data is the following:
+
+```json
+{
+ "code": 200,
+ "data": "get success",
+ "success": true
+}
+```
+
+connector will generate data as the following:
+
+| content |
+|---------|
+| {"code": 200, "data": "get success", "success": true} |
+
+### schema [Config]
+
+#### fields [Config]
+
+the schema fields of upstream data
+
+### content_json [String]
+
+This parameter can get some json data.If you only need the data in the 'book' section, configure `content_field = "$.store.book.*"`.
+
+If your return data looks something like this.
+
+```json
+{
+ "store": {
+ "book": [
+ {
+ "category": "reference",
+ "author": "Nigel Rees",
+ "title": "Sayings of the Century",
+ "price": 8.95
+ },
+ {
+ "category": "fiction",
+ "author": "Evelyn Waugh",
+ "title": "Sword of Honour",
+ "price": 12.99
+ }
+ ],
+ "bicycle": {
+ "color": "red",
+ "price": 19.95
+ }
+ },
+ "expensive": 10
+}
+```
+You can configure `content_field = "$.store.book.*"` and the result returned looks like this:
+
+```json
+[
+ {
+ "category": "reference",
+ "author": "Nigel Rees",
+ "title": "Sayings of the Century",
+ "price": 8.95
+ },
+ {
+ "category": "fiction",
+ "author": "Evelyn Waugh",
+ "title": "Sword of Honour",
+ "price": 12.99
+ }
+]
+```
+Then you can get the desired result with a simpler schema,like
+
+```hocon
+Http {
+ url = "http://example.com/xyz"
+ method = "GET"
+ format = "json"
+ content_field = "$.store.book.*"
+ schema = {
+ fields {
+ category = string
+ author = string
+ title = string
+ price = string
+ }
+ }
+}
+```
+
+Here is an example:
+
+- Test data can be found at this link [mockserver-contentjson-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-contentjson-config.json)
+- See this link for task configuration [http_contentjson_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_contentjson_to_assert.conf).
+
+### json_field [Config]
+
+This parameter helps you configure the schema,so this parameter must be used with schema.
+
+If your data looks something like this:
+
+```json
+{
+ "store": {
+ "book": [
+ {
+ "category": "reference",
+ "author": "Nigel Rees",
+ "title": "Sayings of the Century",
+ "price": 8.95
+ },
+ {
+ "category": "fiction",
+ "author": "Evelyn Waugh",
+ "title": "Sword of Honour",
+ "price": 12.99
+ }
+ ],
+ "bicycle": {
+ "color": "red",
+ "price": 19.95
+ }
+ },
+ "expensive": 10
+}
+```
+
+You can get the contents of 'book' by configuring the task as follows:
+
+```hocon
+source {
+ Http {
+ url = "http://example.com/xyz"
+ method = "GET"
+ format = "json"
+ json_field = {
+ category = "$.store.book[*].category"
+ author = "$.store.book[*].author"
+ title = "$.store.book[*].title"
+ price = "$.store.book[*].price"
+ }
+ schema = {
+ fields {
+ category = string
+ author = string
+ title = string
+ price = string
+ }
+ }
+ }
+}
+```
+
+- Test data can be found at this link [mockserver-jsonpath-config.json](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-jsonpath-config.json)
+- See this link for task configuration [http_jsonpath_to_assert.conf](../../../../seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/http_jsonpath_to_assert.conf).
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details
+
+## Example
+
+```hocon
+ Persistiq{
+ url = "https://api.persistiq.com/v1/users"
+ password = "Your password"
+ content_field = "$.users.*"
+ schema = {
+ fields {
+ id = string
+ name = string
+ email = string
+ activated = boolean
+ default_mailbox_id = string
+ salesforce_id = string
+ }
+ }
+ }
+```
+
+## Changelog
+
+### next version
+
+- Add Persistiq Source Connector
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/pom.xml b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/pom.xml
new file mode 100644
index 00000000000..7798e0f7d2c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/pom.xml
@@ -0,0 +1,40 @@
+
+
+
+
+ connector-http
+ org.apache.seatunnel
+ ${revision}
+
+ 4.0.0
+
+ connector-http-persistiq
+
+
+
+ org.apache.seatunnel
+ connector-http-base
+ ${project.version}
+
+
+
+
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java
new file mode 100644
index 00000000000..a2bdf09c7e5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSource.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.persistiq.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSource;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
+import org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceParameter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@AutoService(SeaTunnelSource.class)
+public class PersistiqSource extends HttpSource {
+ private final PersistiqSourceParameter persistiqSourceParameter = new PersistiqSourceParameter();
+ @Override
+ public String getPluginName() {
+ return "Persistiq";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, PersistiqSourceConfig.URL.key(), PersistiqSourceConfig.PASSWORD.key());
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+ }
+ persistiqSourceParameter.buildWithConfig(pluginConfig);
+ buildSchemaWithConfig(pluginConfig);
+ }
+
+ @Override
+ public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception {
+ return new HttpSourceReader(this.persistiqSourceParameter, readerContext, this.deserializationSchema, jsonField, contentField);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java
new file mode 100644
index 00000000000..cb62e220988
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/PersistiqSourceFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.persistiq.source;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceFactory;
+import org.apache.seatunnel.connectors.seatunnel.persistiq.source.config.PersistiqSourceConfig;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class PersistiqSourceFactory extends HttpSourceFactory {
+ @Override
+ public String factoryIdentifier() {
+ return "Persistiq";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return getHttpBuilder()
+ .required(PersistiqSourceConfig.PASSWORD)
+ .build();
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/config/PersistiqSourceConfig.java b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/config/PersistiqSourceConfig.java
new file mode 100644
index 00000000000..7ca37f42c4d
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/config/PersistiqSourceConfig.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.persistiq.source.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpConfig;
+
+public class PersistiqSourceConfig extends HttpConfig {
+ public static final String X_API_KEY = "x-api-key";
+
+ public static final Option PASSWORD = Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Persistiq login api key");
+}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/config/PersistiqSourceParameter.java b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/config/PersistiqSourceParameter.java
new file mode 100644
index 00000000000..256347c75e3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/main/java/org/apache/seatunnel/connectors/seatunnel/persistiq/source/config/PersistiqSourceParameter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.persistiq.source.config;
+
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.HashMap;
+
+public class PersistiqSourceParameter extends HttpParameter {
+ public void buildWithConfig(Config pluginConfig) {
+ super.buildWithConfig(pluginConfig);
+ // put authorization in headers
+ this.headers = this.getHeaders() == null ? new HashMap<>() : this.getHeaders();
+ this.headers.put(PersistiqSourceConfig.X_API_KEY, pluginConfig.getString(PersistiqSourceConfig.PASSWORD.key()));
+ this.setHeaders(this.headers);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/test/java/org/apache/seatunnel/connectors/seatunnel/persistiq/PersistiqFactoryTest.java b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/test/java/org/apache/seatunnel/connectors/seatunnel/persistiq/PersistiqFactoryTest.java
new file mode 100644
index 00000000000..699a2e48dcc
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-http/connector-http-persistiq/src/test/java/org/apache/seatunnel/connectors/seatunnel/persistiq/PersistiqFactoryTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.persistiq;
+
+import org.apache.seatunnel.connectors.seatunnel.persistiq.source.PersistiqSourceFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class PersistiqFactoryTest {
+
+ @Test
+ void optionRule() {
+ Assertions.assertNotNull((new PersistiqSourceFactory()).optionRule());
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-http/pom.xml b/seatunnel-connectors-v2/connector-http/pom.xml
index 522adaa00a5..944e20c5e92 100644
--- a/seatunnel-connectors-v2/connector-http/pom.xml
+++ b/seatunnel-connectors-v2/connector-http/pom.xml
@@ -40,6 +40,7 @@
connector-http-jira
connector-http-gitlab
connector-http-notion
+ connector-http-persistiq
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index a76591b3d94..f163dfa0ca4 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -203,6 +203,12 @@
${project.version}
provided
+
+ org.apache.seatunnel
+ connector-http-persistiq
+ ${project.version}
+ provided
+
org.apache.seatunnel
connector-jdbc
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
index de857158613..3f83098ee79 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/pom.xml
@@ -80,6 +80,12 @@
${project.version}
test
+
+ org.apache.seatunnel
+ connector-http-persistiq
+ ${project.version}
+ test
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpPersistiqIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpPersistiqIT.java
new file mode 100644
index 00000000000..d264a8508c4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/java/org/apache/seatunnel/e2e/connector/http/HttpPersistiqIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.http;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.stream.Stream;
+
+public class HttpPersistiqIT extends TestSuiteBase implements TestResource {
+
+ private static final String IMAGE = "mockserver/mockserver:5.14.0";
+
+ private GenericContainer> mockserverContainer;
+
+ @BeforeAll
+ @Override
+ public void startUp() {
+ this.mockserverContainer = new GenericContainer<>(DockerImageName.parse(IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("mockserver")
+ .withExposedPorts(1080)
+ .withCopyFileToContainer(MountableFile.forHostPath(new File(HttpIT.class.getResource(
+ "/mockserver-persistiq-config.json").getPath()).getAbsolutePath()),
+ "/tmp/mockserver-persistiq-config.json")
+ .withEnv("MOCKSERVER_INITIALIZATION_JSON_PATH", "/tmp/mockserver-persistiq-config.json")
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+ .waitingFor(new HttpWaitStrategy().forPath("/").forStatusCode(404));
+ Startables.deepStart(Stream.of(mockserverContainer)).join();
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() {
+ if (mockserverContainer != null) {
+ mockserverContainer.stop();
+ }
+ }
+
+ @TestTemplate
+ public void testHttpPersistiqSourceToAssertSink(TestContainer container) throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob("/persistiq_json_to_assert.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-persistiq-config.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-persistiq-config.json
new file mode 100644
index 00000000000..5dd56b27fab
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/mockserver-persistiq-config.json
@@ -0,0 +1,59 @@
+// https://www.mock-server.com/mock_server/getting_started.html#request_matchers
+
+[
+ {
+ "httpRequest": {
+ "method" : "GET",
+ "path": "/v1/users"
+ },
+ "httpResponse": {
+ "body":
+ {
+ "status": "success",
+ "errors": [],
+ "users": [
+ {
+ "id": "u_q3e537",
+ "name": "Tiana Eichmann MD",
+ "email": "colton.jenkins@acme2.com",
+ "activated": true,
+ "default_mailbox_id": "mbox_...",
+ "salesforce_id": null
+ },
+ {
+ "id": "u_2ljD34",
+ "name": "Brendan Reichert",
+ "email": "teresa@acme2.com",
+ "activated": true,
+ "default_mailbox_id": "mbox_...",
+ "salesforce_id": null
+ },
+ {
+ "id": "u_M3kXp2",
+ "name": "Chester Lind",
+ "email": "raina@acme2.com",
+ "activated": false,
+ "default_mailbox_id": "mbox_...",
+ "salesforce_id": null
+ },
+ {
+ "id": "u_114g0a",
+ "name": "TaoZex",
+ "email": "TaoZex@acme2.com",
+ "activated": true,
+ "default_mailbox_id": "mbox_...",
+ "salesforce_id": null
+ },
+ {
+ "id": "u_h44g53",
+ "name": "Jack",
+ "email": "Jack@acme2.com",
+ "activated": false,
+ "default_mailbox_id": "mbox_...",
+ "salesforce_id": null
+ }
+ ]
+ }
+ }
+ }
+]
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/persistiq_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/persistiq_json_to_assert.conf
new file mode 100644
index 00000000000..dc2c6c06b73
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-http-e2e/src/test/resources/persistiq_json_to_assert.conf
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Persistiq {
+ url = "http://mockserver:1080/v1/users"
+ password = "Seatunnel-test"
+ method = "GET"
+ format = "json"
+ content_field = "$.users.*"
+ schema = {
+ fields {
+ id = string
+ name = string
+ email = string
+ activated = boolean
+ default_mailbox_id = string
+ salesforce_id = string
+ }
+ }
+ }
+}
+
+sink {
+ Console {}
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 5
+ }
+ ],
+
+ field_rules = [
+ {
+ field_name = id
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = email
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = activated
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file