Skip to content

Commit

Permalink
Merge branch 'master' into simplify-for-transports-to-send-to-multipl…
Browse files Browse the repository at this point in the history
…e-destinations
  • Loading branch information
cfredri4 committed Nov 14, 2024
2 parents 631e607 + 8c6cfa0 commit b521dd4
Show file tree
Hide file tree
Showing 63 changed files with 3,135 additions and 692 deletions.
1 change: 1 addition & 0 deletions conf/jg-magic-map.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
<class id="97" name="org.jgroups.protocols.RTTHeader"/>
<class id="98" name="org.jgroups.protocols.ProtPerfHeader"/>
<class id="99" name="org.jgroups.protocols.NakAckHeader"/>
<class id="100" name="org.jgroups.protocols.UnicastHeader"/>

</magic-number-class-mapping>

1 change: 1 addition & 0 deletions conf/jg-protocol-ids.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
<class id="75" name="org.jgroups.protocols.JDBC_PING2"/>
<class id="76" name="org.jgroups.protocols.NAKACK3"/>
<class id="77" name="org.jgroups.protocols.NAKACK4"/>
<class id="78" name="org.jgroups.protocols.UNICAST4"/>

<!-- IDs reserved for building blocks -->
<class id="200" name="org.jgroups.blocks.RequestCorrelator"/> <!-- ID should be the same as Global.BLOCKS_START_ID -->
Expand Down
4 changes: 0 additions & 4 deletions conf/sym-encrypt.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,5 @@
<UNICAST3/>
<pbcast.STABLE/>
<FRAG2/>
<!-- AUTH below is optional -->
<AUTH auth_class="org.jgroups.auth.MD5Token"
auth_value="chris"
token_hash="MD5"/>
<pbcast.GMS join_timeout="2s" />
</config>
35 changes: 4 additions & 31 deletions doc/manual/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -269,48 +269,22 @@ to better performance.

JGroups 5.0 comes with a number of message types (see the next sections). If none of them are a fit for the application's
requirements, new message types can be defined and registered. To do this, the new message type needs to implement
`Message` (typically by subclassing `BaseMessage`) and registering it with the `MessageFactory` in the transport:
`Message` (typically by subclassing `BaseMessage`) and registering it with the `MessageFactory`:

[source,java]
----
CustomMessage msg=new CustomMessage(...);
JChannel ch;
TP transport=ch.getProtocolStack().getTransport();
MessageFactory mf=transport.getMessageFactory();
mf.register((short)12345, CustomMessage::new)
MessageFactory.register((short)12345, CustomMessage::new)
----

A (unique) ID has to be assigned with the message type, and then it has to be registered with the message factory
in the transport. This has to be done before sending an instance of the new message type.
If the ID has already been registered before, or is taken, an exception will be thrown.
Note that the default implementation of `MessageFactory` requires all IDs to be greater than 32, so that there's room
for adding built-in message types.
`MessageFactory` requires all IDs to be greater than 32, so that there's room for adding built-in message types.

NOTE: It is recommended to register all custom message types _before_ connecting the channel, so that potential errors
are detected early.

[[CustomMessageFactory]]
==== Custom `MessageFactory`
`MessageFactory` is a simple interface:

[source,java]
----
public interface MessageFactory {
<T extends Message> T create(short id);
void register(short type, Supplier<? extends Message> generator);
}
----
We saw the that the `register()` method is used to associate new message types with IDs <<MessageFactory,above>>.

There is a `DefaultMessageFactory` which is set in the transport (`TP`). If more control over the creation of custom
messages is desired, a custom implementation of `MessageFactory` can be written and registered in the transport, using
`TP.setMessageFactory(MessageFactory mf)`.

An example for why we might want to provide our own `MessageFactory` is that we have control over the creation of
messages; e.g. to create an `NioMessage` with a *direct* `ByteBuffer`, we may want to use a _pool_ of off-heap memory
rather than calling `ByteBuffer.allocateDirect()` for each message, which is slow.


[[BytesMessage]]
==== BytesMessage
This is the equivalent to the 4.x `Message`, and contains a byte array, offset and length. There are methods to get and
Expand Down Expand Up @@ -365,8 +339,7 @@ The methods of `NioMessage` are:
|==========================

NOTE: The envisioned use case for `useDirectMemory()` is when we send an `NioMessage` with a direct `ByteBuffer`, but
don't need the `ByteBuffer` to be created in off-heap memory at the receiver, when on-heap will do. +
The alternative is to provide a custom <<MessageFactory,`MessageFactory`>>.
don't need the `ByteBuffer` to be created in off-heap memory at the receiver, when on-heap will do.



Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.5.1</version>
<version>3.5.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down
5 changes: 1 addition & 4 deletions src/org/jgroups/BatchMessage.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

