In this lesson, we show how we can add and remove processes to a system during execution. To illustrate that, we will use a simple graphical application which displays a moving point.
We define the state of a moving point by a record containing the color of the point, and two signals that represent respectively the flows of positions and velocities of the point.
type state = { color: Graphics.color; pos: (float * float, float * float) event; vel: (float * float, float * float) event; }
At each instant, the position and velocity of a point are represented by a pair of floats.
We program a way to observe the moving points. To do that we
define a global signal to_draw
on which points will
send their states.
signal to_draw;;
Then we define a process called window
which opens the
graphical window and displays the points.
let process window = Graphics.open_graph ""; loop await to_draw (all) in Graphics.clear_graph(); List.iter (fun state -> let x, y = last ?state.pos in let x_int = (truncate x) mod (Graphics.size_x()) in let y_int = truncate y in Graphics.set_color state.color; Graphics.fill_circle x_int y_int 5) all end
#run window;;
To create a value of type state
, we need some auxiliary
functions.
We define the function +:
to add a pair of floats.
let (+:) (x1, y1) (x2, y2) = (x1 +. x2, y1 +. y2)
We define a function color_of_int
which converts an
integer into a value of type Graphics.color
.
let color_of_int = function | 0 -> Graphics.black | 1 -> Graphics.magenta | 2 -> Graphics.green | 3 -> Graphics.red | 4 -> Graphics.blue | _ -> Graphics.black
We can now define a function which creates a new state.
let new_state () = signal pos default (0., 0.) gather (+:) in signal vel default (0., 0.) gather (+:) in emit pos (0., float (Graphics.size_y () / 2)); { color = (color_of_int (Random.int 5)); pos = pos; vel = vel; }
The flow of positions is initialized on the left side of the graphical
window by an emission on the pos
signal.
To compute the position of the point, we integrate the velocity.
let process compute_pos state = loop let p = last ?state.pos +: last ?state.vel in emit state.pos p; pause end
So, to move a point from left to right, we define a process which maintains the velocity of a point at each instant.
let process left_right state = loop emit state.vel (2.0, 0.); pause end
To observe the position of a point, we define a
process draw
which emits the state of a point at each
instant.
let process draw state = loop emit to_draw state; pause end
The behavior of a moving point is the parallel composition of the three preceding processes.
let process moving_point state = run (compute_pos state) || run (left_right state) || run (draw state)
#run moving_point (new_state ());;
The created process cannot be removed since it never terminates. In the next step, we will see how to implement a simple process manager that can stop the execution of a process in a modular way.
We first define a function generating a unique identifier for each process.
let gen_id = let cpt = ref 0 in fun () -> incr cpt; !cpt
We define a global signal to_kill
on which we will send the
id of the processes to kill.
signal to_kill;;
We define a process killable
such that killable
p
associates an id to the process p
given as
argument and executes it. When the id associated to p
is
emitted on to_kill
, the execution of p
is
stopped.
let process killable p = let id = gen_id () in print_endline ("["^(string_of_int id)^"]"); do run p until to_kill(ids) when List.mem id ids done
The identifier id
is generated then the
process p
is executed. If during the execution, the
signal to_kill
is emitted, the variable ids
will be assigned the list of processes to kill, thanks to
the do/until/when
construct. If id
belongs
to this list (List.mem id ids
) then the execution is
stopped otherwise execution continues.
We can test our new combinator with a moving point.
#run killable (moving_point (new_state ()));;
Observe that the id [1]
is printed into the
terminal. Thefore, we can kill this process by sending its id on the
signal to_kill
.
emit to_kill 1;;
Let's now see how we can change the behavior of a process. We define a
process replace
which takes as argument an initial
behavior p_init
, a state state
and a
signal new_behavior
on which new behavior to replace the
current behavior can be sent.
let rec process replace p_init state new_behavior = do run (p_init state) until new_behavior(p) -> run (replace p state new_behavior) done
The process p_init
parameterized by state
is
executed under the control of the
signal new_behavior
. When new_behavior
is
emitted, p_init
is stopped and we receive the process
p
carried on this signal. The new process p
is executed through the recursive call to replace
, which
continues to allow changes to the behavior of the process.
To use this process, we need a signal on which we can send and receive a process. To handle multiple emissions during the same instant, we build the parallel composition of the sent processes.
signal new_behavior default (fun state -> process ()) gather (fun p q state -> process (run (p state) || run (q state))) ;;
We can now execute a moving point whose behavior can be replaced dynamically.
#run replace moving_point (new_state ()) new_behavior;;
Now, to replace the point moving from left to right, we define the behavior of a point moving up and down.
let process up_down state = for i = 1 to 50 do emit state.vel (0., 2.); pause done; loop for i = 1 to 100 do emit state.vel (0., - 2.); pause done; for i = 1 to 100 do emit state.vel (0., 2.); pause done end
let process moving_point' state = run (up_down state) || run (compute_pos state) || run (draw state)
To reconfigure the previous point, we need only send
this moving_point'
process on the
signal new_behavior
.
emit new_behavior moving_point';;
We can restore the previous behavior as follows.
emit new_behavior moving_point;;
Another useful reconfiguration combinator is one that adds a new
behavior to a running process. To do that, we define a
process extend
that executes a
process p_init
and awaits new processes to execute on a
signal add_behavior
. The initial
process p_init
and the added processes will share a
common state state
.
let rec process extend p_init state add_behavior = run (p_init state) || await add_behavior (p) in run (extend p state add_behavior)
In this process, p_init
is executed and in parallel a
process p
is awaited on the
signal add_behavior
. When p
is received, it
is executed through a recursive call to extend
so that
it is still possible to add new behaviors.
Similarly to new_behavior
, we define a
signal add_behavior
on which the adding requests will be sent.
signal add_behavior default (fun state -> process ()) gather (fun p q state -> process (run (p state) || run (q state))) ;;
We can run an extensible moving point,
#run extend moving_point (new_state ()) add_behavior ;;
and add an up/down behavior to this point moving left to right.
emit add_behavior up_down ;;
Of course, we can combine the previous operators. For example, we can define a process that can be extended and killed.
#run killable (extend moving_point (new_state ()) add_behavior) ;;
Then we can reconfigure this process
emit add_behavior up_down ;;
and kill it.
emit to_kill 2 ;;
We can also combine the operators in another way. First we run an extensible process.
#run extend moving_point (new_state ()) add_behavior ;;
and then we add a behavior which is killable.
emit add_behavior (fun state -> process (run (killable (up_down state)))) ;;
You can see that the behavior of two moving points have been modified
because two processes are waiting on the
signal add_behavior
. The two processes execute a new
killable behavior in parallel to their previous behavior and thus the
id of two killable processes are printed into the terminal. We can now
kill these processes.
emit to_kill 3 ;;
emit to_kill 4 ;;
Note that the two moving points that were modified still exist because we only killed the added behaviors.
To avoid that multiple processes share the same reconfiguration signal, we can associate an id to each extensible process and dispatch the requests.
The adding requests will be sent on the signal to_add
with the id of the process to extend.
signal to_add ;;
Each extensible process has its own add_behavior
signal
and filters the to_add
signal to extract only those
requests addressed to it.
let process extensible p_init state = let id = gen_id () in print_endline ("{"^(string_of_int id)^"}"); signal add_behavior default (fun state -> process ()) gather (fun p q state -> process (run (p state) || run (q state))) in run (extend p_init state add_behavior) || loop await to_add(reqs) in List.iter (fun (x, p) -> if x = id then emit add_behavior p) reqs end
We can test our new combinator as follows.
#run extensible moving_point (new_state ()) ;;
emit to_add (5, up_down) ;;
A small library of process management can be found in the standard library: rml_process_manager.rmli. For more information about dynamic reconfiguration in ReactiveML you can read this paper.