From 3db3f1dc40936ce910e5fca4fc9fafcea8c3c85b Mon Sep 17 00:00:00 2001
From: Heemin Kim <heemin@amazon.com>
Date: Wed, 3 May 2023 15:58:01 -0700
Subject: [PATCH] Create datasource index explicitly

Signed-off-by: Heemin Kim <heemin@amazon.com>
---
 .../action/PutDatasourceTransportAction.java  | 24 ++++--
 .../ip2geo/common/DatasourceFacade.java       | 68 +++++++++++++++-
 .../ip2geo/common/GeoIpDataFacade.java        |  2 +-
 .../ip2geo/jobscheduler/Datasource.java       |  1 -
 .../jobscheduler/DatasourceUpdateService.java |  3 +-
 .../geospatial/plugin/GeospatialPlugin.java   |  4 +-
 .../resources/mappings/ip2geo_datasource.json | 80 +++++++++++++++++--
 src/main/resources/mappings/ip2geo_geoip.json |  9 +++
 .../geospatial/ip2geo/Ip2GeoTestCase.java     |  6 +-
 .../action/PutDatasourceRequestTests.java     |  2 +-
 .../PutDatasourceTransportActionTests.java    | 15 +++-
 .../ip2geo/common/DatasourceFacadeTests.java  | 74 +++++++++++++++--
 .../DatasourceExtensionTests.java             |  2 +-
 13 files changed, 259 insertions(+), 31 deletions(-)
 create mode 100644 src/main/resources/mappings/ip2geo_geoip.json

diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java
index 43e0f76c..f1e0fda5 100644
--- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java
+++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.geospatial.ip2geo.action;
 
+import java.io.IOException;
 import java.time.Instant;
 
 import lombok.extern.log4j.Log4j2;
@@ -15,6 +16,7 @@
 import org.opensearch.ResourceAlreadyExistsException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.StepListener;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.index.IndexResponse;
 import org.opensearch.action.support.ActionFilters;
@@ -71,18 +73,26 @@ public PutDatasourceTransportAction(
     @Override
     protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
         try {
-            Datasource datasource = Datasource.Builder.build(request);
-            IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
-                .id(datasource.getName())
-                .source(datasource.toXContent(JsonXContent.contentBuilder(), null))
-                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
-                .opType(DocWriteRequest.OpType.CREATE);
-            client.index(indexRequest, getIndexResponseListener(datasource, listener));
+            StepListener<Void> createIndexStep = new StepListener<>();
+            datasourceFacade.createIndexIfNotExists(createIndexStep);
+            createIndexStep.whenComplete(v -> putDatasource(request, listener), exception -> listener.onFailure(exception));
         } catch (Exception e) {
             listener.onFailure(e);
         }
     }
 
+    @VisibleForTesting
+    protected void putDatasource(final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener)
+        throws IOException {
+        Datasource datasource = Datasource.Builder.build(request);
+        IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
+            .id(datasource.getName())
+            .source(datasource.toXContent(JsonXContent.contentBuilder(), null))
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .opType(DocWriteRequest.OpType.CREATE);
+        client.index(indexRequest, getIndexResponseListener(datasource, listener));
+    }
+
     @VisibleForTesting
     protected ActionListener<IndexResponse> getIndexResponseListener(
         final Datasource datasource,
diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java
index 25d2f5af..ee8a68b0 100644
--- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java
+++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java
@@ -8,18 +8,28 @@
 
 package org.opensearch.geospatial.ip2geo.common;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
 import lombok.extern.log4j.Log4j2;
 
 import org.opensearch.OpenSearchException;
+import org.opensearch.ResourceAlreadyExistsException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.StepListener;
+import org.opensearch.action.admin.indices.create.CreateIndexRequest;
+import org.opensearch.action.admin.indices.create.CreateIndexResponse;
 import org.opensearch.action.get.GetRequest;
 import org.opensearch.action.get.GetResponse;
 import org.opensearch.action.get.MultiGetItemResponse;
@@ -28,7 +38,9 @@
 import org.opensearch.action.index.IndexResponse;
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.client.Client;
+import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.xcontent.LoggingDeprecationHandler;
 import org.opensearch.common.xcontent.XContentFactory;
@@ -48,12 +60,64 @@
 @Log4j2
 public class DatasourceFacade {
     private static final Integer MAX_SIZE = 1000;
+    private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1);
+    private static final Tuple<String, String> INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all");
+    private static final Tuple<String, Boolean> INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true);
     private final Client client;
