Openflow parser

This commit is contained in:
Eishun Kondoh 2017-11-13 22:52:53 +09:00
parent 70b0d8919e
commit fc02a678de
338 changed files with 9081 additions and 0 deletions

17
lib/tres/application.ex Normal file
View file

@ -0,0 +1,17 @@
defmodule Tres.Application do
@moduledoc false
use Application
alias Tres.SwitchRegistry
def start(_type, _args) do
import Supervisor.Spec
children = [worker(Registry, [[keys: :unique, name: SwitchRegistry]], id: SwitchRegistry),
supervisor(Tres.MessageHandlerSup, [], id: MessageHandlerSup)]
opts = [strategy: :one_for_one, name: Tres.Supervisor]
{:ok, _connection_pid} = Tres.Utils.start_openflow_listener
Supervisor.start_link(children, opts)
end
end

View file

@ -0,0 +1,52 @@
defmodule Tres.MessageHandler do
use GenServer
defmodule State do
defstruct [
ip_addr: nil,
port: nil,
datapath_id: nil,
conn_pid: nil,
conn_ref: nil,
handler_pid: nil,
handler_ref: nil
]
end
alias Tres.MessageHandler.State
@process_flags [trap_exit: true]
def start_link({ip_addr, port}, conn_pid) do
GenServer.start_link(__MODULE__, [{ip_addr, port}, conn_pid])
end
def init([{ip_addr, port}, conn_pid]) do
init_process()
conn_ref = Process.monitor(conn_pid)
state = %State{
conn_pid: conn_pid,
conn_ref: conn_ref,
ip_addr: ip_addr,
port: port
}
{:ok, state}
end
def handle_info({:'EXIT', _pid, _reason}, state) do
{:stop, :normal, state}
end
def handle_info({:'DOWN', conn_ref, :process, _conn_pid, _reason}, %State{conn_ref: conn_ref} = state) do
{:stop, :normal, state}
end
def terminate(_reason, state) do
{:shutdown, state}
end
## private functions
defp init_process do
for {flag, value} <- @process_flags, do: Process.flag(flag, value)
end
end

View file

@ -0,0 +1,16 @@
defmodule Tres.MessageHandlerSup do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_) do
children = [worker(Tres.MessageHandler, [], restart: :temporary)]
supervise(children, strategy: :simple_one_for_one)
end
def start_child({ip_addr, port}) do
Supervisor.start_child(__MODULE__, [{ip_addr, port}, self()])
end
end

385
lib/tres/secure_channel.ex Normal file
View file

