This post was originally published on the New Bamboo blog, before New Bamboo joined thoughtbot in London.
I took the opportunity of our last New Bamboo Hack Day to write a Server Sent Events server in Go. The idea was to allow third parties to subscribe to a pre-existing, internal events stream for an e-commerce platform. I also needed to authenticate this service via access tokens so only authorised users or programs can access it.
I based my implementation largely on this example, and my own finalised production code is here. In this article I’ll take you through some of the code to illustrate how elegantly Go can model network concurrency. The patterns described below can be adapted to other problems where a server needs to keep network connections open and broadcast messages to all or some of them. This includes WebSockets, chat servers, message brokers and others.
The server
The first step is to define a struct to represent the server instance and hold basic state.
// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcasts event data to all registered connections.
type Broker struct {
	// Events are pushed to this channel by the main events-gathering routine
	Notifier chan []byte
	// New client connections
	newClients chan chan []byte
	// Closed client connections
	closingClients chan chan []byte
	// Client connections registry
	clients map[chan []byte]bool
}
Notifier is a channel of byte slices, each byte slice representing a single event. An external goroutine will push events to this channel as they arrive.
// This happens in a goroutine somewhere else.
broker.Notifier <- eventBytes
What and how events are pushed to the broker depends on your use case. My server receives JSON-encoded events from the outside via UDP and broadcasts them to any listeners.
newClients is a channel that takes and registers newly opened HTTP connections. This is where things start getting interesting: newClients is a channel of channels! The reason will become clear in a second.
closingClients is notified when a client connection closes. Again, connections are themselves represented as channels.
clients is a map that holds the currently open connections, so incoming events can be broadcast to them.
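To make the channel-of-channels idea concrete, here is a minimal sketch of a registry that is owned by a single goroutine. The `registry` type, its `count` channel and the `newRegistry` helper are hypothetical names introduced only for this illustration; they are not part of the Broker described in this article.

```go
package main

import "fmt"

// registry is a hypothetical, stripped-down stand-in for the Broker.
// It only tracks which client channels are currently registered.
type registry struct {
	newClients     chan chan []byte
	closingClients chan chan []byte
	count          chan int // hypothetical: reports the current number of clients
	clients        map[chan []byte]bool
}

func newRegistry() *registry {
	r := &registry{
		newClients:     make(chan chan []byte),
		closingClients: make(chan chan []byte),
		count:          make(chan int),
		clients:        make(map[chan []byte]bool),
	}
	go r.run()
	return r
}

// run is the only goroutine that reads or writes the map,
// so the map needs no mutex.
func (r *registry) run() {
	for {
		select {
		case c := <-r.newClients:
			r.clients[c] = true
		case c := <-r.closingClients:
			delete(r.clients, c)
		case r.count <- len(r.clients):
		}
	}
}

func main() {
	r := newRegistry()
	a, b := make(chan []byte), make(chan []byte)

	// Each "connection" is just a channel, and registering it
	// means sending that channel over another channel.
	r.newClients <- a
	r.newClients <- b
	fmt.Println(<-r.count) // 2

	r.closingClients <- a
	fmt.Println(<-r.count) // 1
}
```

Channels are comparable values in Go, which is why they can be used directly as map keys.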
Handling connections
Our Broker will implement Go’s http.Handler interface to handle HTTP connections. This will allow it to interop with other HTTP handlers such as routers or middleware (more on that later).
func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
// ...
}
The rw response writer itself is only required to comply with the basic http.ResponseWriter interface, but for a streaming server we also need to make sure that it supports the http.Flusher interface (so we can flush buffered data down the connection as it comes). Let’s check for support or bail out.
// Make sure that the writer supports flushing.
flusher, ok := rw.(http.Flusher)
if !ok {
	http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
	return
}
This is a good illustration of how Go interfaces allow you to compose behaviour into objects, and the Go standard library itself makes full use of this feature.
Now we’ll set basic headers to support keep-alive HTTP connections (so clients don’t close them early) and the “text/event-stream” content type for browsers that support Server Sent Events via the EventSource API. We’ll also add a Cross-Origin Resource Sharing (CORS) header so pages served from other domains can still connect.
// Set the headers related to event streaming.
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*")
Next, each new connection creates a channel of events that it will register with the Broker. It does this by passing its events channel to the broker’s newClients channel.
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)
// Signal the broker that we have a new connection
broker.newClients <- messageChan
Before we carry on, let’s make sure we notify the broker if our connection dies for any reason. This is done by deferring a function until the current connection handler exits.
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
	broker.closingClients <- messageChan
}()
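Next, we also need to be notified when the connection is closed by the client (i.e. the client disconnects). We do this by asserting that our ResponseWriter also implements the http.CloseNotifier interface, which gives us just that functionality. (http.CloseNotifier has since been deprecated in favour of the request’s Context, but it still works.)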
// Listen to connection close and un-register messageChan
notify := rw.(http.CloseNotifier).CloseNotify()
go func() {
	<-notify
	broker.closingClients <- messageChan
}()
The CloseNotifier#CloseNotify() method returns a channel that will emit a boolean when the connection closes. We launch an anonymous function as a goroutine to listen for that case without blocking the outer HTTP handler.
Finally, we wait for this connection’s messageChan to give us events in a loop and write the event data to the response writer.
// Block waiting for messages broadcast on this connection's messageChan
for {
	// Write to the ResponseWriter
	// Server Sent Events compatible
	fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)
	// Flush the data immediately instead of buffering it for later.
	flusher.Flush()
}
This loop will block on messageChan waiting for the next event. Because the loop never ends, this will keep our connection handler open and listening on the socket until it’s closed by the client.
Running
So our broker spawns a new #ServeHTTP() connection handler for each new connection, and each handler will notify the broker’s channels when it is created and closed. Handlers will also listen on their own messageChan channels for incoming event data. But we still have to glue these channels together and make them actually work. For that we set up a factory for a broker instance and set it running and orchestrating all the different channels.
// Broker factory
func NewServer() (broker *Broker) {
	// Instantiate a broker
	broker = &Broker{
		Notifier:       make(chan []byte, 1),
		newClients:     make(chan chan []byte),
		closingClients: make(chan chan []byte),
		clients:        make(map[chan []byte]bool),
	}
	// Set it running - listening and broadcasting events
	go broker.listen()
	return
}
The key here is the Broker#listen() method. We launch it in a goroutine so it doesn’t block the main program. Its job is to select on the different channels and take the appropriate course of action.
// Listen on different channels and act accordingly
func (broker *Broker) listen() {
	for {
		select {
		case s := <-broker.newClients:
			// A new client has connected.
			// Register their message channel
			broker.clients[s] = true
			log.Printf("Client added. %d registered clients", len(broker.clients))
		case s := <-broker.closingClients:
			// A client has detached and we want to
			// stop sending them messages.
			delete(broker.clients, s)
			log.Printf("Removed client. %d registered clients", len(broker.clients))
		case event := <-broker.Notifier:
			// We got a new event from the outside!
			// Send event to all connected clients
			for clientMessageChan := range broker.clients {
				clientMessageChan <- event
			}
		}
	}
}
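One property of the broadcast loop above is that a plain channel send blocks until the receiving handler is ready, so a single slow client can hold up delivery to everyone else. A possible variation, sketched here under the assumption that dropping an event for a busy client is acceptable, is a non-blocking send using select with a default case. The `broadcast` function is a hypothetical name for this illustration.

```go
package main

import "fmt"

// broadcast is a hypothetical variant of the inner broadcast loop.
// It offers event to every client and skips any client that is not
// ready to receive. It returns how many clients got the event.
func broadcast(clients map[chan []byte]bool, event []byte) int {
	delivered := 0
	for clientMessageChan := range clients {
		select {
		case clientMessageChan <- event:
			delivered++
		default:
			// This client is not ready; drop the event for it
			// rather than blocking everyone else.
		}
	}
	return delivered
}

func main() {
	ready := make(chan []byte, 1) // buffered, so it has room for one event
	stalled := make(chan []byte)  // unbuffered and nobody is receiving
	clients := map[chan []byte]bool{ready: true, stalled: true}

	fmt.Println(broadcast(clients, []byte("tick"))) // 1
	fmt.Println(string(<-ready))                    // tick
}
```

With this approach each client channel would usually be given a small buffer, since an unbuffered channel only accepts a send at the exact moment its handler is waiting on it.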
Go channels, used in this way, allow your programs to deal safely with concurrency without explicit locks: only the listen() goroutine ever touches the clients map. Lastly, we instantiate a broker and run it as an HTTP server in our main function. Because it implements the http.Handler interface, it can be passed to http.ListenAndServe() to start listening on a network address.
func main() {
	broker := NewServer()
	log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))
}
Receiving events
The broker’s Notifier channel will broadcast events to all open connections, but where does the notifier get events from? That will depend on your use case: it could be events from an external source, a message queue, other connections, etc. For the sake of illustration I’ll start a goroutine that pushes a timestamped event to the notifier every two seconds.
func main() {
	broker := NewServer()
	go func() {
		for {
			time.Sleep(time.Second * 2)
			eventString := fmt.Sprintf("the time is %v", time.Now())
			log.Println("Receiving event")
			broker.Notifier <- []byte(eventString)
		}
	}()
	log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))
}
Run it:
go run sse.go
Listen to the HTTP stream in your terminal:
curl localhost:3000
Try connecting from more than one terminal at once.
Finally, you can try in an EventSource-compliant browser:
var client = new EventSource("http://localhost:3000")
client.onmessage = function (msg) {
  console.log(msg)
}
View the full working Gist to see it all together.
In future articles I’ll show how you can use Go’s http.Handler interface to add authentication to an HTTP streaming server.