Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timer Services #109

Merged
merged 13 commits into from
May 26, 2023
3 changes: 3 additions & 0 deletions agonyforge-mud-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ configurations.maybeCreate("pitest")
dependencies {
pitest 'org.pitest:pitest-junit5-plugin:1.2.0'

implementation 'javax.annotation:javax.annotation-api:1.3.2'

implementation 'com.hazelcast:hazelcast:5.3.0'
implementation 'io.projectreactor.netty:reactor-netty:1.1.7'

Expand All @@ -30,6 +32,7 @@ dependencies {
implementation 'org.springframework.security:spring-security-oauth2-client'
implementation 'org.springframework.security:spring-security-oauth2-jose'
implementation 'org.springframework.session:spring-session-hazelcast'
implementation 'org.springframework.integration:spring-integration-stomp'

testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.security:spring-security-test'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.agonyforge.mud.core.config;

import com.agonyforge.mud.core.config.MqBrokerProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.support.converter.PassThruMessageConverter;
import org.springframework.messaging.simp.stomp.ReactorNettyTcpStompClient;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@Configuration
@EnableIntegration
public class StompClientConfiguration {
private final MqBrokerProperties brokerProperties;

@Autowired
public StompClientConfiguration(MqBrokerProperties brokerProperties) {
this.brokerProperties = brokerProperties;
}

@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient(brokerProperties.getHost(), brokerProperties.getPort());
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);

return stompClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.agonyforge.mud.core.service.timer;

import org.springframework.context.ApplicationEvent;

import java.util.concurrent.TimeUnit;

public class TimerEvent extends ApplicationEvent {
private TimeUnit frequency;

public TimerEvent(Object source, TimeUnit frequency) {
super(source);

this.frequency = frequency;
}

public TimeUnit getFrequency() {
return frequency;
}

@Override
public String toString() {
return "TimerEvent{" +
"frequency=" + frequency +
", source=" + source +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.agonyforge.mud.core.service.timer;

public class TimerMessage {
private Long timestamp = 0L;

public TimerMessage() {}

public TimerMessage(Long timestamp) {
this.timestamp = timestamp;
}

public Long getTimestamp() {
return timestamp;
}

@Override
public String toString() {
return "TimerMessage{" +
"timestamp=" + timestamp +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
package com.agonyforge.mud.core.service.timer;

import com.agonyforge.mud.core.config.MqBrokerProperties;
import com.hazelcast.cluster.Member;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.broker.BrokerAvailabilityEvent;
import org.springframework.messaging.simp.stomp.*;
import org.springframework.scheduling.annotation.Scheduled;
import com.hazelcast.core.HazelcastInstance;
import org.springframework.stereotype.Controller;

import javax.annotation.PreDestroy;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* In a system with potentially multiple servers we need a way to coordinate things like fights across the whole
* group so that they get resolved the same way no matter which server you happen to be connected to. This class
* registers a STOMP client to several queues and listens for messages there. If this server is also the "leader"
* (see comment in the code below because that probably doesn't mean what you think it means) it will generate
* messages on the queue on fixed time intervals. One arbitrarily chosen server will get each message and publish
* a corresponding application event.
* <p>
* The game code can register an event listener that would process one round of fights, without worrying about
* the underlying complexity of server clustering. That spreads the load of processing across the whole cluster
* but also avoids multiple servers processing the same fights at the same time. This pattern should be used for
* anything that needs to be processed periodically in the game by only one server at a time.
*/
@Controller
public class TimerService implements StompSessionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TimerService.class);
static final String DESTINATION_SECOND = "/queue/per_second";
static final String DESTINATION_MINUTE = "/queue/per_minute";
static final String DESTINATION_HOUR = "/queue/per_hour";
static final String DESTINATION_DAY = "/queue/per_day";
static final String STOMP_SESSION_NAME = "core-timers";


private final ApplicationEventPublisher applicationEventPublisher;
private final HazelcastInstance hazelcastInstance;
private final SimpMessagingTemplate simpMessagingTemplate;
private final ReactorNettyTcpStompClient stompClient;
private final MqBrokerProperties brokerProperties;

private final Map<String, StompSession.Subscription> subscriptions = new HashMap<>();

private boolean isBrokerAvailable = false;
private StompSession stompSession = null;

@Autowired
public TimerService(ApplicationEventPublisher applicationEventPublisher,
HazelcastInstance hazelcastInstance,
SimpMessagingTemplate simpMessagingTemplate,
ReactorNettyTcpStompClient stompClient,
MqBrokerProperties brokerProperties) {
this.applicationEventPublisher = applicationEventPublisher;
this.hazelcastInstance = hazelcastInstance;
this.simpMessagingTemplate = simpMessagingTemplate;
this.stompClient = stompClient;
this.brokerProperties = brokerProperties;
}

@EventListener
public void onApplicationEvent(BrokerAvailabilityEvent event) {
setBrokerAvailability(event.isBrokerAvailable());

stompClient.setMessageConverter(new MappingJackson2MessageConverter());

StompHeaders stompHeaders = new StompHeaders();

stompHeaders.setAcceptVersion("1.1", "1.2");
stompHeaders.setLogin(brokerProperties.getClientUsername());
stompHeaders.setPasscode(brokerProperties.getClientPassword());
stompHeaders.setHeartbeat(new long[] {10000L, 10000L});
stompHeaders.setSession(STOMP_SESSION_NAME);

try {
stompSession = stompClient.connectAsync(stompHeaders, this).get();
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("Exception trying to connect: {}", e.getMessage(), e);
}
}

@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
LOGGER.info("Client connection to STOMP server established");

subscriptions.put(DESTINATION_SECOND, session.subscribe(DESTINATION_SECOND, this));
subscriptions.put(DESTINATION_MINUTE, session.subscribe(DESTINATION_MINUTE, this));
subscriptions.put(DESTINATION_HOUR, session.subscribe(DESTINATION_HOUR, this));
subscriptions.put(DESTINATION_DAY, session.subscribe(DESTINATION_DAY, this));
}

@Override
public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
LOGGER.error("Exception while handling frame: session={} command={} headers={} payload={}",
session.getSessionId(),
command,
headers,
payload,
exception);
}

@Override
public void handleTransportError(StompSession session, Throwable exception) {
LOGGER.error("Transport error: session={}", session.getSessionId(), exception);
}

@Override
public Type getPayloadType(StompHeaders headers) {
return TimerMessage.class;
}

@Override
public void handleFrame(StompHeaders headers, Object payload) {
LOGGER.trace("Received timer message: headers={} payload={}", headers, payload);

if (null == headers.getDestination()) {
LOGGER.error("Message destination is null!");
return;
}

TimerEvent event = null;

switch(headers.getDestination()) {
case DESTINATION_SECOND -> event = new TimerEvent(this, TimeUnit.SECONDS);
case DESTINATION_MINUTE -> event = new TimerEvent(this, TimeUnit.MINUTES);
case DESTINATION_HOUR -> event = new TimerEvent(this, TimeUnit.HOURS);
case DESTINATION_DAY -> event = new TimerEvent(this, TimeUnit.DAYS);
default -> LOGGER.error("Unknown message destination!");
}

if (event != null) {
applicationEventPublisher.publishEvent(event);
}
}

public StompSession.Subscription getSubscription(String destination) {
return subscriptions.get(destination);
}

@PreDestroy
void onShutdown() {
for (String key : subscriptions.keySet()) {
subscriptions.get(key).unsubscribe();
}

if (stompSession != null) {
stompSession.disconnect();
}
}

void setBrokerAvailability(boolean isBrokerAvailable) {
this.isBrokerAvailable = isBrokerAvailable;
}

@Scheduled(fixedRate = 1L, timeUnit = TimeUnit.SECONDS)
public void doPerSecond() {
doTimer(DESTINATION_SECOND);
}

@Scheduled(fixedRate = 1L, timeUnit = TimeUnit.MINUTES)
public void doPerMinute() {
doTimer(DESTINATION_MINUTE);
}

@Scheduled(fixedRate = 1L, timeUnit = TimeUnit.HOURS)
public void doPerHour() {
doTimer(DESTINATION_HOUR);
}

@Scheduled(fixedRate = 1L, timeUnit = TimeUnit.DAYS)
public void doPerDay() {
doTimer(DESTINATION_DAY);
}

private void doTimer(final String destination) {
// "leader" is a little bit of a misnomer here
// it's not actually the elected leader of the Hazelcast cluster
// apparently you can't even get that info via public API?!
//
// it's just the UUID that is alphabetically first in the list
// who cares? it doesn't matter!
// all we actually need is for everybody to be able to independently pick the same one

UUID me = hazelcastInstance.getCluster().getLocalMember().getUuid();
Optional<UUID> leaderOptional = hazelcastInstance.getCluster().getMembers()
.stream()
.min(Comparator.comparing(Member::getUuid))
.map(Member::getUuid);

if (isBrokerAvailable && leaderOptional.isPresent() && leaderOptional.get().equals(me)) {
LOGGER.debug("leader={} me={}", leaderOptional.orElse(null), me);

MessageHeaders messageHeaders = SimpMessageHeaderAccessor.create().getMessageHeaders();
TimerMessage timerMessage = new TimerMessage(System.currentTimeMillis());

simpMessagingTemplate.convertAndSend(destination, timerMessage, messageHeaders);
}
}
}
4 changes: 4 additions & 0 deletions agonyforge-mud-core/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
exports com.agonyforge.mud.core.cli.menu;
exports com.agonyforge.mud.core.web.model;
exports com.agonyforge.mud.core.service;
exports com.agonyforge.mud.core.service.timer;
exports com.agonyforge.mud.core.web.controller;
exports com.agonyforge.mud.core.config;
requires spring.beans;
Expand All @@ -13,6 +14,8 @@
requires org.apache.tomcat.embed.core;
requires spring.websocket;
requires spring.session.core;
requires spring.integration.core;
requires spring.integration.stomp;
requires spring.messaging;
requires reactor.netty.core;
requires org.slf4j;
Expand All @@ -24,4 +27,5 @@
requires spring.security.oauth2.client;
requires spring.core;
requires spring.webmvc;
requires java.annotation;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.agonyforge.mud.core.service.timer;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(MockitoExtension.class)
public class TimerEventTest {
@Test
void testSourceAndFrequency() {
TimerEvent uut = new TimerEvent(this, TimeUnit.MINUTES);

assertEquals(this, uut.getSource());
assertEquals(TimeUnit.MINUTES, uut.getFrequency());
}

@Test
void testToString() {
TimerEvent uut = new TimerEvent(this, TimeUnit.MINUTES);

assertTrue(uut.toString().length() > 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.agonyforge.mud.core.service.timer;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ExtendWith(MockitoExtension.class)
public class TimerMessageTest {
@Test
void testGetTimestampDefault() {
TimerMessage uut = new TimerMessage();

assertEquals(0L, uut.getTimestamp());
}

@Test
void testGetTimestamp() {
Long systemTime = System.currentTimeMillis();
TimerMessage uut = new TimerMessage(systemTime);

assertEquals(systemTime, uut.getTimestamp());
}

@Test
void testToString() {
TimerMessage uut = new TimerMessage();

assertTrue(uut.toString().length() > 1);
}
}
Loading