tres/secure_channel: Fix to avoid crash when receive malformed packet

This commit is contained in:
Eishun Kondoh 2018-05-02 15:39:58 +09:00
parent bc3c65cfa9
commit 6bba512271
5 changed files with 66 additions and 29 deletions

Binary file not shown.

View file

@ -15,18 +15,22 @@ defmodule Openflow do
end end
def read(<<ver::8, type::8, len::16, xid::32, binary2::bytes>>) do def read(<<ver::8, type::8, len::16, xid::32, binary2::bytes>>) do
body_len = len - @ofp_header_size try do
<<body_bin::bytes-size(body_len), rest::bytes>> = binary2 body_len = len - @ofp_header_size
<<body_bin::bytes-size(body_len), rest::bytes>> = binary2
result = result = type
type |> Openflow.Enums.to_atom(:openflow_codec)
|> Openflow.Enums.to_atom(:openflow_codec) |> do_read(body_bin)
|> do_read(body_bin)
case result do case result do
{:ok, struct} -> {:ok, %{struct | version: ver, xid: xid}, rest} {:ok, struct} -> {:ok, %{struct | version: ver, xid: xid}, rest}
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end end
catch
_c, _e ->
{:error, :malformed_packet}
end
end end
def to_binary(messages) when is_list(messages) do def to_binary(messages) when is_list(messages) do

View file

