diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java b/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java index 1855e61..87f65ba 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java @@ -52,4 +52,12 @@ public class ReplicationPluginConfiguration { public void addServer(NexusServer server) { servers.add(server); } + + public Integer getRequestsQueueSize() { + return requestsQueueSize; + } + + public Integer getRequestsSendingThreadsCount() { + return requestsSendingThreadsCount; + } } diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java index 3e6aeb2..5f6087a 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java @@ -18,5 +18,6 @@ import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo; public interface ArtifactUpdateApiClient { - void sendRequest(ArtifactMetaInfo metaInfo); + void offerRequest(ArtifactMetaInfo artifactMetaInfo); + } diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java index 9792f29..68c3774 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java @@ -45,28 +45,54 @@ public class ArtifactUpdateApiClientImpl extends ComponentSupport implements Art /** * Provides access to the plugin configurations */ - private final ConfigurationsManager configurationsManager; + private ConfigurationsManager configurationsManager; /** * ExecutorService shares between clients. All treads are created in the same executor */ - private final ExecutorService asyncRequestsExecutorService; + private final ExecutorService jerseyHttpClientExecutor; + private final BlockingQueue artifactMetaInfoBlockingQueue; @Inject public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager) { this.configurationsManager = configurationsManager; - BlockingQueue queue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize()); - this.asyncRequestsExecutorService = new ThreadPoolExecutor( + this.artifactMetaInfoBlockingQueue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize()); + this.jerseyHttpClientExecutor = new ThreadPoolExecutor( configurationsManager.getConfiguration().getRequestsSendingThreadsCount(), configurationsManager.getConfiguration().getRequestsSendingThreadsCount(), - 30, TimeUnit.SECONDS, queue); + 30, + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + final ArtifactMetaInfo artifactMetaInfo = artifactMetaInfoBlockingQueue.take(); + sendRequest(artifactMetaInfo); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + }); + thread.start(); + } + + @Override + public void offerRequest(ArtifactMetaInfo artifactMetaInfo) { + try { + artifactMetaInfoBlockingQueue.offer(artifactMetaInfo, 30, TimeUnit.SECONDS); + } catch (Exception e) { + log.error(e.getMessage(), e); + } } /** * Sends replication requests to all nexus servers configured in XML file + * * @param metaInfo Artifact information */ - @Override public void sendRequest(ArtifactMetaInfo metaInfo) { for (NexusServer server : configurationsManager.getConfiguration().getServers()) { AsyncWebResource.Builder service = getService(server.getUrl(), server.getUser(), server.getPassword()); @@ -104,8 +130,9 @@ public GenericType getGenericType() { /** * Returns jersey HTTP resource to access to the remote replication servers + * * @param nexusUrl URL of the remote server - * @param login Username on the remote server + * @param login Username on the remote server * @param password User's password * @return Jersey HTTP client */ @@ -119,14 +146,15 @@ private AsyncWebResource.Builder getService(String nexusUrl, String login, Strin /** * Creates jersey HTTP client - * @param login Username on the remote server + * + * @param login Username on the remote server * @param password User's password * @return HTTP client */ private Client getClient(String login, String password) { ClientConfig config = new DefaultClientConfig(); Client client = Client.create(config); - client.setExecutorService(asyncRequestsExecutorService); + client.setExecutorService(jerseyHttpClientExecutor); if (login != null && !login.isEmpty() && password != null) { log.debug("Creating HTTP client with authorized HTTPBasicAuthFilter."); client.addFilter(new HTTPBasicAuthFilter(login, password)); diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java index 076532f..8bad9be 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java @@ -87,7 +87,7 @@ public void onArtifactUploading(RepositoryItemEventStore event) { updateArtifactStatus(metaInfo, artifactStatus); if (artifactStatus.isReadyForReplication()) { log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request"); - artifactUpdateApiClient.sendRequest(metaInfo); + artifactUpdateApiClient.offerRequest(metaInfo); clearStatus(metaInfo); } }