Skip to content

Commit

Permalink
RESTful webservice to receive notifications of artifact uploads.
Browse files Browse the repository at this point in the history
griddynamics#3

ArtifactUpdateApiClient#offerRequest now does not work in blocking way.
Request is being added to internal queue.
Background thread iterates through this queue, and submit Runnables to Executor.
  • Loading branch information
gd-tmagrys committed Jun 30, 2015
1 parent 601963c commit fb6644e
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 17 deletions.
2 changes: 1 addition & 1 deletion replication-plugin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ requestsQueueSize and requestsSendingThreadsCount attributes define thread pool
requests from master to peers asynchronously.
The default values are: 500 for requestsQueueSize and 1 for requestsSendingThreadsCount
-->
<configurations myUrl="http://localhost:8081/nexus" requestsQueueSize="500" requestsSendingThreadsCount="1">
<configurations myUrl="http://localhost:8081/nexus" requestsQueueSize="500" requestsSendingThreadsCount="1" queueDumpFileName="/tmp/nexus-replication-plugin-queue-backup">
<servers>
<server>
<url>http://localhost:8083/nexus</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,25 @@
@RequiredArgsConstructor
@XmlAccessorType(XmlAccessType.FIELD)
public class NexusServer {
@Getter
@NonNull
@XmlElement(name = "url")
private String url;
@Getter
@NonNull
@XmlElement(name = "user")
private String user;
@Getter
@NonNull
@XmlElement(name = "password")
private String password;

public String getUrl() {
return url;
}

public String getUser() {
return user;
}

public String getPassword() {
return password;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,37 @@
@RequiredArgsConstructor
@XmlRootElement(name = "configurations")
public class ReplicationPluginConfiguration {
@Getter
@XmlElement(name = "server")
@XmlElementWrapper(name = "servers")
private final Set<NexusServer> servers = new HashSet<>();
@Getter
@NonNull
@XmlAttribute(name = "myUrl")
private String myUrl;
@Getter
@XmlAttribute(name = "requestsQueueSize")
private Integer requestsQueueSize = 500;
@Getter
@XmlAttribute(name = "requestsSendingThreadsCount")
private Integer requestsSendingThreadsCount = 1;
@XmlAttribute(name = "queueDumpFileName")
private String queueDumpFileName;

public void addServer(NexusServer server) {
servers.add(server);
}

public Integer getRequestsQueueSize() {
return requestsQueueSize;
}

public String getQueueDumpFileName() {
return queueDumpFileName;
}

public Integer getRequestsSendingThreadsCount() {
return requestsSendingThreadsCount;
}

public Set<NexusServer> getServers() {
return servers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2015, Grid Dynamics International, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.griddynamics.cd.nrp.internal.model.internal;

import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo;

import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.HashSet;
import java.util.Set;

/**
* DTO Class encapsulates artifact replication queue
*/
@XmlRootElement(name = "artifactMetaInfoBlockingQueueDump")
public class ArtifactMetaInfoQueueDump {
@XmlElement(name = "artifactMetaInfo")
@XmlElementWrapper(name = "artifactMetaInfos")
private final Set<ArtifactMetaInfo> artifactMetaInfos = new HashSet<>();

public void addArtifactMetaInfo(ArtifactMetaInfo artifactMetaInfo) {
artifactMetaInfos.add(artifactMetaInfo);
}
public void addAllArtifactMetaInfo(Set<ArtifactMetaInfo> artifactMetaInfo) {
artifactMetaInfos.addAll(artifactMetaInfo);
}

public Set<ArtifactMetaInfo> getArtifactMetaInfos() {
return artifactMetaInfos;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@
import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo;

public interface ArtifactUpdateApiClient {
void sendRequest(ArtifactMetaInfo metaInfo);
void offerRequest(ArtifactMetaInfo artifactMetaInfo);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.griddynamics.cd.nrp.internal.uploading.impl;

import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo;
import com.griddynamics.cd.nrp.internal.model.internal.ArtifactMetaInfoQueueDump;
import com.griddynamics.cd.nrp.internal.model.api.RestResponse;
import com.griddynamics.cd.nrp.internal.model.config.NexusServer;
import com.griddynamics.cd.nrp.internal.uploading.ArtifactUpdateApiClient;
Expand All @@ -34,6 +35,9 @@
import javax.inject.Singleton;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import java.io.File;
import java.util.concurrent.*;

@Singleton
Expand All @@ -50,23 +54,83 @@ public class ArtifactUpdateApiClientImpl extends ComponentSupport implements Art
/**
* ExecutorService shares between clients. All treads are created in the same executor
*/
private final ExecutorService asyncRequestsExecutorService;
private final ExecutorService jerseyHttpClientExecutor;
private final FileBlockingQueue fileBlockingQueue;

@Inject
public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager) {
this.configurationsManager = configurationsManager;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize());
this.asyncRequestsExecutorService = new ThreadPoolExecutor(
this.fileBlockingQueue = initFileBlockingQueue(configurationsManager);
this.jerseyHttpClientExecutor = new ThreadPoolExecutor(
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
configurationsManager.getConfiguration().getRequestsSendingThreadsCount(),
30, TimeUnit.SECONDS, queue);
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(
configurationsManager.getConfiguration().getRequestsQueueSize())
);
initBackgroundWorkers(configurationsManager);
}

private void initBackgroundWorkers(ConfigurationsManager configurationsManager) {
int requestsSendingThreadsCount = configurationsManager.getConfiguration()
.getRequestsSendingThreadsCount();
ExecutorService executorService = Executors.newFixedThreadPool(requestsSendingThreadsCount);
for (int i = 0; i < requestsSendingThreadsCount; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
final ArtifactMetaInfo artifactMetaInfo = fileBlockingQueue.peek();
sendRequest(artifactMetaInfo);
fileBlockingQueue.take();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
});
}
}

private FileBlockingQueue initFileBlockingQueue(ConfigurationsManager configurationsManager) {
BlockingQueue<ArtifactMetaInfo> blockingQueue =
new LinkedBlockingQueue<>(configurationsManager.
getConfiguration().getRequestsQueueSize());
String blockingQueueDumpFileName = configurationsManager.getConfiguration().getQueueDumpFileName();
FileBlockingQueue retVal = new FileBlockingQueue(blockingQueue,
blockingQueueDumpFileName);
try {
File blockingQueueInputFile = new File(blockingQueueDumpFileName);
if (blockingQueueInputFile.exists()) {
JAXBContext jaxbContext = JAXBContext.newInstance(ArtifactMetaInfoQueueDump.class);
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
ArtifactMetaInfoQueueDump unmarshal = (ArtifactMetaInfoQueueDump) unmarshaller.unmarshal(blockingQueueInputFile);
for (ArtifactMetaInfo artifactMetaInfo : unmarshal.getArtifactMetaInfos()) {
offerRequest(artifactMetaInfo);
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return retVal;
}

@Override
public void offerRequest(ArtifactMetaInfo artifactMetaInfo) {
try {
fileBlockingQueue.offer(artifactMetaInfo, 30, TimeUnit.SECONDS);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}

/**
* Sends replication requests to all nexus servers configured in XML file
*
* @param metaInfo Artifact information
*/
@Override
public void sendRequest(ArtifactMetaInfo metaInfo) {
for (NexusServer server : configurationsManager.getConfiguration().getServers()) {
AsyncWebResource.Builder service = getService(server.getUrl(), server.getUser(), server.getPassword());
Expand Down Expand Up @@ -104,13 +168,15 @@ public GenericType<RestResponse> getGenericType() {

/**
* Returns jersey HTTP resource to access to the remote replication servers
*
* @param nexusUrl URL of the remote server
* @param login Username on the remote server
* @param login Username on the remote server
* @param password User's password
* @return Jersey HTTP client
*/
private AsyncWebResource.Builder getService(String nexusUrl, String login, String password) {
Client client = getClient(login, password);
client.setExecutorService(jerseyHttpClientExecutor);
AsyncWebResource webResource = client.asyncResource(UriBuilder.fromUri(nexusUrl).build());
webResource = webResource.path("service").path("local").path("artifact").path("maven").path("update");
return webResource.accept(MediaType.APPLICATION_XML_TYPE)
Expand All @@ -119,14 +185,15 @@ private AsyncWebResource.Builder getService(String nexusUrl, String login, Strin

/**
* Creates jersey HTTP client
* @param login Username on the remote server
*
* @param login Username on the remote server
* @param password User's password
* @return HTTP client
*/
private Client getClient(String login, String password) {
ClientConfig config = new DefaultClientConfig();
Client client = Client.create(config);
client.setExecutorService(asyncRequestsExecutorService);
client.setExecutorService(jerseyHttpClientExecutor);
if (login != null && !login.isEmpty() && password != null) {
log.debug("Creating HTTP client with authorized HTTPBasicAuthFilter.");
client.addFilter(new HTTPBasicAuthFilter(login, password));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.griddynamics.cd.nrp.internal.uploading.impl;

import com.google.common.collect.Sets;
import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo;
import com.griddynamics.cd.nrp.internal.model.internal.ArtifactMetaInfoQueueDump;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FileBlockingQueue {

private final BlockingQueue<ArtifactMetaInfo> internalBlockingQueue;
private final String blockingQueueDumpFileName;

private Logger log = LoggerFactory.getLogger(FileBlockingQueue.class);

public FileBlockingQueue(BlockingQueue<ArtifactMetaInfo> blockingQueue, String blockingQueueDumpFileName) {
this.internalBlockingQueue = blockingQueue;
this.blockingQueueDumpFileName = blockingQueueDumpFileName;
}

public boolean offer(ArtifactMetaInfo e, long timeout, TimeUnit timeUnit) throws InterruptedException {
synchronized (internalBlockingQueue) {
boolean retVal = internalBlockingQueue.offer(e, timeout, timeUnit);
saveQueueToFile();
internalBlockingQueue.notify();
return retVal;
}
}

public ArtifactMetaInfo peek() throws InterruptedException {
synchronized (internalBlockingQueue) {
while (internalBlockingQueue.isEmpty()) {
internalBlockingQueue.wait();
}
return internalBlockingQueue.peek();
}
}

public ArtifactMetaInfo take() throws InterruptedException {
synchronized (internalBlockingQueue) {
while (internalBlockingQueue.isEmpty()) {
internalBlockingQueue.wait();
}
ArtifactMetaInfo retVal = internalBlockingQueue.take();
saveQueueToFile();
return retVal;

}
}

private synchronized void saveQueueToFile() {

try {
String backupBlockingQueueDumpFileName = blockingQueueDumpFileName + ".bak";
File blockingQueueDumpFile = new File(blockingQueueDumpFileName);
if (blockingQueueDumpFile.exists() && !blockingQueueDumpFile.isDirectory()) {
FileUtils.copyFile(blockingQueueDumpFile, new File(backupBlockingQueueDumpFileName));
}
JAXBContext jaxbContext = JAXBContext.newInstance(ArtifactMetaInfoQueueDump.class);
Marshaller marshaller = jaxbContext.createMarshaller();
ArtifactMetaInfoQueueDump artifactMetaInfoBlockingQueueDump =
new ArtifactMetaInfoQueueDump();
artifactMetaInfoBlockingQueueDump.addAllArtifactMetaInfo(Sets.newHashSet(internalBlockingQueue));
marshaller.marshal(artifactMetaInfoBlockingQueueDump, blockingQueueDumpFile);
} catch (Exception e) {
log.error(e.getMessage(), e);
}


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void onArtifactUploading(RepositoryItemEventStore event) {
updateArtifactStatus(metaInfo, artifactStatus);
if (artifactStatus.isReadyForReplication()) {
log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request");
artifactUpdateApiClient.sendRequest(metaInfo);
artifactUpdateApiClient.offerRequest(metaInfo);
clearStatus(metaInfo);
}
}
Expand Down

0 comments on commit fb6644e

Please sign in to comment.