Implement Openflow Protocol and Callback system
This commit is contained in:
parent
fc02a678de
commit
e52fe31b79
48 changed files with 937 additions and 244 deletions
|
|
@ -3,6 +3,7 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
import Logger
|
||||
|
||||
alias :tres_xact_kv, as: XACT_KV
|
||||
alias Tres.SecureChannelState
|
||||
alias Tres.SwitchRegistry
|
||||
alias Tres.MessageHandlerSup
|
||||
|
|
@ -32,12 +33,11 @@ defmodule Tres.SecureChannel do
|
|||
end
|
||||
|
||||
def init([ref, socket, transport, _opts]) do
|
||||
state_data =
|
||||
ref
|
||||
|> init_secure_channel(socket, transport)
|
||||
|> init_handler
|
||||
info("[#{__MODULE__}] TCP connected to Switch on #{state_data.ip_addr}:#{state_data.port} on #{inspect(self())}")
|
||||
:gen_statem.enter_loop(__MODULE__, [debug: []], :INIT, state_data, [])
|
||||
state_data = init_secure_channel(ref, socket, transport)
|
||||
debug("[#{__MODULE__}] TCP connected to Switch on"
|
||||
<> " #{state_data.ip_addr}:#{state_data.port}"
|
||||
<> " on #{inspect(self())}")
|
||||
:gen_statem.enter_loop(__MODULE__, [debug: [:debug]], :INIT, state_data, [])
|
||||
end
|
||||
|
||||
# TCP handler
|
||||
|
|
@ -73,8 +73,9 @@ defmodule Tres.SecureChannel do
|
|||
handle_WATING(type, message, state_data)
|
||||
end
|
||||
|
||||
def terminate(reason, state, %SecureChannelState{datapath_id: datapath_id, aux_id: aux_id}) do
|
||||
def terminate(reason, state, %SecureChannelState{datapath_id: datapath_id, aux_id: aux_id, xact_kv_ref: kv_ref}) do
|
||||
warn("[#{__MODULE__}] termiate: #{inspect(reason)} state = #{inspect(state)}")
|
||||
true = XACT_KV.drop(kv_ref)
|
||||
:ok = SwitchRegistry.unregister({datapath_id, aux_id})
|
||||
end
|
||||
|
||||
|
|
@ -83,7 +84,8 @@ defmodule Tres.SecureChannel do
|
|||
defp init_secure_channel(ref, socket, transport) do
|
||||
init_process(ref)
|
||||
:ok = transport.setopts(socket, [active: :once])
|
||||
SecureChannelState.new(ref: ref, socket: socket, transport: transport)
|
||||
kv_ref = XACT_KV.create
|
||||
SecureChannelState.new(ref: ref, socket: socket, transport: transport, xact_kv_ref: kv_ref)
|
||||
end
|
||||
|
||||
defp init_process(ref) do
|
||||
|
|
@ -93,8 +95,8 @@ defmodule Tres.SecureChannel do
|
|||
end
|
||||
|
||||
defp init_handler(state_data) do
|
||||
%SecureChannelState{ip_addr: ip_addr, port: port} = state_data
|
||||
{:ok, pid} = MessageHandlerSup.start_child({ip_addr, port})
|
||||
%SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data
|
||||
{:ok, pid} = MessageHandlerSup.start_child({dpid, aux_id})
|
||||
ref = Process.monitor(pid)
|
||||
%{state_data|handler_pid: pid, handler_ref: ref}
|
||||
end
|
||||
|
|
@ -128,8 +130,7 @@ defmodule Tres.SecureChannel do
|
|||
close_connection(:features_handshake_timeout, state_data)
|
||||
end
|
||||
defp handle_CONNECTING(:internal, {:openflow, %Openflow.Features.Reply{} = features}, state_data) do
|
||||
# TODO: Send to handler
|
||||
info("[#{__MODULE__}] Switch connected datapath_id: #{features.datapath_id} auxiliary_id: #{features.aux_id}")
|
||||
debug("[#{__MODULE__}] Switch connected datapath_id: #{features.datapath_id} auxiliary_id: #{features.aux_id}")
|
||||
_ = maybe_cancel_timer(state_data.timer_ref)
|
||||
handle_features_handshake(features, state_data)
|
||||
end
|
||||
|
|
@ -142,9 +143,10 @@ defmodule Tres.SecureChannel do
|
|||
end
|
||||
|
||||
# CONNECTED state
|
||||
defp handle_CONNECTED(:enter, :CONNECTING, _state_data) do
|
||||
defp handle_CONNECTED(:enter, :CONNECTING, state_data) do
|
||||
new_state_data = init_handler(state_data)
|
||||
start_periodic_idle_check()
|
||||
:keep_state_and_data
|
||||
{:keep_state, new_state_data}
|
||||
end
|
||||
defp handle_CONNECTED(:info, :idle_check, state_data) do
|
||||
start_periodic_idle_check()
|
||||
|
|
@ -162,19 +164,48 @@ defmodule Tres.SecureChannel do
|
|||
send_echo_reply(echo.xid, echo.data, state_data)
|
||||
:keep_state_and_data
|
||||
end
|
||||
defp handle_CONNECTED(:internal, {:openflow, _message}, _state_data) do
|
||||
# TODO: Send to handler
|
||||
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Barrier.Reply{xid: xid}}, state_data) do
|
||||
for {:xact_entry, _xid, message, _orig} <- XACT_KV.get(state_data.xact_kv_ref, xid) do
|
||||
unless is_nil(message) do
|
||||
send(state_data.handler_pid, message)
|
||||
XACT_KV.delete(state_data.xact_kv_ref, message.xid)
|
||||
end
|
||||
end
|
||||
:keep_state_and_data
|
||||
end
|
||||
defp handle_CONNECTED(:internal, {:openflow, message}, state_data) do
|
||||
%SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data
|
||||
new_message = %{message|datapath_id: dpid, aux_id: aux_id}
|
||||
state_data.xact_kv_ref
|
||||
|> XACT_KV.is_exists(message.xid)
|
||||
|> handle_message(new_message, state_data)
|
||||
:keep_state_and_data
|
||||
end
|
||||
defp handle_CONNECTED(:cast, {:send_message, message}, state_data) do
|
||||
message
|
||||
|> send_message(state_data)
|
||||
xid = SecureChannelState.increment_transaction_id(state_data.xid)
|
||||
messages = [
|
||||
%{message|xid: xid},
|
||||
%{Openflow.Barrier.Request.new|xid: xid}
|
||||
]
|
||||
XACT_KV.insert(state_data.xact_kv_ref, xid, message)
|
||||
send_message(messages, state_data)
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_message(_in_xact = true, message, state_data) do
|
||||
[{:xact_entry, _xid, prev_message, _orig}|_] = XACT_KV.get(state_data.xact_kv_ref, message.xid)
|
||||
new_message = Openflow.append_body(prev_message, message)
|
||||
XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message)
|
||||
end
|
||||
defp handle_message(_in_xact = false, message, %SecureChannelState{handler_pid: handler_pid}) do
|
||||
send(handler_pid, message)
|
||||
end
|
||||
|
||||
# WATING state
|
||||
defp handle_WATING(:enter, :CONNECTING, state_data) do
|
||||
warn("[#{__MODULE__}] Possible HANG Detected on datapath_id: #{state_data.datapath_id} !")
|
||||
%SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
|
||||
send(handler_pid, {:switch_hang, {dpid, aux_id}})
|
||||
start_periodic_idle_check()
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
|
@ -186,8 +217,9 @@ defmodule Tres.SecureChannel do
|
|||
defp handle_WATING(:info, :ping_timeout, state_data) do
|
||||
handle_ping_timeout(state_data, :WAITING)
|
||||
end
|
||||
defp handle_WATING(:internal, {:openflow, _message}, state_data) do
|
||||
# TODO: Send to handler
|
||||
defp handle_WATING(:internal, {:openflow, message}, state_data) do
|
||||
%SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
|
||||
send(handler_pid, %{message|datapath_id: dpid, aux_id: aux_id})
|
||||
{:next_state, :CONNECTING, state_data}
|
||||
end
|
||||
defp handle_WATING(type, message, state_data)
|
||||
|
|
@ -249,12 +281,10 @@ defmodule Tres.SecureChannel do
|
|||
{:next_state, :CONNECTED, new_state_data}
|
||||
end
|
||||
|
||||
defp monitor_connection(datapath_id, aux_id) when aux_id > 0 do
|
||||
datapath_id
|
||||
|> SwitchRegistry.lookup_pid
|
||||
|> Process.monitor
|
||||
end
|
||||
defp monitor_connection(_datapath_id, _aux_id), do: nil
|
||||
defp monitor_connection(datapath_id, aux_id) when aux_id > 0,
|
||||
do: SwitchRegistry.monitor(datapath_id)
|
||||
defp monitor_connection(_datapath_id, _aux_id),
|
||||
do: nil
|
||||
|
||||
defp send_hello(state_data) do
|
||||
@supported_version
|
||||
|
|
@ -320,7 +350,7 @@ defmodule Tres.SecureChannel do
|
|||
end
|
||||
|
||||
defp send_message(message, %SecureChannelState{socket: socket, transport: transport}) do
|
||||
debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
#debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
Tres.Utils.send_message(message, socket, transport)
|
||||
end
|
||||
|
||||
|
|
@ -371,7 +401,7 @@ defmodule Tres.SecureChannel do
|
|||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
end
|
||||
defp close_connection({:trap_detected, reason}, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Handler process down by #{reason}")
|
||||
warn("[#{__MODULE__}] connection terminated: Trapped by #{reason}")
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
end
|
||||
defp close_connection(:tcp_closed, state_data) do
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue