добивание живых задач, отключение лишней рассылки админам, настройка удаления или не удаления пакета на машине
315 lines
10 KiB
C++
315 lines
10 KiB
C++
#pragma once
|
|
|
|
#include <map>
|
|
#include <set>
|
|
#include <vector>
|
|
#include <queue>
|
|
#include <math.h>
|
|
#include <thread>
|
|
#include "File.h"
|
|
#include "Task.h"
|
|
#include "Array.h"
|
|
#include "Utils.h"
|
|
|
|
#ifndef _WIN32
|
|
#include <sys/time.h>
|
|
#include <unistd.h>
|
|
#endif
|
|
#include <time.h>
|
|
|
|
enum SupervisorState {
|
|
WorkspacesCreation, //0
|
|
Preparation, //1
|
|
Execution, //2
|
|
End //3
|
|
};
|
|
|
|
template <class T>
|
|
class Supervisor : public Array <T> {
|
|
protected:
|
|
SupervisorState state;
|
|
bool killed;
|
|
public:
|
|
virtual String getStatePrefix() {
|
|
return String("");
|
|
}
|
|
String printState() {
|
|
switch (state) {
|
|
case WorkspacesCreation:
|
|
return String("WorkspacesCreation");
|
|
case Preparation:
|
|
return String("Preparation");
|
|
case Execution:
|
|
return String("Execution");
|
|
case End:
|
|
return String("End");
|
|
default:
|
|
return "?";
|
|
}
|
|
}
|
|
//-
|
|
void ToLog(const String& text){
|
|
FILE * pfile = fopen("planner_log.txt", "a");
|
|
fprintf(pfile, "%s\n", text.getCharArray());
|
|
fclose(pfile);
|
|
}
|
|
//--
|
|
void print() {
|
|
for (auto& elem : this->getElements())
|
|
elem->print();
|
|
}
|
|
void init(const char* fileName, int recordSize) {
|
|
state = WorkspacesCreation;
|
|
File* packedTasks = new File(fileName);
|
|
Text* lines = packedTasks->readLines();
|
|
|
|
const long length = lines->getLength() / recordSize;
|
|
int offset = 0;
|
|
for (int i = 0; i < length; ++i) {
|
|
this->add(new T(lines, offset));
|
|
offset += recordSize;
|
|
}
|
|
delete packedTasks;
|
|
delete lines;
|
|
}
|
|
void changeState() {
|
|
switch (this->state) {
|
|
case WorkspacesCreation:
|
|
this->state = Preparation;
|
|
saveState();
|
|
break;
|
|
case Preparation:
|
|
this->state = Execution;
|
|
saveState();
|
|
break;
|
|
case Execution:
|
|
this->state = End;
|
|
saveState();
|
|
break;
|
|
default:
|
|
this->state = End;
|
|
break;
|
|
}
|
|
}
|
|
void DoWithSchedule(int maxKernels) {
|
|
saveState();
|
|
saveProgress(0);
|
|
// подготовка тестов
|
|
while (this->state != Execution) {
|
|
for (auto& task : this->getElements()) {
|
|
switch (this->state) {
|
|
case WorkspacesCreation:
|
|
if (task->getState() == Waiting) {
|
|
task->createWorkspace();
|
|
task->setState(WorkspaceCreated);
|
|
}
|
|
break;
|
|
case Preparation:
|
|
if (task->getState() == WorkspaceCreated) {
|
|
task->prepareWorkspace();
|
|
task->createLaunchScript();
|
|
task->setState(WorkspaceReady);
|
|
}
|
|
break;
|
|
default:
|
|
//printf("id = %ld; state = %d\n", task->getId(), task->getState());
|
|
break;
|
|
}
|
|
}
|
|
changeState();
|
|
}
|
|
|
|
vector<T*> taskList;
|
|
map<int, queue<T*>, std::greater<int>> sortedByKernelNeeds;
|
|
size_t activeTasks = 0;
|
|
size_t totalProcessTasks = 0;
|
|
int maxNeededKernels = 0;
|
|
|
|
for (auto& task : this->getElements()) {
|
|
if (task->getState() == WorkspaceReady) {
|
|
activeTasks++;
|
|
totalProcessTasks += task->getKernels();
|
|
sortedByKernelNeeds[task->getKernels()].push(task);
|
|
if (maxNeededKernels < task->getKernels())
|
|
maxNeededKernels = task->getKernels();
|
|
taskList.push_back(task);
|
|
}
|
|
}
|
|
|
|
const int maxThreads = std::thread::hardware_concurrency();
|
|
printf("total tasks count = %lu, active task count %lu, maxKernels %lu, maxNeededKernels %d, maxThreads %d\n",
|
|
this->getLength(), activeTasks, maxKernels, maxNeededKernels, maxThreads);
|
|
|
|
if (maxKernels > maxThreads) {
|
|
printf("truncated maxKernels to maxThreads: %d -> %d\n", maxKernels, maxThreads);
|
|
maxKernels = maxThreads;
|
|
}
|
|
|
|
if (maxNeededKernels > maxKernels) {
|
|
printf("increased maxKernels to maxNeededKernels: %d -> %d\n", maxKernels, maxNeededKernels);
|
|
maxKernels = maxNeededKernels;
|
|
}
|
|
|
|
int busyKernels = 0;
|
|
set<T*> activeTaskSet;
|
|
bool ignoreCheck = true;
|
|
|
|
String pathRes("results");
|
|
Utils::Mkdir(pathRes);
|
|
string buf;
|
|
|
|
vector<int> emptyKeys;
|
|
vector<T*> toDel;
|
|
|
|
size_t done = 0;
|
|
size_t step = ceil(totalProcessTasks * 0.01); // step == 1%
|
|
const double total = totalProcessTasks;
|
|
|
|
auto timer_pause = Utils::getAbsoluteTime();
|
|
auto timer_killed = Utils::getAbsoluteTime();
|
|
killed = false;
|
|
|
|
while (activeTasks) {
|
|
long oldActiveTasks = activeTasks;
|
|
emptyKeys.clear();
|
|
toDel.clear();
|
|
|
|
//ставим задачи от больших к меньшему по ядрам
|
|
for (auto& elem : sortedByKernelNeeds) {
|
|
int freeKernels = maxKernels - busyKernels;
|
|
int kernelsNeeded = elem.first;
|
|
|
|
while (kernelsNeeded <= freeKernels && elem.second.size()) {
|
|
T* task = elem.second.front();
|
|
elem.second.pop();
|
|
|
|
activeTaskSet.insert(task);
|
|
task->Start(ignoreCheck);
|
|
#if DEB
|
|
printf("start task with %d kernels and id %ld\n", task->getKernels(), task->getId());
|
|
#endif
|
|
busyKernels += task->getKernels();
|
|
freeKernels = maxKernels - busyKernels;
|
|
}
|
|
|
|
if (elem.second.size() == 0)
|
|
emptyKeys.push_back(kernelsNeeded);
|
|
|
|
//если ядер не осталось, то нет смысла дальше смотреть
|
|
if (freeKernels == 0)
|
|
break;
|
|
}
|
|
|
|
// очищаем от пустых ключей
|
|
for (auto& empty : emptyKeys)
|
|
sortedByKernelNeeds.erase(empty);
|
|
|
|
checkTasksFinish(activeTaskSet, toDel, activeTasks, done, busyKernels, buf);
|
|
|
|
// очищаем завершенные задачи
|
|
for (auto& del : toDel)
|
|
activeTaskSet.erase(del);
|
|
|
|
if (oldActiveTasks != activeTasks) {
|
|
#if DEB
|
|
printf("done %ld / %d\n", done, this->getLength());
|
|
#endif
|
|
if ((done % step) == 0) {
|
|
size_t persentDone = (done / total) * 100.0;
|
|
saveProgress(persentDone);
|
|
}
|
|
}
|
|
|
|
// прошло больше 30 секунд, проверяем паузу
|
|
if (Utils::getAbsoluteTime() - timer_pause > 30) {
|
|
bool hasSleep = false;
|
|
while (checkPause()) {
|
|
hasSleep = true;
|
|
printf("stoped, sleep 10 seconds\n");
|
|
Utils::Sleep(10);
|
|
}
|
|
timer_pause = Utils::getAbsoluteTime();
|
|
if (hasSleep)
|
|
printf("resume\n");
|
|
}
|
|
|
|
// прошло больше 10 секунд, проверяем нужно ли завершиться
|
|
if (Utils::getAbsoluteTime() - timer_killed > 10) {
|
|
if (checkKilled()) {
|
|
ToLog("killing active tasks...");
|
|
ToLog("activeTasks="+ String((int)activeTasks)+";busyKernels="+String((int)busyKernels));
|
|
killActiveTasks(activeTaskSet);
|
|
ToLog("done");
|
|
ToLog("waiting for active tasks...");
|
|
while (busyKernels) {
|
|
ToLog("activeTasks="+ String((int)activeTasks)+";busyKernels="+String((int)busyKernels));
|
|
checkTasksFinish(activeTaskSet, toDel, activeTasks, done, busyKernels, buf);
|
|
// очищаем завершенные задачи
|
|
for (auto& del : toDel)
|
|
activeTaskSet.erase(del);
|
|
toDel.clear();
|
|
Utils::Sleep(5);
|
|
}
|
|
ToLog("exit of main while");
|
|
killed = true;
|
|
break;
|
|
}
|
|
timer_killed = Utils::getAbsoluteTime();
|
|
}
|
|
}
|
|
|
|
if (!killed) {
|
|
for (auto& task : taskList)
|
|
task->copyResults(pathRes);
|
|
changeState();
|
|
String outFile(pathRes + "/" + getStatePrefix() + "Info.txt");
|
|
File tmp(outFile, String(buf.c_str()));
|
|
}
|
|
else {
|
|
//всегда финальное состояние. даже если это компиляция.
|
|
saveState("RunningEnd");
|
|
ToLog("quit application");
|
|
std::exit(0);
|
|
}
|
|
}
|
|
void saveState(const String& state_text){
|
|
String stateFile = packageWorkspace + "/state/" + state_text;
|
|
File tmp(stateFile, Utils::getDate());
|
|
}
|
|
void saveState() {
|
|
saveState(getStatePrefix() + printState());
|
|
}
|
|
void saveProgress(long long persentDone) {
|
|
FILE* f = fopen("progress", "w");
|
|
if (f) {
|
|
fprintf(f, "%lld", persentDone);
|
|
fflush(f);
|
|
fclose(f);
|
|
}
|
|
}
|
|
bool checkPause() const {
|
|
return Utils::Exists("pause");
|
|
}
|
|
bool checkKilled() const {
|
|
return Utils::Exists("kill");
|
|
}
|
|
void killActiveTasks(const set<T*>& activeTaskSet) {
|
|
for (auto& task : activeTaskSet){
|
|
task->kill();
|
|
}
|
|
}
|
|
void checkTasksFinish(const set<T*>& activeTaskSet, vector<T*>& toDel, size_t& activeTasks,
|
|
size_t& done, int& busyKernels, string& buf) {
|
|
// проверяем нет ли завершившихся задач
|
|
for (auto& task : activeTaskSet)
|
|
{
|
|
if (task->Check()) {
|
|
toDel.push_back(task);
|
|
activeTasks--;
|
|
done += task->getKernels();
|
|
busyKernels -= task->getKernels();
|
|
buf += to_string(task->getId()) + " " + string(task->printState().getCharArray()) + " " + to_string(task->getTotalTime()) + "\n";
|
|
}
|
|
}
|
|
}
|
|
}; |