Skip to content

Commit

Permalink
Use only atomic operations in Plug.Upload to prepare for concurrent w…
Browse files Browse the repository at this point in the history
…rites (#997)
  • Loading branch information
magnetised authored Dec 18, 2020
1 parent ed05411 commit fa3d922
Showing 1 changed file with 32 additions and 25 deletions.
57 changes: 32 additions & 25 deletions lib/plug/upload.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ defmodule Plug.Upload do
content_type: binary | nil
}

@table __MODULE__
@dir_table __MODULE__.Dir
@path_table __MODULE__.Path
@max_attempts 10
@temp_env_vars ~w(PLUG_TMPDIR TMPDIR TMP TEMP)s

Expand All @@ -51,8 +52,8 @@ defmodule Plug.Upload do
| {:no_tmp, [binary]}
def random_file(prefix) do
case ensure_tmp() do
{:ok, tmp, paths} ->
open_random_file(prefix, tmp, 0, paths)
{:ok, tmp} ->
open_random_file(prefix, tmp, 0)

{:no_tmp, tmps} ->
{:no_tmp, tmps}
Expand All @@ -63,18 +64,18 @@ defmodule Plug.Upload do
pid = self()
server = plug_server()

case :ets.lookup(@table, pid) do
[{^pid, tmp, paths}] ->
{:ok, tmp, paths}
case :ets.lookup(@dir_table, pid) do
[{^pid, tmp}] ->
{:ok, tmp}

[] ->
{:ok, tmps} = GenServer.call(server, :upload)
{:ok, tmps} = GenServer.call(server, {:monitor, pid})
{mega, _, _} = :os.timestamp()
subdir = "/plug-" <> i(mega)

if tmp = Enum.find_value(tmps, &make_tmp_dir(&1 <> subdir)) do
true = :ets.insert_new(@table, {pid, tmp, []})
{:ok, tmp, []}
true = :ets.insert_new(@dir_table, {pid, tmp})
{:ok, tmp}
else
{:no_tmp, tmps}
end
Expand All @@ -88,20 +89,20 @@ defmodule Plug.Upload do
end
end

defp open_random_file(prefix, tmp, attempts, paths) when attempts < @max_attempts do
defp open_random_file(prefix, tmp, attempts) when attempts < @max_attempts do
path = path(prefix, tmp)

case :file.write_file(path, "", [:write, :raw, :exclusive, :binary]) do
:ok ->
:ets.update_element(@table, self(), {3, [path | paths]})
:ets.insert(@path_table, {self(), path})
{:ok, path}

{:error, reason} when reason in [:eexist, :eacces] ->
open_random_file(prefix, tmp, attempts + 1, paths)
open_random_file(prefix, tmp, attempts + 1)
end
end

defp open_random_file(_prefix, tmp, attempts, _paths) do
defp open_random_file(_prefix, tmp, attempts) do
{:too_many_attempts, tmp, attempts}
end

Expand Down Expand Up @@ -155,22 +156,30 @@ defmodule Plug.Upload do
Process.flag(:trap_exit, true)
tmp = Enum.find_value(@temp_env_vars, "/tmp", &System.get_env/1)
cwd = Path.join(File.cwd!(), "tmp")
:ets.new(@table, [:named_table, :public, :set])

:ets.new(@dir_table, [:named_table, :public, :set])
:ets.new(@path_table, [:named_table, :public, :duplicate_bag])

{:ok, [tmp, cwd]}
end

@impl true
def handle_call(:upload, {pid, _ref}, dirs) do
def handle_call({:monitor, pid}, _from, dirs) do
Process.monitor(pid)
{:reply, {:ok, dirs}, dirs}
end

@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
case :ets.lookup(@table, pid) do
[{pid, _tmp, paths}] ->
:ets.delete(@table, pid)
delete_paths(paths)
case :ets.lookup(@dir_table, pid) do
[{pid, _tmp}] ->
:ets.delete(@dir_table, pid)

@path_table
|> :ets.lookup(pid)
|> Enum.each(&delete_path/1)

:ets.delete(@path_table, pid)

[] ->
:ok
Expand All @@ -185,13 +194,11 @@ defmodule Plug.Upload do

@impl true
def terminate(_reason, _state) do
folder = fn {_pid, _tmp, paths}, _ -> delete_paths(paths) end
:ets.foldl(folder, :ok, @table)
:ok
folder = fn entry, :ok -> delete_path(entry) end
:ets.foldl(folder, :ok, @path_table)
end

defp delete_paths(paths) do
for path <- paths, do: :file.delete(path)
:ok
defp delete_path({_pid, path}) do
:file.delete(path)
end
end

0 comments on commit fa3d922

Please sign in to comment.