Skip to content

Commit 442ba20

Browse files
committed
khepri_machine: Introduce {apply, MFA} trigger action
[Why] It allows to execute an arbitrary function when a trigger is triggered. This is like a stored procedure, except that the function must be available on the cluster member that will execute it. [How] The `khepri:register_trigger/{3,4,5}` functions can take an MFA tuple in addition to a stored procedure path or a PID. They can also accept the `{sproc, StoreProcPath}`. The function is executed with the given args list with the following trigger descriptor appended: #khepri_trigger{type = EventType, store_id = StoreId, trigger_id = TriggerId, event = EventProps, action = ActionProps}
1 parent 1b6b0e0 commit 442ba20

File tree

4 files changed

+85
-3
lines changed

4 files changed

+85
-3
lines changed

src/khepri.erl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2843,9 +2843,11 @@ clear_many_payloads(StoreId, PathPattern, Options) ->
28432843
TriggerId :: trigger_id(),
28442844
EventFilter :: khepri_evf:event_filter_or_compat(),
28452845
Action :: StoredProcPath |
2846+
MFA |
28462847
Pid |
28472848
khepri_event_handler:trigger_action(),
28482849
StoredProcPath :: khepri_path:path(),
2850+
MFA :: {module(), atom(), list()},
28492851
Pid :: pid(),
28502852
Ret :: ok | error().
28512853
%% @doc Registers a trigger.
@@ -2866,18 +2868,22 @@ register_trigger(TriggerId, EventFilter, Action) ->
28662868
TriggerId :: trigger_id(),
28672869
EventFilter :: khepri_evf:event_filter_or_compat(),
28682870
Action :: StoredProcPath |
2871+
MFA |
28692872
Pid |
28702873
khepri_event_handler:trigger_action(),
28712874
StoredProcPath :: khepri_path:path(),
2875+
MFA :: {module(), atom(), list()},
28722876
Pid :: pid(),
28732877
Ret :: ok | error();
28742878
(TriggerId, EventFilter, Action, Options) -> Ret when
28752879
TriggerId :: trigger_id(),
28762880
EventFilter :: khepri_evf:event_filter_or_compat(),
28772881
Action :: StoredProcPath |
2882+
MFA |
28782883
Pid |
28792884
khepri_event_handler:trigger_action(),
28802885
StoredProcPath :: khepri_path:path(),
2886+
MFA :: {module(), atom(), list()},
28812887
Pid :: pid(),
28822888
Options :: command_options() | khepri:trigger_options(),
28832889
Ret :: ok | error().
@@ -2910,9 +2916,11 @@ register_trigger(TriggerId, EventFilter, Action, Options)
29102916
TriggerId :: trigger_id(),
29112917
EventFilter :: khepri_evf:event_filter_or_compat(),
29122918
Action :: StoredProcPath |
2919+
MFA |
29132920
Pid |
29142921
khepri_event_handler:trigger_action(),
29152922
StoredProcPath :: khepri_path:path(),
2923+
MFA :: {module(), atom(), list()},
29162924
Pid :: pid(),
29172925
Options :: command_options() | khepri:trigger_options(),
29182926
Ret :: ok | error().
@@ -2953,6 +2961,10 @@ register_trigger(TriggerId, EventFilter, Action, Options)
29532961
%%
29542962
%% The stored procedure is executed on the leader's Erlang node.
29552963
%%
2964+
%% When giving an MFA tuple (`Module, Function, ArgsList}'), the designated
2965+
%% function is executed. The `#khepri_trigger{}' trigger descriptor is
2966+
%% appended to the list of arguments.
2967+
%%
29562968
%% When giving a PID as the action, a `#khepri_trigger{}' message is sent to
29572969
%% this process.
29582970
%%

src/khepri_event_handler.erl

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,16 @@
2929
code_change/3]).
3030

3131
-type trigger_action() :: {sproc, khepri_path:native_path()} |
32+
{apply, {module(), atom(), list()}} |
3233
{send, pid(), any()}.
3334
%% The type of action associated with a trigger.
3435
%%
3536
%% It can be the following actions:
3637
%% <ul>
3738
%% <li>`sproc': the stored procedure pointed to by the given path will be
3839
%% executed</li>
40+
%% <li>`apply', the function described by the given module/function/arguments
41+
%% tuple will be executed</li>
3942
%% <li>`send': a message will be sent to the specified PID with the event
4043
%% details and the given term</li>
4144
%% </ul>
@@ -44,13 +47,16 @@
4447
%% execution of that trigger. See {@link trigger_descriptor/0}.
4548

4649
-type triggered_action() :: {sproc, horus:horus_fun()} |
50+
{apply, {module(), atom(), list()}} |
4751
{send, pid(), any()}.
4852
%% The action associated with a trigger once it is triggered.
4953
%%
5054
%% It can be the following actions:
5155
%% <ul>
5256
%% <li>`sproc': the stored procedure (extracted from the path given to the
5357
%% trigger registration) is executed</li>
58+
%% <li>`apply': the function described by the given module/function/arguments
59+
%% tuple is executed</li>
5460
%% <li>`send': a message is sent to the specified PID with the event details
5561
%% and the given term</li>
5662
%% </ul>
@@ -174,6 +180,8 @@ event_to_props(#ev_process{pid = Pid, change = Change}) ->
174180

175181
action_to_props({sproc, _StoredProc}) ->
176182
#{};
183+
action_to_props({apply, _MFA}) ->
184+
#{};
177185
action_to_props({send, _Pid, undefined}) ->
178186
#{};
179187
action_to_props({send, _Pid, Priv}) ->
@@ -191,6 +199,11 @@ run_triggered_action(
191199
ActionArg, State) ->
192200
run_triggered_sproc(
193201
StoreId, TriggeredAction, StoredProc, ActionArg, State);
202+
run_triggered_action(
203+
StoreId,
204+
#triggered_v2{action = {apply, MFA}} = TriggeredAction,
205+
ActionArg, State) ->
206+
run_triggered_apply(StoreId, TriggeredAction, MFA, ActionArg, State);
194207
run_triggered_action(
195208
StoreId,
196209
#triggered_v2{action = {send, Pid, _Priv}} = TriggeredAction,
@@ -210,6 +223,21 @@ run_triggered_sproc(StoreId, TriggeredAction, StoredProc, ActionArg, State) ->
210223
Class, Reason, Stacktrace, State)
211224
end.
212225

226+
run_triggered_apply(StoreId, TriggeredAction, MFA, ActionArg, State) ->
227+
try
228+
%% TODO: Be flexible and accept a function with an arity of that fits
229+
%% `Args0' only.
230+
{Mod, Func, Args0} = MFA,
231+
Args1 = Args0 ++ [ActionArg],
232+
_ = erlang:apply(Mod, Func, Args1),
233+
State
234+
catch
235+
Class:Reason:Stacktrace ->
236+
handle_action_crash(
237+
StoreId, TriggeredAction, ActionArg,
238+
Class, Reason, Stacktrace, State)
239+
end.
240+
213241
handle_action_crash(
214242
StoreId, TriggeredAction, ActionArg, Class, Reason, Stacktrace,
215243
#?MODULE{trigger_crashes = Crashes} = State) ->
@@ -228,7 +256,7 @@ handle_action_crash(
228256
#triggered_v2{event_filter = EF} -> EF
229257
end,
230258
Msg = io_lib:format(
231-
"Triggered stored procedure crash~n"
259+
"Triggered action crash~n"
232260
" Store ID: ~s~n"
233261
" Trigger ID: ~s~n"
234262
" Event filter:~n"

src/khepri_machine.erl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,9 +678,11 @@ handle_tx_exception(
678678
TriggerId :: khepri:trigger_id(),
679679
EventFilter :: khepri_evf:event_filter_or_compat(),
680680
Action :: StoredProcPath |
681+
MFA |
681682
Pid |
682683
khepri_event_handler:trigger_action(),
683684
StoredProcPath :: khepri_path:path(),
685+
MFA :: {module(), atom(), list()},
684686
Pid :: pid(),
685687
Options :: khepri:command_options() | khepri:trigger_options(),
686688
Ret :: ok | khepri:error().
@@ -739,6 +741,19 @@ maybe_convert_to_action(StoreId, _TriggerId, _EventFilter, StoredProcPath)
739741
false ->
740742
StoredProcPath1
741743
end;
744+
maybe_convert_to_action(
745+
StoreId, TriggerId, EventFilter, {Mod, Func, Args} = MFA)
746+
when is_atom(Mod) andalso is_atom(Func) andalso is_list(Args) ->
747+
case does_api_comply_with(extended_trigger, StoreId) of
748+
true ->
749+
{apply, MFA};
750+
false ->
751+
?khepri_misuse(
752+
unsupported_trigger_action,
753+
#{trigger_id => TriggerId,
754+
event_filter => EventFilter,
755+
action => MFA})
756+
end;
742757
maybe_convert_to_action(StoreId, TriggerId, EventFilter, Pid)
743758
when is_pid(Pid) ->
744759
case does_api_comply_with(extended_trigger, StoreId) of

test/triggers.erl

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ a_buggy_sproc_does_not_crash_state_machine_test_() ->
491491
ok, khepri:put(?FUNCTION_NAME, [foo], 1))
492492
end),
493493
?assertMatch(
494-
<<"Triggered stored procedure crash", _/binary>>, Log)
494+
<<"Triggered action crash", _/binary>>, Log)
495495
end)},
496496

