http.Flusher compatibility #141
Comments
Ended up switching to Gorilla Sessions which works. |
Could you please post your code? |
Sure, here is the code to test it: package main
import (
"fmt"
"github.com/alexedwards/scs/v2"
"github.com/gorilla/mux"
"github.com/r3labs/sse/v2"
"github.com/rs/cors"
"net/http"
"time"
)
func main() {
server := Server{
Router: mux.NewRouter(),
ServerSentEvents: sse.New(),
SessionManager: scs.New(),
}
server.Start()
}
type Server struct {
Router *mux.Router
ServerSentEvents *sse.Server
SessionManager *scs.SessionManager
}
func (server *Server) Start() {
server.Router.Handle("/outlook/loading", server.handleOutlookLoading())
corsHandler := cors.New(cors.Options{
AllowedOrigins: []string{"http://localhost:3000", "http://127.0.0.1:3000"},
AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete},
AllowCredentials: true,
}).Handler(server.Router)
fmt.Println("Starting the server...")
// TODO - The handleOutlookLoading will not work with the session manager.
fmt.Println(http.ListenAndServe(":1337", server.SessionManager.LoadAndSave(corsHandler)))
// The following works:
//fmt.Println(http.ListenAndServe(":1337", corsHandler))
}
func (server *Server) handleOutlookLoading() http.HandlerFunc {
return func(responseWriter http.ResponseWriter, request *http.Request) {
server.ServerSentEvents.CreateStream("test")
go func() {
for {
server.ServerSentEvents.Publish("test", &sse.Event{Data: []byte("hello")})
time.Sleep(time.Second)
}
}()
server.ServerSentEvents.ServeHTTP(responseWriter, request)
}
} On the front-end I am using React and have this: const source = new EventSource("http://localhost:1337/outlook/loading?stream=" + "test")
source.onmessage = e => alert(e.data) |
Mm... I've tried to keep it simpler, removing Gorilla Mux and the CORS handler: package main
import (
"fmt"
"github.com/alexedwards/scs/v2"
"github.com/r3labs/sse/v2"
"net/http"
"time"
)
func main() {
server := Server{
Router: http.NewServeMux(),
ServerSentEvents: sse.New(),
SessionManager: scs.New(),
}
server.Start()
}
type Server struct {
Router *http.ServeMux
ServerSentEvents *sse.Server
SessionManager *scs.SessionManager
}
func (server *Server) Start() {
server.Router.Handle("/outlook/loading", server.handleOutlookLoading())
fmt.Println("Starting the server...")
// TODO - The handleOutlookLoading will not work with the session manager.
fmt.Println(http.ListenAndServe(":1337", server.SessionManager.LoadAndSave(server.Router)))
// The following works:
//fmt.Println(http.ListenAndServe(":1337", server.Router))
}
func (server *Server) handleOutlookLoading() http.HandlerFunc {
return func(responseWriter http.ResponseWriter, request *http.Request) {
server.ServerSentEvents.CreateStream("test")
go func() {
for {
server.ServerSentEvents.Publish("test", &sse.Event{Data: []byte("hello")})
time.Sleep(time.Second)
}
}()
server.ServerSentEvents.ServeHTTP(responseWriter, request)
}
} But nothing really changed, so I've tried a basic implementation of the Flush interface in func (bw *bufferedResponseWriter) Flush() {
if flusher, ok := bw.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
} The server seems to be responding now, but I can't see any data coming through. |
I couldn't see any data either, thanks for trying! |
I think the problem is that it's buffering the response. This means there is nothing to flush. |
Because the middleware waits for the underlying response to finish before deciding whether the session was modified, this can never work with never-ending responses like SSE. Such responses usually do not modify the session in the first place (how would they?). So I think the solution is to just I actually think this is functioning as designed. |
I've just pushed commit 62e546c which changes the way that LoadAndSave() works so that the response is no longer buffered. Combined with the new Go 1.20 http.ResponseController, this means that you should now be able to use the LoadAndSave() middleware in conjunction with http.Flusher and everything should Just Work 🙂 I'll leave this issue open until the commit has made it into a new major release. |
Amazing! Thanks so much. |
Seems that even after 62e546c, |
Is there any update on this? It seems that it is still not compatible with fmt.Printf("Type of ResponseWriter: %T\n", w)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
} this prints out
and returns an error as r.Use(app.sessionManager.LoadAndSave) |
@theadell My workaround was to add this middleware: type middleware struct {
store *scs.SessionManager
}
func Middleware(store *scs.SessionManager) func(next http.Handler) http.Handler {
middleware := &middleware{
store: store,
}
return middleware.Handler
}
// flusherResponseWriter wraps http.ResponseController and http.ResponseWriter so that we pass an
// http.Flusher compatible ResponseWriter to the next handler. This is done because scs's middleware
// wraps ResponseWriter and we need to re-expose http.Flusher for the sse library.
type flusherResponseWriter struct {
*http.ResponseController
http.ResponseWriter
}
func (w flusherResponseWriter) Flush() {
err := w.ResponseController.Flush()
if err != nil {
panic(err)
}
}
func (w flusherResponseWriter) Unwrap() http.ResponseWriter {
return w.ResponseWriter
}
var _ http.Flusher = flusherResponseWriter{}
func (m *middleware) Handler(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
rc := http.NewResponseController(w)
flusher := &flusherResponseWriter{
ResponseController: rc,
ResponseWriter: w,
}
next.ServeHTTP(flusher, r.WithContext(ctx))
})
} |
@ctian1 Thanks for your solution, but unless I misunderstood your approach, it is not working for me. As you suggested, I created a middleware that passes an http.Flusher-compatible ResponseWriter to the next handler: type flusherResponseWriter struct {
*http.ResponseController
http.ResponseWriter
}
func (w *flusherResponseWriter) Flush() {
if err := w.ResponseController.Flush(); err != nil {
panic(err)
}
}
func (w *flusherResponseWriter) Unwrap() http.ResponseWriter {
return w.ResponseWriter
}
var _ http.Flusher = &flusherResponseWriter{}
func WrapWithFlusher(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rc := http.NewResponseController(w)
flusher := &flusherResponseWriter{
ResponseController: rc,
ResponseWriter: w,
}
next.ServeHTTP(flusher, r)
})
} Then I used this middleware directly after SCS r := chi.NewRouter()
r.Use(middleware.RealIP)
r.Use(middleware.Recoverer)
r.Use(app.sessionManager.LoadAndSave)
r.Use(WrapWithFlusher) Now the type of the ResponseWriter is
But this panics with
Here is a simplified version of my SSE handler: func handleStatusUpdates(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
fmt.Printf("Type of ResponseWriter: %T\n", w)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
for {
// logic ....
// ...
flusher.Flush() // panics
time.Sleep(StatusUpdateInterval)
}
} |
@ctian1 @theadell Flushing responses now seems to work great with SCS when you use the Go 1.20 package main
import (
"fmt"
"io"
"log"
"net/http"
"time"
"github.com/alexedwards/scs/v2"
)
var sessionManager = scs.New()
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/flush", flushHandler)
mux.HandleFunc("/get", getHandler)
http.ListenAndServe(":4000", sessionManager.LoadAndSave(mux))
}
func flushHandler(w http.ResponseWriter, r *http.Request) {
sessionManager.Put(r.Context(), "message", "Hello from a flushing handler!")
rc := http.NewResponseController(w)
for i := 0; i < 5; i++ {
fmt.Fprintf(w, "Write %d\n", i)
err := rc.Flush()
if err != nil {
log.Println(err)
return
}
time.Sleep(time.Second)
}
}
func getHandler(w http.ResponseWriter, r *http.Request) {
msg := sessionManager.GetString(r.Context(), "message")
io.WriteString(w, msg)
} Now that At this point, I think that the right move here is for the I think it was right to open this issue against |
Thanks for your follow-up. I've followed your suggestion and tried using the http.ResponseController, but unfortunately it doesn't seem to work in my case. I'm still encountering the "feature not supported" error during the flush. I tried the exact example you provided and the error still persists.
SCS version: require github.com/alexedwards/scs/v2 v2.5.1 For the sake of clarity, here is a complete Go program that demonstrates the problem: package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/alexedwards/scs/v2"
)
var sessionManager *scs.SessionManager
func main() {
sessionManager = scs.New()
sessionManager.Lifetime = 24 * time.Hour
mux := http.NewServeMux()
mux.HandleFunc("/sse", sseHandler)
mux.HandleFunc("/", indexHandler)
log.Println("Server started on localhost:4000")
http.ListenAndServe("localhost:4000", sessionManager.LoadAndSave(mux))
}
func sseHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// Create a new ResponseController for flushing
rc := http.NewResponseController(w)
for {
currentTime := time.Now().Format(time.RFC1123)
data := fmt.Sprintf("data: %s\n\n", currentTime)
_, writeErr := w.Write([]byte(data))
if writeErr != nil {
log.Printf("Error writing to client: %v", writeErr)
return
}
// Use the ResponseController to flush the data
flushErr := rc.Flush()
if flushErr != nil {
log.Printf("Error flushing data to client: %v", flushErr)
return
}
time.Sleep(1 * time.Second)
}
}
func indexHandler(w http.ResponseWriter, r *http.Request) {
html := `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>SSE with Go</title>
</head>
<body>
<h1>Server Sent Events with Go</h1>
<div id="sse-data"></div>
<script>
const eventSource = new EventSource("/sse");
eventSource.onmessage = function(event) {
document.getElementById("sse-data").innerHTML = event.data;
};
</script>
</body>
</html>
`
w.Write([]byte(html))
} This results in the error
|
@theadell Does it work if you use the tip version of SCS?
|
You're right! Switching to the tip version sorted everything out. I mistakenly assumed 62e546c was part of v2.5.1. Thanks for pointing me in the right direction and for your patience. Much appreciated! |
I've released a new major version v2.6.0 which includes 62e546c now 👍 I believe that this issue can now be closed, but if there continue to be any problems please comment and I'll reopen it. |
Hi, I don't mean to ruffle any feathers, but while this works for the manual implementation of SSE handling implemented in the example code in this post, it still doesn't work with the proper SSE library (https://github.com/r3labs/sse). For one, while the r3labs library obviously expects I think the easiest solution might be to make |
@verygoodsoftwarenotvirus I think my comment in #141 (comment) applies here. I think the onus is really on the Is it possible for you to not use the LoadAndSave middleware on your flushing handlers, and load/write the session data yourself instead? See https://gist.github.com/alexedwards/0570e5a59677e278e13acb8ea53a3b30 |
Hello,
I want to implement Server Sent Events and am using this library: https://github.com/r3labs/sse
The library expects the http.ResponseWriter to be able to cast to a http.Flusher for flushing.
The http.Flusher documentation states:
Your library is wrapping http.ResponseWriter and is breaking the library.
Would it be possible for you to make the bufferedResponseWriter implement the http.Flusher?
References: r3labs/sse#130