Skip to content

Commit 1f908e8

Browse files
committed
add pool spec
1 parent fd959dc commit 1f908e8

File tree

4 files changed

+433
-181
lines changed

4 files changed

+433
-181
lines changed

src/process/ai_pool.erl

Lines changed: 192 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -1,179 +1,194 @@
1-
-module(ai_pool).
2-
3-
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
4-
handle_info/2, terminate/2]).
5-
-export([least_busy/1,rand/1,join/2]).
6-
7-
%%----------------------------------------------------------------------------
8-
9-
-ifdef(use_specs).
10-
11-
-type(name() :: term()).
12-
13-
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
14-
-spec(start/0 :: () -> {'ok', pid()} | {'error', any()}).
15-
-spec(join/2 :: (name(), pid()) -> 'ok').
16-
-spec(leave/2 :: (name(), pid()) -> 'ok').
17-
-spec(get_members/1 :: (name()) -> [pid()]).
18-
19-
-endif.
20-
21-
%%----------------------------------------------------------------------------
22-
23-
%%% As of R13B03 monitors are used instead of links.
24-
25-
%%%
26-
%%% Exported functions
27-
%%%
28-
29-
30-
start_link() ->
31-
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
32-
33-
start() ->
34-
ensure_started().
35-
36-
join(Name, Pid) when is_pid(Pid) ->
37-
gen_server:cast(?MODULE, {join, Name, Pid}).
38-
39-
least_busy(Name) ->
40-
Members = group_members(Name),
41-
ai_process:least_busy(Members).
42-
rand(Name)->
43-
case group_members(Name) of
44-
[] -> {error, empty_process_group};
45-
Members ->
46-
{_,_,X} = erlang:timestamp(),
47-
{ok,lists:nth((X rem length(Members))+1, Members)}
48-
end.
49-
50-
%%%
51-
%%% Callback functions from gen_server
52-
%%%
53-
54-
-record(state, {}).
55-
56-
init([]) ->
57-
ai_pool_table = ets:new(ai_pool_table, [ordered_set,
58-
protected, named_table, {write_concurrency,false},{read_concurrency,true}]),
59-
{ok, #state{}}.
60-
61-
handle_call(sync, _From, S) ->
62-
{reply, ok, S};
63-
64-
handle_call(Request, From, S) ->
65-
error_logger:warning_msg("The ai_pool server received an unexpected message:\n"
66-
"handle_call(~p, ~p, _)\n",
67-
[Request, From]),
68-
{noreply, S}.
69-
70-
handle_cast({join, Name, Pid}, S) ->
71-
join_group(Name, Pid),
72-
{noreply, S};
73-
74-
handle_cast(_, S) ->
75-
{noreply, S}.
76-
77-
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
78-
member_died(MonitorRef),
79-
{noreply, S};
80-
handle_info(_, S) ->
81-
{noreply, S}.
82-
83-
terminate(_Reason, _S) ->
84-
true = ets:delete(ai_pool_table),
85-
ok.
86-
87-
%%%
88-
%%% Local functions
1+
%%%-------------------------------------------------------------------
2+
%%% @author David Gao <[email protected]>
3+
%%% @copyright (C) 2019, David Gao
4+
%%% @doc
895
%%%
6+
%%% @end
7+
%%% Created : 11 Feb 2019 by David Gao <[email protected]>
8+
%%%-------------------------------------------------------------------
9+
-module(ai_pool).
9010

91-
%%% One ETS table, ai_pool_table, is used for bookkeeping. The type of the
92-
%%% table is ordered_set, and the fast matching of partially
93-
%%% instantiated keys is used extensively.
94-
%%%
95-
%%% {{ref, Pid}, MonitorRef, Counter}
96-
%%% {{ref, MonitorRef}, Pid}
97-
%%% Each process has one monitor. Counter is incremented when the
98-
%%% Pid joins some group.
99-
%%% {{member, Name, Pid}, _}
100-
%%% Pid is a member of group Name, GroupCounter is incremented when the
101-
%%% Pid joins the group Name.
102-
%%% {{pid, Pid, Name}}
103-
%%% Pid is a member of group Name.
104-
105-
member_died(Ref) ->
106-
[{{ref, Ref}, Pid}] = ets:lookup(ai_pool_table, {ref, Ref}),
107-
Names = member_groups(Pid),
108-
_ = [leave_group(Name, P) ||
109-
Name <- Names,
110-
P <- member_in_group(Pid, Name)],
111-
ok.
112-
%% 先监控,再进入组
113-
join_group(Name, Pid) ->
114-
Ref_Pid = {ref, Pid},
115-
%% 先尝试 +1 如果 +1 失败
116-
%% 说明pid不在ets表中,那么需要先Monitor再添加
117-
try _ = ets:update_counter(ai_pool_table, Ref_Pid, {3, +1})
118-
catch _:_ ->
119-
Ref = erlang:monitor(process, Pid),
120-
true = ets:insert(ai_pool_table, {Ref_Pid, Ref, 1}),
121-
true = ets:insert(ai_pool_table, {{ref, Ref}, Pid})
122-
end,
123-
Member_Name_Pid = {member, Name, Pid},
124-
try _ = ets:update_counter(ai_pool_table, Member_Name_Pid, {2, +1})
125-
catch _:_ ->
126-
true = ets:insert(ai_pool_table, {Member_Name_Pid, 1}),
127-
true = ets:insert(ai_pool_table, {{pid, Pid, Name}})
128-
end.
129-
%% 先退组,再退监控
130-
leave_group(Name, Pid) ->
131-
Member_Name_Pid = {member, Name, Pid},
132-
%% 先减少1
133-
try ets:update_counter(ai_pool_table, Member_Name_Pid, {2, -1}) of
134-
N ->
135-
if
136-
N =:= 0 ->
137-
%% 到0了,那么我们就删掉表项
138-
true = ets:delete(ai_pool_table, {pid, Pid, Name}),
139-
true = ets:delete(ai_pool_table, Member_Name_Pid);
140-
true ->
141-
ok
142-
end,
143-
Ref_Pid = {ref, Pid},
144-
case ets:update_counter(ai_pool_table, Ref_Pid, {3, -1}) of
145-
0 ->
146-
[{Ref_Pid,Ref,0}] = ets:lookup(ai_pool_table, Ref_Pid),
147-
true = ets:delete(ai_pool_table, {ref, Ref}),
148-
true = ets:delete(ai_pool_table, Ref_Pid),
149-
true = erlang:demonitor(Ref, [flush]),
150-
ok;
151-
_ ->
152-
ok
153-
end
154-
catch _:_ ->
155-
ok
156-
end.
157-
%% 直接从表中找
158-
group_members(Name) ->
159-
[P ||
160-
[P, N] <- ets:match(ai_pool_table, {{member, Name, '$1'},'$2'}),
161-
_ <- lists:seq(1, N)].
162-
163-
member_in_group(Pid, Name) ->
164-
[{{member, Name, Pid}, N}] = ets:lookup(ai_pool_table, {member, Name, Pid}),
165-
lists:duplicate(N, Pid).
166-
167-
168-
member_groups(Pid) ->
169-
[Name || [Name] <- ets:match(ai_pool_table, {{pid, Pid, '$1'}})].
170-
171-
ensure_started() ->
172-
case whereis(?MODULE) of
173-
undefined ->
174-
C = {pg2_local, {?MODULE, start_link, []}, permanent,
175-
16#ffffffff, worker, [?MODULE]},
176-
supervisor:start_child(kernel_safe_sup, C);
177-
PgLocalPid ->
178-
{ok, PgLocalPid}
179-
end.
11+
-behaviour(gen_server).
12+
13+
%% API
14+
-export([start_link/3]).
15+
16+
%% gen_server callbacks
17+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
18+
terminate/2, code_change/3, format_status/2]).
19+
20+
-export([pool_spec/3,least_busy/1,rand/1]).
21+
22+
-define(SERVER, ?MODULE).
23+
24+
-record(state, {
25+
name,
26+
supervisor,
27+
size,
28+
workers
29+
}).
30+
31+
%%%===================================================================
32+
%%% API
33+
%%%===================================================================
34+
pool_spec(Name, PoolArgs, WorkerArgs) ->
35+
{Name, {ai_pool, start_link, [Name,PoolArgs, WorkerArgs]},
36+
permanent, 5000, worker, [ai_pool]}.
37+
least_busy(Name) -> ai_pool_table:least_busy(Name).
38+
rand(Name) -> ai_pool_table:rand(Name).
39+
%%--------------------------------------------------------------------
40+
%% @doc
41+
%% Starts the server
42+
%% @end
43+
%%--------------------------------------------------------------------
44+
-spec start_link(atom(),term(),term()) -> {ok, Pid :: pid()} |
45+
{error, Error :: {already_started, pid()}} |
46+
{error, Error :: term()} |
47+
ignore.
48+
start_link(Name, PoolArgs, WorkerArgs) ->
49+
gen_server:start_link(?MODULE, {Name,PoolArgs, WorkerArgs},[]).
50+
51+
%%%===================================================================
52+
%%% gen_server callbacks
53+
%%%===================================================================
54+
55+
%%--------------------------------------------------------------------
56+
%% @private
57+
%% @doc
58+
%% Initializes the server
59+
%% @end
60+
%%--------------------------------------------------------------------
61+
-spec init(Args :: term()) -> {ok, State :: term()} |
62+
{ok, State :: term(), Timeout :: timeout()} |
63+
{ok, State :: term(), hibernate} |
64+
{stop, Reason :: term()} |
65+
ignore.
66+
init({Name,PoolArgs, WorkerArgs})->
67+
process_flag(trap_exit, true),
68+
init(PoolArgs,WorkerArgs,#state{name = Name}).
69+
init([{worker_module, Mod} | Rest], WorkerArgs, #state{name = Name} = State) when is_atom(Mod) ->
70+
%% 该进程挂了,会将所有进程全部挂掉
71+
{ok, Sup} = ai_pool_worker_sup:start_link(Name, Mod,WorkerArgs),
72+
init(Rest, WorkerArgs, State#state{supervisor = Sup});
73+
init([{size, Size} | Rest], WorkerArgs, State) when is_integer(Size) ->
74+
init(Rest, WorkerArgs, State#state{size = Size});
75+
init([_ | Rest], WorkerArgs, State) ->
76+
init(Rest, WorkerArgs, State);
77+
init([], _WorkerArgs, #state{size = Size, supervisor = Sup} = State) ->
78+
Workers = prepopulate(Size, Sup),
79+
{ok, State#state{workers = Workers}}.
80+
%%--------------------------------------------------------------------
81+
%% @private
82+
%% @doc
83+
%% Handling call messages
84+
%% @end
85+
%%--------------------------------------------------------------------
86+
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
87+
{reply, Reply :: term(), NewState :: term()} |
88+
{reply, Reply :: term(), NewState :: term(), Timeout :: timeout()} |
89+
{reply, Reply :: term(), NewState :: term(), hibernate} |
90+
{noreply, NewState :: term()} |
91+
{noreply, NewState :: term(), Timeout :: timeout()} |
92+
{noreply, NewState :: term(), hibernate} |
93+
{stop, Reason :: term(), Reply :: term(), NewState :: term()} |
94+
{stop, Reason :: term(), NewState :: term()}.
95+
handle_call(_Request, _From, State) ->
96+
Reply = ok,
97+
{reply, Reply, State}.
98+
99+
%%--------------------------------------------------------------------
100+
%% @private
101+
%% @doc
102+
%% Handling cast messages
103+
%% @end
104+
%%--------------------------------------------------------------------
105+
-spec handle_cast(Request :: term(), State :: term()) ->
106+
{noreply, NewState :: term()} |
107+
{noreply, NewState :: term(), Timeout :: timeout()} |
108+
{noreply, NewState :: term(), hibernate} |
109+
{stop, Reason :: term(), NewState :: term()}.
110+
handle_cast(_Request, State) ->
111+
{noreply, State}.
112+
113+
%%--------------------------------------------------------------------
114+
%% @private
115+
%% @doc
116+
%% Handling all non call/cast messages
117+
%% @end
118+
%%--------------------------------------------------------------------
119+
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
120+
{noreply, NewState :: term()} |
121+
{noreply, NewState :: term(), Timeout :: timeout()} |
122+
{noreply, NewState :: term(), hibernate} |
123+
{stop, Reason :: normal | term(), NewState :: term()}.
124+
125+
126+
handle_info({'EXIT', Pid, _Reason}, #state{supervisor = Sup} = State) ->
127+
case lists:member(Pid, State#state.workers) of
128+
true ->
129+
W = lists:filter(fun (P) -> P =/= Pid end, State#state.workers),
130+
{noreply, State#state{workers = [new_worker(Sup) | W]}};
131+
false ->
132+
{noreply, State}
133+
end;
134+
135+
handle_info(_Info, State) ->
136+
{noreply, State}.
137+
138+
%%--------------------------------------------------------------------
139+
%% @private
140+
%% @doc
141+
%% This function is called by a gen_server when it is about to
142+
%% terminate. It should be the opposite of Module:init/1 and do any
143+
%% necessary cleaning up. When it returns, the gen_server terminates
144+
%% with Reason. The return value is ignored.
145+
%% @end
146+
%%--------------------------------------------------------------------
147+
-spec terminate(Reason :: normal | shutdown | {shutdown, term()} | term(),
148+
State :: term()) -> any().
149+
terminate(_Reason, _State) ->
150+
ok.
151+
152+
%%--------------------------------------------------------------------
153+
%% @private
154+
%% @doc
155+
%% Convert process state when code is changed
156+
%% @end
157+
%%--------------------------------------------------------------------
158+
-spec code_change(OldVsn :: term() | {down, term()},
159+
State :: term(),
160+
Extra :: term()) -> {ok, NewState :: term()} |
161+
{error, Reason :: term()}.
162+
code_change(_OldVsn, State, _Extra) ->
163+
{ok, State}.
164+
165+
%%--------------------------------------------------------------------
166+
%% @private
167+
%% @doc
168+
%% This function is called for changing the form and appearance
169+
%% of gen_server status when it is returned from sys:get_status/1,2
170+
%% or when it appears in termination error logs.
171+
%% @end
172+
%%--------------------------------------------------------------------
173+
-spec format_status(Opt :: normal | terminate,
174+
Status :: list()) -> Status :: term().
175+
format_status(_Opt, Status) ->
176+
Status.
177+
178+
%%%===================================================================
179+
%%% Internal functions
180+
%%%===================================================================
181+
182+
new_worker(Sup) ->
183+
{ok, Pid} = supervisor:start_child(Sup, []),
184+
true = link(Pid),
185+
Pid.
186+
prepopulate(N, _Sup) when N < 1 ->
187+
[];
188+
prepopulate(N, Sup) ->
189+
prepopulate(N, Sup, []).
190+
191+
prepopulate(0, _Sup, Workers) ->
192+
Workers;
193+
prepopulate(N, Sup, Workers) ->
194+
prepopulate(N-1, Sup, [new_worker(Sup) | Workers]).

src/process/ai_pool_sup.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ init([]) ->
5656
SupFlags = #{strategy => one_for_one,
5757
intensity => 5,
5858
period => 5},
59-
Pool = #{id => ai_pool,
60-
start => {ai_pool, start_link, []},
59+
PoolTableSpec = #{id => ai_pool_table,
60+
start => {ai_pool_table, start_link, []},
6161
restart => transient,
6262
shutdown => 5000,
6363
type => worker,
64-
modules => [ai_pool]},
65-
{ok, {SupFlags, [Pool]}}.
64+
modules => [ai_pool_table]},
65+
{ok, {SupFlags, [PoolTableSpec]}}.
6666

6767
%%%===================================================================
6868
%%% Internal functions

0 commit comments

Comments
 (0)