#pragma once #include #include #include #include #include #include #include "File.h" #include "Task.h" #include "Array.h" enum SupervisorState { WorkspacesCreation, //0 Preparation, //1 Execution, //2 End //3 }; template class Supervisor : public Array { protected: SupervisorState state; public: virtual String getStatePrefix() { return String(""); } String printState() { switch (state) { case WorkspacesCreation: return String("WorkspacesCreation"); case Preparation: return String("Preparation"); case Execution: return String("Execution"); case End: return String("End"); default: return "?"; } } //- void print() { for (auto& elem : this->getElements()) elem->print(); } void init(const char* fileName, int recordSize) { state = WorkspacesCreation; File* packedTasks = new File(fileName); Text* lines = packedTasks->readLines(); const long length = lines->getLength() / recordSize; int offset = 0; for (int i = 0; i < length; ++i) { this->add(new T(lines, offset)); offset += recordSize; } delete packedTasks; delete lines; } void changeState() { switch (this->state) { case WorkspacesCreation: this->state = Preparation; saveState(); break; case Preparation: this->state = Execution; saveState(); break; case Execution: this->state = End; saveState(); break; default: this->state = End; break; } } void DoWithSchedule(int maxKernels) { saveState(); saveProgress(0); // подготовка тестов while (this->state != Execution) { for (auto& task : this->getElements()) { switch (this->state) { case WorkspacesCreation: if (task->getState() == Waiting) { task->createWorkspace(); task->setState(WorkspaceCreated); } break; case Preparation: if (task->getState() == WorkspaceCreated) { task->prepareWorkspace(); task->createLaunchScript(); task->setState(WorkspaceReady); } break; default: //printf("id = %ld; state = %d\n", task->getId(), task->getState()); break; } } changeState(); } map, std::greater> sortedByKernelNeeds; size_t activeTasks = 0; int maxNeededKernels = 0; for (auto& task : this->getElements()) { if (task->getState() == WorkspaceReady) { activeTasks++; sortedByKernelNeeds[task->getKernels()].push(task); if (maxNeededKernels < task->getKernels()) maxNeededKernels = task->getKernels(); } } const int maxThreads = std::thread::hardware_concurrency(); printf("total tasks count = %ld, active task count %ld, maxKernels %d, maxNeededKernels %d, maxThreads %d\n", this->getLength(), activeTasks, maxKernels, maxNeededKernels, maxThreads); if (maxKernels > maxThreads) { printf("truncated maxKernels to maxThreads: %d -> %d\n", maxKernels, maxThreads); maxKernels = maxThreads; } if (maxNeededKernels > maxKernels) { printf("increased maxKernels to maxNeededKernels: %d -> %d\n", maxKernels, maxNeededKernels); maxKernels = maxNeededKernels; } int busyKernels = 0; set activeTaskSet; bool ignoreCheck = true; String pathRes("results"); Utils::Mkdir(pathRes); string buf; vector emptyKeys; vector toDel; size_t done = 0; size_t step = ceil(activeTasks * 0.01); // step == 1% const double total = activeTasks; while (activeTasks) { long oldActiveTasks = activeTasks; emptyKeys.clear(); toDel.clear(); //ставим задачи от больших к меньшему по ядрам for (auto& elem : sortedByKernelNeeds) { int freeKernels = maxKernels - busyKernels; int kernelsNeeded = elem.first; while (kernelsNeeded <= freeKernels && elem.second.size()) { T* task = elem.second.front(); elem.second.pop(); activeTaskSet.insert(task); task->Start(ignoreCheck); #if DEB printf("start task with %d kernels and id %ld\n", task->getKernels(), task->getId()); #endif busyKernels += task->getKernels(); freeKernels = maxKernels - busyKernels; } if (elem.second.size() == 0) emptyKeys.push_back(kernelsNeeded); //если ядер не осталось, то нет смысла дальше смотреть if (freeKernels == 0) break; } // очищаем от пустых ключей for (auto& empty : emptyKeys) sortedByKernelNeeds.erase(empty); // проверяем нет ли завершившихся задач for (auto& task : activeTaskSet) { if (task->Check()) { toDel.push_back(task); activeTasks--; done++; busyKernels -= task->getKernels(); #if DEB printf(" done task with %d kernels and id %ld\n", task->getKernels(), task->getId()); #endif buf += to_string(task->getId()) + " " + string(task->printState().getCharArray()) + " " + to_string(task->getTotalTime()) + "\n"; task->copyResults(pathRes); } } // очищаем завершенные задачи for (auto& del : toDel) activeTaskSet.erase(del); if (oldActiveTasks != activeTasks) { #if DEB printf("done %ld / %d\n", done, this->getLength()); #endif if ((done % step) == 0) { size_t persentDone = (done / total) * 100.0; saveProgress(persentDone); } } } changeState(); String outFile(pathRes + "/" + getStatePrefix() + "Info.txt"); File tmp(outFile, String(buf.c_str())); } void saveState() { String stateFile = packageWorkspace + "/state/" + getStatePrefix() + printState(); File tmp(stateFile, Utils::getDate()); } void saveProgress(long long persentDone) { FILE *f = fopen("progress", "w"); if (f) { fprintf(f, "%lld", persentDone); fclose(f); } } };