Merge pull request #2 from shun159/ets_counter

Fix to use ETS based transaction_id counter
This commit is contained in:
Eishun Kondoh 2018-03-18 22:37:23 +09:00 committed by GitHub
commit c4c70c4d99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 17 deletions

View file

@ -11,7 +11,7 @@ config :tres,
callback_args: [] callback_args: []
config :logger, config :logger,
level: :info, level: :debug,
format: "$date $time [$level] $metadata$message\n", format: "$date $time [$level] $metadata$message\n",
metadata: [:application], metadata: [:application],
handle_otp_reports: true handle_otp_reports: true

View file

@ -11,7 +11,7 @@ defmodule Tres.SecureChannel do
@process_flags [ @process_flags [
trap_exit: true, trap_exit: true,
message_queue_data: :on_heap message_queue_data: :off_heap
] ]
@supported_version 4 @supported_version 4
@ -379,7 +379,7 @@ defmodule Tres.SecureChannel do
defp initiate_hello_handshake(state_data) do defp initiate_hello_handshake(state_data) do
send_hello(state_data) send_hello(state_data)
ref = :erlang.send_after(@hello_handshake_timeout, self(), :hello_timeout) ref = Process.send_after(self(), :hello_timeout, @hello_handshake_timeout)
{:keep_state, %{state_data | timer_ref: ref}} {:keep_state, %{state_data | timer_ref: ref}}
end end
@ -397,7 +397,7 @@ defmodule Tres.SecureChannel do
defp initiate_features_handshake(state_data) do defp initiate_features_handshake(state_data) do
new_xid = State.increment_transaction_id(state_data.xid) new_xid = State.increment_transaction_id(state_data.xid)
send_features(new_xid, state_data) send_features(new_xid, state_data)
ref = :erlang.send_after(@features_handshake_timeout, self(), :features_timeout) ref = Process.send_after(self(), :features_timeout, @features_handshake_timeout)
{:keep_state, %{state_data | timer_ref: ref}} {:keep_state, %{state_data | timer_ref: ref}}
end end
@ -445,7 +445,7 @@ defmodule Tres.SecureChannel do
end end
defp start_periodic_idle_check do defp start_periodic_idle_check do
:erlang.send_after(@ping_interval, self(), :idle_check) Process.send_after(self(), :idle_check, @ping_interval)
end end
defp maybe_ping(state_data) do defp maybe_ping(state_data) do
@ -463,10 +463,10 @@ defmodule Tres.SecureChannel do
false false
end end
defp send_ping(%State{xid: x_agent} = state_data) do defp send_ping(%State{xid: table_ref} = state_data) do
xid = State.increment_transaction_id(x_agent) xid = State.increment_transaction_id(table_ref)
send_echo_request(xid, "", state_data) send_echo_request(xid, "", state_data)
ping_ref = :erlang.send_after(@ping_timeout, self(), :ping_timeout) ping_ref = Process.send_after(self(), :ping_timeout, @ping_timeout)
%{state_data | ping_timer_ref: ping_ref, ping_xid: xid} %{state_data | ping_timer_ref: ping_ref, ping_xid: xid}
end end
@ -516,7 +516,7 @@ defmodule Tres.SecureChannel do
defp maybe_cancel_timer(ref) when not is_reference(ref), do: :ok defp maybe_cancel_timer(ref) when not is_reference(ref), do: :ok
defp maybe_cancel_timer(ref) do defp maybe_cancel_timer(ref) do
:erlang.cancel_timer(ref) Process.cancel_timer(ref)
:ok :ok
end end

View file

@ -29,7 +29,7 @@ defmodule Tres.SecureChannelState do
socket = Keyword.get(options, :socket) socket = Keyword.get(options, :socket)
transport = Keyword.get(options, :transport) transport = Keyword.get(options, :transport)
{:ok, {ip_addr, port}} = :inet.peername(socket) {:ok, {ip_addr, port}} = :inet.peername(socket)
{:ok, xid_agent} = Agent.start_link(fn -> 0 end) {:ok, table_ref} = create_counter()
kv_ref = XACT_KV.create() kv_ref = XACT_KV.create()
%SecureChannelState{ %SecureChannelState{
@ -38,20 +38,31 @@ defmodule Tres.SecureChannelState do
transport: transport, transport: transport,
ip_addr: :inet.ntoa(ip_addr), ip_addr: :inet.ntoa(ip_addr),
port: port, port: port,
xid: xid_agent, xid: table_ref,
xact_kv_ref: kv_ref xact_kv_ref: kv_ref
} }
end end
def increment_transaction_id(xid_agent) do def increment_transaction_id(table_ref) do
Agent.get_and_update(xid_agent, &{&1 + 1, &1 + 1}) :ets.update_counter(table_ref, :datapath_xid, {2, 1, 0xffffffff, 0})
end end
def set_transaction_id(xid_agent, xid) do def set_transaction_id(table_ref, xid) do
Agent.update(xid_agent, fn _ -> xid end) :ets.insert(table_ref, {:datapath_xid, xid})
end end
def get_transaction_id(xid_agent) do def get_transaction_id(table_ref) do
Agent.get(xid_agent, & &1) case :ets.lookup(table_ref, :datapath_xid) do
[{_, xid} | _] -> xid
end
end
# private functions
@spec create_counter() :: reference()
defp create_counter do
table_ref = :ets.new(:xid_counter, [:set, :private])
_ = :ets.insert(table_ref, {:datapath_xid, 0})
{:ok, table_ref}
end end
end end