diff --git a/config/config.exs b/config/config.exs index 93b9ad9..4113ff7 100644 --- a/config/config.exs +++ b/config/config.exs @@ -3,11 +3,12 @@ use Mix.Config config :tres, - callback_module: Tres.ExampleHandler, + protocol: :tcp, + port: 6633, max_connections: 10, - num_acceptors: 10, - protocol: :tcp, - port: 6653 + num_acceptors: 10, + callback_module: Tres.ExampleHandler, + callback_args: [] config :logger, level: :info, diff --git a/lib/openflow.ex b/lib/openflow.ex index a5ca2bc..a0e5f0d 100644 --- a/lib/openflow.ex +++ b/lib/openflow.ex @@ -26,6 +26,10 @@ defmodule Openflow do end end + def to_binary(messages) when is_list(messages) do + binaries = for message <- messages, do: to_binary(message) + Enum.join(binaries, "") + end def to_binary(%{__struct__: encoder, version: version, xid: xid} = msg) do case encoder.to_binary(msg) do body_bin when is_binary(body_bin) -> @@ -36,6 +40,16 @@ defmodule Openflow do end end + def append_body(nil, message), do: message + def append_body(message, continue) do + mod = message.__struct__ + if function_exported?(mod, :append_body, 2) do + mod.append_body(message, continue) + else + message + end + end + # private functions defp do_read({:error, reason}, _) do diff --git a/lib/openflow/.#nx_packet_in2.ex b/lib/openflow/.#nx_packet_in2.ex deleted file mode 120000 index b4344b8..0000000 --- a/lib/openflow/.#nx_packet_in2.ex +++ /dev/null @@ -1 +0,0 @@ -shun159@shun159.5674:1510580208 \ No newline at end of file diff --git a/lib/openflow/actions/nx_bundle.ex b/lib/openflow/actions/nx_bundle.ex index 6a65bc4..8fcd5f0 100644 --- a/lib/openflow/actions/nx_bundle.ex +++ b/lib/openflow/actions/nx_bundle.ex @@ -14,15 +14,12 @@ defmodule Openflow.Action.NxBundle do alias __MODULE__ def new(options) do - hash_field = Keyword.get(options, :hash_field, :eth_src) - basis = Keyword.get(options, :basis, 0) - alg = Keyword.get(options, :algorithm, :active_backup) - slaves = Keyword.get(options, :slaves, []) - %NxBundle{algorithm: alg, - hash_field: hash_field, - basis: basis, - n_slaves: length(slaves), - slaves: slaves} + slaves = options[:slaves] || [] + %NxBundle{algorithm: options[:algorithm] || :active_backup, + hash_field: options[:hash_field] || :eth_src, + basis: options[:basis] || 0, + n_slaves: length(slaves), + slaves: slaves} end def to_binary(%NxBundle{algorithm: alg, diff --git a/lib/openflow/actions/nx_bundle_load.ex b/lib/openflow/actions/nx_bundle_load.ex index a76d9f4..b9c4ef1 100644 --- a/lib/openflow/actions/nx_bundle_load.ex +++ b/lib/openflow/actions/nx_bundle_load.ex @@ -19,22 +19,17 @@ defmodule Openflow.Action.NxBundleLoad do alias __MODULE__ def new(options) do - hash_field = Keyword.get(options, :hash_field, :eth_src) - basis = Keyword.get(options, :basis, 0) - alg = Keyword.get(options, :algorithm, :active_backup) - slaves = Keyword.get(options, :slaves, []) - dst_field = Keyword.get(options, :dst_field) + dst_field = options[:dst_field] default_n_bits = Openflow.Match.Field.n_bits_of(dst_field) - n_bits = Keyword.get(options, :n_bits, default_n_bits) - ofs = Keyword.get(options, :offset, 0) - %NxBundleLoad{algorithm: alg, - hash_field: hash_field, - basis: basis, - n_slaves: length(slaves), - slaves: slaves, - offset: ofs, - n_bits: n_bits, - dst_field: dst_field} + slaves = options[:slaves] || [] + %NxBundleLoad{algorithm: options[:algorithm] || :active_backup, + hash_field: options[:hash_field] || :eth_src, + basis: options[:basis] || 0, + n_slaves: length(slaves), + slaves: slaves, + offset: options[:offset] || 0, + n_bits: options[:n_bits] || default_n_bits, + dst_field: options[:dst_field]} end def to_binary(%NxBundleLoad{algorithm: alg, diff --git a/lib/openflow/actions/nx_conjunction.ex b/lib/openflow/actions/nx_conjunction.ex index 3b90b4d..b9818d6 100644 --- a/lib/openflow/actions/nx_conjunction.ex +++ b/lib/openflow/actions/nx_conjunction.ex @@ -11,10 +11,9 @@ defmodule Openflow.Action.NxConjunction do alias __MODULE__ def new(options) do - clause = Keyword.get(options, :clause, 0) - n_clauses = Keyword.get(options, :n_clauses, 0) - id = Keyword.get(options, :id, 0) - %NxConjunction{clause: clause, n_clauses: n_clauses, id: id} + %NxConjunction{clause: options[:clause] || 0, + n_clauses: options[:n_clauses] || 0, + id: options[:id] || 0} end def to_binary(%NxConjunction{clause: clause, n_clauses: n_clauses, id: id}) do diff --git a/lib/openflow/actions/nx_conntrack.ex b/lib/openflow/actions/nx_conntrack.ex index 76b033f..4c4dafc 100644 --- a/lib/openflow/actions/nx_conntrack.ex +++ b/lib/openflow/actions/nx_conntrack.ex @@ -18,24 +18,14 @@ defmodule Openflow.Action.NxConntrack do alias __MODULE__ def new(options \\ []) do - flags = Keyword.get(options, :flags, []) - zone_src = Keyword.get(options, :zone_src) - zone_ofs = Keyword.get(options, :zone_offset) - zone_n_bits = Keyword.get(options, :zone_n_bits) - zone_imm = Keyword.get(options, :zone_imm, 0) - recirc_table = Keyword.get(options, :recirc_table, 255) - alg = Keyword.get(options, :alg, 0) - exec = Keyword.get(options, :exec, []) - %NxConntrack{ - flags: flags, - zone_src: zone_src, - zone_imm: zone_imm, - zone_offset: zone_ofs, - zone_n_bits: zone_n_bits, - recirc_table: recirc_table, - alg: alg, - exec: exec - } + %NxConntrack{flags: options[:flags] || [], + zone_src: options[:zone_src], + zone_imm: options[:zone_imm] || 0, + zone_offset: options[:zone_offset], + zone_n_bits: options[:zone_n_bits], + recirc_table: options[:recirc_table] || 255, + alg: options[:alg] || 0, + exec: options[:exec] || []} end def to_binary(%NxConntrack{ diff --git a/lib/openflow/actions/nx_controller.ex b/lib/openflow/actions/nx_controller.ex index 8128f40..4faf769 100644 --- a/lib/openflow/actions/nx_controller.ex +++ b/lib/openflow/actions/nx_controller.ex @@ -11,10 +11,9 @@ defmodule Openflow.Action.NxController do alias __MODULE__ def new(options) do - max_len = Keyword.get(options, :max_len, :no_buffer) - controller_id = Keyword.get(options, :id, 0) - reason = Keyword.get(options, :reason, :action) - %NxController{max_len: max_len, id: controller_id, reason: reason} + %NxController{max_len: options[:max_len] || :no_buffer, + id: options[:id] || 0, + reason: options[:reason] || :action} end def to_binary(%NxController{max_len: max_len, id: controller_id, reason: reason}) do diff --git a/lib/openflow/actions/nx_controller2.ex b/lib/openflow/actions/nx_controller2.ex index 6e1637c..5b932e7 100644 --- a/lib/openflow/actions/nx_controller2.ex +++ b/lib/openflow/actions/nx_controller2.ex @@ -21,16 +21,11 @@ defmodule Openflow.Action.NxController2 do alias __MODULE__ def new(options) do - max_len = Keyword.get(options, :max_len, :no_buffer) - controller_id = Keyword.get(options, :id, 0) - reason = Keyword.get(options, :reason, :action) - userdata = Keyword.get(options, :userdata) - pause = Keyword.get(options, :pause, false) - %NxController2{max_len: max_len, - id: controller_id, - reason: reason, - userdata: userdata, - pause: pause} + %NxController2{max_len: options[:max_len] || :no_buffer, + id: options[:id] || 0, + reason: options[:reason] || :action, + userdata: options[:userdata], + pause: options[:pause] || false} end def to_binary(%NxController2{} = ctl) do diff --git a/lib/openflow/actions/nx_fin_timeout.ex b/lib/openflow/actions/nx_fin_timeout.ex index 00a14d4..a94a4dd 100644 --- a/lib/openflow/actions/nx_fin_timeout.ex +++ b/lib/openflow/actions/nx_fin_timeout.ex @@ -10,9 +10,8 @@ defmodule Openflow.Action.NxFinTimeout do alias __MODULE__ def new(options) do - fin_idle = Keyword.get(options, :idle_timeout, 0) - fin_hard = Keyword.get(options, :hard_timeout, 0) - %NxFinTimeout{idle_timeout: fin_idle, hard_timeout: fin_hard} + %NxFinTimeout{idle_timeout: options[:idle_timeout] || 0, + hard_timeout: options[:hard_timeout] || 0} end def to_binary(%NxFinTimeout{idle_timeout: fin_idle, hard_timeout: fin_hard}) do diff --git a/lib/openflow/actions/nx_flow_spec_load.ex b/lib/openflow/actions/nx_flow_spec_load.ex index 120a90d..5b35f20 100644 --- a/lib/openflow/actions/nx_flow_spec_load.ex +++ b/lib/openflow/actions/nx_flow_spec_load.ex @@ -14,17 +14,13 @@ defmodule Openflow.Action.NxFlowSpecLoad do alias __MODULE__ def new(options) do - src = Keyword.get(options, :src) - dst = Keyword.get(options, :dst) - src_ofs = Keyword.get(options, :src_offset, 0) - dst_ofs = Keyword.get(options, :dst_offset, 0) - default_n_bits = Openflow.Match.Field.n_bits_of(dst) - n_bits = Keyword.get(options, :n_bits, default_n_bits) - %NxFlowSpecLoad{src: src, - dst: dst, - n_bits: n_bits, - src_offset: src_ofs, - dst_offset: dst_ofs} + dst = options[:dst] + n_bits = options[:n_bits] || Openflow.Match.Field.n_bits_of(dst) + %NxFlowSpecLoad{src: options[:src], + dst: dst, + n_bits: n_bits, + src_offset: options[:src_offset] || 0, + dst_offset: options[:dst_offset] || 0} end def to_binary(%NxFlowSpecLoad{} = fsm) do diff --git a/lib/openflow/actions/nx_flow_spec_match.ex b/lib/openflow/actions/nx_flow_spec_match.ex index 8312fb4..c8e2e1c 100644 --- a/lib/openflow/actions/nx_flow_spec_match.ex +++ b/lib/openflow/actions/nx_flow_spec_match.ex @@ -14,17 +14,13 @@ defmodule Openflow.Action.NxFlowSpecMatch do alias __MODULE__ def new(options) do - src = Keyword.get(options, :src) - dst = Keyword.get(options, :dst) - default_n_bits = Openflow.Match.Field.n_bits_of(dst) - n_bits = Keyword.get(options, :n_bits, default_n_bits) - src_ofs = Keyword.get(options, :src_offset, 0) - dst_ofs = Keyword.get(options, :dst_offset, 0) - %NxFlowSpecMatch{src: src, - dst: dst, - n_bits: n_bits, - src_offset: src_ofs, - dst_offset: dst_ofs} + dst = options[:dst] + n_bits = options[:n_bits] || Openflow.Match.Field.n_bits_of(dst) + %NxFlowSpecMatch{src: options[:src], + dst: dst, + n_bits: n_bits, + src_offset: options[:src_offset] || 0, + dst_offset: options[:dst_offset] || 0} end def to_binary(%NxFlowSpecMatch{} = fsm) do diff --git a/lib/openflow/actions/nx_flow_spec_output.ex b/lib/openflow/actions/nx_flow_spec_output.ex index ae0b18f..75ad422 100644 --- a/lib/openflow/actions/nx_flow_spec_output.ex +++ b/lib/openflow/actions/nx_flow_spec_output.ex @@ -11,13 +11,11 @@ defmodule Openflow.Action.NxFlowSpecOutput do alias __MODULE__ def new(options) do - src = Keyword.get(options, :src) - src_ofs = Keyword.get(options, :src_offset, 0) - default_n_bits = Openflow.Match.Field.n_bits_of(src) - n_bits = Keyword.get(options, :n_bits, default_n_bits) - %NxFlowSpecOutput{n_bits: n_bits, - src: src, - src_offset: src_ofs} + src = options[:src] + n_bits = options[:n_bits] || Openflow.Match.Field.n_bits_of(src) + %NxFlowSpecOutput{n_bits: n_bits, + src: src, + src_offset: options[:src_offset] || 0} end def to_binary(%NxFlowSpecOutput{n_bits: n_bits, diff --git a/lib/openflow/actions/nx_learn.ex b/lib/openflow/actions/nx_learn.ex index df6932f..b57f380 100644 --- a/lib/openflow/actions/nx_learn.ex +++ b/lib/openflow/actions/nx_learn.ex @@ -17,24 +17,15 @@ defmodule Openflow.Action.NxLearn do alias __MODULE__ def new(options) do - idle = Keyword.get(options, :idle_timeout, 0) - hard = Keyword.get(options, :hard_timeout, 0) - prio = Keyword.get(options, :priority, 0) - cookie = Keyword.get(options, :cookie, 0) - flags = Keyword.get(options, :flags, []) - table_id = Keyword.get(options, :table_id, 0) - fin_idle = Keyword.get(options, :fin_idle_timeout, 0) - fin_hard = Keyword.get(options, :fin_hard_timeout, 0) - flow_specs = Keyword.get(options, :flow_specs, []) - %NxLearn{idle_timeout: idle, - hard_timeout: hard, - priority: prio, - cookie: cookie, - flags: flags, - table_id: table_id, - fin_idle_timeout: fin_idle, - fin_hard_timeout: fin_hard, - flow_specs: flow_specs} + %NxLearn{idle_timeout: options[:idle_timeout] || 0, + hard_timeout: options[:hard_timeout] || 0, + priority: options[:priority] || 0, + cookie: options[:cookie] || 0, + flags: options[:flags] || [], + table_id: options[:table_id] || 0xff, + fin_idle_timeout: options[:fin_idle_timeout] || 0, + fin_hard_timeout: options[:fin_hard_timeout] || 0, + flow_specs: options[:flow_specs] || []} end def to_binary(%NxLearn{idle_timeout: idle, @@ -52,7 +43,7 @@ defmodule Openflow.Action.NxLearn do prio::16, cookie::64, flags_int::16, table_id::8, 0::size(1)-unit(8), fin_idle::16, fin_hard::16, flow_specs_bin::bitstring>> - exp_body_size = byte_size(exp_body) + exp_body_size = byte_size(exp_body) padding_length = Openflow.Utils.padding(4 + exp_body_size, 8) length = 4 + exp_body_size + padding_length <<0xffff::16, length::16, exp_body::bytes, 0::size(padding_length)-unit(8)>> diff --git a/lib/openflow/actions/nx_learn2.ex b/lib/openflow/actions/nx_learn2.ex index 4a47ef5..fac6e77 100644 --- a/lib/openflow/actions/nx_learn2.ex +++ b/lib/openflow/actions/nx_learn2.ex @@ -20,30 +20,18 @@ defmodule Openflow.Action.NxLearn2 do alias __MODULE__ def new(options) do - idle = Keyword.get(options, :idle_timeout, 0) - hard = Keyword.get(options, :hard_timeout, 0) - prio = Keyword.get(options, :priority, 0) - cookie = Keyword.get(options, :cookie, 0) - flags = Keyword.get(options, :flags, []) - table_id = Keyword.get(options, :table_id, 0) - fin_idle = Keyword.get(options, :fin_idle_timeout, 0) - fin_hard = Keyword.get(options, :fin_hard_timeout, 0) - flow_specs = Keyword.get(options, :flow_specs, []) - limit = Keyword.get(options, :limit, 0) - result_dst_offset = Keyword.get(options, :result_dst_offset, 0) - result_dst = Keyword.get(options, :result_dst) - %NxLearn2{idle_timeout: idle, - hard_timeout: hard, - priority: prio, - cookie: cookie, - flags: flags, - table_id: table_id, - fin_idle_timeout: fin_idle, - fin_hard_timeout: fin_hard, - limit: limit, - result_dst_offset: result_dst_offset, - result_dst: result_dst, - flow_specs: flow_specs} + %NxLearn2{idle_timeout: options[:idle_timeout] || 0, + hard_timeout: options[:hard_timeout] || 0, + priority: options[:priority] || 0, + cookie: options[:cookie] || 0, + flags: options[:flags] || [], + table_id: options[:table_id] || 0xff, + fin_idle_timeout: options[:fin_idle_timeout] || 0, + fin_hard_timeout: options[:fin_hard_timeout] || 0, + limit: options[:limit] || 0, + result_dst_offset: options[:result_dst_offset] || 0, + result_dst: options[:result_dst], + flow_specs: options[:flow_specs] || []} end def to_binary(%NxLearn2{idle_timeout: idle, diff --git a/lib/openflow/actions/set_field.ex b/lib/openflow/actions/set_field.ex index ed962ab..a1b0c1d 100644 --- a/lib/openflow/actions/set_field.ex +++ b/lib/openflow/actions/set_field.ex @@ -25,7 +25,7 @@ defmodule Openflow.Action.SetField do end def read(<<25::16, _length::16, match_field_bin::bytes>>) do - <<_class::16, _field::7, _hm::1, flen::8, _rest::bytes>>= match_field_bin + <<_class::16, _field::7, _hm::1, flen::8, _rest::bytes>> = match_field_bin match_len = 4 + 4 + flen match_bin = <<1::16, match_len::16, match_field_bin::bytes>> {[field|_], _rest} = Openflow.Match.read(match_bin) diff --git a/lib/openflow/enums.ex b/lib/openflow/enums.ex index 32f36d4..ab41448 100644 --- a/lib/openflow/enums.ex +++ b/lib/openflow/enums.ex @@ -316,8 +316,9 @@ defmodule Openflow.Enums do ], experimenter_oxm_vendors: [ - nicira_ext_match: 0x00002320, - onf_ext_match: 0x4f4e4600 + nicira_ext_match: 0x00002320, + hp_ext_match: 0x00002428, + onf_ext_match: 0x4f4e4600 ], match_type: [ @@ -618,6 +619,24 @@ defmodule Openflow.Enums do nsh_c4: 9 ], + hp_ext_match: [ + hp_udp_src_port_range: 0, + hp_udp_dst_port_range: 1, + hp_tcp_src_port_range: 2, + hp_tcp_dst_port_range: 3, + hp_tcp_flags: 4, + hp_custom_1: 5, + hp_custom_2: 6, + hp_custom_3: 7, + hp_custom_4: 8 + ], + + hp_custom_match_type: [ + l2_start: 1, + l3_start: 2, + l4_start: 3 + ], + onf_ext_match: [ onf_tcp_flags: 42, onf_actset_output: 43, diff --git a/lib/openflow/match.ex b/lib/openflow/match.ex index baac231..bd40634 100644 --- a/lib/openflow/match.ex +++ b/lib/openflow/match.ex @@ -18,7 +18,7 @@ defmodule Openflow.Match do def read(binary) do <<1::16, no_pad_len::16, binary1::binary>> = binary - padding_length = @match_size - rem(no_pad_len, 8) + padding_length = Openflow.Utils.pad_length(no_pad_len, 8) match_field_len = no_pad_len - @header_size <> = binary1 {decode_fields(match_fields, []), rest} @@ -58,7 +58,7 @@ defmodule Openflow.Match do def header_size(<<_oxm_class_int::16, _oxm_field_int::7, _oxm_has_mask::1, _oxm_length::8, _::bytes>>), do: 4 - def header_size(<<0xffff::16, _oxm_field_int::7, _oxm_has_mask::1, _oxm_length::8, _exp_int::32, _::bytes>>), + def header_size(<<0xffff::16, _oxm_field_int::7, _oxm_has_mask::1, _oxm_length::8, _exp_int::32, _::bytes>>), do: 8 # private functions diff --git a/lib/openflow/match/field.ex b/lib/openflow/match/field.ex index 4c45221..8a6e411 100644 --- a/lib/openflow/match/field.ex +++ b/lib/openflow/match/field.ex @@ -275,6 +275,17 @@ defmodule Openflow.Match.Field do def vendor_of(:nsh_c3), do: :nicira_ext_match def vendor_of(:nsh_c4), do: :nicira_ext_match + # HP Ext Match + def vendor_of(:hp_udp_src_port_range), do: :hp_ext_match + def vendor_of(:hp_udp_dst_port_range), do: :hp_ext_match + def vendor_of(:hp_tcp_src_port_range), do: :hp_ext_match + def vendor_of(:hp_tcp_dst_port_range), do: :hp_ext_match + def vendor_of(:hp_tcp_flags), do: :hp_ext_match + def vendor_of(:hp_custom_1), do: :hp_ext_match + def vendor_of(:hp_custom_2), do: :hp_ext_match + def vendor_of(:hp_custom_3), do: :hp_ext_match + def vendor_of(:hp_custom_4), do: :hp_ext_match + # ONF Ext Match def vendor_of(:onf_tcp_flags), do: :onf_ext_match def vendor_of(:onf_actset_output), do: :onf_ext_match @@ -518,6 +529,17 @@ defmodule Openflow.Match.Field do def format_of(:nsh_c3), do: {:be32, :decimal} def format_of(:nsh_c4), do: {:be32, :decimal} + # HP Ext Match + def format_of(:hp_udp_src_port_range), do: {:be32, :decimal} + def format_of(:hp_udp_dst_port_range), do: {:be32, :decimal} + def format_of(:hp_tcp_src_port_range), do: {:be32, :decimal} + def format_of(:hp_tcp_dst_port_range), do: {:be32, :decimal} + def format_of(:hp_tcp_flags), do: {:be16, :tcp_flags} + def format_of(:hp_custom_1), do: {:dynamic, :bytes} + def format_of(:hp_custom_2), do: {:dynamic, :bytes} + def format_of(:hp_custom_3), do: {:dynamic, :bytes} + def format_of(:hp_custom_4), do: {:dynamic, :bytes} + # ONF Ext Match def format_of(:onf_tcp_flags), do: {:be16, :tcp_flags} def format_of(:onf_actset_output), do: {:be32, :openflow13_port} diff --git a/lib/openflow/multipart/aggregate/reply.ex b/lib/openflow/multipart/aggregate/reply.ex index bcc208f..7bb7beb 100644 --- a/lib/openflow/multipart/aggregate/reply.ex +++ b/lib/openflow/multipart/aggregate/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.Aggregate.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], packet_count: 0, byte_count: 0, diff --git a/lib/openflow/multipart/desc/reply.ex b/lib/openflow/multipart/desc/reply.ex index db3c818..7794428 100644 --- a/lib/openflow/multipart/desc/reply.ex +++ b/lib/openflow/multipart/desc/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.Desc.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], mfr_desc: "", hw_desc: "", diff --git a/lib/openflow/multipart/flow/reply.ex b/lib/openflow/multipart/flow/reply.ex index 5d41dbc..41941f0 100644 --- a/lib/openflow/multipart/flow/reply.ex +++ b/lib/openflow/multipart/flow/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.Flow.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], flows: [] ) @@ -19,6 +20,16 @@ defmodule Openflow.Multipart.Flow.Reply do flows = Openflow.Multipart.FlowStats.read(flows_bin) %Reply{flows: flows} end + + def append_body(%Reply{flows: flows} = message, %Reply{flags: [:more], flows: continue}) do + %{message|flows: [continue|flows]} + end + def append_body(%Reply{flows: flows} = message, %Reply{flags: [], flows: continue}) do + new_flows = [continue|flows] + |> Enum.reverse + |> List.flatten + %{message|flows: new_flows} + end end defmodule Openflow.Multipart.FlowStats do diff --git a/lib/openflow/multipart/flow/request.ex b/lib/openflow/multipart/flow/request.ex index 9485c63..53e8146 100644 --- a/lib/openflow/multipart/flow/request.ex +++ b/lib/openflow/multipart/flow/request.ex @@ -16,13 +16,13 @@ defmodule Openflow.Multipart.Flow.Request do def ofp_type, do: 18 - def new(options) do + def new(options \\ []) do table_id = Keyword.get(options, :table_id, :all) out_port = Keyword.get(options, :out_port, :any) out_group = Keyword.get(options, :out_group, :any) cookie = Keyword.get(options, :cookie, 0) cookie_mask = Keyword.get(options, :cookie, 0) - match = Keyword.get(options, :match, []) + match = Keyword.get(options, :match, Openflow.Match.new) %Request{table_id: table_id, out_port: out_port, out_group: out_group, diff --git a/lib/openflow/multipart/group/reply.ex b/lib/openflow/multipart/group/reply.ex index 8d96406..fc434a0 100644 --- a/lib/openflow/multipart/group/reply.ex +++ b/lib/openflow/multipart/group/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.Group.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], groups: [] ) @@ -19,6 +20,16 @@ defmodule Openflow.Multipart.Group.Reply do groups = Openflow.Multipart.Group.read(groups_bin) %Reply{groups: groups} end + + def append_body(%Reply{groups: groups} = message, %Reply{flags: [:more], groups: continue}) do + %{message|groups: [continue|groups]} + end + def append_body(%Reply{groups: groups} = message, %Reply{flags: [], groups: continue}) do + new_groups = [continue|groups] + |> Enum.reverse + |> List.flatten + %{message|groups: new_groups} + end end defmodule Openflow.Multipart.Group do diff --git a/lib/openflow/multipart/group_desc/reply.ex b/lib/openflow/multipart/group_desc/reply.ex index 49bd412..fea5684 100644 --- a/lib/openflow/multipart/group_desc/reply.ex +++ b/lib/openflow/multipart/group_desc/reply.ex @@ -19,6 +19,16 @@ defmodule Openflow.Multipart.GroupDesc.Reply do groups = Openflow.Multipart.GroupDescStats.read(groups_bin) %Reply{groups: groups} end + + def append_body(%Reply{groups: groups} = message, %Reply{flags: [:more], groups: continue}) do + %{message|groups: [continue|groups]} + end + def append_body(%Reply{groups: groups} = message, %Reply{flags: [], groups: continue}) do + new_groups = [continue|groups] + |> Enum.reverse + |> List.flatten + %{message|groups: new_groups} + end end defmodule Openflow.Multipart.GroupDescStats do diff --git a/lib/openflow/multipart/group_features/reply.ex b/lib/openflow/multipart/group_features/reply.ex index 498d9ea..495ad83 100644 --- a/lib/openflow/multipart/group_features/reply.ex +++ b/lib/openflow/multipart/group_features/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.GroupFeatures.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], types: 0, capabilities: [], diff --git a/lib/openflow/multipart/meter/reply.ex b/lib/openflow/multipart/meter/reply.ex index bafd74f..2aede6e 100644 --- a/lib/openflow/multipart/meter/reply.ex +++ b/lib/openflow/multipart/meter/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.Meter.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], meters: [] ) @@ -15,6 +16,16 @@ defmodule Openflow.Multipart.Meter.Reply do meters = Openflow.Multipart.Meter.read(meters_bin) %Reply{meters: meters} end + + def append_body(%Reply{meters: meters} = message, %Reply{flags: [:more], meters: continue}) do + %{message|meters: [continue|meters]} + end + def append_body(%Reply{meters: meters} = message, %Reply{flags: [], meters: continue}) do + new_meters = [continue|meters] + |> Enum.reverse + |> List.flatten + %{message|meters: new_meters} + end end defmodule Openflow.Multipart.Meter do diff --git a/lib/openflow/multipart/port_desc/reply.ex b/lib/openflow/multipart/port_desc/reply.ex new file mode 100644 index 0000000..73e0573 --- /dev/null +++ b/lib/openflow/multipart/port_desc/reply.ex @@ -0,0 +1,33 @@ +defmodule Openflow.Multipart.PortDesc.Reply do + defstruct( + version: 4, + xid: 0, + datapath_id: nil, # virtual field + aux_id: nil, + flags: [], + ports: [] + ) + + alias __MODULE__ + + def ofp_type, do: 18 + + def new(ports \\ []) do + %Reply{ports: ports} + end + + def read(<>) do + ports = for (<>), do: Openflow.Port.read(port_bin) + %Reply{ports: Enum.reverse(ports)} + end + + def append_body(%Reply{ports: ports} = message, %Reply{flags: [:more], ports: continue}) do + %{message|ports: [continue|ports]} + end + def append_body(%Reply{ports: ports} = message, %Reply{flags: [], ports: continue}) do + new_ports = [continue|ports] + |> Enum.reverse + |> List.flatten + %{message|ports: new_ports} + end +end diff --git a/lib/openflow/multipart/port_desc/request.ex b/lib/openflow/multipart/port_desc/request.ex new file mode 100644 index 0000000..be0abde --- /dev/null +++ b/lib/openflow/multipart/port_desc/request.ex @@ -0,0 +1,24 @@ +defmodule Openflow.Multipart.PortDesc.Request do + defstruct( + version: 4, + xid: 0, + datapath_id: nil, # virtual field + flags: [] + ) + + alias __MODULE__ + + def ofp_type, do: 18 + + def new do + %Request{} + end + + def read("") do + %Request{} + end + + def to_binary(%Request{} = msg) do + Openflow.Multipart.Request.header(msg) + end +end diff --git a/lib/openflow/multipart/port_stats/reply.ex b/lib/openflow/multipart/port_stats/reply.ex index e51d0a3..e57ed32 100644 --- a/lib/openflow/multipart/port_stats/reply.ex +++ b/lib/openflow/multipart/port_stats/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.PortStats.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], ports: [] ) @@ -19,6 +20,16 @@ defmodule Openflow.Multipart.PortStats.Reply do ports = Openflow.Multipart.PortStats.read(ports_bin) %Reply{ports: ports} end + + def append_body(%Reply{ports: ports} = message, %Reply{flags: [:more], ports: continue}) do + %{message|ports: [continue|ports]} + end + def append_body(%Reply{ports: ports} = message, %Reply{flags: [], ports: continue}) do + new_ports = [continue|ports] + |> Enum.reverse + |> List.flatten + %{message|ports: new_ports} + end end defmodule Openflow.Multipart.PortStats do diff --git a/lib/openflow/multipart/queue/reply.ex b/lib/openflow/multipart/queue/reply.ex index 53d3284..38afbbc 100644 --- a/lib/openflow/multipart/queue/reply.ex +++ b/lib/openflow/multipart/queue/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.Queue.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], queues: [] ) @@ -19,6 +20,16 @@ defmodule Openflow.Multipart.Queue.Reply do queues = Openflow.Multipart.Queue.read(queues_bin) %Reply{queues: queues} end + + def append_body(%Reply{queues: queues} = message, %Reply{flags: [:more], queues: continue}) do + %{message|queues: [continue|queues]} + end + def append_body(%Reply{queues: queues} = message, %Reply{flags: [], queues: continue}) do + new_queues = [continue|queues] + |> Enum.reverse + |> List.flatten + %{message|queues: new_queues} + end end defmodule Openflow.Multipart.Queue do diff --git a/lib/openflow/multipart/table/reply.ex b/lib/openflow/multipart/table/reply.ex index 496450b..457de7c 100644 --- a/lib/openflow/multipart/table/reply.ex +++ b/lib/openflow/multipart/table/reply.ex @@ -3,6 +3,7 @@ defmodule Openflow.Multipart.Table.Reply do version: 4, xid: 0, datapath_id: nil, # virtual field + aux_id: nil, flags: [], tables: [] ) @@ -15,6 +16,16 @@ defmodule Openflow.Multipart.Table.Reply do tables = Openflow.Multipart.TableStats.read(tables_bin) %Reply{tables: tables} end + + def append_body(%Reply{tables: tables} = message, %Reply{flags: [:more], tables: continue}) do + %{message|tables: [continue|tables]} + end + def append_body(%Reply{tables: tables} = message, %Reply{flags: [], tables: continue}) do + new_tables = [continue|tables] + |> Enum.reverse + |> List.flatten + %{message|tables: new_tables} + end end defmodule Openflow.Multipart.TableStats do diff --git a/lib/openflow/nx_packet_in2.ex b/lib/openflow/nx_packet_in2.ex index 1ba67e0..dfa8b34 100644 --- a/lib/openflow/nx_packet_in2.ex +++ b/lib/openflow/nx_packet_in2.ex @@ -13,14 +13,13 @@ defmodule Openflow.NxPacketIn2 do reason: nil, metadata: nil, userdata: nil, - continuation: nil, # continuation properties: - continuation_bridge: nil, - continuation_stack: nil, - continuation_conntracked: nil, + continuation_bridge: "", + continuation_stack: [], + continuation_conntracked: false, continuation_table_id: nil, continuation_cookie: nil, - continuation_actions: nil, + continuation_actions: [], continuation_action_set: nil ) @@ -29,6 +28,138 @@ defmodule Openflow.NxPacketIn2 do @experimenter 0x00002320 @nx_type 30 + @packet 0 + @full_len 1 + @buffer_id 2 + @table_id 3 + @cookie 4 + @reason 5 + @metadata 6 + @userdata 7 + @continuation 8 + + @nxcpt_bridge 0x8000 + @nxcpt_stack 0x8001 + @nxcpt_mirrors 0x8002 + @nxcpt_conntracked 0x8003 + @nxcpt_table_id 0x8004 + @nxcpt_cookie 0x8005 + @nxcpt_actions 0x8006 + @nxcpt_action_set 0x8007 + + @prop_header_length 4 + def ofp_type, do: 4 + def read(<<@experimenter::32, @nx_type::32, props_bin::bytes>>) do + %NxPacketIn2{} + |> decode_props(props_bin) + end + + ## private functions + + defp decode_props(pktin, ""), do: pktin + defp decode_props(pktin, <<@packet::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + packet_length = length - @prop_header_length + <> = tail + decode_props(%{pktin|packet: packet}, rest) + end + defp decode_props(pktin, <<@full_len::16, _length::16, full_len::32, rest::bytes>>) do + decode_props(%{pktin|full_len: full_len}, rest) + end + defp decode_props(pktin, <<@buffer_id::16, _length::16, buffer_id::32, rest::bytes>>) do + decode_props(%{pktin|buffer_id: buffer_id}, rest) + end + defp decode_props(pktin, <<@table_id::16, _length::16, table_id::8, _::24, rest::bytes>>) do + decode_props(%{pktin|table_id: table_id}, rest) + end + defp decode_props(pktin, <<@cookie::16, _length::16, _::32, cookie::64, rest::bytes>>) do + decode_props(%{pktin|cookie: cookie}, rest) + end + defp decode_props(pktin, <<@reason::16, _length::16, reason_int::8, _::24, rest::bytes>>) do + reason = Openflow.Enums.to_atom(reason_int, :packet_in_reason) + decode_props(%{pktin|reason: reason}, rest) + end + defp decode_props(pktin, <<@metadata::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + match_field_length = length - @prop_header_length + <> = tail + match_len = 4 + byte_size(match_fields_bin) + padding = Openflow.Utils.pad_length(match_len, 8) + match_bin = (<<1::16, match_len::16, match_fields_bin::bytes, 0::size(padding)-unit(8)>>) + {fields, _rest} = Openflow.Match.read(match_bin) + decode_props(%{pktin|metadata: fields}, rest) + end + defp decode_props(pktin, <<@userdata::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + userdata_length = length - @prop_header_length + <> = tail + decode_props(%{pktin|userdata: userdata}, rest) + end + defp decode_props(pktin, <<@continuation::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = length - @prop_header_length - 4 + <<_pad::32, data::size(data_length)-bytes, _::size(pad_length)-unit(8), rest::bytes>> = tail + pktin + |> decode_continuations(data) + |> decode_props(rest) + end + defp decode_props(pktin, <<_::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = length - @prop_header_length + <<_data::size(data_length)-bytes, _::size(pad_length)-unit(8), rest::bytes>> = tail + decode_props(pktin, rest) + end + + defp decode_continuations(pktin, ""), do: pktin + defp decode_continuations(pktin, <<@nxcpt_bridge::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = length - @prop_header_length + <> = tail + decode_continuations(%{pktin|continuation_bridge: bridge}, rest) + end + defp decode_continuations(pktin, <<@nxcpt_stack::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = (length - @prop_header_length) * 8 + <> = tail + decode_continuations(%{pktin|continuation_stack: pktin.continuation_stack ++ [stack]}, rest) + end + defp decode_continuations(pktin, <<@nxcpt_mirrors::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = length - @prop_header_length + <> = tail + decode_continuations(%{pktin|continuation_mirrors: mirrors}, rest) + end + defp decode_continuations(pktin, <<@nxcpt_conntracked::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = length - @prop_header_length + <<_::size(data_length)-bytes, _::size(pad_length)-unit(8), rest::bytes>> = tail + decode_continuations(%{pktin|continuation_conntracked: true}, rest) + end + defp decode_continuations(pktin, <<@nxcpt_table_id::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + <> = tail + decode_continuations(%{pktin|continuation_table_id: table_id}, rest) + end + defp decode_continuations(pktin, <<@nxcpt_cookie::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + <> = tail + decode_continuations(%{pktin|continuation_cookie: cookie}, rest) + end + defp decode_continuations(pktin, <<@nxcpt_actions::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = length - @prop_header_length - 4 + <<_pad::32, actions::size(data_length)-bytes, _::size(pad_length)-unit(8), rest::bytes>> = tail + decode_continuations(%{pktin|continuation_actions: Openflow.Action.read(actions)}, rest) + end + defp decode_continuations(pktin, <<@nxcpt_action_set::16, length::16, tail::bytes>>) do + pad_length = Openflow.Utils.pad_length(length, 8) + data_length = length - @prop_header_length + <> = tail + decode_continuations(%{pktin|continuation_action_set: action_set}, rest) + end + defp decode_continuations(pktin, _) do + decode_continuations(pktin, "") + end end diff --git a/lib/tres/application.ex b/lib/tres/application.ex index 5dc3b16..3186d9d 100644 --- a/lib/tres/application.ex +++ b/lib/tres/application.ex @@ -8,8 +8,9 @@ defmodule Tres.Application do def start(_type, _args) do import Supervisor.Spec + {cb_mod, _cb_args} = Tres.Utils.get_callback_module children = [worker(Registry, [[keys: :unique, name: SwitchRegistry]], id: SwitchRegistry), - supervisor(Tres.MessageHandlerSup, [], id: MessageHandlerSup)] + supervisor(Tres.MessageHandlerSup, [cb_mod], id: MessageHandlerSup)] opts = [strategy: :one_for_one, name: Tres.Supervisor] {:ok, _connection_pid} = Tres.Utils.start_openflow_listener Supervisor.start_link(children, opts) diff --git a/lib/tres/controller.ex b/lib/tres/controller.ex new file mode 100644 index 0000000..c9e0a38 --- /dev/null +++ b/lib/tres/controller.ex @@ -0,0 +1,14 @@ +defmodule Tres.Controller do + def controller_helpers do + quote do + import Tres.SwitchRegistry, only: [send_message: 2] + + use Tres.Messages + use Tres.MessageHelper + end + end + + defmacro __using__(_) do + controller_helpers() + end +end diff --git a/lib/tres/example_handler.ex b/lib/tres/example_handler.ex new file mode 100644 index 0000000..deaf722 --- /dev/null +++ b/lib/tres/example_handler.ex @@ -0,0 +1,108 @@ +defmodule Tres.ExampleHandler do + use GenServer + use Tres.Controller + + import Logger + + defmodule State do + defstruct [ + datapath_id: nil, + aux_id: nil, + conn_ref: nil + ] + end + + def start_link(datapath, args) do + GenServer.start_link(__MODULE__, [datapath, args]) + end + + def init([{datapath_id, aux_id}, _args]) do + info("[#{__MODULE__}] Switch Ready: " + <> "datapath_id: #{datapath_id} " + <> "aux_id: #{aux_id} " + <> "in #{inspect(self())}") + _ = send_flows_for_test(datapath_id) + _ = send_flow_stats_request(datapath_id) + _ = send_desc_stats_request(datapath_id) + _ = send_port_desc_stats_request(datapath_id) + conn_ref = SwitchRegistry.monitor(datapath_id) + state = %State{datapath_id: datapath_id, aux_id: aux_id, conn_ref: conn_ref} + {:ok, state} + end + + def handle_info(%Flow.Reply{datapath_id: datapath_id} = desc, state) do + handle_flow_stats_reply(desc, datapath_id) + {:noreply, state} + end + def handle_info(%PortDesc.Reply{datapath_id: datapath_id} = desc, state) do + handle_port_desc_stats_reply(desc, datapath_id) + {:noreply, state} + end + def handle_info(%Desc.Reply{datapath_id: datapath_id} = desc, state) do + handle_desc_stats_reply(desc, datapath_id) + {:noreply, state} + end + + # To prevent process leakage, following section is required. + def handle_info({:'DOWN', ref, :process, _pid, _reason}, %State{conn_ref: ref} = state) do + :ok = warn("[#{__MODULE__}] Switch Disconnected: datapath_id: #{state.datapath_id}") + {:stop, :normal, state} + end + + # `Catch all` function is required. + def handle_info(info, state) do + :ok = warn("[#{__MODULE__}] unhandled message #{inspect(info)}: #{state.datapath_id}") + {:noreply, state} + end + + # private functions + + defp send_flows_for_test(datapath_id) do + for count <- Range.new(1, 1024) do + send_flow_mod_add(datapath_id, match: Match.new(metadata: count)) + end + end + + defp send_flow_stats_request(datapath_id) do + Flow.Request.new + |> send_message(datapath_id) + end + + defp send_desc_stats_request(datapath_id) do + Desc.Request.new + |> send_message(datapath_id) + end + + defp send_port_desc_stats_request(datapath_id) do + PortDesc.Request.new + |> send_message(datapath_id) + end + + defp handle_flow_stats_reply(desc, datapath_id) do + info("[#{__MODULE__}] Switch #{length(desc.flows)} installed on #{datapath_id}") + end + + defp handle_desc_stats_reply(desc, datapath_id) do + info( + "[#{__MODULE__}] Switch Desc: " + <> "mfr = #{desc.mfr_desc} " + <> "hw = #{desc.hw_desc} " + <> "sw = #{desc.sw_desc} " + <> "for #{datapath_id}" + ) + end + + defp handle_port_desc_stats_reply(port_desc, datapath_id) do + for port <- port_desc.ports do + info( + "[#{__MODULE__}] Switch has port: " + <> "number = #{port.number} " + <> "hw_addr = #{port.hw_addr} " + <> "name = #{port.name} " + <> "config = #{inspect(port.config)} " + <> "current_speed = #{port.current_speed} " + <> "on #{datapath_id}" + ) + end + end +end diff --git a/lib/tres/message_handler.ex b/lib/tres/message_handler.ex deleted file mode 100644 index 7f01ed8..0000000 --- a/lib/tres/message_handler.ex +++ /dev/null @@ -1,52 +0,0 @@ -defmodule Tres.MessageHandler do - use GenServer - - defmodule State do - defstruct [ - ip_addr: nil, - port: nil, - datapath_id: nil, - conn_pid: nil, - conn_ref: nil, - handler_pid: nil, - handler_ref: nil - ] - end - - alias Tres.MessageHandler.State - - @process_flags [trap_exit: true] - - def start_link({ip_addr, port}, conn_pid) do - GenServer.start_link(__MODULE__, [{ip_addr, port}, conn_pid]) - end - - def init([{ip_addr, port}, conn_pid]) do - init_process() - conn_ref = Process.monitor(conn_pid) - state = %State{ - conn_pid: conn_pid, - conn_ref: conn_ref, - ip_addr: ip_addr, - port: port - } - {:ok, state} - end - - def handle_info({:'EXIT', _pid, _reason}, state) do - {:stop, :normal, state} - end - def handle_info({:'DOWN', conn_ref, :process, _conn_pid, _reason}, %State{conn_ref: conn_ref} = state) do - {:stop, :normal, state} - end - - def terminate(_reason, state) do - {:shutdown, state} - end - - ## private functions - - defp init_process do - for {flag, value} <- @process_flags, do: Process.flag(flag, value) - end -end diff --git a/lib/tres/message_handler_sup.ex b/lib/tres/message_handler_sup.ex index ca6d393..9c250f6 100644 --- a/lib/tres/message_handler_sup.ex +++ b/lib/tres/message_handler_sup.ex @@ -1,16 +1,17 @@ defmodule Tres.MessageHandlerSup do use Supervisor - def start_link do - Supervisor.start_link(__MODULE__, [], name: __MODULE__) + def start_link(cb_mod) do + Supervisor.start_link(__MODULE__, [cb_mod], name: __MODULE__) end - def init(_) do - children = [worker(Tres.MessageHandler, [], restart: :temporary)] + def init([cb_mod]) do + children = [worker(cb_mod, [], restart: :temporary, shutdown: 5000)] supervise(children, strategy: :simple_one_for_one) end - def start_child({ip_addr, port}) do - Supervisor.start_child(__MODULE__, [{ip_addr, port}, self()]) + def start_child({dpid, aux_id}) do + {_cb_mod, cb_args} = Tres.Utils.get_callback_module + Supervisor.start_child(__MODULE__, [{dpid, aux_id}, cb_args]) end end diff --git a/lib/tres/message_helper.ex b/lib/tres/message_helper.ex new file mode 100644 index 0000000..8080d5b --- /dev/null +++ b/lib/tres/message_helper.ex @@ -0,0 +1,88 @@ +defmodule Tres.MessageHelper do + defmacro __using__(_) do + quote location: :keep do + defp send_flow_mod_add(datapath_id, options) do + flow_mod = %Openflow.FlowMod{ + cookie: options[:cookie] || 0, + priority: options[:priority] || 0, + table_id: options[:table_id] || 0, + command: :add, + idle_timeout: options[:idle_timeout] || 0, + hard_timeout: options[:hard_timeout] || 0, + buffer_id: :no_buffer, + out_port: :any, + out_group: :any, + flags: options[:flags] || [], + match: options[:match] || Openflow.Match.new, + instructions: options[:instructions] || [], + } + send_message(flow_mod, datapath_id) + end + + defp send_flow_mod_modify(datapath_id, options) do + command = Tres.Utils.flow_command(options, :modify) + flow_mod = %Openflow.FlowMod{ + cookie: options[:cookie] || 0, + table_id: options[:table_id] || 0, + command: command, + idle_timeout: options[:idle_timeout] || 0, + hard_timeout: options[:hard_timeout] || 0, + out_port: :any, + out_group: :any, + match: options[:match] || Openflow.Match.new, + instructions: options[:instructions] || [], + } + send_message(flow_mod, datapath_id) + end + + defp send_flow_mod_delete(datapath_id, options) do + command = Tres.Utils.flow_command(options, :delete) + flow_mod = %Openflow.FlowMod{ + cookie: options[:cookie] || 0, + cookie_mask: options[:cookie_mask] || 0, + table_id: options[:table_id] || :all, + command: command, + out_port: options[:out_port] || :any, + out_group: options[:out_group] || :any, + match: options[:match] || Openflow.Match.new + } + send_message(flow_mod, datapath_id) + end + + defp send_packet_out(datapath_id, options) do + packet_out = %Openflow.PacketOut{ + buffer_id: options[:buffer_id] || :no_buffer, + in_port: options[:in_port] || :controller, + actions: options[:actions] || [], + data: options[:data] || "" + } + send_message(packet_out, datapath_id) + end + + defp send_group_mod_add(datapath_id, options) do + group_mod = Openflow.GroupMod.new( + command: :add, + type: options[:type] || :all, + group_id: options[:group_id] || 0, + buckets: options[:buckets] || [] + ) + send_message(group_mod, datapath_id) + end + + defp send_group_mod_delete(datapath_id, group_id) do + group_mod = Openflow.GroupMod.new(command: :delete, group_id: group_id) + send_message(group_mod, datapath_id) + end + + defp send_group_mod_modify(datapath_id, options) do + group_mod = Openflow.GroupMod.new( + command: :modify, + type: options[:type] || :all, + group_id: options[:group_id] || 0, + buckets: options[:buckets] || [] + ) + send_message(group_mod, datapath_id) + end + end + end +end diff --git a/lib/tres/messages.ex b/lib/tres/messages.ex new file mode 100644 index 0000000..04810a2 --- /dev/null +++ b/lib/tres/messages.ex @@ -0,0 +1,128 @@ +defmodule Tres.Messages do + defmacro __using__(_) do + quote do + alias Openflow.ErrorMsg + alias Openflow.Echo + alias Openflow.Features + alias Openflow.GetConfig + alias Openflow.SetConfig + alias Openflow.PacketIn + alias Openflow.FlowRemoved + alias Openflow.PortStatus + alias Openflow.PacketOut + alias Openflow.FlowMod + alias Openflow.GroupMod + alias Openflow.PortMod + alias Openflow.TableMod + alias Openflow.Multipart + alias Openflow.Barrier + alias Openflow.Role + alias Openflow.GetAsync + alias Openflow.SetAsync + alias Openflow.MeterMod + + alias Openflow.Match + alias Openflow.Port + + alias Openflow.NxSetPacketInFormat + alias Openflow.NxSetControllerId + alias Openflow.NxPacketIn2 + + alias Openflow.Multipart.Desc + alias Openflow.Multipart.Flow + alias Openflow.Multipart.Aggregate + alias Openflow.Multipart.Table + alias Openflow.Multipart.PortStats + alias Openflow.Multipart.Queue + alias Openflow.Multipart.Group + alias Openflow.Multipart.GroupDesc + alias Openflow.Multipart.GroupFeatures + alias Openflow.Multipart.Meter + alias Openflow.Multipart.MeterConfig + alias Openflow.Multipart.MeterFeatures + alias Openflow.Multipart.TableFeatures + alias Openflow.Multipart.PortDesc + + alias Openflow.Instruction.GotoTable + alias Openflow.Instruction.WriteMetadata + alias Openflow.Instruction.WriteActions + alias Openflow.Instruction.ApplyActions + alias Openflow.Instruction.ClearActions + alias Openflow.Instruction.Meter + + alias Openflow.Action.Output + alias Openflow.Action.CopyTtlOut + alias Openflow.Action.CopyTtlIn + alias Openflow.Action.SetMplsTtl + alias Openflow.Action.DecMplsTtl + alias Openflow.Action.PushVlan + alias Openflow.Action.PopVlan + alias Openflow.Action.PushMpls + alias Openflow.Action.PopMpls + alias Openflow.Action.SetQueue + alias Openflow.Action.Group + alias Openflow.Action.SetNwTtl + alias Openflow.Action.DecNwTtl + alias Openflow.Action.SetField + alias Openflow.Action.PushPbb + alias Openflow.Action.PopPbb + alias Openflow.Action.Encap + alias Openflow.Action.Decap + alias Openflow.Action.SetSequence + alias Openflow.Action.ValidateSequence + + alias Openflow.Action.NxResubmit + alias Openflow.Action.NxSetTunnel + alias Openflow.Action.NxSetQueue + alias Openflow.Action.NxPopQueue + alias Openflow.Action.NxRegMove + alias Openflow.Action.NxRegLoad + alias Openflow.Action.NxNote + alias Openflow.Action.NxSetTunnel64 + alias Openflow.Action.NxMultipath + alias Openflow.Action.NxBundle + alias Openflow.Action.NxBundleLoad + alias Openflow.Action.NxResubmitTable + alias Openflow.Action.NxOutputReg + alias Openflow.Action.NxLearn + alias Openflow.Action.NxExit + alias Openflow.Action.NxDecTtl + alias Openflow.Action.NxFinTimeout + alias Openflow.Action.NxController + alias Openflow.Action.NxDecTtlCntIds + alias Openflow.Action.NxWriteMetadata + alias Openflow.Action.NxPushMpls + alias Openflow.Action.NxPopMpls + alias Openflow.Action.NxSetMplsTtl + alias Openflow.Action.NxDecMplsTtl + alias Openflow.Action.NxStackPush + alias Openflow.Action.NxStackPop + alias Openflow.Action.NxSample + alias Openflow.Action.NxSetMplsLabel + alias Openflow.Action.NxSetMplsTc + alias Openflow.Action.NxOutputReg2 + alias Openflow.Action.NxRegLoad2 + alias Openflow.Action.NxConjunction + alias Openflow.Action.NxConntrack + alias Openflow.Action.NxNat + alias Openflow.Action.NxController2 + alias Openflow.Action.NxSample2 + alias Openflow.Action.NxOutputTrunc + alias Openflow.Action.NxGroup + alias Openflow.Action.NxSample3 + alias Openflow.Action.NxClone + alias Openflow.Action.NxCtClear + alias Openflow.Action.NxResubmitTableCt + alias Openflow.Action.NxLearn2 + alias Openflow.Action.NxEncap + alias Openflow.Action.NxDecap + alias Openflow.Action.NxDebugRecirc + + alias Openflow.Action.NxFlowSpecMatch + alias Openflow.Action.NxFlowSpecLoad + alias Openflow.Action.NxFlowSpecOutput + + alias Tres.SwitchRegistry + end + end +end diff --git a/lib/tres/secure_channel.ex b/lib/tres/secure_channel.ex index 013e731..187b693 100644 --- a/lib/tres/secure_channel.ex +++ b/lib/tres/secure_channel.ex @@ -3,6 +3,7 @@ defmodule Tres.SecureChannel do import Logger + alias :tres_xact_kv, as: XACT_KV alias Tres.SecureChannelState alias Tres.SwitchRegistry alias Tres.MessageHandlerSup @@ -32,12 +33,11 @@ defmodule Tres.SecureChannel do end def init([ref, socket, transport, _opts]) do - state_data = - ref - |> init_secure_channel(socket, transport) - |> init_handler - info("[#{__MODULE__}] TCP connected to Switch on #{state_data.ip_addr}:#{state_data.port} on #{inspect(self())}") - :gen_statem.enter_loop(__MODULE__, [debug: []], :INIT, state_data, []) + state_data = init_secure_channel(ref, socket, transport) + debug("[#{__MODULE__}] TCP connected to Switch on" + <> " #{state_data.ip_addr}:#{state_data.port}" + <> " on #{inspect(self())}") + :gen_statem.enter_loop(__MODULE__, [debug: [:debug]], :INIT, state_data, []) end # TCP handler @@ -73,8 +73,9 @@ defmodule Tres.SecureChannel do handle_WATING(type, message, state_data) end - def terminate(reason, state, %SecureChannelState{datapath_id: datapath_id, aux_id: aux_id}) do + def terminate(reason, state, %SecureChannelState{datapath_id: datapath_id, aux_id: aux_id, xact_kv_ref: kv_ref}) do warn("[#{__MODULE__}] termiate: #{inspect(reason)} state = #{inspect(state)}") + true = XACT_KV.drop(kv_ref) :ok = SwitchRegistry.unregister({datapath_id, aux_id}) end @@ -83,7 +84,8 @@ defmodule Tres.SecureChannel do defp init_secure_channel(ref, socket, transport) do init_process(ref) :ok = transport.setopts(socket, [active: :once]) - SecureChannelState.new(ref: ref, socket: socket, transport: transport) + kv_ref = XACT_KV.create + SecureChannelState.new(ref: ref, socket: socket, transport: transport, xact_kv_ref: kv_ref) end defp init_process(ref) do @@ -93,8 +95,8 @@ defmodule Tres.SecureChannel do end defp init_handler(state_data) do - %SecureChannelState{ip_addr: ip_addr, port: port} = state_data - {:ok, pid} = MessageHandlerSup.start_child({ip_addr, port}) + %SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data + {:ok, pid} = MessageHandlerSup.start_child({dpid, aux_id}) ref = Process.monitor(pid) %{state_data|handler_pid: pid, handler_ref: ref} end @@ -128,8 +130,7 @@ defmodule Tres.SecureChannel do close_connection(:features_handshake_timeout, state_data) end defp handle_CONNECTING(:internal, {:openflow, %Openflow.Features.Reply{} = features}, state_data) do - # TODO: Send to handler - info("[#{__MODULE__}] Switch connected datapath_id: #{features.datapath_id} auxiliary_id: #{features.aux_id}") + debug("[#{__MODULE__}] Switch connected datapath_id: #{features.datapath_id} auxiliary_id: #{features.aux_id}") _ = maybe_cancel_timer(state_data.timer_ref) handle_features_handshake(features, state_data) end @@ -142,9 +143,10 @@ defmodule Tres.SecureChannel do end # CONNECTED state - defp handle_CONNECTED(:enter, :CONNECTING, _state_data) do + defp handle_CONNECTED(:enter, :CONNECTING, state_data) do + new_state_data = init_handler(state_data) start_periodic_idle_check() - :keep_state_and_data + {:keep_state, new_state_data} end defp handle_CONNECTED(:info, :idle_check, state_data) do start_periodic_idle_check() @@ -162,19 +164,48 @@ defmodule Tres.SecureChannel do send_echo_reply(echo.xid, echo.data, state_data) :keep_state_and_data end - defp handle_CONNECTED(:internal, {:openflow, _message}, _state_data) do - # TODO: Send to handler + defp handle_CONNECTED(:internal, {:openflow, %Openflow.Barrier.Reply{xid: xid}}, state_data) do + for {:xact_entry, _xid, message, _orig} <- XACT_KV.get(state_data.xact_kv_ref, xid) do + unless is_nil(message) do + send(state_data.handler_pid, message) + XACT_KV.delete(state_data.xact_kv_ref, message.xid) + end + end + :keep_state_and_data + end + defp handle_CONNECTED(:internal, {:openflow, message}, state_data) do + %SecureChannelState{datapath_id: dpid, aux_id: aux_id} = state_data + new_message = %{message|datapath_id: dpid, aux_id: aux_id} + state_data.xact_kv_ref + |> XACT_KV.is_exists(message.xid) + |> handle_message(new_message, state_data) :keep_state_and_data end defp handle_CONNECTED(:cast, {:send_message, message}, state_data) do - message - |> send_message(state_data) + xid = SecureChannelState.increment_transaction_id(state_data.xid) + messages = [ + %{message|xid: xid}, + %{Openflow.Barrier.Request.new|xid: xid} + ] + XACT_KV.insert(state_data.xact_kv_ref, xid, message) + send_message(messages, state_data) :keep_state_and_data end + defp handle_message(_in_xact = true, message, state_data) do + [{:xact_entry, _xid, prev_message, _orig}|_] = XACT_KV.get(state_data.xact_kv_ref, message.xid) + new_message = Openflow.append_body(prev_message, message) + XACT_KV.update(state_data.xact_kv_ref, message.xid, new_message) + end + defp handle_message(_in_xact = false, message, %SecureChannelState{handler_pid: handler_pid}) do + send(handler_pid, message) + end + # WATING state defp handle_WATING(:enter, :CONNECTING, state_data) do warn("[#{__MODULE__}] Possible HANG Detected on datapath_id: #{state_data.datapath_id} !") + %SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data + send(handler_pid, {:switch_hang, {dpid, aux_id}}) start_periodic_idle_check() :keep_state_and_data end @@ -186,8 +217,9 @@ defmodule Tres.SecureChannel do defp handle_WATING(:info, :ping_timeout, state_data) do handle_ping_timeout(state_data, :WAITING) end - defp handle_WATING(:internal, {:openflow, _message}, state_data) do - # TODO: Send to handler + defp handle_WATING(:internal, {:openflow, message}, state_data) do + %SecureChannelState{handler_pid: handler_pid, datapath_id: dpid, aux_id: aux_id} = state_data + send(handler_pid, %{message|datapath_id: dpid, aux_id: aux_id}) {:next_state, :CONNECTING, state_data} end defp handle_WATING(type, message, state_data) @@ -249,12 +281,10 @@ defmodule Tres.SecureChannel do {:next_state, :CONNECTED, new_state_data} end - defp monitor_connection(datapath_id, aux_id) when aux_id > 0 do - datapath_id - |> SwitchRegistry.lookup_pid - |> Process.monitor - end - defp monitor_connection(_datapath_id, _aux_id), do: nil + defp monitor_connection(datapath_id, aux_id) when aux_id > 0, + do: SwitchRegistry.monitor(datapath_id) + defp monitor_connection(_datapath_id, _aux_id), + do: nil defp send_hello(state_data) do @supported_version @@ -320,7 +350,7 @@ defmodule Tres.SecureChannel do end defp send_message(message, %SecureChannelState{socket: socket, transport: transport}) do - debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})") + #debug("[#{__MODULE__}] Sending: #{inspect(message.__struct__)}(xid: #{message.xid})") Tres.Utils.send_message(message, socket, transport) end @@ -371,7 +401,7 @@ defmodule Tres.SecureChannel do {:stop, :normal, %{state_data|socket: nil}} end defp close_connection({:trap_detected, reason}, state_data) do - warn("[#{__MODULE__}] connection terminated: Handler process down by #{reason}") + warn("[#{__MODULE__}] connection terminated: Trapped by #{reason}") {:stop, :normal, %{state_data|socket: nil}} end defp close_connection(:tcp_closed, state_data) do diff --git a/lib/tres/secure_channel_state.ex b/lib/tres/secure_channel_state.ex index f2c163d..e3f8873 100644 --- a/lib/tres/secure_channel_state.ex +++ b/lib/tres/secure_channel_state.ex @@ -16,10 +16,12 @@ defmodule Tres.SecureChannelState do ping_xid: 0, ping_timer_ref: nil, ping_fail_count: 0, - last_received: 0 + last_received: 0, + xact_kv_ref: nil ) alias __MODULE__ + alias :tres_xact_kv, as: XACT_KV def new(options) do ref = Keyword.get(options, :ref) @@ -27,13 +29,15 @@ defmodule Tres.SecureChannelState do transport = Keyword.get(options, :transport) {:ok, {ip_addr, port}} = :inet.peername(socket) {:ok, xid_agent} = Agent.start_link(fn -> 0 end) + kv_ref = XACT_KV.create %SecureChannelState{ ref: ref, socket: socket, transport: transport, ip_addr: :inet.ntoa(ip_addr), port: port, - xid: xid_agent + xid: xid_agent, + xact_kv_ref: kv_ref } end diff --git a/lib/tres/switch_registry.ex b/lib/tres/switch_registry.ex index f7b8a19..1145ee3 100644 --- a/lib/tres/switch_registry.ex +++ b/lib/tres/switch_registry.ex @@ -24,6 +24,12 @@ defmodule Tres.SwitchRegistry do send_message(message, {dpid, 0}) end + def monitor(datapath_id) do + datapath_id + |> lookup_pid + |> Process.monitor + end + # private function defp dispatch(entries, message) do diff --git a/lib/tres/utils.ex b/lib/tres/utils.ex index 358e44f..b99f3a1 100644 --- a/lib/tres/utils.ex +++ b/lib/tres/utils.ex @@ -6,10 +6,16 @@ defmodule Tres.Utils do @default_num_acceptors 10 @default_openflow_port 6633 + def get_callback_module do + cb_mod = get_config(:callback_module, Tres.ExampleHandler) + cb_args = get_config(:callback_args, []) + {cb_mod, cb_args} + end + def start_openflow_listener do - max_connections = Application.get_env(:tres, :max_connections, @default_max_connections) - num_acceptors = Application.get_env(:tres, :num_acceptors, @default_num_acceptors) - port = Application.get_env(:tres, :port, @default_openflow_port) + max_connections = get_config(:max_connections, @default_max_connections) + num_acceptors = get_config(:num_acceptors, @default_num_acceptors) + port = get_config(:port, @default_openflow_port) options = [max_connections: max_connections, num_acceptors: num_acceptors, port: port] :ranch.start_listener(Tres, :ranch_tcp, options, @connection_manager, []) end @@ -23,4 +29,31 @@ defmodule Tres.Utils do error("[#{__MODULE__}] Unencodable error: #{inspect(message)}") end end + + def is_multipart?(message) do + message.__struct__ + |> Module.split + |> Enum.at(1) + |> String.match?(~r/Multipart/) + end + + def flow_command(:delete, options) do + if Keyword.get(options, :strict, false) do + :delete_strict + else + :delete + end + end + def flow_command(:modify, options) do + if Keyword.get(options, :strict, false) do + :modify_strict + else + :modify + end + end + + # private functions + defp get_config(item, default) do + Application.get_env(:tres, item, default) + end end diff --git a/mix.exs b/mix.exs index 7a68614..8aa92e7 100644 --- a/mix.exs +++ b/mix.exs @@ -6,6 +6,7 @@ defmodule Tres.Mixfile do version: "0.1.0", elixir: "~> 1.5", start_permanent: Mix.env == :prod, + compilers: [:erlang] ++ Mix.compilers, deps: deps()] end diff --git a/src/tres_xact_kv.erl b/src/tres_xact_kv.erl new file mode 100644 index 0000000..6e41e90 --- /dev/null +++ b/src/tres_xact_kv.erl @@ -0,0 +1,61 @@ +-module(tres_xact_kv). + +-include_lib("stdlib/include/ms_transform.hrl"). + +-export([create/0, drop/1]). +-export([insert/3, update/3, get/2, delete/2, is_exists/2]). + +-define(TABLE, xact_kv). +-define(ENTRY, xact_entry). +-define(TABLE_OPTS, [set, protected, {keypos, #?ENTRY.xid}]). + +-record(?ENTRY, {xid = 0, pending = nil, orig = nil}). + +-spec create() -> reference(). +create() -> + ets:new(?TABLE, ?TABLE_OPTS). + +-spec drop(reference()) -> true. +drop(Tid) -> + ets:delete(Tid). + +-spec insert(reference(), integer(), map()) -> true. +insert(Tid, Xid, Orig) -> + ets:insert(Tid, #?ENTRY{xid = Xid, orig = Orig}). + +-spec update(reference(), integer(), map()) -> integer(). +update(Tid, Xid, #{'__struct__' := 'Elixir.Openflow.ErrorMsg'} = Error) -> + ets:select_replace(Tid, ms_for_handle_error(Tid, Xid, Error)); +update(Tid, Xid, Msg) -> + ets:select_replace(Tid, ms_for_update(Xid, Msg)). + +-spec get(reference(), integer()) -> [term()]. +get(Tid, Xid) -> + ets:select(Tid, ms_for_get(Xid)). + +-spec delete(reference(), integer()) -> integer(). +delete(Tid, Xid) -> + ets:select_delete(Tid, ms_for_exists(Xid)). + +-spec is_exists(reference(), integer()) -> boolean(). +is_exists(Tid, Xid) -> + case ets:select(Tid, ms_for_exists(Xid)) of + [_|_] -> true; + [] -> false + end. + +%% Private functions + +ms_for_exists(Xid) -> + ets:fun2ms(fun(#?ENTRY{xid = TXid}) when TXid < Xid -> true end). + +ms_for_get(Xid) -> + ets:fun2ms(fun(#?ENTRY{xid = TXid} = E) when TXid == Xid -> E end). + +ms_for_update(Xid, Msg) -> + ets:fun2ms(fun(#?ENTRY{xid = TXid} = E) when TXid == Xid -> E#?ENTRY{pending = Msg} end). + +ms_for_handle_error(Tid, Xid, Error) -> + [Orig|_] = get(Tid, Xid), + Error1 = maps:merge(Error, #{data => Orig}), + ets:fun2ms(fun(#?ENTRY{xid = TXid} = E) when TXid == Xid -> E#?ENTRY{pending = Error1} end). diff --git a/test/flay_test.exs b/test/flay_test.exs new file mode 100644 index 0000000..22285b8 --- /dev/null +++ b/test/flay_test.exs @@ -0,0 +1,5 @@ +defmodule FlayTest do + use ExUnit.Case + + +end diff --git a/test/packet_data/nx_packet_in2_arp.raw b/test/packet_data/nx_packet_in2_arp.raw new file mode 100644 index 0000000..cbf807f Binary files /dev/null and b/test/packet_data/nx_packet_in2_arp.raw differ