Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix on lock management and a few performance improvements #310

Merged
merged 1 commit into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,11 @@ protected void doExecute(final Task task, final DeleteDatasourceRequest request,
}
try {
deleteDatasource(request.getName());
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
exception -> { log.error("Failed to release the lock", exception); }
)
);
}
}, exception -> { listener.onFailure(exception); }));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,16 @@ protected void doExecute(final Task task, final PutDatasourceRequest request, fi
try {
internalDoExecute(request, lock, listener);
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
);
}
}, exception -> { listener.onFailure(exception); }));
}

/**
* This method takes lock as a parameter and is responsible for releasing lock
* unless exception is thrown
*/
@VisibleForTesting
protected void internalDoExecute(
final PutDatasourceRequest request,
Expand All @@ -100,30 +100,42 @@ protected void internalDoExecute(
datasourceFacade.createIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> {
Datasource datasource = Datasource.Builder.build(request);
datasourceFacade.putDatasource(
datasource,
getIndexResponseListener(datasource, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), listener)
);
}, exception -> listener.onFailure(exception));
datasourceFacade.putDatasource(datasource, getIndexResponseListener(datasource, lock, listener));
}, exception -> {
lockService.releaseLock(lock);
listener.onFailure(exception);
});
}

/**
* This method takes lock as a parameter and is responsible for releasing lock
* unless exception is thrown
*/
@VisibleForTesting
protected ActionListener<IndexResponse> getIndexResponseListener(
final Datasource datasource,
final Runnable renewLock,
final LockModel lock,
final ActionListener<AcknowledgedResponse> listener
) {
return new ActionListener<>() {
@Override
public void onResponse(final IndexResponse indexResponse) {
// This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread
// pool.
threadPool.generic().submit(() -> { createDatasource(datasource, renewLock); });
threadPool.generic().submit(() -> {
AtomicReference<LockModel> lockReference = new AtomicReference<>(lock);
try {
createDatasource(datasource, lockService.getRenewLockRunnable(lockReference));
} finally {
lockService.releaseLock(lockReference.get());
}
});
listener.onResponse(new AcknowledgedResponse(true));
}

@Override
public void onFailure(final Exception e) {
lockService.releaseLock(lock);
if (e instanceof VersionConflictEngineException) {
listener.onFailure(new ResourceAlreadyExistsException("datasource [{}] already exists", datasource.getName()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;
import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand Down Expand Up @@ -81,27 +80,21 @@ protected void doExecute(final Task task, final UpdateDatasourceRequest request,
);
return;
}

try {
Datasource datasource = datasourceFacade.getDatasource(request.getName());
if (datasource == null) {
listener.onFailure(new ResourceNotFoundException("no such datasource exist"));
return;
throw new ResourceNotFoundException("no such datasource exist");
}
validate(request, datasource);
updateIfChanged(request, datasource);
lockService.releaseLock(lock);
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
lockService.releaseLock(lock);
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
exception -> { log.error("Failed to release the lock", exception); }
)
);
}
}, exception -> { listener.onFailure(exception); }));
}, exception -> listener.onFailure(exception)));
}

private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.Requests;
Expand All @@ -67,6 +66,8 @@ public class GeoIpDataFacade {
private static final String IP_RANGE_FIELD_NAME = "_cidr";
private static final String DATA_FIELD_NAME = "_data";
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1);
private static final Tuple<String, Integer> INDEX_SETTING_NUM_OF_REPLICAS = new Tuple<>("index.number_of_replicas", 0);
private static final Tuple<String, Integer> INDEX_SETTING_REFRESH_INTERVAL = new Tuple<>("index.refresh_interval", -1);
private static final Tuple<String, String> INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all");
private static final Tuple<String, Boolean> INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true);
private static final Tuple<String, Boolean> INDEX_SETTING_READ_ONLY_ALLOW_DELETE = new Tuple<>(
Expand All @@ -84,7 +85,12 @@ public GeoIpDataFacade(final ClusterService clusterService, final Client client)
}

/**
* Create an index of single shard with auto expand replicas to all nodes
* Create an index for GeoIP data
*
* Index setting start with single shard, zero replica, no refresh interval, and hidden.
* Once the GeoIP data is indexed, do refresh and force merge.
* Then, change the index setting to expand replica to all nodes, and read only allow delete.
* See {@link #freezeIndex}
*
* @param indexName index name
*/
Expand All @@ -94,7 +100,8 @@ public void createIndexIfNotExists(final String indexName) {
}
final Map<String, Object> indexSettings = new HashMap<>();
indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2());
indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_REFRESH_INTERVAL.v1(), INDEX_SETTING_REFRESH_INTERVAL.v2());
indexSettings.put(INDEX_SETTING_NUM_OF_REPLICAS.v1(), INDEX_SETTING_NUM_OF_REPLICAS.v2());
indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2());
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping());
StashedThreadContext.run(
Expand All @@ -103,6 +110,24 @@ public void createIndexIfNotExists(final String indexName) {
);
}

