Skip to content

Commit 1b6b0e0

Browse files
committed
khepri_machine: Introduce {send, Pid, Priv} trigger action
[Why] It allows to send a message to an arbitrary process when a trigger is triggered. [How] The `khepri:register_trigger/{3,4,5}` functions can take a PID in addition to of the stored procedure path. They can also accept the following tuples: * `{sproc, StoreProcPath}` * `{send, Pid, Priv}` If only the PID is passed, `Priv` is set to `undefined`. The message send to the process is: #khepri_trigger{type = EventType, store_id = StoreId, trigger_id = TriggerId, event = EventProps, action = ActionProps}
1 parent 22f6c01 commit 1b6b0e0

File tree

4 files changed

+156
-47
lines changed

4 files changed

+156
-47
lines changed

src/khepri.erl

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2839,74 +2839,88 @@ clear_many_payloads(StoreId, PathPattern, Options) ->
28392839
%% register_trigger().
28402840
%% -------------------------------------------------------------------
28412841

2842-
-spec register_trigger(TriggerId, EventFilter, StoredProcPath) -> Ret when
2842+
-spec register_trigger(TriggerId, EventFilter, Action) -> Ret when
28432843
TriggerId :: trigger_id(),
28442844
EventFilter :: khepri_evf:event_filter_or_compat(),
2845+
Action :: StoredProcPath |
2846+
Pid |
2847+
khepri_event_handler:trigger_action(),
28452848
StoredProcPath :: khepri_path:path(),
2849+
Pid :: pid(),
28462850
Ret :: ok | error().
28472851
%% @doc Registers a trigger.
28482852
%%
28492853
%% Calling this function is the same as calling `register_trigger(StoreId,
2850-
%% TriggerId, EventFilter, StoredProcPath)' with the default store ID (see
2854+
%% TriggerId, EventFilter, Action)' with the default store ID (see
28512855
%% {@link khepri_cluster:get_default_store_id/0}).
28522856
%%
28532857
%% @see register_trigger/4.
28542858

2855-
register_trigger(TriggerId, EventFilter, StoredProcPath) ->
2859+
register_trigger(TriggerId, EventFilter, Action) ->
28562860
StoreId = khepri_cluster:get_default_store_id(),
2857-
register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath).
2861+
register_trigger(StoreId, TriggerId, EventFilter, Action).
28582862

28592863
-spec register_trigger
2860-
(StoreId, TriggerId, EventFilter, StoredProcPath) -> Ret when
2864+
(StoreId, TriggerId, EventFilter, Action) -> Ret when
28612865
StoreId :: khepri:store_id(),
28622866
TriggerId :: trigger_id(),
28632867
EventFilter :: khepri_evf:event_filter_or_compat(),
2868+
Action :: StoredProcPath |
2869+
Pid |
2870+
khepri_event_handler:trigger_action(),
28642871
StoredProcPath :: khepri_path:path(),
2872+
Pid :: pid(),
28652873
Ret :: ok | error();
2866-
(TriggerId, EventFilter, StoredProcPath, Options) -> Ret when
2874+
(TriggerId, EventFilter, Action, Options) -> Ret when
28672875
TriggerId :: trigger_id(),
28682876
EventFilter :: khepri_evf:event_filter_or_compat(),
2877+
Action :: StoredProcPath |
2878+
Pid |
2879+
khepri_event_handler:trigger_action(),
28692880
StoredProcPath :: khepri_path:path(),
2881+
Pid :: pid(),
28702882
Options :: command_options() | khepri:trigger_options(),
28712883
Ret :: ok | error().
28722884
%% @doc Registers a trigger.
28732885
%%
28742886
%% This function accepts the following two forms:
28752887
%% <ul>
2876-
%% <li>`register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath)'.
2888+
%% <li>`register_trigger(StoreId, TriggerId, EventFilter, Action)'.
28772889
%% Calling it is the same as calling `register_trigger(StoreId, TriggerId,
2878-
%% EventFilter, StoredProcPath, #{})'.</li>
2879-
%% <li>`register_trigger(TriggerId, EventFilter, StoredProcPath, Options)'.
2890+
%% EventFilter, Action, #{})'.</li>
2891+
%% <li>`register_trigger(TriggerId, EventFilter, Action, Options)'.
28802892
%% Calling it is the same as calling `register_trigger(StoreId, TriggerId,
2881-
%% EventFilter, StoredProcPath, Options)' with the default store ID (see
2893+
%% EventFilter, Action, Options)' with the default store ID (see
28822894
%% {@link khepri_cluster:get_default_store_id/0}).</li>
28832895
%% </ul>
28842896
%%
28852897
%% @see register_trigger/5.
28862898

2887-
register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath)
2899+
register_trigger(StoreId, TriggerId, EventFilter, Action)
28882900
when ?IS_KHEPRI_STORE_ID(StoreId) andalso is_atom(TriggerId) ->
2889-
register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, #{});
2890-
register_trigger(TriggerId, EventFilter, StoredProcPath, Options)
2901+
register_trigger(StoreId, TriggerId, EventFilter, Action, #{});
2902+
register_trigger(TriggerId, EventFilter, Action, Options)
28912903
when is_atom(TriggerId) andalso is_map(Options) ->
28922904
StoreId = khepri_cluster:get_default_store_id(),
2893-
register_trigger(
2894-
StoreId, TriggerId, EventFilter, StoredProcPath, Options).
2905+
register_trigger(StoreId, TriggerId, EventFilter, Action, Options).
28952906

2896-
-spec register_trigger(
2897-
StoreId, TriggerId, EventFilter, StoredProcPath, Options) ->
2907+
-spec register_trigger(StoreId, TriggerId, EventFilter, Action, Options) ->
28982908
Ret when
28992909
StoreId :: khepri:store_id(),
29002910
TriggerId :: trigger_id(),
29012911
EventFilter :: khepri_evf:event_filter_or_compat(),
2912+
Action :: StoredProcPath |
2913+
Pid |
2914+
khepri_event_handler:trigger_action(),
29022915
StoredProcPath :: khepri_path:path(),
2916+
Pid :: pid(),
29032917
Options :: command_options() | khepri:trigger_options(),
29042918
Ret :: ok | error().
29052919
%% @doc Registers a trigger.
29062920
%%
2907-
%% A trigger is based on an event filter. It associates an event with a stored
2908-
%% procedure. When an event matching the event filter is emitted, the stored
2909-
%% procedure is executed.
2921+
%% A trigger is based on an event filter. It associates an event with an
2922+
%% action. When an event matching the event filter is emitted, the action is
2923+
%% executed.
29102924
%%
29112925
%% The following event filters are documented by {@link
29122926
%% khepri_evf:event_filter()}.
@@ -2927,8 +2941,9 @@ register_trigger(TriggerId, EventFilter, StoredProcPath, Options)
29272941
%% EventFilter = "/:stock/:wood/oak".
29282942
%% '''
29292943
%%
2930-
%% The stored procedure is expected to accept a single argument. This argument
2931-
%% is a map containing the event properties. Here is an example:
2944+
%% When giving a stored procedure as the action, it is expected to accept a
2945+
%% single argument. This argument is a map containing the event properties.
2946+
%% Here is an example:
29322947
%%
29332948
%% ```
29342949
%% my_stored_procedure(Props) ->
@@ -2938,23 +2953,26 @@ register_trigger(TriggerId, EventFilter, StoredProcPath, Options)
29382953
%%
29392954
%% The stored procedure is executed on the leader's Erlang node.
29402955
%%
2956+
%% When giving a PID as the action, a `#khepri_trigger{}' message is sent to
2957+
%% this process.
2958+
%%
29412959
%% It is guaranteed to run at least once. It could be executed multiple times
2942-
%% if the Ra leader changes, therefore the stored procedure must be
2943-
%% idempotent.
2960+
%% if the Ra leader changes, therefore the action must be idempotent.
29442961
%%
29452962
%% @param StoreId the name of the Khepri store.
29462963
%% @param TriggerId the name of the trigger.
29472964
%% @param EventFilter the event filter used to associate an event with a
29482965
%% stored procedure.
2949-
%% @param StoredProcPath the path to the stored procedure to execute when the
2950-
%% corresponding event occurs.
2966+
%% @param Action the path to a stored procedure to execute when the
2967+
%% corresponding event occurs, or PID to send a message to, or a
2968+
%% "wrapped action".
29512969
%%
29522970
%% @returns `ok' if the trigger was registered, an `{error, Reason}' tuple
29532971
%% otherwise.
29542972

2955-
register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, Options) ->
2973+
register_trigger(StoreId, TriggerId, EventFilter, Action, Options) ->
29562974
khepri_machine:register_trigger(
2957-
StoreId, TriggerId, EventFilter, StoredProcPath, Options).
2975+
StoreId, TriggerId, EventFilter, Action, Options).
29582976

