-
Notifications
You must be signed in to change notification settings - Fork 34
[Data Plane Mgr] Support Pulsar Publisher #481
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you. This is a good start. Please resolve the comments one by one, and work with @chenpiaoping if you have any question.
...e_manager/src/main/java/com/futurewei/alcor/dataplane/client/pulsar/DataPlaneClientImpl.java
Show resolved
Hide resolved
#####Pulsar configuration##### | ||
pulsar.url=pulsar://127.0.0.1:6650 | ||
pulsar.unicast.topic=unicast-topic1 | ||
host.ip.to.group.topic.map=group-topic1:192.168.131.131,10.10.10.11 group-topic2:192.168.131.131,11.11.11.12 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an OK short-term solution. Let us plan a long-term solution. One possibility to store the mapping in node metadata manager, as this is a host-level resource.
@chenpiaoping @VanderChen what do you think of?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a tracking issue #490
@VanderChen If you can take this item, that would be great. Let us sync on Slack for more details.
@Autowired | ||
private PulsarConfiguration configuration; | ||
|
||
private Map<String, String> hostIpToGroupTopic; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us plan to build these mapping from node metadata manager and store it in the local cache under development.
this.hostIpToGroupTopic = parseTopicConfig(configuration.getHostIpToGroupTopicMap()); | ||
this.groupTopicToMulticastTopic = parseTopicConfig(configuration.getGroupTopicToMulticastTopicMap()); | ||
}catch (Exception e) { | ||
throw new Exception("Parse topic config error: " + e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can define an explicit exception for this parsing error. Please find an example in https://github.com/futurewei-cloud/alcor/tree/master/services/data_plane_manager/src/main/java/com/futurewei/alcor/dataplane/exception
The idea is that we know every possible exception our own program may throw, and define it before hand. In each of the predefined exception, we can define its error code and message, and come to the debugging time, we can quickly locate where the exception is thrown and why based on the given error code and message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add a predefined exception to solve this problem.
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class UnicastFunction implements Function<UnicastGoalStateByte, UnicastGoalStateByte> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like this function implementation, which is neat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, where is this UnicastFunction used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both UnicastFunction and MulticastFuncation will be bundled as jar packages. These will be deployed to the Pulsar cluster as function workers by the Pulsar CLI tool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to use Java interface to deploy function workers. This will enable automatic deployment when the dpm service starts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool looking forward to it 👍
public class MulticastFunction implements Function<MulticastGoalStateByte, MulticastGoalStateByte> { | ||
private static final Logger LOG = LoggerFactory.getLogger(MulticastFunction.class); | ||
public static final ThreadPoolExecutor executor = new ThreadPoolExecutor( | ||
20, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should use configuration here.
Check the thread pool configurations in application.properties
grpc.min-threads = 100
grpc.max-threads = 200
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll fix this in the next commit.
} | ||
|
||
Producer<UnicastGoalStateByte> producer = pulsarClient | ||
.newProducer(JSONSchema.of(UnicastGoalStateByte.class)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to JSONSchema, what does other schema Pulsar support?
"unicastGoalStates: {}", nextTopic, unicastGoalState); | ||
} | ||
|
||
List<Goalstateprovisioner.GoalStateOperationReply.GoalStateOperationStatus> tempList = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know we don't have a response from MQ for now. @chenpiaoping Let us plan to design a feedback channel here: it would be ideal if we can reuse the same topic with different key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a tracking issue #489 @chenpiaoping
...e_manager/src/main/java/com/futurewei/alcor/dataplane/client/pulsar/DataPlaneClientImpl.java
Outdated
Show resolved
Hide resolved
// TODO: Find a field to decide client | ||
if (true){ | ||
return pulsarDataPlaneClient.createGoalStates(unicastGoalStates, multicastGoalState); | ||
if (networkConfig.getPortEntities().get(0).isFastPath()){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a question about this. Will there be different isFastPath values in multiple portEntity? How to deal with this situation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IsFastPath is a port-level config. Although we allow different values for multiple port entities for the sake of flexibility, we don't have such a scenario for now. In terms of implementation, we could make a port-level decision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, for the same NetworkConfiguration, we need to divide it into two groups to generate Goalstate based on whether it is fastpath. Am I following this logic to modify
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a NetworkConfiguration includes multiple port entities, DPM will process each entity independently and likely send the corresponding GS to its target host.
At the moment of sending the GS, we need to use the isFastPath field to determine go with the fast path or MQ scaling path.
Codecov Report
@@ Coverage Diff @@
## master #481 +/- ##
============================================
- Coverage 36.59% 36.59% -0.01%
Complexity 1162 1162
============================================
Files 453 453
Lines 10869 10869
Branches 1393 1393
============================================
- Hits 3978 3977 -1
- Misses 6349 6350 +1
Partials 542 542
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. @VanderChen.
I left a few comments and create two tracking issue #489 and #490 for two pending requests. Please take a look.
@@ -517,24 +522,13 @@ private UnicastGoalState buildUnicastGoalState(NetworkConfiguration networkConfi | |||
return unicastGoalState; | |||
} | |||
|
|||
private List<Map<String, List<GoalStateOperationStatus>>> createPortConfiguration(NetworkConfiguration networkConfig) throws Exception { | |||
private List<Map<String, List<GoalStateOperationStatus>>> doCreatePortConfiguration(NetworkConfiguration networkConfig, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Like this method abstraction and its renaming!
pulsar-admin functions create \ | ||
--jar target/dataplanemanager-0.1.0-SNAPSHOT.jar \ | ||
--classname com.futurewei.alcor.dataplane.client.pulsar.function.UnicastFunction \ | ||
--tenant public --namespace default \ | ||
--name unicast-function \ | ||
--inputs persistent://public/default/unicast-topic1 \ | ||
--output persistent://public/default/group-topic1 | ||
|
||
pulsar-admin functions create \ | ||
--jar target/dataplanemanager-0.1.0-SNAPSHOT.jar \ | ||
--classname com.futurewei.alcor.dataplane.client.pulsar.MulticastFunction \ | ||
--tenant public --namespace default \ | ||
--name multicast-function \ | ||
--inputs persistent://public/default/multicast-topic1 \ | ||
--output persistent://public/default/group-topic1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@VanderChen I was wondering how the function-deploy.sh gets triggered.
...e_manager/src/main/java/com/futurewei/alcor/dataplane/client/pulsar/DataPlaneClientImpl.java
Show resolved
Hide resolved
"unicastGoalStates: {}", nextTopic, unicastGoalState); | ||
} | ||
|
||
List<Goalstateprovisioner.GoalStateOperationReply.GoalStateOperationStatus> tempList = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a tracking issue #489 @chenpiaoping
#####Pulsar configuration##### | ||
pulsar.url=pulsar://127.0.0.1:6650 | ||
pulsar.unicast.topic=unicast-topic1 | ||
host.ip.to.group.topic.map=group-topic1:192.168.131.131,10.10.10.11 group-topic2:192.168.131.131,11.11.11.12 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create a tracking issue #490
@VanderChen If you can take this item, that would be great. Let us sync on Slack for more details.
This PR proposes the following change: