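The code that starts the Mesos slave is in src/slave/main.cpp. Much of the process mirrors the master's startup.

Parse command-line arguments

Same as for the master, so it is not repeated here.

Initialize Libprocess

First, generate a libprocess process ID for the slave: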

const string id = process::ID::generate("slave"); // Process ID.

process::ID::generate is declared in 3rdparty/libprocess/include/process/id.hpp:

/**
 * Returns 'prefix(N)' where N represents the number of instances
 * where the same prefix (wrt. string value equality) has been used
 * to generate an ID.
 *
 * @param prefix The prefix to base the result.
 * @return An "ID" in the shape 'prefix(N)'.
 */
std::string generate(const std::string& prefix = "");
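
The doc comment above describes a per-prefix counter. A minimal sketch of that idea follows; generateId is a hypothetical stand-in rather than the libprocess implementation, and starting the count at 1 is an assumption.

```cpp
#include <map>
#include <mutex>
#include <string>

// Hypothetical stand-in for process::ID::generate; not the libprocess code.
// Returns 'prefix(N)', where N counts how often this prefix has been used.
std::string generateId(const std::string& prefix = "")
{
  static std::mutex mutex;
  static std::map<std::string, int> counters;

  // Guard the shared counters so concurrent callers get distinct IDs.
  std::lock_guard<std::mutex> lock(mutex);

  // Assumption: numbering starts at 1 for the first use of a prefix.
  int n = ++counters[prefix];
  return prefix + "(" + std::to_string(n) + ")";
}
```

Under these assumptions, two calls with the prefix "slave" give different IDs, while a call with another prefix starts its own count.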

Then initialize the "root process":

  // If `process::initialize()` returns `false`, then it was called before this
  // invocation, meaning the authentication realm for libprocess-level HTTP
  // endpoints was set incorrectly. This should be the first invocation.
  if (!process::initialize(
          id,
          READWRITE_HTTP_AUTHENTICATION_REALM,
          READONLY_HTTP_AUTHENTICATION_REALM)) {
    EXIT(EXIT_FAILURE) << "The call to `process::initialize()` in the agent's "
                       << "`main()` was not the function's first invocation";
  }
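
The check above relies on the initializer reporting whether this was its first invocation. A minimal sketch of that contract follows; initializeOnce is a hypothetical name and the atomic flag is only one possible way to do it, not how libprocess does.

```cpp
#include <atomic>

// Hypothetical stand-in for the "first invocation" contract of
// process::initialize(); not the libprocess implementation.
// Returns true only for the very first call, false for every later one.
bool initializeOnce()
{
  static std::atomic<bool> initialized(false);

  // exchange() stores true and returns the previous value atomically,
  // so exactly one caller ever observes 'false' here.
  bool alreadyInitialized = initialized.exchange(true);

  return !alreadyInitialized;
}
```

A caller that receives false knows some earlier code already initialized the library, which is the situation the agent's main() treats as fatal.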

Initialize the Version Process

Same as for the master, so it is not repeated here.

Load Modules and Hooks

Same as for the master, so it is not repeated here.

Support systemd

#ifdef __linux__
  // Initialize systemd if it exists.
  if (flags.systemd_enable_support && systemd::exists()) {
    LOG(INFO) << "Inializing systemd state";

    systemd::Flags systemdFlags;
    systemdFlags.enabled = flags.systemd_enable_support;
    systemdFlags.runtime_directory = flags.systemd_runtime_directory;
    systemdFlags.cgroups_hierarchy = flags.cgroups_hierarchy;

    Try<Nothing> initialize = systemd::initialize(systemdFlags);
    if (initialize.isError()) {
      EXIT(EXIT_FAILURE)
        << "Failed to initialize systemd: " + initialize.error();
    }
  }
#endif // __linux__

Create the Fetcher and Containerizer

  Fetcher fetcher;

  Try<Containerizer*> containerizer =
    Containerizer::create(flags, false, &fetcher);

The Containerizer code lives in src/slave/containerizer. src/slave/containerizer/containerizer.hpp defines the Containerizer interface, and Containerizer::create is implemented in src/slave/containerizer/containerizer.cpp:

Try<Containerizer*> Containerizer::create(
    const Flags& flags,
    bool local,
    Fetcher* fetcher)
{
  ...
  // Create containerizer(s).
  vector<Containerizer*> containerizers;

  foreach (const string& type, containerizerTypes) {
    if (type == "mesos") {
      Try<MesosContainerizer*> containerizer =
        MesosContainerizer::create(flags, local, fetcher, nvidia);
      if (containerizer.isError()) {
        return Error("Could not create MesosContainerizer: " +
                     containerizer.error());
      } else {
        containerizers.push_back(containerizer.get());
      }
    } else if (type == "docker") {
      Try<DockerContainerizer*> containerizer =
        DockerContainerizer::create(flags, fetcher, nvidia);
      if (containerizer.isError()) {
        return Error("Could not create DockerContainerizer: " +
                     containerizer.error());
      } else {
        containerizers.push_back(containerizer.get());
      }
    } else {
      return Error("Unknown or unsupported containerizer: " + type);
    }
  }

  if (containerizers.size() == 1) {
    return containerizers.front();
  }

  Try<ComposingContainerizer*> containerizer =
    ComposingContainerizer::create(containerizers);

  if (containerizer.isError()) {
    return Error(containerizer.error());
  }

  return containerizer.get();
}

Depending on the flags, this returns either a Docker Containerizer or a Mesos Containerizer. When both are specified, it returns a Composing Containerizer that wraps the two.
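
The selection logic can be sketched without any Mesos types. In the sketch below, splitTypes and selectContainerizer are hypothetical helpers. It is also an assumption that the types arrive as a comma-separated string, since the excerpt above elides how containerizerTypes is built. Rejecting an empty list is this sketch's own choice.

```cpp
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: splits a comma-separated list such as "docker,mesos".
std::vector<std::string> splitTypes(const std::string& value)
{
  std::vector<std::string> types;
  std::stringstream stream(value);
  std::string type;
  while (std::getline(stream, type, ',')) {
    if (!type.empty()) {
      types.push_back(type);
    }
  }
  return types;
}

// Hypothetical helper: names the kind of containerizer that would be
// returned, or a string starting with "error: " on failure.
std::string selectContainerizer(const std::string& value)
{
  std::vector<std::string> types = splitTypes(value);

  for (const std::string& type : types) {
    if (type != "mesos" && type != "docker") {
      return "error: Unknown or unsupported containerizer: " + type;
    }
  }

  if (types.empty()) {
    return "error: no containerizer specified"; // Choice made by this sketch.
  }

  // A single type is returned directly; several are wrapped together.
  return types.size() == 1 ? types.front() : "composing";
}
```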

Create the Master Detector

  Try<MasterDetector*> detector_ = MasterDetector::create(
      flags.master, flags.master_detector);

Create the Authorizer

// In src/slave/slave.hpp
const Option<Authorizer*> authorizer;

// In src/slave/main.cpp
authorizer = Authorizer::create(authorizerName);
...
    // Set the authorization callbacks for libprocess HTTP endpoints.
    // Note that these callbacks capture `authorizer_.get()`, but the agent
    // creates a copy of the authorizer during construction. Thus, if in the
    // future it becomes possible to dynamically set the authorizer, this would
    // break.
    process::http::authorization::setCallbacks(
        createAuthorizationCallbacks(authorizer_.get()));

Create the Garbage Collector

The Garbage Collector cleans up the garbage (files and directories) left behind after tasks run. The code is in src/slave/gc.hpp and src/slave/gc.cpp.

Create the Status Update Manager

StatusUpdateManager statusUpdateManager(flags);

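The Status Update Manager is responsible for:

  1. Reliably sending status updates to the master
  2. Checkpointing status updates to the local disk
  3. Sending ACKs to the executor
  4. Receiving ACKs from the scheduler

The code is in src/slave/status_update_manager.hpp and src/slave/status_update_manager.cpp.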

// StatusUpdateManager is responsible for
// 1) Reliably sending status updates to the master.
// 2) Checkpointing the update to disk (optional).
// 3) Sending ACKs to the executor (optional).
// 4) Receiving ACKs from the scheduler.
class StatusUpdateManager
{
public:
  StatusUpdateManager(const Flags& flags);
  virtual ~StatusUpdateManager();

  // Expects a callback 'forward' which gets called whenever there is
  // a new status update that needs to be forwarded to the master.
  void initialize(const lambda::function<void(StatusUpdate)>& forward);

  // Checkpoints the status update and reliably sends the
  // update to the master (and hence the scheduler).
  // @return Whether the update is handled successfully
  // (e.g. checkpointed).
  process::Future<Nothing> update(
      const StatusUpdate& update,
      const SlaveID& slaveId,
      const ExecutorID& executorId,
      const ContainerID& containerId);
  ...

  // Checkpoints the status update to disk if necessary.
  // Also, sends the next pending status update, if any.
  // @return True if the ACK is handled successfully (e.g., checkpointed)
  //              and the task's status update stream is not terminated.
  //         False same as above except the status update stream is terminated.
  //         Failed if there are any errors (e.g., duplicate, checkpointing).
  process::Future<bool> acknowledgement(
      const TaskID& taskId,
      const FrameworkID& frameworkId,
      const UUID& uuid);

  // Recover status updates.
  process::Future<Nothing> recover(
      const std::string& rootDir,
      const Option<state::SlaveState>& state);


  // Pause sending updates.
  // This is useful when the slave is disconnected because a
  // disconnected slave will drop the updates.
  void pause();

  // Unpause and resend all the pending updates right away.
  // This is useful when the updates were pending because there was
  // no master elected (e.g., during recovery) or framework failed over.
  void resume();

  // Closes all the status update streams corresponding to this framework.
  // NOTE: This stops retrying any pending status updates for this framework.
  void cleanup(const FrameworkID& frameworkId);

private:
  StatusUpdateManagerProcess* process;
};
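
To make the "reliable sending" idea concrete, here is a heavily simplified model of a single update stream. UpdateStream is a hypothetical class, not the Mesos implementation. Updates are plain strings and checkpointing is left out. The return value of acknowledgement() only says whether the ACK matched the oldest pending update, which is narrower than the Future<bool> documented above. Resending only the oldest pending update on resume() is a simplification of this sketch.

```cpp
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical, simplified model of one status update stream.
class UpdateStream
{
public:
  explicit UpdateStream(std::function<void(const std::string&)> forward)
    : forward_(std::move(forward)) {}

  // Queues an update. It is forwarded right away only if nothing else
  // is waiting for an ACK and the stream is not paused.
  void update(const std::string& uuid)
  {
    pending_.push_back(uuid);
    if (!paused_ && pending_.size() == 1) {
      forward_(pending_.front());
    }
  }

  // Handles an ACK. Returns false if it does not match the oldest
  // pending update (for example a duplicate or out-of-order ACK).
  bool acknowledgement(const std::string& uuid)
  {
    if (pending_.empty() || pending_.front() != uuid) {
      return false;
    }
    pending_.pop_front();
    if (!paused_ && !pending_.empty()) {
      forward_(pending_.front()); // Send the next pending update.
    }
    return true;
  }

  void pause() { paused_ = true; }

  // Unpauses and resends the oldest unacknowledged update, if any.
  void resume()
  {
    paused_ = false;
    if (!pending_.empty()) {
      forward_(pending_.front());
    }
  }

private:
  std::function<void(const std::string&)> forward_;
  std::deque<std::string> pending_;
  bool paused_ = false;
};
```

In this model at most one update is in flight at a time: the next one is sent only after the previous one has been acknowledged.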

Create the Resource Estimator

  Try<ResourceEstimator*> resourceEstimator =
    ResourceEstimator::create(flags.resource_estimator);

The Resource Estimator is used for oversubscription. It estimates how many resources the slave has.

Create the QoS Controller

Try<QoSController*> qosController =
    QoSController::create(flags.qos_controller);

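The QoS Controller is responsible for guaranteeing quality of service. Under certain conditions, the slave chooses to kill some revocable executors.

Mesos ships two QoS Controllers of its own: noop and load. noop does nothing, while load kills some revocable executors when the machine's load is too high.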
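
A load-based policy of this kind can be sketched as follows. Executor and executorsToKill are hypothetical names, and the threshold parameter is an assumption. The sketch selects every revocable executor once the threshold is exceeded, which is a simplification and may not match how the real load controller picks its victims.

```cpp
#include <string>
#include <vector>

// Hypothetical description of a running executor.
struct Executor
{
  std::string id;
  bool revocable; // True if it runs on oversubscribed (revocable) resources.
};

// Hypothetical load-based policy: when the load exceeds the threshold,
// return the IDs of the revocable executors to kill. Executors that use
// regular resources are never selected.
std::vector<std::string> executorsToKill(
    double load,
    double threshold,
    const std::vector<Executor>& executors)
{
  std::vector<std::string> toKill;

  if (load <= threshold) {
    return toKill; // Load is acceptable; behaves like the noop case.
  }

  for (const Executor& executor : executors) {
    if (executor.revocable) {
      toKill.push_back(executor.id);
    }
  }

  return toKill;
}
```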

For more information, see the Mesos Oversubscription documentation.

Create the Slave (Agent) Process

  Slave* slave = new Slave(
      id,
      flags,
      detector,
      containerizer.get(),
      &files,
      &gc,
      &statusUpdateManager,
      resourceEstimator.get(),
      qosController.get(),
      authorizer_);

  process::spawn(slave);
  process::wait(slave->self());
  ...

Next comes the initialization of the Slave process.