@ -0,0 +1,385 @@
defmodule Tres.SecureChannel do
@behaviour :gen_statem
import Logger
alias Tres.SecureChannelState
alias Tres.SwitchRegistry
alias Tres.MessageHandlerSup
@process_flags [
trap_exit: true,
message_queue_data: :on_heap
]
@supported_version 4
@hello_handshake_timeout 1000
@features_handshake_timeout 1000
@ping_timeout 5000
@transaction_timeout 5000
@ping_interval 5000
@ping_fail_max_count 10
def callback_mode do
[:handle_event_function, :state_enter]
end
def start_link(ref, socket, transport, opts \\ []) do
init_args = [[ref, socket, transport, opts]]
:proc_lib.start_link(__MODULE__, :init, init_args)
end
def init([ref, socket, transport, _opts]) do
state_data =
ref
|> init_secure_channel(socket, transport)
|> init_handler
info("[#{__MODULE__}] TCP connected to Switch on #{state_data.ip_addr}:#{state_data.port} on #{inspect(self())}")
:gen_statem.enter_loop(__MODULE__, [debug: []], :INIT, state_data, [])
end
# TCP handler
def handle_event(:info, {:tcp, socket, packet}, state,
%SecureChannelState{socket: socket, transport: transport} = state_data) do
transport.setopts(socket, [active: :once])
handle_packet(packet, state_data, state, [])
end
def handle_event(:info, {:tcp_closed, socket}, _state,
%SecureChannelState{socket: socket} = state_data) do
close_connection(:tcp_closed, state_data)
end
def handle_event(:info, {:tcp_error, socket, reason}, _state,
%SecureChannelState{socket: socket} = state_data) do
close_connection({:tcp_error, reason}, state_data)
end
def handle_event(:info, {:'DOWN', _ref, :process, _main_pid, _reason} = signal, _state, state_data) do
handle_signal(signal, state_data)
end
def handle_event(:info, {:'EXIT', _pid, _reason} = signal, _state, state_data) do
handle_signal(signal, state_data)
end
def handle_event(type, message, :INIT, state_data) do
handle_INIT(type, message, state_data)
end
def handle_event(type, message, :CONNECTING, state_data) do
handle_CONNECTING(type, message, state_data)
end
def handle_event(type, message, :CONNECTED, state_data) do
handle_CONNECTED(type, message, state_data)
end
def handle_event(type, message, :WAITING, state_data) do
handle_WATING(type, message, state_data)
end
def terminate(reason, state, %SecureChannelState{datapath_id: datapath_id, aux_id: aux_id}) do
warn("[#{__MODULE__}] termiate: #{inspect(reason)} state = #{inspect(state)}")
:ok = SwitchRegistry.unregister({datapath_id, aux_id})
end
# private functions
defp init_secure_channel(ref, socket, transport) do
init_process(ref)
:ok = transport.setopts(socket, [active: :once])
SecureChannelState.new(ref: ref, socket: socket, transport: transport)
end
defp init_process(ref) do
:ok = :proc_lib.init_ack({:ok, self()})
:ok = :ranch.accept_ack(ref)
for {flag, value} <- @process_flags, do: Process.flag(flag, value)
end
defp init_handler(state_data) do
%SecureChannelState{ip_addr: ip_addr, port: port} = state_data
{:ok, pid} = MessageHandlerSup.start_child({ip_addr, port})
ref = Process.monitor(pid)
%{state_data|handler_pid: pid, handler_ref: ref}
end
# INIT state
defp handle_INIT(:enter, _old_state, state_data) do
debug("[#{__MODULE__}] Initiate HELLO handshake: #{state_data.ip_addr}:#{state_data.port}")
initiate_hello_handshake(state_data)
end
defp handle_INIT(:info, :hello_timeout, state_data) do
close_connection(:hello_handshake_timeout, state_data)
end
defp handle_INIT(:internal, {:openflow, %Openflow.Hello{} = hello}, state_data) do
handle_hello_handshake_1(hello, state_data)
end
defp handle_INIT(:internal, message, _state_data) do
debug("[#{__MODULE__}] Hello handshake in progress, dropping message: #{inspect(message)}")
:keep_state_and_data
end
# CONNECTING state
defp handle_CONNECTING(:enter, :INIT, state_data) do
debug("[#{__MODULE__}] Initiate FEATURES handshake: #{state_data.ip_addr}:#{state_data.port}")
initiate_features_handshake(state_data)
end
defp handle_CONNECTING(:enter, :WAITING, state_data) do
debug("[#{__MODULE__}] Re-entered features handshake")
initiate_features_handshake(state_data)
end
defp handle_CONNECTING(:info, :features_timeout, state_data) do
close_connection(:features_handshake_timeout, state_data)
end
defp handle_CONNECTING(:internal, {:openflow, %Openflow.Features.Reply{} = features}, state_data) do
# TODO: Send to handler
info("[#{__MODULE__}] Switch connected datapath_id: #{features.datapath_id} auxiliary_id: #{features.aux_id}")
_ = maybe_cancel_timer(state_data.timer_ref)
handle_features_handshake(features, state_data)
end
defp handle_CONNECTING(:internal, {:openflow, message}, _state_data) do
debug("[#{__MODULE__}] Features handshake in progress, dropping message: #{inspect(message.__struct__)}")
:keep_state_and_data
end
defp handle_CONNECTING(type, _message, state_data) when type == :cast or type == :call do
{:keep_state, state_data, [{:postpone, true}]}
end
# CONNECTED state
defp handle_CONNECTED(:enter, :CONNECTING, _state_data) do
start_periodic_idle_check()
:keep_state_and_data
end
defp handle_CONNECTED(:info, :idle_check, state_data) do
start_periodic_idle_check()
new_state_data = maybe_ping(state_data)
{:keep_state, new_state_data}
end
defp handle_CONNECTED(:info, :ping_timeout, state_data) do
handle_ping_timeout(state_data, :CONNECTED)
end
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Reply{xid: xid}},
%SecureChannelState{ping_xid: xid} = state_data) do
handle_ping_reply(state_data)
end
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Request{} = echo}, state_data) do
send_echo_reply(echo.xid, echo.data, state_data)
:keep_state_and_data
end
defp handle_CONNECTED(:internal, {:openflow, _message}, _state_data) do
# TODO: Send to handler
:keep_state_and_data
end
defp handle_CONNECTED(:cast, {:send_message, message}, state_data) do
message
|> send_message(state_data)
:keep_state_and_data
end
# WATING state
defp handle_WATING(:enter, :CONNECTING, state_data) do
warn("[#{__MODULE__}] Possible HANG Detected on datapath_id: #{state_data.datapath_id} !")
start_periodic_idle_check()
:keep_state_and_data
end
defp handle_WATING(:info, :idle_check, state_data) do
start_periodic_idle_check()
new_state_data = maybe_ping(state_data)
{:keep_state, new_state_data}
end
defp handle_WATING(:info, :ping_timeout, state_data) do
handle_ping_timeout(state_data, :WAITING)
end
defp handle_WATING(:internal, {:openflow, _message}, state_data) do
# TODO: Send to handler
{:next_state, :CONNECTING, state_data}
end
defp handle_WATING(type, message, state_data)
when type == :cast or type == :call do
debug("[#{__MODULE__}] Postponed: #{inspect(message)}, now WATING")
{:keep_state, state_data, [{:postpone, true}]}
end
defp handle_packet("", state_data, _state, actions) do
{:keep_state, state_data, Enum.reverse(actions)}
end
defp handle_packet(packet, %SecureChannelState{buffer: buffer} = state_data, state, actions) do
binary = <<buffer::bytes, packet::bytes>>
case Openflow.read(binary) do
{:ok, message, leftovers} ->
debug("[#{__MODULE__}] Received: #{inspect(message.__struct__)}(xid: #{message.xid}) in #{state}")
action = {:next_event, :internal, {:openflow, message}}
new_state_data = %{state_data|buffer: "", last_received: :os.timestamp}
handle_packet(leftovers, new_state_data, state, [action|actions])
{:error, :binary_too_small} ->
handle_packet("", %{state_data|buffer: binary}, state, actions)
{:error, _reason} ->
handle_packet("", state_data, state, actions)
end
end
defp initiate_hello_handshake(state_data) do
send_hello(state_data)
ref = :erlang.send_after(@hello_handshake_timeout, self(), :hello_timeout)
{:keep_state, %{state_data|timer_ref: ref}}
end
defp handle_hello_handshake_1(hello, state_data) do
maybe_cancel_timer(state_data.timer_ref)
SecureChannelState.set_transaction_id(state_data.xid, hello.xid)
if Openflow.Hello.supported_version?(hello) do
{:next_state, :CONNECTING, %{state_data|timer_ref: nil}}
else
close_connection(:failed_version_negotiation, state_data)
end
end
defp initiate_features_handshake(state_data) do
new_xid = SecureChannelState.increment_transaction_id(state_data.xid)
send_features(new_xid, state_data)
ref = :erlang.send_after(@features_handshake_timeout, self(), :features_timeout)
{:keep_state, %{state_data|timer_ref: ref}}
end
defp handle_features_handshake(%Openflow.Features.Reply{datapath_id: datapath_id, aux_id: aux_id}, state_data) do
{:ok, _} = SwitchRegistry.register({datapath_id, aux_id})
new_state_data = %{
state_data|
datapath_id: datapath_id,
aux_id: aux_id,
timer_ref: nil,
main_monitor_ref: monitor_connection(datapath_id, aux_id)
}
{:next_state, :CONNECTED, new_state_data}
end
defp monitor_connection(datapath_id, aux_id) when aux_id > 0 do
datapath_id
|> SwitchRegistry.lookup_pid
|> Process.monitor
end
defp monitor_connection(_datapath_id, _aux_id), do: nil
defp send_hello(state_data) do
@supported_version
|> Openflow.Hello.new
|> send_message(state_data)
end
defp send_features(xid, state_data) do
%{Openflow.Features.Request.new|xid: xid}
|> send_message(state_data)
end
defp send_echo_reply(xid, data, state_data) do
%{Openflow.Echo.Reply.new(data)|xid: xid}
|> send_message(state_data)
end
defp send_echo_request(xid, data, state_data) do
%{Openflow.Echo.Request.new(data)|xid: xid}
|> send_message(state_data)
end
defp start_periodic_idle_check do
:erlang.send_after(@ping_interval, self(), :idle_check)
end
defp maybe_ping(state_data) do
case should_be_ping?(state_data) do
true -> send_ping(state_data)
false -> state_data
end
end
defp should_be_ping?(%SecureChannelState{last_received: last_received, aux_id: 0}) do
:timer.now_diff(:os.timestamp(), last_received) > (1000 * @ping_interval)
end
defp should_be_ping?(_) do
false
end
defp send_ping(%SecureChannelState{xid: x_agent} = state_data) do
xid = SecureChannelState.increment_transaction_id(x_agent)
send_echo_request(xid, "", state_data)
ping_ref = :erlang.send_after(@ping_timeout, self(), :ping_timeout)
%{state_data|ping_timer_ref: ping_ref, ping_xid: xid}
end
defp handle_ping_timeout(%SecureChannelState{ping_fail_count: fail_count} = state_data, :CONNECTING)
when fail_count > @ping_fail_max_count do
{:next_state, :WAITING, state_data}
end
defp handle_ping_timeout(%SecureChannelState{ping_fail_count: fail_count} = state_data, :WAITING)
when fail_count > @ping_fail_max_count do
close_connection(:ping_failed, state_data)
end
defp handle_ping_timeout(state_data, _) do
new_state_data = maybe_ping(state_data)
{:keep_state, new_state_data}
end
defp handle_ping_reply(state_data) do
{:keep_state, %{state_data|ping_timer_ref: nil, ping_xid: nil}}
end
defp send_message(message, %SecureChannelState{socket: socket, transport: transport}) do
debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
Tres.Utils.send_message(message, socket, transport)
end
defp maybe_cancel_timer(ref) when not is_reference(ref), do: :ok
defp maybe_cancel_timer(ref) do
:erlang.cancel_timer(ref)
:ok
end
defp handle_signal({:'DOWN', mon_ref, :process, _main_pid, reason},
%SecureChannelState{main_monitor_ref: mon_ref} = state_data) do
close_connection({:main_closed, reason}, state_data)
end
defp handle_signal({:'DOWN', handler_ref, :process, _main_pid, reason},
%SecureChannelState{handler_ref: handler_ref} = state_data) do
close_connection({:handler_down, reason}, state_data)
end
defp handle_signal({:'EXIT', _pid, reason}, state_data) do
close_connection({:trap_detected, reason}, state_data)
end
defp close_connection(:failed_version_negotiation, state_data) do
warn("[#{__MODULE__}] connection terminated: Version negotiation failed")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection(:hello_handshake_timeout, state_data) do
warn("[#{__MODULE__}] connection terminated: Hello handshake timed out")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection(:features_timeout, state_data) do
warn("[#{__MODULE__}] connection terminated: Features handshake timed out")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection(:handler_error, state_data) do
warn("[#{__MODULE__}] connection terminated: Got handler error")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection(:ping_failed, state_data) do
warn("[#{__MODULE__}] connection terminated: Exceeded to max_ping_fail_count")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection({:main_closed, reason}, state_data) do
warn("[#{__MODULE__}] connection terminated: Main connection down by #{reason}")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection({:handler_down, reason}, state_data) do
warn("[#{__MODULE__}] connection terminated: Handler process down by #{reason}")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection({:trap_detected, reason}, state_data) do
warn("[#{__MODULE__}] connection terminated: Handler process down by #{reason}")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection(:tcp_closed, state_data) do
warn("[#{__MODULE__}] connection terminated: TCP Closed by peer")
{:stop, :normal, %{state_data|socket: nil}}
end
defp close_connection({:tcp_error, reason}, state_data) do
warn("[#{__MODULE__}] connection terminated: TCP Error occured: #{reason}")
{:stop, :normal, %{state_data|socket: nil}}
end
end

