diff --git a/bin/enum_gen b/bin/enum_gen index 2332792..a9661e7 100755 Binary files a/bin/enum_gen and b/bin/enum_gen differ diff --git a/lib/tres/controller.ex b/lib/tres/controller.ex index 92d962e..ee48dbb 100644 --- a/lib/tres/controller.ex +++ b/lib/tres/controller.ex @@ -4,6 +4,7 @@ defmodule Tres.Controller do import Tres.SwitchRegistry, only: [ send_message: 2, + blocking_send_message: 2, get_current_xid: 1 ] diff --git a/lib/tres/message_helper.ex b/lib/tres/message_helper.ex index b9b1655..dcb1a02 100644 --- a/lib/tres/message_helper.ex +++ b/lib/tres/message_helper.ex @@ -17,7 +17,7 @@ defmodule Tres.MessageHelper do instructions: options[:instructions] || [] } - send_message(flow_mod, datapath_id) + send_message(flow_mod, datapath_id, Keyword.get(options, :blocking, false)) end defp send_flow_mod_modify(datapath_id, options \\ []) do @@ -36,7 +36,7 @@ defmodule Tres.MessageHelper do instructions: options[:instructions] || [] } - send_message(flow_mod, datapath_id) + send_message(flow_mod, datapath_id, Keyword.get(options, :blocking, false)) end defp send_flow_mod_delete(datapath_id, options \\ []) do @@ -53,7 +53,7 @@ defmodule Tres.MessageHelper do match: options[:match] || Openflow.Match.new() } - send_message(flow_mod, datapath_id) + send_message(flow_mod, datapath_id, Keyword.get(options, :blocking, false)) end defp send_packet_out(datapath_id, options \\ []) do @@ -65,7 +65,7 @@ defmodule Tres.MessageHelper do data: options[:data] || "" } - send_message(packet_out, datapath_id) + send_message(packet_out, datapath_id, Keyword.get(options, :blocking, false)) end defp send_group_mod_add(datapath_id, options \\ []) do @@ -78,7 +78,7 @@ defmodule Tres.MessageHelper do buckets: options[:buckets] || [] ) - send_message(group_mod, datapath_id) + send_message(group_mod, datapath_id, Keyword.get(options, :blocking, false)) end defp send_group_mod_delete(datapath_id, options \\ []) do @@ -89,7 +89,7 @@ defmodule Tres.MessageHelper do group_id: options[:group_id] || :all ) - send_message(group_mod, datapath_id) + send_message(group_mod, datapath_id, Keyword.get(options, :blocking, false)) end defp send_group_mod_modify(datapath_id, options \\ []) do @@ -102,7 +102,7 @@ defmodule Tres.MessageHelper do buckets: options[:buckets] || [] ) - send_message(group_mod, datapath_id) + send_message(group_mod, datapath_id, Keyword.get(options, :blocking, false)) end defp send_role_request(datapath_id, options) do @@ -113,7 +113,7 @@ defmodule Tres.MessageHelper do generation_id: options[:generation_id] || 0 ) - send_message(role_request, datapath_id) + send_message(role_request, datapath_id, Keyword.get(options, :blocking, false)) end end end diff --git a/lib/tres/secure_channel.ex b/lib/tres/secure_channel.ex index eb835ac..f6671ba 100644 --- a/lib/tres/secure_channel.ex +++ b/lib/tres/secure_channel.ex @@ -369,7 +369,8 @@ defmodule Tres.SecureChannel do XACT_KV.delete(state_data.xact_kv_ref, xid) end defp process_xact_entry({:xact_entry, xid, message, _orig, from}, state_data) when is_tuple(from) do - unless is_nil(message), do: :gen_statem.reply(from, message) + reply = if is_nil(message), do: :noreply, else: message + _ = :gen_statem.reply(from, {:ok, reply}) XACT_KV.delete(state_data.xact_kv_ref, xid) end diff --git a/lib/tres/switch_registry.ex b/lib/tres/switch_registry.ex index 78823b7..7849058 100644 --- a/lib/tres/switch_registry.ex +++ b/lib/tres/switch_registry.ex @@ -18,6 +18,13 @@ defmodule Tres.SwitchRegistry do lookup_pid({datapath_id, 0}) end + def send_message(message, dpid, _blocking = true) do + blocking_send_message(message, dpid) + end + def send_message(message, dpid, _blocking) do + send_message(message, dpid) + end + def send_message(message, {_dpid, _aux_id} = datapath_id) do Registry.dispatch(__MODULE__, datapath_id, &do_send_message(&1, message)) end diff --git a/src/tres_xact_kv.erl b/src/tres_xact_kv.erl index fdbf225..2924dfe 100644 --- a/src/tres_xact_kv.erl +++ b/src/tres_xact_kv.erl @@ -3,13 +3,16 @@ -include_lib("stdlib/include/ms_transform.hrl"). -export([create/0, drop/1]). --export([insert/3, update/3, get/2, delete/2, is_exists/2, is_empty/1]). +-export([insert/3, insert/4, + update/3, get/2, + delete/2, is_exists/2, + is_empty/1]). -define(TABLE, xact_kv). -define(ENTRY, xact_entry). -define(TABLE_OPTS, [set, protected, {keypos, #?ENTRY.xid}]). --record(?ENTRY, {xid = 0, pending = nil, orig = nil}). +-record(?ENTRY, {xid = 0, pending = nil, orig = nil, from = nil}). -spec create() -> reference(). create() -> @@ -23,6 +26,10 @@ drop(Tid) -> insert(Tid, Xid, Orig) -> ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig}). +-spec insert(reference(), integer(), map(), term()) -> true. +insert(Tid, Xid, Orig, From) -> + ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig, from = From}). + -spec update(reference(), integer(), map()) -> integer(). update(Tid, Xid, #{'__struct__' := 'Elixir.Openflow.ErrorMsg'} = Error) -> ets:select_replace(Tid, ms_for_handle_error(Tid, Xid, Error));