@ -41,7 +41,7 @@ defmodule Tres.SecureChannel do
" #{state_data.ip_addr}:#{state_data.port}" <> " on #{inspect(self())}" " #{state_data.ip_addr}:#{state_data.port}" <> " on #{inspect(self())}"
) )
:gen_statem.enter_loop(__MODULE__, [], :INIT, state_data, []) :gen_statem.enter_loop(__MODULE__, [debug: []], :INIT, state_data, [])
end end
# TCP handler # TCP handler
@ -50,7 +50,7 @@ defmodule Tres.SecureChannel do
{:tcp, socket, packet}, {:tcp, socket, packet},
state, state,
%State{socket: socket, transport: transport} = state_data %State{socket: socket, transport: transport} = state_data
) do ) do
transport.setopts(socket, active: :once) transport.setopts(socket, active: :once)
handle_packet(packet, state_data, state, []) handle_packet(packet, state_data, state, [])
end end
@ -208,6 +208,7 @@ defmodule Tres.SecureChannel do
# CONNECTED state # CONNECTED state
defp handle_CONNECTED(:enter, :CONNECTING, state_data) do defp handle_CONNECTED(:enter, :CONNECTING, state_data) do
_tref = schedule_xactdb_ageout()
case init_handler(state_data) do case init_handler(state_data) do
%State{} = new_state_data -> %State{} = new_state_data ->
start_periodic_idle_check() start_periodic_idle_check()
@ -228,6 +229,12 @@ defmodule Tres.SecureChannel do
handle_ping_timeout(state_data, :CONNECTED) handle_ping_timeout(state_data, :CONNECTED)
end end
defp handle_CONNECTED(:info, :xactdb_ageout, state_data) do
_num_deleted = XACT_KV.aged_out(state_data.xact_kv_ref)
_tref = schedule_xactdb_ageout()
:keep_state_and_data
end
defp handle_CONNECTED( defp handle_CONNECTED(
:internal, :internal,
{:openflow, %Openflow.Echo.Reply{xid: xid}}, {:openflow, %Openflow.Echo.Reply{xid: xid}},
@ -303,6 +310,12 @@ defmodule Tres.SecureChannel do
handle_ping_timeout(state_data, :WAITING) handle_ping_timeout(state_data, :WAITING)
end end
defp handle_WATING(:info, :xactdb_ageout, state_data) do
_num_deleted = XACT_KV.aged_out(state_data.xact_kv_ref)
_tref = schedule_xactdb_ageout()
:keep_state_and_data
end
defp handle_WATING(:internal, {:openflow, message}, state_data) do defp handle_WATING(:internal, {:openflow, message}, state_data) do
%State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data %State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
send(handler_pid, %{message | datapath_id: dpid, aux_id: aux_id}) send(handler_pid, %{message | datapath_id: dpid, aux_id: aux_id})
@ -324,26 +337,20 @@ defmodule Tres.SecureChannel do
case Openflow.read(binary) do case Openflow.read(binary) do
{:ok, message, leftovers} -> {:ok, message, leftovers} ->
debug(
"[#{__MODULE__}] Received: #{inspect(message.__struct__)}" <>
"(xid: #{message.xid}) in #{state}"
)
action = {:next_event, :internal, {:openflow, message}} action = {:next_event, :internal, {:openflow, message}}
new_state_data = %{state_data | buffer: "", last_received: :os.timestamp()} new_state_data = %{state_data | buffer: "", last_received: :os.timestamp()}
handle_packet(leftovers, new_state_data, state, [action | actions]) handle_packet(leftovers, new_state_data, state, [action | actions])
{:error, :binary_too_small} -> {:error, :binary_too_small} ->
handle_packet("", %{state_data | buffer: binary}, state, actions) handle_packet("", %{state_data | buffer: binary}, state, actions)
{:error, :malformed_packet} ->
{:error, _reason} -> :ok = warn("malformed packet received from #{state_data.datapath_id}")
handle_packet("", state_data, state, actions) handle_packet("", %{state_data | buffer: ""}, state, actions)
end end
end end
defp handle_message(_in_xact = true, message, state_data) do defp handle_message(_in_xact = true, message, state_data) do
case XACT_KV.get(state_data.xact_kv_ref, message.xid) do case XACT_KV.get(state_data.xact_kv_ref, message.xid) do
[{:xact_entry, _xid, prev_message, _orig, _from} | _] -> [{:xact_entry, _xid, prev_message, _orig, _from, _inserted_at} | _] ->
new_message = Openflow.append_body(prev_message, message) new_message = Openflow.append_body(prev_message, message)
XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message) XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message)
@ -364,13 +371,14 @@ defmodule Tres.SecureChannel do
pop_action_queue(state_data) pop_action_queue(state_data)
end end
defp process_xact_entry({:xact_entry, xid, message, _orig, nil}, state_data) do defp process_xact_entry({:xact_entry, xid, message, _orig, nil, _inserted_at}, state_data) do
unless is_nil(message), do: send(state_data.handler_pid, message) unless is_nil(message), do: send(state_data.handler_pid, message)
XACT_KV.delete(state_data.xact_kv_ref, xid) XACT_KV.delete(state_data.xact_kv_ref, xid)
end end
defp process_xact_entry({:xact_entry, xid, message, _orig, from}, state_data) when is_tuple(from) do defp process_xact_entry({:xact_entry, xid, message, _orig, from, _inserted_at}, state_data)
when is_tuple(from) do
reply = if is_nil(message), do: :noreply, else: message reply = if is_nil(message), do: :noreply, else: message
_ = :gen_statem.reply(from, {:ok, reply}) :ok = :gen_statem.reply(from, {:ok, reply})
XACT_KV.delete(state_data.xact_kv_ref, xid) XACT_KV.delete(state_data.xact_kv_ref, xid)
end end
@ -566,6 +574,11 @@ defmodule Tres.SecureChannel do
:ok :ok
end end
@spec schedule_xactdb_ageout() :: reference()
defp schedule_xactdb_ageout do
Process.send_after(self(), :xactdb_ageout, 1_000)
end
defp handle_signal( defp handle_signal(
{:DOWN, mon_ref, :process, _main_pid, reason}, {:DOWN, mon_ref, :process, _main_pid, reason},
%State{main_monitor_ref: mon_ref} = state_data %State{main_monitor_ref: mon_ref} = state_data

View file

@ -35,7 +35,7 @@ defmodule Tres.SwitchRegistry do
def blocking_send_message(message, {_dpid, _aux_id} = datapath_id) do def blocking_send_message(message, {_dpid, _aux_id} = datapath_id) do
[{pid, _} | _] = Registry.lookup(__MODULE__, datapath_id) [{pid, _} | _] = Registry.lookup(__MODULE__, datapath_id)
:gen_statem.call(pid, {:send_message, message}, 500) :gen_statem.call(pid, {:send_message, message}, 5000)
catch catch
:exit, {:timeout, _} -> :exit, {:timeout, _} ->
{:error, :timeout} {:error, :timeout}

View file

@ -6,13 +6,20 @@
-export([insert/3, insert/4, -export([insert/3, insert/4,
update/3, get/2, update/3, get/2,
delete/2, is_exists/2, delete/2, is_exists/2,
is_empty/1]). is_empty/1, aged_out/1]).
-define(AGE_TIME_MS, 1000).
-define(TABLE, xact_kv). -define(TABLE, xact_kv).
-define(ENTRY, xact_entry). -define(ENTRY, xact_entry).
-define(TABLE_OPTS, [set, protected, {keypos, #?ENTRY.xid}]). -define(TABLE_OPTS, [set, protected, {keypos, #?ENTRY.xid}]).
-record(?ENTRY, {xid = 0, pending = nil, orig = nil, from = nil}). -record(?ENTRY, {
xid = 0,
pending = nil,
orig = nil,
from = nil,
inserted_at = 0
}).
-spec create() -> reference(). -spec create() -> reference().
create() -> create() ->
@ -24,7 +31,11 @@ drop(Tid) ->
-spec insert(reference(), integer(), map()) -> true. -spec insert(reference(), integer(), map()) -> true.
insert(Tid, Xid, Orig) -> insert(Tid, Xid, Orig) ->
ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig}). ets:insert(Tid, #?ENTRY{
xid = Xid,
orig = Orig,
inserted_at = os:system_time(milli_seconds)
}).
-spec insert(reference(), integer(), map(), term()) -> true. -spec insert(reference(), integer(), map(), term()) -> true.
insert(Tid, Xid, Orig, From) -> insert(Tid, Xid, Orig, From) ->
@ -58,8 +69,17 @@ is_empty(Tid) ->
_ -> false _ -> false
end. end.
-spec aged_out(reference()) -> integer().
aged_out(Tid) ->
MatchSpec = ms_for_aged_entries(),
ets:select_delete(Tid, MatchSpec).
%% Private functions %% Private functions
ms_for_aged_entries() ->
Now = os:system_time(milli_seconds),
ets:fun2ms(fun(#?ENTRY{inserted_at = T1}) -> Now - T1 > ?AGE_TIME_MS end).
ms_for_exists(Xid) -> ms_for_exists(Xid) ->
ets:fun2ms(fun(#?ENTRY{xid = TXid}) when TXid == Xid -> true end). ets:fun2ms(fun(#?ENTRY{xid = TXid}) when TXid == Xid -> true end).