29592977
%% -------------------------------------------------------------------
29602978
%% register_projection().

src/khepri_event_handler.erl

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,32 @@
2828
terminate/2,
2929
code_change/3]).
3030

31-
-type trigger_action() :: {sproc, khepri_path:native_path()}.
31+
-type trigger_action() :: {sproc, khepri_path:native_path()} |
32+
{send, pid(), any()}.
3233
%% The type of action associated with a trigger.
3334
%%
34-
%% It must be the path to a stored procedure.
35+
%% It can be the following actions:
36+
%% <ul>
37+
%% <li>`sproc': the stored procedure pointed to by the given path will be
38+
%% executed</li>
39+
%% <li>`send': a message will be sent to the specified PID with the event
40+
%% details and the given term</li>
41+
%% </ul>
3542
%%
3643
%% The action will get a trigger desccriptor as argument to describe a specific
3744
%% execution of that trigger. See {@link trigger_descriptor/0}.
3845

39-
-type triggered_action() :: {sproc, horus:horus_fun()}.
46+
-type triggered_action() :: {sproc, horus:horus_fun()} |
47+
{send, pid(), any()}.
4048
%% The action associated with a trigger once it is triggered.
4149
%%
42-
%% It is the stored procedure pointed to by the path in the triggered action's
43-
%% arguments.
50+
%% It can be the following actions:
51+
%% <ul>
52+
%% <li>`sproc': the stored procedure (extracted from the path given to the
53+
%% trigger registration) is executed</li>
54+
%% <li>`send': a message is sent to the specified PID with the event details
55+
%% and the given term</li>
56+
%% </ul>
4457

