7474% % khepri:trigger_options()}.</li>
7575% % <li>Added support for process-lifetime-based triggers. The trigger can be
7676% % executed when a process exits.</li>
77+ % % <li>Added support for process-lifetime-based `keep_while' condition. The
78+ % % `keep_while' put option can take a PID. The tree node will be deleted as
79+ % % soon as the associated process exits. See {@link
80+ % % khepri_condition:keep_while()}.</li>
7781% % </ul>
7882% % </td>
7983% % </tr>
@@ -415,10 +419,27 @@ put(StoreId, PathPattern, Payload, Options)
415419 Payload1 = khepri_payload :prepare (Payload ),
416420 {CommandOptions , NonCommandOptions } = split_command_options (
417421 StoreId , Options ),
418- Command = # put {path = PathPattern1 ,
419- payload = Payload1 ,
420- options = NonCommandOptions },
421- process_command (StoreId , Command , CommandOptions );
422+ ValidKeepWhile = case NonCommandOptions of
423+ #{keep_while := KW } when is_pid (KW ) ->
424+ does_api_comply_with (
425+ process_based_keep_while , StoreId );
426+ _ ->
427+ true
428+ end ,
429+ case ValidKeepWhile of
430+ true ->
431+ Command = # put {path = PathPattern1 ,
432+ payload = Payload1 ,
433+ options = NonCommandOptions },
434+ process_command (StoreId , Command , CommandOptions );
435+ false ->
436+ ? khepri_misuse (
437+ unsupported_process_based_keep_while ,
438+ #{store_id => StoreId ,
439+ node_path => PathPattern ,
440+ payload => Payload ,
441+ options => Options })
442+ end ;
422443put (_StoreId , PathPattern , Payload , _Options ) ->
423444 ? khepri_misuse (invalid_payload , #{path => PathPattern ,
424445 payload => Payload }).
@@ -1556,7 +1577,8 @@ handle_aux(
15561577
15571578 % % In `maybe_down_procs', we have processes that run on a member of the
15581579 % % cluster and we are waiting for that node to come back up to monitor
1559- % % these processes again (they are referenced by triggers).
1580+ % % these processes again (they are referenced by triggers and/or
1581+ % % `keep_while' conditions).
15601582 % %
15611583 % % However, the member might have been removed from the cluster after
15621584 % % going down. Therefore, we need to verify on a regular basis if this
@@ -1774,9 +1796,27 @@ apply(
17741796 # put {path = PathPattern , payload = Payload , options = NonCommandOptions },
17751797 State ) ->
17761798 {TreeOptions , PutOptions } = split_put_options (NonCommandOptions ),
1777- Ret = insert_or_update_node (
1778- State , PathPattern , Payload , PutOptions , TreeOptions , []),
1779- post_apply (Ret , Meta );
1799+ Ret1 = insert_or_update_node (
1800+ State , PathPattern , Payload , PutOptions , TreeOptions , []),
1801+
1802+ % % If the operation was a success and the caller set a `keep_while'
1803+ % % condition on the inserted/updated tree node, we add the side effects
1804+ % % required by this condition, if any. For example, if the `keep_while'
1805+ % % condition is a PID, we need to monitor it.
1806+ Ret2 = case Ret1 of
1807+ {State1 , {ok , _ } = Result , SideEffects } ->
1808+ case PutOptions of
1809+ #{keep_while := KeepWhile } ->
1810+ SideEffects1 = keep_while_to_side_effects (
1811+ KeepWhile , SideEffects ),
1812+ {State1 , Result , SideEffects1 };
1813+ _ ->
1814+ Ret1
1815+ end ;
1816+ _ ->
1817+ Ret1
1818+ end ,
1819+ post_apply (Ret2 , Meta );
17801820apply (
17811821 Meta ,
17821822 # delete {path = PathPattern , options = TreeOptions },
@@ -2003,12 +2043,35 @@ apply(Meta, {down, _Pid, noconnection} = Command, State) ->
20032043 post_apply (Ret , Meta );
20042044apply (Meta , {Down , Pid , Reason }, State )
20052045 when Down =:= down orelse Down =:= really_down ->
2046+ % % Handle keep_while conditions. We lookup tree nodes watching this exited
2047+ % % process and add delete them.
2048+ KeepWhileConds = get_keep_while_conds (State ),
2049+ TreeOptions = #{},
2050+ {State1 ,
2051+ SideEffects1 } = maps :fold (
2052+ fun
2053+ (Watcher , KeepWhile , {S , SE })
2054+ when is_pid (KeepWhile ) andalso
2055+ KeepWhile =:= Pid ->
2056+ % % The process associated with this path
2057+ % % exited. Therefore, we delete the
2058+ % % corresponding tree node.
2059+ % %
2060+ % % We ignore the result here, because there is
2061+ % % no explicit caller and no-one to return to.
2062+ {S1 , _R , SE1 } = delete_matching_nodes (
2063+ S , Watcher , TreeOptions , SE ),
2064+ {S1 , SE1 };
2065+ (_Watcher , _KeepWhile , Acc ) ->
2066+ Acc
2067+ end , {State , []}, KeepWhileConds ),
2068+
20062069 % % Handle triggers. We lookup triggers watching this exited process and add
20072070 % % the side effects to execute the corresponding actions.
20082071 Event = # ev_process {pid = Pid , change = {'DOWN' , Reason }},
2009- {State1 , SideEffects } = add_trigger_side_effects (
2010- State , State , [Event ], [] ),
2011- Ret = {State1 , Meta , SideEffects },
2072+ {State2 , SideEffects2 } = add_trigger_side_effects (
2073+ State1 , State1 , [Event ], SideEffects1 ),
2074+ Ret = {State2 , Meta , SideEffects2 },
20122075 post_apply (Ret , Meta );
20132076apply (Meta , {nodeup , Node }, State ) ->
20142077 % % We can stop monitoring this node. If we get a `noconnection' from a
@@ -2018,8 +2081,19 @@ apply(Meta, {nodeup, Node}, State) ->
20182081 % % We need to restore monitoring of processes that are hosted on that
20192082 % % specific node. This will tell us if the process are still there are
20202083 % % gone.
2021- Triggers = get_triggers (State ),
2084+ KeepWhileConds = get_keep_while_conds (State ),
20222085 SideEffects2 = maps :fold (
2086+ fun
2087+ (_Watcher , KeepWhile , SE )
2088+ when is_pid (KeepWhile ) andalso
2089+ node (KeepWhile ) =:= Node ->
2090+ keep_while_to_side_effects (KeepWhile , SE );
2091+ (_Watcher , _KeepWhile , SE ) ->
2092+ SE
2093+ end , SideEffects1 , KeepWhileConds ),
2094+
2095+ Triggers = get_triggers (State ),
2096+ SideEffects3 = maps :fold (
20232097 fun
20242098 (_TriggerId ,
20252099 #{event_filter :=
@@ -2028,8 +2102,8 @@ apply(Meta, {nodeup, Node}, State) ->
20282102 event_filter_to_side_effect (EventFilter , SE );
20292103 (_TriggerId , _Trigger , SE ) ->
20302104 SE
2031- end , SideEffects1 , Triggers ),
2032- Ret = {State , Meta , SideEffects2 },
2105+ end , SideEffects2 , Triggers ),
2106+ Ret = {State , Meta , SideEffects3 },
20332107 post_apply (Ret , Meta );
20342108apply (Meta , {nodedown , _Node }, State ) ->
20352109 Ret = {State , Meta },
@@ -2177,7 +2251,8 @@ trigger_delayed_aux_queries_eval({State, Result, SideEffects}, _Meta) ->
21772251state_enter (leader , State ) ->
21782252 SideEffects1 = emitted_triggers_to_side_effects (State , leader ),
21792253 SideEffects2 = non_emitted_triggers_to_side_effects (State , SideEffects1 ),
2180- SideEffects2 ;
2254+ SideEffects3 = keep_while_conds_to_side_effects (State , SideEffects2 ),
2255+ SideEffects3 ;
21812256state_enter (recovered , _State ) ->
21822257 SideEffect = {aux , restore_projections },
21832258 [SideEffect ];
@@ -2393,6 +2468,9 @@ does_api_comply_with(cached_members_list, MacVer)
23932468does_api_comply_with (extended_trigger , MacVer )
23942469 when is_integer (MacVer ) ->
23952470 MacVer >= 3 ;
2471+ does_api_comply_with (process_based_keep_while , MacVer )
2472+ when is_integer (MacVer ) ->
2473+ MacVer >= 3 ;
23962474does_api_comply_with (_Behaviour , MacVer )
23972475 when is_integer (MacVer ) ->
23982476 false ;
@@ -2625,6 +2703,36 @@ evaluate_projection(
26252703 Effect = {aux , Trigger },
26262704 [Effect | Effects ].
26272705
2706+ - spec keep_while_conds_to_side_effects (State , SideEffects ) ->
2707+ NewSideEffects when
2708+ State :: khepri_machine :state (),
2709+ SideEffects :: ra_machine :effects (),
2710+ NewSideEffects :: ra_machine :effects ().
2711+ % % @private
2712+
2713+ keep_while_conds_to_side_effects (State , SideEffects ) ->
2714+ KeepWhileConds = get_keep_while_conds (State ),
2715+ SideEffects1 = maps :fold (
2716+ fun (_Watcher , KeepWhile , SE ) ->
2717+ keep_while_to_side_effects (KeepWhile , SE )
2718+ end , SideEffects , KeepWhileConds ),
2719+ SideEffects1 .
2720+
2721+ - spec keep_while_to_side_effects (KeepWhile , SideEffects ) ->
2722+ NewSideEffects when
2723+ KeepWhile :: khepri_condition :native_keep_while (),
2724+ SideEffects :: ra_machine :effects (),
2725+ NewSideEffects :: ra_machine :effects ().
2726+ % % @private
2727+
2728+ keep_while_to_side_effects (KeepWhile , SideEffects ) when is_map (KeepWhile ) ->
2729+ SideEffects ;
2730+ keep_while_to_side_effects (KeepWhile , SideEffects ) when is_pid (KeepWhile ) ->
2731+ Pid = KeepWhile ,
2732+ SideEffect = {monitor , process , Pid },
2733+ SideEffects1 = [SideEffect | SideEffects ],
2734+ SideEffects1 .
2735+
26282736- spec event_filter_to_side_effect (EventFilter , SideEffects ) ->
26292737 NewSideEffects when
26302738 EventFilter :: khepri_evf :event_filter (),
0 commit comments