+    private final ClusterService clusterService;
     private final ClusterSettings clusterSettings;
 
-    public DatasourceFacade(final Client client, final ClusterSettings clusterSettings) {
+    public DatasourceFacade(final Client client, final ClusterService clusterService) {
         this.client = client;
-        this.clusterSettings = clusterSettings;
+        this.clusterService = clusterService;
+        this.clusterSettings = clusterService.getClusterSettings();
+    }
+
+    /**
+     * Create a datasource index of single shard with auto expand replicas to all nodes
+     *
+     * We want the index to expand to all replica so that datasource query request can be executed locally
+     * for faster ingestion time.
+     */
+    public void createIndexIfNotExists(final StepListener<Void> stepListener) {
+        if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) {
+            stepListener.onResponse(null);
+            return;
+        }
+        final Map<String, Object> indexSettings = new HashMap<>();
+        indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
+        indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
+        indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
+        final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping())
+            .settings(indexSettings);
+        client.admin().indices().create(createIndexRequest, new ActionListener<>() {
+            @Override
+            public void onResponse(final CreateIndexResponse createIndexResponse) {
+                stepListener.onResponse(null);
+            }
+
+            @Override
+            public void onFailure(final Exception e) {
+                if (e instanceof ResourceAlreadyExistsException) {
+                    log.info("index[{}] already exist", DatasourceExtension.JOB_INDEX_NAME);
+                    stepListener.onResponse(null);
+                    return;
+                }
+                stepListener.onFailure(e);
+            }
+        });
+    }
+
+    private String getIndexMapping() {
+        try {
+            try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
+                try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
+                    return reader.lines().map(String::trim).collect(Collectors.joining());
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     /**
diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java
index c1fb3a70..9e648f05 100644
--- a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java
+++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacade.java
@@ -109,7 +109,7 @@ public void createIndexIfNotExists(final String indexName) {
      */
     private String getIndexMapping() {
         try {
-            try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) {
+            try (InputStream is = DatasourceFacade.class.getResourceAsStream("/mappings/ip2geo_geoip.json")) {
                 try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
                     return reader.lines().map(String::trim).collect(Collectors.joining());
                 }
diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java
index 7c7eae55..d56f5184 100644
--- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java
+++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java
@@ -182,7 +182,6 @@ public class Datasource implements Writeable, ScheduledJobParameter {
         PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD);
         PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD);
-
     }
 
     @VisibleForTesting
diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java
index 07ea55c7..0f27993d 100644
--- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java
+++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java
@@ -210,7 +210,8 @@ private String setupIndex(final DatasourceManifest manifest, final Datasource da
      * @return
      */
     private boolean shouldUpdate(final Datasource datasource, final DatasourceManifest manifest) {
-        if (datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) {
+        if (datasource.getDatabase().getUpdatedAt() != null
+            && datasource.getDatabase().getUpdatedAt().toEpochMilli() > manifest.getUpdatedAt()) {
             return false;
         }
 
diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java
index fc29b165..3a21189e 100644
--- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java
+++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java
@@ -94,7 +94,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
                 new Ip2GeoProcessor.Factory(
                     parameters.client,
                     parameters.ingestService,
-                    new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService().getClusterSettings()),
+                    new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService()),
                     new GeoIpDataFacade(parameters.ingestService.getClusterService(), parameters.client)
                 )
             )
@@ -128,7 +128,7 @@ public Collection<Object> createComponents(
         Supplier<RepositoriesService> repositoriesServiceSupplier
     ) {
         GeoIpDataFacade geoIpDataFacade = new GeoIpDataFacade(clusterService, client);
-        DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService.getClusterSettings());
+        DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService);
         DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
             clusterService,
             client,
diff --git a/src/main/resources/mappings/ip2geo_datasource.json b/src/main/resources/mappings/ip2geo_datasource.json
index 3179ef0d..3f3d5aa1 100644
--- a/src/main/resources/mappings/ip2geo_datasource.json
+++ b/src/main/resources/mappings/ip2geo_datasource.json
@@ -1,9 +1,77 @@
 {
-  "dynamic": false,
-  "properties": {
-    "_cidr": {
-      "type": "ip_range",
-      "doc_values": false
+  "properties" : {
+    "database" : {
+      "properties" : {
+        "fields" : {
+          "type" : "text"
+        },
+        "sha256_hash" : {
+          "type" : "text"
+        },
+        "provider" : {
+          "type" : "text"
+        },
+        "updated_at_in_epoch_millis" : {
+          "type" : "long"
+        },
+        "valid_for_in_days" : {
+          "type" : "long"
+        }
+      }
+    },
+    "enabled_time" : {
+      "type" : "long"
+    },
+    "endpoint" : {
+      "type" : "text"
+    },
+    "name" : {
+      "type" : "text"
+    },
+    "indices" : {
+      "type" : "text"
+    },
+    "last_update_time" : {
+      "type" : "long"
+    },
+    "schedule" : {
+      "properties" : {
+        "interval" : {
+          "properties" : {
+            "period" : {
+              "type" : "long"
+            },
+            "start_time" : {
+              "type" : "long"
+            },
+            "unit" : {
+              "type" : "text"
+            }
+          }
+        }
+      }
+    },
+    "state" : {
+      "type" : "text"
+    },
+    "update_enabled" : {
+      "type" : "boolean"
+    },
+    "update_stats" : {
+      "properties" : {
+        "last_failed_at_in_epoch_millis" : {
+          "type" : "long"
+        },
+        "last_processing_time_in_millis" : {
+          "type" : "long"
+        },
+        "last_skipped_at_in_epoch_millis" : {
+          "type" : "long"
+        },
+        "last_succeeded_at_in_epoch_millis" : {
+          "type" : "long"
+        }
+      }
     }
   }
-}
+}
\ No newline at end of file
diff --git a/src/main/resources/mappings/ip2geo_geoip.json b/src/main/resources/mappings/ip2geo_geoip.json
new file mode 100644
index 00000000..3179ef0d
--- /dev/null
+++ b/src/main/resources/mappings/ip2geo_geoip.json
@@ -0,0 +1,9 @@
+{
+  "dynamic": false,
+  "properties": {
+    "_cidr": {
+      "type": "ip_range",
+      "doc_values": false
+    }
+  }
+}
diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java
index 48451a9c..7163c7cd 100644
--- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java
+++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java
@@ -233,7 +233,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
             Request request,
             ActionListener<Response> listener
         ) {
-            listener.onResponse((Response) executeVerifier.get().apply(action, request));
+            try {
+                listener.onResponse((Response) executeVerifier.get().apply(action, request));
+            } catch (Exception e) {
+                listener.onFailure(e);
+            }
         }
 
         /**
diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java
index 01d3025c..673ad8df 100644
--- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java
+++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java
@@ -159,7 +159,7 @@ public void testStreamInOut() throws Exception {
         String domain = GeospatialTestHelper.randomLowerCaseString();
         PutDatasourceRequest request = new PutDatasourceRequest(datasourceName);
         request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain));
-        request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(30) + 1));
+        request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29) + 1));
 
         // Run
         BytesStreamOutput output = new BytesStreamOutput();
diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java
index ff542418..62613581 100644
--- a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java
+++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportActionTests.java
@@ -14,9 +14,11 @@
 import static org.mockito.Mockito.verify;
 
 import org.junit.Before;
+import org.mockito.ArgumentCaptor;
 import org.opensearch.ResourceAlreadyExistsException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.StepListener;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.action.support.master.AcknowledgedResponse;
@@ -44,7 +46,7 @@ public void init() {
         );
     }
 
-    public void testDoExecute() throws Exception {
+    public void testDoExecute_whenValidInput_thenSucceed() throws Exception {
         Task task = mock(Task.class);
         PutDatasourceRequest request = new PutDatasourceRequest("test");
         request.setEndpoint(sampleManifestUrl());
@@ -59,7 +61,18 @@ public void testDoExecute() throws Exception {
             assertEquals(DocWriteRequest.OpType.CREATE, indexRequest.opType());
             return null;
         });
+
+        // Run
         action.doExecute(task, request, listener);
+
+        // Verify
+        ArgumentCaptor<StepListener> captor = ArgumentCaptor.forClass(StepListener.class);
+        verify(datasourceFacade).createIndexIfNotExists(captor.capture());
+
+        // Run
+        captor.getValue().onResponse(null);
+
+        // Verify
         verify(verifyingClient).index(any(IndexRequest.class), any(ActionListener.class));
         verify(listener).onResponse(new AcknowledgedResponse(true));
     }
diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java
index aacd940a..7f32d61c 100644
--- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java
+++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java
@@ -17,14 +17,16 @@
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 
 import org.apache.lucene.search.TotalHits;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;
+import org.opensearch.ResourceAlreadyExistsException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.DocWriteRequest;
+import org.opensearch.action.StepListener;
+import org.opensearch.action.admin.indices.create.CreateIndexRequest;
 import org.opensearch.action.get.GetRequest;
 import org.opensearch.action.get.GetResponse;
 import org.opensearch.action.get.MultiGetItemResponse;
@@ -35,8 +37,6 @@
 import org.opensearch.action.search.SearchResponse;
 import org.opensearch.common.Randomness;
 import org.opensearch.common.bytes.BytesReference;
-import org.opensearch.common.settings.ClusterSettings;
-import org.opensearch.common.settings.Settings;
 import org.opensearch.common.xcontent.json.JsonXContent;
 import org.opensearch.geospatial.GeospatialTestHelper;
 import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
@@ -53,10 +53,70 @@ public class DatasourceFacadeTests extends Ip2GeoTestCase {
 
     @Before
     public void init() {
-        datasourceFacade = new DatasourceFacade(
-            verifyingClient,
-            new ClusterSettings(Settings.EMPTY, new HashSet<>(Ip2GeoSettings.settings()))
+        datasourceFacade = new DatasourceFacade(verifyingClient, clusterService);
+    }
+
+    public void testCreateIndexIfNotExists_whenIndexExist_thenCreateRequestIsNotCalled() {
+        when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(true);
+
+        // Verify
+        verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { throw new RuntimeException("Shouldn't get called"); });
+
+        // Run
+        StepListener<Void> stepListener = new StepListener<>();
+        datasourceFacade.createIndexIfNotExists(stepListener);
+
+        // Verify stepListener is called
+        stepListener.result();
+    }
+
+    public void testCreateIndexIfNotExists_whenIndexExist_thenCreateRequestIsCalled() {
+        when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(false);
+
+        // Verify
+        verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
+            assertTrue(actionRequest instanceof CreateIndexRequest);
+            CreateIndexRequest request = (CreateIndexRequest) actionRequest;
+            assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
+            assertEquals("1", request.settings().get("index.number_of_shards"));
+            assertEquals("0-all", request.settings().get("index.auto_expand_replicas"));
+            assertEquals("true", request.settings().get("index.hidden"));
+            assertNotNull(request.mappings());
+            return null;
+        });
+
+        // Run
+        StepListener<Void> stepListener = new StepListener<>();
+        datasourceFacade.createIndexIfNotExists(stepListener);
+
+        // Verify stepListener is called
+        stepListener.result();
+    }
+
+    public void testCreateIndexIfNotExists_whenIndexCreatedAlready_thenExceptionIsIgnored() {
+        when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(false);
+        verifyingClient.setExecuteVerifier(
+            (actionResponse, actionRequest) -> { throw new ResourceAlreadyExistsException(DatasourceExtension.JOB_INDEX_NAME); }
         );
+
+        // Run
+        StepListener<Void> stepListener = new StepListener<>();
+        datasourceFacade.createIndexIfNotExists(stepListener);
+
+        // Verify stepListener is called
+        stepListener.result();
+    }
+
+    public void testCreateIndexIfNotExists_whenExceptionIsThrown_thenExceptionIsThrown() {
+        when(metadata.hasIndex(DatasourceExtension.JOB_INDEX_NAME)).thenReturn(false);
+        verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { throw new RuntimeException(); });
+
+        // Run
+        StepListener<Void> stepListener = new StepListener<>();
+        datasourceFacade.createIndexIfNotExists(stepListener);
+
+        // Verify stepListener is called
+        expectThrows(RuntimeException.class, () -> stepListener.result());
     }
 
     public void testUpdateDatasource_whenValidInput_thenSucceed() throws Exception {
@@ -104,7 +164,7 @@ public void testGetDatasource_whenExistWithListener_thenListenerIsCalledWithData
         verify(listener).onResponse(eq(datasource));
     }
 
-    public void testGetDatasource_whenExistWithListener_thenListenerIsCalledWithNull() {
+    public void testGetDatasource_whenNotExistWithListener_thenListenerIsCalledWithNull() {
         Datasource datasource = setupClientForGetRequest(false, null);
         ActionListener<Datasource> listener = mock(ActionListener.class);
         datasourceFacade.getDatasource(datasource.getName(), listener);
diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java
index 18fc8628..75dea954 100644
--- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java
+++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtensionTests.java
@@ -20,7 +20,7 @@
 import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
 
 public class DatasourceExtensionTests extends Ip2GeoTestCase {
-    public void testBasic() throws Exception {
+    public void testBasic() {
         DatasourceExtension extension = new DatasourceExtension();
         assertEquals("scheduler_geospatial_ip2geo_datasource", extension.getJobType());
         assertEquals(JOB_INDEX_NAME, extension.getJobIndex());