Skip to content

Commit

Permalink
Merge pull request #27 from gd-tmagrys/queue
Browse files Browse the repository at this point in the history
Queue
  • Loading branch information
alekseysemenov committed Jun 25, 2015
2 parents 0b402df + c2ffca7 commit 601963c
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 40 deletions.
6 changes: 5 additions & 1 deletion replication-plugin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ User and password are optional. If there is neither login nor password (or login
You could also define myUrl attribute for <configurations> tag that defines URL of the Nexus instance.
Remote URL of the proxy repository (at the peer instance) should start with myUrl (configured for master instance).
This configuration used to match what proxy repository have to poll artifact.
requestsQueueSize and requestsSendingThreadsCount attributes define thread pool configuration. This pool is used to send
requests from master to peers asynchronously.
The default values are: 500 for requestsQueueSize and 1 for requestsSendingThreadsCount
-->
<configurations myUrl="http://localhost:8081/nexus">
<configurations myUrl="http://localhost:8081/nexus" requestsQueueSize="500" requestsSendingThreadsCount="1">
<servers>
<server>
<url>http://localhost:8083/nexus</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.Serializable;

@Data
@EqualsAndHashCode(exclude = {"isMd5Received", "isSha1Received", "isFileReceived"})
@EqualsAndHashCode(exclude = {"isSha1Received", "isFileReceived"})
public class ArtifactStatus implements Serializable {
private final String groupId;
private final String artifactId;
Expand All @@ -30,7 +30,6 @@ public class ArtifactStatus implements Serializable {
private final String extension;
private final String repositoryId;
private final String nexusUrl;
private boolean isMd5Received;
private boolean isSha1Received;
private boolean isFileReceived;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public class ReplicationPluginConfiguration {
@NonNull
@XmlAttribute(name = "myUrl")
private String myUrl;
@Getter
@XmlAttribute(name = "requestsQueueSize")
private Integer requestsQueueSize = 500;
@Getter
@XmlAttribute(name = "requestsSendingThreadsCount")
private Integer requestsSendingThreadsCount = 1;

public void addServer(NexusServer server) {
servers.add(server);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@
import javax.inject.Singleton;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;

@Singleton
@Named(ArtifactUpdateApiClientImpl.ID)
Expand All @@ -48,16 +45,21 @@ public class ArtifactUpdateApiClientImpl extends ComponentSupport implements Art
/**
* Provides access to the plugin configurations
*/
private ConfigurationsManager configurationsManager;
private final ConfigurationsManager configurationsManager;

/**
* ExecutorService shares between clients. All treads are created in the same executor
*/
private ExecutorService asyncRequestsExecutorService = Executors.newFixedThreadPool(10);
private final ExecutorService asyncRequestsExecutorService;

@Inject
public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager) {
this.configurationsManager = configurationsManager;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize());
this.asyncRequestsExecutorService = new ThreadPoolExecutor(
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
30, TimeUnit.SECONDS, queue);
}

/**
Expand All @@ -68,29 +70,35 @@ public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager)
public void sendRequest(ArtifactMetaInfo metaInfo) {
for (NexusServer server : configurationsManager.getConfiguration().getServers()) {
AsyncWebResource.Builder service = getService(server.getUrl(), server.getUser(), server.getPassword());
service.post(new ITypeListener<RestResponse>() {
@Override
public void onComplete(Future<RestResponse> future) throws InterruptedException {
RestResponse response = null;
try {
response = future.get();
} catch (ExecutionException e) {
log.error("Can not get REST response", e);
try {
service.post(new ITypeListener<RestResponse>() {
@Override
public void onComplete(Future<RestResponse> future) throws InterruptedException {
RestResponse response = null;
try {
response = future.get();
} catch (ExecutionException e) {
log.error("Can not get REST response", e);
}
if (response != null && !response.isSuccess()) {
log.error("Can not send replication request: " + response.getMessage());
}
}
if (response != null && !response.isSuccess()) {
log.error("Can not send replication request: " + response.getMessage());
}
}

@Override
public Class<RestResponse> getType() {
return RestResponse.class;
}
@Override
public Class<RestResponse> getType() {
return RestResponse.class;
}

@Override
public GenericType<RestResponse> getGenericType() { return null; }
@Override
public GenericType<RestResponse> getGenericType() {
return null;
}

}, metaInfo);
}, metaInfo);
} catch (RejectedExecutionException e) {
log.warn("Requests queue is full. Request to " + server.getUrl() + " is rejected");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,23 @@ public void onArtifactUploading(RepositoryItemEventStore event) {
ArtifactMetaInfo metaInfo = new ArtifactMetaInfo(configurationsManager.getConfiguration().getMyUrl(), gav.getGroupId(), gav.getArtifactId(), gav.getVersion(), repo.getId());
metaInfo.setClassifier(gav.getClassifier());
metaInfo.setExtension(gav.getExtension());
ArtifactStatus artifactStatus = getArtifactStatus(metaInfo);
ArtifactStatus artifactStatus = null;
if (!gav.isSignature() && !gav.isHash()) {
artifactStatus = getArtifactStatus(metaInfo);
artifactStatus.setFileReceived(true);
log.debug("File received: " + metaInfo.toString());
} else if (gav.isHash()) {
if (gav.getHashType().equals(Gav.HashType.md5)) {
artifactStatus.setMd5Received(true);
} else if (gav.getHashType().equals(Gav.HashType.sha1)) {
artifactStatus.setSha1Received(true);
}
} else if (gav.isHash() && gav.getHashType().equals(Gav.HashType.sha1)) {
artifactStatus = getArtifactStatus(metaInfo);
artifactStatus.setSha1Received(true);
log.debug(gav.getHashType().name() + " hash file received for: " + metaInfo.toString());
}
updateArtifactStatus(metaInfo, artifactStatus);
if (artifactStatus.isReadyForReplication()) {
log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request");
artifactUpdateApiClient.sendRequest(metaInfo);
clearStatus(metaInfo);
if (null != artifactStatus) {
updateArtifactStatus(metaInfo, artifactStatus);
if (artifactStatus.isReadyForReplication()) {
log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request");
artifactUpdateApiClient.sendRequest(metaInfo);
clearStatus(metaInfo);
}
}
}
}
Expand Down

0 comments on commit 601963c

Please sign in to comment.