Skip to content

Commit

Permalink
RESTful webservice to receive notifications of artifact uploads.
Browse files Browse the repository at this point in the history
griddynamics#3

ArtifactUpdateApiClient#offerRequest now does not work in blocking way.
Request is being added to internal queue.
Background thread iterates through this queue, and submit Runnables to Executor.
  • Loading branch information
gd-tmagrys committed Jun 30, 2015
1 parent 601963c commit 696d6bc
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,12 @@ public class ReplicationPluginConfiguration {
public void addServer(NexusServer server) {
servers.add(server);
}

public Integer getRequestsQueueSize() {
return requestsQueueSize;
}

public Integer getRequestsSendingThreadsCount() {
return requestsSendingThreadsCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo;

public interface ArtifactUpdateApiClient {
void sendRequest(ArtifactMetaInfo metaInfo);
void offerRequest(ArtifactMetaInfo artifactMetaInfo);

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,54 @@ public class ArtifactUpdateApiClientImpl extends ComponentSupport implements Art
/**
* Provides access to the plugin configurations
*/
private final ConfigurationsManager configurationsManager;
private ConfigurationsManager configurationsManager;

/**
* ExecutorService shares between clients. All treads are created in the same executor
*/
private final ExecutorService asyncRequestsExecutorService;
private final ExecutorService jerseyHttpClientExecutor;
private final BlockingQueue<ArtifactMetaInfo> artifactMetaInfoBlockingQueue;

@Inject
public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager) {
this.configurationsManager = configurationsManager;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize());
this.asyncRequestsExecutorService = new ThreadPoolExecutor(
this.artifactMetaInfoBlockingQueue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize());
this.jerseyHttpClientExecutor = new ThreadPoolExecutor(
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
30, TimeUnit.SECONDS, queue);
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
final ArtifactMetaInfo artifactMetaInfo = artifactMetaInfoBlockingQueue.take();
sendRequest(artifactMetaInfo);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
});
thread.start();
}

@Override
public void offerRequest(ArtifactMetaInfo artifactMetaInfo) {
try {
artifactMetaInfoBlockingQueue.offer(artifactMetaInfo, 30, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

/**
* Sends replication requests to all nexus servers configured in XML file
*
* @param metaInfo Artifact information
*/
@Override
public void sendRequest(ArtifactMetaInfo metaInfo) {
for (NexusServer server : configurationsManager.getConfiguration().getServers()) {
AsyncWebResource.Builder service = getService(server.getUrl(), server.getUser(), server.getPassword());
Expand Down Expand Up @@ -104,8 +130,9 @@ public GenericType<RestResponse> getGenericType() {

/**
* Returns jersey HTTP resource to access to the remote replication servers
*
* @param nexusUrl URL of the remote server
* @param login Username on the remote server
* @param login Username on the remote server
* @param password User's password
* @return Jersey HTTP client
*/
Expand All @@ -119,14 +146,15 @@ private AsyncWebResource.Builder getService(String nexusUrl, String login, Strin

/**
* Creates jersey HTTP client
* @param login Username on the remote server
*
* @param login Username on the remote server
* @param password User's password
* @return HTTP client
*/
private Client getClient(String login, String password) {
ClientConfig config = new DefaultClientConfig();
Client client = Client.create(config);
client.setExecutorService(asyncRequestsExecutorService);
client.setExecutorService(jerseyHttpClientExecutor);
if (login != null && !login.isEmpty() && password != null) {
log.debug("Creating HTTP client with authorized HTTPBasicAuthFilter.");
client.addFilter(new HTTPBasicAuthFilter(login, password));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void onArtifactUploading(RepositoryItemEventStore event) {
updateArtifactStatus(metaInfo, artifactStatus);
if (artifactStatus.isReadyForReplication()) {
log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request");
artifactUpdateApiClient.sendRequest(metaInfo);
artifactUpdateApiClient.offerRequest(metaInfo);
clearStatus(metaInfo);
}
}
Expand Down

0 comments on commit 696d6bc

Please sign in to comment.