diff --git a/config/config.exs b/config/config.exs index ebaa566..576f57f 100644 --- a/config/config.exs +++ b/config/config.exs @@ -44,11 +44,7 @@ config :dispatcher, # log whenever a layer starts processing log_layer_start_processing: CH.system_boolean("LOG_LAYER_START_PROCESSING"), # log whenever a layer matched, and if no matching layer was found - log_layer_matching: CH.system_boolean("LOG_LAYER_MATCHING"), - log_ws_all: CH.system_boolean("LOG_WS_ALL"), - log_ws_backend: CH.system_boolean("LOG_WS_BACKEND"), - log_ws_frontend: CH.system_boolean("LOG_WS_FRONTEND"), - log_ws_unhandled: CH.system_boolean("LOG_WS_UNHANDLED") + log_layer_matching: CH.system_boolean("LOG_LAYER_MATCHING") # It is also possible to import configuration files, relative to this # directory. For example, you can emulate configuration per environment diff --git a/lib/dispatcher/log.ex b/lib/dispatcher/log.ex index 1535234..b0a785b 100644 --- a/lib/dispatcher/log.ex +++ b/lib/dispatcher/log.ex @@ -2,10 +2,6 @@ defmodule Dispatcher.Log do @type log_name :: :log_layer_start_processing | :log_layer_matching - | :log_ws_all - | :log_ws_backend - | :log_ws_frontend - | :log_ws_unhandled @spec log(log_name, any()) :: any() def log(name, content) do diff --git a/lib/manipulators/remove_accept_encoding_header.ex b/lib/manipulators/remove_accept_encoding_header.ex index 4e50ea2..f55ad70 100644 --- a/lib/manipulators/remove_accept_encoding_header.ex +++ b/lib/manipulators/remove_accept_encoding_header.ex @@ -3,9 +3,9 @@ defmodule Manipulators.RemoveAcceptEncodingHeader do @impl true def headers(headers, connection) do - # headers = - # headers - # |> Enum.reject( &match?( {"accept_encoding", _}, &1 ) ) + headers = + headers + |> Enum.reject( &match?( {"accept_encoding", _}, &1 ) ) {headers, connection} end diff --git a/lib/matcher.ex b/lib/matcher.ex index e030ff3..058defb 100644 --- a/lib/matcher.ex +++ b/lib/matcher.ex @@ -8,7 +8,6 @@ defmodule Matcher do quote do require Matcher import Matcher - import Plug.Router, only: [forward: 2] import Plug.Conn, only: [send_resp: 3] import Proxy, only: [forward: 3] @@ -478,7 +477,7 @@ defmodule Matcher do defp sort_and_group_accept_headers(accept) do accept |> safe_parse_accept_header() - |> IO.inspect(label: "parsed_accept_header") + # |> IO.inspect(label: "parsed_accept_header") |> Enum.sort_by(&elem(&1, 3)) |> Enum.group_by(&elem(&1, 3)) |> Map.to_list() diff --git a/lib/mu_dispatcher.ex b/lib/mu_dispatcher.ex index 575fb38..bc14a61 100644 --- a/lib/mu_dispatcher.ex +++ b/lib/mu_dispatcher.ex @@ -11,7 +11,7 @@ defmodule MuDispatcher do children = [ # this is kinda strange, but the 'plug:' field is not used when 'dispatch:' is provided (my understanding) - {Plug.Adapters.Cowboy, + {Plug.Cowboy, scheme: :http, plug: PlugRouterDispatcher, options: [dispatch: dispatch, port: port]} ] @@ -21,10 +21,24 @@ defmodule MuDispatcher do end defp dispatch do + default = %{ + host: "localhost", + port: 80, + path: "/" + } + + f = fn req -> + {_, target} = + :cowboy_req.parse_qs(req) + |> Enum.find(fn {head, _} -> head == "target" end) + + Dispatcher.get_websocket(target) + end + [ {:_, [ - {"/ws/[...]", WebsocketHandler, %{}}, + {"/ws/[...]", WsHandler, {f, default}}, {:_, Plug.Cowboy.Handler, {PlugRouterDispatcher, []}} ]} ] diff --git a/lib/plug_router_dispatcher.ex b/lib/plug_router_dispatcher.ex index bd1eaba..5a7b25d 100644 --- a/lib/plug_router_dispatcher.ex +++ b/lib/plug_router_dispatcher.ex @@ -1,5 +1,3 @@ -alias Dispatcher.Log - defmodule PlugRouterDispatcher do use Plug.Router diff --git a/lib/websocket_handler.ex b/lib/websocket_handler.ex deleted file mode 100644 index 63abc19..0000000 --- a/lib/websocket_handler.ex +++ /dev/null @@ -1,122 +0,0 @@ -alias Dispatcher.Log - -defmodule WebsocketHandler do - @behaviour :cowboy_websocket - - def init(req, state) do - # Get path info - {_, target} = - :cowboy_req.parse_qs(req) - |> Enum.find(fn {head, _} -> head == "target" end) - - ws = - Dispatcher.get_websocket(target) - |> Log.inspect(:ws_log_all, label: "websocket connecting to target") - - new_state = - state - |> Map.put(:host, ws.host) - |> Map.put(:path, ws.path) - |> Map.put(:port, ws.port) - |> Map.put(:ready, false) - |> Map.put(:buffer, []) - - {:cowboy_websocket, req, new_state} - end - - def websocket_init(state) do - Log.inspect(state, :log_ws_all, label: "websocket all start connect with") - - connect_opts = %{ - connect_timeout: :timer.minutes(1), - retry: 10, - retry_timeout: 300 - } - - # conn :: pid() - {:ok, conn} = :gun.open(to_charlist(state.host), state.port, connect_opts) - {:ok, :http} = :gun.await_up(conn) - - # streamref :: StreamRef - streamref = :gun.ws_upgrade(conn, to_charlist(state.path)) - - new_state = - state - |> Map.put(:back_pid, conn) - |> Map.put(:back_ref, streamref) - - {:ok, new_state} - end - - def websocket_handle(message, state) do - new_state = - if state.ready do - Log.inspect(message, :log_ws_frontend, label: "websocket frontend message") - |> Log.inspect(:log_ws_all, label: "websocket all frontend message") - - :ok = :gun.ws_send(state.back_pid, state.back_ref, message) - state - else - Log.inspect(message, :log_ws_frontend, - label: "websocket frontend message postponed (connection not started)" - ) - |> Log.inspect(:log_ws_all, - label: "websocket all frontend message postponed (connection not started)" - ) - - buf = [message | state.buffer] - Map.put(state, :buffer, buf) - end - - {:ok, new_state} - end - - def websocket_info({:gun_ws, _pid, _ref, msg}, state) do - Log.inspect(msg, :log_ws_backend, label: "websocket backend message") - |> Log.inspect(:log_ws_all, label: "websocket all backend message") - - {:reply, msg, state} - end - - def websocket_info({:gun_error, _gun_pid, _stream_ref, reason}, _state) do - exit({:ws_upgrade_failed, reason}) - end - - def websocket_info({:gun_response, _gun_pid, _, _, status, headers}, _state) do - Log.inspect({"Websocket upgrade failed.", headers}, :log_ws_all, label: "websocket all") - exit({:ws_upgrade_failed, status, headers}) - end - - def websocket_info({:gun_upgrade, _, _, ["websocket"], headers}, state) do - Log.inspect("ws upgrade succesful", :log_ws_all, label: "websocket all") - Log.inspect(headers, :log_ws_all, label: "websocket all") - - state.buffer - |> Enum.reverse() - |> Enum.each(fn x -> - Log.inspect(x, :log_ws_frontend, label: "postponed sending message") - Log.inspect(x, :log_ws_all, label: "postponed sending message") - :gun.ws_send(state.back_pid, state.back_ref, x) - end) - - new_state = - state - |> Map.put(:ready, true) - |> Map.put(:buffer, []) - - {:ok, new_state} - end - - def websocket_info(info, state) do - Log.inspect(info, :log_ws_unhandled, label: "websocket unhandled info") - |> Log.inspect(:log_ws_all, label: "websocket all info") - - {:ok, state} - end - - def terminate(_reason, _req, state) do - Log.inspect("Closing", :log_ws_all, label: "websocket all") - :gun.shutdown(state.back_pid) - :ok - end -end diff --git a/mix.exs b/mix.exs index 4a60a29..bac50aa 100644 --- a/mix.exs +++ b/mix.exs @@ -27,6 +27,7 @@ defmodule Dispatcher.Mixfile do # Type `mix help deps` for more examples and options defp deps do [ + {:cowboy_ws_proxy, git: "https://github.com/ajuvercr/elixir-cowboy-ws-proxy-handler.git", tag: "v0.1"}, {:plug_mint_proxy, git: "https://github.com/madnificent/plug-mint-proxy.git", tag: "v0.0.2"}, # {:plug, "~> 1.10.4"}, diff --git a/mix.lock b/mix.lock index 167fc04..909cdcb 100644 --- a/mix.lock +++ b/mix.lock @@ -3,6 +3,7 @@ "castore": {:hex, :castore, "0.1.11", "c0665858e0e1c3e8c27178e73dffea699a5b28eb72239a3b2642d208e8594914", [:mix], [], "hexpm", "91b009ba61973b532b84f7c09ce441cba7aa15cb8b006cf06c6f4bba18220081"}, "cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"}, + "cowboy_ws_proxy": {:git, "https://github.com/ajuvercr/elixir-cowboy-ws-proxy-handler.git", "e015e27775af30d4e3d7ca5629d97191cca61555", [tag: "v0.1"]}, "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, "exsync": {:hex, :exsync, "0.2.4", "5cdc824553e0f4c4bf60018a9a6bbd5d3b51f93ef8401a0d8545f93127281d03", [:mix], [{:file_system, "~> 0.2", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f7622d8bb98abbe473aa066ae46f91afdf7a5346b8b89728404f7189d2e80896"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},