-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Addresses multiple intertwining issues. #166
Changes from 16 commits
8578bea
611e144
56006e1
1cb99c9
5e01f47
48ea8cf
1ddce21
a2d8163
e28614a
37187e8
4ba0fe2
62e34ea
513d66e
cc7076e
2d90faa
c8a2cbb
a4f5a71
de71711
af8714d
9107b15
159b44f
ea62d00
f4f76ab
b0114e9
0d4f0bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,13 +9,15 @@ | |
import com.github.princesslana.eriscasper.rest.RouteCatalog; | ||
import com.github.princesslana.eriscasper.rest.Routes; | ||
import com.github.princesslana.eriscasper.util.OkHttp; | ||
import com.github.princesslana.eriscasper.util.Shard; | ||
import com.ufoscout.properlty.Properlty; | ||
import com.ufoscout.properlty.reader.EnvironmentVariablesReader; | ||
import com.ufoscout.properlty.reader.SystemPropertiesReader; | ||
import com.ufoscout.properlty.reader.decorator.ToLowerCaseAndDotKeyReader; | ||
import io.reactivex.Observable; | ||
import io.reactivex.Single; | ||
import io.reactivex.schedulers.Schedulers; | ||
import java.util.Optional; | ||
import okhttp3.OkHttpClient; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -38,21 +40,22 @@ public class ErisCasper { | |
private final Payloads payloads = new Payloads(jackson); | ||
|
||
private final Routes routes; | ||
private final Optional<Shard> shard; | ||
|
||
private ErisCasper(BotToken token) { | ||
private ErisCasper(BotToken token, Optional<Shard> shard) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we avoid this optional by having using a Shard of [0,1] as a default? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think having no sharding is a bit safer, I dont think telling discord we are using shard [0,1] is useful and Optional makes sense and looks like what it does. |
||
this.token = token; | ||
|
||
this.shard = shard; | ||
routes = new Routes(token, httpClient, jackson); | ||
} | ||
|
||
private Observable<Event> getEvents() { | ||
Gateway gateway = Gateway.create(httpClient, payloads); | ||
|
||
routes.useGateway(gateway); | ||
return Single.just(RouteCatalog.getGateway()) | ||
.observeOn(Schedulers.io()) | ||
.flatMap(routes::execute) | ||
.toObservable() | ||
.flatMap(gr -> gateway.connect(gr.getUrl(), token)) | ||
.flatMap(gr -> gateway.connect(gr.getUrl(), token, shard)) | ||
.observeOn(Schedulers.computation()) | ||
.share(); | ||
} | ||
|
@@ -75,6 +78,11 @@ public static ErisCasper create() { | |
} | ||
|
||
public static ErisCasper create(String token) { | ||
return new ErisCasper(BotToken.of(token)); | ||
return new ErisCasper(BotToken.of(token), Shard.fromConfig(CONFIG)); | ||
} | ||
|
||
public static ErisCasper create(String token, int shardNumber, int shardTotal) { | ||
Shard shard = new Shard(shardNumber, shardTotal); | ||
return new ErisCasper(BotToken.of(token), Optional.of(shard)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,15 @@ | ||
package com.github.princesslana.eriscasper.gateway; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import com.github.princesslana.eriscasper.BotToken; | ||
import com.github.princesslana.eriscasper.data.event.Event; | ||
import com.github.princesslana.eriscasper.data.event.HelloEventData; | ||
import com.github.princesslana.eriscasper.data.event.ReadyEvent; | ||
import com.github.princesslana.eriscasper.gateway.commands.ImmutableResume; | ||
import com.github.princesslana.eriscasper.rx.Singles; | ||
import com.github.princesslana.eriscasper.rx.websocket.RxWebSocket; | ||
import com.github.princesslana.eriscasper.rx.websocket.RxWebSocketEvent; | ||
import com.github.princesslana.eriscasper.util.Shard; | ||
import com.google.common.base.Preconditions; | ||
import io.github.resilience4j.ratelimiter.RateLimiter; | ||
import io.github.resilience4j.ratelimiter.RateLimiterConfig; | ||
|
@@ -85,14 +88,31 @@ private void sequenceNumberSeen(Optional<SequenceNumber> seq) { | |
} | ||
|
||
@SuppressWarnings("unchecked") | ||
public Observable<Event> connect(String url, BotToken token) { | ||
public Observable<Event> connect(String url, BotToken token, Optional<Shard> shard) { | ||
Observable<Payload> ps = | ||
ws.connect(String.format("%s?v=%s&encoding=%s", url, VERSION, ENCODING)) | ||
.ofType(RxWebSocketEvent.StringMessage.class) | ||
.map(RxWebSocketEvent.StringMessage::getText) | ||
.flatMapMaybe( | ||
Singles.toMaybeAnd( | ||
payloads::read, (s, t) -> LOG.warn("Error reading payload: {}", s, t))) | ||
.takeUntil( | ||
payload -> { | ||
if (payload.op().getCode() == 9) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we not have an enum or constants around instead of having to use 9 here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll change it |
||
if (payload.d().map(JsonNode::asBoolean).orElse(false)) { | ||
resume(ws, token); | ||
} else { | ||
ws.closeDueToInvalidSession() | ||
.doOnComplete( | ||
() -> LOG.warn("Socket disconnected due to an invalid session.")) | ||
// blocking since we shouldn't be doing anything if this happens | ||
// (this is an invalid session which can't be resumed. | ||
.blockingAwait(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't need to use something blocking during an observable stream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's blocking because we don't want anything more to even try to come through, if we do it will throw broken pipes and have issues with the websocket. If we block it then we can avoid anything even attempting to come through when it's a non-resumable invalid session. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Putting the blocking here isn't stopping other websocket events from coming through or happening. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, how would we push through that command into the stream? Should we just run it outside of a completable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not really sure, this isn't something I have thought about. I'm not 100% clear on what situations lead to this happening and what action we're taking. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not exactly sure what can throw a non-resumable invalid sessions in all honesty. I tried for around 3 hours to get github to throw me an invalid session without success. Invalid tokens throw an initial unauthorized message not an invalid session message so that's a bit odd. Basically right now within this codebase we're just shutting down the websocket. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, so I'm not keen to merge something where we're not sure what we're meant to be doing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, maybe we should remove it then, or just throw a fatal exception when I receive an invalid session which can't be resumed. Most invalid sessions are able to be resumed and I haven't seen one where I couldn't. So maybe that's the right move? Not 100% sure. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd say remove anything related to #64 from this PR and address it at another time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good to me. I think it's a more complicated issue due to being able to test and understand the circumstances that lead to the outcomes we're trying to prevent. |
||
return true; | ||
} | ||
} | ||
return false; | ||
}) | ||
.doOnNext(p -> sequenceNumberSeen(p.s())) | ||
.share(); | ||
|
||
|
@@ -101,7 +121,8 @@ public Observable<Event> connect(String url, BotToken token) { | |
|
||
Completable identify = | ||
ps.filter(Payload.isOp(OpCode.HELLO)) | ||
.flatMapCompletable(p -> isResumable() ? resume(ws, token) : identify(ws, token)); | ||
.flatMapCompletable( | ||
p -> isResumable() ? resume(ws, token) : identify(ws, token, shard)); | ||
|
||
Observable<Event> events = ps.flatMapMaybe(payloads::toEvent).share(); | ||
|
||
|
@@ -127,10 +148,9 @@ private Completable send(RxWebSocket ws, Payload payload) { | |
.doOnComplete(() -> LOG.debug("Sent: {}.", payload)); | ||
} | ||
|
||
private Completable identify(RxWebSocket ws, BotToken token) { | ||
return Single.just(payloads.identify(token)) | ||
.lift(RateLimiterOperator.of(identifyLimit)) | ||
.flatMapCompletable(p -> send(ws, p)); | ||
private Completable identify(RxWebSocket ws, BotToken token, Optional<Shard> shard) { | ||
return send( | ||
Single.just(payloads.identify(token, shard)).lift(RateLimiterOperator.of(identifyLimit))); | ||
} | ||
|
||
private Completable heartbeat(RxWebSocket ws, Payload hello) { | ||
|
@@ -146,14 +166,22 @@ private Completable resume(RxWebSocket ws, BotToken token) { | |
Preconditions.checkState( | ||
lastSeenSequenceNumber.isPresent(), "Can not resume without a sequence number"); | ||
|
||
return Single.just( | ||
ImmutableResume.builder() | ||
.token(token) | ||
.sessionId(sessionId.get()) | ||
.seq(lastSeenSequenceNumber.get()) | ||
.build()) | ||
.map(payloads::resume) | ||
.flatMapCompletable(p -> send(ws, p)); | ||
return send( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm 50/50 about this send method. It does reduce a bit of duplication, but it also makes the method hard to read. With the flatMapCompletable it reads in sequence: build the resume payload, apply/map the payloads resume function, and then send it. The send at the start kind of has it out of sequence a bit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can rename it, i think that's the best thing to do in this situation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm thinking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not the naming that I'm concerned about it's the flow. By having the send in the map at the end, you end up with the code reading as: start with something, do this, do that, do this, then send it. With the method you end up with: send (something, do this, do that, do this ) |
||
Single.just( | ||
ImmutableResume.builder() | ||
.token(token) | ||
.sessionId(sessionId.get()) | ||
.seq(lastSeenSequenceNumber.get()) | ||
.build()) | ||
.map(payloads::resume)); | ||
} | ||
|
||
public Completable send(Single<Payload> payload) { | ||
return payload.flatMapCompletable(p -> send(ws, p)); | ||
} | ||
|
||
public void shutdownGracefully() { | ||
ws.close(1000, "Graceful shutdown."); | ||
} | ||
|
||
public static Gateway create(OkHttpClient client, Payloads payloads) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
package com.github.princesslana.eriscasper.gateway.commands; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.github.princesslana.eriscasper.gateway.Payload; | ||
|
||
public interface GatewayCommand { | ||
|
||
Payload toPayload(ObjectMapper jackson); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package com.github.princesslana.eriscasper.gateway.commands; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize; | ||
import com.github.princesslana.eriscasper.BotToken; | ||
import com.github.princesslana.eriscasper.gateway.ImmutablePayload; | ||
import com.github.princesslana.eriscasper.gateway.OpCode; | ||
import com.github.princesslana.eriscasper.gateway.Payload; | ||
import com.github.princesslana.eriscasper.util.Shard; | ||
import java.util.Optional; | ||
import org.immutables.value.Value; | ||
|
||
/** | ||
* @see <a href="https://discordapp.com/developers/docs/topics/gateway#identify"> | ||
* https://discordapp.com/developers/docs/topics/gateway#identify</a> | ||
*/ | ||
// TODO: This structure is not complete | ||
@JsonDeserialize(as = ImmutableIdentify.class) | ||
@Value.Immutable | ||
@Value.Enclosing | ||
public interface Identify extends GatewayCommand { | ||
BotToken getToken(); | ||
|
||
Optional<Shard> shard(); | ||
|
||
default ConnectionProperties getProperties() { | ||
return ConnectionProperties.ofDefault(); | ||
} | ||
|
||
@Override | ||
default Payload toPayload(ObjectMapper jackson) { | ||
return ImmutablePayload.builder().op(OpCode.IDENTIFY).d(jackson.valueToTree(this)).build(); | ||
} | ||
|
||
/** | ||
* @see <a | ||
* href="https://discordapp.com/developers/docs/topics/gateway#identify-identify-connection-properties"> | ||
* https://discordapp.com/developers/docs/topics/gateway#identify-identify-connection-properties</a> | ||
*/ | ||
@Value.Immutable | ||
interface ConnectionProperties { | ||
@JsonProperty("$os") | ||
String getOs(); | ||
|
||
@JsonProperty("$browser") | ||
String getBrowser(); | ||
|
||
@JsonProperty("$device") | ||
String getDevice(); | ||
|
||
static ConnectionProperties ofDefault() { | ||
return ImmutableIdentify.ConnectionProperties.builder() | ||
.os(System.getProperty("os.name")) | ||
.browser("ErisCasper.java") | ||
.device("ErisCasper.java") | ||
.build(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this belongs on Routes. Routes is kind of HTTP specific.
But for what reason are we exposing the gateway to the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gateway commands must be sent under the gateway, I dont know of a way to format them under a request since they have no actual statement. If we want to let users send commands to the gateway we need to expose it somehow. Safest way to do that is to give them access to a method which sends the command for them. I can remove Resume and Identify from GatewayCommand and it would make it a little safer in that regard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the user wants to do something like send a message, the fact that it's happening over HTTP is hidden from them. There is nothing in the API that refers to rest or http, they do it using the
Action
concept we have.So, gateway commands can be done in the same way.
Perhaps
Action
is an interface, with implementationsRestAction
,GatewayAction
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's kind of impractical if we want the users to know which one to use, we also need to know which one it is because each of them have special attributes. It would cause casting and checking and such if we just wanted to use Action.