497497
{"Checking the procedure was executed",
@@ -551,7 +551,7 @@ a_buggy_sproc_does_not_crash_state_machine_test_() ->
551551
khepri:put(?FUNCTION_NAME, [foo], 6)
552552
end),
553553
?assertSubString(
554-
<<"Triggered stored procedure crash">>, Log),
554+
<<"Triggered action crash">>, Log),
555555
?assertSubString(
556556
<<"(this crash occurred 6 times in the last 10 seconds)">>,
557557
Log),
@@ -632,6 +632,33 @@ receive_sproc_msg_with_props(Key) ->
632632
after 1000 -> timeout
633633
end.
634634

635+
event_triggers_mfa_apply_test_() ->
636+
EventFilter = khepri_evf:tree([foo]),
637+
{setup,
638+
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
639+
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
640+
[{inorder,
641+
[{"Registering a trigger",
642+
?_assertEqual(
643+
ok,
644+
khepri:register_trigger(
645+
?FUNCTION_NAME,
646+
?FUNCTION_NAME,
647+
EventFilter,
648+
{erlang, send, [self()]}))},
649+
650+
{"Updating a node; should trigger the execution of the MFA",
651+
?_assertMatch(
652+
ok,
653+
khepri:put(?FUNCTION_NAME, [foo], value))},
654+
655+
{"Checking the procedure was executed",
656+
?_assert(receive
657+
#khepri_trigger{} ->
658+
true
659+
end)}]
660+
}]}.
661+
635662
event_triggers_message_send_test_() ->
636663
EventFilter = khepri_evf:tree([foo]),
637664
{setup,

0 commit comments

Comments
 (0)