I recently worked on a daemon application. No web stuff in this one. It served as a sink for real-time vehicle location data. It took all that data in on a normal TCP port. While I’ve done this kind of thing before, it’s been a while, and I hadn’t done it with Elixir before.
The actual mechanics of the port were simple and quite standard:
- Listen on a port.
- Accept incoming connections.
- Receive data.
- Pass it along.
This is how it has been done since TCP was invented. But since we’re sitting on top of the BEAM VM, we don’t have to worry about any of these steps sharing a process or stepping on each other’s toes. We can:
- Spawn a process to listen on a port.
- Spawn a process to accept incoming connections.
- Receive data and hand it to a process for buffering.
- Pass it along to another process.
Since each bit is in its own process, nothing blocks anything else, even if we use synchronous functions.
:gen_tcp
To access raw TCP sockets, we’ll use gen_tcp. It’s an Erlang module, because the need for this functionality has existed for a long time. You can tell you’re calling Erlang from Elixir because the module is an atom and you call functions directly on it, as in :gen_tcp.listen/2. The first thing we need to do is call listen/2, which opens a listening socket on our port. This is pretty straightforward:
defmodule Receiver do
  require Logger

  def start(port) do
    spawn fn ->
      case :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true]) do
        {:ok, socket} ->
          Logger.info("Listening on port #{port}.")
          accept_connection(socket) # <--- We'll handle this next.

        {:error, reason} ->
          Logger.error("Could not listen: #{inspect reason}")
      end
    end
  end
end
Here, active: false means we have to ask :gen_tcp for data explicitly. If we’d used active: true, incoming data would instead arrive as {:tcp, socket, data} messages in our process’s mailbox. That’s nice to have, but we don’t really need it, and it makes the workflow a little more complicated for what we’re trying to accomplish here. So we’ll go with active: false and poll for new data.
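For comparison, here is a small sketch of the active: true alternative, where gen_tcp delivers each chunk to the socket’s controlling process as a {:tcp, socket, data} message. It is self-contained: it listens on port 0 (so the OS picks a free port), reads that port back with :inet.port/1, and connects a local client to itself. The ActiveDemo module is a made-up name for illustration, not code from the daemon.

```elixir
defmodule ActiveDemo do
  # Hypothetical demo: send one payload over a loopback connection and
  # return what shows up in the mailbox of the accepting process.
  def roundtrip(payload) do
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: true, reuseaddr: true])
    {:ok, port} = :inet.port(listen)

    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])
    # The accepted socket inherits active: true from the listening socket,
    # and this process (the one calling accept) becomes its controlling process.
    {:ok, server} = :gen_tcp.accept(listen)
    :ok = :gen_tcp.send(client, payload)

    # No recv call: the data arrives as an ordinary message.
    result =
      receive do
        {:tcp, ^server, data} -> {:ok, data}
      after
        1_000 -> {:error, :timeout}
      end

    :gen_tcp.close(client)
    :gen_tcp.close(server)
    :gen_tcp.close(listen)
    result
  end
end
```

In active mode a peer hanging up also shows up as a message ({:tcp_closed, socket}), so the reading loop becomes a receive block instead of a recv call.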
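We don’t want the listening code to block everything. :gen_tcp.listen/2 itself returns right away, but the :gen_tcp.accept/1 call inside accept_connection blocks until a client connects. To keep that from holding anything up, we wrap the whole function body inside spawn/1 so it runs in a different Process. Having accept/1 block is exactly what we want; we just don’t want it to prevent anything else from running.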
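To see why the spawn matters, here is a sketch (SpawnDemo is a hypothetical name) where a spawned process sits blocked in :gen_tcp.accept/1 while the caller carries on, confirms the acceptor is alive, and then connects to it.

```elixir
defmodule SpawnDemo do
  # Returns {waiting?, accepted?}: whether the caller kept running while the
  # spawned process sat in accept/1, and whether that accept later succeeded.
  def run do
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true])
    {:ok, port} = :inet.port(listen)
    parent = self()

    # accept/1 blocks, but only inside this spawned process.
    acceptor =
      spawn(fn ->
        {:ok, _client} = :gen_tcp.accept(listen)
        send(parent, :accepted)
      end)

    # The caller is not blocked: it can check on the acceptor immediately.
    waiting? = Process.alive?(acceptor)

    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])

    accepted? =
      receive do
        :accepted -> true
      after
        1_000 -> false
      end

    :gen_tcp.close(client)
    :gen_tcp.close(listen)
    {waiting?, accepted?}
  end
end
```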
Acceptance
We’re in a Process listening for connections to the socket. When we have one, we want to be able to read data from it. But we also want to be able to handle more than one connection at a time. Never fear, we can spawn more processes!
The accept_connection function needs some implementation. Accepting a connection gives us our readable data source. Let’s fill that function in now.
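def accept_connection(socket) do
  {:ok, client} = :gen_tcp.accept(socket)

  spawn fn ->
    # Trap exits before creating the Buffer, so a crash in it arrives
    # as an :EXIT message instead of taking this process down with it.
    Process.flag(:trap_exit, true)
    {:ok, buffer_pid} = Buffer.create() # <--- this is next
    serve(client, buffer_pid) # <--- and then we'll cover this
  end

  # Loop back and wait for the next connection.
  accept_connection(socket)
end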
:gen_tcp.accept/1 waits for a connection to come in. When one does, we kick off a new process that creates a Buffer for it to use and starts reading data right away in the serve function, which we’ll write below. Meanwhile, accept_connection calls itself to wait for the next connection.
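Here is the same accept-then-spawn loop as a tiny, self-contained echo server you can poke at. The AcceptLoopDemo name and the echo behaviour are made up for illustration. One addition that isn’t in the code above: :gen_tcp.controlling_process/2 hands each client socket to its own process, so the socket is no longer tied to the lifetime of the process that accepted it.

```elixir
defmodule AcceptLoopDemo do
  # Hypothetical echo server with the same shape as accept_connection/1:
  # accept, spawn a per-client process, loop. Returns the OS-assigned port.
  def start do
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true])
    {:ok, port} = :inet.port(listen)
    spawn(fn -> accept_loop(listen) end)
    port
  end

  defp accept_loop(listen) do
    {:ok, client} = :gen_tcp.accept(listen)
    pid = spawn(fn -> echo(client) end)
    # Make the per-client process the socket's owner, so the socket would
    # survive this acceptor process dying.
    :ok = :gen_tcp.controlling_process(client, pid)
    accept_loop(listen)
  end

  # Passive-mode reading loop: block in recv, echo the bytes back, repeat.
  defp echo(client) do
    case :gen_tcp.recv(client, 0) do
      {:ok, data} ->
        :ok = :gen_tcp.send(client, data)
        echo(client)

      {:error, _reason} ->
        :ok
    end
  end
end
```

Because each client gets its own process, a second client is served even while the first one is connected and idle.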
What’s Buffer, you ask? It’s a module describing a line-oriented in-memory buffer that will hold input, parse out lines from that input, and send them individually to your sink. The details of it aren’t too important right now, but the management of its process is.
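The Buffer’s code isn’t shown here, but the core of a line-oriented buffer is small. As a hedged sketch, assuming lines end in "\n", this hypothetical LineSplitter.split/2 glues newly received data onto any pending partial line and splits out the complete lines. It is not the daemon’s actual Buffer.

```elixir
defmodule LineSplitter do
  # Hypothetical helper: returns {complete_lines, leftover_partial_line}.
  # Assumes "\n" line endings; a "\r\n" line would keep its trailing "\r".
  def split(pending, data) do
    parts = String.split(pending <> data, "\n")
    # The last element is whatever followed the final "\n" (possibly ""),
    # i.e. an incomplete line that has to wait for more data.
    {lines, [rest]} = Enum.split(parts, -1)
    {lines, rest}
  end
end
```

A Buffer process built this way could keep `rest` as its state, call split/2 on every chunk it receives, and forward each complete line to the sink.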
Window to the Soul
Inside this spawn, we create a Buffer for each socket connection and keep tabs on that process. In Erlang (and so also in Elixir), processes are super lightweight. You’re supposed to be able to create them easily and recreate them easily. To this end, there are two main ways to “care” about a process you create:
- spawn, which starts a process and doesn’t care at all about its status.
- spawn_link, which starts a process “linked” to this one. The link means that if one crashes, so does the other. This is useful for making sure state doesn’t hang around, get stale, and take up memory.
Neither of them, though, does any kind of lifecycle tracking by default. Luckily, we do have some tools available to us.
- We can Process.monitor/1 any ol’ process we want, as long as we know its pid. If that process ends, we’ll get a :DOWN message. This works best with spawned Processes.
- We can trap exits with Process.flag(:trap_exit, true), like we have in the code above. This only works with spawn_linked Processes. When the linked child Process dies, the Process trapping exits will receive an :EXIT message.
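Both options in a runnable sketch (the WatchDemo module is hypothetical): monitoring delivers {:DOWN, ref, :process, pid, reason}, while a process that traps exits receives {:EXIT, pid, reason} from a linked child instead of crashing along with it.

```elixir
defmodule WatchDemo do
  # Process.monitor/1: works on any pid we know about.
  def monitor_crash do
    # The child waits for :crash so we can monitor it before it exits.
    pid =
      spawn(fn ->
        receive do
          :crash -> exit(:boom)
        end
      end)

    ref = Process.monitor(pid)
    send(pid, :crash)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      1_000 -> :timeout
    end
  end

  # Trapping exits: a linked child's crash becomes an :EXIT message.
  def trap_linked_crash do
    # Process.flag/2 returns the previous value, so we can restore it later.
    previous = Process.flag(:trap_exit, true)
    pid = spawn_link(fn -> exit(:boom) end)

    result =
      receive do
        {:EXIT, ^pid, reason} -> {:exit, reason}
      after
        1_000 -> :timeout
      end

    Process.flag(:trap_exit, previous)
    result
  end
end
```

Without the trap_exit flag, the spawn_link child’s :boom exit would have killed the caller too.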
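As you can see, it’s this second option that we’re using when we spawn our per-connection Process. That relies on Buffer.create/0 linking the new Buffer process to its caller (as a start_link-style function would), so we’ll know if that Buffer process ever crashes.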
Collaborate and Listen
We’re listening for and accepting connections on our socket. We’re trapping crashes in sub-Processes, and we’ve started up a Buffer to hold onto data while we receive it. So now we actually need to receive it. Since we started listening on the port with active: false, we need to ask for the data. We’ll use :gen_tcp.recv/2 for that.
def serve(socket, buffer_pid) do
  case :gen_tcp.recv(socket, 0) do
    {:ok, data} ->
      buffer_pid = maybe_recreate_buffer(buffer_pid) # <-- coming up next
      Buffer.receive(buffer_pid, data)
      serve(socket, buffer_pid)

    {:error, reason} ->
      Logger.info("Socket terminating: #{inspect reason}")
  end
end
We wait forever for data to come in: recv/2 has no timeout, and a length of 0 asks for whatever bytes are available. When data arrives, we make sure the Buffer is still running (or start a new one) and then hand the data over to it. Then we keep waiting. Waiting like this is only possible because we’re doing it in a separate Process.
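A quick sketch of the two recv outcomes serve/2 handles, using a throwaway loopback connection (RecvDemo is a made-up name). It uses recv/3 with a one-second timeout only so the demo can’t hang; serve/2 itself waits indefinitely. The {:error, :closed} result is what lands in serve/2’s error branch when a client hangs up.

```elixir
defmodule RecvDemo do
  # Records what recv returns before and after the client hangs up.
  def run do
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: false, reuseaddr: true])
    {:ok, port} = :inet.port(listen)
    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, [:binary, active: false])
    {:ok, server} = :gen_tcp.accept(listen)

    # Length 0: return whatever bytes are available.
    :ok = :gen_tcp.send(client, "hello\n")
    first = :gen_tcp.recv(server, 0, 1_000)

    # Once the peer closes its end, recv reports {:error, :closed}.
    :ok = :gen_tcp.close(client)
    second = :gen_tcp.recv(server, 0, 1_000)

    :gen_tcp.close(server)
    :gen_tcp.close(listen)
    {first, second}
  end
end
```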
But the last piece of the puzzle is figuring out whether the Buffer Process is still running. As we said above, we’ll get an :EXIT message if it crashes, so we should check our mailbox.
defp maybe_recreate_buffer(original_pid) do
  receive do
    {:EXIT, ^original_pid, _reason} ->
      {:ok, new_buffer_pid} = Buffer.create()
      new_buffer_pid
  after
    10 ->
      original_pid
  end
end
If an :EXIT is waiting in our mailbox right now (or arrives within the next 10 milliseconds), we create a new Buffer. If there isn’t one, we just continue on.
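One consequence of the after 10 clause is that every chunk of data waits the full 10 ms in maybe_recreate_buffer/1 whenever the Buffer is healthy, because the receive only gives up once the timeout expires. If that latency matters, after 0 checks for a message that is already in the mailbox without waiting; an :EXIT that arrives later is then picked up on the next chunk. A hedged sketch of that variant, where MailboxCheck and the restart function (standing in for Buffer.create/0) are hypothetical:

```elixir
defmodule MailboxCheck do
  # Hypothetical variant of maybe_recreate_buffer/1. `restart` is a
  # zero-arity function standing in for creating a new Buffer.
  def maybe_replace(pid, restart) do
    receive do
      # Only an :EXIT from this exact pid triggers a replacement.
      {:EXIT, ^pid, _reason} -> restart.()
    after
      # 0: scan what's already in the mailbox, then return immediately.
      0 -> pid
    end
  end
end
```

In the daemon, restart.() would correspond to `{:ok, pid} = Buffer.create()` followed by returning `pid`.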
maybe_recreate_buffer/1 keeps this transparent to serve/2. Without a Buffer, we can’t pipe data along to the functions that do real work on the information. So with this in place, if the Buffer goes away, it’s re-created before the next chunk of data is handed to it.
The Sum Total
Now we have a simple socket server that works with line-oriented protocols and can hand fully formed lines off to a parser. The Buffer is simple and probably unlikely to crash, but if it does, the server recovers gracefully. We also learned a bit about how exactly Processes relate to each other and how they can keep tabs on each other.
It’s important to note that this was written as an exploration of both sockets and Process interaction. It’s entirely likely that there’s a solution to this problem that is made more fault tolerant with the use of Supervisors, but that was not the goal of the exercise.