View file

@ -0,0 +1,51 @@
defmodule Tres.SecureChannelState do
defstruct(
handler_pid: nil,
handler_ref: nil,
ref: nil,
socket: nil,
transport: nil,
buffer: "",
ip_addr: nil,
port: "",
datapath_id: "",
aux_id: "",
timer_ref: nil,
xid: nil,
main_monitor_ref: nil,
ping_xid: 0,
ping_timer_ref: nil,
ping_fail_count: 0,
last_received: 0
)
alias __MODULE__
def new(options) do
ref = Keyword.get(options, :ref)
socket = Keyword.get(options, :socket)
transport = Keyword.get(options, :transport)
{:ok, {ip_addr, port}} = :inet.peername(socket)
{:ok, xid_agent} = Agent.start_link(fn -> 0 end)
%SecureChannelState{
ref: ref,
socket: socket,
transport: transport,
ip_addr: :inet.ntoa(ip_addr),
port: port,
xid: xid_agent
}
end
def increment_transaction_id(xid_agent) do
Agent.get_and_update(xid_agent, &({&1 + 1, &1 + 1}))
end
def set_transaction_id(xid_agent, xid) do
Agent.update(xid_agent, fn(_) -> xid end)
end
def get_transaction_id(xid_agent) do
Agent.get(xid_agent, &(&1))
end
end

