Supervisors

Learn how supervisor tasks work


A supervisor task monitors child tasks and manages their health. Structuring side effects and business logic around supervisor tasks yields coding patterns that make code easier to read and maintain.

Supplemental reading from Erlang

The most basic version of a supervisor is simply an infinite loop that calls a child task:

import { call, sleep } from "starfx";

function* supervisor() {
  while (true) {
    try {
      // run the child task; once it returns or throws, loop and start it again
      yield* call(someTask);
    } catch (err) {
      console.error(err);
    }
  }
}

function* someTask() {
  yield* sleep(10 * 1000);
  throw new Error("boom!");
}

Here we call some task that should always be in a running and healthy state. If it raises an exception, we log it and try to run the task again.
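To make the restart behavior concrete without depending on starfx, here is a minimal sketch using plain synchronous generators. The names `superviseSketch`, `flakyTask`, and the `maxRuns` cap are hypothetical and not part of starfx; the cap exists only so the example terminates, whereas the supervisor above loops forever.

```typescript
// Hypothetical, library-free sketch of the restart loop above.
// A "task" here is a plain synchronous generator function.
type Task = () => Generator<string, void, unknown>;

function superviseSketch(task: Task, maxRuns: number): string[] {
  const log: string[] = [];
  // The real supervisor uses `while (true)`; the cap keeps this sketch finite.
  for (let run = 1; run <= maxRuns; run += 1) {
    try {
      for (const step of task()) {
        log.push(step);
      }
      log.push(`run ${run} returned`);
    } catch (err) {
      // Log the failure and fall through to the next iteration (the restart).
      log.push(`run ${run} failed: ${(err as Error).message}`);
    }
  }
  return log;
}

let invocations = 0;
// Fails on every odd invocation so that both outcomes appear in the log.
function* flakyTask(): Generator<string, void, unknown> {
  invocations += 1;
  yield `work ${invocations}`;
  if (invocations % 2 === 1) {
    throw new Error("boom!");
  }
}

const supervisorLog = superviseSketch(flakyTask, 3);
```

In this sketch the task is started again whether it failed or returned normally, which mirrors the `while (true)` loop above.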

Building on that simple supervisor, we can write tasks that continuously listen for events and are restarted if they fail.

import { parallel, put, run, take } from "starfx";

function* watchFetch() {
  while (true) {
    // wait for the next FETCH_USERS action
    const action = yield* take("FETCH_USERS");
    console.log(action);
  }
}

function* send() {
  yield* put({ type: "FETCH_USERS" });
  yield* put({ type: "FETCH_USERS" });
  yield* put({ type: "FETCH_USERS" });
}

await run(
  parallel([watchFetch, send]),
);

Here we create a supervisor function that uses the take helper to run some code for every FETCH_USERS event emitted.

Inside a while loop, you get full access to its powerful flow control. As another example, let's say we only want to respond to a login action when the user isn't logged in and, conversely, only listen for a logout action when the user is logged in:

import { take } from "starfx";

// the name `authFlow` is only illustrative
function* authFlow() {
  while (true) {
    const login = yield* take("LOGIN");
    // e.g. fetch token with creds inside `login.payload`
    const logout = yield* take("LOGOUT");
    // e.g. destroy token from `logout.payload`
  }
}

Interesting, we've essentially created a finite state machine within a while-loop!
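The state-machine reading can be checked with a small library-free sketch. Here a plain generator yields the action type it is waiting for, which stands in for `take`, and a driver drops any action that does not match the current state. `AuthAction`, `authFlowSketch`, and `drive` are hypothetical names used only for illustration.

```typescript
// Hypothetical, library-free sketch of the login/logout loop as a state machine.
type AuthAction = { type: "LOGIN" | "LOGOUT"; payload?: string };

// Yields the action type it is currently waiting for, standing in for `take`.
function* authFlowSketch(
  log: string[],
): Generator<AuthAction["type"], never, AuthAction> {
  while (true) {
    const login = yield "LOGIN";
    log.push(`logged in as ${login.payload}`);
    yield "LOGOUT";
    log.push("logged out");
  }
}

// Feeds actions to the flow and drops any action it is not waiting for.
function drive(actions: AuthAction[]): string[] {
  const log: string[] = [];
  const flow = authFlowSketch(log);
  let waitingFor = flow.next().value;
  for (const action of actions) {
    if (action.type !== waitingFor) {
      log.push(`ignored ${action.type}`);
      continue;
    }
    waitingFor = flow.next(action).value;
  }
  return log;
}

const authLog = drive([
  { type: "LOGOUT" },
  { type: "LOGIN", payload: "alice" },
  { type: "LOGIN", payload: "bob" },
  { type: "LOGOUT" },
]);
```

The position inside the loop is the state: while waiting for LOGOUT, a second LOGIN has no effect.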

We also built a helper that will abstract the while loop if you don't need it:

import { takeEvery } from "starfx";

function* watchFetch() {
  yield* takeEvery("FETCH_USERS", function* (action) {
    console.log(action);
  });
}

However, this means that we are going to make the same request three times. We probably want a throttle or debounce so that we only make one fetch request within some interval.

import { takeLeading } from "starfx";

function* watchFetch() {
  yield* takeLeading("FETCH_USERS", function* (action) {
    console.log(action);
  });
}

That's better: now only one task can be alive at a time.
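The difference between the two helpers can be illustrated on a virtual timeline. The sketch below is an assumption-laden model, not starfx code: events are arrival times in milliseconds, every handler takes a fixed `handlerMs`, and the function names are hypothetical.

```typescript
// Virtual-time sketch: which events start a handler under each strategy.
// Function names and the timeline model are hypothetical, not starfx APIs.

// takeEvery: every event starts its own handler, even while others are running.
function startsWithTakeEvery(arrivalsMs: number[]): number[] {
  return [...arrivalsMs];
}

// takeLeading: events that arrive while a handler is still running are dropped.
function startsWithTakeLeading(
  arrivalsMs: number[],
  handlerMs: number,
): number[] {
  const starts: number[] = [];
  let busyUntil = Number.NEGATIVE_INFINITY;
  for (const at of arrivalsMs) {
    if (at < busyUntil) continue; // a handler is still alive
    starts.push(at);
    busyUntil = at + handlerMs;
  }
  return starts;
}

const fetchArrivals = [0, 10, 20, 500];
const everyStarts = startsWithTakeEvery(fetchArrivals);
const leadingStarts = startsWithTakeLeading(fetchArrivals, 100);
```

With a 100 ms handler, the events at 10 and 20 arrive while the first handler is still running, so only the events at 0 and 500 start a handler under the leading strategy.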

Both thunks and endpoints simply listen for actions being emitted onto a channel -- which is just an event emitter -- and then call the middleware stack with that action.
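As a rough mental model of that flow, here is a synchronous, library-free sketch. It is a simplification under stated assumptions: starfx middleware are generator functions and its channel is not this class, and `Channel`, `compose`, and the "fetch-users" action type are hypothetical names.

```typescript
// Library-free sketch: a channel as a tiny event emitter feeding a middleware stack.
type Action = { type: string };
type Ctx = { action: Action; trace: string[] };
type Middleware = (ctx: Ctx, next: () => void) => void;

class Channel {
  private readonly listeners: Array<(action: Action) => void> = [];

  subscribe(listener: (action: Action) => void): void {
    this.listeners.push(listener);
  }

  emit(action: Action): void {
    for (const listener of this.listeners) listener(action);
  }
}

// Runs each middleware in order; calling `next` hands control to the following one.
function compose(stack: Middleware[]): (ctx: Ctx) => void {
  return (ctx) => {
    const runFrom = (index: number): void => {
      const mdw = stack[index];
      if (mdw) mdw(ctx, () => runFrom(index + 1));
    };
    runFrom(0);
  };
}

const traces: string[][] = [];
const runStack = compose([
  (ctx, next) => {
    ctx.trace.push("before");
    next();
    ctx.trace.push("after");
  },
  (ctx, next) => {
    ctx.trace.push(`handled ${ctx.action.type}`);
    next();
  },
]);

const channel = new Channel();
// Plays the role of a thunk's listener: react only to its own action type.
channel.subscribe((action) => {
  if (action.type !== "fetch-users") return;
  const ctx: Ctx = { action, trace: [] };
  runStack(ctx);
  traces.push(ctx.trace);
});

channel.emit({ type: "fetch-users" });
channel.emit({ type: "unrelated" });
```

Only the matching action reaches the stack, and the first middleware wraps the second, so code after `next()` runs once the rest of the stack has finished.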

Both thunks and endpoints support replacing the default takeEvery supervisor with one of our officially supported supervisors, takeLatest or takeLeading, or with a user-defined supervisor.

Because every thunk and endpoint has its own supervisor task monitoring the health of its children, we allow the end-developer to change the default supervisor -- which is takeEvery:

import { takeLatest } from "starfx";

// `thunks` is assumed to come from createThunks(); `dispatch` from the store
const someAction = thunks.create("some-action", { supervisor: takeLatest });
dispatch(someAction()); // this task gets cancelled
dispatch(someAction()); // this task gets cancelled
dispatch(someAction()); // this task lives
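The cancellation pattern in those three dispatches can be modeled on a virtual timeline. This is a sketch under assumptions: arrival times are sorted, every task takes a fixed `taskMs`, and `outcomesWithTakeLatest` is a hypothetical helper rather than a starfx function.

```typescript
// Virtual-time sketch of takeLatest: a newer dispatch cancels the task in flight.
type Outcome = { startedAt: number; result: "cancelled" | "completed" };

// `arrivalsMs` must be sorted in ascending order.
function outcomesWithTakeLatest(
  arrivalsMs: number[],
  taskMs: number,
): Outcome[] {
  return arrivalsMs.map((startedAt, index) => {
    const next = arrivalsMs[index + 1];
    // Cancelled when the next dispatch arrives before this task would finish.
    const cancelled = next !== undefined && next < startedAt + taskMs;
    return { startedAt, result: cancelled ? "cancelled" : "completed" };
  });
}

// Three back-to-back dispatches, as in the snippet above.
const latestOutcomes = outcomesWithTakeLatest([0, 0, 0], 100);
const latestResults = latestOutcomes.map((outcome) => outcome.result);
```

Only the last dispatch has no successor to cancel it, so it is the only one that completes.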

This is the power of supervisors and is fundamental to how starfx works.

poll

When activated, call a thunk or endpoint once every N milliseconds until cancelled.

import { poll } from "starfx";

const fetchUsers = api.get("/users", { supervisor: poll() });
store.dispatch(fetchUsers());
// fetch users
// sleep 5000
// fetch users
// sleep 5000
// fetch users
store.dispatch(fetchUsers());
// cancelled

The default value provided to poll() is 5 seconds.

You can optionally provide a cancel action instead of calling the thunk twice:

import { createAction, poll } from "starfx";

const cancelPoll = createAction("cancel-poll");
const fetchUsers = api.get("/users", {
  supervisor: poll(5 * 1000, `${cancelPoll}`),
});
store.dispatch(fetchUsers());
// fetch users
// sleep 5000
// fetch users
// sleep 5000
// fetch users
store.dispatch(cancelPoll());
// cancelled
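The timeline in those comments can be reproduced with a virtual clock. The sketch below assumes each fetch takes no time and that the cancel action arrives at `cancelAtMs`; `pollFetchTimes` is a hypothetical helper, not part of starfx.

```typescript
// Virtual-time sketch of a polling loop; no real timers are used.
function pollFetchTimes(intervalMs: number, cancelAtMs: number): number[] {
  const fetchTimes: number[] = [];
  // Fetch, sleep for the interval, and repeat until the cancel action arrives.
  for (let now = 0; now < cancelAtMs; now += intervalMs) {
    fetchTimes.push(now);
  }
  return fetchTimes;
}

// Poll every 5 seconds and cancel 12 seconds after the first dispatch.
const fetchTimes = pollFetchTimes(5 * 1000, 12 * 1000);
```

Under these assumptions the endpoint is fetched three times, at 0, 5000, and 10000 ms, before the cancel action stops the loop.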

timer

Only call a thunk or endpoint at most once every N milliseconds.

import { timer } from "starfx";

const fetchUsers = api.get("/users", { supervisor: timer(1000) });
store.dispatch(fetchUsers());
store.dispatch(fetchUsers());
// sleep(100);
store.dispatch(fetchUsers());
// sleep(1000);
store.dispatch(fetchUsers());
// called: 2 times

The default value provided to timer() is 5 minutes. This means you can only call fetchUsers at most once every 5 minutes.
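To see why the example above reports two calls, here is a virtual-time sketch of the gating rule. It assumes the timer window starts when a call is let through; `callsAllowedByTimer` is a hypothetical helper, not a starfx export.

```typescript
// Virtual-time sketch of the timer rule: at most one call per window.
function callsAllowedByTimer(
  dispatchTimesMs: number[],
  timerMs: number,
): number[] {
  const allowed: number[] = [];
  let blockedUntil = Number.NEGATIVE_INFINITY;
  for (const at of dispatchTimesMs) {
    if (at < blockedUntil) continue; // still inside the timer window
    allowed.push(at);
    blockedUntil = at + timerMs;
  }
  return allowed;
}

// Same sequence as above: two dispatches, 100 ms later a third, 1000 ms later a fourth.
const allowedCalls = callsAllowedByTimer([0, 0, 100, 1100], 1000);
```

Only the dispatches at 0 and 1100 ms fall outside an active window, which matches the two calls reported in the example.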

clearTimers

Want to clear a timer and refetch?

import { clearTimers, timer } from "starfx";

const fetchUsers = api.get("/users", { supervisor: timer(1000) });
store.dispatch(fetchUsers());
store.dispatch(clearTimers(fetchUsers()));
store.dispatch(fetchUsers());
// called: 2 times
store.dispatch(clearTimers("*")); // clear all timers
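One way to picture clearing is a map of keys to expiry times, where clearing deletes an entry so the next call goes through. The `TimerGate` class below is a hypothetical model with string keys and a virtual clock; how starfx keys and stores its timers internally is not something this sketch claims to reproduce.

```typescript
// Hypothetical model of per-key timers that can be cleared.
class TimerGate {
  private readonly timerMs: number;
  private readonly blockedUntil = new Map<string, number>();

  constructor(timerMs: number) {
    this.timerMs = timerMs;
  }

  // Returns true when a call for `key` is allowed at virtual time `now`.
  tryCall(key: string, now: number): boolean {
    const until = this.blockedUntil.get(key);
    if (until !== undefined && now < until) return false;
    this.blockedUntil.set(key, now + this.timerMs);
    return true;
  }

  // Clears the timer for one key, or for every key when given "*".
  clear(key: string): void {
    if (key === "*") {
      this.blockedUntil.clear();
    } else {
      this.blockedUntil.delete(key);
    }
  }
}

const gate = new TimerGate(1000);
const gateResults: boolean[] = [];
gateResults.push(gate.tryCall("fetch-users", 0)); // first call goes through
gateResults.push(gate.tryCall("fetch-users", 10)); // blocked by the timer
gate.clear("fetch-users");
gateResults.push(gate.tryCall("fetch-users", 20)); // allowed again after clearing
```

Clearing with "*" empties the whole map, so every key can be called again immediately.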