1 module workermanager; 2 import core.sync.mutex; 3 import core.thread, 4 core.time; 5 import std.typecons, 6 std.stdio; 7 import libasync; 8 9 enum WorkerStatus { 10 Suspended, 11 Running 12 } 13 14 struct Worker { 15 string name; 16 Duration durx; 17 void delegate(Worker) dg; 18 private { 19 bool periodic; 20 } 21 22 this (string name, Duration durx, void delegate(Worker worker) dg) { 23 this.name = name; 24 this.durx = durx; 25 this.dg = dg; 26 this.periodic = true; 27 } 28 29 this (string name, Duration durx, void delegate() dg) { 30 this.name = name; 31 this.durx = durx; 32 this.dg = (Worker worker) => dg(); 33 this.periodic = true; 34 } 35 36 this (string name, void delegate(Worker worker) dg) { 37 this.name = name; 38 this.dg = dg; 39 } 40 41 this (string name, void delegate() dg) { 42 this.name = name; 43 this.dg = (Worker worker) => dg(); 44 } 45 } 46 47 class WorkerManager { 48 WorkerStatus[string] workers; 49 ThreadGroup tg; 50 Mutex m; 51 52 this() { 53 this.m = new Mutex; 54 this.tg = new ThreadGroup; 55 } 56 57 void registerWorkers(Worker[] _workers) { 58 foreach (worker; _workers) { 59 this.registerWorker(worker); 60 } 61 } 62 63 void registerWorker(Worker worker) { 64 Nullable!WorkerStatus _sts = this.getWorkerStatus(worker.name); 65 66 if (_sts.isNull) { 67 this.setWorkerStatus(worker.name, WorkerStatus.Running); 68 startWorker(worker); 69 } else { 70 WorkerStatus sts = _sts.get; 71 if (sts.Running) { 72 debug writeln("The worker already exists!"); 73 } else { 74 debug writeln("The worker have been suspended, then restart"); 75 this.setWorkerStatus(worker.name, WorkerStatus.Running); 76 startWorker(worker); 77 } 78 } 79 } 80 81 private void startWorker(Worker worker) { 82 auto _dg = (Worker _worker) => () => workerMain(_worker); 83 synchronized (m) { 84 tg.create(_dg(worker)); 85 } 86 } 87 88 void suspendWorker(string workerName) { 89 Nullable!WorkerStatus _sts = this.getWorkerStatus(workerName); 90 91 if (!_sts.isNull) { 92 WorkerStatus sts = _sts.get; 93 if (sts.Running) { 94 debug writefln("<WorkerManager> Suspended worker - %s", workerName); 95 this.setWorkerStatus(workerName, WorkerStatus.Suspended); 96 } 97 } 98 } 99 100 void workerMain(Worker worker) { 101 debug writefln("<worker - %s> worker \"%s\" is spawned!", worker.name, worker.name); 102 103 auto ev_loop = getThreadEventLoop(); 104 auto timer = new AsyncTimer(ev_loop); 105 106 if (worker.periodic) { 107 timer.duration(worker.durx).periodic.run({ 108 worker.dg(worker); 109 }); 110 111 while (ev_loop.loop()) { 112 WorkerStatus sts = getWorkerStatus(worker.name).get; 113 if (sts == WorkerStatus.Suspended) { 114 debug writefln("<worker - %s> suspended!", worker.name); 115 break; 116 } 117 } 118 } else { 119 worker.dg(worker); 120 workers[worker.name] = WorkerStatus.Suspended; 121 } 122 } 123 124 Nullable!WorkerStatus getWorkerStatus(string workerName) { 125 if (workerName in this.workers) { 126 synchronized (this.m) { 127 return nullable(this.workers[workerName]); 128 } 129 } else { 130 return typeof(return).init; 131 } 132 } 133 134 void setWorkerStatus(string workerName, WorkerStatus WorkerStatus) { 135 synchronized (m) { 136 this.workers[workerName] = WorkerStatus; 137 } 138 } 139 140 Thread createWaitLoop() { 141 auto th = new Thread({ 142 bool flag; 143 144 do { 145 WorkerStatus[] statuses; 146 bool m_flag; 147 148 synchronized (m) { 149 statuses = workers.values; 150 } 151 152 foreach (status; statuses) { 153 if (status == WorkerStatus.Running) { 154 m_flag = true; 155 break; 156 } 157 } 158 159 if (m_flag) { 160 continue; 161 } else { 162 flag = true; 163 } 164 } while (!flag); 165 }); 166 167 th.start; 168 169 return th; 170 } 171 172 void joinAll() { 173 this.tg.joinAll; 174 } 175 176 bool checkAllFinished() { 177 foreach (worker; workers.values) { 178 if (worker == WorkerStatus.Running) { 179 return false; 180 } 181 } 182 183 return true; 184 } 185 }