View file

@ -0,0 +1,32 @@
defmodule Tres.SwitchRegistry do
def register({_dpid, _aux_id} = datapath_id) do
{:ok, _} = Registry.register(__MODULE__, datapath_id, [])
end
def unregister({_dpid, _aux_id} = datapath_id) do
:ok = Registry.unregister(__MODULE__, datapath_id)
end
def lookup_pid({_dpid, _aux_id} = datapath_id) do
case Registry.lookup(__MODULE__, datapath_id) do
[{pid, _}] -> pid
[] -> nil
end
end
def lookup_pid(datapath_id) do
lookup_pid({datapath_id, 0})
end
def send_message(message, {_dpid, _aux_id} = datapath_id) do
Registry.dispatch(__MODULE__, datapath_id, &dispatch(&1, message))
end
def send_message(message, dpid) when is_binary(dpid) do
send_message(message, {dpid, 0})
end
# private function
defp dispatch(entries, message) do
for {pid, _} <- entries, do: :gen_statem.cast(pid, {:send_message, message})
end
end

26
lib/tres/utils.ex Normal file
View file

@ -0,0 +1,26 @@
defmodule Tres.Utils do
import Logger
@connection_manager Tres.SecureChannel
@default_max_connections 10
@default_num_acceptors 10
@default_openflow_port 6633
def start_openflow_listener do
max_connections = Application.get_env(:tres, :max_connections, @default_max_connections)
num_acceptors = Application.get_env(:tres, :num_acceptors, @default_num_acceptors)
port = Application.get_env(:tres, :port, @default_openflow_port)
options = [max_connections: max_connections, num_acceptors: num_acceptors, port: port]
:ranch.start_listener(Tres, :ranch_tcp, options, @connection_manager, [])
end
def send_message(message, socket, transport) do
try do
packet = Openflow.to_binary(message)
transport.send(socket, packet)
catch
_ ->
error("[#{__MODULE__}] Unencodable error: #{inspect(message)}")
end
end
end