Skip to content

Instantly share code, notes, and snippets.

@ismasan
Last active August 23, 2024 08:47
Show Gist options
  • Save ismasan/3fb75381cd2deb6bfa9c to your computer and use it in GitHub Desktop.
Save ismasan/3fb75381cd2deb6bfa9c to your computer and use it in GitHub Desktop.
Example SSE server in Golang
// Copyright (c) 2017 Ismael Celis
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package main
import (
"fmt"
"log"
"net/http"
"time"
)
// Example SSE server in Golang.
// $ go run sse.go
type Broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier chan []byte
// New client connections
newClients chan chan []byte
// Closed client connections
closingClients chan chan []byte
// Client connections registry
clients map[chan []byte]bool
}
func NewServer() (broker *Broker) {
// Instantiate a broker
broker = &Broker{
Notifier: make(chan []byte, 1),
newClients: make(chan chan []byte),
closingClients: make(chan chan []byte),
clients: make(map[chan []byte]bool),
}
// Set it running - listening and broadcasting events
go broker.listen()
return
}
func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// Make sure that the writer supports flushing.
//
flusher, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*")
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
broker.closingClients <- messageChan
}()
// Listen to connection close and un-register messageChan
// notify := rw.(http.CloseNotifier).CloseNotify()
notify := req.Context().Done()
go func() {
<-notify
broker.closingClients <- messageChan
}()
for {
// Write to the ResponseWriter
// Server Sent Events compatible
fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)
// Flush the data immediatly instead of buffering it for later.
flusher.Flush()
}
}
func (broker *Broker) listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = true
log.Printf("Client added. %d registered clients", len(broker.clients))
case s := <-broker.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
log.Printf("Removed client. %d registered clients", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan, _ := range broker.clients {
clientMessageChan <- event
}
}
}
}
func main() {
broker := NewServer()
go func() {
for {
time.Sleep(time.Second * 2)
eventString := fmt.Sprintf("the time is %v", time.Now())
log.Println("Receiving event")
broker.Notifier <- []byte(eventString)
}
}()
log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))
}
@xsaamiir
Copy link

@maestre3d Amazing references, really helpful. Thank you for sharing.

@mirzaakhena
Copy link

@romanm-perun
Copy link

@ismasan thank you for useful example.
Based on your work and fork of @schmohlio , I've created a demo for Gin framework to show how to get messages by subscription on specific topic. See at https://github.com/romanm-perun/gin-sse-example

Suggestions are welcome!

@phanthaiduong22
Copy link

phanthaiduong22 commented Aug 6, 2021

Thank you for this repo. Because it is easy to understand and learn a lot of things. After reading this I know more about golang's strength
Can I ask

defer func() {
	broker.closingClients <- messageChan
}()

When is this function executed? Because I don't know ServeHTTP finished? or It always runs?

@ismasan
Copy link
Author

ismasan commented Aug 6, 2021

Thank you @phanthaiduong22

This is Go's defer statement.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment