Want to see the full-length video right now for free?
Rubyists may reach for Sidekiq
or Resque
to perform a task asynchronously.
Elixir comes with a Task
abstraction to
handle those.
Let's build a simple version of such an abstraction to get an understanding of concurrency in Elixir.
The behavior we want from our abstraction, call it Async
, will be something
like this,
Async.perform(fn -> 1 + 1 end)
Async.resolve()
# => 2
Basics of concurrency:
self()
returns a process id in Elixir.spawn/1
or spawn/3
send/2
and receive/1
defmodule Async do
def perform(task) do
owner = self()
spawn(fn ->
result = task.()
send(owner, {:ok, result})
end)
end
def resolve do
receive do
{:ok, result} -> result
end
end
end
We can extract a module Runner
to show how to use spawn/3
.
defmodule Runner do
def run(owner, task) do
result = task.()
send(owner, {:ok, result})
end
end
defmodule Async do
def perform(task) do
owner = self()
# use spawn/3 instead of spawn/1
spawn(Runner, :run, [owner, task])
end
def resolve do
receive do
{:ok, result} -> result
end
end
end
How will we know if the process fails?
Two ways to propagate errors across processes: monitoring and linking.
Desired behavior
Async.perform(fn -> 1 + 1 end)
|> Async.resolve()
# => 2
Async.perfom(fn -> raise "boom!" end)
|> Async.resolve()
# => I'm sorry, the process has failed!
defmodule Runner do
def run(owner, task) do
result = task.()
send(owner, {:ok, result})
end
end
defmodule Async do
def perform(task) do
owner = self()
pid = spawn(Runner, :run, [owner, task])
# We monitor the process just created
ref = Process.monitor(pid)
%{ref: ref}
end
def resolve(%{ref: ref}) do
receive do
{:ok, response} ->
Process.demonitor(ref, [:flush])
response
{:DOWN, ^ref, :process, _from_pid, _exit_reason} ->
"I'm sorry, the process has failed!"
end
end
end
What if we want the error to propagate to our own process and take it down?
Desired behavior
Async.perform(fn -> 1 + 1 end)
Async.resolve()
# 2
Async.perform(fn -> raise "boom!" end)
# the process we're in fails (If using `iex`, it fails and then restarts)
defmodule Runner do
def run(owner, task) do
result = task.()
send(owner, {:ok, result})
end
end
defmodule Async do
def perform(task) do
owner = self()
pid = spawn(Runner, :run, [owner, task])
# We link the process just created to `owner`
Process.link(pid)
end
def resolve do
receive do
{:ok, response} -> response
end
end
end
{:EXIT, pid, msg}
message
instead of being taken down.Desired behavior
Async.perform(fn -> 1 + 1 end)
Async.resolve()
# 2
Async.perform(fn -> raise "boom!" end)
Async.resolve()
# => I'm sorry, the process has failed!
defmodule Runner do
def run(owner, task) do
result = task.()
send(owner, {:ok, result})
end
end
defmodule Async do
def perform(task) do
owner = self()
# We set a flag in this process to trap exits
Process.flag(:trap_exit, true)
# We create and link to the process
spawn_link(Runner, :run, [owner, task])
end
def resolve do
receive do
{:ok, response} ->
response
{:EXIT, _pid, _msg} ->
"I'm sorry, the process failed!"
end
end
end