/**
 * Makes a fully-ingested GeoIP data index ready for read-heavy use.
 *
 * Refreshes the index so all indexed documents become searchable, force-merges it
 * down to a single segment for faster lookups, then updates the index settings to:
 * block writes (read-only-allow-delete), clear the explicit replica count set at
 * creation time (put to {@code null}), and auto-expand replicas to all nodes.
 *
 * Each admin call blocks up to the cluster-configured {@link Ip2GeoSettings#TIMEOUT}.
 * All calls run under {@code StashedThreadContext} — presumably to drop the caller's
 * security/thread context for these admin operations; verify against its contract.
 *
 * @param indexName name of the GeoIP data index to freeze
 */
private void freezeIndex(final String indexName) {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
StashedThreadContext.run(client, () -> {
// Make all ingested documents visible (index was created with refresh disabled).
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
// Single segment: data is immutable from here on, so optimize for reads.
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
Map<String, Object> settings = new HashMap<>();
settings.put(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2());
// null removes the explicit replica-count setting so auto-expand below can take effect.
settings.put(INDEX_SETTING_NUM_OF_REPLICAS.v1(), null);
settings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2());
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(settings)
.execute()
// NOTE(review): re-reads the timeout setting here instead of reusing `timeout` above — consider reusing it.
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));
});
}

/**
* Generate XContentBuilder representing datasource database index mapping
*
Expand Down Expand Up @@ -349,7 +374,7 @@ public void putGeoIpData(
@NonNull final Runnable renewLock
) {
TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT);
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
final BulkRequest bulkRequest = new BulkRequest();
while (iterator.hasNext()) {
CSVRecord record = iterator.next();
String document = createDocument(fields, record.values());
Expand All @@ -368,16 +393,7 @@ public void putGeoIpData(
}
renewLock.run();
}
StashedThreadContext.run(client, () -> {
client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout);
client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout);
client.admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Map.of(INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v1(), INDEX_SETTING_READ_ONLY_ALLOW_DELETE.v2()))
.execute()
.actionGet(timeout);
});
freezeIndex(indexName);
}

public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.client.Client;
Expand All @@ -22,6 +24,7 @@
/**
* A wrapper of job scheduler's lock service for datasource
*/
@Log4j2
public class Ip2GeoLockService {
public static final long LOCK_DURATION_IN_SECONDS = 300l;
public static final long RENEW_AFTER_IN_SECONDS = 120l;
Expand Down Expand Up @@ -57,10 +60,12 @@ public void acquireLock(final String datasourceName, final Long lockDurationSeco
* Wrapper method of LockService#release
*
* @param lockModel the lock model
* @param listener the listener
*/
public void releaseLock(final LockModel lockModel, final ActionListener<Boolean> listener) {
lockService.release(lockModel, listener);
public void releaseLock(final LockModel lockModel) {
lockService.release(
lockModel,
ActionListener.wrap(released -> {}, exception -> log.error("Failed to release the lock", exception))
);
}

/**
Expand Down Expand Up @@ -110,7 +115,6 @@ public Runnable getRenewLockRunnable(final AtomicReference<LockModel> lockModel)
if (Instant.now().isBefore(preLock.getLockTime().plusSeconds(RENEW_AFTER_IN_SECONDS))) {
return;
}

lockModel.set(renewLock(lockModel.get()));
if (lockModel.get() == null) {
new OpenSearchException("failed to renew a lock [{}]", preLock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,7 @@ protected Runnable updateDatasourceRunner(final ScheduledJobParameter jobParamet
try {
updateDatasource(jobParameter, ip2GeoLockService.getRenewLockRunnable(new AtomicReference<>(lock)));
} finally {
ip2GeoLockService.releaseLock(
lock,
ActionListener.wrap(released -> {}, exception -> { log.error("Failed to release lock [{}]", lock, exception); })
);
ip2GeoLockService.releaseLock(lock);
}
}, exception -> { log.error("Failed to acquire lock for job [{}]", jobParameter.getName(), exception); }));
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private void validateDoExecute(final LockModel lockModel, final Exception except
verify(listener).onFailure(any(OpenSearchException.class));
} else {
verify(listener).onResponse(new AcknowledgedResponse(true));
verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class));
verify(ip2GeoLockService).releaseLock(eq(lockModel));
}
} else {
// Run
Expand Down
Loading