Add support NXT_RESUME (#9)
This commit is contained in:
parent
4bd8ba994f
commit
a3ec836e42
15 changed files with 730 additions and 71 deletions
|
|
@ -14,6 +14,7 @@ defmodule Tres.Controller do
|
|||
|
||||
def handler_spec(dpid) do
|
||||
{cb_mod, cb_args} = Tres.Utils.get_callback_module()
|
||||
|
||||
%{
|
||||
id: {__MODULE__, dpid},
|
||||
start: {cb_mod, :start_link, [[dpid, cb_args]]},
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ defmodule Tres.MessageHandlerSup do
|
|||
|
||||
def init(_init_args) do
|
||||
children = []
|
||||
|
||||
Supervisor.init(
|
||||
children,
|
||||
strategy: :one_for_one,
|
||||
|
|
|
|||
|
|
@ -147,11 +147,16 @@ defmodule Tres.MessageHelper do
|
|||
Openflow.MeterMod.new(
|
||||
xid: options[:xid] || 0,
|
||||
command: :delete,
|
||||
meter_id: options[:meter_id] || 0,
|
||||
meter_id: options[:meter_id] || 0
|
||||
)
|
||||
|
||||
send_message(meter_mod, datapath_id, options[:blocking] || false)
|
||||
end
|
||||
|
||||
defp send_resume(datapath_id, options) do
|
||||
resume = Openflow.NxResume.new(options)
|
||||
send_message(resume, datapath_id, options[:blocking] || false)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ defmodule Tres.SecureChannel do
|
|||
{:tcp, socket, packet},
|
||||
state,
|
||||
%State{socket: socket, transport: transport} = state_data
|
||||
) do
|
||||
) do
|
||||
transport.setopts(socket, active: :once)
|
||||
handle_packet(packet, state_data, state, [])
|
||||
end
|
||||
|
|
@ -137,9 +137,7 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
# INIT state
|
||||
defp handle_INIT(:enter, _old_state, state_data) do
|
||||
debug(
|
||||
"Initiate HELLO handshake: " <> "#{state_data.ip_addr}:#{state_data.port}"
|
||||
)
|
||||
debug("Initiate HELLO handshake: " <> "#{state_data.ip_addr}:#{state_data.port}")
|
||||
|
||||
initiate_hello_handshake(state_data)
|
||||
end
|
||||
|
|
@ -153,19 +151,14 @@ defmodule Tres.SecureChannel do
|
|||
end
|
||||
|
||||
defp handle_INIT(:internal, message, _state_data) do
|
||||
debug(
|
||||
"Hello handshake in progress, " <> "dropping message: #{inspect(message)}"
|
||||
)
|
||||
debug("Hello handshake in progress, " <> "dropping message: #{inspect(message)}")
|
||||
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
# CONNECTING state
|
||||
defp handle_CONNECTING(:enter, :INIT, state_data) do
|
||||
debug(
|
||||
"Initiate FEATURES handshake:" <>
|
||||
" #{state_data.ip_addr}:#{state_data.port}"
|
||||
)
|
||||
debug("Initiate FEATURES handshake:" <> " #{state_data.ip_addr}:#{state_data.port}")
|
||||
|
||||
initiate_features_handshake(state_data)
|
||||
end
|
||||
|
|
@ -195,8 +188,7 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
defp handle_CONNECTING(:internal, {:openflow, message}, _state_data) do
|
||||
debug(
|
||||
"Features handshake in progress," <>
|
||||
" dropping message: #{inspect(message.__struct__)}"
|
||||
"Features handshake in progress," <> " dropping message: #{inspect(message.__struct__)}"
|
||||
)
|
||||
|
||||
:keep_state_and_data
|
||||
|
|
@ -209,6 +201,7 @@ defmodule Tres.SecureChannel do
|
|||
# CONNECTED state
|
||||
defp handle_CONNECTED(:enter, :CONNECTING, state_data) do
|
||||
_tref = schedule_xactdb_ageout()
|
||||
|
||||
case init_handler(state_data) do
|
||||
%State{} = new_state_data ->
|
||||
start_periodic_idle_check()
|
||||
|
|
@ -340,10 +333,12 @@ defmodule Tres.SecureChannel do
|
|||
action = {:next_event, :internal, {:openflow, message}}
|
||||
new_state_data = %{state_data | buffer: "", last_received: :os.timestamp()}
|
||||
handle_packet(leftovers, new_state_data, state, [action | actions])
|
||||
|
||||
{:error, :binary_too_small} ->
|
||||
handle_packet("", %{state_data | buffer: binary}, state, actions)
|
||||
{:error, :malformed_packet} ->
|
||||
:ok = debug("malformed packet received from #{state_data.datapath_id}")
|
||||
|
||||
{:error, {:malformed_packet, {_reason, st}}} ->
|
||||
:ok = debug("malformed packet received from #{state_data.datapath_id} stack_trace: #{st}")
|
||||
handle_packet("", %{state_data | buffer: ""}, state, actions)
|
||||
end
|
||||
end
|
||||
|
|
@ -375,8 +370,9 @@ defmodule Tres.SecureChannel do
|
|||
unless is_nil(message), do: send(state_data.handler_pid, message)
|
||||
XACT_KV.delete(state_data.xact_kv_ref, xid)
|
||||
end
|
||||
|
||||
defp process_xact_entry({:xact_entry, xid, message, _orig, from, _inserted_at}, state_data)
|
||||
when is_tuple(from) do
|
||||
when is_tuple(from) do
|
||||
reply = if is_nil(message), do: :noreply, else: message
|
||||
:ok = :gen_statem.reply(from, {:ok, reply})
|
||||
XACT_KV.delete(state_data.xact_kv_ref, xid)
|
||||
|
|
@ -558,8 +554,7 @@ defmodule Tres.SecureChannel do
|
|||
defp send_message(message, %State{socket: socket, transport: transport}) do
|
||||
if is_list(message) do
|
||||
for message <- message,
|
||||
do:
|
||||
debug("Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
do: debug("Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
else
|
||||
debug("Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
end
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ defmodule Tres.SwitchRegistry do
|
|||
def send_message(message, dpid, _blocking = true) do
|
||||
blocking_send_message(message, dpid)
|
||||
end
|
||||
|
||||
def send_message(message, dpid, _blocking) do
|
||||
send_message(message, dpid)
|
||||
end
|
||||
|
|
|
|||
|
|
@ -14,11 +14,11 @@ defmodule Tres.Utils do
|
|||
|
||||
def start_openflow_listener do
|
||||
:ranch.start_listener(
|
||||
_ref = Tres,
|
||||
_trasport = :ranch_tcp,
|
||||
_ref = Tres,
|
||||
_trasport = :ranch_tcp,
|
||||
_transport_opts = transport_options(),
|
||||
_protocol = @connection_manager,
|
||||
_protocol_opts = []
|
||||
_protocol = @connection_manager,
|
||||
_protocol_opts = []
|
||||
)
|
||||
end
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue