tres/secure_channel: Add a new feature for blocking sending

This commit is contained in:
Eishun Kondoh 2018-05-02 02:49:58 +09:00
parent 516bb182bb
commit bc3c65cfa9
6 changed files with 27 additions and 11 deletions

Binary file not shown.

View file

@ -4,6 +4,7 @@ defmodule Tres.Controller do
import Tres.SwitchRegistry, import Tres.SwitchRegistry,
only: [ only: [
send_message: 2, send_message: 2,
blocking_send_message: 2,
get_current_xid: 1 get_current_xid: 1
] ]

View file

@ -17,7 +17,7 @@ defmodule Tres.MessageHelper do
instructions: options[:instructions] || [] instructions: options[:instructions] || []
} }
send_message(flow_mod, datapath_id) send_message(flow_mod, datapath_id, Keyword.get(options, :blocking, false))
end end
defp send_flow_mod_modify(datapath_id, options \\ []) do defp send_flow_mod_modify(datapath_id, options \\ []) do
@ -36,7 +36,7 @@ defmodule Tres.MessageHelper do
instructions: options[:instructions] || [] instructions: options[:instructions] || []
} }
send_message(flow_mod, datapath_id) send_message(flow_mod, datapath_id, Keyword.get(options, :blocking, false))
end end
defp send_flow_mod_delete(datapath_id, options \\ []) do defp send_flow_mod_delete(datapath_id, options \\ []) do
@ -53,7 +53,7 @@ defmodule Tres.MessageHelper do
match: options[:match] || Openflow.Match.new() match: options[:match] || Openflow.Match.new()
} }
send_message(flow_mod, datapath_id) send_message(flow_mod, datapath_id, Keyword.get(options, :blocking, false))
end end
defp send_packet_out(datapath_id, options \\ []) do defp send_packet_out(datapath_id, options \\ []) do
@ -65,7 +65,7 @@ defmodule Tres.MessageHelper do
data: options[:data] || "" data: options[:data] || ""
} }
send_message(packet_out, datapath_id) send_message(packet_out, datapath_id, Keyword.get(options, :blocking, false))
end end
defp send_group_mod_add(datapath_id, options \\ []) do defp send_group_mod_add(datapath_id, options \\ []) do
@ -78,7 +78,7 @@ defmodule Tres.MessageHelper do
buckets: options[:buckets] || [] buckets: options[:buckets] || []
) )
send_message(group_mod, datapath_id) send_message(group_mod, datapath_id, Keyword.get(options, :blocking, false))
end end
defp send_group_mod_delete(datapath_id, options \\ []) do defp send_group_mod_delete(datapath_id, options \\ []) do
@ -89,7 +89,7 @@ defmodule Tres.MessageHelper do
group_id: options[:group_id] || :all group_id: options[:group_id] || :all
) )
send_message(group_mod, datapath_id) send_message(group_mod, datapath_id, Keyword.get(options, :blocking, false))
end end
defp send_group_mod_modify(datapath_id, options \\ []) do defp send_group_mod_modify(datapath_id, options \\ []) do
@ -102,7 +102,7 @@ defmodule Tres.MessageHelper do
buckets: options[:buckets] || [] buckets: options[:buckets] || []
) )
send_message(group_mod, datapath_id) send_message(group_mod, datapath_id, Keyword.get(options, :blocking, false))
end end
defp send_role_request(datapath_id, options) do defp send_role_request(datapath_id, options) do
@ -113,7 +113,7 @@ defmodule Tres.MessageHelper do
generation_id: options[:generation_id] || 0 generation_id: options[:generation_id] || 0
) )
send_message(role_request, datapath_id) send_message(role_request, datapath_id, Keyword.get(options, :blocking, false))
end end
end end
end end

View file

@ -369,7 +369,8 @@ defmodule Tres.SecureChannel do
XACT_KV.delete(state_data.xact_kv_ref, xid) XACT_KV.delete(state_data.xact_kv_ref, xid)
end end
defp process_xact_entry({:xact_entry, xid, message, _orig, from}, state_data) when is_tuple(from) do defp process_xact_entry({:xact_entry, xid, message, _orig, from}, state_data) when is_tuple(from) do
unless is_nil(message), do: :gen_statem.reply(from, message) reply = if is_nil(message), do: :noreply, else: message
_ = :gen_statem.reply(from, {:ok, reply})
XACT_KV.delete(state_data.xact_kv_ref, xid) XACT_KV.delete(state_data.xact_kv_ref, xid)
end end

View file

@ -18,6 +18,13 @@ defmodule Tres.SwitchRegistry do
lookup_pid({datapath_id, 0}) lookup_pid({datapath_id, 0})
end end
def send_message(message, dpid, _blocking = true) do
blocking_send_message(message, dpid)
end
def send_message(message, dpid, _blocking) do
send_message(message, dpid)
end
def send_message(message, {_dpid, _aux_id} = datapath_id) do def send_message(message, {_dpid, _aux_id} = datapath_id) do
Registry.dispatch(__MODULE__, datapath_id, &do_send_message(&1, message)) Registry.dispatch(__MODULE__, datapath_id, &do_send_message(&1, message))
end end

View file

@ -3,13 +3,16 @@
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-export([create/0, drop/1]). -export([create/0, drop/1]).
-export([insert/3, update/3, get/2, delete/2, is_exists/2, is_empty/1]). -export([insert/3, insert/4,
update/3, get/2,
delete/2, is_exists/2,
is_empty/1]).
-define(TABLE, xact_kv). -define(TABLE, xact_kv).
-define(ENTRY, xact_entry). -define(ENTRY, xact_entry).
-define(TABLE_OPTS, [set, protected, {keypos, #?ENTRY.xid}]). -define(TABLE_OPTS, [set, protected, {keypos, #?ENTRY.xid}]).
-record(?ENTRY, {xid = 0, pending = nil, orig = nil}). -record(?ENTRY, {xid = 0, pending = nil, orig = nil, from = nil}).
-spec create() -> reference(). -spec create() -> reference().
create() -> create() ->
@ -23,6 +26,10 @@ drop(Tid) ->
insert(Tid, Xid, Orig) -> insert(Tid, Xid, Orig) ->
ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig}). ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig}).
-spec insert(reference(), integer(), map(), term()) -> true.
insert(Tid, Xid, Orig, From) ->
ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig, from = From}).
-spec update(reference(), integer(), map()) -> integer(). -spec update(reference(), integer(), map()) -> integer().
update(Tid, Xid, #{'__struct__' := 'Elixir.Openflow.ErrorMsg'} = Error) -> update(Tid, Xid, #{'__struct__' := 'Elixir.Openflow.ErrorMsg'} = Error) ->
ets:select_replace(Tid, ms_for_handle_error(Tid, Xid, Error)); ets:select_replace(Tid, ms_for_handle_error(Tid, Xid, Error));