4558
-type trigger_exec_loc() :: leader | {member, node()} | all_members.
4659
%% Where to execute the triggered action.
@@ -160,7 +173,11 @@ event_to_props(#ev_process{pid = Pid, change = Change}) ->
160173
change => Change}.
161174

162175
action_to_props({sproc, _StoredProc}) ->
163-
#{}.
176+
#{};
177+
action_to_props({send, _Pid, undefined}) ->
178+
#{};
179+
action_to_props({send, _Pid, Priv}) ->
180+
#{priv => Priv}.
164181

165182
run_triggered_action(
166183
StoreId,
@@ -173,7 +190,12 @@ run_triggered_action(
173190
#triggered_v2{action = {sproc, StoredProc}} = TriggeredAction,
174191
ActionArg, State) ->
175192
run_triggered_sproc(
176-
StoreId, TriggeredAction, StoredProc, ActionArg, State).
193+
StoreId, TriggeredAction, StoredProc, ActionArg, State);
194+
run_triggered_action(
195+
StoreId,
196+
#triggered_v2{action = {send, Pid, _Priv}} = TriggeredAction,
197+
ActionArg, State) ->
198+
run_triggered_send(StoreId, TriggeredAction, Pid, ActionArg, State).
177199

178200
run_triggered_sproc(StoreId, TriggeredAction, StoredProc, ActionArg, State) ->
179201
Args = [ActionArg],
@@ -226,6 +248,11 @@ handle_action_crash(
226248
State#?MODULE{trigger_crashes = Crashes1}
227249
end.
228250

251+
run_triggered_send(_StoreId, _TriggeredAction, Pid, ActionArg, State) ->
252+
Message = ActionArg,
253+
erlang:send(Pid, Message),
254+
State.
255+
229256
log_accumulated_trigger_crashes(
230257
#?MODULE{trigger_crashes = Crashes} = State)
231258
when Crashes =:= #{} ->

src/khepri_machine.erl

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -672,12 +672,16 @@ handle_tx_exception(
672672
erlang:raise(Class, Reason, Stacktrace).
673673

674674
-spec register_trigger(
675-
StoreId, TriggerId, EventFilter, StoredProcPath, Options) ->
675+
StoreId, TriggerId, EventFilter, Action, Options) ->
676676
Ret when
677677
StoreId :: khepri:store_id(),
678678
TriggerId :: khepri:trigger_id(),
679679
EventFilter :: khepri_evf:event_filter_or_compat(),
680+
Action :: StoredProcPath |
681+
Pid |
682+
khepri_event_handler:trigger_action(),
680683
StoredProcPath :: khepri_path:path(),
684+
Pid :: pid(),
681685
Options :: khepri:command_options() | khepri:trigger_options(),
682686
Ret :: ok | khepri:error().
683687
%% @doc Registers a trigger.
@@ -693,14 +697,12 @@ handle_tx_exception(
693697
%% @returns `ok' if the trigger was registered, an `{error, Reason}' tuple
694698
%% otherwise.
695699

696-
register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, Options)
697-
when ?IS_KHEPRI_STORE_ID(StoreId) andalso
698-
?IS_KHEPRI_PATH_OR_COMPAT(StoredProcPath) ->
699-
EventFilter1 = khepri_evf:wrap(EventFilter),
700-
StoredProcPath1 = khepri_path:from_string(StoredProcPath),
701-
khepri_path:ensure_is_valid(StoredProcPath1),
700+
register_trigger(StoreId, TriggerId, EventFilter, Action, Options)
701+
when ?IS_KHEPRI_STORE_ID(StoreId) ->
702702
{CommandOptions, NonCommandOptions} = split_command_options(
703703
StoreId, Options),
704+
EventFilter1 = khepri_evf:wrap(EventFilter),
705+
Action1 = maybe_convert_to_action(StoreId, TriggerId, EventFilter, Action),
704706
case does_api_comply_with(extended_trigger, StoreId) of
705707
false ->
706708
UnsupportedOptions = maps:filter(
@@ -711,7 +713,7 @@ register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, Options)
711713
if
712714
UnsupportedOptions =:= #{} ->
713715
register_trigger_v1(
714-
StoreId, TriggerId, EventFilter1, StoredProcPath1,
716+
StoreId, TriggerId, EventFilter1, Action1,
715717
CommandOptions);
716718
true ->
717719
?khepri_misuse(
@@ -721,13 +723,35 @@ register_trigger(StoreId, TriggerId, EventFilter, StoredProcPath, Options)
721723
options => UnsupportedOptions})
722724
end;
723725
true ->
724-
StoredProcPath2 = khepri_path:realpath(StoredProcPath1),
725-
Action = {sproc, StoredProcPath2},
726726
register_trigger_v2(
727-
StoreId, TriggerId, EventFilter1, Action,
727+
StoreId, TriggerId, EventFilter1, Action1,
728728
CommandOptions, NonCommandOptions)
729729
end.
730730

731+
maybe_convert_to_action(StoreId, _TriggerId, _EventFilter, StoredProcPath)
732+
when ?IS_KHEPRI_PATH_PATTERN(StoredProcPath) ->
733+
StoredProcPath1 = khepri_path:from_string(StoredProcPath),
734+
khepri_path:ensure_is_valid(StoredProcPath1),
735+
case does_api_comply_with(extended_trigger, StoreId) of
736+
true ->
737+
StoredProcPath2 = khepri_path:realpath(StoredProcPath1),
738+
{sproc, StoredProcPath2};
739+
false ->
740+
StoredProcPath1
741+
end;
742+
maybe_convert_to_action(StoreId, TriggerId, EventFilter, Pid)
743+
when is_pid(Pid) ->
744+
case does_api_comply_with(extended_trigger, StoreId) of
745+
true ->
746+
{send, Pid, undefined};
747+
false ->
748+
?khepri_misuse(
749+
unsupported_trigger_action,
750+
#{trigger_id => TriggerId,
751+
event_filter => EventFilter,
752+
action => Pid})
753+
end.
754+
731755
register_trigger_v1(
732756
StoreId, TriggerId, EventFilter, StoredProcPath, CommandOptions) ->
733757
Command = #register_trigger{id = TriggerId,
@@ -2878,7 +2902,20 @@ evaluate_action(State, Event, TriggerId, Trigger, TriggeredActions)
28782902
event = Event}
28792903
end,
28802904
evaluate_where_option(State, Triggered, TriggeredActions)
2881-
end.
2905+
end;
2906+
evaluate_action(
2907+
State, Event, TriggerId,
2908+
#{event_filter := EventFilter,
2909+
action := Action,
2910+
where := Where},
2911+
TriggeredActions) ->
2912+
Triggered = #triggered_v2{
2913+
id = TriggerId,
2914+
event_filter = EventFilter,
2915+
action = Action,
2916+
where = Where,
2917+
event = Event},
2918+
evaluate_where_option(State, Triggered, TriggeredActions).
28822919

