Mesos Source Code Study (3): Starting the Mesos Slave
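The code that starts the Mesos slave is in src/slave/main.cpp.
Much of the process mirrors the master's startup.
Parsing command-line arguments
This works the same way as in the master, so it is not repeated here.
Initializing libprocess
First, a libprocess process ID is generated for the slave: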
const string id = process::ID::generate("slave"); // Process ID.
process::ID::generate is declared in 3rdparty/libprocess/include/process/id.hpp:
/**
* Returns 'prefix(N)' where N represents the number of instances
* where the same prefix (wrt. string value equality) has been used
* to generate an ID.
*
* @param prefix The prefix to base the result.
* @return An "ID" in the shape 'prefix(N)'.
*/
std::string generate(const std::string& prefix = "");
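To make the 'prefix(N)' contract concrete, here is a minimal sketch of a generator with the same observable behavior. It is not the libprocess implementation; the name generateId and the mutex-protected counter map are assumptions made for illustration.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for process::ID::generate, written only to
// illustrate the documented 'prefix(N)' shape. Not the libprocess code.
std::string generateId(const std::string& prefix = "")
{
  static std::mutex mutex;
  static std::map<std::string, int> counts;  // Uses seen so far, per prefix.

  std::lock_guard<std::mutex> lock(mutex);
  const int n = ++counts[prefix];  // A missing key starts at 0, so N starts at 1.
  return prefix + "(" + std::to_string(n) + ")";
}
```

Under these assumptions, the first call with "slave" yields "slave(1)" and the second yields "slave(2)"; each prefix has its own counter.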
Then the "root process" is initialized:
// If `process::initialize()` returns `false`, then it was called before this
// invocation, meaning the authentication realm for libprocess-level HTTP
// endpoints was set incorrectly. This should be the first invocation.
if (!process::initialize(
id,
READWRITE_HTTP_AUTHENTICATION_REALM,
READONLY_HTTP_AUTHENTICATION_REALM)) {
EXIT(EXIT_FAILURE) << "The call to `process::initialize()` in the agent's "
<< "`main()` was not the function's first invocation";
}
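The check above relies on process::initialize() reporting whether it was the first invocation. A rough sketch of that "first caller wins" pattern, using a hypothetical initializeOnce function and an atomic flag (how libprocess tracks this internally is not shown in the excerpt, so this is an assumption), could look like:

```cpp
#include <atomic>
#include <cassert>
#include <string>

// Hypothetical sketch: returns true only for the first caller and false
// for every later one, mirroring the return value the agent checks.
bool initializeOnce(const std::string& delegate)
{
  static std::atomic<bool> initialized(false);

  bool expected = false;
  if (!initialized.compare_exchange_strong(expected, true)) {
    return false;  // Someone else already initialized.
  }

  // One-time setup (sockets, authentication realms, ...) would go here.
  (void)delegate;
  return true;
}
```

A caller that needs its own arguments to take effect, as the agent's main() does for the authentication realms, treats a false return as a fatal error.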
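Initializing the Version process
Same as in the master, so it is not repeated here.
Loading modules and hooks
Same as in the master, so it is not repeated here.
Supporting systemd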
#ifdef __linux__
// Initialize systemd if it exists.
if (flags.systemd_enable_support && systemd::exists()) {
LOG(INFO) << "Inializing systemd state";
systemd::Flags systemdFlags;
systemdFlags.enabled = flags.systemd_enable_support;
systemdFlags.runtime_directory = flags.systemd_runtime_directory;
systemdFlags.cgroups_hierarchy = flags.cgroups_hierarchy;
Try<Nothing> initialize = systemd::initialize(systemdFlags);
if (initialize.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to initialize systemd: " + initialize.error();
}
}
#endif // __linux__
Creating the Fetcher and Containerizer
Fetcher fetcher;
Try<Containerizer*> containerizer =
Containerizer::create(flags, false, &fetcher);
The containerizer code lives in src/slave/containerizer.
The Containerizer interface is defined in src/slave/containerizer/containerizer.hpp.
Containerizer::create is implemented in src/slave/containerizer/containerizer.cpp:
Try<Containerizer*> Containerizer::create(
const Flags& flags,
bool local,
Fetcher* fetcher)
{
...
// Create containerizer(s).
vector<Containerizer*> containerizers;
foreach (const string& type, containerizerTypes) {
if (type == "mesos") {
Try<MesosContainerizer*> containerizer =
MesosContainerizer::create(flags, local, fetcher, nvidia);
if (containerizer.isError()) {
return Error("Could not create MesosContainerizer: " +
containerizer.error());
} else {
containerizers.push_back(containerizer.get());
}
} else if (type == "docker") {
Try<DockerContainerizer*> containerizer =
DockerContainerizer::create(flags, fetcher, nvidia);
if (containerizer.isError()) {
return Error("Could not create DockerContainerizer: " +
containerizer.error());
} else {
containerizers.push_back(containerizer.get());
}
} else {
return Error("Unknown or unsupported containerizer: " + type);
}
}
if (containerizers.size() == 1) {
return containerizers.front();
}
Try<ComposingContainerizer*> containerizer =
ComposingContainerizer::create(containerizers);
if (containerizer.isError()) {
return Error(containerizer.error());
}
return containerizer.get();
}
Depending on the flags, this returns either a Docker containerizer or a Mesos containerizer; when both are specified, it returns a ComposingContainerizer that wraps the two.
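The selection logic can be reproduced in a self-contained sketch. All types below are simplified stand-ins rather than the Mesos classes, the comma-separated type string is an assumption about how the types are supplied, and nullptr replaces the Error value that the real code returns through Try.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for the Mesos classes (illustration only).
struct Containerizer {
  virtual ~Containerizer() = default;
  virtual std::string name() const = 0;
};

struct MesosContainerizer : Containerizer {
  std::string name() const override { return "mesos"; }
};

struct DockerContainerizer : Containerizer {
  std::string name() const override { return "docker"; }
};

// Wraps several containerizers behind the single interface.
struct ComposingContainerizer : Containerizer {
  explicit ComposingContainerizer(
      std::vector<std::unique_ptr<Containerizer>> c)
    : children(std::move(c)) {}

  std::string name() const override {
    std::string result = "composing[";
    for (std::size_t i = 0; i < children.size(); ++i) {
      if (i > 0) result += ",";
      result += children[i]->name();
    }
    return result + "]";
  }

  std::vector<std::unique_ptr<Containerizer>> children;
};

// Hypothetical factory: `types` is a comma-separated list such as
// "mesos,docker". Returns nullptr for an unknown or empty list.
std::unique_ptr<Containerizer> createContainerizer(const std::string& types)
{
  std::vector<std::unique_ptr<Containerizer>> created;
  std::istringstream stream(types);
  std::string type;

  while (std::getline(stream, type, ',')) {
    if (type == "mesos") {
      created.push_back(std::make_unique<MesosContainerizer>());
    } else if (type == "docker") {
      created.push_back(std::make_unique<DockerContainerizer>());
    } else {
      return nullptr;  // Unknown or unsupported containerizer.
    }
  }

  if (created.empty()) return nullptr;

  // A single type is returned directly; several are composed.
  if (created.size() == 1) return std::move(created.front());
  return std::make_unique<ComposingContainerizer>(std::move(created));
}
```

The point of the composing wrapper is that the caller only ever holds one Containerizer pointer, regardless of how many types were requested.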
Creating the Master Detector
Try<MasterDetector*> detector_ = MasterDetector::create(
flags.master, flags.master_detector);
Creating the Authorizer
// In src/slave/slave.hpp
const Option<Authorizer*> authorizer;
// In src/slave/main.cpp
authorizer = Authorizer::create(authorizerName);
...
// Set the authorization callbacks for libprocess HTTP endpoints.
// Note that these callbacks capture `authorizer_.get()`, but the agent
// creates a copy of the authorizer during construction. Thus, if in the
// future it becomes possible to dynamically set the authorizer, this would
// break.
process::http::authorization::setCallbacks(
createAuthorizationCallbacks(authorizer_.get()));
Creating the Garbage Collector
The Garbage Collector cleans up the files and directories left behind after tasks have run.
The relevant code is in src/slave/gc.hpp and src/slave/gc.cpp.
Creating the Status Update Manager
StatusUpdateManager statusUpdateManager(flags);
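The Status Update Manager is responsible for:
- Reliably sending status updates to the master
- Checkpointing the updates to local disk
- Sending ACKs to the executor
- Receiving ACKs from the scheduler
The relevant code is in src/slave/status_update_manager.hpp and src/slave/status_update_manager.cpp: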
// StatusUpdateManager is responsible for
// 1) Reliably sending status updates to the master.
// 2) Checkpointing the update to disk (optional).
// 3) Sending ACKs to the executor (optional).
// 4) Receiving ACKs from the scheduler.
class StatusUpdateManager
{
public:
StatusUpdateManager(const Flags& flags);
virtual ~StatusUpdateManager();
// Expects a callback 'forward' which gets called whenever there is
// a new status update that needs to be forwarded to the master.
void initialize(const lambda::function<void(StatusUpdate)>& forward);
// Checkpoints the status update and reliably sends the
// update to the master (and hence the scheduler).
// @return Whether the update is handled successfully
// (e.g. checkpointed).
process::Future<Nothing> update(
const StatusUpdate& update,
const SlaveID& slaveId,
const ExecutorID& executorId,
const ContainerID& containerId);
...
// Checkpoints the status update to disk if necessary.
// Also, sends the next pending status update, if any.
// @return True if the ACK is handled successfully (e.g., checkpointed)
// and the task's status update stream is not terminated.
// False same as above except the status update stream is terminated.
// Failed if there are any errors (e.g., duplicate, checkpointing).
process::Future<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
const UUID& uuid);
// Recover status updates.
process::Future<Nothing> recover(
const std::string& rootDir,
const Option<state::SlaveState>& state);
// Pause sending updates.
// This is useful when the slave is disconnected because a
// disconnected slave will drop the updates.
void pause();
// Unpause and resend all the pending updates right away.
// This is useful when the updates were pending because there was
// no master elected (e.g., during recovery) or framework failed over.
void resume();
// Closes all the status update streams corresponding to this framework.
// NOTE: This stops retrying any pending status updates for this framework.
void cleanup(const FrameworkID& frameworkId);
private:
StatusUpdateManagerProcess* process;
};
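The comments on update() and acknowledgement() describe a stream in which one update is in flight at a time and the next pending update is sent once the current one is acknowledged. A minimal sketch of that idea follows. The UpdateStream class and Update struct are hypothetical, and checkpointing, retries, and futures are deliberately left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical, simplified status update (the real one is a protobuf).
struct Update {
  std::string uuid;
  std::string state;
};

// Sketch of a single task's update stream: at most one update is in
// flight, and the next one is forwarded only after an ACK arrives.
class UpdateStream {
public:
  explicit UpdateStream(std::function<void(const Update&)> forward)
    : forward_(std::move(forward)) {}

  // Queues the update; forwards it right away if nothing is in flight.
  void update(const Update& u) {
    pending_.push_back(u);
    if (pending_.size() == 1) forward_(pending_.front());
  }

  // Handles an ACK. Returns false if the uuid does not match the
  // in-flight update (for example a duplicate or unexpected ACK).
  bool acknowledgement(const std::string& uuid) {
    if (pending_.empty() || pending_.front().uuid != uuid) return false;
    pending_.pop_front();
    if (!pending_.empty()) forward_(pending_.front());  // Send the next one.
    return true;
  }

  std::size_t pending() const { return pending_.size(); }

private:
  std::function<void(const Update&)> forward_;
  std::deque<Update> pending_;
};
```

The forward callback plays the role of the function passed to initialize() in the real class: the manager decides when to send, and the slave decides how.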
Creating the Resource Estimator
Try<ResourceEstimator*> resourceEstimator =
ResourceEstimator::create(flags.resource_estimator);
The Resource Estimator is used for oversubscription: it estimates how many resources on the slave can be oversubscribed.
Creating the QoS Controller
Try<QoSController*> qosController =
QoSController::create(flags.qos_controller);
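The QoS Controller is responsible for protecting quality of service. Under certain conditions, the slave will choose to kill some revocable executors.
Mesos ships with two QoS Controller implementations, one called noop and one called load.
noop does nothing, while load kills revocable executors when the machine's system load gets too high.
For more information, see the Oversubscription documentation.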
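As a rough illustration of the load policy described above, the sketch below returns the executors to kill once the load passes a threshold. The Executor struct, the corrections function, and the single threshold are assumptions for the example, not the Mesos QoSController interface, and the sketch simplifies by selecting every revocable executor.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified view of a running executor.
struct Executor {
  std::string id;
  bool revocable;  // True if it runs on oversubscribed (revocable) resources.
};

// Sketch of a load-based policy: do nothing while the load is at or
// below the threshold, otherwise pick all revocable executors to kill.
std::vector<std::string> corrections(
    const std::vector<Executor>& executors,
    double load,
    double threshold)
{
  std::vector<std::string> toKill;
  if (load <= threshold) return toKill;  // Behaves like noop.

  for (const Executor& executor : executors) {
    if (executor.revocable) toKill.push_back(executor.id);
  }
  return toKill;
}
```

Executors running on regular, non-revocable resources are never selected, which is what makes oversubscription safe for them.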
Creating the Slave (Agent) Process
Slave* slave = new Slave(
id,
flags,
detector,
containerizer.get(),
&files,
&gc,
&statusUpdateManager,
resourceEstimator.get(),
qosController.get(),
authorizer_);
process::spawn(slave);
process::wait(slave->self());
...
What follows is the initialization of the Slave process itself.