tres/secure_channel: Add a new feature for blocking sending
This commit is contained in:
parent
2f5c59c649
commit
31b113af24
4 changed files with 50 additions and 3 deletions
|
|
@ -262,6 +262,11 @@ defmodule Tres.SecureChannel do
|
|||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_CONNECTED({:call, from}, {:send_message, message}, state_data) do
|
||||
xactional_send_message({from, message}, state_data)
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_CONNECTED({:call, from}, :get_xid, state_data) do
|
||||
xid = State.get_transaction_id(state_data.xid)
|
||||
{:keep_state_and_data, [{:reply, from, {:ok, xid}}]}
|
||||
|
|
@ -338,7 +343,7 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
defp handle_message(_in_xact = true, message, state_data) do
|
||||
case XACT_KV.get(state_data.xact_kv_ref, message.xid) do
|
||||
[{:xact_entry, _xid, prev_message, _orig} | _] ->
|
||||
[{:xact_entry, _xid, prev_message, _orig, _from} | _] ->
|
||||
new_message = Openflow.append_body(prev_message, message)
|
||||
XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message)
|
||||
|
||||
|
|
@ -359,10 +364,14 @@ defmodule Tres.SecureChannel do
|
|||
pop_action_queue(state_data)
|
||||
end
|
||||
|
||||
defp process_xact_entry({:xact_entry, xid, message, _orig}, state_data) do
|
||||
defp process_xact_entry({:xact_entry, xid, message, _orig, nil}, state_data) do
|
||||
unless is_nil(message), do: send(state_data.handler_pid, message)
|
||||
XACT_KV.delete(state_data.xact_kv_ref, xid)
|
||||
end
|
||||
defp process_xact_entry({:xact_entry, xid, message, _orig, from}, state_data) when is_tuple(from) do
|
||||
unless is_nil(message), do: :gen_statem.reply(from, message)
|
||||
XACT_KV.delete(state_data.xact_kv_ref, xid)
|
||||
end
|
||||
|
||||
defp pop_action_queue(%State{action_queue: queue} = state_data) do
|
||||
{next_actions, new_queue} =
|
||||
|
|
@ -513,6 +522,30 @@ defmodule Tres.SecureChannel do
|
|||
send_message(messages, state_data)
|
||||
end
|
||||
|
||||
defp xactional_send_message({from, %{xid: 0} = message}, state_data) do
|
||||
xid = State.increment_transaction_id(state_data.xid)
|
||||
|
||||
messages = [
|
||||
%{message | xid: xid},
|
||||
%{Openflow.Barrier.Request.new() | xid: xid}
|
||||
]
|
||||
|
||||
XACT_KV.insert(state_data.xact_kv_ref, xid, message, from)
|
||||
send_message(messages, state_data)
|
||||
end
|
||||
|
||||
defp xactional_send_message({from, %{xid: xid} = message}, state_data) do
|
||||
_ = State.set_transaction_id(state_data.xid, xid)
|
||||
|
||||
messages = [
|
||||
%{message | xid: xid},
|
||||
%{Openflow.Barrier.Request.new() | xid: xid}
|
||||
]
|
||||
|
||||
XACT_KV.insert(state_data.xact_kv_ref, xid, message, from)
|
||||
send_message(messages, state_data)
|
||||
end
|
||||
|
||||
defp send_message(message, %State{socket: socket, transport: transport}) do
|
||||
if is_list(message) do
|
||||
for message <- message,
|
||||
|
|
|
|||
|
|
@ -26,6 +26,18 @@ defmodule Tres.SwitchRegistry do
|
|||
send_message(message, {dpid, 0})
|
||||
end
|
||||
|
||||
def blocking_send_message(message, {_dpid, _aux_id} = datapath_id) do
|
||||
[{pid, _} | _] = Registry.lookup(__MODULE__, datapath_id)
|
||||
:gen_statem.call(pid, {:send_message, message}, 500)
|
||||
catch
|
||||
:exit, {:timeout, _} ->
|
||||
{:error, :timeout}
|
||||
end
|
||||
|
||||
def blocking_send_message(message, dpid) when is_binary(dpid) do
|
||||
blocking_send_message(message, {dpid, 0})
|
||||
end
|
||||
|
||||
def get_current_xid({_dpid, _aux_id} = datapath_id) do
|
||||
[{pid, _} | _] = Registry.lookup(__MODULE__, datapath_id)
|
||||
:gen_statem.call(pid, :get_xid, 1000)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue