Skip to content

Commit

Permalink
Add /streams/deployments/scale/streamName API
Browse files Browse the repository at this point in the history
 - Add scale shell command.
 - Add Stream scale DSL.
 - Basic Stream and controller tests

 Resolves spring-cloud#3554
  • Loading branch information
tzolov committed Oct 23, 2019
1 parent c75bb74 commit 7e54e7d
Show file tree
Hide file tree
Showing 13 changed files with 229 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public void index() throws Exception {
linkWithRel("streams/deployments/rollback/{name}/{version}").description("Rollback the stream to the previous or a specific version of the stream"),
linkWithRel("streams/deployments/update/{name}").description("Update the stream."),
linkWithRel("streams/deployments/platform/list").description("List of supported deployment platforms"),
linkWithRel("streams/deployments/scale/{streamName}").description("Scale up or down number of instances for a selected list applications in a stream"),
linkWithRel("streams/logs").description("Retrieve application logs of the stream"),
linkWithRel("streams/logs/{streamName}").description("Retrieve application logs of the stream"),
linkWithRel("streams/logs/{streamName}/{appName}").description("Retrieve a specific application log of the stream"),
Expand Down Expand Up @@ -167,6 +168,8 @@ public void index() throws Exception {
fieldWithPath("_links.streams/deployments/update/{name}.href").description("Link to the streams/deployments/update/{name}"),
fieldWithPath("_links.streams/deployments/update/{name}.templated").type(JsonFieldType.BOOLEAN).optional().description("Link streams/deployments/update/{name} is templated"),
fieldWithPath("_links.streams/deployments/platform/list.href").description("Link to the streams/deployments/platform/list"),
fieldWithPath("_links.streams/deployments/scale/{streamName}.href").description("Link to the streams/deployments/scale/{streamName}"),
fieldWithPath("_links.streams/deployments/scale/{streamName}.templated").type(JsonFieldType.BOOLEAN).optional().description("Link streams/deployments/scale/{streamName} is templated"),

fieldWithPath("_links.streams/validation.href").description("Link to the streams/validation"),
fieldWithPath("_links.streams/validation.templated").type(JsonFieldType.BOOLEAN).optional().description("Link streams/validation is templated"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,11 @@ void updateStream(String streamName, String releaseName, PackageIdentifier packa
* @throws OperationNotSupportedException if the server does not support stream validation
*/
StreamAppStatusResource validateStreamDefinition(String streamDefinitionName) throws OperationNotSupportedException;

/**
*
* @param streamName the stream(release) name
* @param applicationCounts Map of application names to desired count
*/
void scaleStream(String streamName, Map<String, String> applicationCounts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package org.springframework.cloud.dataflow.rest.client;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.naming.OperationNotSupportedException;

import org.springframework.cloud.dataflow.rest.ScaleAppRequest;
import org.springframework.cloud.dataflow.rest.UpdateStreamRequest;
import org.springframework.cloud.dataflow.rest.client.support.VersionUtils;
import org.springframework.cloud.dataflow.rest.resource.StreamAppStatusResource;
Expand Down Expand Up @@ -148,6 +150,19 @@ public void destroyAll() {
restTemplate.delete(definitionsLink.getHref());
}

@Override
public void scaleStream(String streamName, Map<String, String> applicationCounts) {
String url = deploymentsLink.getHref() + "/scale/" + streamName;
List<ScaleAppRequest> scaleAppRequests = new ArrayList<>(applicationCounts.size());
for (Map.Entry<String, String> appCount : applicationCounts.entrySet()) {
ScaleAppRequest scaleAppRequest = new ScaleAppRequest();
scaleAppRequest.setName(appCount.getKey());
scaleAppRequest.setCount(appCount.getValue());
scaleAppRequests.add(scaleAppRequest);
}
restTemplate.postForObject(url, scaleAppRequests, Object.class);
}

@Override
public void updateStream(String streamName, String releaseName, PackageIdentifier packageIdentifier,
Map<String, String> updateProperties, boolean force, List<String> appNames) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.springframework.cloud.skipper.domain.PackageIdentifier;
import org.springframework.cloud.skipper.domain.Release;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* Represents a Stream deployed on DataFlow server. Instances of this class are created using a fluent style builder
Expand Down Expand Up @@ -109,6 +110,21 @@ public void update(String properties) {
}
}

public void scale(StreamApplication application, int count) {
this.client.streamOperations().scaleStream(this.name,
Collections.singletonMap(getAppLabelOrName(application), "" + count));
}

public void scale(Map<StreamApplication, Integer> applicationCounts) {
Map<String, String> apps = applicationCounts.entrySet().stream()
.collect(Collectors.toMap(e -> getAppLabelOrName(e.getKey()), e -> e.getValue() + ""));
this.client.streamOperations().scaleStream(this.name, apps);
}

private String getAppLabelOrName(StreamApplication application) {
return StringUtils.hasText(application.getLabel()) ? application.getLabel() : application.getName();
}

/**
* Unforced Stream Update with properties map
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,31 @@ public void destroy() {
verify(streamOperations, times(1))
.destroy(eq("ticktock"));
}

@Test
public void scale() {
StreamDefinitionResource resource = new StreamDefinitionResource("ticktock",
"time | log", "time | log", "demo stream");
resource.setStatus("deploying");
when(streamOperations.createStream(anyString(),
anyString(), anyString(), anyBoolean())).thenReturn(resource);
StreamApplication time = new StreamApplication("time");
StreamApplication log = new StreamApplication("log");
Stream stream = Stream.builder(client).name("ticktock").description("demo stream")
.source(time).sink(log)
.create().deploy();
verify(streamOperations, times(1)).createStream(
eq("ticktock"), eq("time | log"), eq("demo stream"), eq(false));
verify(streamOperations, times(1)).deploy(eq("ticktock"),
anyMap());
stream.scale(time, 3);

verify(streamOperations, times(1))
.scaleStream(eq("ticktock"), eq(Collections.singletonMap(time.getName(), "3")));

stream.scale(Collections.singletonMap(log, 2));

verify(streamOperations, times(1))
.scaleStream(eq("ticktock"), eq(Collections.singletonMap(log.getName(), "2")));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.dataflow.rest;

import java.util.Map;

/**
* @author Christian Tzolov
*/
public class ScaleAppRequest {

private String name;
private String count;
private Map<String, String> properties;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getCount() {
return count;
}

public void setCount(String count) {
this.count = count;
}

public Map<String, String> getProperties() {
return properties;
}

public void setProperties(Map<String, String> properties) {
this.properties = properties;
}

@Override
public String toString() {
return "ScaleAppRequest{" +
"name='" + name + '\'' +
", count='" + count + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ public RootResource info() {
root.add(WebMvcLinkBuilder.linkTo(WebMvcLinkBuilder.methodOn(StreamDeploymentController.class).platformList()).withRel("streams/deployments/platform/list"));
root.add(WebMvcLinkBuilder.linkTo(WebMvcLinkBuilder.methodOn(StreamDeploymentController.class).rollback(null, null)).withRel("streams/deployments/rollback/{name}/{version}"));
root.add(WebMvcLinkBuilder.linkTo(WebMvcLinkBuilder.methodOn(StreamDeploymentController.class).update(null, null)).withRel("streams/deployments/update/{name}"));
root.add(
unescapeTemplateVariables(entityLinks.linkToItemResource(StreamDeploymentResource.class, "{name}")
.withRel("streams/deployments/deployment")));
root.add(unescapeTemplateVariables(entityLinks.linkToItemResource(StreamDeploymentResource.class, "{name}").withRel("streams/deployments/deployment")));
root.add(WebMvcLinkBuilder.linkTo(WebMvcLinkBuilder.methodOn(StreamDeploymentController.class).scale(null, null)).withRel("streams/deployments/scale/{streamName}"));
root.add(WebMvcLinkBuilder.linkTo(StreamLogsController.class).withRel("streams/logs"));
root.add(WebMvcLinkBuilder.linkTo(WebMvcLinkBuilder.methodOn(StreamLogsController.class).getLog(null)).withRel("streams/logs/{streamName}"));
root.add(WebMvcLinkBuilder.linkTo(WebMvcLinkBuilder.methodOn(StreamLogsController.class).getLog(null, null)).withRel("streams/logs/{streamName}/{appName}"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.cloud.dataflow.core.StreamDefinition;
import org.springframework.cloud.dataflow.core.StreamDeployment;
import org.springframework.cloud.dataflow.rest.ScaleAppRequest;
import org.springframework.cloud.dataflow.rest.UpdateStreamRequest;
import org.springframework.cloud.dataflow.rest.resource.DeploymentStateResource;
import org.springframework.cloud.dataflow.rest.resource.StreamDeploymentResource;
Expand Down Expand Up @@ -94,6 +96,20 @@ public StreamDeploymentController(StreamDefinitionRepository repository,
this.streamService = streamService;
}

/**
* Request deployment of an existing stream definition.
* @param streamName the name of an existing stream definition (required)
* @param scaleAppRequests list of stream apps and desired scale for each (required)
* @return response without a body
*/
@RequestMapping(value = "/scale/{streamName}", method = RequestMethod.POST)
public ResponseEntity<Void> scale(@PathVariable("streamName") String streamName,
@RequestBody List<ScaleAppRequest> scaleAppRequests) {
logger.info(String.format("Scale stream: %s, apps: %s", streamName, scaleAppRequests));
this.streamService.scaleStream(streamName, scaleAppRequests);
return new ResponseEntity<>(HttpStatus.CREATED);
}

@RequestMapping(value = "/update/{name}", method = RequestMethod.POST)
public ResponseEntity<Void> update(@PathVariable("name") String name,
@RequestBody UpdateStreamRequest updateStreamRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.springframework.cloud.dataflow.core.StreamDefinition;
import org.springframework.cloud.dataflow.core.StreamDeployment;
import org.springframework.cloud.dataflow.rest.ScaleAppRequest;
import org.springframework.cloud.dataflow.rest.UpdateStreamRequest;
import org.springframework.cloud.dataflow.server.controller.support.InvalidStreamDefinitionException;
import org.springframework.cloud.dataflow.server.repository.NoSuchStreamDefinitionException;
Expand All @@ -40,6 +41,13 @@
*/
public interface StreamService {

/**
*
* @param streamName
* @param scaleAppRequests
*/
void scaleStream(String streamName, List<ScaleAppRequest> scaleAppRequests);

/**
* Update the stream using the UpdateStreamRequest.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.springframework.cloud.dataflow.core.dsl.ParseException;
import org.springframework.cloud.dataflow.core.dsl.StreamNode;
import org.springframework.cloud.dataflow.core.dsl.StreamParser;
import org.springframework.cloud.dataflow.rest.ScaleAppRequest;
import org.springframework.cloud.dataflow.rest.SkipperStream;
import org.springframework.cloud.dataflow.rest.UpdateStreamRequest;
import org.springframework.cloud.dataflow.rest.util.DeploymentPropertiesUtils;
Expand All @@ -57,6 +58,7 @@
import org.springframework.cloud.dataflow.server.service.ValidationStatus;
import org.springframework.cloud.dataflow.server.stream.SkipperStreamDeployer;
import org.springframework.cloud.dataflow.server.stream.StreamDeploymentRequest;
import org.springframework.cloud.deployer.spi.app.AppStatus;
import org.springframework.cloud.deployer.spi.app.DeploymentState;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.skipper.domain.Deployer;
Expand All @@ -69,6 +71,7 @@
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
Expand Down Expand Up @@ -217,6 +220,43 @@ private void updateStreamDefinitionFromReleaseManifest(String streamName, String
AuditOperationType.STREAM, AuditActionType.UPDATE, streamName, this.auditServiceUtils.convertStreamDefinitionToAuditData(streamDefinition));
}

@Override
public void scaleStream(String streamName, List<ScaleAppRequest> scaleAppRequests) {

if (CollectionUtils.isEmpty(scaleAppRequests)) {
logger.warn("No apps to scale for stream:" + streamName);
return;
}

Map<String, String> appNameToDeployerId = new HashMap<>();
List<AppStatus> appStatuses = this.skipperStreamDeployer.getStreamStatuses(streamName);
if (!CollectionUtils.isEmpty(appStatuses)) {
for (AppStatus appStatus : appStatuses) {
try {
String appName = appStatus.getInstances().values().iterator().next().getAttributes().get("skipper.application.name");
String deploymentId = appStatus.getDeploymentId();
appNameToDeployerId.put(appName, deploymentId);
}
catch (Throwable throwable) {
logger.warn("Failed to retrieve runtime status for " + appStatus.getDeploymentId(), throwable);
}
}
}

for (ScaleAppRequest scaleAppRequest : scaleAppRequests) {
String appDeploymentId = appNameToDeployerId.get(scaleAppRequest.getName());
if (!StringUtils.hasText(appDeploymentId)) {
throw new IllegalStateException(
String.format("Could not find deployment id for stream: %s, and app name: %s", streamName, scaleAppRequest.getName()));
}

logger.info(String.format("Scale %s:%s to %s", streamName, appDeploymentId, scaleAppRequest.getCount()));
}

// TODO: 1. Use Skipper's Scale API to send the scale request(s)
// TODO: 2. Figure out whether skipper requires the deploymentId or we can pass the App Name?
}

@Override
public void updateStream(String streamName, UpdateStreamRequest updateStreamRequest) {
updateStream(streamName, updateStreamRequest.getReleaseName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package org.springframework.cloud.dataflow.server.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -35,6 +37,7 @@

import org.springframework.cloud.dataflow.core.StreamDefinition;
import org.springframework.cloud.dataflow.core.StreamDeployment;
import org.springframework.cloud.dataflow.rest.ScaleAppRequest;
import org.springframework.cloud.dataflow.rest.SkipperStream;
import org.springframework.cloud.dataflow.rest.UpdateStreamRequest;
import org.springframework.cloud.dataflow.rest.resource.StreamDeploymentResource;
Expand Down Expand Up @@ -93,6 +96,20 @@ public void testDeployViaStreamService() {
Assert.assertEquals(argumentCaptor1.getValue(), "test");
}

@Test
public void testScaleStream() {
ScaleAppRequest sar = new ScaleAppRequest();
sar.setCount("666");
sar.setName("time");
List<ScaleAppRequest> res = new ArrayList<>();
res.add(sar);
this.controller.scale("ticktock", res);

ArgumentCaptor<List<ScaleAppRequest>> argumentCaptor1 = ArgumentCaptor.forClass(List.class);
verify(streamService).scaleStream(ArgumentMatchers.eq("ticktock"), argumentCaptor1.capture());
Assert.assertEquals(res, argumentCaptor1.getValue());
}

@Test
public void testUpdateStream() {
Map<String, String> deploymentProperties = new HashMap<>();
Expand Down
Loading

0 comments on commit 7e54e7d

Please sign in to comment.