diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b30d9f..c311f82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## develop (unreleased) +### New Features +* tres/secure_channel: Add a new function for blocking send + ### Bugs Fixed * tres/secure_channel: Fix to interpolate terms when logging - diff --git a/bin/enum_gen b/bin/enum_gen index 6390ace..2332792 100755 Binary files a/bin/enum_gen and b/bin/enum_gen differ diff --git a/lib/tres/secure_channel.ex b/lib/tres/secure_channel.ex index 9df19d3..eb835ac 100644 --- a/lib/tres/secure_channel.ex +++ b/lib/tres/secure_channel.ex @@ -262,6 +262,11 @@ defmodule Tres.SecureChannel do :keep_state_and_data end + defp handle_CONNECTED({:call, from}, {:send_message, message}, state_data) do + xactional_send_message({from, message}, state_data) + :keep_state_and_data + end + defp handle_CONNECTED({:call, from}, :get_xid, state_data) do xid = State.get_transaction_id(state_data.xid) {:keep_state_and_data, [{:reply, from, {:ok, xid}}]} @@ -338,7 +343,7 @@ defmodule Tres.SecureChannel do defp handle_message(_in_xact = true, message, state_data) do case XACT_KV.get(state_data.xact_kv_ref, message.xid) do - [{:xact_entry, _xid, prev_message, _orig} | _] -> + [{:xact_entry, _xid, prev_message, _orig, _from} | _] -> new_message = Openflow.append_body(prev_message, message) XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message) @@ -359,10 +364,14 @@ defmodule Tres.SecureChannel do pop_action_queue(state_data) end - defp process_xact_entry({:xact_entry, xid, message, _orig}, state_data) do + defp process_xact_entry({:xact_entry, xid, message, _orig, nil}, state_data) do unless is_nil(message), do: send(state_data.handler_pid, message) XACT_KV.delete(state_data.xact_kv_ref, xid) end + defp process_xact_entry({:xact_entry, xid, message, _orig, from}, state_data) when is_tuple(from) do + unless is_nil(message), do: :gen_statem.reply(from, message) + XACT_KV.delete(state_data.xact_kv_ref, xid) + end defp pop_action_queue(%State{action_queue: queue} = state_data) do {next_actions, new_queue} = @@ -513,6 +522,30 @@ defmodule Tres.SecureChannel do send_message(messages, state_data) end + defp xactional_send_message({from, %{xid: 0} = message}, state_data) do + xid = State.increment_transaction_id(state_data.xid) + + messages = [ + %{message | xid: xid}, + %{Openflow.Barrier.Request.new() | xid: xid} + ] + + XACT_KV.insert(state_data.xact_kv_ref, xid, message, from) + send_message(messages, state_data) + end + + defp xactional_send_message({from, %{xid: xid} = message}, state_data) do + _ = State.set_transaction_id(state_data.xid, xid) + + messages = [ + %{message | xid: xid}, + %{Openflow.Barrier.Request.new() | xid: xid} + ] + + XACT_KV.insert(state_data.xact_kv_ref, xid, message, from) + send_message(messages, state_data) + end + defp send_message(message, %State{socket: socket, transport: transport}) do if is_list(message) do for message <- message, diff --git a/lib/tres/switch_registry.ex b/lib/tres/switch_registry.ex index e901fe5..78823b7 100644 --- a/lib/tres/switch_registry.ex +++ b/lib/tres/switch_registry.ex @@ -26,6 +26,18 @@ defmodule Tres.SwitchRegistry do send_message(message, {dpid, 0}) end + def blocking_send_message(message, {_dpid, _aux_id} = datapath_id) do + [{pid, _} | _] = Registry.lookup(__MODULE__, datapath_id) + :gen_statem.call(pid, {:send_message, message}, 500) + catch + :exit, {:timeout, _} -> + {:error, :timeout} + end + + def blocking_send_message(message, dpid) when is_binary(dpid) do + blocking_send_message(message, {dpid, 0}) + end + def get_current_xid({_dpid, _aux_id} = datapath_id) do [{pid, _} | _] = Registry.lookup(__MODULE__, datapath_id) :gen_statem.call(pid, :get_xid, 1000)