Skip to content

Commit

Permalink
RESTful webservice to receive notifications of artifact uploads.
Browse files Browse the repository at this point in the history
#3

ArtifactUpdateApiClient#offerRequest now does not work in blocking way.
Request is being added to internal queue.
Background thread iterates through this queue, and submit Runnables to Executor.
  • Loading branch information
gd-tmagrys committed Jun 30, 2015
1 parent 601963c commit a63b873
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo;

public interface ArtifactUpdateApiClient {
void sendRequest(ArtifactMetaInfo metaInfo);
void offerRequest(ArtifactMetaInfo artifactMetaInfo);

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,53 @@ public class ArtifactUpdateApiClientImpl extends ComponentSupport implements Art
* ExecutorService shares between clients. All treads are created in the same executor
*/
private final ExecutorService asyncRequestsExecutorService;
private final BlockingQueue<ArtifactMetaInfo> blockingQueue;

@Inject
public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager) {
this.configurationsManager = configurationsManager;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize());
this.asyncRequestsExecutorService = new ThreadPoolExecutor(
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
30, TimeUnit.SECONDS, queue);
this.blockingQueue =
new LinkedBlockingQueue<>(configurationsManager.
getConfiguration().getRequestsQueueSize());
this.asyncRequestsExecutorService =
Executors.newFixedThreadPool(
configurationsManager.getConfiguration()
.getRequestsSendingThreadsCount());
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
final ArtifactMetaInfo artifactMetaInfo = blockingQueue.take();
asyncRequestsExecutorService.submit(new Runnable() {
@Override
public void run() {
sendRequest(artifactMetaInfo);
}
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
});
thread.start();
}

@Override
public void offerRequest(ArtifactMetaInfo artifactMetaInfo) {
try {
blockingQueue.offer(artifactMetaInfo, 30, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

/**
* Sends replication requests to all nexus servers configured in XML file
*
* @param metaInfo Artifact information
*/
@Override
public void sendRequest(ArtifactMetaInfo metaInfo) {
for (NexusServer server : configurationsManager.getConfiguration().getServers()) {
AsyncWebResource.Builder service = getService(server.getUrl(), server.getUser(), server.getPassword());
Expand Down Expand Up @@ -104,8 +135,9 @@ public GenericType<RestResponse> getGenericType() {

/**
* Returns jersey HTTP resource to access to the remote replication servers
*
* @param nexusUrl URL of the remote server
* @param login Username on the remote server
* @param login Username on the remote server
* @param password User's password
* @return Jersey HTTP client
*/
Expand All @@ -119,14 +151,14 @@ private AsyncWebResource.Builder getService(String nexusUrl, String login, Strin

/**
* Creates jersey HTTP client
* @param login Username on the remote server
*
* @param login Username on the remote server
* @param password User's password
* @return HTTP client
*/
private Client getClient(String login, String password) {
ClientConfig config = new DefaultClientConfig();
Client client = Client.create(config);
client.setExecutorService(asyncRequestsExecutorService);
if (login != null && !login.isEmpty() && password != null) {
log.debug("Creating HTTP client with authorized HTTPBasicAuthFilter.");
client.addFilter(new HTTPBasicAuthFilter(login, password));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void onArtifactUploading(RepositoryItemEventStore event) {
updateArtifactStatus(metaInfo, artifactStatus);
if (artifactStatus.isReadyForReplication()) {
log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request");
artifactUpdateApiClient.sendRequest(metaInfo);
artifactUpdateApiClient.offerRequest(metaInfo);
clearStatus(metaInfo);
}
}
Expand Down

0 comments on commit a63b873

Please sign in to comment.