Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update java sdk #607

Merged
merged 1 commit into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.concurrent.ThreadLocalRandom;

import org.apache.eventmesh.client.tcp.common.EventMeshCommon;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
Expand All @@ -35,37 +34,37 @@ public class EventMeshTestUtils {
private static final int seqLength = 10;

public static UserAgent generateClient1() {
UserAgent user = new UserAgent();
user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
user.setProducerGroup("EventmeshTest-ProducerGroup");
user.setConsumerGroup("EventmeshTest-ConsumerGroup");
user.setPath("/data/app/umg_proxy");
user.setPort(8362);
user.setSubsystem("5023");
user.setPid(32893);
user.setVersion("2.0.11");
user.setIdc("FT");
return user;
return UserAgent.builder()
.env("test")
.host("127.0.0.1")
.password(generateRandomString(8))
.username("PU4283")
.producerGroup("EventmeshTest-ProducerGroup")
.consumerGroup("EventmeshTest-ConsumerGroup")
.path("/data/app/umg_proxy")
.port(8362)
.subsystem("5023")
.pid(32893)
.version("2.0.11")
.idc("FT")
.build();
}

public static UserAgent generateClient2() {
UserAgent user = new UserAgent();
user.setEnv("test");
user.setHost("127.0.0.1");
user.setPassword(generateRandomString(8));
user.setUsername("PU4283");
user.setConsumerGroup("EventmeshTest-ConsumerGroup");
user.setProducerGroup("EventmeshTest-ProducerGroup");
user.setPath("/data/app/umg_proxy");
user.setPort(9362);
user.setSubsystem("5017");
user.setPid(42893);
user.setVersion("2.0.11");
user.setIdc("FT");
return user;
return UserAgent.builder()
.env("test")
.host("127.0.0.1")
.password(generateRandomString(8))
.username("PU4283")
.producerGroup("EventmeshTest-ProducerGroup")
.consumerGroup("EventmeshTest-ConsumerGroup")
.path("/data/app/umg_proxy")
.port(9362)
.subsystem("5017")
.pid(42893)
.version("2.0.11")
.idc("FT")
.build();
}

public static Package syncRR() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class CloudEventProducer extends AbstractHttpClient implements EventMeshProtocol

private static final String PROTOCOL_TYPE = "cloudevents";

private static final String PROTOCOL_DESC = "http";

public CloudEventProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException {
super(eventMeshHttpClientConfig);
}
Expand Down Expand Up @@ -105,6 +107,8 @@ private RequestParam buildCommonPostParam(CloudEvent cloudEvent) {
.addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE)
.addHeader(ProtocolKey.PROTOCOL_DESC, PROTOCOL_DESC)
.addHeader(ProtocolKey.PROTOCOL_VERSION, cloudEvent.getSpecVersion().toString())
// todo: move producerGroup tp header
.addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup())
.addBody(SendMessageRequestBody.CONTENT, JsonUtils.serialize(cloudEvent));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.apache.eventmesh.client.http.producer;

