Video

Want to see the full-length video right now for free?

Notes

Elixir's Model of Concurrency

  • Isolated, independent processes.
  • Processes share no memory and communicate via asynchronous message passing.

Working through an example

Rubyists may reach for Sidekiq or Resque to perform a task asynchronously. Elixir comes with a Task abstraction to handle those.

Let's build a simple version of such an abstraction to get an understanding of concurrency in Elixir.

Step 1

The behavior we want from our abstraction, call it Async, will be something like this,

Async.perform(fn -> 1 + 1 end)
Async.resolve()
# => 2

Building it

Basics of concurrency:

  • self() returns a process id in Elixir.
  • create new processes via spawn/1 or spawn/3
  • message passing via send/2 and receive/1
defmodule Async do
  def perform(task) do
    owner = self()

    spawn(fn ->
      result = task.()
      send(owner, {:ok, result})
    end)
  end

  def resolve do
    receive do
      {:ok, result} -> result
    end
  end
end
  • Test that the desired behavior is satisfied

We can extract a module Runner to show how to use spawn/3.

defmodule Runner do
  def run(owner, task) do
    result = task.()
    send(owner, {:ok, result})
  end
end

defmodule Async do
  def perform(task) do
    owner = self()

    # use spawn/3 instead of spawn/1
    spawn(Runner, :run, [owner, task])
  end

  def resolve do
    receive do
      {:ok, result} -> result
    end
  end
end

Step 2

How will we know if the process fails?

Error propagation in Elixir

Two ways to propagate errors across processes: monitoring and linking.

Let's monitor it

Desired behavior

Async.perform(fn -> 1 + 1 end)
|> Async.resolve()
# => 2

Async.perfom(fn -> raise "boom!" end)
|> Async.resolve()
# => I'm sorry, the process has failed!

Building it

defmodule Runner do
  def run(owner, task) do
    result = task.()
    send(owner, {:ok, result})
  end
end

defmodule Async do
  def perform(task) do
    owner = self()
    pid = spawn(Runner, :run, [owner, task])

    # We monitor the process just created
    ref = Process.monitor(pid)

    %{ref: ref}
  end

  def resolve(%{ref: ref}) do
    receive do
      {:ok, response} ->
        Process.demonitor(ref, [:flush])
        response
      {:DOWN, ^ref, :process, _from_pid, _exit_reason} ->
        "I'm sorry, the process has failed!"
    end
  end
end
  • Test out that the desired behavior is satisfied

Let's do linking

What if we want the error to propagate to our own process and take it down?

  • We can link two processes so that when one fails, the other also fails.

Desired behavior

Async.perform(fn -> 1 + 1 end)
Async.resolve()
# 2

Async.perform(fn -> raise "boom!" end)
# the process we're in fails (If using `iex`, it fails and then restarts)

Building it

defmodule Runner do
  def run(owner, task) do
    result = task.()
    send(owner, {:ok, result})
  end
end


defmodule Async do
  def perform(task) do
    owner = self()
    pid = spawn(Runner, :run, [owner, task])

    # We link the process just created to `owner`
    Process.link(pid)
  end

  def resolve do
    receive do
      {:ok, response} -> response
    end
  end
end

Trapping exits

  • The process trapping exit signals will receive an {:EXIT, pid, msg} message instead of being taken down.

Desired behavior

Async.perform(fn -> 1 + 1 end)
Async.resolve()
# 2

Async.perform(fn -> raise "boom!" end)
Async.resolve()
# => I'm sorry, the process has failed!
defmodule Runner do
  def run(owner, task) do
    result = task.()
    send(owner, {:ok, result})
  end
end


defmodule Async do
  def perform(task) do
    owner = self()

    # We set a flag in this process to trap exits
    Process.flag(:trap_exit, true)

    # We create and link to the process
    spawn_link(Runner, :run, [owner, task])
  end

  def resolve do
    receive do
      {:ok, response} ->
        response
      {:EXIT, _pid, _msg} ->
        "I'm sorry, the process failed!"
    end
  end
end

Further references