package org.jgroups;


Expand Down Expand Up @@ -32,8 +31,6 @@ public class BatchMessage extends BaseMessage implements Iterable<Message> {
protected Address orig_src;


protected static final MessageFactory mf=new DefaultMessageFactory();

public BatchMessage() {
}

Expand Down Expand Up @@ -155,7 +152,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
msgs=new Message[index]; // a bit of additional space should we add byte arrays
for(int i=0; i < index; i++) {
short type=in.readShort();
msgs[i]=mf.create(type).setDest(dest()).setSrc(orig_src);
msgs[i]=MessageFactory.create(type).setDest(dest()).setSrc(orig_src);
msgs[i].readFrom(in);
}
}
Expand Down
26 changes: 18 additions & 8 deletions src/org/jgroups/CompositeMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.*;
import java.util.function.Supplier;

/**
Expand All @@ -31,8 +28,6 @@ public class CompositeMessage extends BaseMessage implements Iterable<Message> {
protected boolean collapse; // send as a BytesMessage when true


protected static final MessageFactory mf=new DefaultMessageFactory();

public CompositeMessage() {
}

Expand All @@ -46,6 +41,11 @@ public CompositeMessage(Address dest, Message ... messages) {
add(messages);
}

public CompositeMessage(Address dest, Collection<Message> messages) {
super(dest);
add(messages);
}


public Supplier<Message> create() {return CompositeMessage::new;}
public short getType() {return collapse? Message.BYTES_MSG : Message.COMPOSITE_MSG;}
Expand Down Expand Up @@ -86,6 +86,12 @@ public CompositeMessage add(Message ... messages) {
return this;
}

public CompositeMessage add(Collection<Message> messages) {
ensureCapacity(index + messages.size());
for(Message msg: messages)
msgs[index++]=Objects.requireNonNull(ensureSameDest(msg));
return this;
}

public <T extends Message> T get(int index) {
return (T)msgs[index];
Expand Down Expand Up @@ -136,6 +142,7 @@ public void writePayload(DataOutput out) throws IOException {
for(int i=0; i < index; i++) {
Message msg=msgs[i];
out.writeShort(msg.getType());
// msg.writeToNoAddrs(src(), out);
msg.writeTo(out);
}
}
Expand All @@ -147,8 +154,11 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
msgs=new Message[index]; // a bit of additional space should we add byte arrays
for(int i=0; i < index; i++) {
short type=in.readShort();
msgs[i]=mf.create(type);
msgs[i].readFrom(in);
Message msg=MessageFactory.create(type).setDest(getDest());
if(msg.getSrc() == null)
msg.setSrc(src());
msg.readFrom(in);
msgs[i]=msg;
}
}
}
Expand Down
46 changes: 0 additions & 46 deletions src/org/jgroups/DefaultMessageFactory.java

This file was deleted.

41 changes: 35 additions & 6 deletions src/org/jgroups/MessageFactory.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,56 @@
package org.jgroups;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Factory to create messages. Uses an array for message IDs less then 32, and a hashmap for
* types above 32
* @author Bela Ban
* @since 5.0
*/
public interface MessageFactory {


public class MessageFactory {
protected static final byte MIN_TYPE=32;
protected static final Supplier<? extends Message>[] creators=new Supplier[MIN_TYPE];
protected static Map<Short,Supplier<? extends Message>> map=new HashMap<>();
static {
creators[Message.BYTES_MSG]=BytesMessage::new;
creators[Message.NIO_MSG]=NioMessage::new;
creators[Message.EMPTY_MSG]=EmptyMessage::new;
creators[Message.OBJ_MSG]=ObjectMessage::new;
creators[Message.LONG_MSG]=LongMessage::new;
creators[Message.COMPOSITE_MSG]=CompositeMessage::new;
creators[Message.FRAG_MSG]=FragmentedMessage::new;
creators[Message.EARLYBATCH_MSG]=BatchMessage::new;
}

/**
* Creates a message based on the given ID
* @param id The ID
* @param type The ID
* @param <T> The type of the message
* @return A message
*/
<T extends Message> T create(short id);
public static <T extends Message> T create(short type) {
Supplier<? extends Message> creator=type < MIN_TYPE? creators[type] : map.get(type);
if(creator == null)
throw new IllegalArgumentException("no creator found for type " + type);
return (T)creator.get();
}

/**
* Registers a new creator of messages
* @param type The type associated with the new payload. Needs to be the same in all nodes of the same cluster, and
* needs to be available (ie., not taken by JGroups or other applications).
* @param generator The creator of the payload associated with the given type
*/
<M extends MessageFactory> M register(short type, Supplier<? extends Message> generator);
public static void register(short type, Supplier<? extends Message> generator) {
Objects.requireNonNull(generator, "the creator must be non-null");
if(type < MIN_TYPE)
throw new IllegalArgumentException(String.format("type (%d) must be >= 32", type));
if(map.containsKey(type))
throw new IllegalArgumentException(String.format("type %d is already taken", type));
map.put(type, generator);
}
}
7 changes: 5 additions & 2 deletions src/org/jgroups/ObjectMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ObjectMessage(Address dest, SizeStreamable obj) {
public ObjectMessage setArray(ByteArray buf) {throw new UnsupportedOperationException();}
public boolean isWrapped() {return isFlagSet(Flag.SERIALIZED);}

// reusing SERIALIZABLE
// reusing SERIALIZED
public ObjectMessage setWrapped(boolean b) {
if(b) setFlag(Flag.SERIALIZED);
else clearFlag(Flag.SERIALIZED);
Expand Down Expand Up @@ -145,8 +145,11 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
}

@Override protected Message copyPayload(Message copy) {
if(obj != null)
if(obj != null) {
((ObjectMessage)copy).setObject(obj);
if(isFlagSet(Flag.SERIALIZED))
copy.setFlag(Flag.SERIALIZED);
}
return copy;
}

Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/conf/ClassConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ClassConfigurator {
protected static final String ID = "id";
protected static final String NAME = "name";
protected static final String EXTERNAL = "external";
private static final int MAX_MAGIC_VALUE=100;
private static final int MAX_MAGIC_VALUE=124;
private static final int MAX_PROT_ID_VALUE=256;
private static final short MIN_CUSTOM_MAGIC_NUMBER=1024;
private static final short MIN_CUSTOM_PROTOCOL_ID=512;
Expand Down Expand Up @@ -292,7 +292,7 @@ protected static void alreadyInProtocolsMap(short prot_id, String classname) {
* try to read the magic number configuration file as a Resource form the classpath using getResourceAsStream
* if this fails this method tries to read the configuration file from mMagicNumberFile using a FileInputStream (not in classpath but somewhere else in the disk)
*
* @return an array of ClassMap objects that where parsed from the file (if found) or an empty array if file not found or had en exception
* @return a list of ClassMap objects that where parsed from the file (if found) or an empty array if file not found or had en exception
*/
protected static List<Triple<Short,String,Boolean>> readMappings(String name) throws Exception {
InputStream stream=Util.getResourceAsStream(name, ClassConfigurator.class);
Expand Down
8 changes: 3 additions & 5 deletions src/org/jgroups/protocols/COMPRESS.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class COMPRESS extends Protocol {

protected BlockingQueue<Deflater> deflater_pool;
protected BlockingQueue<Inflater> inflater_pool;
protected MessageFactory msg_factory;
protected final LongAdder num_compressions=new LongAdder(), num_decompressions=new LongAdder();


Expand Down Expand Up @@ -77,7 +76,6 @@ public void init() throws Exception {
inflater_pool=new ArrayBlockingQueue<>(pool_size);
for(int i=0; i < pool_size; i++)
inflater_pool.add(new Inflater());
msg_factory=getTransport().getMessageFactory();
}

public void destroy() {
Expand Down Expand Up @@ -192,7 +190,7 @@ protected Message uncompress(Message msg, int original_size, boolean needs_deser
inflater.inflate(uncompressed_payload);
// we need to copy: https://issues.redhat.com/browse/JGRP-867
if(needs_deserialization) {
return messageFromByteArray(uncompressed_payload, msg_factory);
return messageFromByteArray(uncompressed_payload);
}
else
return msg.copy(false, true).setArray(uncompressed_payload, 0, uncompressed_payload.length);
Expand Down Expand Up @@ -221,9 +219,9 @@ protected static ByteArray messageToByteArray(Message msg) {
}
}

protected static Message messageFromByteArray(byte[] uncompressed_payload, MessageFactory msg_factory) {
protected static Message messageFromByteArray(byte[] uncompressed_payload) {
try {
return Util.messageFromBuffer(uncompressed_payload, 0, uncompressed_payload.length, msg_factory);
return Util.messageFromBuffer(uncompressed_payload, 0, uncompressed_payload.length);
}
catch(Exception ex) {
throw new RuntimeException("failed unmarshalling message", ex);
Expand Down
Loading

0 comments on commit b521dd4

Please sign in to comment.