Fix to use ETS based transaction_id counter
This commit is contained in:
parent
25b0271ab1
commit
57611f9dc0
3 changed files with 28 additions and 17 deletions
|
|
@ -11,7 +11,7 @@ config :tres,
|
||||||
callback_args: []
|
callback_args: []
|
||||||
|
|
||||||
config :logger,
|
config :logger,
|
||||||
level: :info,
|
level: :debug,
|
||||||
format: "$date $time [$level] $metadata$message\n",
|
format: "$date $time [$level] $metadata$message\n",
|
||||||
metadata: [:application],
|
metadata: [:application],
|
||||||
handle_otp_reports: true
|
handle_otp_reports: true
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ defmodule Tres.SecureChannel do
|
||||||
|
|
||||||
@process_flags [
|
@process_flags [
|
||||||
trap_exit: true,
|
trap_exit: true,
|
||||||
message_queue_data: :on_heap
|
message_queue_data: :off_heap
|
||||||
]
|
]
|
||||||
|
|
||||||
@supported_version 4
|
@supported_version 4
|
||||||
|
|
@ -379,7 +379,7 @@ defmodule Tres.SecureChannel do
|
||||||
|
|
||||||
defp initiate_hello_handshake(state_data) do
|
defp initiate_hello_handshake(state_data) do
|
||||||
send_hello(state_data)
|
send_hello(state_data)
|
||||||
ref = :erlang.send_after(@hello_handshake_timeout, self(), :hello_timeout)
|
ref = Process.send_after(self(), :hello_timeout, @hello_handshake_timeout)
|
||||||
{:keep_state, %{state_data | timer_ref: ref}}
|
{:keep_state, %{state_data | timer_ref: ref}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -397,7 +397,7 @@ defmodule Tres.SecureChannel do
|
||||||
defp initiate_features_handshake(state_data) do
|
defp initiate_features_handshake(state_data) do
|
||||||
new_xid = State.increment_transaction_id(state_data.xid)
|
new_xid = State.increment_transaction_id(state_data.xid)
|
||||||
send_features(new_xid, state_data)
|
send_features(new_xid, state_data)
|
||||||
ref = :erlang.send_after(@features_handshake_timeout, self(), :features_timeout)
|
ref = Process.send_after(self(), :features_timeout, @features_handshake_timeout)
|
||||||
{:keep_state, %{state_data | timer_ref: ref}}
|
{:keep_state, %{state_data | timer_ref: ref}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -445,7 +445,7 @@ defmodule Tres.SecureChannel do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp start_periodic_idle_check do
|
defp start_periodic_idle_check do
|
||||||
:erlang.send_after(@ping_interval, self(), :idle_check)
|
Process.send_after(self(), :idle_check, @ping_interval)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp maybe_ping(state_data) do
|
defp maybe_ping(state_data) do
|
||||||
|
|
@ -463,10 +463,10 @@ defmodule Tres.SecureChannel do
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
|
|
||||||
defp send_ping(%State{xid: x_agent} = state_data) do
|
defp send_ping(%State{xid: table_ref} = state_data) do
|
||||||
xid = State.increment_transaction_id(x_agent)
|
xid = State.increment_transaction_id(table_ref)
|
||||||
send_echo_request(xid, "", state_data)
|
send_echo_request(xid, "", state_data)
|
||||||
ping_ref = :erlang.send_after(@ping_timeout, self(), :ping_timeout)
|
ping_ref = Process.send_after(self(), :ping_timeout, @ping_timeout)
|
||||||
%{state_data | ping_timer_ref: ping_ref, ping_xid: xid}
|
%{state_data | ping_timer_ref: ping_ref, ping_xid: xid}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
@ -516,7 +516,7 @@ defmodule Tres.SecureChannel do
|
||||||
defp maybe_cancel_timer(ref) when not is_reference(ref), do: :ok
|
defp maybe_cancel_timer(ref) when not is_reference(ref), do: :ok
|
||||||
|
|
||||||
defp maybe_cancel_timer(ref) do
|
defp maybe_cancel_timer(ref) do
|
||||||
:erlang.cancel_timer(ref)
|
Process.cancel_timer(ref)
|
||||||
:ok
|
:ok
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ defmodule Tres.SecureChannelState do
|
||||||
socket = Keyword.get(options, :socket)
|
socket = Keyword.get(options, :socket)
|
||||||
transport = Keyword.get(options, :transport)
|
transport = Keyword.get(options, :transport)
|
||||||
{:ok, {ip_addr, port}} = :inet.peername(socket)
|
{:ok, {ip_addr, port}} = :inet.peername(socket)
|
||||||
{:ok, xid_agent} = Agent.start_link(fn -> 0 end)
|
{:ok, table_ref} = create_counter()
|
||||||
kv_ref = XACT_KV.create()
|
kv_ref = XACT_KV.create()
|
||||||
|
|
||||||
%SecureChannelState{
|
%SecureChannelState{
|
||||||
|
|
@ -38,20 +38,31 @@ defmodule Tres.SecureChannelState do
|
||||||
transport: transport,
|
transport: transport,
|
||||||
ip_addr: :inet.ntoa(ip_addr),
|
ip_addr: :inet.ntoa(ip_addr),
|
||||||
port: port,
|
port: port,
|
||||||
xid: xid_agent,
|
xid: table_ref,
|
||||||
xact_kv_ref: kv_ref
|
xact_kv_ref: kv_ref
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
def increment_transaction_id(xid_agent) do
|
def increment_transaction_id(table_ref) do
|
||||||
Agent.get_and_update(xid_agent, &{&1 + 1, &1 + 1})
|
:ets.update_counter(table_ref, :datapath_xid, {2, 1, 0xffffffff, 0})
|
||||||
end
|
end
|
||||||
|
|
||||||
def set_transaction_id(xid_agent, xid) do
|
def set_transaction_id(table_ref, xid) do
|
||||||
Agent.update(xid_agent, fn _ -> xid end)
|
:ets.insert(table_ref, {:datapath_xid, xid})
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_transaction_id(xid_agent) do
|
def get_transaction_id(table_ref) do
|
||||||
Agent.get(xid_agent, & &1)
|
case :ets.lookup(table_ref, :datapath_xid) do
|
||||||
|
[{_, xid} | _] -> xid
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# private functions
|
||||||
|
|
||||||
|
@spec create_counter() :: reference()
|
||||||
|
defp create_counter do
|
||||||
|
table_ref = :ets.new(:xid_counter, [:set, :private])
|
||||||
|
_ = :ets.insert(table_ref, {:datapath_xid, 0})
|
||||||
|
{:ok, table_ref}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue