summaryrefslogtreecommitdiff
path: root/libimc/master.c
blob: 6d48b28dac52219c06f8e25bf458c53cd104176c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#include "_libimc.h"
#include <stdio.h>
#include <time.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>
#include <stddef.h>
#include <unistd.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/prctl.h>

int MASTER2WORKER_RD[N_WORKERS];
int MASTER2WORKER_WR[N_WORKERS];
int WORKER2MASTER_RD[N_WORKERS];
int WORKER2MASTER_WR[N_WORKERS];

static size_t N_BRANCHES[N_WORKERS];
static size_t N_BUGS[N_WORKERS];
void handle_progress(struct message message, int worker_id) {
    N_BRANCHES[worker_id] += message.n_branches;
    N_BUGS[worker_id] += message.n_bugs;
}

void print_progress(void) {
    size_t total_branches = 0, total_bugs = 0;
    printf("# branches:");
    for (int i = 0; i < N_WORKERS; i++) {
        printf(" %lu", N_BRANCHES[i]);
        total_branches += N_BRANCHES[i];
    }
    printf("\n");
    printf("# bugs:");
    for (int i = 0; i < N_WORKERS; i++) {
        printf(" %lu", N_BUGS[i]);
        total_bugs += N_BUGS[i];
    }
    printf("\n");
    printf("TOTAL branches: %lu; bugs: %lu\n", total_branches, total_bugs);
}

void launch_master() {
    // make a bugs directory
    system("mkdir -p bugs");
    system("rm bugs/*");

    assert(!prctl(PR_SET_CHILD_SUBREAPER, 1, 0, 0, 0));

    // set up the control pipes
    for (int i = 0; i < N_WORKERS; i++) {
        int ends[2];
        pipe2(ends, O_NONBLOCK);
        MASTER2WORKER_RD[i] = ends[0];
        MASTER2WORKER_WR[i] = ends[1];
        pipe2(ends, O_NONBLOCK);
        WORKER2MASTER_RD[i] = ends[0];
        WORKER2MASTER_WR[i] = ends[1];
    }

    // 0 -> dead, 1 -> alive, -1 -> waiting on it to die
    int worker_alive[N_WORKERS] = {0};
    // only guaranteed to be up-to-date for workers with worker_alive[i] == -1
    int worker_pid[N_WORKERS] = {0};
    size_t dead_count = 0;

    // launch an initial worker
    if (!fork()) {
        launch_worker(0);
        exit(0);
    }

    worker_alive[0] = 1;
    while (1) {
        static time_t last_time = 0;
        if ((time(0) - last_time) > 1) {
            last_time = time(0);
            print_progress();
        }

        // First: handle any workers that want to die
        for (int i = 0; i < N_WORKERS; i++) {
            if (!worker_alive[i]) continue;
            if (worker_alive[i] == -1) {
                if (waitpid(worker_pid[i], NULL, WNOHANG)) {
                    worker_alive[i] = 0;
                    clear_pipe(MASTER2WORKER_RD[i]);
                    clear_pipe(WORKER2MASTER_RD[i]);
                }
                continue;
            }

            struct message message = hear_worker(i);
            switch (message.message_type) {
                case MSG_CAN_I_DIE:
                    worker_alive[i] = -1;
                    worker_pid[i] = message.pid;
                    verbose("Telling worker %d (%d) they can die\n", i, message.pid);
                    tell_worker((struct message){MSG_OK_DIE, 0}, i);
                    break;

                case MSG_PROGRESS:
                    handle_progress(message, i);
                    break;

                case MSG_NONE:
                    break;

                default:
                    assert(!"Bad message!");
                    break;
            }
        }

        // Second: if we have empty slots, ask a worker to split.
        int n_alive = 0, fill_slot = 0;
        for (int i = 0; i < N_WORKERS; i++) {
            n_alive += (worker_alive[i] != 0);
            if (!worker_alive[i]) fill_slot = i;
        }
        if (n_alive == 0) break;
        if (n_alive == N_WORKERS) continue;

        int which = rand() % n_alive;
        for (int i = 0; i < N_WORKERS; i++) {
            if (!worker_alive[i]) continue;
            if (which--) continue;
            if (worker_alive[i] == -1) continue;

            verbose("Requesting a split of %d -> %d\n", i, fill_slot);
            tell_worker((struct message){
                .message_type = MSG_PLEASE_SPLIT,
                .new_id = fill_slot
            }, i);

            while (1) {
                struct message reply = hear_worker(i);
                switch (reply.message_type) {
                    case MSG_CAN_I_DIE:
                        verbose("Telling worker %d not to die, because we want to split\n");
                        tell_worker((struct message){MSG_NO_DIE}, i);
                        break;

                    case MSG_DID_SPLIT:
                        worker_alive[fill_slot] = 1;
                        waitpid(reply.pid, NULL, 0);
                        goto finish_split;

                    case MSG_NO_SPLIT:
                        goto finish_split;

                    case MSG_PROGRESS:
                        handle_progress(reply, i);
                        break;

                    case MSG_NONE:
                        break;

                    default:
                        printf("Unknown message %d!\n", reply.message_type);
                        break;
                }
            }

finish_split: break;
        }
    }

    printf("Done with exhaustive search.\n");
    print_progress();
}

int main(int argc, char **argv) {
    if (argc > 1) {
        assert(argc == 2);
        worker_replay(argv[1]);
    } else {
        launch_master();
    }
    return 0;
}
generated by cgit on debian on lair
contact matthew@masot.net with questions or feedback