1 module workermanager;
2 import core.sync.mutex;
3 import core.thread,
4        core.time;
5 import std.typecons,
6        std.stdio;
7 import libasync;
8 
9 enum WorkerStatus {
10   Suspended,
11   Running
12 }
13 
14 struct Worker {
15   string   name;
16   Duration durx;
17   void delegate(Worker) dg;
18   private {
19     bool periodic;
20   }
21 
22   this (string name, Duration durx, void delegate(Worker worker) dg) {
23     this.name = name;
24     this.durx = durx;
25     this.dg   = dg;
26     this.periodic = true;
27   }
28 
29   this (string name, Duration durx, void delegate() dg) {
30     this.name = name;
31     this.durx = durx;
32     this.dg   = (Worker worker) => dg();
33     this.periodic = true;
34   }
35 
36     this (string name, void delegate(Worker worker) dg) {
37     this.name = name;
38     this.dg   = dg;
39   }
40 
41   this (string name, void delegate() dg) {
42     this.name = name;
43     this.dg   = (Worker worker) => dg();
44   }
45 }
46 
47 class WorkerManager {
48   WorkerStatus[string] workers;
49   ThreadGroup tg;
50   Mutex m;
51 
52   this() {
53     this.m = new Mutex;
54     this.tg = new ThreadGroup;
55   }
56 
57   void registerWorkers(Worker[] _workers) {
58     foreach (worker; _workers) {
59       this.registerWorker(worker);
60     }
61   }
62 
63   void registerWorker(Worker worker) {
64     Nullable!WorkerStatus _sts = this.getWorkerStatus(worker.name);
65 
66     if (_sts.isNull) {
67       this.setWorkerStatus(worker.name, WorkerStatus.Running);
68       startWorker(worker);
69     } else {
70       WorkerStatus sts = _sts.get;
71       if (sts.Running) {
72         debug writeln("The worker already exists!");
73       } else {
74         debug writeln("The worker have been suspended, then restart");
75         this.setWorkerStatus(worker.name, WorkerStatus.Running);
76         startWorker(worker);
77       }
78     }
79   }
80 
81   private void startWorker(Worker worker) {
82     auto _dg = (Worker _worker) => () => workerMain(_worker);
83     synchronized (m) {
84       tg.create(_dg(worker));
85     }
86   }
87 
88   void suspendWorker(string workerName) {
89     Nullable!WorkerStatus _sts = this.getWorkerStatus(workerName);
90 
91     if (!_sts.isNull) {
92       WorkerStatus sts = _sts.get;
93       if (sts.Running) {
94         debug writefln("<WorkerManager> Suspended worker - %s", workerName);
95         this.setWorkerStatus(workerName, WorkerStatus.Suspended);
96       }
97     }
98   }
99 
100   void workerMain(Worker worker) {
101     debug writefln("<worker - %s> worker \"%s\" is spawned!", worker.name, worker.name);
102 
103     auto ev_loop = getThreadEventLoop();
104     auto timer   = new AsyncTimer(ev_loop);
105 
106     if (worker.periodic) {
107       timer.duration(worker.durx).periodic.run({
108         worker.dg(worker);
109       });
110 
111       while (ev_loop.loop()) {
112         WorkerStatus sts = getWorkerStatus(worker.name).get;
113         if (sts == WorkerStatus.Suspended) {
114           debug writefln("<worker - %s> suspended!", worker.name);
115           break;
116         }
117       }
118     } else {
119       worker.dg(worker);
120       workers[worker.name] = WorkerStatus.Suspended;
121     }
122   }
123 
124   Nullable!WorkerStatus getWorkerStatus(string workerName) {
125     if (workerName in this.workers) {
126       synchronized (this.m) {
127         return nullable(this.workers[workerName]);
128       }
129     } else {
130       return typeof(return).init;
131     }
132   }
133 
134   void setWorkerStatus(string workerName, WorkerStatus WorkerStatus) {
135     synchronized (m) {
136       this.workers[workerName] = WorkerStatus;
137     }
138   }
139 
140   Thread createWaitLoop() {
141     auto th = new Thread({
142       bool flag;
143 
144       do {
145         WorkerStatus[] statuses;
146         bool m_flag;
147 
148         synchronized (m) {
149           statuses = workers.values;
150         }
151 
152         foreach (status; statuses) {
153           if (status == WorkerStatus.Running) {
154             m_flag = true;
155             break;
156           }
157         }
158 
159         if (m_flag) {
160           continue;
161         } else {
162           flag = true;
163         }
164       } while (!flag);
165     });
166 
167     th.start;
168 
169     return th;
170   }
171 
172   void joinAll() {
173     this.tg.joinAll;
174   }
175 
176   bool checkAllFinished() {
177     foreach (worker; workers.values) {
178       if (worker == WorkerStatus.Running) {
179         return false;
180       }
181     }
182 
183     return true;
184   }
185 }