28832920
evaluate_where_option(
28842921
_State, #triggered{} = Triggered, TriggeredActions) ->

test/triggers.erl

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,3 +631,30 @@ receive_sproc_msg_with_props(Key) ->
631631
receive {sproc, Key, Props} -> Props
632632
after 1000 -> timeout
633633
end.
634+
635+
event_triggers_message_send_test_() ->
636+
EventFilter = khepri_evf:tree([foo]),
637+
{setup,
638+
fun() -> test_ra_server_helpers:setup(?FUNCTION_NAME) end,
639+
fun(Priv) -> test_ra_server_helpers:cleanup(Priv) end,
640+
[{inorder,
641+
[{"Registering a trigger",
642+
?_assertEqual(
643+
ok,
644+
khepri:register_trigger(
645+
?FUNCTION_NAME,
646+
?FUNCTION_NAME,
647+
EventFilter,
648+
self()))},
649+
650+
{"Updating a node; should trigger the send of the message",
651+
?_assertMatch(
652+
ok,
653+
khepri:put(?FUNCTION_NAME, [foo], value))},
654+
655+
{"Checking the procedure was executed",
656+
?_assert(receive
657+
#khepri_trigger{} ->
658+
true
659+
end)}]
660+
}]}.

0 commit comments

Comments
 (0)