Paddle Fluid source code analysis

Basic structures

Variable

Defined in variable.h as follows:

class Variable {
 public:
  template <typename T>
  const T& Get() const {
    PADDLE_ENFORCE(holder_ != nullptr, "Variable must hold some thing");
    PADDLE_ENFORCE(IsType<T>(),
                   "Variable must be type %s, the holding type is %s",
                   typeid(T).name(), holder_->Type().name());
    return *static_cast<const T*>(holder_->Ptr());
  }

  template <typename T>
  T* GetMutable() {
    if (!IsType<T>()) {
      holder_.reset(new PlaceholderImpl<T>(new T()));
    }
    return static_cast<T*>(holder_->Ptr());
  }

  ...

  std::unique_ptr<Placeholder>
      holder_;  // pointers to a PlaceholderImpl object indeed.

  friend class Scope;
  const std::string* name_;
};
Placeholder is what actually holds the allocated storage; the concrete implementation is PlaceholderImpl. The holder_ member of Variable keeps a pointer to a Placeholder instance. Each variable has a string name stored in name_, and that name is only valid within the current Scope.
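The Get/GetMutable pair is a classic type-erasure pattern: the Variable itself is untyped, and the concrete type only reappears at the call site. The following standalone sketch (not Paddle code, just an illustration of the same idea with simplified Placeholder/PlaceholderImpl classes and assert instead of PADDLE_ENFORCE) shows how holder_ can store any T while Get<T>() checks the stored type before casting back:

#include <cassert>
#include <memory>
#include <typeindex>
#include <vector>

class MiniVariable {
 public:
  template <typename T>
  const T& Get() const {
    assert(holder_ != nullptr && "variable must hold something");
    assert(std::type_index(typeid(T)) == holder_->Type() && "type mismatch");
    return *static_cast<const T*>(holder_->Ptr());
  }

  template <typename T>
  T* GetMutable() {
    if (holder_ == nullptr || std::type_index(typeid(T)) != holder_->Type()) {
      holder_.reset(new PlaceholderImpl<T>());  // lazily (re)allocate storage
    }
    return static_cast<T*>(holder_->Ptr());
  }

 private:
  struct Placeholder {                    // type-erased interface
    virtual ~Placeholder() {}
    virtual void* Ptr() = 0;
    virtual std::type_index Type() const = 0;
  };
  template <typename T>
  struct PlaceholderImpl : Placeholder {  // owns the concrete T
    void* Ptr() override { return &value_; }
    std::type_index Type() const override { return typeid(T); }
    T value_;
  };
  std::unique_ptr<Placeholder> holder_;
};

int main() {
  MiniVariable var;
  var.GetMutable<std::vector<int>>()->push_back(42);  // first call allocates
  assert(var.Get<std::vector<int>>().size() == 1);    // typed read-only view
  return 0;
}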

Scope

Defined in scope.h as follows:

class Scope {
 public:
  ...
  Variable* FindVar(const std::string& name) const;
  const Scope& parent() const { return *parent_; }
  const Scope* FindScope(const Variable* var) const;
  ...

 private:
  // Call Scope::NewScope for a sub-scope.
  explicit Scope(Scope const* parent) : parent_(parent) {}

  mutable std::unordered_map<std::string, Variable*> vars_;
  mutable std::list<Scope*> kids_;
  Scope const* parent_{nullptr};
};
Scope stores Variable objects: the vars_ map inside each scope maps a string name to a Variable instance. Scopes can be nested; the list-typed kids_ member holds the child scopes of the current scope, and FindVar is the lookup interface for Variables.
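To make the parent/child relationship concrete, here is a small standalone sketch (not the real Scope API, just the lookup idea): each scope owns a name-to-variable map, and a lookup that misses locally is forwarded to the parent, which matches how nested scope resolution typically works. The Var/NewScope helper names here are illustrative, not taken from scope.h:

#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>

struct MiniVar { std::string data; };

class MiniScope {
 public:
  explicit MiniScope(const MiniScope* parent = nullptr) : parent_(parent) {}

  MiniVar* Var(const std::string& name) {            // create (or reuse) locally
    auto& slot = vars_[name];
    if (!slot) slot.reset(new MiniVar());
    return slot.get();
  }

  MiniVar* FindVar(const std::string& name) const {  // local miss -> ask parent
    auto it = vars_.find(name);
    if (it != vars_.end()) return it->second.get();
    return parent_ ? parent_->FindVar(name) : nullptr;
  }

  MiniScope& NewScope() {                            // child scope, kept in kids_
    kids_.emplace_back(new MiniScope(this));
    return *kids_.back();
  }

 private:
  std::unordered_map<std::string, std::unique_ptr<MiniVar>> vars_;
  std::list<std::unique_ptr<MiniScope>> kids_;
  const MiniScope* parent_;
};

int main() {
  MiniScope global;
  global.Var("w")->data = "weights";
  MiniScope& local = global.NewScope();
  // The child scope sees its parent's variables, but not the other way around.
  std::cout << (local.FindVar("w") != nullptr) << "\n";    // 1
  std::cout << (global.FindVar("tmp") != nullptr) << "\n"; // 0
  return 0;
}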

RPC communication

Paddle uses gRPC as the underlying communication layer. paddle/operators/detail/send_recv.proto defines the RPC message interface for pserver send/receive; the implementation is in recv_impl.cc, with the corresponding header send_recv_impl.h.

Request send/receive on the pserver side

The SendRecvServerImpl class in recv_impl.cc handles RPC request send/receive on the pserver side. It is defined as follows:

typedef std::pair<std::string, sendrecv::VariableMessage> MessageWithName;

class SendRecvServerImpl final : public SendRecvService::Service {
 public:
  explicit SendRecvServerImpl() {}

  Status SendVariable(ServerContext *context, const VariableMessage *in_var,
                      VoidMessage *out_var) override;
  Status GetVariable(ServerContext *context, const VariableMessage *in_var,
                     VariableMessage *out_var) override;
  Status Wait(ServerContext *context, const VoidMessage *in_var,
              VoidMessage *out_var) override;
  void Reset();
  void Done();
  void SetScope(framework::Scope *scope) { scope_ = scope; };

  const MessageWithName Get() { return this->var_recv_queue_.Pop(); }

  void Push(const MessageWithName &msg) { this->var_recv_queue_.Push(msg); }

 private:
  // received variable from RPC, operators fetch variable from this queue.
  SimpleBlockQueue<MessageWithName> var_recv_queue_;
  framework::Scope *scope_;
  // condition of the sub program
  std::mutex mutex_;
  bool done_;
  std::condition_variable condition_;
};
The var_recv_queue_ queue stores the content of incoming requests; at the moment a single request can only carry one Tensor. MessageWithName is defined as std::pair<std::string, sendrecv::VariableMessage>. The members mutex_, condition_ and done_ implement the blocking behavior of the Wait interface. Open question: judging by its name, SendRecvServerImpl::SendVariable should send a Variable, but the implementation simply pushes the incoming VariableMessage onto the var_recv_queue_ queue:
Status SendRecvServerImpl::SendVariable(ServerContext *context,
                                        const VariableMessage *in_var,
                                        VoidMessage *out_var) {
  MessageWithName msg_with_name =
      std::make_pair(in_var->varname(), std::move(*in_var));
  var_recv_queue_.Push(std::move(msg_with_name));
  return Status::OK;
}

Request send/receive on the trainer side

The RPCClient class implements RPC request send/receive on the trainer/worker side:

class RPCClient {
 public:
  RPCClient(std::shared_ptr<Channel> channel)
      : stub_(SendRecvService::NewStub(channel)) {}

  bool SendVariable(const framework::Scope &scope, const std::string &inname);
  bool GetVariable(const framework::Scope &scope, const std::string &outname);
  void Wait();

 private:
  std::unique_ptr<SendRecvService::Stub> stub_;
};
stub_ holds the Stub used to talk to a single pserver. SendVariable wraps the Variable named inname in the local Scope into a VariableMessage and sends it to the pserver. GetVariable does the opposite: it takes the VariableMessage named outname out of the received RPC response and stores it into the local Scope.
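In other words, SendVariable serializes the Variable into a sendrecv::VariableMessage and then issues a synchronous gRPC call through stub_. The following is only a sketch of that call pattern, assuming the standard synchronous stub that protoc generates from send_recv.proto and glossing over the actual tensor serialization; it is not the verbatim Paddle implementation:

// Hedged sketch of the client-side call pattern.
bool RPCClient::SendVariable(const framework::Scope &scope,
                             const std::string &inname) {
  sendrecv::VariableMessage msg;
  msg.set_varname(inname);
  // ... serialize scope.FindVar(inname) into the message payload here ...

  grpc::ClientContext ctx;            // one context per call
  sendrecv::VoidMessage reply;
  grpc::Status status = stub_->SendVariable(&ctx, msg, &reply);
  if (!status.ok()) {
    LOG(ERROR) << "SendVariable rpc failed: " << status.error_message();
    return false;
  }
  return true;
}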

All Variables are serialized and deserialized with SerializeToStream and DeserializeFromStream when they are sent and received. The serialization functions for the different Variable types live in their respective implementation files, e.g. lod_tensor.cc and selected_rows.cc; the shared serialization helpers are defined in tensor_util.h.

Planned work: at the moment, both the pserver and the trainer GetVariable/SendVariable methods always pass platform::CPUDeviceContext as the fixed Place/device context; support for other Place types will be added later.

Operator

OperatorBase

OperatorBase is the base class of all Op classes; every Op implements the corresponding interfaces.

class OperatorBase {
 public:
  OperatorBase(const std::string& type, const VariableNameMap& inputs,
               const VariableNameMap& outputs, const AttributeMap& attrs);

  virtual ~OperatorBase() {}

  template <typename T>
  inline const T& Attr(const std::string& name) const {
    PADDLE_ENFORCE(attrs_.count(name) != 0, "%s should be in AttributeMap",
                   name);
    return boost::get<T>(attrs_.at(name));
  }

  /// Net will call this function to Run an op.
  virtual void Run(const Scope& scope, const platform::Place& place) const = 0;

  // FIXME(typhoonzero): this is only used for recv_op to stop event_loop.
  virtual void Stop() {}

  virtual bool IsNetOp() const { return false; }
  virtual bool SupportGPU() const { return false; }

  /// rename inputs outputs name
  void Rename(const std::string& old_name, const std::string& new_name);

  const VariableNameMap& Inputs() const { return inputs_; }
  const VariableNameMap& Outputs() const { return outputs_; }

  //! Get a input with argument's name described in `op_proto`
  std::string Input(const std::string& name) const;
  //! Get a input which has multiple variables.
  const std::vector<std::string>& Inputs(const std::string& name) const;
  std::vector<std::string> InputVars() const;

  //! Get a output with argument's name described in `op_proto`
  std::string Output(const std::string& name) const;
  //! Get an output which has multiple variables.
  //! TODO add a vector_view to prevent memory copy.
  const std::vector<std::string>& Outputs(const std::string& name) const;
  virtual std::vector<std::string> OutputVars(bool has_intermediate) const;

  const std::string& Type() const { return type_; }
  const AttributeMap& Attrs() const { return attrs_; }

 protected:
  std::string type_;
  // NOTE: in case of OpGrad, inputs_ contains:
  // I (Inputs)
  // O (Outputs)
  // OG (Output Gradients)
  VariableNameMap inputs_;
  // NOTE: in case of OpGrad, outputs_ contains
  // IG (Inputs Gradients)
  VariableNameMap outputs_;
  AttributeMap attrs_;

 private:
  void GenerateTemporaryNames();
  void CheckAllInputOutputSet() const;
};

Here attrs_ stores the op's attributes, while inputs_ and outputs_ store the names of the op's input and output variables. The names are strings, and different inputs and outputs are distinguished by their keys (I, O, OG). When a Net starts executing an op, it calls the op's Run method. AttributeMap and VariableNameMap are defined in type_defs.h as follows:

using VariableNameMap = std::map<std::string, std::vector<std::string>>;

// The order should be as same as framework.proto
using Attribute =
    boost::variant<boost::blank, int, float, std::string, std::vector<int>,
                   std::vector<float>, std::vector<std::string>, bool,
                   std::vector<bool>, BlockDesc*>;

using AttributeMap = std::unordered_map<std::string, Attribute>;
Input differs from Inputs in that Input is only used when a key holds a single variable name, whereas Inputs returns the whole vector of string variable names; Output and Outputs are analogous. InputVars and OutputVars flatten all variable names in the name map into one vector, and OutputVars takes a has_intermediate parameter indicating whether intermediate (temporary) variables should be included in the result. The variables themselves are stored in the Scope.
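As a concrete illustration of the data involved (names only, no real framework types, and the variable names here are made up), an op's VariableNameMap might look like the following; Input and Inputs differ only in whether they return the single element or the whole vector under a key:

#include <cassert>
#include <map>
#include <string>
#include <vector>

using VariableNameMap = std::map<std::string, std::vector<std::string>>;

int main() {
  // Hypothetical inputs of a sum-like op: one key ("X") holding several
  // variable names; a single-input op would hold exactly one name per key.
  VariableNameMap multi = {{"X", {"grad_w@trainer0", "grad_w@trainer1"}}};

  // Inputs("X"): the whole vector of names stored under the key.
  const std::vector<std::string>& xs = multi.at("X");
  assert(xs.size() == 2);

  // Input("Y") only makes sense when the key holds exactly one name;
  // it would return that single string.
  VariableNameMap single = {{"Y", {"weight"}}};
  assert(single.at("Y").size() == 1);
  std::string y = single.at("Y").front();
  assert(y == "weight");
  return 0;
}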

ExecutionContext

ExecutionContext is the class that provides the context for executing an op. As noted above, OperatorBase only stores the names of the variables to operate on; ExecutionContext is responsible for fetching the actual Variable objects from the corresponding Scope, and it also exposes the op's Attr values. In addition, ExecutionContext holds the platform::DeviceContext describing the device on which the op executes.

SendOp

SendOp is relatively simple, so let's look at it first as an example.

class SendOp : public framework::OperatorBase {
 public:
  SendOp(const std::string &type, const framework::VariableNameMap &inputs,
         const framework::VariableNameMap &outputs,
         const framework::AttributeMap &attrs)
      : OperatorBase(type, inputs, outputs, attrs) {
    // init client when the operator is created at runtime.
    std::vector<std::string> endpoints =
        Attr<std::vector<std::string>>("endpoints");
    for (auto ep : endpoints) {
      client_map_[ep].reset(new detail::RPCClient(
          grpc::CreateChannel(ep, grpc::InsecureChannelCredentials())));
    }
  }

  void Run(const framework::Scope &scope,
           const platform::Place &dev_place) const override {
    auto ins = Inputs("X");
    auto outs = Outputs("Out");
    std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
    // TODO(typhoonzero): use async calls to send multiple variable asyncly.
    for (size_t i = 0; i < ins.size(); ++i) {
      bool ret = client_map_[epmap[i]]->SendVariable(scope, ins[i]);
      if (!ret) {
        LOG(ERROR) << "send variable error: " << ins[i];
      }
    }
    // TODO(typhoonzero): support async optimization
    client_map_[epmap[0]]->Wait();
    for (size_t i = 0; i < outs.size(); ++i) {
      bool ret = client_map_[epmap[i]]->GetVariable(scope, outs[i]);
      if (!ret) {
        LOG(ERROR) << "GetVariable error: " << outs[i];
      }
    }
  }

 protected:
  mutable std::unordered_map<std::string, std::shared_ptr<detail::RPCClient>>
      client_map_;
};
client_map_ holds a client for every pserver the op talks to and is initialized from the endpoints attribute when the op is constructed. When Run executes, it reads the input and output variable names from X and Out (these keys are fixed), fetches the corresponding Variables from the scope, and calls RPCClient's SendVariable to send them. Sending is currently blocking and transfers one variable at a time, which is inefficient; a later optimization will switch to asynchronous sends.

OpKernelBase

class OpKernelBase {
 public:
  /**
   * ExecutionContext is the only parameter of Kernel Run function.
   * Run will get input/output variables, state such as momentum and
   * device resource such as CUDA stream, cublas handle, etc. from
   * ExecutionContext. User should construct it before run the Operator.
   */
  virtual void Compute(const ExecutionContext& context) const = 0;

  virtual ~OpKernelBase() = default;
};

template <typename T>
class OpKernel : public OpKernelBase {
 public:
  using ELEMENT_TYPE = T;
};

OpKernelBase is the base class of an op's compute function, and its Compute method takes an ExecutionContext as its only parameter. Depending on whether an op has a kernel, ops fall into two kinds: ops with kernels, whose definitions inherit from OperatorWithKernel, and ops without kernels, which inherit directly from OperatorBase. operator.h also defines OperatorWithKernel, which is essentially OperatorBase plus an OpKernelMap. An OpKernel example follows; for adding a new op, see the document "How to add an Operator with a kernel".
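The OpKernelMap inside OperatorWithKernel is essentially a lookup table from a kernel key (device/place plus data type in the real code) to OpKernelBase instances, and Run dispatches to the kernel that matches the current place. A standalone sketch of that dispatch idea, with a simplified string key instead of the real OpKernelType, could look like this:

#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Simplified stand-ins: the real code keys kernels by OpKernelType
// (place + data type) and passes an ExecutionContext to Compute().
struct MiniContext { std::string place; };

struct MiniKernelBase {
  virtual ~MiniKernelBase() {}
  virtual void Compute(const MiniContext& ctx) const = 0;
};

struct CpuMulKernel : MiniKernelBase {
  void Compute(const MiniContext& ctx) const override {
    std::cout << "mul on " << ctx.place << "\n";
  }
};

class MiniOperatorWithKernel {
 public:
  using KernelMap = std::unordered_map<std::string,            // kernel key
                                       std::unique_ptr<MiniKernelBase>>;

  void RegisterKernel(const std::string& key, MiniKernelBase* kernel) {
    kernels_[key].reset(kernel);
  }

  void Run(const MiniContext& ctx) const {   // dispatch on the place
    auto it = kernels_.find(ctx.place);
    if (it == kernels_.end()) {
      std::cerr << "no kernel registered for " << ctx.place << "\n";
      return;
    }
    it->second->Compute(ctx);
  }

 private:
  KernelMap kernels_;
};

int main() {
  MiniOperatorWithKernel mul_op;
  mul_op.RegisterKernel("CPU", new CpuMulKernel());
  mul_op.Run(MiniContext{"CPU"});  // prints: mul on CPU
  return 0;
}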

MulOp

First, a few words about registration. MulOp is defined in mul_op.cc and mul_op.h; when an op is added it must be registered.

namespace ops = paddle::operators;

REGISTER_OPERATOR(mul, paddle::framework::OperatorWithKernel, ops::MulOpMaker,
                  ops::MulOpShapeInference,
                  paddle::framework::DefaultGradOpDescMaker<true>);
REGISTER_OPERATOR(mul_grad, ops::MulOpGrad);
REGISTER_OP_CPU_KERNEL(
    mul, ops::MulKernel<paddle::platform::CPUDeviceContext, float>);
REGISTER_OP_CPU_KERNEL(
    mul_grad, ops::MulGradKernel<paddle::platform::CPUDeviceContext, float>);
The previous registration mechanism:
namespace ops = paddle::operators;

REGISTER_OP(mul, ops::MulOp, ops::MulOpMaker, mul_grad, ops::MulOpGrad);
REGISTER_OP_CPU_KERNEL(
    mul, ops::MulKernel<paddle::platform::CPUDeviceContext, float>);
REGISTER_OP_CPU_KERNEL(
    mul_grad, ops::MulGradKernel<paddle::platform::CPUDeviceContext, float>);
Two registration mechanisms currently coexist: REGISTER_OP registers mul and mul_grad together, while REGISTER_OPERATOR registers them separately. Under the hood REGISTER_OP is itself built on REGISTER_OPERATOR; it is simply the more compact form.
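These REGISTER_* macros follow the usual static-registrar trick: they expand to a file-scope object whose constructor inserts a factory into a global registry, so merely linking the .cc file makes the op creatable by name. Below is a hedged standalone sketch of that mechanism, not the actual macro expansion in op_registry.h:

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

struct MiniOp {
  virtual ~MiniOp() {}
  virtual void Run() const = 0;
};

// Global name -> factory registry (the real one lives in OpRegistry).
using OpFactory = std::function<std::unique_ptr<MiniOp>()>;
std::unordered_map<std::string, OpFactory>& Registry() {
  static std::unordered_map<std::string, OpFactory> registry;
  return registry;
}

// The registrar: constructing one instance registers the factory.
struct OpRegistrar {
  OpRegistrar(const std::string& type, OpFactory factory) {
    Registry()[type] = std::move(factory);
  }
};

// A macro in the spirit of REGISTER_OPERATOR: defines a static registrar.
#define MINI_REGISTER_OP(type, OpClass)                 \
  static OpRegistrar mini_registrar_##type(#type, [] {  \
    return std::unique_ptr<MiniOp>(new OpClass());      \
  })

struct MulLikeOp : MiniOp {
  void Run() const override { std::cout << "running mul\n"; }
};
MINI_REGISTER_OP(mul, MulLikeOp);  // runs before main() via static init

int main() {
  auto op = Registry().at("mul")();  // the CreateOp("mul") equivalent
  op->Run();
  return 0;
}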

The MulOp implementation

class MulOpShapeInference : public framework::InferShapeBase {
 public:
  void operator()(framework::InferShapeContext* ctx) const override {
    auto x_dims = ctx->GetInputDim("X");
    auto y_dims = ctx->GetInputDim("Y");

    int x_num_col_dims = ctx->Attrs().Get<int>("x_num_col_dims");
    int y_num_col_dims = ctx->Attrs().Get<int>("y_num_col_dims");

    ....

    ctx->SetOutputDim("Out", framework::make_ddim(output_dims));
    ctx->ShareLoD("X", /*->*/ "Out");
  }
};

class MulOpMaker : public framework::OpProtoAndCheckerMaker {
 public:
  MulOpMaker(OpProto* proto, OpAttrChecker* op_checker)
      : OpProtoAndCheckerMaker(proto, op_checker) {
    AddInput("X", "(Tensor), The first input tensor of mul op.");
    AddInput("Y", "(Tensor), The second input tensor of mul op.");
    AddOutput("Out", "(Tensor), The output tensor of mul op.");
    ...
};

template <typename DeviceContext, typename T>
class MulKernel : public framework::OpKernel<T> {
 public:
  void Compute(const framework::ExecutionContext& context) const override {
    const Tensor* x = context.Input<Tensor>("X");
    const Tensor* y = context.Input<Tensor>("Y");
    Tensor* z = context.Output<Tensor>("Out");
    ......
    math::matmul<DeviceContext, T>(
        context.template device_context<DeviceContext>(), x_matrix, false,
        y_matrix, false, 1, z, 0);
    ......
  }
};
The MulOp implementation consists mainly of two classes, MulOpMaker and MulKernel. MulOpMaker produces the op's description (inputs, outputs, attributes) through the proto information, while MulKernel implements the actual computation logic in its Compute method. There is one more step: inferring the output tensor's shape. Two approaches currently exist: implement a class derived from framework::InferShapeBase, as MulOpShapeInference does, or override the InferShape method of the parent class framework::OperatorWithKernel, as MulOpGrad does.

Executing ops

Every computation is a composition of ops. On the Python side, all ops are appended to a Block, which is then sent in proto form to the C++ program that actually executes it. The run function in executor.py ties this together; the code is as follows:

def run(self,
        program=None,
        feed=None,
        fetch_list=None,
        feed_var_name='feed',
        fetch_var_name='fetch',
        scope=None,
        return_numpy=True):
    ......
    for i, var in enumerate(fetch_list):
        global_block.append_op(
            type='fetch',
            inputs={'X': [var]},
            outputs={'Out': [fetch_var]},
            attrs={'col': i})

    self.executor.run(program.desc, scope, 0, True, True)
    ......
The Python run ultimately calls the run method implemented in the C++ executor.cc, which executes each op's Run method in the order the ops were added; the ops' input and output variables all live in the Scope:
for (auto& op_desc : block.AllOps()) {
  auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
  VLOG(3) << op->DebugStringEx(local_scope);
  op->Run(*local_scope, place_);
  if (FLAGS_check_nan_inf) {
    for (auto& vname : op->OutputVars(true)) {
      auto* var = local_scope->FindVar(vname);
      if (var == nullptr) continue;
      if (var->IsType<framework::LoDTensor>()) {
        CheckTensorNANOrInf(vname, var->Get<framework::LoDTensor>());
      }
    }
  }
}

Distributed training with pservers

To support distributed Paddle training, a DistributeTranspiler class was added on the Python side. Its main job is to move all optimization ops onto the parameter servers and delete them from the local trainer program, automatically inserting a send op into the trainer program and a corresponding recv op into the pserver program.

class DistributeTranspiler:
    def transpile(self,
                  optimize_ops,
                  params_grads,
                  program=None,
                  pservers="127.0.0.1:6174",
                  trainers=1,
                  split_method=round_robin):
        if program is None:
            program = default_main_program()
        self.program = program
        self.trainers = trainers
        self.optimize_ops = optimize_ops
        self._optimize_distributed(
            optimize_ops,
            program,
            params_grads,
            pservers=pservers,
            trainers=trainers,
            split_method=split_method)

    def get_trainer_program(self):  # get the trainer program
        # remove optimize ops and add a send op to main_program
        self.program.global_block().delete_ops(self.optimize_ops)
        return self.program
The implementation of _optimize_distributed:
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
                          **kwargs):
    if kwargs.has_key("split_method"):
        split_method = kwargs["split_method"]
    else:
        split_method = round_robin

    assert (callable(split_method))
    pserver_endpoints = kwargs["pservers"].split(",")
    self.param_grad_map = split_method(params_and_grads, pserver_endpoints)

    send_op_ordered_inputs = []
    send_op_ordered_outputs = []
    epmap = []
    for ep, v in self.param_grad_map.iteritems():
        send_op_ordered_inputs.extend(v["grads"])
        send_op_ordered_outputs.extend(v["params"])
        for i in v["grads"]:
            epmap.append(ep)
    send_op = program.global_block().append_op(
        type="send",
        inputs={"X": send_op_ordered_inputs
                },  # inputs is a list of tensors to be send
        outputs={"Out": send_op_ordered_outputs},
        attrs={"endpoints": pserver_endpoints,
               "epmap": epmap})
First the incoming params_and_grads are partitioned: the supplied split_method assigns parameters to the different pservers. A send op is then appended to synchronize parameters with the pservers. All of this happens inside transpile, which in the Python program is called right after the DistributeTranspiler instance is created.

def get_pserver_program(self, endpoint, optimize_ops):
    pserver_program = Program()
    for v in self.param_grad_map[endpoint]["params"]:
        self._clone_param(pserver_program.global_block(), v)

    optimize_sub_program = Program()
    grad_var_names = [
        var.name for var in self.param_grad_map[endpoint]["grads"]
    ]
    for opt_op in optimize_ops:
        for _, var in opt_op.inputs.iteritems():
            # NOTE: append operators to merge gradients from multiple
            # trainers. If trainers == 1, this is not needed.
            if self.trainers > 1 and var.name in grad_var_names:
                vars2merge = self._create_var_for_trainers(
                    optimize_sub_program.global_block(), var, self.trainers)
                merged_var = optimize_sub_program.global_block().create_var(
                    name=var.name,
                    persistable=var.persistable,
                    dtype=var.dtype,
                    shape=var.shape)
                optimize_sub_program.global_block().append_op(
                    type="sum",
                    inputs={"X": vars2merge},
                    outputs={"Out": merged_var})
                optimize_sub_program.global_block().append_op(
                    type="scale",
                    inputs={"X": merged_var},
                    outputs={"Out": merged_var},
                    attrs={"scale": 1.0 / float(self.trainers)})
            else:
                optimize_sub_program.global_block().create_var(
                    name=var.name,
                    persistable=var.persistable,
                    dtype=var.dtype,
                    shape=var.shape)

        if opt_op.inputs.has_key("Grad"):
            if opt_op.inputs["Grad"].name in grad_var_names:
                optimize_sub_program.global_block().append_op(
                    type=opt_op.type,
                    inputs=opt_op.inputs,
                    outputs=opt_op.outputs,
                    attrs=opt_op.attrs)
        else:
            optimize_sub_program.global_block().append_op(
                type=opt_op.type,
                inputs=opt_op.inputs,
                outputs=opt_op.outputs,
                attrs=opt_op.attrs)
    pserver_program.global_block().append_op(
        type="recv",
        inputs={"RX":
                self.param_grad_map[endpoint]["grads"]},  # grads to recv
        outputs={},
        attrs={
            "OptimizeProgram": optimize_sub_program.desc,
            "endpoint": endpoint,
            "ParamList":
            [p.name for p in self.param_grad_map[endpoint]["params"]],
            "GradList":
            [p.name for p in self.param_grad_map[endpoint]["grads"]],
            "Trainers": self.trainers
        })
    pserver_program.sync_with_cpp()
    return pserver_program

When the pserver program runs, it first obtains its Program through get_pserver_program. This starts by copying the parameters assigned to this pserver after the split into the global_block of pserver_program, and records the names of all gradient variables in grad_var_names. _create_var_for_trainers then creates, on each pserver, one variable per trainer in the global_block of optimize_sub_program, along with a merged_var used to merge the gradients from all trainers; sum and scale ops are appended to optimize_sub_program to sum the received gradients and scale them by 1/trainers. If opt_op.inputs contains a key named "Grad", the op operates on a gradient and is appended (via append_op) only when that gradient belongs to this pserver; otherwise the op is appended as-is. The final step appends a recv op to pserver_program, which receives the gradients sent by the trainers.