Skip to content

Commit

Permalink
Expose sequence recovery data via SessionStateListener (#315)
Browse files Browse the repository at this point in the history
* Enhanced session state listener with new recovery awareness method
* SessionStateListener methods now have default voids, as per PR discussion
  • Loading branch information
sheinbergon authored Oct 19, 2020
1 parent 1618a09 commit f577b30
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,18 @@ public void onRefresh() {
sendNotification("refresh");
}

public void onResendRequestSent(int beginSeqNo, int endSeqNo, int currentEndSeqNo) {
sendNotification("resendRequestSent");
}

public void onSequenceResetReceived(int newSeqNo, boolean gapFillFlag) {
sendNotification("sequenceResetReceived");
}

public void onResendRequestSatisfied(int beginSeqNo, int endSeqNo) {
sendNotification("resentRequestSatisfied");
}

public void onReset() {
sendNotification("reset");
}
Expand Down
24 changes: 13 additions & 11 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ public class Session implements Closeable {
public static final String SETTING_DISCONNECT_ON_ERROR = "DisconnectOnError";

/**
* Session setting to control precision in message timestamps.
* Session setting to control precision in message timestamps.
* Valid values are "SECONDS", "MILLIS", "MICROS", "NANOS". Default is "MILLIS".
* Only valid for FIX version >= 4.2.
*/
Expand Down Expand Up @@ -439,7 +439,7 @@ public class Session implements Closeable {
private final boolean validateIncomingMessage;
private final int[] logonIntervals;
private final Set<InetAddress> allowedRemoteAddresses;

public static final int DEFAULT_MAX_LATENCY = 120;
public static final int DEFAULT_RESEND_RANGE_CHUNK_SIZE = 0; // no resend range
public static final double DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER = 0.5;
Expand Down Expand Up @@ -1491,7 +1491,7 @@ private void nextSequenceReset(Message sequenceReset) throws IOException, Reject

if (validateSequenceNumbers && sequenceReset.isSetField(NewSeqNo.FIELD)) {
final int newSequence = sequenceReset.getInt(NewSeqNo.FIELD);

stateListener.onSequenceResetReceived(newSequence, isGapFill);
getLog().onEvent(
"Received SequenceReset FROM: " + getExpectedTargetNum() + " TO: "
+ newSequence);
Expand Down Expand Up @@ -1791,6 +1791,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow)
getLog().onEvent(
"ResendRequest for messages FROM " + range.getBeginSeqNo() + " TO " + range.getEndSeqNo()
+ " has been satisfied.");
stateListener.onResendRequestSatisfied(range.getBeginSeqNo(), range.getEndSeqNo());
state.setResendRange(0, 0, 0);
}
}
Expand Down Expand Up @@ -2032,19 +2033,19 @@ private boolean generateLogon() throws IOException {
logon.setInt(NextExpectedMsgSeqNum.FIELD, nextExpectedMsgNum);
state.setLastExpectedLogonNextSeqNum(nextExpectedMsgNum);
}

setLogonTags(logon);
return sendRaw(logon, 0);
}

