Paddle Fluid code walkthrough
Basic structures
Variable
Defined in variable.h; the definition looks like this:

class Variable {
public:
template <typename T>
const T& Get() const {
PADDLE_ENFORCE(holder_ != nullptr, "Variable must hold some thing");
PADDLE_ENFORCE(IsType<T>(),
"Variable must be type %s, the holding type is %s",
typeid(T).name(), holder_->Type().name());
return *static_cast<const T*>(holder_->Ptr());
}
template <typename T>
T* GetMutable() {
if (!IsType<T>()) {
holder_.reset(new PlaceholderImpl<T>(new T()));
}
return static_cast<T*>(holder_->Ptr());
}
...
std::unique_ptr<Placeholder>
holder_; // pointers to a PlaceholderImpl object indeed.
friend class Scope;
const std::string* name_;
};

Placeholder is what actually holds the allocated storage; the concrete implementation is PlaceholderImpl. The holder_ member of Variable keeps a pointer to a Placeholder instance. Every variable also has a string name stored in name_; the name is only valid within its current Scope.
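To make the holder_ mechanism concrete, here is a minimal, self-contained sketch of the same type-erasure pattern (simplified relative to variable.h: PADDLE_ENFORCE error handling and the name_ bookkeeping are omitted):

#include <iostream>
#include <memory>
#include <string>
#include <typeinfo>

// Minimal sketch of Variable's type erasure: Placeholder hides the concrete
// type, PlaceholderImpl<T> owns the real object, Get/GetMutable cast back.
class Variable {
 public:
  template <typename T>
  const T& Get() const {
    // The real code uses PADDLE_ENFORCE with readable messages here.
    if (!holder_ || holder_->Type() != typeid(T)) throw std::bad_cast();
    return *static_cast<const T*>(holder_->Ptr());
  }

  template <typename T>
  T* GetMutable() {
    // The first typed access (or a type change) allocates a fresh T.
    if (!holder_ || holder_->Type() != typeid(T))
      holder_.reset(new PlaceholderImpl<T>(new T()));
    return static_cast<T*>(holder_->Ptr());
  }

 private:
  struct Placeholder {
    virtual ~Placeholder() {}
    virtual const std::type_info& Type() const = 0;
    virtual void* Ptr() const = 0;
  };

  template <typename T>
  struct PlaceholderImpl : Placeholder {
    explicit PlaceholderImpl(T* ptr) : ptr_(ptr) {}
    const std::type_info& Type() const override { return typeid(T); }
    void* Ptr() const override { return ptr_.get(); }
    std::unique_ptr<T> ptr_;
  };

  std::unique_ptr<Placeholder> holder_;  // points to a PlaceholderImpl<T>
};

int main() {
  Variable v;
  *v.GetMutable<std::string>() = "hello";        // creates a std::string inside
  std::cout << v.Get<std::string>() << std::endl;
  return 0;
}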
Scope
Defined in scope.h; the definition looks like this:

class Scope {
public:
...
Variable* FindVar(const std::string& name) const;
const Scope& parent() const { return *parent_; }
const Scope* FindScope(const Variable* var) const;
...
private:
// Call Scope::NewScope for a sub-scope.
explicit Scope(Scope const* parent) : parent_(parent) {}
mutable std::unordered_map<std::string, Variable*> vars_;
mutable std::list<Scope*> kids_;
Scope const* parent_{nullptr};
};
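Scopes form a tree through parent_: a FindVar that misses in the local vars_ falls back to the parent scope. The following small sketch illustrates only that lookup idea (NewVar is a made-up helper, not the real Scope API, and ownership is ignored for brevity):

#include <string>
#include <unordered_map>

class Variable {};  // stand-in for the real Variable

// Sketch of hierarchical lookup: check the local map first, then walk up
// through parent_ until the root scope is reached.
class Scope {
 public:
  Scope() = default;                                    // root scope
  explicit Scope(Scope const* parent) : parent_(parent) {}

  Variable* FindVar(const std::string& name) const {
    auto it = vars_.find(name);
    if (it != vars_.end()) return it->second;
    return parent_ ? parent_->FindVar(name) : nullptr;
  }

  Variable* NewVar(const std::string& name) {           // hypothetical helper
    return vars_[name] = new Variable();
  }

 private:
  std::unordered_map<std::string, Variable*> vars_;
  Scope const* parent_{nullptr};
};

int main() {
  Scope root;
  root.NewVar("w");
  Scope child(&root);
  return child.FindVar("w") != nullptr ? 0 : 1;         // found via the parent
}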
RPC communication
Paddle uses gRPC as the underlying communication layer. paddle/operators/detail/send_recv.proto defines the RPC interface the pserver uses to send and receive messages; the implementation is in recv_impl.cc, with the corresponding header send_recv_impl.h.
Sending and receiving requests on the pserver side
The SendRecvServerImpl class in recv_impl.cc handles sending and receiving RPC requests on the pserver side. It is defined as follows:
typedef std::pair<std::string, sendrecv::VariableMessage> MessageWithName;
class SendRecvServerImpl final : public SendRecvService::Service {
public:
explicit SendRecvServerImpl() {}
Status SendVariable(ServerContext *context, const VariableMessage *in_var,
VoidMessage *out_var) override;
Status GetVariable(ServerContext *context, const VariableMessage *in_var,
VariableMessage *out_var) override;
Status Wait(ServerContext *context, const VoidMessage *in_var,
VoidMessage *out_var) override;
void Reset();
void Done();
void SetScope(framework::Scope *scope) { scope_ = scope; };
const MessageWithName Get() { return this->var_recv_queue_.Pop(); }
void Push(const MessageWithName &msg) { this->var_recv_queue_.Push(msg); }
private:
// received variable from RPC, operators fetch variable from this queue.
SimpleBlockQueue<MessageWithName> var_recv_queue_;
framework::Scope *scope_;
// condition of the sub program
std::mutex mutex_;
bool done_;
std::condition_variable condition_;
};

Received variables are queued in var_recv_queue_ as MessageWithName entries, i.e. std::pair<std::string, sendrecv::VariableMessage>; mutex_, condition_ and done_ implement the blocking behaviour of the Wait interface.

An open question: judging by its name, SendRecvServerImpl::SendVariable should send a Variable, but the implementation simply pushes the incoming VariableMessage onto the var_recv_queue_ queue.
Status SendRecvServerImpl::SendVariable(ServerContext *context,
const VariableMessage *in_var,
VoidMessage *out_var) {
MessageWithName msg_with_name =
std::make_pair(in_var->varname(), std::move(*in_var));
var_recv_queue_.Push(std::move(msg_with_name));
return Status::OK;
}
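The members mutex_, condition_ and done_ suggest that Wait blocks until another thread calls a Done-style method. The actual logic lives in recv_impl.cc; the following is only an assumed sketch of how those three members could interact:

#include <condition_variable>
#include <mutex>
#include <thread>

// Assumed sketch (not copied from recv_impl.cc): Wait blocks the RPC thread
// until Done() is called; Reset() re-arms the barrier for the next pass.
class WaitBarrier {
 public:
  void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    condition_.wait(lock, [this] { return done_; });
  }
  void Done() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    condition_.notify_all();
  }
  void Reset() {
    std::lock_guard<std::mutex> lock(mutex_);
    done_ = false;
  }

 private:
  std::mutex mutex_;
  std::condition_variable condition_;
  bool done_ = false;
};

int main() {
  WaitBarrier barrier;
  std::thread t([&] { barrier.Done(); });
  barrier.Wait();   // returns once Done() has run
  t.join();
  return 0;
}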
Sending and receiving requests on the trainer side
The RPCClient class implements RPC sending and receiving on the trainer/worker side. Its definition:

class RPCClient {
public:
RPCClient(std::shared_ptr<Channel> channel)
: stub_(SendRecvService::NewStub(channel)) {}
bool SendVariable(const framework::Scope &scope, const std::string &inname);
bool GetVariable(const framework::Scope &scope, const std::string &outname);
void Wait();
private:
std::unique_ptr<SendRecvService::Stub> stub_;
};

SendVariable packs the Variable named inname from the local Scope into a VariableMessage and sends it to the pserver. GetVariable does exactly the opposite: it takes the VariableMessage named outname out of the RPC response and stores it into the local Scope.
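Putting the client interface together, trainer-side usage might look roughly like this (header paths, namespaces, the endpoint and the variable names are all illustrative assumptions):

#include <grpc++/grpc++.h>

#include "paddle/framework/scope.h"
#include "paddle/operators/detail/send_recv_impl.h"

// Assumed sketch: push a local gradient to one pserver, wait for the
// optimization step, then pull the updated parameter back.
void SendAndFetch(paddle::framework::Scope &scope) {
  auto channel = grpc::CreateChannel("127.0.0.1:6174",
                                     grpc::InsecureChannelCredentials());
  paddle::operators::detail::RPCClient client(channel);

  client.SendVariable(scope, "w@GRAD");  // variable names are made up
  client.Wait();                         // block until the pserver signals done
  client.GetVariable(scope, "w");        // fetch the updated parameter
}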
Every Variable is serialized and deserialized with SerializeToStream and DeserializeFromStream when it is sent or received. The serialization functions for each Variable type live in the corresponding implementation file, e.g. lod_tensor.cc and selected_rows.cc; the shared serialization helpers are defined in tensor_util.h.
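Conceptually the pattern is to write the tensor into a byte stream before putting it on the wire and to read it back on the other side. A rough sketch for LoDTensor follows; the exact signatures in lod_tensor.h may differ, and this is not copied from the send/recv implementation:

#include <sstream>
#include <string>

#include "paddle/framework/lod_tensor.h"
#include "paddle/platform/device_context.h"

// Assumed usage sketch only.
std::string PackTensor(const paddle::framework::LoDTensor &tensor,
                       const paddle::platform::DeviceContext &ctx) {
  std::ostringstream os;
  paddle::framework::SerializeToStream(os, tensor, ctx);  // version, LoD, data
  return os.str();
}

void UnpackTensor(const std::string &bytes,
                  paddle::framework::LoDTensor *tensor) {
  std::istringstream is(bytes);
  paddle::framework::DeserializeFromStream(is, tensor);
}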
Planned work: at the moment both the pserver and trainer GetVariable and SendVariable methods always pass platform::CPUDeviceContext as the Place; support for other Place types will be added later.
Operator
OperatorBase
OperatorBase is the base class of all Op classes; every Op implements the corresponding interfaces.
class OperatorBase {
public:
OperatorBase(const std::string& type, const VariableNameMap& inputs,
const VariableNameMap& outputs, const AttributeMap& attrs);
virtual ~OperatorBase() {}
template <typename T>
inline const T& Attr(const std::string& name) const {
PADDLE_ENFORCE(attrs_.count(name) != 0, "%s should be in AttributeMap",
name);
return boost::get<T>(attrs_.at(name));
}
/// Net will call this function to Run an op.
virtual void Run(const Scope& scope, const platform::Place& place) const = 0;
// FIXME(typhoonzero): this is only used for recv_op to stop event_loop.
virtual void Stop() {}
virtual bool IsNetOp() const { return false; }
virtual bool SupportGPU() const { return false; }
/// rename inputs outputs name
void Rename(const std::string& old_name, const std::string& new_name);
const VariableNameMap& Inputs() const { return inputs_; }
const VariableNameMap& Outputs() const { return outputs_; }
//! Get a input with argument's name described in `op_proto`
std::string Input(const std::string& name) const;
//! Get a input which has multiple variables.
const std::vector<std::string>& Inputs(const std::string& name) const;
std::vector<std::string> InputVars() const;
//! Get a output with argument's name described in `op_proto`
std::string Output(const std::string& name) const;
//! Get an output which has multiple variables.
//! TODO add a vector_view to prevent memory copy.
const std::vector<std::string>& Outputs(const std::string& name) const;
virtual std::vector<std::string> OutputVars(bool has_intermediate) const;
const std::string& Type() const { return type_; }
const AttributeMap& Attrs() const { return attrs_; }
protected:
std::string type_;
// NOTE: in case of OpGrad, inputs_ contains:
// I (Inputs)
// O (Outputs)
// OG (Output Gradients)
VariableNameMap inputs_;
// NOTE: in case of OpGrad, outputs_ contains
// IG (Inputs Gradients)
VariableNameMap outputs_;
AttributeMap attrs_;
private:
void GenerateTemporaryNames();
void CheckAllInputOutputSet() const;
};
Here attrs_ stores the op's attributes, while inputs_ and outputs_ store the names of the op's input and output variables. The variable names are plain strings, and different inputs and outputs are distinguished by their key (I, O, OG). When the Net starts executing an op, it calls the op's Run method. AttributeMap and VariableNameMap are defined in type_defs.h as:
using VariableNameMap = std::map<std::string, std::vector<std::string>>;
// The order should be as same as framework.proto
using Attribute =
boost::variant<boost::blank, int, float, std::string, std::vector<int>,
std::vector<float>, std::vector<std::string>, bool,
std::vector<bool>, BlockDesc*>;
using AttributeMap = std::unordered_map<std::string, Attribute>;
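As a concrete (made-up) illustration of how these maps get filled for a single mul op, using the argument keys that MulOpMaker declares further below:

#include <map>
#include <string>
#include <unordered_map>
#include <vector>

#include <boost/variant.hpp>

// Local copies of the aliases from type_defs.h (BlockDesc* omitted) so the
// snippet stands alone.
using VariableNameMap = std::map<std::string, std::vector<std::string>>;
using Attribute = boost::variant<boost::blank, int, float, std::string,
                                 std::vector<int>, std::vector<float>,
                                 std::vector<std::string>, bool,
                                 std::vector<bool>>;
using AttributeMap = std::unordered_map<std::string, Attribute>;

int main() {
  // The keys ("X", "Y", "Out") come from the op's proto; the variable names
  // ("a", "b", "out") are invented for this example.
  VariableNameMap inputs{{"X", {"a"}}, {"Y", {"b"}}};
  VariableNameMap outputs{{"Out", {"out"}}};
  AttributeMap attrs{{"x_num_col_dims", Attribute(1)},
                     {"y_num_col_dims", Attribute(1)}};
  return 0;
}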
ExecutionContext
ExecutionContext is the class that carries the context an op executes in. As noted above, OperatorBase only stores the names of the variables it operates on; ExecutionContext is responsible for resolving those names to the actual Variable objects in the corresponding Scope, and it can also return the op's Attr values. In addition, ExecutionContext holds the platform::DeviceContext describing the device the op runs on.
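In a kernel, Compute typically pulls everything out of this context: typed inputs and outputs, attributes, and the device context. A hedged sketch follows (the op name, argument keys and attribute are invented; the header path is assumed):

#include "paddle/framework/op_registry.h"  // assumed header path

// Illustrative kernel showing typical ExecutionContext usage.
template <typename DeviceContext, typename T>
class ScaleLikeKernel : public paddle::framework::OpKernel<T> {
 public:
  void Compute(const paddle::framework::ExecutionContext &ctx) const override {
    // Resolve the real Variables behind the names stored in the op.
    auto *x = ctx.Input<paddle::framework::Tensor>("X");
    auto *out = ctx.Output<paddle::framework::Tensor>("Out");
    out->mutable_data<T>(ctx.GetPlace());

    // Attributes are fetched by name and typed at the call site.
    float scale = ctx.Attr<float>("scale");

    // The device context says where (which CPU/GPU stream) to run.
    auto &dev = ctx.template device_context<DeviceContext>();
    (void)dev; (void)x; (void)scale;
    // ... actual computation elided ...
  }
};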
SendOp
SendOp is relatively simple, so it makes a good first example.

class SendOp : public framework::OperatorBase {
public:
SendOp(const std::string &type, const framework::VariableNameMap &inputs,
const framework::VariableNameMap &outputs,
const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) {
// init client when the operator is created at runtime.
std::vector<std::string> endpoints =
Attr<std::vector<std::string>>("endpoints");
for (auto ep : endpoints) {
client_map_[ep].reset(new detail::RPCClient(
grpc::CreateChannel(ep, grpc::InsecureChannelCredentials())));
}
}
void Run(const framework::Scope &scope,
const platform::Place &dev_place) const override {
auto ins = Inputs("X");
auto outs = Outputs("Out");
std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
// TODO(typhoonzero): use async calls to send multiple variable asyncly.
for (size_t i = 0; i < ins.size(); ++i) {
bool ret = client_map_[epmap[i]]->SendVariable(scope, ins[i]);
if (!ret) {
LOG(ERROR) << "send variable error: " << ins[i];
}
}
// TODO(typhoonzero): support async optimization
client_map_[epmap[0]]->Wait();
for (size_t i = 0; i < outs.size(); ++i) {
bool ret = client_map_[epmap[i]]->GetVariable(scope, outs[i]);
if (!ret) {
LOG(ERROR) << "GetVariable error: " << outs[i];
}
}
}
protected:
mutable std::unordered_map<std::string, std::shared_ptr<detail::RPCClient>>
client_map_;
};

client_map_ holds one client for each pserver the op talks to; it is populated from the endpoints attribute when the operator is constructed. When Run executes, the op looks up the input and output variable names under the fixed keys X and Out, fetches the corresponding Variable from the scope, and sends it through the RPCClient's SendVariable method. Sending is currently blocking and one variable at a time, which is inefficient; a later optimization will switch to asynchronous sends.
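To make the relation between these attributes concrete, a hypothetical configuration for one SendOp with two pservers could look like this (all names and addresses are made up):

#include <string>
#include <vector>

int main() {
  // Attribute "endpoints": every pserver this op ever talks to.
  std::vector<std::string> endpoints{"127.0.0.1:6174", "127.0.0.1:6175"};
  // Input key "X": one gradient variable per parameter.
  std::vector<std::string> ins{"w1@GRAD", "w2@GRAD"};
  // Attribute "epmap": epmap[i] is the pserver that ins[i] (and outs[i]) uses,
  // so it has the same length as the inputs.
  std::vector<std::string> epmap{"127.0.0.1:6174", "127.0.0.1:6175"};
  // Output key "Out": the updated parameters fetched back after Wait().
  std::vector<std::string> outs{"w1", "w2"};
  (void)endpoints; (void)ins; (void)epmap; (void)outs;
  return 0;
}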
OpKernelBase
class OpKernelBase {
 public:
  virtual void Compute(const ExecutionContext& context) const = 0;
  virtual ~OpKernelBase() = default;
};
OpKernelBase takes the ExecutionContext as its only parameter. OpKernelBase is the base class of an op's compute functions. Depending on whether they have a kernel, ops come in two kinds: ops with a kernel derive from OperatorWithKernel, while ops without one derive directly from OperatorBase. operator.h defines OperatorWithKernel, which is OperatorBase extended with an OpKernelMap. An OpKernel example follows below. To add a new op, refer to the documentation on how to add an Operator with a kernel.
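Conceptually, OperatorWithKernel keeps a per-op-type table of kernels keyed by place and data type, and its Run picks one entry and calls Compute with an ExecutionContext. The sketch below captures only that idea; it does not reproduce the real operator.h definitions:

#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>

struct ExecutionContext;  // built from the Scope plus a DeviceContext

// Base class of all compute kernels, as discussed above.
struct OpKernelBase {
  virtual void Compute(const ExecutionContext &ctx) const = 0;
  virtual ~OpKernelBase() = default;
};

// Key that selects a kernel variant; the real key carries more information.
struct OpKernelKey {
  std::string place;      // e.g. "CPU" or "CUDA"
  std::string data_type;  // e.g. "float32"
  bool operator<(const OpKernelKey &o) const {
    return std::tie(place, data_type) < std::tie(o.place, o.data_type);
  }
};

using OpKernelMap = std::map<OpKernelKey, std::unique_ptr<OpKernelBase>>;

// One kernel map per op type; the REGISTER_OP_*_KERNEL macros populate it.
std::unordered_map<std::string, OpKernelMap> &AllOpKernels() {
  static std::unordered_map<std::string, OpKernelMap> kernels;
  return kernels;
}

int main() { return static_cast<int>(AllOpKernels().size()); }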
MulOp
First, a word about registration. MulOp is defined in mul_op.cc and mul_op.h, and an op has to be registered when it is added:

namespace ops = paddle::operators;
REGISTER_OPERATOR(mul, paddle::framework::OperatorWithKernel, ops::MulOpMaker,
ops::MulOpShapeInference,
paddle::framework::DefaultGradOpDescMaker<true>);
REGISTER_OPERATOR(mul_grad, ops::MulOpGrad);
REGISTER_OP_CPU_KERNEL(
mul, ops::MulKernel<paddle::platform::CPUDeviceContext, float>);
REGISTER_OP_CPU_KERNEL(
mul_grad, ops::MulGradKernel<paddle::platform::CPUDeviceContext, float>);

The same op can also be registered with the older REGISTER_OP macro:

namespace ops = paddle::operators;
REGISTER_OP(mul, ops::MulOp, ops::MulOpMaker, mul_grad, ops::MulOpGrad);
REGISTER_OP_CPU_KERNEL(mul, ops::MulKernel<paddle::platform::CPUDeviceContext, float>);
REGISTER_OP_CPU_KERNEL(mul_grad,
ops::MulGradKernel<paddle::platform::CPUDeviceContext, float>);
The concrete implementation of MulOp:

class MulOpShapeInference : public framework::InferShapeBase {
public:
void operator()(framework::InferShapeContext* ctx) const override {
auto x_dims = ctx->GetInputDim("X");
auto y_dims = ctx->GetInputDim("Y");
int x_num_col_dims = ctx->Attrs().Get<int>("x_num_col_dims");
int y_num_col_dims = ctx->Attrs().Get<int>("y_num_col_dims");
....
ctx->SetOutputDim("Out", framework::make_ddim(output_dims));
ctx->ShareLoD("X", /*->*/ "Out");
}
};
class MulOpMaker : public framework::OpProtoAndCheckerMaker {
public:
MulOpMaker(OpProto* proto, OpAttrChecker* op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "(Tensor), The first input tensor of mul op.");
AddInput("Y", "(Tensor), The second input tensor of mul op.");
AddOutput("Out", "(Tensor), The output tensor of mul op.");
...
};
template <typename DeviceContext, typename T>
class MulKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext& context) const override {
const Tensor* x = context.Input<Tensor>("X");
const Tensor* y = context.Input<Tensor>("Y");
Tensor* z = context.Output<Tensor>("Out");
......
math::matmul<DeviceContext, T>(
context.template device_context<DeviceContext>(), x_matrix, false,
y_matrix, false, 1, z, 0);
......
}
};
Op execution
Every computation is a composition of ops. On the Python side, each op is appended to a Block, which is then passed in proto form to the C++ program that actually executes it. The run function in executor.py pulls everything together; the relevant code is:
def run(self,
program=None,
feed=None,
fetch_list=None,
feed_var_name='feed',
fetch_var_name='fetch',
scope=None,
return_numpy=True):
......
for i, var in enumerate(fetch_list):
global_block.append_op(
type='fetch',
inputs={'X': [var]},
outputs={'Out': [fetch_var]},
attrs={'col': i})
self.executor.run(program.desc, scope, 0, True, True)
    ......

On the C++ side, the executor walks over all ops in the block, creates each one through OpRegistry::CreateOp and calls its Run method:

for (auto& op_desc : block.AllOps()) {
auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
VLOG(3) << op->DebugStringEx(local_scope);
op->Run(*local_scope, place_);
if (FLAGS_check_nan_inf) {
for (auto& vname : op->OutputVars(true)) {
auto* var = local_scope->FindVar(vname);
if (var == nullptr) continue;
if (var->IsType<framework::LoDTensor>()) {
CheckTensorNANOrInf(vname, var->Get<framework::LoDTensor>());
}
}
}
}

Distributed training with a pserver
To support distributed training, a DistributeTranspiler class was added on the Python side. Its main job is to move all optimization ops onto the parameter servers, delete those ops from the local trainer program, and automatically insert send ops into the trainer and pserver programs.
class DistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
if program is None:
program = default_main_program()
self.program = program
self.trainers = trainers
self.optimize_ops = optimize_ops
self._optimize_distributed(
optimize_ops,
program,
params_grads,
pservers=pservers,
trainers=trainers,
split_method=split_method)
def get_trainer_program(self):  # get the trainer program
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
return self.program

def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
send_op_ordered_inputs = []
send_op_ordered_outputs = []
epmap = []
for ep, v in self.param_grad_map.iteritems():
send_op_ordered_inputs.extend(v["grads"])
send_op_ordered_outputs.extend(v["params"])
for i in v["grads"]:
epmap.append(ep)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_op_ordered_inputs
}, # inputs is a list of tensors to be send
outputs={"Out": send_op_ordered_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def get_pserver_program(self, endpoint, optimize_ops):
When the pserver program runs, it first obtains its program through get_pserver_program. This starts by copying the split parameters into the global_block of the current pserver_program and saving the names of all gradient variables into grad_var_names. _create_var_for_trainers then creates, on each pserver, the same variable for every trainer, stored in the global_block of optimize_sub_program, and also creates a merged_var used to merge the values from all trainers. sum and scale ops are added to optimize_sub_program to combine the computed gradients. If opt_op.inputs contains a key named "Grad", a new op operating on Grad has to be appended via append_op. The final step adds a recv op to pserver_program to receive the gradients sent by each trainer.