import io.cloudevents.SpecVersion;
import org.apache.eventmesh.client.http.AbstractHttpClient;
import org.apache.eventmesh.client.http.EventMeshRetObj;
import org.apache.eventmesh.client.http.conf.EventMeshHttpClientConfig;
Expand All @@ -26,6 +27,10 @@
@Slf4j
class EventMeshMessageProducer extends AbstractHttpClient implements EventMeshProtocolProducer<EventMeshMessage> {

private static final String PROTOCOL_TYPE = "eventmeshmessage";

private static final String PROTOCOL_DESC = "http";

public EventMeshMessageProducer(EventMeshHttpClientConfig eventMeshHttpClientConfig) throws EventMeshException {
super(eventMeshHttpClientConfig);
}
Expand Down Expand Up @@ -106,6 +111,10 @@ private RequestParam buildCommonPostParam(EventMeshMessage message) {
.addHeader(ProtocolKey.ClientInstanceKey.USERNAME, eventMeshHttpClientConfig.getUserName())
.addHeader(ProtocolKey.ClientInstanceKey.PASSWD, eventMeshHttpClientConfig.getPassword())
.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion())
.addHeader(ProtocolKey.PROTOCOL_TYPE, PROTOCOL_TYPE)
.addHeader(ProtocolKey.PROTOCOL_DESC, PROTOCOL_DESC)
//default ce version is 1.0
.addHeader(ProtocolKey.PROTOCOL_VERSION, SpecVersion.V1.toString())
.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA)
.addBody(SendMessageRequestBody.PRODUCERGROUP, eventMeshHttpClientConfig.getProducerGroup())
// todo: set message to content is better
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,6 @@ public class EventMeshCommon {
public static String PREFIX_SESSION_TPS_STAT_EVENTREV = "event_rev_tps_";

public static String CLOUD_EVENTS_PROTOCOL_NAME = "cloudevents";

public static String EM_MESSAGE_PROTOCOL_NAME = "eventmeshmessage";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ThreadLocalRandom;

import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Subscription;
Expand Down Expand Up @@ -78,16 +79,26 @@ public static Package asyncMessageAck(Package in) {
return msg;
}

public static Package asyncCloudEvent(CloudEvent cloudEvent) {
public static Package buildPackage(Object message, Command command) {
Package msg = new Package();
msg.setHeader(new Header(Command.ASYNC_MESSAGE_TO_SERVER, 0,
null, generateRandomString(seqLength)));
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE,
EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION,
cloudEvent.getSpecVersion().toString());
msg.setHeader(new Header(command, 0,
null, generateRandomString(seqLength)));
if (message instanceof CloudEvent) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE,
EventMeshCommon.CLOUD_EVENTS_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION,
((CloudEvent) message).getSpecVersion().toString());
} else if (message instanceof EventMeshMessage) {
msg.getHeader().putProperty(Constants.PROTOCOL_TYPE,
EventMeshCommon.EM_MESSAGE_PROTOCOL_NAME);
msg.getHeader().putProperty(Constants.PROTOCOL_VERSION,
SpecVersion.V1.toString());
} else {
// unsupported protocol for server
return msg;
}
msg.getHeader().putProperty(Constants.PROTOCOL_DESC, "tcp");
msg.setBody(cloudEvent);
msg.setBody(message);
return msg;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,13 @@ public void initChannel(SocketChannel ch) {
}