/**
* Logs out from session and closes the network connection.
*
*
* This method should not be called from user-code since it is likely
* to deadlock when called from a different thread than the Session thread
* and messages are sent/received concurrently.
* Instead the logout() method should be used where possible.
*
*
* @param reason the reason why the session is disconnected
* @param logError set to true if this disconnection is an error
* @throws IOException IO error
Expand Down Expand Up @@ -2085,7 +2086,7 @@ public void disconnect(String reason, boolean logError) throws IOException {
if (!state.isInitiator()) {
setEnabled(true);
}

state.setLogonReceived(false);
state.setLogonSent(false);
state.setLogoutSent(false);
Expand Down Expand Up @@ -2481,9 +2482,10 @@ private void sendResendRequest(String beginString, int msgSeqNum, int beginSeqNo
initializeHeader(resendRequest.getHeader());
sendRaw(resendRequest, 0);
getLog().onEvent("Sent ResendRequest FROM: " + beginSeqNo + " TO: " + (endSeqNo == 0 ? "infinity" : endSeqNo));
state.setResendRange(beginSeqNo, msgSeqNum - 1, resendRequestChunkSize == 0
? 0
: lastEndSeqNoSent);
int resendRangeEndSeqNum = msgSeqNum - 1;
int resendRangeCurrentSeqNum = resendRequestChunkSize == 0 ? 0 : lastEndSeqNoSent;
state.setResendRange(beginSeqNo, resendRangeEndSeqNum, resendRangeCurrentSeqNum);
stateListener.onResendRequestSent(beginSeqNo, resendRangeEndSeqNum, resendRangeCurrentSeqNum);
}

private boolean validatePossDup(Message msg) throws FieldNotFound, IOException {
Expand Down Expand Up @@ -2546,7 +2548,7 @@ private void generateLogon(Message otherLogon, int expectedTargetNum) throws Fie
} else {
getLog().onEvent("Responding to Logon request");
}

setLogonTags(logon);
sendRaw(logon, 0);
state.setLogonSent(true);
Expand Down
33 changes: 25 additions & 8 deletions quickfixj-core/src/main/java/quickfix/SessionStateListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,37 @@

public interface SessionStateListener {

void onConnect();
default void onConnect(){
}

void onDisconnect();
default void onDisconnect(){
}

void onLogon();
default void onLogon(){
}

void onLogout();
default void onLogout(){
}

void onReset();
default void onReset(){
}

void onRefresh();
default void onRefresh(){
}

void onMissedHeartBeat();
default void onMissedHeartBeat(){
}

void onHeartBeatTimeout();
default void onHeartBeatTimeout(){
}

default void onResendRequestSent(int beginSeqNo, int endSeqNo, int currentEndSeqNo){
}

default void onSequenceResetReceived(int newSeqNo, boolean gapFillFlag){
}

default void onResendRequestSatisfied(int beginSeqNo, int endSeqNo){
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,15 @@ public void onMissedHeartBeat() {

public void onHeartBeatTimeout() {
}

public void onResendRequestSent(int beginSeqNo, int endSeqNo, int currentEndSeqNo) {
}

public void onSequenceResetReceived(int newSeqNo, boolean gapFillFlag) {
}

public void onResendRequestSatisfied(int beginSeqNo, int endSeqNo) {
}
}

}
9 changes: 9 additions & 0 deletions quickfixj-core/src/test/java/quickfix/SessionResetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ public void onMissedHeartBeat() {

public void onHeartBeatTimeout() {
}

public void onResendRequestSent(int beginSeqNo, int endSeqNo, int currentEndSeqNo) {
}

public void onSequenceResetReceived(int newSeqNo, boolean gapFillFlag) {
}

public void onResendRequestSatisfied(int beginSeqNo, int endSeqNo) {
}
}

}
12 changes: 12 additions & 0 deletions quickfixj-core/src/test/java/quickfix/SocketInitiatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,18 @@ public void onMissedHeartBeat() {
@Override
public void onHeartBeatTimeout() {
}

@Override
public void onResendRequestSent(int beginSeqNo, int endSeqNo, int currentEndSeqNo) {
}

@Override
public void onSequenceResetReceived(int newSeqNo, boolean gapFillFlag) {
}

@Override
public void onResendRequestSatisfied(int beginSeqNo, int endSeqNo) {
}
};

LogFactory logFactory = sessionID -> logSessionStateListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,12 @@ public void onMissedHeartBeat() {
public void onHeartBeatTimeout() {
}

public void onResendRequestSent(int beginSeqNo, int endSeqNo, int currentEndSeqNo) {
}

public void onSequenceResetReceived(int newSeqNo, boolean gapFillFlag) {
}

public void onResendRequestSatisfied(int beginSeqNo, int endSeqNo) {
}
}

0 comments on commit f577b30

Please sign in to comment.