Libprocess 特性

Mesos 底层依赖于 libprocess 库,该库用 C++ 实现了 Actor 模式,不过它不叫 actor,而叫做 process。

Libprocess 的几个主要特征是:

  1. 没有异常
  2. 没有 NULL
  3. 不用用户自己管理内存

Libprocess 还提供了不同服务之间远程调用方式,依赖于 HTTP。 也就是每个 process 会监听一个端口,提供 HTTP 服务,调用方最终是通过发送一个 HTTP 请求来调用,返回值也是通过 HTTP 返回来获得。

没有异常

通过 Try<T> 代替异常:

Try<int> t = foo();

if (t.isError()) {
    EXIT(1) << t.error();
} else {
    bar(t.get());
}

没有 NULL

通过 Option<T> 代替 NULL:

Option<int> o = foo();

if (o.isNone()) {
    EXIT(1) << "None";
} else {
    bar(o.get());
}

不需要自己管理内存

通过使用 Owned<T>:

Owned<String> o = foo();
bar(o.get());

Libprocess 库中通过“引用计数的指针(reference counted pointers)”来自己实现内存管理,不需要用户操心。

Try<T>Option<T> 可以组合成 Result<T>

基本上,Result<T> 相当于 Try<Option<T>>。例如:

// Forward declarations.
// Try<Option<string>> readline();
Result<string> readline();
// Try<Option<int>> numify(Try<Option<string>> s);
Result<int> numify(Result<string> s);
...
Try<Option<int>> i = numify(readline());
if (i.isError())) {
    EXIT(1) << i.error();
} else {
    foo(i.get().get(42)); // Use ‘42’ for none.
}

process 特性

Libprocess 中的 Actor 叫做 process,可以向一个 process 发送各种消息。主要特性:

  1. 每个 process 拥有一个 PID;
  2. 本地消息通过 dispatchdelaydefer 实现;
  3. 可以通过 promises/futures 进行函数式的组合;
  4. 远程消息通过 sendrouteinstall 实现;

任意时刻,在一个 process 内只有一个线程在执行,这样就无需在 process 内部做同步:

Processes provide execution contexts (only one thread executing within a process at a time so no need for per process synchronization).

一个 process 的生命周期

using namespace process;

class MyProcess : public Process<MyProcess> {};

int main(int argc, char** argv) {
    MyProcess process;
    spawn(process);
    terminate(process);
    wait(process);
    return 0;
}

PID

每一个 process 拥有一个 PID,libprocess 的使用者只需要知道这个 PID,而不需要维护这个 process 的引用(指针)。

由于最终是基于 HTTP 的,PID 实际上代表的也就是一个 URL,比如:http://ip:port/name/endpoint

例:

int main(int argc, char** argv) {
    QueueProcess process;
    spawn(process);
    PID<QueueProcess> pid = process.self();
    dispatch(pid, &QueueProcess:enqueue, 42);
    terminate(pid);
    wait(pid);
    return 0;
}

本地消息

通过 dispatch 异步地调用一个函数/方法

相当于把一个消息传给一个 process:

dispatch(instance, &SomeProcess::someMethod, arg1, arg2, ...);

每个 process 有一个“消息队列”, 例:

class QueueProcess : public Process<QueueProcess> {
public:
    void enqueue(int i) { this->i = i; }
    int dequeue() { return this->i; }
private:
    int i;
};

int main(int argc, char** argv) {
    QueueProcess process;
    spawn(process);
    dispatch(process, &QueueProcess::enqueue, 42);
    dispatch(process, &QueueProcess::enqueue, 43);
    terminate(process);
    wait(process);
    return 0;
}

通过 delay 延迟地调用一个函数/方法

dispatch 类似,但调用会推迟一段时间,例如:

delay(Seconds(1), process, &QueueProcess::enqueue, 42);

通过 promises/futures 进行函数式的组合

Future 是针对调用方说得,表示一个可能还没有完成的异步任务的结果,需要在“未来”才能取到值。

Promise 是针对任务执行方来说的,可以在 promise 上设置任务的结果。 可以把 promise 理解为一个可写的,可以实现一个 future 的单一赋值容器。

Future 是对结果“读”的一端。Promise 是对结果“写”的一方。

例:

template <typename T>
class QueueProcess : public Process<QueueProcess<T>> {
public:
    Future<T> dequeue() {
        return promise.future();
    }
    void enqueue(T t) {
        promise.set(t);
    }
private:
    Promise<T> promise;
};

int main(int argc, char** argv) {
...
    Future<int> i = dispatch(process, &QueueProcess<int>::dequeue);
    dispatch(process, &QueueProcess<int>::enqueue, 42);
    i.await();
...
}

对于 future,通过 then 和回调函数,调用方无需显式地调用 wait 等阻塞方法。例:

int main(int argc, char** argv) {
...;
    Future<int> i = dispatch(process, &QueueProcess<int>::dequeue);
    dispatch(process, &QueueProcess<int>::enqueue, 42);
    i.then([] (int i) {
        // Use ‘i’.
    });
    // OR use bind
    i.then(bind(&dequeueReady, _1));
...;
}

类似的回调函数还有 Future::onReady, Future::onFailed 等。

通过 defer 让回调函数在某个 process 中执行

Future 中注册回调函数,其执行是同步的,也就是说会在完成任务(实现了 future)的那个线程执行,这有可能会阻塞。 defer 提供了一种机制,可以把回调函数异步地执行。

来自Libprocess Developer Guide:

defer is similar to dispatch, but rather than enqueing the execution of a method or function on the specified process immediately (i.e., synchronously), it returns a Deferred, which is a callable object that only after getting invoked will dispatch the method or function on the specified process. Said another way, using defer is a way to defer a dispatch.

例子:

using namespace process;

void foo()
{
  ProcessBase process;
  spawn(process);

  Deferred<void(int)> deferred = defer(
      process,
      [](int i) {
        // 通过 defer,这个回调会在 `process` 所在的那个线程中执行
        // Invoked _asynchronously_ using `process` as the execution context.
      });

  Promise<int> promise;

  promise.future().then(deferred);

  promise.future().then([](int i) {
    // 没有使用 defer,则回调会在任务完成后,马上在执行任务的那个线程执行
    // Invoked synchronously from the execution context of
    // the thread that completes the future!
  });

  // Executes both callbacks synchronously, which _dispatches_
  // the deferred lambda to run asynchronously in the execution
  // context of `process` but invokes the other lambda immediately.
  promise.set(42);

  terminate(process);
}

远程消息

process 的 name

Process 可以有一个 name,比如创建一个 name 为 “queue” 的 process:

class QueueProcess : public Process<QueueProcess> {
public:
QueueProcess() : ProcessBase("queue") {}
};

通过 routeinstall 为 process 添加远程消息的“路由”

由于消息都是基于 HTTP 的,所以我们需要为 process 设置不同的“路由”,来处理不同的消息。

using namespace process;
using namespace process::http;

class QueueProcess : public Process<QueueProcess> {
public:
    QueueProcess() : ProcessBase("queue") {}
    virtual void initialize() {
        route("/enqueue", [] (Request request) {
            // Parse argument from 'request.query' or 'request.body'.
            enqueue(arg);
            return OK();
        });
    }
};

installroute 类似,使用方法有所不同:

class QueueProcess : public Process<QueueProcess> {
public:
    QueueProcess() : ProcessBase("queue") {}
    virtual void initialize() {
        install("enqueue", [] (PID<> from, string body) {
            // Parse argument from 'body'.
            enqueue(arg);
        });
    }
};

通过 send 来实现消息传递(远程方法调用)

调用方事实上等价于这样的 HTTP 请求:

curl localhost:1234/queue/enqueue?value=42

Libprocess 为调用方提供了 send 方法:

class QueueProcess : public Process<QueueProcess> {
public:
    QueueProcess() : ProcessBase("queue") {}
    virtual void initialize() {
        install("enqueue", [] (PID<> from, string body) {
            send(from, "some response");
        });
    }
};

使用 protobuf

Libprocess 还支持使用 protobuf 来承载 payload:

protobuf::Protocol<EnqueueRequest, EnqueueResponse> enqueue;
class SomeProcess : public Process<SomeProcess> {
public:
    void f() {
        EnqueueRequest request;
        request.set_arg(42);
        enqueue(pid, request)
            .onFailed(defer([] () { ...; }))
            .onReady(defer([] (EnqueueResponse response) {
                // Use response.
            }));
    }
}

参考资料