---
title: Writing a Server Sent Events server in Go
teaser: 'Learn how Go can be used to keep open network connections and broadcast messages
  over the web.

  '
tags: go,new bamboo,web
author: Ismael Celis
published_on: 2014-05-13
---

_This post was originally published on the New Bamboo blog, before [New Bamboo
joined thoughtbot in London][new-bamboo-thoughtbot]._

---

![the stream](https://images.thoughtbot.com/new-bamboo/blog/writing-a-server-sent-events-server-in-go/JLvnCyHuTWenDIDVz3NX_Screenshot-2014-05-13-14.24.26.png)

I took the opportunity of our last New Bamboo Hack Day to write a [Server Sent
Events] server in Go. The idea was to allow third parties to subscribe to [a
pre-existing, internal events stream][micro-network] for an e-commerce platform.
I also needed to authenticate this service via access tokens so only authorised
users or programs can access it.

I based my implementation largely on [this example][html5-go-example], and my
own finalised production code is [here][final-code]. In this article I'll take
you through some of the code to illustrate how elegantly can Go model network
concurrency. The patterns described below can be adapted to other problems where
a server needs to keep open network connections and broadcast messages to all or
some of them. This includes Websockets, chat servers, message brokers and
others.

## The server

The first step is to define a `struct` to represent the server instance and hold
basic state.

```go
// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections
type Broker struct {

	// Events are pushed to this channel by the main events-gathering routine
	Notifier chan []byte

	// New client connections
	newClients chan chan []byte

	// Closed client connections
	closingClients chan chan []byte

	// Client connections registry
	clients map[chan []byte]bool
}
```

`Notifier` is a channel of byte arrays - an individual byte array representing a
single event -. An external goroutine will push events to this channel as they
come.

```go
// This happens in a goroutine somewhere else.
broker.Notifier <- eventBytes
```

What and how events are pushed to the broker depends on your use case. My server
receives JSON-encoded events from the outside via UDP and broadcasts them to any
listeners.

`newClients` is a channel that takes and registers newly open HTTP connections.
This is where things start getting interesting: newClients is *a channel of
channels*! The reason will become clear in a second.

`closingClients` is notified when a client connection closes. Again, connections
are themselves represented as channels.

`clients` is a *map* that holds the currently open connections, so incoming
events can be broadcast to them.

## Handling connections

Our Broker will implement Go's [http.Handler] interface to handle HTTP
connections. This will allow it to interop with other HTTP handlers such as
routers or middleware (more on that later).

```go
func (broker *Broker) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
  // ...
}
```

The `rw` response writer itself is only required to comply with the basic
http.ResponseWriter interface, but for a streaming server we also need to make
sure that it supports the [http.Flusher] interface (so we can flush buffered
data down the connection as it comes). Let's check for support or bail out.

```go
// Make sure that the writer supports flushing.
//
flusher, ok := rw.(http.Flusher)

if !ok {
	http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
	return
}
```

This is a good illustration of how Go interfaces allow you to *compose*
behaviour into objects, and the Go standard library itself makes full use of
this feature.

Now we'll set basic headers to support keep-alive HTTP connections (so clients
don't close them early) and the "text/event-stream" content type for browsers
that support Server Sent Events via the [EventSource] API. We'll also add a
Cross-origin Resource Sharing header so browsers on different domains can still
connect.

```go
// Set the headers related to event streaming.
rw.Header().Set("Content-Type", "text/event-stream")
rw.Header().Set("Cache-Control", "no-cache")
rw.Header().Set("Connection", "keep-alive")
rw.Header().Set("Access-Control-Allow-Origin", "*")
```

Next, each new connection creates a channel of events that it will register with
the Broker. It does this by passing its events channel to the broker's
`newClients` channel.

```go
// Each connection registers its own message channel with the Broker's connections registry
messageChan := make(chan []byte)

// Signal the broker that we have a new connection
broker.newClients <- messageChan
```

Before we carry on, let's make sure we notify the broker if our connection dies
for any reason. This is done by *deferring* a function until the current
connection handler exits.

```go
// Remove this client from the map of connected clients
// when this handler exits.
defer func() {
	broker.closingClients <- messageChan
}()
```

Next, we need to also notify when the connection is closed by the client (ie.
the client disconnects). We do this by leveraging our ResponseWriter
[http.CloseNotifier] interface, which gives us just that functionality.

```go
// Listen to connection close and un-register messageChan
notify := rw.(http.CloseNotifier).CloseNotify()

go func() {
	<-notify
	broker.closingClients <- messageChan
}()
```

The `CloseNotifier#CloseNotify()` method returns a channel that will *emit* a
boolean when the connection closes. We launch an anonymous function as a
goroutine to listen for that case without blocking the outer HTTP handler.

Finally, we wait for this connection's `messageChan` to give us events in a loop
and write the event data to the response writer.

```go
// block waiting for messages broadcast on this connection's messageChan
for {
  // Write to the ResponseWriter
  // Server Sent Events compatible
  fmt.Fprintf(rw, "data: %s\n\n", <-messageChan)

  // Flush the data immediatly instead of buffering it for later.
  flusher.Flush()
}
```

This loop will *block* on `messageChan` waiting for the next event. Because the
loop never ends this will keep our connection handler open and listening on the
socket until it's closed by the client.

## Running

So our broker spawns a new `#ServeHTTP()` connection handler for each new
connection, and each handler will notify the broker's channels when they are
created and closed. Handlers will also listen on their own `messageChan`
channels for incoming event data. But we still have to glue these channels
together and make them actually work. For that we setup a factory for a broker
instance and set it running and orchestrating all the different channels.

```go
// Broker factory
func NewServer() (broker *Broker) {
  // Instantiate a broker
  broker = &Broker{
    Notifier:       make(chan []byte, 1),
    newClients:     make(chan chan []byte),
    closingClients: make(chan chan []byte),
    clients:        make(map[chan []byte]bool),
  }

  // Set it running - listening and broadcasting events
  go broker.listen()

  return
}
```

The key here is the `Broker#listen()` method. We launch it in a goroutine so it
doesn't block the main program. Its job is to select on the different channels
and take the appropriate course of action.

```go
// Listen on different channels and act accordingly
func (broker *Broker) listen() {
  for {
    select {
      case s := <-broker.newClients:
        // A new client has connected.
        // Register their message channel
        broker.clients[s] = true
        log.Printf("Client added. %d registered clients", len(broker.clients))

      case s := <-broker.closingClients:
        // A client has dettached and we want to
        // stop sending them messages.
        delete(broker.clients, s)
        log.Printf("Removed client. %d registered clients", len(broker.clients))

      case event := <-broker.Notifier:
        // We got a new event from the outside!
        // Send event to all connected clients
        for clientMessageChan, _ := range broker.clients {
          clientMessageChan <- event
        }
    }
  }
}
```

Go channels, used in this way, allow your programs [to deal safely with
concurrency][go-concurrency] while avoiding threads and locks. Lastly, we
instantiate a broker and run it as an HTTP server in our main function. Because
it implements the `http.Handler` interface, it can be passed to
[http.ListenAndServe()] to start listening on a network address.

```go
func main() {
  broker := NewServer()

  log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))
}
```

## Receiving events

The broker's `Notifier` channel will broadcast events to all open connections,
but where does the notifier get events from? That will depend on your use case -
it could be events from an external source, a message queue, other connections,
etc. For the sake of illustration I'll start a go routine that pushes random
events to the notifier at regular intervals.

```go
func main() {

  broker := NewServer()

  go func() {
    for {
      time.Sleep(time.Second * 2)
      eventString := fmt.Sprintf("the time is %v", time.Now())
      log.Println("Receiving event")
      broker.Notifier <- []byte(eventString)
    }
  }()

  log.Fatal("HTTP server error: ", http.ListenAndServe("localhost:3000", broker))

}
```

Run it:

<kbd>go run sse.go</kbd>

Listen to the HTTP stream in your terminal:

<kbd>curl localhost:3000</kbd>

Try connecting from more than one terminal at once.

Finally, you can try in an EventSource-compliant browser:

```javascript
var client = new EventSource("http://localhost:3000")
client.onmessage = function (msg) {
  console.log(msg)
}
```

View the [full working Gist] to see it all together.

In future articles I'll show how you can use Go's `http.Handler` interface to
add authentication to a HTTP streaming server.

[EventSource]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource
[final-code]: https://github.com/bootic/bootic_data_collector/blob/master/firehose/broker.go
[full working Gist]: https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c
[go-concurrency]: http://www.golang-book.com/10
[html5-go-example]: https://github.com/kljensen/golang-html5-sse-example/blob/master/server.go
[http.CloseNotifier]: http://golang.org/pkg/net/http/#CloseNotifier
[http.Flusher]: http://golang.org/pkg/net/http/#Flusher
[http.Handler]: http://golang.org/pkg/net/http/#Handler
[http.ListenAndServe()]: http://golang.org/pkg/net/http/#ListenAndServe
[micro-network]: https://thoughtbot.com/blog/micro-network-daemons-in-go
[new-bamboo-thoughtbot]: https://thoughtbot.com/blog/new-bamboo-joins-thoughtbot-in-london
[Server Sent Events]: http://en.wikipedia.org/wiki/Server-sent_events
