diff --git a/bin/enum_gen b/bin/enum_gen index a9661e7..3a26013 100755 Binary files a/bin/enum_gen and b/bin/enum_gen differ diff --git a/lib/openflow.ex b/lib/openflow.ex index ac30ab8..49b98db 100644 --- a/lib/openflow.ex +++ b/lib/openflow.ex @@ -15,18 +15,22 @@ defmodule Openflow do end def read(<>) do - body_len = len - @ofp_header_size - <> = binary2 + try do + body_len = len - @ofp_header_size + <> = binary2 - result = - type - |> Openflow.Enums.to_atom(:openflow_codec) - |> do_read(body_bin) + result = type + |> Openflow.Enums.to_atom(:openflow_codec) + |> do_read(body_bin) - case result do - {:ok, struct} -> {:ok, %{struct | version: ver, xid: xid}, rest} - {:error, reason} -> {:error, reason} - end + case result do + {:ok, struct} -> {:ok, %{struct | version: ver, xid: xid}, rest} + {:error, reason} -> {:error, reason} + end + catch + _c, _e -> + {:error, :malformed_packet} + end end def to_binary(messages) when is_list(messages) do diff --git a/lib/tres/secure_channel.ex b/lib/tres/secure_channel.ex index f6671ba..91f11be 100644 --- a/lib/tres/secure_channel.ex +++ b/lib/tres/secure_channel.ex @@ -41,7 +41,7 @@ defmodule Tres.SecureChannel do " #{state_data.ip_addr}:#{state_data.port}" <> " on #{inspect(self())}" ) - :gen_statem.enter_loop(__MODULE__, [], :INIT, state_data, []) + :gen_statem.enter_loop(__MODULE__, [debug: []], :INIT, state_data, []) end # TCP handler @@ -50,7 +50,7 @@ defmodule Tres.SecureChannel do {:tcp, socket, packet}, state, %State{socket: socket, transport: transport} = state_data - ) do + ) do transport.setopts(socket, active: :once) handle_packet(packet, state_data, state, []) end @@ -208,6 +208,7 @@ defmodule Tres.SecureChannel do # CONNECTED state defp handle_CONNECTED(:enter, :CONNECTING, state_data) do + _tref = schedule_xactdb_ageout() case init_handler(state_data) do %State{} = new_state_data -> start_periodic_idle_check() @@ -228,6 +229,12 @@ defmodule Tres.SecureChannel do handle_ping_timeout(state_data, :CONNECTED) end + defp handle_CONNECTED(:info, :xactdb_ageout, state_data) do + _num_deleted = XACT_KV.aged_out(state_data.xact_kv_ref) + _tref = schedule_xactdb_ageout() + :keep_state_and_data + end + defp handle_CONNECTED( :internal, {:openflow, %Openflow.Echo.Reply{xid: xid}}, @@ -303,6 +310,12 @@ defmodule Tres.SecureChannel do handle_ping_timeout(state_data, :WAITING) end + defp handle_WATING(:info, :xactdb_ageout, state_data) do + _num_deleted = XACT_KV.aged_out(state_data.xact_kv_ref) + _tref = schedule_xactdb_ageout() + :keep_state_and_data + end + defp handle_WATING(:internal, {:openflow, message}, state_data) do %State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data send(handler_pid, %{message | datapath_id: dpid, aux_id: aux_id}) @@ -324,26 +337,20 @@ defmodule Tres.SecureChannel do case Openflow.read(binary) do {:ok, message, leftovers} -> - debug( - "[#{__MODULE__}] Received: #{inspect(message.__struct__)}" <> - "(xid: #{message.xid}) in #{state}" - ) - action = {:next_event, :internal, {:openflow, message}} new_state_data = %{state_data | buffer: "", last_received: :os.timestamp()} handle_packet(leftovers, new_state_data, state, [action | actions]) - {:error, :binary_too_small} -> handle_packet("", %{state_data | buffer: binary}, state, actions) - - {:error, _reason} -> - handle_packet("", state_data, state, actions) + {:error, :malformed_packet} -> + :ok = warn("malformed packet received from #{state_data.datapath_id}") + handle_packet("", %{state_data | buffer: ""}, state, actions) end end defp handle_message(_in_xact = true, message, state_data) do case XACT_KV.get(state_data.xact_kv_ref, message.xid) do - [{:xact_entry, _xid, prev_message, _orig, _from} | _] -> + [{:xact_entry, _xid, prev_message, _orig, _from, _inserted_at} | _] -> new_message = Openflow.append_body(prev_message, message) XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message) @@ -364,13 +371,14 @@ defmodule Tres.SecureChannel do pop_action_queue(state_data) end - defp process_xact_entry({:xact_entry, xid, message, _orig, nil}, state_data) do + defp process_xact_entry({:xact_entry, xid, message, _orig, nil, _inserted_at}, state_data) do unless is_nil(message), do: send(state_data.handler_pid, message) XACT_KV.delete(state_data.xact_kv_ref, xid) end - defp process_xact_entry({:xact_entry, xid, message, _orig, from}, state_data) when is_tuple(from) do + defp process_xact_entry({:xact_entry, xid, message, _orig, from, _inserted_at}, state_data) + when is_tuple(from) do reply = if is_nil(message), do: :noreply, else: message - _ = :gen_statem.reply(from, {:ok, reply}) + :ok = :gen_statem.reply(from, {:ok, reply}) XACT_KV.delete(state_data.xact_kv_ref, xid) end @@ -566,6 +574,11 @@ defmodule Tres.SecureChannel do :ok end + @spec schedule_xactdb_ageout() :: reference() + defp schedule_xactdb_ageout do + Process.send_after(self(), :xactdb_ageout, 1_000) + end + defp handle_signal( {:DOWN, mon_ref, :process, _main_pid, reason}, %State{main_monitor_ref: mon_ref} = state_data diff --git a/lib/tres/switch_registry.ex b/lib/tres/switch_registry.ex index 7849058..67be0f0 100644 --- a/lib/tres/switch_registry.ex +++ b/lib/tres/switch_registry.ex @@ -35,7 +35,7 @@ defmodule Tres.SwitchRegistry do def blocking_send_message(message, {_dpid, _aux_id} = datapath_id) do [{pid, _} | _] = Registry.lookup(__MODULE__, datapath_id) - :gen_statem.call(pid, {:send_message, message}, 500) + :gen_statem.call(pid, {:send_message, message}, 5000) catch :exit, {:timeout, _} -> {:error, :timeout} diff --git a/src/tres_xact_kv.erl b/src/tres_xact_kv.erl index 2924dfe..d4b6835 100644 --- a/src/tres_xact_kv.erl +++ b/src/tres_xact_kv.erl @@ -6,13 +6,20 @@ -export([insert/3, insert/4, update/3, get/2, delete/2, is_exists/2, - is_empty/1]). + is_empty/1, aged_out/1]). +-define(AGE_TIME_MS, 1000). -define(TABLE, xact_kv). -define(ENTRY, xact_entry). -define(TABLE_OPTS, [set, protected, {keypos, #?ENTRY.xid}]). --record(?ENTRY, {xid = 0, pending = nil, orig = nil, from = nil}). +-record(?ENTRY, { + xid = 0, + pending = nil, + orig = nil, + from = nil, + inserted_at = 0 + }). -spec create() -> reference(). create() -> @@ -24,7 +31,11 @@ drop(Tid) -> -spec insert(reference(), integer(), map()) -> true. insert(Tid, Xid, Orig) -> - ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig}). + ets:insert(Tid, #?ENTRY{ + xid = Xid, + orig = Orig, + inserted_at = os:system_time(milli_seconds) + }). -spec insert(reference(), integer(), map(), term()) -> true. insert(Tid, Xid, Orig, From) -> @@ -58,8 +69,17 @@ is_empty(Tid) -> _ -> false end. +-spec aged_out(reference()) -> integer(). +aged_out(Tid) -> + MatchSpec = ms_for_aged_entries(), + ets:select_delete(Tid, MatchSpec). + %% Private functions +ms_for_aged_entries() -> + Now = os:system_time(milli_seconds), + ets:fun2ms(fun(#?ENTRY{inserted_at = T1}) -> Now - T1 > ?AGE_TIME_MS end). + ms_for_exists(Xid) -> ets:fun2ms(fun(#?ENTRY{xid = TXid}) when TXid == Xid -> true end).