Refactored SecureChannel
This commit is contained in:
parent
f2768496c8
commit
d454d7124f
2 changed files with 97 additions and 74 deletions
|
|
@ -60,6 +60,8 @@ defmodule LearningSwitch.Ofctl do
|
||||||
# private functions
|
# private functions
|
||||||
|
|
||||||
defp init_datapath(datapath_id) do
|
defp init_datapath(datapath_id) do
|
||||||
|
SetConfig.new(miss_send_len: :no_buffer)
|
||||||
|
|> send_message(datapath_id)
|
||||||
init_flow_tables(datapath_id)
|
init_flow_tables(datapath_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ defmodule Tres.SecureChannel do
|
||||||
|
|
||||||
alias :tres_xact_kv, as: XACT_KV
|
alias :tres_xact_kv, as: XACT_KV
|
||||||
alias :queue, as: Queue
|
alias :queue, as: Queue
|
||||||
alias Tres.SecureChannelState
|
alias Tres.SecureChannelState, as: State
|
||||||
alias Tres.SwitchRegistry
|
alias Tres.SwitchRegistry
|
||||||
alias Tres.MessageHandlerSup
|
alias Tres.MessageHandlerSup
|
||||||
|
|
||||||
|
|
@ -43,16 +43,16 @@ defmodule Tres.SecureChannel do
|
||||||
|
|
||||||
# TCP handler
|
# TCP handler
|
||||||
def handle_event(:info, {:tcp, socket, packet}, state,
|
def handle_event(:info, {:tcp, socket, packet}, state,
|
||||||
%SecureChannelState{socket: socket, transport: transport} = state_data) do
|
%State{socket: socket, transport: transport} = state_data) do
|
||||||
transport.setopts(socket, [active: :once])
|
transport.setopts(socket, [active: :once])
|
||||||
handle_packet(packet, state_data, state, [])
|
handle_packet(packet, state_data, state, [])
|
||||||
end
|
end
|
||||||
def handle_event(:info, {:tcp_closed, socket}, _state,
|
def handle_event(:info, {:tcp_closed, socket}, _state,
|
||||||
%SecureChannelState{socket: socket} = state_data) do
|
%State{socket: socket} = state_data) do
|
||||||
close_connection(:tcp_closed, state_data)
|
close_connection(:tcp_closed, state_data)
|
||||||
end
|
end
|
||||||
def handle_event(:info, {:tcp_error, socket, reason}, _state,
|
def handle_event(:info, {:tcp_error, socket, reason}, _state,
|
||||||
%SecureChannelState{socket: socket} = state_data) do
|
%State{socket: socket} = state_data) do
|
||||||
close_connection({:tcp_error, reason}, state_data)
|
close_connection({:tcp_error, reason}, state_data)
|
||||||
end
|
end
|
||||||
def handle_event(:info, {:'DOWN', _ref, :process, _main_pid, _reason} = signal, _state, state_data) do
|
def handle_event(:info, {:'DOWN', _ref, :process, _main_pid, _reason} = signal, _state, state_data) do
|
||||||
|
|
@ -74,7 +74,8 @@ defmodule Tres.SecureChannel do
|
||||||
handle_WATING(type, message, state_data)
|
handle_WATING(type, message, state_data)
|
||||||
end
|
end
|
||||||
|
|
||||||
def terminate(reason, state, %SecureChannelState{datapath_id: datapath_id, aux_id: aux_id, xact_kv_ref: kv_ref}) do
|
def terminate(reason, state,
|
||||||
|
%State{datapath_id: datapath_id, aux_id: aux_id, xact_kv_ref: kv_ref}) do
|
||||||
warn("[#{__MODULE__}] termiate: #{inspect(reason)} state = #{inspect(state)}")
|
warn("[#{__MODULE__}] termiate: #{inspect(reason)} state = #{inspect(state)}")
|
||||||
true = XACT_KV.drop(kv_ref)
|
true = XACT_KV.drop(kv_ref)
|
||||||
:ok = SwitchRegistry.unregister({datapath_id, aux_id})
|
:ok = SwitchRegistry.unregister({datapath_id, aux_id})
|
||||||
|
|
@ -86,7 +87,7 @@ defmodule Tres.SecureChannel do
|
||||||
init_process(ref)
|
init_process(ref)
|
||||||
:ok = transport.setopts(socket, [active: :once])
|
:ok = transport.setopts(socket, [active: :once])
|
||||||
kv_ref = XACT_KV.create
|
kv_ref = XACT_KV.create
|
||||||
SecureChannelState.new(ref: ref, socket: socket, transport: transport, xact_kv_ref: kv_ref)
|
State.new(ref: ref, socket: socket, transport: transport, xact_kv_ref: kv_ref)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp init_process(ref) do
|
defp init_process(ref) do
|
||||||
|
|
@ -96,7 +97,7 @@ defmodule Tres.SecureChannel do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp init_handler(state_data) do
|
defp init_handler(state_data) do
|
||||||
%SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data
|
%State{datapath_id: dpid, aux_id: aux_id} = state_data
|
||||||
{:ok, pid} = MessageHandlerSup.start_child({dpid, aux_id})
|
{:ok, pid} = MessageHandlerSup.start_child({dpid, aux_id})
|
||||||
ref = Process.monitor(pid)
|
ref = Process.monitor(pid)
|
||||||
%{state_data|handler_pid: pid, handler_ref: ref}
|
%{state_data|handler_pid: pid, handler_ref: ref}
|
||||||
|
|
@ -104,7 +105,8 @@ defmodule Tres.SecureChannel do
|
||||||
|
|
||||||
# INIT state
|
# INIT state
|
||||||
defp handle_INIT(:enter, _old_state, state_data) do
|
defp handle_INIT(:enter, _old_state, state_data) do
|
||||||
debug("[#{__MODULE__}] Initiate HELLO handshake: #{state_data.ip_addr}:#{state_data.port}")
|
debug("[#{__MODULE__}] Initiate HELLO handshake: "<>
|
||||||
|
">#{state_data.ip_addr}:#{state_data.port}")
|
||||||
initiate_hello_handshake(state_data)
|
initiate_hello_handshake(state_data)
|
||||||
end
|
end
|
||||||
defp handle_INIT(:info, :hello_timeout, state_data) do
|
defp handle_INIT(:info, :hello_timeout, state_data) do
|
||||||
|
|
@ -114,13 +116,15 @@ defmodule Tres.SecureChannel do
|
||||||
handle_hello_handshake_1(hello, state_data)
|
handle_hello_handshake_1(hello, state_data)
|
||||||
end
|
end
|
||||||
defp handle_INIT(:internal, message, _state_data) do
|
defp handle_INIT(:internal, message, _state_data) do
|
||||||
debug("[#{__MODULE__}] Hello handshake in progress, dropping message: #{inspect(message)}")
|
debug("[#{__MODULE__}] Hello handshake in progress, "
|
||||||
|
<>"dropping message: #{inspect(message)}")
|
||||||
:keep_state_and_data
|
:keep_state_and_data
|
||||||
end
|
end
|
||||||
|
|
||||||
# CONNECTING state
|
# CONNECTING state
|
||||||
defp handle_CONNECTING(:enter, :INIT, state_data) do
|
defp handle_CONNECTING(:enter, :INIT, state_data) do
|
||||||
debug("[#{__MODULE__}] Initiate FEATURES handshake: #{state_data.ip_addr}:#{state_data.port}")
|
debug("[#{__MODULE__}] Initiate FEATURES handshake:"
|
||||||
|
<>" #{state_data.ip_addr}:#{state_data.port}")
|
||||||
initiate_features_handshake(state_data)
|
initiate_features_handshake(state_data)
|
||||||
end
|
end
|
||||||
defp handle_CONNECTING(:enter, :WAITING, state_data) do
|
defp handle_CONNECTING(:enter, :WAITING, state_data) do
|
||||||
|
|
@ -131,12 +135,15 @@ defmodule Tres.SecureChannel do
|
||||||
close_connection(:features_handshake_timeout, state_data)
|
close_connection(:features_handshake_timeout, state_data)
|
||||||
end
|
end
|
||||||
defp handle_CONNECTING(:internal, {:openflow, %Openflow.Features.Reply{} = features}, state_data) do
|
defp handle_CONNECTING(:internal, {:openflow, %Openflow.Features.Reply{} = features}, state_data) do
|
||||||
debug("[#{__MODULE__}] Switch connected datapath_id: #{features.datapath_id} auxiliary_id: #{features.aux_id}")
|
debug("[#{__MODULE__}] Switch connected "<>
|
||||||
|
"datapath_id: #{features.datapath_id}"<>
|
||||||
|
" auxiliary_id: #{features.aux_id}")
|
||||||
_ = maybe_cancel_timer(state_data.timer_ref)
|
_ = maybe_cancel_timer(state_data.timer_ref)
|
||||||
handle_features_handshake(features, state_data)
|
handle_features_handshake(features, state_data)
|
||||||
end
|
end
|
||||||
defp handle_CONNECTING(:internal, {:openflow, message}, _state_data) do
|
defp handle_CONNECTING(:internal, {:openflow, message}, _state_data) do
|
||||||
debug("[#{__MODULE__}] Features handshake in progress, dropping message: #{inspect(message.__struct__)}")
|
debug("[#{__MODULE__}] Features handshake in progress,"<>
|
||||||
|
" dropping message: #{inspect(message.__struct__)}")
|
||||||
:keep_state_and_data
|
:keep_state_and_data
|
||||||
end
|
end
|
||||||
defp handle_CONNECTING(type, _message, state_data) when type == :cast or type == :call do
|
defp handle_CONNECTING(type, _message, state_data) when type == :cast or type == :call do
|
||||||
|
|
@ -158,31 +165,19 @@ defmodule Tres.SecureChannel do
|
||||||
handle_ping_timeout(state_data, :CONNECTED)
|
handle_ping_timeout(state_data, :CONNECTED)
|
||||||
end
|
end
|
||||||
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Reply{xid: xid}},
|
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Reply{xid: xid}},
|
||||||
%SecureChannelState{ping_xid: xid} = state_data) do
|
%State{ping_xid: xid} = state_data) do
|
||||||
handle_ping_reply(state_data)
|
handle_ping_reply(state_data)
|
||||||
end
|
end
|
||||||
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Request{} = echo}, state_data) do
|
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Request{} = echo}, state_data) do
|
||||||
send_echo_reply(echo.xid, echo.data, state_data)
|
send_echo_reply(echo.xid, echo.data, state_data)
|
||||||
:keep_state_and_data
|
:keep_state_and_data
|
||||||
end
|
end
|
||||||
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Barrier.Reply{xid: xid}}, state_data) do
|
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Barrier.Reply{} = barrier}, state_data) do
|
||||||
for {:xact_entry, _xid, message, _orig} <- XACT_KV.get(state_data.xact_kv_ref, xid) do
|
{new_state_data, next_actions} = process_xact(barrier, state_data)
|
||||||
unless is_nil(message) do
|
{:keep_state, new_state_data, next_actions}
|
||||||
send(state_data.handler_pid, message)
|
|
||||||
XACT_KV.delete(state_data.xact_kv_ref, message.xid)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
{next_actions, action_queue} =
|
|
||||||
case Queue.out(state_data.action_queue) do
|
|
||||||
{:empty, action_queue} ->
|
|
||||||
{[], action_queue}
|
|
||||||
{{:value, next_action}, action_queue} ->
|
|
||||||
{[{:next_event, :internal, next_action}], action_queue}
|
|
||||||
end
|
|
||||||
{:keep_state, %{state_data|action_queue: action_queue}, next_actions}
|
|
||||||
end
|
end
|
||||||
defp handle_CONNECTED(:internal, {:openflow, message}, state_data) do
|
defp handle_CONNECTED(:internal, {:openflow, message}, state_data) do
|
||||||
%SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data
|
%State{datapath_id: dpid, aux_id: aux_id} = state_data
|
||||||
new_message = %{message|datapath_id: dpid, aux_id: aux_id}
|
new_message = %{message|datapath_id: dpid, aux_id: aux_id}
|
||||||
state_data.xact_kv_ref
|
state_data.xact_kv_ref
|
||||||
|> XACT_KV.is_exists(message.xid)
|
|> XACT_KV.is_exists(message.xid)
|
||||||
|
|
@ -190,46 +185,20 @@ defmodule Tres.SecureChannel do
|
||||||
:keep_state_and_data
|
:keep_state_and_data
|
||||||
end
|
end
|
||||||
defp handle_CONNECTED(:internal, {:send_message, message}, state_data) do
|
defp handle_CONNECTED(:internal, {:send_message, message}, state_data) do
|
||||||
xid = SecureChannelState.increment_transaction_id(state_data.xid)
|
xactional_send_message(message, state_data)
|
||||||
messages = [
|
|
||||||
%{message|xid: xid},
|
|
||||||
%{Openflow.Barrier.Request.new|xid: xid}
|
|
||||||
]
|
|
||||||
XACT_KV.insert(state_data.xact_kv_ref, xid, message)
|
|
||||||
send_message(messages, state_data)
|
|
||||||
:keep_state_and_data
|
:keep_state_and_data
|
||||||
end
|
end
|
||||||
defp handle_CONNECTED(:cast, {:send_message, message} = action, state_data) do
|
defp handle_CONNECTED(:cast, {:send_message, message} = action, state_data) do
|
||||||
if Queue.is_empty(state_data.action_queue) do
|
if Queue.is_empty(state_data.action_queue),
|
||||||
xid = SecureChannelState.increment_transaction_id(state_data.xid)
|
do: xactional_send_message(message, state_data)
|
||||||
messages = [
|
|
||||||
%{message|xid: xid},
|
|
||||||
%{Openflow.Barrier.Request.new|xid: xid}
|
|
||||||
]
|
|
||||||
XACT_KV.insert(state_data.xact_kv_ref, xid, message)
|
|
||||||
send_message(messages, state_data)
|
|
||||||
end
|
|
||||||
action_queue = Queue.in(action, state_data.action_queue)
|
action_queue = Queue.in(action, state_data.action_queue)
|
||||||
{:keep_state, %{state_data|action_queue: action_queue}}
|
{:keep_state, %{state_data|action_queue: action_queue}}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp handle_message(_in_xact = true, message, state_data) do
|
|
||||||
case XACT_KV.get(state_data.xact_kv_ref, message.xid) do
|
|
||||||
[{:xact_entry, _xid, prev_message, _orig}|_] ->
|
|
||||||
new_message = Openflow.append_body(prev_message, message)
|
|
||||||
XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message)
|
|
||||||
_ ->
|
|
||||||
XACT_KV.delete(state_data.xact_kv_ref, message.xid)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
defp handle_message(_in_xact = false, message, state_data) do
|
|
||||||
send(state_data.handler_pid, message)
|
|
||||||
end
|
|
||||||
|
|
||||||
# WATING state
|
# WATING state
|
||||||
defp handle_WATING(:enter, :CONNECTING, state_data) do
|
defp handle_WATING(:enter, :CONNECTING, state_data) do
|
||||||
warn("[#{__MODULE__}] Possible HANG Detected on datapath_id: #{state_data.datapath_id} !")
|
warn("[#{__MODULE__}] Possible HANG Detected on datapath_id: #{state_data.datapath_id} !")
|
||||||
%SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
|
%State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
|
||||||
send(handler_pid, {:switch_hang, {dpid, aux_id}})
|
send(handler_pid, {:switch_hang, {dpid, aux_id}})
|
||||||
start_periodic_idle_check()
|
start_periodic_idle_check()
|
||||||
:keep_state_and_data
|
:keep_state_and_data
|
||||||
|
|
@ -243,7 +212,7 @@ defmodule Tres.SecureChannel do
|
||||||
handle_ping_timeout(state_data, :WAITING)
|
handle_ping_timeout(state_data, :WAITING)
|
||||||
end
|
end
|
||||||
defp handle_WATING(:internal, {:openflow, message}, state_data) do
|
defp handle_WATING(:internal, {:openflow, message}, state_data) do
|
||||||
%SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
|
%State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
|
||||||
send(handler_pid, %{message|datapath_id: dpid, aux_id: aux_id})
|
send(handler_pid, %{message|datapath_id: dpid, aux_id: aux_id})
|
||||||
{:next_state, :CONNECTING, state_data}
|
{:next_state, :CONNECTING, state_data}
|
||||||
end
|
end
|
||||||
|
|
@ -256,11 +225,12 @@ defmodule Tres.SecureChannel do
|
||||||
defp handle_packet("", state_data, _state, actions) do
|
defp handle_packet("", state_data, _state, actions) do
|
||||||
{:keep_state, state_data, Enum.reverse(actions)}
|
{:keep_state, state_data, Enum.reverse(actions)}
|
||||||
end
|
end
|
||||||
defp handle_packet(packet, %SecureChannelState{buffer: buffer} = state_data, state, actions) do
|
defp handle_packet(packet, %State{buffer: buffer} = state_data, state, actions) do
|
||||||
binary = <<buffer::bytes, packet::bytes>>
|
binary = <<buffer::bytes, packet::bytes>>
|
||||||
case Openflow.read(binary) do
|
case Openflow.read(binary) do
|
||||||
{:ok, message, leftovers} ->
|
{:ok, message, leftovers} ->
|
||||||
debug("[#{__MODULE__}] Received: #{inspect(message.__struct__)}(xid: #{message.xid}) in #{state}")
|
debug("[#{__MODULE__}] Received: #{inspect(message.__struct__)}"<>
|
||||||
|
"(xid: #{message.xid}) in #{state}")
|
||||||
action = {:next_event, :internal, {:openflow, message}}
|
action = {:next_event, :internal, {:openflow, message}}
|
||||||
new_state_data = %{state_data|buffer: "", last_received: :os.timestamp}
|
new_state_data = %{state_data|buffer: "", last_received: :os.timestamp}
|
||||||
handle_packet(leftovers, new_state_data, state, [action|actions])
|
handle_packet(leftovers, new_state_data, state, [action|actions])
|
||||||
|
|
@ -271,6 +241,41 @@ defmodule Tres.SecureChannel do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp handle_message(_in_xact = true, message, state_data) do
|
||||||
|
case XACT_KV.get(state_data.xact_kv_ref, message.xid) do
|
||||||
|
[{:xact_entry, _xid, prev_message, _orig}|_] ->
|
||||||
|
new_message = Openflow.append_body(prev_message, message)
|
||||||
|
XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message)
|
||||||
|
_ ->
|
||||||
|
XACT_KV.delete(state_data.xact_kv_ref, message.xid)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
defp handle_message(_in_xact = false, message, state_data),
|
||||||
|
do: send(state_data.handler_pid, message)
|
||||||
|
|
||||||
|
defp process_xact(%Openflow.Barrier.Reply{xid: xid}, state_data) do
|
||||||
|
:ok = state_data.xact_kv_ref
|
||||||
|
|> XACT_KV.get(xid)
|
||||||
|
|> Enum.each(&process_xact_entry(&1, state_data))
|
||||||
|
pop_action_queue(state_data)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp process_xact_entry({:xact_entry, xid, message, _orig}, state_data) do
|
||||||
|
unless is_nil(message), do: send(state_data.handler_pid, message)
|
||||||
|
XACT_KV.delete(state_data.xact_kv_ref, xid)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp pop_action_queue(%State{action_queue: queue} = state_data) do
|
||||||
|
{next_actions, new_queue} =
|
||||||
|
case Queue.out(queue) do
|
||||||
|
{:empty, action_queue} ->
|
||||||
|
{[], action_queue}
|
||||||
|
{{:value, next_action}, action_queue} ->
|
||||||
|
{[{:next_event, :internal, next_action}], action_queue}
|
||||||
|
end
|
||||||
|
{%{state_data|action_queue: new_queue}, next_actions}
|
||||||
|
end
|
||||||
|
|
||||||
defp initiate_hello_handshake(state_data) do
|
defp initiate_hello_handshake(state_data) do
|
||||||
send_hello(state_data)
|
send_hello(state_data)
|
||||||
ref = :erlang.send_after(@hello_handshake_timeout, self(), :hello_timeout)
|
ref = :erlang.send_after(@hello_handshake_timeout, self(), :hello_timeout)
|
||||||
|
|
@ -279,7 +284,7 @@ defmodule Tres.SecureChannel do
|
||||||
|
|
||||||
defp handle_hello_handshake_1(hello, state_data) do
|
defp handle_hello_handshake_1(hello, state_data) do
|
||||||
maybe_cancel_timer(state_data.timer_ref)
|
maybe_cancel_timer(state_data.timer_ref)
|
||||||
SecureChannelState.set_transaction_id(state_data.xid, hello.xid)
|
State.set_transaction_id(state_data.xid, hello.xid)
|
||||||
if Openflow.Hello.supported_version?(hello) do
|
if Openflow.Hello.supported_version?(hello) do
|
||||||
{:next_state, :CONNECTING, %{state_data|timer_ref: nil}}
|
{:next_state, :CONNECTING, %{state_data|timer_ref: nil}}
|
||||||
else
|
else
|
||||||
|
|
@ -288,13 +293,14 @@ defmodule Tres.SecureChannel do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp initiate_features_handshake(state_data) do
|
defp initiate_features_handshake(state_data) do
|
||||||
new_xid = SecureChannelState.increment_transaction_id(state_data.xid)
|
new_xid = State.increment_transaction_id(state_data.xid)
|
||||||
send_features(new_xid, state_data)
|
send_features(new_xid, state_data)
|
||||||
ref = :erlang.send_after(@features_handshake_timeout, self(), :features_timeout)
|
ref = :erlang.send_after(@features_handshake_timeout, self(), :features_timeout)
|
||||||
{:keep_state, %{state_data|timer_ref: ref}}
|
{:keep_state, %{state_data|timer_ref: ref}}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp handle_features_handshake(%Openflow.Features.Reply{datapath_id: datapath_id, aux_id: aux_id}, state_data) do
|
defp handle_features_handshake(%Openflow.Features.Reply{datapath_id: datapath_id,
|
||||||
|
aux_id: aux_id}, state_data) do
|
||||||
{:ok, _} = SwitchRegistry.register({datapath_id, aux_id})
|
{:ok, _} = SwitchRegistry.register({datapath_id, aux_id})
|
||||||
new_state_data = %{
|
new_state_data = %{
|
||||||
state_data|
|
state_data|
|
||||||
|
|
@ -343,25 +349,25 @@ defmodule Tres.SecureChannel do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp should_be_ping?(%SecureChannelState{last_received: last_received, aux_id: 0}) do
|
defp should_be_ping?(%State{last_received: last_received, aux_id: 0}) do
|
||||||
:timer.now_diff(:os.timestamp(), last_received) > (1000 * @ping_interval)
|
:timer.now_diff(:os.timestamp(), last_received) > (1000 * @ping_interval)
|
||||||
end
|
end
|
||||||
defp should_be_ping?(_) do
|
defp should_be_ping?(_) do
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
|
|
||||||
defp send_ping(%SecureChannelState{xid: x_agent} = state_data) do
|
defp send_ping(%State{xid: x_agent} = state_data) do
|
||||||
xid = SecureChannelState.increment_transaction_id(x_agent)
|
xid = State.increment_transaction_id(x_agent)
|
||||||
send_echo_request(xid, "", state_data)
|
send_echo_request(xid, "", state_data)
|
||||||
ping_ref = :erlang.send_after(@ping_timeout, self(), :ping_timeout)
|
ping_ref = :erlang.send_after(@ping_timeout, self(), :ping_timeout)
|
||||||
%{state_data|ping_timer_ref: ping_ref, ping_xid: xid}
|
%{state_data|ping_timer_ref: ping_ref, ping_xid: xid}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp handle_ping_timeout(%SecureChannelState{ping_fail_count: fail_count} = state_data, :CONNECTING)
|
defp handle_ping_timeout(%State{ping_fail_count: fail_count} = state_data, :CONNECTING)
|
||||||
when fail_count > @ping_fail_max_count do
|
when fail_count > @ping_fail_max_count do
|
||||||
{:next_state, :WAITING, state_data}
|
{:next_state, :WAITING, state_data}
|
||||||
end
|
end
|
||||||
defp handle_ping_timeout(%SecureChannelState{ping_fail_count: fail_count} = state_data, :WAITING)
|
defp handle_ping_timeout(%State{ping_fail_count: fail_count} = state_data, :WAITING)
|
||||||
when fail_count > @ping_fail_max_count do
|
when fail_count > @ping_fail_max_count do
|
||||||
close_connection(:ping_failed, state_data)
|
close_connection(:ping_failed, state_data)
|
||||||
end
|
end
|
||||||
|
|
@ -374,8 +380,23 @@ defmodule Tres.SecureChannel do
|
||||||
{:keep_state, %{state_data|ping_timer_ref: nil, ping_xid: nil}}
|
{:keep_state, %{state_data|ping_timer_ref: nil, ping_xid: nil}}
|
||||||
end
|
end
|
||||||
|
|
||||||
defp send_message(message, %SecureChannelState{socket: socket, transport: transport}) do
|
defp xactional_send_message(message, state_data) do
|
||||||
#debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
xid = State.increment_transaction_id(state_data.xid)
|
||||||
|
messages = [
|
||||||
|
%{message|xid: xid},
|
||||||
|
%{Openflow.Barrier.Request.new|xid: xid}
|
||||||
|
]
|
||||||
|
XACT_KV.insert(state_data.xact_kv_ref, xid, message)
|
||||||
|
send_message(messages, state_data)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp send_message(message, %State{socket: socket, transport: transport}) do
|
||||||
|
if is_list(message) do
|
||||||
|
for message <- message,
|
||||||
|
do: debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||||
|
else
|
||||||
|
debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||||
|
end
|
||||||
Tres.Utils.send_message(message, socket, transport)
|
Tres.Utils.send_message(message, socket, transport)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -386,11 +407,11 @@ defmodule Tres.SecureChannel do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp handle_signal({:'DOWN', mon_ref, :process, _main_pid, reason},
|
defp handle_signal({:'DOWN', mon_ref, :process, _main_pid, reason},
|
||||||
%SecureChannelState{main_monitor_ref: mon_ref} = state_data) do
|
%State{main_monitor_ref: mon_ref} = state_data) do
|
||||||
close_connection({:main_closed, reason}, state_data)
|
close_connection({:main_closed, reason}, state_data)
|
||||||
end
|
end
|
||||||
defp handle_signal({:'DOWN', handler_ref, :process, _main_pid, reason},
|
defp handle_signal({:'DOWN', handler_ref, :process, _main_pid, reason},
|
||||||
%SecureChannelState{handler_ref: handler_ref} = state_data) do
|
%State{handler_ref: handler_ref} = state_data) do
|
||||||
close_connection({:handler_down, reason}, state_data)
|
close_connection({:handler_down, reason}, state_data)
|
||||||
end
|
end
|
||||||
defp handle_signal({:'EXIT', _pid, reason}, state_data) do
|
defp handle_signal({:'EXIT', _pid, reason}, state_data) do
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue