---
title: Playing with Sockets and Processes in Elixir
teaser: 'We make a small line-oriented TCP socket server to explore how sockets and
  some manual process management works in Elixir.

  '
tags: elixir,sockets,processes
author: Jon Yurek
published_on: 2017-10-24
---

I recently worked on a daemon application. No web stuff in this one. It served
as a sink for real-time vehicle location data. It took all that data in on a
normal TCP port. While I've done this kind of thing before, it's been a while,
and I hadn't done it with Elixir before.

The actual mechanics of the port were simple and quite standard:

1. Listen on a port.
1. Accept incoming connections.
1. Receive data.
1. Pass it along.

This is the way things have been done since we came up with TCP. But since we're
sitting on top of the BEAM VM we don't have to worry about any of these being in
the same process or stepping on each other's toes. We can:

1. Spawn a process to listen on a port.
1. Spawn a process to accept incoming connections.
1. Receive data and hand it to a process for buffering.
1. Pass it along to another process.

Since each bit is in its own process, nothing blocks anything else, even if
we use synchronous functions.
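
Here's a tiny sketch of that claim, separate from any socket code. The 200 ms sleep and the message names are made up for the demo; the point is only that a blocking call in one process doesn't hold up another.

```elixir
parent = self()

# This process blocks for 200 ms before reporting back.
spawn(fn ->
  Process.sleep(200)
  send(parent, :slow_done)
end)

# This one reports back right away, even though the first is still sleeping.
spawn(fn -> send(parent, :fast_done) end)

# Take the two reports in the order they arrive.
first =
  receive do
    msg when msg in [:slow_done, :fast_done] -> msg
  end

second =
  receive do
    msg when msg in [:slow_done, :fast_done] -> msg
  end

IO.inspect({first, second})
```

The fast process finishes first even though it was started second, because the sleeping process only blocks itself.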

## :gen_tcp

To access the raw TCP sockets, we'll be using [gen_tcp]. It's an Erlang module,
because the need for this functionality has existed for a long time. You can
tell you're calling Erlang from Elixir because you'll access functions directly
on an atom, like `:gen_tcp`. The first thing we need to do is
[`listen/2`][listen] on a port. This is pretty straightforward:

```elixir
defmodule Receiver do
  require Logger

  def start(port) do
    spawn fn ->
      case :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true]) do
        {:ok, socket} ->
          Logger.info("Listening on port #{port}.")
          accept_connection(socket) # <--- We'll handle this next.
        {:error, reason} ->
          Logger.error("Could not listen: #{reason}")
      end
    end
  end
end
```

Here, `active: false` means we need to poll `:gen_tcp` for data. If we'd used
`true`, we'd need to accept the incoming data as Elixir messages in our mailbox.
That's nice to have, but we don't really need it, and it makes the workflow a
little more complicated for what we're trying to accomplish here. So we'll go
with `active: false` and poll for new data.
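
To make the difference concrete, here's a rough sketch that receives the same kind of data both ways on one connection. It isn't part of the `Receiver` module: listening on port `0` (so the OS picks a free port), binding to `127.0.0.1`, and connecting a client from the same script are all assumptions made just to keep the demo self-contained.

```elixir
# Listen on port 0 so the OS assigns a free port, then look up which one we got.
{:ok, listener} =
  :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true, ip: {127, 0, 0, 1}])

{:ok, port} = :inet.port(listener)

# Connect a client from this same process. The connection waits in the
# listener's backlog until we accept it.
{:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])
{:ok, server_side} = :gen_tcp.accept(listener)

# Passive mode: nothing is delivered until we ask for it with recv.
:ok = :gen_tcp.send(client, "first\n")
{:ok, passive_data} = :gen_tcp.recv(server_side, 0, 1_000)

# Active mode (just once here): the next chunk arrives as a mailbox message.
:ok = :inet.setopts(server_side, active: :once)
:ok = :gen_tcp.send(client, "second\n")

active_data =
  receive do
    {:tcp, ^server_side, data} -> data
  after
    1_000 -> :timeout
  end

IO.inspect({passive_data, active_data})

:gen_tcp.close(client)
:gen_tcp.close(server_side)
:gen_tcp.close(listener)
```

With `active: false` the process decides when to read; with an active socket the data shows up among whatever other messages the process is already handling.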

We don't want this setup to block everything. `:gen_tcp.listen/2` itself returns
right away, but the `accept_connection/1` call that follows waits for clients
indefinitely. To keep that from holding up the caller, we wrap the whole
function body inside [`spawn/1`][spawn] so it runs in a different Process.
Having the accept loop block is exactly what we want, but we don't want it to
prevent anything else from running.

## Acceptance

We're in a Process listening for connections to the socket. When we have one,
we want to be able to read data from it. But we also want to be able to handle
more than one connection at a time. Never fear, we can spawn more processes!

The `accept_connection` function needs some implementation. Accepting a
connection gives us our readable data source. Let's fill that function in now.

```elixir
def accept_connection(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  spawn fn ->
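    # Trap exits first so a crashing, linked Buffer sends us an :EXIT message
    # instead of taking this process down with it.
    Process.flag(:trap_exit, true)
    {:ok, buffer_pid} = Buffer.create() # <--- this is next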
    serve(client, buffer_pid) # <--- and then we'll cover this
  end
  accept_connection(socket) # Loop back and wait for the next connection.
end
```

[`:gen_tcp.accept/1`][accept] waits for a connection to come in. When one does,
we kick off a new process which creates a `Buffer` for it to use and starts
right in on reading data in the `serve` function. Meanwhile, the accepting
process loops back to wait for the next connection. We'll write `serve` below.
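
To see the accept-then-spawn shape on its own, here's a stripped-down sketch that swaps the `Buffer` for a trivial upcasing echo. `EchoSketch`, its function names, the OS-assigned port, and the `127.0.0.1` binding are all hypothetical and exist only for this demo.

```elixir
defmodule EchoSketch do
  # Starts a listener on an OS-assigned port and returns {listener, port}.
  def start do
    {:ok, listener} =
      :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true, ip: {127, 0, 0, 1}])

    {:ok, port} = :inet.port(listener)
    spawn(fn -> accept_loop(listener) end)
    {listener, port}
  end

  # Accept one client, hand it to its own process, then wait for the next.
  defp accept_loop(listener) do
    case :gen_tcp.accept(listener) do
      {:ok, client} ->
        spawn(fn -> echo(client) end)
        accept_loop(listener)

      {:error, _reason} ->
        # The listener was closed; stop accepting.
        :ok
    end
  end

  # Send back whatever arrives, upcased, until the client goes away.
  defp echo(client) do
    case :gen_tcp.recv(client, 0) do
      {:ok, data} ->
        :gen_tcp.send(client, String.upcase(data))
        echo(client)

      {:error, _reason} ->
        :ok
    end
  end
end

{listener, port} = EchoSketch.start()
opts = [:binary, active: false]

# Open two connections, then talk on the second one first.
{:ok, a} = :gen_tcp.connect({127, 0, 0, 1}, port, opts)
{:ok, b} = :gen_tcp.connect({127, 0, 0, 1}, port, opts)
:ok = :gen_tcp.send(b, "second\n")
:ok = :gen_tcp.send(a, "first\n")
{:ok, reply_a} = :gen_tcp.recv(a, 0, 1_000)
{:ok, reply_b} = :gen_tcp.recv(b, 0, 1_000)
IO.inspect({reply_a, reply_b})

:gen_tcp.close(a)
:gen_tcp.close(b)
:gen_tcp.close(listener)
```

Because each connection gets its own process, the idle first client doesn't keep the second one from being served.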

What's `Buffer`, you ask? It's [a module describing a line-oriented in-memory
buffer][buffer] that will hold input, parse out lines from that input, and send
them individually to your sink. The details of it aren't too important right
now, but the management of its process is.
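
If you're curious what "parse out lines" might look like, here's a minimal sketch of just the splitting step. This is not the linked `Buffer` module; `LineSplitSketch` and its `split/2` function are hypothetical, and it assumes lines end with `"\n"`.

```elixir
defmodule LineSplitSketch do
  # Appends new data to whatever was left over from the previous call and
  # returns {complete_lines, new_leftover}.
  def split(leftover, data) do
    parts = String.split(leftover <> data, "\n")
    # Everything before the last "\n" is a complete line; the final piece
    # (possibly "") is an unfinished line to keep for next time.
    {lines, [rest]} = Enum.split(parts, -1)
    {lines, rest}
  end
end

# Data rarely arrives neatly cut on line boundaries.
{lines, rest} = LineSplitSketch.split("", "one\ntw")
{more_lines, more_rest} = LineSplitSketch.split(rest, "o\nthree\n")
IO.inspect({lines, rest, more_lines, more_rest})
```

A process wrapping something like this would hold `rest` as its state between chunks, which is the part that's lost if the process crashes.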

## Window to the Soul

Inside this `spawn`, we create a `Buffer` for each socket connection and keep
watch over that process. In Erlang (and so also in Elixir) processes are super
lightweight. You're supposed to be able to create them easily and *re*create
them easily.

To this end, there are two main ways to "care" about a process you create:

1. `spawn` which starts a process and doesn't care at all about its status
2. [`spawn_link`][spawn_link] which starts a process "linked" to this one. The
   link means that if one crashes, so does the other. This is useful for making
   sure state doesn't hang around and get stale and take up memory.

What neither does, though, is any kind of lifecycle tracking by default. Luckily,
we do have some tools available to us.

1. We can [`Process.monitor/1`][monitor] any ol' process we want, as long as we
   know the pid. If that process ends, we'll get a `:DOWN` message. This works
   best with `spawn`ed Processes.
1. We can trap exits with [`Process.flag(:trap_exit, true)`][trap_exit] like we
   have in the code above. This only works with `spawn_link`ed Processes. When
   the child Process dies, the parent Process will receive an `:EXIT` message.

As you can see, it's this second option that we're using in the Process we
`spawn` for each connection. As long as `Buffer.create/0` links the `Buffer` to
its caller, we'll know if that `Buffer` process ever crashes.
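
Here's a small sketch of both tools side by side, outside the socket code. The `:crash` message and the `:boom` exit reason are made up for the demo.

```elixir
# Option 1: monitor a plain spawned process. It waits for a :crash message so
# the monitor is in place before it exits.
pid =
  spawn(fn ->
    receive do
      :crash -> exit(:boom)
    end
  end)

ref = Process.monitor(pid)
send(pid, :crash)

down_reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end

# Option 2: trap exits, then start a linked process. Instead of crashing along
# with it, we get an :EXIT message.
Process.flag(:trap_exit, true)
linked = spawn_link(fn -> exit(:boom) end)

exit_reason =
  receive do
    {:EXIT, ^linked, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect({down_reason, exit_reason})
```

Either way the crash becomes an ordinary message, and what to do about it is up to the process that receives it.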

## Collaborate and Listen

We're listening for and accepting connections on our socket. We're trapping crashes
on sub-Processes, and we've started up a `Buffer` to hold onto data while we
receive it. So now we actually need to receive it. Since we started listening to
the port with `active: false` we need to ask for the data. We'll use
[`:gen_tcp.recv/2`][recv] for that.

```elixir
  def serve(socket, buffer_pid) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} ->
        buffer_pid = maybe_recreate_buffer(buffer_pid) # <-- coming up next
        Buffer.receive(buffer_pid, data)
        serve(socket, buffer_pid)
      {:error, reason} ->
        Logger.info("Socket terminating: #{inspect reason}")
    end
  end
```

We wait forever for data to come in. When it does, we'll make sure the `Buffer`
is still running (or start a new one) and then hand the data over to it. Then
we'll keep waiting. Waiting like this is only possible because we're doing it
in a separate Process.

But the last piece of the puzzle is figuring out if the `Buffer` Process is
still running. As we said above, we'll get an `:EXIT` message if it crashed, so
we should check our mailbox.

```elixir
  defp maybe_recreate_buffer(original_pid) do
    receive do
      {:EXIT, ^original_pid, _reason} ->
        {:ok, new_buffer_pid} = Buffer.create()
        new_buffer_pid
    after
      10 ->
        original_pid
    end
  end
```

If we have an `:EXIT` waiting in our mailbox right now (or within the next 10
milliseconds), we should create a new `Buffer`. If there isn't one, just
continue on.
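
The `receive ... after` pattern is worth a quick look by itself. In this sketch, `MailboxPeek` and the `:ping` message are hypothetical. It uses a timeout of `0`, which only checks what's already in the mailbox, where the function above waits up to 10 ms.

```elixir
defmodule MailboxPeek do
  # Returns :found if `expected` is waiting in the mailbox (or arrives within
  # `timeout` milliseconds), otherwise :nothing.
  def check(expected, timeout) do
    receive do
      ^expected -> :found
    after
      timeout -> :nothing
    end
  end
end

# Nothing has been sent yet, so this returns immediately.
before_send = MailboxPeek.check(:ping, 0)

# Now put a message in our own mailbox and look again.
send(self(), :ping)
after_send = MailboxPeek.check(:ping, 0)

IO.inspect({before_send, after_send})
```

Any messages that don't match stay in the mailbox, so a check like this won't swallow anything else the process is waiting on.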

`maybe_recreate_buffer/1` keeps things running transparently to the `serve/2`
function. Without a `Buffer` we can't pipe data along to the functions that do
real work on the information. With this in place, if the `Buffer` goes away,
it's re-created before the next chunk of data is handed over.

## The Sum Total

Now we have a simple socket server that works with line-oriented protocols and
can hand off fully-formed lines into that server's parser. The `Buffer`, while
simple and probably unlikely to crash, handles crashes gracefully. We learned a
bit about how exactly Processes relate to each other and how they can keep tabs
on each other.

It's important to note that this was written as an exploration of both sockets
and Process interaction. It's entirely likely that there's a solution to this
problem that is made more fault tolerant with the use of
[Supervisors][supervisors], but that was not the goal of the exercise.

[gen_tcp]: http://erlang.org/doc/man/gen_tcp.html
[listen]: http://erlang.org/doc/man/gen_tcp.html#listen-2
[accept]: http://erlang.org/doc/man/gen_tcp.html#accept-1
[buffer]: https://gist.github.com/jyurek/0c1aac357cbcc8f52007627e8658d724#file-buffer-ex
[monitor]: https://hexdocs.pm/elixir/Process.html#monitor/1
[recv]: http://erlang.org/doc/man/gen_tcp.html#recv-2
[spawn]: https://hexdocs.pm/elixir/Kernel.html#spawn/1
[spawn_link]: https://hexdocs.pm/elixir/Kernel.html#spawn_link/1
[trap_exit]: https://hexdocs.pm/elixir/Process.html#flag/2
[supervisors]: https://hexdocs.pm/elixir/Supervisor.html
