Skip to content

Commit

Permalink
[INIT] Initial commit (can handshake)
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillaume Milan committed Sep 9, 2019
0 parents commit dcaea4c
Show file tree
Hide file tree
Showing 11 changed files with 374 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
24 changes: 24 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where 3rd-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
rtmp-*.tar

21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Rtmp

**TODO: Add description**

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `rtmp` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:rtmp, "~> 0.1.0"}
]
end
```

Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/rtmp](https://hexdocs.pm/rtmp).

30 changes: 30 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config

# This configuration is loaded before any dependency and is restricted
# to this project. If another project depends on this project, this
# file won't be loaded nor affect the parent project. For this reason,
# if you want to provide default values for your application for
# 3rd-party users, it should be done in your "mix.exs" file.

# You can configure your application as:
#
# config :rtmp, key: :value
#
# and access this configuration in your application as:
#
# Application.get_env(:rtmp, :key)
#
# You can also configure a 3rd-party app:
#
# config :logger, level: :info
#

# It is also possible to import configuration files, relative to this
# directory. For example, you can emulate configuration per environment
# by uncommenting the line below and defining dev.exs, test.exs and such.
# Configuration from the imported file will override the ones defined
# here (which is why it is important to import them last).
#
# import_config "#{Mix.env()}.exs"
33 changes: 33 additions & 0 deletions lib/connection.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule RTMP.Connection do
use GenServer
@moduledoc """
Server that monitor a connection through gen_tcp
"""
require Logger

def start_link(opts), do: GenServer.start_link(__MODULE__, opts[:socket], opts)

def init(socket) do
Process.send(self(), :process, [])
{:ok, %{socket: socket}}
end

def terminate(_reason, %{socket: socket}) do
:gen_tcp.close(socket)
end

def handle_info(:process, %{socket: socket}) do
message = receive_until(socket, [])
Logger.debug("RECEIVED FULL MESSAGE #{inspect message}", ansi_color: :green)
{:stop, :normal, %{socket: socket}}
end

def receive_until(socket, previous_chunks) do
case :gen_tcp.recv(socket, 0) do
{:ok, chunk} ->
Logger.debug("RECIEVED: #{inspect chunk}")
receive_until(socket, [chunk , previous_chunks])
{:error, :closed} -> previous_chunks
end
end
end
24 changes: 24 additions & 0 deletions lib/message_manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule RTMP.MessageManager do
require Logger
def receive_message(:handshake1, client) do
receive_until(client,
fn <<_::bytes-size(1537)>> -> true
_ -> false
end)
end
def receive_message(:handshake2, client) do
receive_until(client,
fn <<_::bytes-size(1536)>> -> true
_ -> false
end)
end
def receive_message(:connected, client) do
receive_until(client, fn _ -> true end)
end

def receive_until(client, finished, previous_chunk \\ <<>>) do
Logger.debug("RECEIVE UNTIL #{byte_size(previous_chunk)}")
{:ok, chunk} = :gen_tcp.recv(client, 0)
if finished.(previous_chunk<>chunk), do: previous_chunk<>chunk, else: receive_until(client, finished, previous_chunk<>chunk)
end
end
17 changes: 17 additions & 0 deletions lib/rtmp.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Rtmp.Application do
@moduledoc """
Start all the services needed for the RTMP module
"""

use Application

def start(_type, _args) do
children = [
# Starts a worker by calling: Rtmp.Worker.start_link(arg)
# {Rtmp.Worker, arg},
]

opts = [strategy: :one_for_one, name: Rtmp.Supervisor]
Supervisor.start_link(children, opts)
end
end
38 changes: 38 additions & 0 deletions lib/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule RTMP.Server do
use GenServer
@moduledoc """
Server that monitor the gen_tcp listen port
"""
require Logger

#TODO Monitor connection if the quit by their self

def start_link(opts), do: GenServer.start_link(__MODULE__, opts[:port], opts)

def init(port) do
Process.send(self(), :listen, [])
{:ok, port} = :gen_tcp.listen(port, [:binary, {:packet, 0}, {:active, false}])
{:ok, %{port: port, connections: []}}
end

def terminate(_reason, %{port: port, connections: connections}) do
IO.puts("CLOSING #{inspect port}")
:gen_tcp.close(port)
IO.puts("CLOSED #{inspect port}")
Enum.each(connections, fn c ->
Logger.debug("EXITING #{inspect c}")
Process.exit(c, :kill)
end)
end

def handle_info(:listen, %{port: port, connections: connections}) do
{:ok, socket} = :gen_tcp.accept(port)
{:ok, new_connection} = RTMP.Connection.start_link([socket: socket])
Process.send(self(), :listen, [])
{:noreply, %{port: port, connections: [new_connection|connections]}}
end

def handle_call(:state, _from, state) do
{:reply, state, state}
end
end
125 changes: 125 additions & 0 deletions lib/server2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
defmodule RTMP.Server2 do
use GenServer
require Logger

def start_link(port, opts), do: GenServer.start_link(__MODULE__, port, opts)

def init(port) do
{:ok, socket} = :gen_tcp.listen(port, [:binary, {:packet, 0}, {:active, false}])
GenServer.cast(self(), :create_connection)
{:ok, %{socket: socket, clients: []}}
end

def terminate(reason, %{socket: socket}) do
Logger.debug("[Server]: Closing scoket du to #{inspect reason}")
:gen_tcp.close(socket)
end

def handle_cast(:create_connection, %{socket: socket} = state) do
{:ok, pid} = RTMP.Connection2.start_link({socket, self()}, [])
:ok = :gen_tcp.controlling_process(socket, pid)
{:noreply, Map.update!(state, :clients, fn clients -> [pid|clients] end)}
end

def handle_cast({:register_client, client}, state) do
{:noreply, Map.update!(state, :clients, fn clients -> [client|clients] end)}
end
def handle_cast({:unregister_client, client}, state) do
Logger.debug("[Server] Unregistering client #{inspect client}")
{:noreply, Map.update!(state, :clients, fn clients -> Enum.filter(clients, &(&1 != client)) end)}
end
end

defmodule RTMP.Connection2 do
use GenServer
require Logger

@moduledoc """
Connection FSM:
- :handshaking_phase1 -----------> Receive C0 -------------> :handshaking_phase1
- :handshaking_phase1 -----------> Receive C1 -------------> :handshaking_phase2
- :handshaking_phase2 -----------> Receive C2 -------------> :connected
"""
def start_link({socket, server}, opts), do: GenServer.start_link(__MODULE__, {socket, server}, opts)

def init({socket, server}) do
{:ok, client} = :gen_tcp.accept(socket)
GenServer.cast(self(), :waiting_message)
GenServer.cast(server, :create_connection)
id = :crypto.strong_rand_bytes(1528)
start_timestamp = :os.system_time(:millisecond)
{:ok, %{server: server, client: client, fsm: %{state: :handshaking_phase1}, id: id, start_timestamp: start_timestamp}}
end

def terminate(reason, %{client: client, server: server}) do
Logger.debug("[Connection] Closing connection due to #{inspect reason}")
:gen_tcp.close(client)
GenServer.cast(server, {:unregister_client, self()})
end

def handle_cast(:waiting_message, %{client: client, fsm: %{state: :handshaking_phase1}} = state) do
case RTMP.MessageManager.receive_message(:handshake1, client) do
<<0x03, time::bytes-size(4), 0, 0, 0, 0, rand::bytes-size(1528)>> -> # we wait until receiving C0 and C1 (lazy enough)
Logger.debug("[Connection] Receive C0 and C1")
server_time = server_time(state.start_timestamp)
send_s1(client, server_time, state.id)
send_s2(client, time, server_time, rand)
GenServer.cast(self(), :waiting_message)
{:noreply, (state |> Map.update!(:fsm, fn _ -> %{state: :handshaking_phase2} end))}
message ->
inspect_message = "<<#{to_bytes(message, []) |> Enum.map(fn <<x>> -> x end)|> Enum.join(", ")}>>"
Logger.debug("[Connection] Bad message received #{inspect_message} \nWith a length of #{length(to_bytes(message, []))}")
{:stop, :error}
end
end
def handle_cast(:waiting_message, %{client: client, fsm: %{state: :handshaking_phase2}} = state) do
case RTMP.MessageManager.receive_message(:handshake2, client) do
<<_server_timestamp::bytes-size(4), _time::bytes-size(4), _rand::bytes-size(1528)>> ->
Logger.debug("[Connection] Receive C2")
GenServer.cast(self(), :waiting_message)
{:noreply, (state |> Map.update!(:fsm, fn _ -> %{state: :connected} end))}
message ->
inspect_message = "<<#{to_bytes(message, []) |> Enum.map(fn <<x>> -> x end)|> Enum.join(", ")}>>"
Logger.debug("[Connection] Bad message received #{inspect_message} \nWith a length of #{length(to_bytes(message, []))}")
{:stop, :error}
end
end
def handle_cast(:terminate, _state) do
{:stop, :terminated}
end
def handle_cast(message, state) do
Logger.debug("[Connection] Receive message #{inspect message} \nWith state #{inspect state}")
{:noreply, state}
end

def send_s0(client) do
Logger.debug("[Connection] Sending S0 #{inspect <<0x03>>}")
:gen_tcp.send(client, <<0x03>>)
end

def send_s1(client, server_time, id) do
message = <<0x03>> <> server_time <> id
Logger.debug("[Connection] Sending S1 #{inspect message}")
:gen_tcp.send(client, message)
end

def send_s2(client, receive_time, sent_time, client_id) do
message = receive_time <> sent_time <> client_id
Logger.debug("[Connection] Sending S2 #{inspect message}")
:gen_tcp.send(client, message)
end

def server_time(start) do
#TODO Modulo for long stream ^^
<<(:os.system_time(:millisecond) - start)::32>>
end

def to_bytes("", list) do
list |> Enum.reverse
end
def to_bytes(string, list) do
<<b::bytes-size(1)>> <> rest = string
to_bytes(rest, [b|list])
end
end
29 changes: 29 additions & 0 deletions lib/to_del.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule ToDel do
use GenServer
require Logger
def start_link do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end
def init(_) do
{:ok, :ok}
end

def wait_for_connection do
{:ok, port} = :gen_tcp.listen(1234, [])
{:ok, socket} = :gen_tcp.accept(port)
Logger.debug("#{inspect receive_until(socket, [])}")
:ok = :gen_tcp.close(socket)
:ok = :gen_tcp.close(port)
end
defp receive_until(socket, prev_chunks) do
case :gen_tcp.recv(socket, 0) do
{:ok, chunk} ->
Logger.debug("#{inspect chunk}")
receive_until(socket, [chunk, prev_chunks])
{:error, :closed} -> prev_chunks
end
end
def receive_message(socket) do
receive_until(socket, [])
end
end
29 changes: 29 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Rtmp.MixProject do
use Mix.Project

def project do
[
app: :rtmp,
version: "0.1.0",
elixir: "~> 1.7",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end

# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger],
mod: {Rtmp.Application, []}
]
end

# Run "mix help deps" to learn about dependencies.
defp deps do
[
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
]
end
end

0 comments on commit dcaea4c

Please sign in to comment.