diff --git a/examples/learning_switch/lib/learning_switch/ofctl.ex b/examples/learning_switch/lib/learning_switch/ofctl.ex index db1714c..19c6121 100644 --- a/examples/learning_switch/lib/learning_switch/ofctl.ex +++ b/examples/learning_switch/lib/learning_switch/ofctl.ex @@ -60,6 +60,8 @@ defmodule LearningSwitch.Ofctl do # private functions defp init_datapath(datapath_id) do + SetConfig.new(miss_send_len: :no_buffer) + |> send_message(datapath_id) init_flow_tables(datapath_id) end diff --git a/lib/tres/secure_channel.ex b/lib/tres/secure_channel.ex index 31d0852..dbd7c41 100644 --- a/lib/tres/secure_channel.ex +++ b/lib/tres/secure_channel.ex @@ -5,7 +5,7 @@ defmodule Tres.SecureChannel do alias :tres_xact_kv, as: XACT_KV alias :queue, as: Queue - alias Tres.SecureChannelState + alias Tres.SecureChannelState, as: State alias Tres.SwitchRegistry alias Tres.MessageHandlerSup @@ -43,16 +43,16 @@ defmodule Tres.SecureChannel do # TCP handler def handle_event(:info, {:tcp, socket, packet}, state, - %SecureChannelState{socket: socket, transport: transport} = state_data) do + %State{socket: socket, transport: transport} = state_data) do transport.setopts(socket, [active: :once]) handle_packet(packet, state_data, state, []) end def handle_event(:info, {:tcp_closed, socket}, _state, - %SecureChannelState{socket: socket} = state_data) do + %State{socket: socket} = state_data) do close_connection(:tcp_closed, state_data) end def handle_event(:info, {:tcp_error, socket, reason}, _state, - %SecureChannelState{socket: socket} = state_data) do + %State{socket: socket} = state_data) do close_connection({:tcp_error, reason}, state_data) end def handle_event(:info, {:'DOWN', _ref, :process, _main_pid, _reason} = signal, _state, state_data) do @@ -74,7 +74,8 @@ defmodule Tres.SecureChannel do handle_WATING(type, message, state_data) end - def terminate(reason, state, %SecureChannelState{datapath_id: datapath_id, aux_id: aux_id, xact_kv_ref: kv_ref}) do + def terminate(reason, state, + %State{datapath_id: datapath_id, aux_id: aux_id, xact_kv_ref: kv_ref}) do warn("[#{__MODULE__}] termiate: #{inspect(reason)} state = #{inspect(state)}") true = XACT_KV.drop(kv_ref) :ok = SwitchRegistry.unregister({datapath_id, aux_id}) @@ -86,7 +87,7 @@ defmodule Tres.SecureChannel do init_process(ref) :ok = transport.setopts(socket, [active: :once]) kv_ref = XACT_KV.create - SecureChannelState.new(ref: ref, socket: socket, transport: transport, xact_kv_ref: kv_ref) + State.new(ref: ref, socket: socket, transport: transport, xact_kv_ref: kv_ref) end defp init_process(ref) do @@ -96,7 +97,7 @@ defmodule Tres.SecureChannel do end defp init_handler(state_data) do - %SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data + %State{datapath_id: dpid, aux_id: aux_id} = state_data {:ok, pid} = MessageHandlerSup.start_child({dpid, aux_id}) ref = Process.monitor(pid) %{state_data|handler_pid: pid, handler_ref: ref} @@ -104,7 +105,8 @@ defmodule Tres.SecureChannel do # INIT state defp handle_INIT(:enter, _old_state, state_data) do - debug("[#{__MODULE__}] Initiate HELLO handshake: #{state_data.ip_addr}:#{state_data.port}") + debug("[#{__MODULE__}] Initiate HELLO handshake: "<> + ">#{state_data.ip_addr}:#{state_data.port}") initiate_hello_handshake(state_data) end defp handle_INIT(:info, :hello_timeout, state_data) do @@ -114,13 +116,15 @@ defmodule Tres.SecureChannel do handle_hello_handshake_1(hello, state_data) end defp handle_INIT(:internal, message, _state_data) do - debug("[#{__MODULE__}] Hello handshake in progress, dropping message: #{inspect(message)}") + debug("[#{__MODULE__}] Hello handshake in progress, " + <>"dropping message: #{inspect(message)}") :keep_state_and_data end # CONNECTING state defp handle_CONNECTING(:enter, :INIT, state_data) do - debug("[#{__MODULE__}] Initiate FEATURES handshake: #{state_data.ip_addr}:#{state_data.port}") + debug("[#{__MODULE__}] Initiate FEATURES handshake:" + <>" #{state_data.ip_addr}:#{state_data.port}") initiate_features_handshake(state_data) end defp handle_CONNECTING(:enter, :WAITING, state_data) do @@ -131,12 +135,15 @@ defmodule Tres.SecureChannel do close_connection(:features_handshake_timeout, state_data) end defp handle_CONNECTING(:internal, {:openflow, %Openflow.Features.Reply{} = features}, state_data) do - debug("[#{__MODULE__}] Switch connected datapath_id: #{features.datapath_id} auxiliary_id: #{features.aux_id}") + debug("[#{__MODULE__}] Switch connected "<> + "datapath_id: #{features.datapath_id}"<> + " auxiliary_id: #{features.aux_id}") _ = maybe_cancel_timer(state_data.timer_ref) handle_features_handshake(features, state_data) end defp handle_CONNECTING(:internal, {:openflow, message}, _state_data) do - debug("[#{__MODULE__}] Features handshake in progress, dropping message: #{inspect(message.__struct__)}") + debug("[#{__MODULE__}] Features handshake in progress,"<> + " dropping message: #{inspect(message.__struct__)}") :keep_state_and_data end defp handle_CONNECTING(type, _message, state_data) when type == :cast or type == :call do @@ -158,31 +165,19 @@ defmodule Tres.SecureChannel do handle_ping_timeout(state_data, :CONNECTED) end defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Reply{xid: xid}}, - %SecureChannelState{ping_xid: xid} = state_data) do + %State{ping_xid: xid} = state_data) do handle_ping_reply(state_data) end defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Request{} = echo}, state_data) do send_echo_reply(echo.xid, echo.data, state_data) :keep_state_and_data end - defp handle_CONNECTED(:internal, {:openflow, %Openflow.Barrier.Reply{xid: xid}}, state_data) do - for {:xact_entry, _xid, message, _orig} <- XACT_KV.get(state_data.xact_kv_ref, xid) do - unless is_nil(message) do - send(state_data.handler_pid, message) - XACT_KV.delete(state_data.xact_kv_ref, message.xid) - end - end - {next_actions, action_queue} = - case Queue.out(state_data.action_queue) do - {:empty, action_queue} -> - {[], action_queue} - {{:value, next_action}, action_queue} -> - {[{:next_event, :internal, next_action}], action_queue} - end - {:keep_state, %{state_data|action_queue: action_queue}, next_actions} + defp handle_CONNECTED(:internal, {:openflow, %Openflow.Barrier.Reply{} = barrier}, state_data) do + {new_state_data, next_actions} = process_xact(barrier, state_data) + {:keep_state, new_state_data, next_actions} end defp handle_CONNECTED(:internal, {:openflow, message}, state_data) do - %SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data + %State{datapath_id: dpid, aux_id: aux_id} = state_data new_message = %{message|datapath_id: dpid, aux_id: aux_id} state_data.xact_kv_ref |> XACT_KV.is_exists(message.xid) @@ -190,46 +185,20 @@ defmodule Tres.SecureChannel do :keep_state_and_data end defp handle_CONNECTED(:internal, {:send_message, message}, state_data) do - xid = SecureChannelState.increment_transaction_id(state_data.xid) - messages = [ - %{message|xid: xid}, - %{Openflow.Barrier.Request.new|xid: xid} - ] - XACT_KV.insert(state_data.xact_kv_ref, xid, message) - send_message(messages, state_data) + xactional_send_message(message, state_data) :keep_state_and_data end defp handle_CONNECTED(:cast, {:send_message, message} = action, state_data) do - if Queue.is_empty(state_data.action_queue) do - xid = SecureChannelState.increment_transaction_id(state_data.xid) - messages = [ - %{message|xid: xid}, - %{Openflow.Barrier.Request.new|xid: xid} - ] - XACT_KV.insert(state_data.xact_kv_ref, xid, message) - send_message(messages, state_data) - end + if Queue.is_empty(state_data.action_queue), + do: xactional_send_message(message, state_data) action_queue = Queue.in(action, state_data.action_queue) {:keep_state, %{state_data|action_queue: action_queue}} end - defp handle_message(_in_xact = true, message, state_data) do - case XACT_KV.get(state_data.xact_kv_ref, message.xid) do - [{:xact_entry, _xid, prev_message, _orig}|_] -> - new_message = Openflow.append_body(prev_message, message) - XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message) - _ -> - XACT_KV.delete(state_data.xact_kv_ref, message.xid) - end - end - defp handle_message(_in_xact = false, message, state_data) do - send(state_data.handler_pid, message) - end - # WATING state defp handle_WATING(:enter, :CONNECTING, state_data) do warn("[#{__MODULE__}] Possible HANG Detected on datapath_id: #{state_data.datapath_id} !") - %SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data + %State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data send(handler_pid, {:switch_hang, {dpid, aux_id}}) start_periodic_idle_check() :keep_state_and_data @@ -243,7 +212,7 @@ defmodule Tres.SecureChannel do handle_ping_timeout(state_data, :WAITING) end defp handle_WATING(:internal, {:openflow, message}, state_data) do - %SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data + %State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data send(handler_pid, %{message|datapath_id: dpid, aux_id: aux_id}) {:next_state, :CONNECTING, state_data} end @@ -256,11 +225,12 @@ defmodule Tres.SecureChannel do defp handle_packet("", state_data, _state, actions) do {:keep_state, state_data, Enum.reverse(actions)} end - defp handle_packet(packet, %SecureChannelState{buffer: buffer} = state_data, state, actions) do + defp handle_packet(packet, %State{buffer: buffer} = state_data, state, actions) do binary = <> case Openflow.read(binary) do {:ok, message, leftovers} -> - debug("[#{__MODULE__}] Received: #{inspect(message.__struct__)}(xid: #{message.xid}) in #{state}") + debug("[#{__MODULE__}] Received: #{inspect(message.__struct__)}"<> + "(xid: #{message.xid}) in #{state}") action = {:next_event, :internal, {:openflow, message}} new_state_data = %{state_data|buffer: "", last_received: :os.timestamp} handle_packet(leftovers, new_state_data, state, [action|actions]) @@ -271,6 +241,41 @@ defmodule Tres.SecureChannel do end end + defp handle_message(_in_xact = true, message, state_data) do + case XACT_KV.get(state_data.xact_kv_ref, message.xid) do + [{:xact_entry, _xid, prev_message, _orig}|_] -> + new_message = Openflow.append_body(prev_message, message) + XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message) + _ -> + XACT_KV.delete(state_data.xact_kv_ref, message.xid) + end + end + defp handle_message(_in_xact = false, message, state_data), + do: send(state_data.handler_pid, message) + + defp process_xact(%Openflow.Barrier.Reply{xid: xid}, state_data) do + :ok = state_data.xact_kv_ref + |> XACT_KV.get(xid) + |> Enum.each(&process_xact_entry(&1, state_data)) + pop_action_queue(state_data) + end + + defp process_xact_entry({:xact_entry, xid, message, _orig}, state_data) do + unless is_nil(message), do: send(state_data.handler_pid, message) + XACT_KV.delete(state_data.xact_kv_ref, xid) + end + + defp pop_action_queue(%State{action_queue: queue} = state_data) do + {next_actions, new_queue} = + case Queue.out(queue) do + {:empty, action_queue} -> + {[], action_queue} + {{:value, next_action}, action_queue} -> + {[{:next_event, :internal, next_action}], action_queue} + end + {%{state_data|action_queue: new_queue}, next_actions} + end + defp initiate_hello_handshake(state_data) do send_hello(state_data) ref = :erlang.send_after(@hello_handshake_timeout, self(), :hello_timeout) @@ -279,7 +284,7 @@ defmodule Tres.SecureChannel do defp handle_hello_handshake_1(hello, state_data) do maybe_cancel_timer(state_data.timer_ref) - SecureChannelState.set_transaction_id(state_data.xid, hello.xid) + State.set_transaction_id(state_data.xid, hello.xid) if Openflow.Hello.supported_version?(hello) do {:next_state, :CONNECTING, %{state_data|timer_ref: nil}} else @@ -288,13 +293,14 @@ defmodule Tres.SecureChannel do end defp initiate_features_handshake(state_data) do - new_xid = SecureChannelState.increment_transaction_id(state_data.xid) + new_xid = State.increment_transaction_id(state_data.xid) send_features(new_xid, state_data) ref = :erlang.send_after(@features_handshake_timeout, self(), :features_timeout) {:keep_state, %{state_data|timer_ref: ref}} end - defp handle_features_handshake(%Openflow.Features.Reply{datapath_id: datapath_id, aux_id: aux_id}, state_data) do + defp handle_features_handshake(%Openflow.Features.Reply{datapath_id: datapath_id, + aux_id: aux_id}, state_data) do {:ok, _} = SwitchRegistry.register({datapath_id, aux_id}) new_state_data = %{ state_data| @@ -343,25 +349,25 @@ defmodule Tres.SecureChannel do end end - defp should_be_ping?(%SecureChannelState{last_received: last_received, aux_id: 0}) do + defp should_be_ping?(%State{last_received: last_received, aux_id: 0}) do :timer.now_diff(:os.timestamp(), last_received) > (1000 * @ping_interval) end defp should_be_ping?(_) do false end - defp send_ping(%SecureChannelState{xid: x_agent} = state_data) do - xid = SecureChannelState.increment_transaction_id(x_agent) + defp send_ping(%State{xid: x_agent} = state_data) do + xid = State.increment_transaction_id(x_agent) send_echo_request(xid, "", state_data) ping_ref = :erlang.send_after(@ping_timeout, self(), :ping_timeout) %{state_data|ping_timer_ref: ping_ref, ping_xid: xid} end - defp handle_ping_timeout(%SecureChannelState{ping_fail_count: fail_count} = state_data, :CONNECTING) + defp handle_ping_timeout(%State{ping_fail_count: fail_count} = state_data, :CONNECTING) when fail_count > @ping_fail_max_count do {:next_state, :WAITING, state_data} end - defp handle_ping_timeout(%SecureChannelState{ping_fail_count: fail_count} = state_data, :WAITING) + defp handle_ping_timeout(%State{ping_fail_count: fail_count} = state_data, :WAITING) when fail_count > @ping_fail_max_count do close_connection(:ping_failed, state_data) end @@ -374,8 +380,23 @@ defmodule Tres.SecureChannel do {:keep_state, %{state_data|ping_timer_ref: nil, ping_xid: nil}} end - defp send_message(message, %SecureChannelState{socket: socket, transport: transport}) do - #debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})") + defp xactional_send_message(message, state_data) do + xid = State.increment_transaction_id(state_data.xid) + messages = [ + %{message|xid: xid}, + %{Openflow.Barrier.Request.new|xid: xid} + ] + XACT_KV.insert(state_data.xact_kv_ref, xid, message) + send_message(messages, state_data) + end + + defp send_message(message, %State{socket: socket, transport: transport}) do + if is_list(message) do + for message <- message, + do: debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})") + else + debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})") + end Tres.Utils.send_message(message, socket, transport) end @@ -386,11 +407,11 @@ defmodule Tres.SecureChannel do end defp handle_signal({:'DOWN', mon_ref, :process, _main_pid, reason}, - %SecureChannelState{main_monitor_ref: mon_ref} = state_data) do + %State{main_monitor_ref: mon_ref} = state_data) do close_connection({:main_closed, reason}, state_data) end defp handle_signal({:'DOWN', handler_ref, :process, _main_pid, reason}, - %SecureChannelState{handler_ref: handler_ref} = state_data) do + %State{handler_ref: handler_ref} = state_data) do close_connection({:handler_down, reason}, state_data) end defp handle_signal({:'EXIT', _pid, reason}, state_data) do