@Override
public void close() throws EventMeshException {
public void close() {
try {
channel.disconnect().sync();
workers.shutdownGracefully();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("close tcp client failed.|remote address={}", channel.remoteAddress(), e);
throw new EventMeshException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,54 +1,132 @@
package org.apache.eventmesh.client.tcp.impl.cloudevent;



import org.apache.eventmesh.client.tcp.EventMeshTCPPubClient;
import org.apache.eventmesh.client.tcp.common.AsyncRRCallback;
import org.apache.eventmesh.client.tcp.common.ReceiveMsgHook;
import org.apache.eventmesh.client.tcp.common.*;
import org.apache.eventmesh.client.tcp.conf.EventMeshTcpClientConfig;
import org.apache.eventmesh.client.tcp.impl.eventmeshmessage.EventMeshMessageTCPPubClient;
import org.apache.eventmesh.common.EventMeshMessage;
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.cloudevents.CloudEvent;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
* A CloudEvent TCP publish client implementation.
*/
@Slf4j
public class CloudEventTCPPubClient implements EventMeshTCPPubClient<CloudEvent> {
public class CloudEventTCPPubClient extends TcpClient implements EventMeshTCPPubClient<CloudEvent> {

private final UserAgent userAgent;

private ReceiveMsgHook<EventMeshMessage> callback;

private final ConcurrentHashMap<String, AsyncRRCallback> callbackConcurrentHashMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> task;

public CloudEventTCPPubClient(EventMeshTcpClientConfig eventMeshTcpClientConfig) {
super(eventMeshTcpClientConfig);
this.userAgent = eventMeshTcpClientConfig.getUserAgent();
}

@Override
public void init() throws EventMeshException {

try {
open(new Handler());
hello();
} catch (Exception ex) {
throw new EventMeshException("Initialize EventMeshMessageTCPPubClient error", ex);
}
}

@Override
public void heartbeat() throws EventMeshException {

if (task != null) {
synchronized (EventMeshMessageTCPPubClient.class) {
task = scheduler.scheduleAtFixedRate(() -> {
try {
if (!isActive()) {
reconnect();
}
Package msg = MessageUtils.heartBeat();
io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
} catch (Exception ignore) {
// ignore
}
}, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS);
}
}
}

@Override
public void reconnect() throws EventMeshException {

try {
super.reconnect();
hello();
} catch (Exception ex) {
throw new EventMeshException("reconnect error", ex);
}
}

@Override
public Package rr(CloudEvent msg, long timeout) throws EventMeshException {
return null;
public Package rr(CloudEvent event, long timeout) throws EventMeshException {
try {
Package msg = MessageUtils.buildPackage(event, Command.REQUEST_TO_SERVER);
log.info("{}|rr|send|type={}|msg={}", clientNo, msg, msg);
return io(msg, timeout);
} catch (Exception ex) {
throw new EventMeshException("rr error");
}
}

@Override
public void asyncRR(CloudEvent msg, AsyncRRCallback callback, long timeout) throws EventMeshException {

public void asyncRR(CloudEvent event, AsyncRRCallback callback, long timeout) throws EventMeshException {
try {
Package msg = MessageUtils.buildPackage(event, Command.REQUEST_TO_SERVER);
super.send(msg);
this.callbackConcurrentHashMap.put((String) RequestContext._key(msg), callback);
} catch (Exception ex) {
// should trigger callback?
throw new EventMeshException("asyncRR error", ex);
}
}

@Override
public Package publish(CloudEvent cloudEvent, long timeout) throws EventMeshException {
return null;
try {
// todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(cloudEvent, Command.ASYNC_MESSAGE_TO_SERVER);
log.info("SimplePubClientImpl cloud event|{}|publish|send|type={}|protocol={}|msg={}",
clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
return io(msg, timeout);
} catch (Exception ex) {
throw new EventMeshException("publish error", ex);
}
}

@Override
public void broadcast(CloudEvent cloudEvent, long timeout) throws EventMeshException {

try {
// todo: transform EventMeshMessage to Package
Package msg = MessageUtils.buildPackage(cloudEvent, Command.BROADCAST_MESSAGE_TO_SERVER);
log.info("{}|publish|send|type={}|protocol={}|msg={}", clientNo, msg.getHeader().getCommand(),
msg.getHeader().getProperty(PropertyConst.PROPERTY_MESSAGE_PROTOCOL), msg);
super.send(msg);
} catch (Exception ex) {
throw new EventMeshException("Broadcast message error", ex);
}
}

@Override
Expand All @@ -57,7 +135,45 @@ public void registerBusiHandler(ReceiveMsgHook<CloudEvent> handler) throws Event
}

@Override
public void close() throws EventMeshException {
public void close() {

}

// todo: move to abstract class
@ChannelHandler.Sharable
private class Handler extends SimpleChannelInboundHandler<Package> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Exception {
log.info("SimplePubClientImpl|{}|receive|type={}|msg={}", clientNo, msg.getHeader(), msg);

Command cmd = msg.getHeader().getCommand();
if (cmd == Command.RESPONSE_TO_CLIENT) {
if (callback != null) {
callback.handle(msg, ctx);
}
Package pkg = MessageUtils.responseToClientAck(msg);
send(pkg);
} else if (cmd == Command.SERVER_GOODBYE_REQUEST) {
//TODO
}

RequestContext context = contexts.get(RequestContext._key(msg));
if (context != null) {
contexts.remove(context.getKey());
context.finish(msg);
}
}
}

// todo: remove hello
private void hello() throws Exception {
Package msg = MessageUtils.hello(userAgent);
this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
}

// todo: remove goodbye
private void goodbye() throws Exception {
Package msg = MessageUtils.goodbye();
this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS);
}
}
Loading