diff --git a/pom.xml b/pom.xml index de10428..d9ad7df 100644 --- a/pom.xml +++ b/pom.xml @@ -173,4 +173,5 @@ + diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java index 3e6aeb2..c5cff10 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java @@ -18,5 +18,6 @@ import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo; public interface ArtifactUpdateApiClient { - void sendRequest(ArtifactMetaInfo metaInfo); + void enqueueRequest(ArtifactMetaInfo artifactMetaInfo); + } diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java index 9792f29..616433b 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java @@ -51,22 +51,53 @@ public class ArtifactUpdateApiClientImpl extends ComponentSupport implements Art * ExecutorService shares between clients. All treads are created in the same executor */ private final ExecutorService asyncRequestsExecutorService; + private final BlockingQueue blockingQueue; @Inject public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager) { this.configurationsManager = configurationsManager; - BlockingQueue queue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize()); - this.asyncRequestsExecutorService = new ThreadPoolExecutor( - configurationsManager.getConfiguration().getRequestsSendingThreadsCount(), - configurationsManager.getConfiguration().getRequestsSendingThreadsCount(), - 30, TimeUnit.SECONDS, queue); + this.blockingQueue = + new LinkedBlockingQueue<>(configurationsManager. + getConfiguration().getRequestsQueueSize()); + this.asyncRequestsExecutorService = + Executors.newFixedThreadPool( + configurationsManager.getConfiguration() + .getRequestsSendingThreadsCount()); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + final ArtifactMetaInfo artifactMetaInfo = blockingQueue.take(); + asyncRequestsExecutorService.submit(new Runnable() { + @Override + public void run() { + sendRequest(artifactMetaInfo); + } + }); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + }); + thread.start(); + } + + @Override + public void enqueueRequest(ArtifactMetaInfo artifactMetaInfo) { + try { + blockingQueue.offer(artifactMetaInfo, 30, TimeUnit.SECONDS); + } catch (Exception e) { + log.error(e.getMessage(), e); + } } /** * Sends replication requests to all nexus servers configured in XML file + * * @param metaInfo Artifact information */ - @Override public void sendRequest(ArtifactMetaInfo metaInfo) { for (NexusServer server : configurationsManager.getConfiguration().getServers()) { AsyncWebResource.Builder service = getService(server.getUrl(), server.getUser(), server.getPassword()); @@ -104,8 +135,9 @@ public GenericType getGenericType() { /** * Returns jersey HTTP resource to access to the remote replication servers + * * @param nexusUrl URL of the remote server - * @param login Username on the remote server + * @param login Username on the remote server * @param password User's password * @return Jersey HTTP client */ @@ -119,14 +151,14 @@ private AsyncWebResource.Builder getService(String nexusUrl, String login, Strin /** * Creates jersey HTTP client - * @param login Username on the remote server + * + * @param login Username on the remote server * @param password User's password * @return HTTP client */ private Client getClient(String login, String password) { ClientConfig config = new DefaultClientConfig(); Client client = Client.create(config); - client.setExecutorService(asyncRequestsExecutorService); if (login != null && !login.isEmpty() && password != null) { log.debug("Creating HTTP client with authorized HTTPBasicAuthFilter."); client.addFilter(new HTTPBasicAuthFilter(login, password)); diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java index 076532f..4edb769 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java @@ -87,7 +87,7 @@ public void onArtifactUploading(RepositoryItemEventStore event) { updateArtifactStatus(metaInfo, artifactStatus); if (artifactStatus.isReadyForReplication()) { log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request"); - artifactUpdateApiClient.sendRequest(metaInfo); + artifactUpdateApiClient.enqueueRequest(metaInfo); clearStatus(metaInfo); } }