Formatted
This commit is contained in:
parent
5fc01a9bec
commit
7635272fbd
150 changed files with 5055 additions and 4032 deletions
|
|
@ -8,12 +8,16 @@ defmodule Tres.Application do
|
|||
def start(_type, _args) do
|
||||
import Supervisor.Spec
|
||||
|
||||
{cb_mod, _cb_args} = Tres.Utils.get_callback_module
|
||||
children = [worker(Registry, [[keys: :unique, name: SwitchRegistry]], id: SwitchRegistry),
|
||||
supervisor(Tres.MessageHandlerSup, [cb_mod], id: MessageHandlerSup),
|
||||
supervisor(OVSDB, [], id: OVSDB)]
|
||||
{cb_mod, _cb_args} = Tres.Utils.get_callback_module()
|
||||
|
||||
children = [
|
||||
worker(Registry, [[keys: :unique, name: SwitchRegistry]], id: SwitchRegistry),
|
||||
supervisor(Tres.MessageHandlerSup, [cb_mod], id: MessageHandlerSup),
|
||||
supervisor(OVSDB, [], id: OVSDB)
|
||||
]
|
||||
|
||||
opts = [strategy: :one_for_one, name: Tres.Supervisor]
|
||||
{:ok, _connection_pid} = Tres.Utils.start_openflow_listener
|
||||
{:ok, _connection_pid} = Tres.Utils.start_openflow_listener()
|
||||
Supervisor.start_link(children, opts)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -5,11 +5,9 @@ defmodule Tres.ExampleHandler do
|
|||
import Logger
|
||||
|
||||
defmodule State do
|
||||
defstruct [
|
||||
datapath_id: nil,
|
||||
aux_id: nil,
|
||||
conn_ref: nil
|
||||
]
|
||||
defstruct datapath_id: nil,
|
||||
aux_id: nil,
|
||||
conn_ref: nil
|
||||
end
|
||||
|
||||
def start_link(datapath, args) do
|
||||
|
|
@ -17,10 +15,11 @@ defmodule Tres.ExampleHandler do
|
|||
end
|
||||
|
||||
def init([{datapath_id, aux_id}, _args]) do
|
||||
info("[#{__MODULE__}] Switch Ready: "
|
||||
<> "datapath_id: #{datapath_id} "
|
||||
<> "aux_id: #{aux_id} "
|
||||
<> "on #{inspect(self())}")
|
||||
info(
|
||||
"[#{__MODULE__}] Switch Ready: " <>
|
||||
"datapath_id: #{datapath_id} " <> "aux_id: #{aux_id} " <> "on #{inspect(self())}"
|
||||
)
|
||||
|
||||
_ = send_desc_stats_request(datapath_id)
|
||||
_ = send_port_desc_stats_request(datapath_id)
|
||||
state = %State{datapath_id: datapath_id, aux_id: aux_id}
|
||||
|
|
@ -31,18 +30,24 @@ defmodule Tres.ExampleHandler do
|
|||
handle_port_desc_stats_reply(desc, datapath_id)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info(%Desc.Reply{datapath_id: datapath_id} = desc, state) do
|
||||
handle_desc_stats_reply(desc, datapath_id)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:switch_disconnected, reason}, state) do
|
||||
:ok = warn("[#{__MODULE__}] Switch Disconnected: datapath_id: #{state.datapath_id} by #{reason}")
|
||||
:ok =
|
||||
warn("[#{__MODULE__}] Switch Disconnected: datapath_id: #{state.datapath_id} by #{reason}")
|
||||
|
||||
{:stop, :normal, state}
|
||||
end
|
||||
|
||||
def handle_info({:switch_hang, _datapath_id}, state) do
|
||||
:ok = warn("[#{__MODULE__}] Switch possible hang: datapath_id: #{state.datapath_id}")
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
# `Catch all` function is required.
|
||||
def handle_info(info, state) do
|
||||
:ok = warn("[#{__MODULE__}] unhandled message #{inspect(info)}: #{state.datapath_id}")
|
||||
|
|
@ -52,35 +57,32 @@ defmodule Tres.ExampleHandler do
|
|||
# private functions
|
||||
|
||||
defp send_desc_stats_request(datapath_id) do
|
||||
Desc.Request.new
|
||||
Desc.Request.new()
|
||||
|> send_message(datapath_id)
|
||||
end
|
||||
|
||||
defp send_port_desc_stats_request(datapath_id) do
|
||||
PortDesc.Request.new
|
||||
PortDesc.Request.new()
|
||||
|> send_message(datapath_id)
|
||||
end
|
||||
|
||||
defp handle_desc_stats_reply(desc, datapath_id) do
|
||||
info(
|
||||
"[#{__MODULE__}] Switch Desc: "
|
||||
<> "mfr = #{desc.mfr_desc} "
|
||||
<> "hw = #{desc.hw_desc} "
|
||||
<> "sw = #{desc.sw_desc} "
|
||||
<> "for #{datapath_id}"
|
||||
"[#{__MODULE__}] Switch Desc: " <>
|
||||
"mfr = #{desc.mfr_desc} " <>
|
||||
"hw = #{desc.hw_desc} " <> "sw = #{desc.sw_desc} " <> "for #{datapath_id}"
|
||||
)
|
||||
end
|
||||
|
||||
defp handle_port_desc_stats_reply(port_desc, datapath_id) do
|
||||
for port <- port_desc.ports do
|
||||
info(
|
||||
"[#{__MODULE__}] Switch has port: "
|
||||
<> "number = #{port.number} "
|
||||
<> "hw_addr = #{port.hw_addr} "
|
||||
<> "name = #{port.name} "
|
||||
<> "config = #{inspect(port.config)} "
|
||||
<> "current_speed = #{port.current_speed} "
|
||||
<> "on #{datapath_id}"
|
||||
"[#{__MODULE__}] Switch has port: " <>
|
||||
"number = #{port.number} " <>
|
||||
"hw_addr = #{port.hw_addr} " <>
|
||||
"name = #{port.name} " <>
|
||||
"config = #{inspect(port.config)} " <>
|
||||
"current_speed = #{port.current_speed} " <> "on #{datapath_id}"
|
||||
)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ defmodule Tres.MessageHandlerSup do
|
|||
end
|
||||
|
||||
def start_child({dpid, aux_id}) do
|
||||
{_cb_mod, cb_args} = Tres.Utils.get_callback_module
|
||||
{_cb_mod, cb_args} = Tres.Utils.get_callback_module()
|
||||
Supervisor.start_child(__MODULE__, [{dpid, aux_id}, cb_args])
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ defmodule Tres.MessageHelper do
|
|||
quote location: :keep do
|
||||
defp send_flow_mod_add(datapath_id, options \\ []) do
|
||||
flow_mod = %Openflow.FlowMod{
|
||||
cookie: options[:cookie] || 0,
|
||||
cookie: options[:cookie] || 0,
|
||||
priority: options[:priority] || 0,
|
||||
table_id: options[:table_id] || 0,
|
||||
command: :add,
|
||||
|
|
@ -11,41 +11,46 @@ defmodule Tres.MessageHelper do
|
|||
hard_timeout: options[:hard_timeout] || 0,
|
||||
buffer_id: :no_buffer,
|
||||
out_port: :any,
|
||||
out_group: :any,
|
||||
out_group: :any,
|
||||
flags: options[:flags] || [],
|
||||
match: options[:match] || Openflow.Match.new,
|
||||
instructions: options[:instructions] || [],
|
||||
match: options[:match] || Openflow.Match.new(),
|
||||
instructions: options[:instructions] || []
|
||||
}
|
||||
|
||||
send_message(flow_mod, datapath_id)
|
||||
end
|
||||
|
||||
defp send_flow_mod_modify(datapath_id, options \\ []) do
|
||||
command = Tres.Utils.flow_command(:modify, options)
|
||||
|
||||
flow_mod = %Openflow.FlowMod{
|
||||
cookie: options[:cookie] || 0,
|
||||
cookie: options[:cookie] || 0,
|
||||
table_id: options[:table_id] || 0,
|
||||
command: command,
|
||||
idle_timeout: options[:idle_timeout] || 0,
|
||||
hard_timeout: options[:hard_timeout] || 0,
|
||||
out_port: :any,
|
||||
out_group: :any,
|
||||
match: options[:match] || Openflow.Match.new,
|
||||
instructions: options[:instructions] || [],
|
||||
match: options[:match] || Openflow.Match.new(),
|
||||
instructions: options[:instructions] || []
|
||||
}
|
||||
|
||||
send_message(flow_mod, datapath_id)
|
||||
end
|
||||
|
||||
defp send_flow_mod_delete(datapath_id, options \\ []) do
|
||||
command = Tres.Utils.flow_command(:delete, options)
|
||||
|
||||
flow_mod = %Openflow.FlowMod{
|
||||
cookie: options[:cookie] || 0,
|
||||
cookie: options[:cookie] || 0,
|
||||
cookie_mask: options[:cookie_mask] || 0,
|
||||
table_id: options[:table_id] || :all,
|
||||
command: command,
|
||||
out_port: options[:out_port] || :any,
|
||||
out_group: options[:out_group] || :any,
|
||||
match: options[:match] || Openflow.Match.new
|
||||
match: options[:match] || Openflow.Match.new()
|
||||
}
|
||||
|
||||
send_message(flow_mod, datapath_id)
|
||||
end
|
||||
|
||||
|
|
@ -56,16 +61,19 @@ defmodule Tres.MessageHelper do
|
|||
actions: options[:actions] || [],
|
||||
data: options[:data] || ""
|
||||
}
|
||||
|
||||
send_message(packet_out, datapath_id)
|
||||
end
|
||||
|
||||
defp send_group_mod_add(datapath_id, options \\ []) do
|
||||
group_mod = Openflow.GroupMod.new(
|
||||
command: :add,
|
||||
type: options[:type] || :all,
|
||||
group_id: options[:group_id] || 0,
|
||||
buckets: options[:buckets] || []
|
||||
)
|
||||
group_mod =
|
||||
Openflow.GroupMod.new(
|
||||
command: :add,
|
||||
type: options[:type] || :all,
|
||||
group_id: options[:group_id] || 0,
|
||||
buckets: options[:buckets] || []
|
||||
)
|
||||
|
||||
send_message(group_mod, datapath_id)
|
||||
end
|
||||
|
||||
|
|
@ -75,12 +83,14 @@ defmodule Tres.MessageHelper do
|
|||
end
|
||||
|
||||
defp send_group_mod_modify(datapath_id, options) do
|
||||
group_mod = Openflow.GroupMod.new(
|
||||
command: :modify,
|
||||
type: options[:type] || :all,
|
||||
group_id: options[:group_id] || 0,
|
||||
buckets: options[:buckets] || []
|
||||
)
|
||||
group_mod =
|
||||
Openflow.GroupMod.new(
|
||||
command: :modify,
|
||||
type: options[:type] || :all,
|
||||
group_id: options[:group_id] || 0,
|
||||
buckets: options[:buckets] || []
|
||||
)
|
||||
|
||||
send_message(group_mod, datapath_id)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -4,21 +4,21 @@ defmodule Tres.SecureChannel do
|
|||
import Logger
|
||||
|
||||
alias :tres_xact_kv, as: XACT_KV
|
||||
alias :queue, as: Queue
|
||||
alias :queue, as: Queue
|
||||
alias Tres.SecureChannelState, as: State
|
||||
alias Tres.SwitchRegistry
|
||||
alias Tres.MessageHandlerSup
|
||||
|
||||
@process_flags [
|
||||
trap_exit: true,
|
||||
trap_exit: true,
|
||||
message_queue_data: :on_heap
|
||||
]
|
||||
|
||||
@supported_version 4
|
||||
|
||||
@hello_handshake_timeout 1000
|
||||
@hello_handshake_timeout 1000
|
||||
@features_handshake_timeout 1000
|
||||
@ping_timeout 5000
|
||||
@ping_timeout 5000
|
||||
# @transaction_timeout 5000
|
||||
|
||||
@ping_interval 5000
|
||||
|
|
@ -35,47 +35,73 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
def init([ref, socket, transport, _opts]) do
|
||||
state_data = init_secure_channel(ref, socket, transport)
|
||||
debug("[#{__MODULE__}] TCP connected to Switch on"
|
||||
<> " #{state_data.ip_addr}:#{state_data.port}"
|
||||
<> " on #{inspect(self())}")
|
||||
|
||||
debug(
|
||||
"[#{__MODULE__}] TCP connected to Switch on" <>
|
||||
" #{state_data.ip_addr}:#{state_data.port}" <> " on #{inspect(self())}"
|
||||
)
|
||||
|
||||
:gen_statem.enter_loop(__MODULE__, [debug: [:debug]], :INIT, state_data, [])
|
||||
end
|
||||
|
||||
# TCP handler
|
||||
def handle_event(:info, {:tcp, socket, packet}, state,
|
||||
%State{socket: socket, transport: transport} = state_data) do
|
||||
transport.setopts(socket, [active: :once])
|
||||
def handle_event(
|
||||
:info,
|
||||
{:tcp, socket, packet},
|
||||
state,
|
||||
%State{socket: socket, transport: transport} = state_data
|
||||
) do
|
||||
transport.setopts(socket, active: :once)
|
||||
handle_packet(packet, state_data, state, [])
|
||||
end
|
||||
def handle_event(:info, {:tcp_closed, socket}, _state,
|
||||
%State{socket: socket} = state_data) do
|
||||
|
||||
def handle_event(:info, {:tcp_closed, socket}, _state, %State{socket: socket} = state_data) do
|
||||
close_connection(:tcp_closed, state_data)
|
||||
end
|
||||
def handle_event(:info, {:tcp_error, socket, reason}, _state,
|
||||
%State{socket: socket} = state_data) do
|
||||
|
||||
def handle_event(
|
||||
:info,
|
||||
{:tcp_error, socket, reason},
|
||||
_state,
|
||||
%State{socket: socket} = state_data
|
||||
) do
|
||||
close_connection({:tcp_error, reason}, state_data)
|
||||
end
|
||||
def handle_event(:info, {:'DOWN', _ref, :process, _main_pid, _reason} = signal, _state, state_data) do
|
||||
|
||||
def handle_event(
|
||||
:info,
|
||||
{:DOWN, _ref, :process, _main_pid, _reason} = signal,
|
||||
_state,
|
||||
state_data
|
||||
) do
|
||||
handle_signal(signal, state_data)
|
||||
end
|
||||
def handle_event(:info, {:'EXIT', _pid, _reason} = signal, _state, state_data) do
|
||||
|
||||
def handle_event(:info, {:EXIT, _pid, _reason} = signal, _state, state_data) do
|
||||
handle_signal(signal, state_data)
|
||||
end
|
||||
|
||||
def handle_event(type, message, :INIT, state_data) do
|
||||
handle_INIT(type, message, state_data)
|
||||
end
|
||||
|
||||
def handle_event(type, message, :CONNECTING, state_data) do
|
||||
handle_CONNECTING(type, message, state_data)
|
||||
end
|
||||
|
||||
def handle_event(type, message, :CONNECTED, state_data) do
|
||||
handle_CONNECTED(type, message, state_data)
|
||||
end
|
||||
|
||||
def handle_event(type, message, :WAITING, state_data) do
|
||||
handle_WATING(type, message, state_data)
|
||||
end
|
||||
|
||||
def terminate(reason, state,
|
||||
%State{datapath_id: datapath_id, aux_id: aux_id, xact_kv_ref: kv_ref}) do
|
||||
def terminate(reason, state, %State{
|
||||
datapath_id: datapath_id,
|
||||
aux_id: aux_id,
|
||||
xact_kv_ref: kv_ref
|
||||
}) do
|
||||
warn("[#{__MODULE__}] termiate: #{inspect(reason)} state = #{inspect(state)}")
|
||||
true = XACT_KV.drop(kv_ref)
|
||||
:ok = SwitchRegistry.unregister({datapath_id, aux_id})
|
||||
|
|
@ -85,8 +111,8 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
defp init_secure_channel(ref, socket, transport) do
|
||||
init_process(ref)
|
||||
:ok = transport.setopts(socket, [active: :once])
|
||||
kv_ref = XACT_KV.create
|
||||
:ok = transport.setopts(socket, active: :once)
|
||||
kv_ref = XACT_KV.create()
|
||||
State.new(ref: ref, socket: socket, transport: transport, xact_kv_ref: kv_ref)
|
||||
end
|
||||
|
||||
|
|
@ -100,52 +126,76 @@ defmodule Tres.SecureChannel do
|
|||
%State{datapath_id: dpid, aux_id: aux_id} = state_data
|
||||
{:ok, pid} = MessageHandlerSup.start_child({dpid, aux_id})
|
||||
ref = Process.monitor(pid)
|
||||
%{state_data|handler_pid: pid, handler_ref: ref}
|
||||
%{state_data | handler_pid: pid, handler_ref: ref}
|
||||
end
|
||||
|
||||
# INIT state
|
||||
defp handle_INIT(:enter, _old_state, state_data) do
|
||||
debug("[#{__MODULE__}] Initiate HELLO handshake: "<>
|
||||
">#{state_data.ip_addr}:#{state_data.port}")
|
||||
debug(
|
||||
"[#{__MODULE__}] Initiate HELLO handshake: " <> ">#{state_data.ip_addr}:#{state_data.port}"
|
||||
)
|
||||
|
||||
initiate_hello_handshake(state_data)
|
||||
end
|
||||
|
||||
defp handle_INIT(:info, :hello_timeout, state_data) do
|
||||
close_connection(:hello_handshake_timeout, state_data)
|
||||
end
|
||||
|
||||
defp handle_INIT(:internal, {:openflow, %Openflow.Hello{} = hello}, state_data) do
|
||||
handle_hello_handshake_1(hello, state_data)
|
||||
end
|
||||
|
||||
defp handle_INIT(:internal, message, _state_data) do
|
||||
debug("[#{__MODULE__}] Hello handshake in progress, "
|
||||
<>"dropping message: #{inspect(message)}")
|
||||
debug(
|
||||
"[#{__MODULE__}] Hello handshake in progress, " <> "dropping message: #{inspect(message)}"
|
||||
)
|
||||
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
# CONNECTING state
|
||||
defp handle_CONNECTING(:enter, :INIT, state_data) do
|
||||
debug("[#{__MODULE__}] Initiate FEATURES handshake:"
|
||||
<>" #{state_data.ip_addr}:#{state_data.port}")
|
||||
debug(
|
||||
"[#{__MODULE__}] Initiate FEATURES handshake:" <>
|
||||
" #{state_data.ip_addr}:#{state_data.port}"
|
||||
)
|
||||
|
||||
initiate_features_handshake(state_data)
|
||||
end
|
||||
|
||||
defp handle_CONNECTING(:enter, :WAITING, state_data) do
|
||||
debug("[#{__MODULE__}] Re-entered features handshake")
|
||||
initiate_features_handshake(state_data)
|
||||
end
|
||||
|
||||
defp handle_CONNECTING(:info, :features_timeout, state_data) do
|
||||
close_connection(:features_handshake_timeout, state_data)
|
||||
end
|
||||
defp handle_CONNECTING(:internal, {:openflow, %Openflow.Features.Reply{} = features}, state_data) do
|
||||
debug("[#{__MODULE__}] Switch connected "<>
|
||||
"datapath_id: #{features.datapath_id}"<>
|
||||
" auxiliary_id: #{features.aux_id}")
|
||||
|
||||
defp handle_CONNECTING(
|
||||
:internal,
|
||||
{:openflow, %Openflow.Features.Reply{} = features},
|
||||
state_data
|
||||
) do
|
||||
debug(
|
||||
"[#{__MODULE__}] Switch connected " <>
|
||||
"datapath_id: #{features.datapath_id}" <> " auxiliary_id: #{features.aux_id}"
|
||||
)
|
||||
|
||||
_ = maybe_cancel_timer(state_data.timer_ref)
|
||||
handle_features_handshake(features, state_data)
|
||||
end
|
||||
|
||||
defp handle_CONNECTING(:internal, {:openflow, message}, _state_data) do
|
||||
debug("[#{__MODULE__}] Features handshake in progress,"<>
|
||||
" dropping message: #{inspect(message.__struct__)}")
|
||||
debug(
|
||||
"[#{__MODULE__}] Features handshake in progress," <>
|
||||
" dropping message: #{inspect(message.__struct__)}"
|
||||
)
|
||||
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_CONNECTING(type, _message, state_data) when type == :cast or type == :call do
|
||||
{:keep_state, state_data, [{:postpone, true}]}
|
||||
end
|
||||
|
|
@ -156,46 +206,61 @@ defmodule Tres.SecureChannel do
|
|||
start_periodic_idle_check()
|
||||
{:keep_state, new_state_data}
|
||||
end
|
||||
|
||||
defp handle_CONNECTED(:info, :idle_check, state_data) do
|
||||
start_periodic_idle_check()
|
||||
new_state_data = maybe_ping(state_data)
|
||||
{:keep_state, new_state_data}
|
||||
end
|
||||
|
||||
defp handle_CONNECTED(:info, :ping_timeout, state_data) do
|
||||
handle_ping_timeout(state_data, :CONNECTED)
|
||||
end
|
||||
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Reply{xid: xid}},
|
||||
%State{ping_xid: xid} = state_data) do
|
||||
|
||||
defp handle_CONNECTED(
|
||||
:internal,
|
||||
{:openflow, %Openflow.Echo.Reply{xid: xid}},
|
||||
%State{ping_xid: xid} = state_data
|
||||
) do
|
||||
handle_ping_reply(state_data)
|
||||
end
|
||||
|
||||
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Echo.Request{} = echo}, state_data) do
|
||||
send_echo_reply(echo.xid, echo.data, state_data)
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_CONNECTED(:internal, {:openflow, %Openflow.Barrier.Reply{} = barrier}, state_data) do
|
||||
{new_state_data, next_actions} = process_xact(barrier, state_data)
|
||||
{:keep_state, new_state_data, next_actions}
|
||||
end
|
||||
|
||||
defp handle_CONNECTED(:internal, {:openflow, message}, state_data) do
|
||||
%State{datapath_id: dpid, aux_id: aux_id} = state_data
|
||||
new_message = %{message|datapath_id: dpid, aux_id: aux_id}
|
||||
new_message = %{message | datapath_id: dpid, aux_id: aux_id}
|
||||
|
||||
state_data.xact_kv_ref
|
||||
|> XACT_KV.is_exists(message.xid)
|
||||
|> handle_message(new_message, state_data)
|
||||
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_CONNECTED(:internal, {:send_message, message}, state_data) do
|
||||
xactional_send_message(message, state_data)
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_CONNECTED(:cast, {:send_message, message} = action, state_data) do
|
||||
new_action_queue = if XACT_KV.is_empty(state_data.xact_kv_ref) do
|
||||
xactional_send_message(message, state_data)
|
||||
state_data.action_queue
|
||||
else
|
||||
Queue.in(action, state_data.action_queue)
|
||||
end
|
||||
{:keep_state, %{state_data|action_queue: new_action_queue}}
|
||||
new_action_queue =
|
||||
if XACT_KV.is_empty(state_data.xact_kv_ref) do
|
||||
xactional_send_message(message, state_data)
|
||||
state_data.action_queue
|
||||
else
|
||||
Queue.in(action, state_data.action_queue)
|
||||
end
|
||||
|
||||
{:keep_state, %{state_data | action_queue: new_action_queue}}
|
||||
end
|
||||
|
||||
# WATING state
|
||||
|
|
@ -206,21 +271,25 @@ defmodule Tres.SecureChannel do
|
|||
start_periodic_idle_check()
|
||||
:keep_state_and_data
|
||||
end
|
||||
|
||||
defp handle_WATING(:info, :idle_check, state_data) do
|
||||
start_periodic_idle_check()
|
||||
new_state_data = maybe_ping(state_data)
|
||||
{:keep_state, new_state_data}
|
||||
end
|
||||
|
||||
defp handle_WATING(:info, :ping_timeout, state_data) do
|
||||
handle_ping_timeout(state_data, :WAITING)
|
||||
end
|
||||
|
||||
defp handle_WATING(:internal, {:openflow, message}, state_data) do
|
||||
%State{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data
|
||||
send(handler_pid, %{message|datapath_id: dpid, aux_id: aux_id})
|
||||
send(handler_pid, %{message | datapath_id: dpid, aux_id: aux_id})
|
||||
{:next_state, :CONNECTING, state_data}
|
||||
end
|
||||
|
||||
defp handle_WATING(type, message, state_data)
|
||||
when type == :cast or type == :call do
|
||||
when type == :cast or type == :call do
|
||||
debug("[#{__MODULE__}] Postponed: #{inspect(message)}, now WATING")
|
||||
{:keep_state, state_data, [{:postpone, true}]}
|
||||
end
|
||||
|
|
@ -228,17 +297,24 @@ defmodule Tres.SecureChannel do
|
|||
defp handle_packet("", state_data, _state, actions) do
|
||||
{:keep_state, state_data, Enum.reverse(actions)}
|
||||
end
|
||||
|
||||
defp handle_packet(packet, %State{buffer: buffer} = state_data, state, actions) do
|
||||
binary = <<buffer::bytes, packet::bytes>>
|
||||
|
||||
case Openflow.read(binary) do
|
||||
{:ok, message, leftovers} ->
|
||||
debug("[#{__MODULE__}] Received: #{inspect(message.__struct__)}"<>
|
||||
"(xid: #{message.xid}) in #{state}")
|
||||
debug(
|
||||
"[#{__MODULE__}] Received: #{inspect(message.__struct__)}" <>
|
||||
"(xid: #{message.xid}) in #{state}"
|
||||
)
|
||||
|
||||
action = {:next_event, :internal, {:openflow, message}}
|
||||
new_state_data = %{state_data|buffer: "", last_received: :os.timestamp}
|
||||
handle_packet(leftovers, new_state_data, state, [action|actions])
|
||||
new_state_data = %{state_data | buffer: "", last_received: :os.timestamp()}
|
||||
handle_packet(leftovers, new_state_data, state, [action | actions])
|
||||
|
||||
{:error, :binary_too_small} ->
|
||||
handle_packet("", %{state_data|buffer: binary}, state, actions)
|
||||
handle_packet("", %{state_data | buffer: binary}, state, actions)
|
||||
|
||||
{:error, _reason} ->
|
||||
handle_packet("", state_data, state, actions)
|
||||
end
|
||||
|
|
@ -246,20 +322,24 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
defp handle_message(_in_xact = true, message, state_data) do
|
||||
case XACT_KV.get(state_data.xact_kv_ref, message.xid) do
|
||||
[{:xact_entry, _xid, prev_message, _orig}|_] ->
|
||||
[{:xact_entry, _xid, prev_message, _orig} | _] ->
|
||||
new_message = Openflow.append_body(prev_message, message)
|
||||
XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message)
|
||||
|
||||
_ ->
|
||||
XACT_KV.delete(state_data.xact_kv_ref, message.xid)
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_message(_in_xact = false, message, state_data),
|
||||
do: send(state_data.handler_pid, message)
|
||||
|
||||
defp process_xact(%Openflow.Barrier.Reply{xid: xid}, state_data) do
|
||||
:ok = state_data.xact_kv_ref
|
||||
|> XACT_KV.get(xid)
|
||||
|> Enum.each(&process_xact_entry(&1, state_data))
|
||||
:ok =
|
||||
state_data.xact_kv_ref
|
||||
|> XACT_KV.get(xid)
|
||||
|> Enum.each(&process_xact_entry(&1, state_data))
|
||||
|
||||
pop_action_queue(state_data)
|
||||
end
|
||||
|
||||
|
|
@ -273,23 +353,26 @@ defmodule Tres.SecureChannel do
|
|||
case Queue.out(queue) do
|
||||
{:empty, action_queue} ->
|
||||
{[], action_queue}
|
||||
|
||||
{{:value, next_action}, action_queue} ->
|
||||
{[{:next_event, :internal, next_action}], action_queue}
|
||||
end
|
||||
{%{state_data|action_queue: new_queue}, next_actions}
|
||||
|
||||
{%{state_data | action_queue: new_queue}, next_actions}
|
||||
end
|
||||
|
||||
defp initiate_hello_handshake(state_data) do
|
||||
send_hello(state_data)
|
||||
ref = :erlang.send_after(@hello_handshake_timeout, self(), :hello_timeout)
|
||||
{:keep_state, %{state_data|timer_ref: ref}}
|
||||
{:keep_state, %{state_data | timer_ref: ref}}
|
||||
end
|
||||
|
||||
defp handle_hello_handshake_1(hello, state_data) do
|
||||
maybe_cancel_timer(state_data.timer_ref)
|
||||
State.set_transaction_id(state_data.xid, hello.xid)
|
||||
|
||||
if Openflow.Hello.supported_version?(hello) do
|
||||
{:next_state, :CONNECTING, %{state_data|timer_ref: nil}}
|
||||
{:next_state, :CONNECTING, %{state_data | timer_ref: nil}}
|
||||
else
|
||||
close_connection(:failed_version_negotiation, state_data)
|
||||
end
|
||||
|
|
@ -299,45 +382,49 @@ defmodule Tres.SecureChannel do
|
|||
new_xid = State.increment_transaction_id(state_data.xid)
|
||||
send_features(new_xid, state_data)
|
||||
ref = :erlang.send_after(@features_handshake_timeout, self(), :features_timeout)
|
||||
{:keep_state, %{state_data|timer_ref: ref}}
|
||||
{:keep_state, %{state_data | timer_ref: ref}}
|
||||
end
|
||||
|
||||
defp handle_features_handshake(%Openflow.Features.Reply{datapath_id: datapath_id,
|
||||
aux_id: aux_id}, state_data) do
|
||||
defp handle_features_handshake(
|
||||
%Openflow.Features.Reply{datapath_id: datapath_id, aux_id: aux_id},
|
||||
state_data
|
||||
) do
|
||||
{:ok, _} = SwitchRegistry.register({datapath_id, aux_id})
|
||||
|
||||
new_state_data = %{
|
||||
state_data|
|
||||
datapath_id: datapath_id,
|
||||
aux_id: aux_id,
|
||||
timer_ref: nil,
|
||||
main_monitor_ref: monitor_connection(datapath_id, aux_id)
|
||||
state_data
|
||||
| datapath_id: datapath_id,
|
||||
aux_id: aux_id,
|
||||
timer_ref: nil,
|
||||
main_monitor_ref: monitor_connection(datapath_id, aux_id)
|
||||
}
|
||||
|
||||
{:next_state, :CONNECTED, new_state_data}
|
||||
end
|
||||
|
||||
defp monitor_connection(datapath_id, aux_id) when aux_id > 0,
|
||||
do: SwitchRegistry.monitor(datapath_id)
|
||||
defp monitor_connection(_datapath_id, _aux_id),
|
||||
do: nil
|
||||
|
||||
defp monitor_connection(_datapath_id, _aux_id), do: nil
|
||||
|
||||
defp send_hello(state_data) do
|
||||
@supported_version
|
||||
|> Openflow.Hello.new
|
||||
|> Openflow.Hello.new()
|
||||
|> send_message(state_data)
|
||||
end
|
||||
|
||||
defp send_features(xid, state_data) do
|
||||
%{Openflow.Features.Request.new|xid: xid}
|
||||
%{Openflow.Features.Request.new() | xid: xid}
|
||||
|> send_message(state_data)
|
||||
end
|
||||
|
||||
defp send_echo_reply(xid, data, state_data) do
|
||||
%{Openflow.Echo.Reply.new(data)|xid: xid}
|
||||
%{Openflow.Echo.Reply.new(data) | xid: xid}
|
||||
|> send_message(state_data)
|
||||
end
|
||||
|
||||
defp send_echo_request(xid, data, state_data) do
|
||||
%{Openflow.Echo.Request.new(data)|xid: xid}
|
||||
%{Openflow.Echo.Request.new(data) | xid: xid}
|
||||
|> send_message(state_data)
|
||||
end
|
||||
|
||||
|
|
@ -347,14 +434,15 @@ defmodule Tres.SecureChannel do
|
|||
|
||||
defp maybe_ping(state_data) do
|
||||
case should_be_ping?(state_data) do
|
||||
true -> send_ping(state_data)
|
||||
true -> send_ping(state_data)
|
||||
false -> state_data
|
||||
end
|
||||
end
|
||||
|
||||
defp should_be_ping?(%State{last_received: last_received, aux_id: 0}) do
|
||||
:timer.now_diff(:os.timestamp(), last_received) > (1000 * @ping_interval)
|
||||
:timer.now_diff(:os.timestamp(), last_received) > 1000 * @ping_interval
|
||||
end
|
||||
|
||||
defp should_be_ping?(_) do
|
||||
false
|
||||
end
|
||||
|
|
@ -363,32 +451,36 @@ defmodule Tres.SecureChannel do
|
|||
xid = State.increment_transaction_id(x_agent)
|
||||
send_echo_request(xid, "", state_data)
|
||||
ping_ref = :erlang.send_after(@ping_timeout, self(), :ping_timeout)
|
||||
%{state_data|ping_timer_ref: ping_ref, ping_xid: xid}
|
||||
%{state_data | ping_timer_ref: ping_ref, ping_xid: xid}
|
||||
end
|
||||
|
||||
defp handle_ping_timeout(%State{ping_fail_count: fail_count} = state_data, :CONNECTING)
|
||||
when fail_count > @ping_fail_max_count do
|
||||
when fail_count > @ping_fail_max_count do
|
||||
{:next_state, :WAITING, state_data}
|
||||
end
|
||||
|
||||
defp handle_ping_timeout(%State{ping_fail_count: fail_count} = state_data, :WAITING)
|
||||
when fail_count > @ping_fail_max_count do
|
||||
when fail_count > @ping_fail_max_count do
|
||||
close_connection(:ping_failed, state_data)
|
||||
end
|
||||
|
||||
defp handle_ping_timeout(state_data, _) do
|
||||
new_state_data = maybe_ping(state_data)
|
||||
{:keep_state, new_state_data}
|
||||
end
|
||||
|
||||
defp handle_ping_reply(state_data) do
|
||||
{:keep_state, %{state_data|ping_timer_ref: nil, ping_xid: nil}}
|
||||
{:keep_state, %{state_data | ping_timer_ref: nil, ping_xid: nil}}
|
||||
end
|
||||
|
||||
defp xactional_send_message(message, state_data) do
|
||||
xid = State.increment_transaction_id(state_data.xid)
|
||||
|
||||
messages = [
|
||||
%{message|xid: xid},
|
||||
%{Openflow.Barrier.Request.new|xid: xid}
|
||||
%{message | xid: xid},
|
||||
%{Openflow.Barrier.Request.new() | xid: xid}
|
||||
]
|
||||
|
||||
XACT_KV.insert(state_data.xact_kv_ref, xid, message)
|
||||
send_message(messages, state_data)
|
||||
end
|
||||
|
|
@ -396,83 +488,101 @@ defmodule Tres.SecureChannel do
|
|||
defp send_message(message, %State{socket: socket, transport: transport}) do
|
||||
if is_list(message) do
|
||||
for message <- message,
|
||||
do: debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
do:
|
||||
debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
else
|
||||
debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})")
|
||||
end
|
||||
|
||||
Tres.Utils.send_message(message, socket, transport)
|
||||
end
|
||||
|
||||
defp maybe_cancel_timer(ref) when not is_reference(ref), do: :ok
|
||||
|
||||
defp maybe_cancel_timer(ref) do
|
||||
:erlang.cancel_timer(ref)
|
||||
:ok
|
||||
end
|
||||
|
||||
defp handle_signal({:'DOWN', mon_ref, :process, _main_pid, reason},
|
||||
%State{main_monitor_ref: mon_ref} = state_data) do
|
||||
defp handle_signal(
|
||||
{:DOWN, mon_ref, :process, _main_pid, reason},
|
||||
%State{main_monitor_ref: mon_ref} = state_data
|
||||
) do
|
||||
close_connection({:main_closed, reason}, state_data)
|
||||
end
|
||||
defp handle_signal({:'DOWN', handler_ref, :process, _main_pid, reason},
|
||||
%State{handler_ref: handler_ref} = state_data) do
|
||||
|
||||
defp handle_signal(
|
||||
{:DOWN, handler_ref, :process, _main_pid, reason},
|
||||
%State{handler_ref: handler_ref} = state_data
|
||||
) do
|
||||
close_connection({:handler_down, reason}, state_data)
|
||||
end
|
||||
defp handle_signal({:'EXIT', _pid, reason}, state_data) do
|
||||
|
||||
defp handle_signal({:EXIT, _pid, reason}, state_data) do
|
||||
close_connection({:trap_detected, reason}, state_data)
|
||||
end
|
||||
|
||||
defp close_connection(:failed_version_negotiation, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Version negotiation failed")
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection(:hello_handshake_timeout, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Hello handshake timed out")
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection(:features_timeout, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Features handshake timed out")
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection(:handler_error = disconnected_reason, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Got handler error")
|
||||
%State{handler_pid: handler_pid} = state_data
|
||||
send(handler_pid, {:switch_disconnected, disconnected_reason})
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection(:ping_failed = disconnected_reason, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Exceeded to max_ping_fail_count")
|
||||
%State{handler_pid: handler_pid} = state_data
|
||||
send(handler_pid, {:switch_disconnected, disconnected_reason})
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection({:main_closed = disconnected_reason, reason}, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Main connection down by #{reason}")
|
||||
%State{handler_pid: handler_pid} = state_data
|
||||
send(handler_pid, {:switch_disconnected, disconnected_reason})
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection({:handler_down = disconnected_reason, reason}, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Handler process down by #{reason}")
|
||||
%State{handler_pid: handler_pid} = state_data
|
||||
send(handler_pid, {:switch_disconnected, disconnected_reason})
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection({:trap_detected = disconnected_reason, reason}, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: Trapped by #{reason}")
|
||||
%State{handler_pid: handler_pid} = state_data
|
||||
send(handler_pid, {:switch_disconnected, disconnected_reason})
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection(:tcp_closed = disconnected_reason, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: TCP Closed by peer")
|
||||
%State{handler_pid: handler_pid} = state_data
|
||||
send(handler_pid, {:switch_disconnected, disconnected_reason})
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
|
||||
defp close_connection({:tcp_error, reason} = disconnected_reason, state_data) do
|
||||
warn("[#{__MODULE__}] connection terminated: TCP Error occured: #{reason}")
|
||||
%State{handler_pid: handler_pid} = state_data
|
||||
send(handler_pid, {:switch_disconnected, disconnected_reason})
|
||||
{:stop, :normal, %{state_data|socket: nil}}
|
||||
{:stop, :normal, %{state_data | socket: nil}}
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -1,24 +1,24 @@
|
|||
defmodule Tres.SecureChannelState do
|
||||
defstruct(
|
||||
handler_pid: nil,
|
||||
handler_ref: nil,
|
||||
ref: nil,
|
||||
socket: nil,
|
||||
transport: nil,
|
||||
buffer: "",
|
||||
ip_addr: nil,
|
||||
port: "",
|
||||
datapath_id: "",
|
||||
aux_id: "",
|
||||
timer_ref: nil,
|
||||
xid: nil,
|
||||
main_monitor_ref: nil,
|
||||
ping_xid: 0,
|
||||
ping_timer_ref: nil,
|
||||
ping_fail_count: 0,
|
||||
last_received: 0,
|
||||
xact_kv_ref: nil,
|
||||
action_queue: :queue.new
|
||||
handler_pid: nil,
|
||||
handler_ref: nil,
|
||||
ref: nil,
|
||||
socket: nil,
|
||||
transport: nil,
|
||||
buffer: "",
|
||||
ip_addr: nil,
|
||||
port: "",
|
||||
datapath_id: "",
|
||||
aux_id: "",
|
||||
timer_ref: nil,
|
||||
xid: nil,
|
||||
main_monitor_ref: nil,
|
||||
ping_xid: 0,
|
||||
ping_timer_ref: nil,
|
||||
ping_fail_count: 0,
|
||||
last_received: 0,
|
||||
xact_kv_ref: nil,
|
||||
action_queue: :queue.new()
|
||||
)
|
||||
|
||||
alias __MODULE__
|
||||
|
|
@ -30,27 +30,28 @@ defmodule Tres.SecureChannelState do
|
|||
transport = Keyword.get(options, :transport)
|
||||
{:ok, {ip_addr, port}} = :inet.peername(socket)
|
||||
{:ok, xid_agent} = Agent.start_link(fn -> 0 end)
|
||||
kv_ref = XACT_KV.create
|
||||
kv_ref = XACT_KV.create()
|
||||
|
||||
%SecureChannelState{
|
||||
ref: ref,
|
||||
socket: socket,
|
||||
ref: ref,
|
||||
socket: socket,
|
||||
transport: transport,
|
||||
ip_addr: :inet.ntoa(ip_addr),
|
||||
port: port,
|
||||
xid: xid_agent,
|
||||
ip_addr: :inet.ntoa(ip_addr),
|
||||
port: port,
|
||||
xid: xid_agent,
|
||||
xact_kv_ref: kv_ref
|
||||
}
|
||||
end
|
||||
|
||||
def increment_transaction_id(xid_agent) do
|
||||
Agent.get_and_update(xid_agent, &({&1 + 1, &1 + 1}))
|
||||
Agent.get_and_update(xid_agent, &{&1 + 1, &1 + 1})
|
||||
end
|
||||
|
||||
def set_transaction_id(xid_agent, xid) do
|
||||
Agent.update(xid_agent, fn(_) -> xid end)
|
||||
Agent.update(xid_agent, fn _ -> xid end)
|
||||
end
|
||||
|
||||
def get_transaction_id(xid_agent) do
|
||||
Agent.get(xid_agent, &(&1))
|
||||
Agent.get(xid_agent, & &1)
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ defmodule Tres.SwitchRegistry do
|
|||
[] -> nil
|
||||
end
|
||||
end
|
||||
|
||||
def lookup_pid(datapath_id) do
|
||||
lookup_pid({datapath_id, 0})
|
||||
end
|
||||
|
|
@ -20,6 +21,7 @@ defmodule Tres.SwitchRegistry do
|
|||
def send_message(message, {_dpid, _aux_id} = datapath_id) do
|
||||
Registry.dispatch(__MODULE__, datapath_id, &dispatch(&1, message))
|
||||
end
|
||||
|
||||
def send_message(message, dpid) when is_binary(dpid) do
|
||||
send_message(message, {dpid, 0})
|
||||
end
|
||||
|
|
@ -27,7 +29,7 @@ defmodule Tres.SwitchRegistry do
|
|||
def monitor(datapath_id) do
|
||||
datapath_id
|
||||
|> lookup_pid
|
||||
|> Process.monitor
|
||||
|> Process.monitor()
|
||||
end
|
||||
|
||||
# private function
|
||||
|
|
|
|||
|
|
@ -3,8 +3,8 @@ defmodule Tres.Utils do
|
|||
|
||||
@connection_manager Tres.SecureChannel
|
||||
@default_max_connections 10
|
||||
@default_num_acceptors 10
|
||||
@default_openflow_port 6633
|
||||
@default_num_acceptors 10
|
||||
@default_openflow_port 6633
|
||||
|
||||
def get_callback_module do
|
||||
cb_mod = get_config(:callback_module, Tres.ExampleHandler)
|
||||
|
|
@ -32,7 +32,7 @@ defmodule Tres.Utils do
|
|||
|
||||
def is_multipart?(message) do
|
||||
message.__struct__
|
||||
|> Module.split
|
||||
|> Module.split()
|
||||
|> Enum.at(1)
|
||||
|> String.match?(~r/Multipart/)
|
||||
end
|
||||
|
|
@ -44,6 +44,7 @@ defmodule Tres.Utils do
|
|||
:delete
|
||||
end
|
||||
end
|
||||
|
||||
def flow_command(:modify, options) do
|
||||
if Keyword.get(options, :strict, false) do
|
||||
:modify_strict
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue