Slave Process 初始化的代码在 src/slave/slave.cpp 中:

void Slave::initialize()
{...}

cgourps 初始化

可以通过参数 --agent_subsystems 指定需要使用的 cgroup 子系统,默认问空。 Mesos Slave 会运行在这些 cgroup 子系统里面。主要是用于资源监控。 这些子系统都继承自 root mesos cgroup 。

注意这里的 cgroup 初始化都是针对 slave 进程本身的,而不是针对 executor。

Credential 和 Authenticaton 初始化

// 在 src/slave/slave.hpp 中
Option<Credential> credential;

// 在 src/slave/slave.cpp 中
  if (flags.credential.isSome()) {
    Result<Credential> _credential =
      credentials::readCredential(flags.credential.get());
    ...
    credential = _credential.get();
  }

  Option<Credentials> httpCredentials;
  if (flags.http_credentials.isSome()) {
    Result<Credentials> credentials =
      credentials::read(flags.http_credentials.get());
    ...
    httpCredentials = credentials.get();
  }
  if (flags.authenticate_http_readonly) {
    Try<Nothing> result = initializeHttpAuthenticators(
        READONLY_HTTP_AUTHENTICATION_REALM,
        strings::split(flags.http_authenticators, ","),
        httpCredentials);

    if (result.isError()) {
      EXIT(EXIT_FAILURE) << result.error();
    }
  }
  ...

Resource Estimator 和 Qos Controller 初始化

  Try<Nothing> initialize =
    resourceEstimator->initialize(defer(self(), &Self::usage));

  if (initialize.isError()) {
    EXIT(EXIT_FAILURE)
      << "Failed to initialize the resource estimator: " << initialize.error();
  }

  initialize = qosController->initialize(defer(self(), &Self::usage));

  if (initialize.isError()) {
    EXIT(EXIT_FAILURE)
      << "Failed to initialize the QoS Controller: " << initialize.error();
  }

Resources 初始化

下面的代码把 flags 传给 Containerizer,得到 resource:

  Try<Resources> resources = Containerizer::resources(flags);
  if (resources.isError()) {
    EXIT(EXIT_FAILURE)
      << "Failed to determine agent resources: " << resources.error();
  }

Containerizer::resources 定义在 src/slave/containerizer/containerizer.cpp 中:

Try<Resources> Containerizer::resources(const Flags& flags)
{
  Try<Resources> parsed = Resources::parse(
      flags.resources.getOrElse(""), flags.default_role);

  if (parsed.isError()) {
    return Error(parsed.error());
  }

  Resources resources = parsed.get();
  ...
  return resources;
}
...

如果命令行中没有明确指定 cpus, gpus, mem, portsdisk 这5种资源,则它们会被自动添加。

另外还会检查 disk 类型的资源是否真的存在于硬盘上。

Resource 定义在 include/mesos/resources.hppsrc/common/resources.cpp 中。

Attributes 初始化

  Attributes attributes;
  if (flags.attributes.isSome()) {
    attributes = Attributes::parse(flags.attributes.get());
  }

Attributes 定义在 include/mesos/attributes.hppsrc/common/attributes.cpp 中。

初始化 slave info

就是把之前初始化的很多东西填充到 info 中:

// 在 src/slave/slave.hpp 中
SlaveInfo info;

// 在 src/slave/slave.cpp 中
  // Initialize slave info.
  info.set_hostname(hostname);
  info.set_port(self().address.port);

  info.mutable_resources()->CopyFrom(resources.get());
  if (HookManager::hooksAvailable()) {
    info.mutable_resources()->CopyFrom(
        HookManager::slaveResourcesDecorator(info));
  }

  // Initialize `totalResources` with `info.resources`, checkpointed
  // resources will be applied later during recovery.
  totalResources = resources.get();

  LOG(INFO) << "Agent resources: " << info.resources();

  info.mutable_attributes()->CopyFrom(attributes);
  if (HookManager::hooksAvailable()) {
    info.mutable_attributes()->CopyFrom(
        HookManager::slaveAttributesDecorator(info));
  }

  LOG(INFO) << "Agent attributes: " << info.attributes();

  // Checkpointing of slaves is always enabled.
  info.set_checkpoint(true);

这里有两个 Hooks 有可能会被调用到(如果在命令行设置了的话):

  • slaveAttributesDecorator:Slave 初始化时调用,该 hook 为这个 slave 创建 attributes,然后 slave 会把自身的信息(包含 attribute)通知到 master。
  • slaveResourcesDecorator:slave 初始化时被调用,为 slave 生成 resource

StatusUpdateManager 初始化

  statusUpdateManager->initialize(defer(self(), &Slave::forward, lambda::_1)
    .operator std::function<void(StatusUpdate)>());

statusUpdateManager->initialize 定义在 src/slave/status_update_manager.cpp 中:

// 在 src/slave/status_update_manager.hpp 中
  // Expects a callback 'forward' which gets called whenever there is
  // a new status update that needs to be forwarded to the master.
  void initialize(const lambda::function<void(StatusUpdate)>& forward);

// 在 src/slave/status_update_manager.cpp 中
void StatusUpdateManager::initialize(
    const function<void(StatusUpdate)>& forward)
{
  dispatch(process, &StatusUpdateManagerProcess::initialize, forward);
}
...
class StatusUpdateManagerProcess
  : public ProtobufProcess<StatusUpdateManagerProcess>
{
  ...
  function<void(StatusUpdate)> forward_;
  ...
}
...
void StatusUpdateManagerProcess::initialize(
    const function<void(StatusUpdate)>& forward)
{
  forward_ = forward;
}

可以看到,传给 StatusUpdateManagerProcess 的回调函数是 Slave::forward 方法。 该方法放到 forward_变量中,该变量在 StatusUpdateManagerProcess::forward 方法中被调用:

Timeout StatusUpdateManagerProcess::forward(
    const StatusUpdate& update,
    const Duration& duration)
{
  CHECK(!paused);

  VLOG(1) << "Forwarding update " << update << " to the agent";

  // Forward the update.
  forward_(update);

  // Send a message to self to resend after some delay if no ACK is received.
  return delay(duration,
               self(),
               &StatusUpdateManagerProcess::timeout,
               duration).timeout();
}

StatusUpdateManagerProcess::forward 方法所做的事,就是把更新消息传出去。 具体怎么传就是依靠初始化时设置的 forward 回调函数,这里设置的就是 Slave::forward

// NOTE: An acknowledgement for this update might have already been
// processed by the slave but not the status update manager.
void Slave::forward(StatusUpdate update)
{
  ...
  update.mutable_status()->set_uuid(update.uuid());

  // Update the status update state of the task and include the latest
  // state of the task in the status update.
  Framework* framework = getFramework(update.framework_id());
  ...
  const TaskID& taskId = update.status().task_id();
  Executor* executor = framework->getExecutor(taskId);
  ...
  task = executor->launchedTasks[taskId];
  ...
  task->set_status_update_state(update.status().state());
  task->set_status_update_uuid(update.uuid());
  update.set_latest_state(task->state());
  ...
  // Forward the update to master.
  StatusUpdateMessage message;
  message.mutable_update()->MergeFrom(update);
  message.set_pid(self()); // The ACK will be first received by the slave.

  send(master.get(), message);
}

Slave::forward 做的事情,就是组装出一个 StatusUpdateMessage 消息,发送给 master 。

启动 disk 监控

  // Start disk monitoring.
  // NOTE: We send a delayed message here instead of directly calling
  // checkDiskUsage, to make disabling this feature easy (e.g by specifying
  // a very large disk_watch_interval).
  delay(flags.disk_watch_interval, self(), &Slave::checkDiskUsage);

其中, Slave::checkDiskUsage 会每隔一段时间,就检查一次硬盘使用情况。 如果硬盘空间使用量过大,就会出发一次垃圾回收,把早期的一些垃圾清理掉。 这些参数都可以在命令行中指定。

使用 libprocess 注册处理函数,基于 HTTP 监听消息

不赘。

注册信号处理方法

  auto signalHandler = defer(self(), &Slave::signaled, lambda::_1, lambda::_2)
    .operator std::function<void(int, int)>();
  if (os::internal::configureSignal(&signalHandler) < 0) {
    EXIT(EXIT_FAILURE)
      << "Failed to configure signal handlers: " << os::strerror(errno);
  }

当收到 SIGUSR1 时,slave 会 shutdown。其它信号不做处理。

异步地执行 recovery

详见 Mesos 源码学习(5) Slave Recovery

Slave Process 初始化完成

至此,Slave Process 完成了初始化。 接下来程序就依靠之前用 libprocess 的 installroute 方法注册的各个处理函数来运行了。