Skip to content

Commit

Permalink
Remove all unused client attributes (#293)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 authored May 6, 2023
1 parent bda5eff commit e06849d
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,16 @@

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.tasks.Task;
Expand All @@ -42,7 +36,6 @@
*/
@Log4j2
public class PutDatasourceTransportAction extends HandledTransportAction<PutDatasourceRequest, AcknowledgedResponse> {
private final Client client;
private final ThreadPool threadPool;
private final DatasourceFacade datasourceFacade;
private final DatasourceUpdateService datasourceUpdateService;
Expand All @@ -51,20 +44,19 @@ public class PutDatasourceTransportAction extends HandledTransportAction<PutData
* Default constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param client the client
* @param threadPool the thread pool
* @param datasourceFacade the datasource facade
* @param datasourceUpdateService the datasource update service
*/
@Inject
public PutDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final Client client,
final ThreadPool threadPool,
final DatasourceFacade datasourceFacade,
final DatasourceUpdateService datasourceUpdateService
) {
super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new);
this.client = client;
this.threadPool = threadPool;
this.datasourceFacade = datasourceFacade;
this.datasourceUpdateService = datasourceUpdateService;
Expand All @@ -85,12 +77,7 @@ protected void doExecute(final Task task, final PutDatasourceRequest request, fi
protected void putDatasource(final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener)
throws IOException {
Datasource datasource = Datasource.Builder.build(request);
IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
.id(datasource.getName())
.source(datasource.toXContent(JsonXContent.contentBuilder(), null))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.opType(DocWriteRequest.OpType.CREATE);
client.index(indexRequest, getIndexResponseListener(datasource, listener));
datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, listener));
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Objects;
import java.util.stream.Collectors;

import javax.swing.*;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
Expand All @@ -34,7 +36,6 @@
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetItemResponse;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
Expand Down Expand Up @@ -129,12 +130,30 @@ private String getIndexMapping() {
*/
public IndexResponse updateDatasource(final Datasource datasource) throws IOException {
datasource.setLastUpdateTime(Instant.now());
IndexRequestBuilder requestBuilder = client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME);
requestBuilder.setId(datasource.getName());
requestBuilder.setOpType(DocWriteRequest.OpType.INDEX);
requestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
requestBuilder.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
return client.index(requestBuilder.request()).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
return client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
}

/**
* Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
*
* @param datasource the datasource
* @param listener the listener
* @throws IOException exception
*/
public void putDatasource(final Datasource datasource, final ActionListener listener) throws IOException {
datasource.setLastUpdateTime(Instant.now());
client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setOpType(DocWriteRequest.OpType.CREATE)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.execute(listener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
Expand Down Expand Up @@ -53,7 +52,6 @@ public static DatasourceRunner getJobRunnerInstance() {
}

private ClusterService clusterService;
private Client client;
private DatasourceUpdateService datasourceUpdateService;
private Ip2GeoExecutor ip2GeoExecutor;
private DatasourceFacade datasourceFacade;
Expand All @@ -68,13 +66,11 @@ private DatasourceRunner() {
*/
public void initialize(
final ClusterService clusterService,
final Client client,
final DatasourceUpdateService datasourceUpdateService,
final Ip2GeoExecutor ip2GeoExecutor,
final DatasourceFacade datasourceFacade
) {
this.clusterService = clusterService;
this.client = client;
this.datasourceUpdateService = datasourceUpdateService;
this.ip2GeoExecutor = ip2GeoExecutor;
this.datasourceFacade = datasourceFacade;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.opensearch.OpenSearchException;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
Expand All @@ -35,19 +34,16 @@
public class DatasourceUpdateService {
private final ClusterService clusterService;
private final ClusterSettings clusterSettings;
private final Client client;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;

public DatasourceUpdateService(
final ClusterService clusterService,
final Client client,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade
) {
this.clusterService = clusterService;
this.clusterSettings = clusterService.getClusterSettings();
this.client = client;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.client.Client;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
Expand All @@ -52,7 +51,6 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private final Set<String> properties;
private final boolean ignoreMissing;
private final boolean firstOnly;
private final Client client;
private final ClusterSettings clusterSettings;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
Expand All @@ -72,8 +70,9 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
* @param properties the properties
* @param ignoreMissing true if documents with a missing value for the field should be ignored
* @param firstOnly true if only first result should be returned in case of array
* @param client the client
* @param clusterSettings the cluster settings
* @param datasourceFacade the datasource facade
* @param geoIpDataFacade the geoip data facade
*/
public Ip2GeoProcessor(
final String tag,
Expand All @@ -84,7 +83,6 @@ public Ip2GeoProcessor(
final Set<String> properties,
final boolean ignoreMissing,
final boolean firstOnly,
final Client client,
final ClusterSettings clusterSettings,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade
Expand All @@ -96,7 +94,6 @@ public Ip2GeoProcessor(
this.properties = properties;
this.ignoreMissing = ignoreMissing;
this.firstOnly = firstOnly;
this.client = client;
this.clusterSettings = clusterSettings;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
Expand Down Expand Up @@ -318,24 +315,18 @@ public String getType() {
* Ip2Geo processor factory
*/
public static final class Factory implements Processor.Factory {
private final Client client;
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;

/**
* Default constructor
*
* @param client the client
* @param ingestService the ingest service
* @param datasourceFacade the datasource facade
* @param geoIpDataFacade the geoip data facade
*/
public Factory(
final Client client,
final IngestService ingestService,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade
) {
this.client = client;
public Factory(final IngestService ingestService, final DatasourceFacade datasourceFacade, final GeoIpDataFacade geoIpDataFacade) {
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
Expand Down Expand Up @@ -380,7 +371,6 @@ public Ip2GeoProcessor create(
propertyNames == null ? null : new HashSet<>(propertyNames),
ignoreMissing,
firstOnly,
client,
ingestService.getClusterService().getClusterSettings(),
datasourceFacade,
geoIpDataFacade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
.put(
Ip2GeoProcessor.TYPE,
new Ip2GeoProcessor.Factory(
parameters.client,
parameters.ingestService,
new DatasourceFacade(parameters.client, parameters.ingestService.getClusterService()),
new GeoIpDataFacade(parameters.ingestService.getClusterService(), parameters.client)
Expand Down Expand Up @@ -129,19 +128,13 @@ public Collection<Object> createComponents(
) {
GeoIpDataFacade geoIpDataFacade = new GeoIpDataFacade(clusterService, client);
DatasourceFacade datasourceFacade = new DatasourceFacade(client, clusterService);
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(
clusterService,
client,
datasourceFacade,
geoIpDataFacade
);
DatasourceUpdateService datasourceUpdateService = new DatasourceUpdateService(clusterService, datasourceFacade, geoIpDataFacade);
Ip2GeoExecutor ip2GeoExecutor = new Ip2GeoExecutor(threadPool);
/**
* We don't need to return datasource runner because it is used only by job scheduler and job scheduler
* does not use DI but it calls DatasourceExtension#getJobRunner to get DatasourceRunner instance.
*/
DatasourceRunner.getJobRunnerInstance()
.initialize(clusterService, client, datasourceUpdateService, ip2GeoExecutor, datasourceFacade);
DatasourceRunner.getJobRunnerInstance().initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceFacade);

return List.of(UploadStats.getInstance(), datasourceUpdateService, datasourceFacade, ip2GeoExecutor, geoIpDataFacade);
}
Expand Down
Loading

0 comments on commit e06849d

Please sign in to comment.