Mesos 源码学习(0) libprocess 简介
Libprocess 特性
Mesos 底层依赖于 libprocess 库,该库用 C++ 实现了 Actor 模式,不过它不叫 actor,而叫做 process。
Libprocess 的几个主要特征是:
- 没有异常
- 没有 NULL
- 不用用户自己管理内存
Libprocess 还提供了不同服务之间远程调用方式,依赖于 HTTP。 也就是每个 process 会监听一个端口,提供 HTTP 服务,调用方最终是通过发送一个 HTTP 请求来调用,返回值也是通过 HTTP 返回来获得。
没有异常
通过 Try<T>
代替异常:
Try<int> t = foo();
if (t.isError()) {
EXIT(1) << t.error();
} else {
bar(t.get());
}
没有 NULL
通过 Option<T>
代替 NULL:
Option<int> o = foo();
if (o.isNone()) {
EXIT(1) << "None";
} else {
bar(o.get());
}
不需要自己管理内存
通过使用 Owned<T>
:
Owned<String> o = foo();
bar(o.get());
Libprocess 库中通过“引用计数的指针(reference counted pointers)”来自己实现内存管理,不需要用户操心。
Try<T>
和 Option<T>
可以组合成 Result<T>
基本上,Result<T>
相当于 Try<Option<T>>
。例如:
// Forward declarations.
// Try<Option<string>> readline();
Result<string> readline();
// Try<Option<int>> numify(Try<Option<string>> s);
Result<int> numify(Result<string> s);
...
Try<Option<int>> i = numify(readline());
if (i.isError())) {
EXIT(1) << i.error();
} else {
foo(i.get().get(42)); // Use ‘42’ for none.
}
process 特性
Libprocess 中的 Actor 叫做 process
,可以向一个 process 发送各种消息。主要特性:
- 每个 process 拥有一个 PID;
- 本地消息通过
dispatch
,delay
和defer
实现; - 可以通过
promises/futures
进行函数式的组合; - 远程消息通过
send
,route
和install
实现;
任意时刻,在一个 process 内只有一个线程在执行,这样就无需在 process 内部做同步:
Processes provide execution contexts (only one thread executing within a process at a time so no need for per process synchronization).
一个 process 的生命周期
using namespace process;
class MyProcess : public Process<MyProcess> {};
int main(int argc, char** argv) {
MyProcess process;
spawn(process);
terminate(process);
wait(process);
return 0;
}
PID
每一个 process
拥有一个 PID,libprocess 的使用者只需要知道这个 PID,而不需要维护这个 process 的引用(指针)。
由于最终是基于 HTTP 的,PID 实际上代表的也就是一个 URL,比如:http://ip:port/name/endpoint
。
例:
int main(int argc, char** argv) {
QueueProcess process;
spawn(process);
PID<QueueProcess> pid = process.self();
dispatch(pid, &QueueProcess:enqueue, 42);
terminate(pid);
wait(pid);
return 0;
}
本地消息
通过 dispatch
异步地调用一个函数/方法
相当于把一个消息传给一个 process:
dispatch(instance, &SomeProcess::someMethod, arg1, arg2, ...);
每个 process 有一个“消息队列”, 例:
class QueueProcess : public Process<QueueProcess> {
public:
void enqueue(int i) { this->i = i; }
int dequeue() { return this->i; }
private:
int i;
};
int main(int argc, char** argv) {
QueueProcess process;
spawn(process);
dispatch(process, &QueueProcess::enqueue, 42);
dispatch(process, &QueueProcess::enqueue, 43);
terminate(process);
wait(process);
return 0;
}
通过 delay
延迟地调用一个函数/方法
和 dispatch
类似,但调用会推迟一段时间,例如:
delay(Seconds(1), process, &QueueProcess::enqueue, 42);
通过 promises/futures
进行函数式的组合
Future 是针对调用方说得,表示一个可能还没有完成的异步任务的结果,需要在“未来”才能取到值。
Promise 是针对任务执行方来说的,可以在 promise 上设置任务的结果。 可以把 promise 理解为一个可写的,可以实现一个 future 的单一赋值容器。
Future 是对结果“读”的一端。Promise 是对结果“写”的一方。
例:
template <typename T>
class QueueProcess : public Process<QueueProcess<T>> {
public:
Future<T> dequeue() {
return promise.future();
}
void enqueue(T t) {
promise.set(t);
}
private:
Promise<T> promise;
};
int main(int argc, char** argv) {
...
Future<int> i = dispatch(process, &QueueProcess<int>::dequeue);
dispatch(process, &QueueProcess<int>::enqueue, 42);
i.await();
...
}
对于 future,通过 then
和回调函数,调用方无需显式地调用 wait 等阻塞方法。例:
int main(int argc, char** argv) {
...;
Future<int> i = dispatch(process, &QueueProcess<int>::dequeue);
dispatch(process, &QueueProcess<int>::enqueue, 42);
i.then([] (int i) {
// Use ‘i’.
});
// OR use bind
i.then(bind(&dequeueReady, _1));
...;
}
类似的回调函数还有 Future::onReady
, Future::onFailed
等。
通过 defer
让回调函数在某个 process 中执行
Future 中注册回调函数,其执行是同步的,也就是说会在完成任务(实现了 future)的那个线程执行,这有可能会阻塞。
defer
提供了一种机制,可以把回调函数异步地执行。
defer
is similar todispatch
, but rather than enqueing the execution of a method or function on the specified process immediately (i.e., synchronously), it returns aDeferred
, which is a callable object that only after getting invoked will dispatch the method or function on the specified process. Said another way, usingdefer
is a way to defer adispatch
.
例子:
using namespace process;
void foo()
{
ProcessBase process;
spawn(process);
Deferred<void(int)> deferred = defer(
process,
[](int i) {
// 通过 defer,这个回调会在 `process` 所在的那个线程中执行
// Invoked _asynchronously_ using `process` as the execution context.
});
Promise<int> promise;
promise.future().then(deferred);
promise.future().then([](int i) {
// 没有使用 defer,则回调会在任务完成后,马上在执行任务的那个线程执行
// Invoked synchronously from the execution context of
// the thread that completes the future!
});
// Executes both callbacks synchronously, which _dispatches_
// the deferred lambda to run asynchronously in the execution
// context of `process` but invokes the other lambda immediately.
promise.set(42);
terminate(process);
}
远程消息
process 的 name
Process 可以有一个 name,比如创建一个 name 为 “queue” 的 process:
class QueueProcess : public Process<QueueProcess> {
public:
QueueProcess() : ProcessBase("queue") {}
};
通过 route
和 install
为 process 添加远程消息的“路由”
由于消息都是基于 HTTP 的,所以我们需要为 process 设置不同的“路由”,来处理不同的消息。
using namespace process;
using namespace process::http;
class QueueProcess : public Process<QueueProcess> {
public:
QueueProcess() : ProcessBase("queue") {}
virtual void initialize() {
route("/enqueue", [] (Request request) {
// Parse argument from 'request.query' or 'request.body'.
enqueue(arg);
return OK();
});
}
};
install
和 route
类似,使用方法有所不同:
class QueueProcess : public Process<QueueProcess> {
public:
QueueProcess() : ProcessBase("queue") {}
virtual void initialize() {
install("enqueue", [] (PID<> from, string body) {
// Parse argument from 'body'.
enqueue(arg);
});
}
};
通过 send
来实现消息传递(远程方法调用)
调用方事实上等价于这样的 HTTP 请求:
curl localhost:1234/queue/enqueue?value=42
Libprocess 为调用方提供了 send
方法:
class QueueProcess : public Process<QueueProcess> {
public:
QueueProcess() : ProcessBase("queue") {}
virtual void initialize() {
install("enqueue", [] (PID<> from, string body) {
send(from, "some response");
});
}
};
使用 protobuf
Libprocess 还支持使用 protobuf 来承载 payload:
protobuf::Protocol<EnqueueRequest, EnqueueResponse> enqueue;
class SomeProcess : public Process<SomeProcess> {
public:
void f() {
EnqueueRequest request;
request.set_arg(42);
enqueue(pid, request)
.onFailed(defer([] () { ...; }))
.onReady(defer([] (EnqueueResponse response) {
// Use response.
}));
}
}
参考资料
- libprocess: 这是 libprocess 和 Mesos 原作者 Benjamin Hindman 对于 libprocess 的介绍。
- libprocess – an Actor-based inter-process communication library: 提供了一个例子。
- Libprocess Developer Guide: 里面详细介绍了
defer
使用。