tensorflow distributed runtime初窥

一个Tensorflow的Cluster包含多个Job,一个Job包含多个task,每一个task都是一个进程。
Job相当于Server Role。一般是分成"ps"和"worker"两种。
每个task会有1个或多个device, 如"cpu:0"、"gpu:0"。在创建graph的时候,可以指定node被place到哪个device上。tensor在device和device之间的传输由tensorflow distributed runtime自动完成。这就是tensorflow的分布式的核心设计思想。

tensorflow的分布式设计相比于它的前身DistBelief有什么改变和优势呢?

首先,DistBelief里只有两种角色:parameter server和worker。而tensorflow在摒弃parameter server这个概念,它希望把parameter server做的更generic一些。它认为,parameter server也是既需要做运算,也需要做KV存储,所以它和计算梯度的worker并没有本质的不同,只是它比较“被动”。Google希望,当想尝试一个新的优化算法时,如FTRL、Adam,不必去改parameter server的code。

于是在tensorflow中,就变成了:有一个进程负责分配任务和存储到其它节点,而其它节点负责执行收到的任务。那个分配任务的就是master节点。其它的slave节点就是传统意义上的parameter server。被分配的任务是Partial graph execution。存储是variable。整个计算流程由master节点来drive。再往上层来讲,假如你采用了training data partition,那么,每个data partition都应对应一个master节点。每个master节点都对应一个tensorflow session。每个slave节点可以同时支撑来自多个master的多个session。

Tensorflow中的session有两个实现,GrpcSession和DirectSession。前者是给分布式用的,后者是给单机用的。


一个GrpcSession的执行环境中包含一个master进程,和多个slave进程。master负责创建session,slave只需要执行master送过来的graph片段。

master进程的流程一般如下:
  1. 创建并启动grpc server
  2. 创建session
  3. while(...)  session.run()
其中,创建session必须在创建server之后进行。而graph的构造在什么时候都可以。

slave进程的流程一般如下:
  1. 创建并启动grpc server
  2. 等待grpc server结束
创建server需要两步:
  1. 构造一个ServerDef类型的protobuf对象,内容主要是cluster里的ip和端口号列表。
  2. 调用tensorflow::NewServer

一个ClusterDef包含多个JobDef,一个JobDef包含多个task,每个task是一个进程,对应一个ServerDef。
有了ServerDef,就可以用tensorflow::NewServer创建一个ServerInterface。ServerInterface主要提供的是Start/Stop/Join这样很基础的方法。

class ServerInterface {
 public:
  ServerInterface() {}
  virtual ~ServerInterface() {}

  // Starts the server running asynchronously. Returns OK on success, otherwise
  // returns an error.
  virtual Status Start() = 0;

  // Stops the server asynchronously. Returns OK on success, otherwise returns
  // an error.
  //
  // After calling `Stop()`, the caller may call `Join()` to block until the
  // server has stopped.
  virtual Status Stop() = 0;

  // Blocks until the server has stopped. Returns OK on success, otherwise
  // returns an error.
  virtual Status Join() = 0;

  // Returns a target string that can be used to connect to this server using
  // `tensorflow::NewSession()`.
  virtual const string target() const = 0;
};

tensorflow::NewServer()函数根据ServerDef的内容(主要是protocol字段)来决定ServerInterface的实现是什么(找到对应的ServerFactory)。目前我只看到一个实现:GrpcServer。

下面是如何创建一个Server并一直等在那。为了简单,我直接调用了GrpcServer::Create方法,而不是tensorflow::NewServer。

#include <iostream>
#include <fstream>
#include <SDKDDKVer.h>
#include <google/protobuf/text_format.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <tensorflow/core/distributed_runtime/rpc/grpc_server_lib.h>

using namespace tensorflow;

int main(int argc, char* argv[]) {
 ServerDef server_def;
 {
  std::ifstream confFile("1.txt");
  google::protobuf::io::IstreamInputStream in(&confFile);  
  if (!google::protobuf::TextFormat::Parse(&in, &server_def)) {
   LOG(ERROR) << "load conf failed\n";
   return -1;
  }
  confFile.close();
 }
 std::unique_ptr<ServerInterface> out_server;
 tensorflow::Status status = GrpcServer::Create(server_def, Env::Default(), &out_server);
 if (!status.ok()) {
  LOG(ERROR) << "Create server failed\n" << status;
 }
 out_server->Start();
 out_server->Join();
 return 0;
}

配置文件1.txt的内容:
cluster {
  job {
    name: "ps"
    tasks {
      value: "127.0.0.1:10001"
    }
  }
  job {
    name: "worker"
    tasks {
      value: "127.0.0.1:10000"
    }
  }
}
job_name: "ps"
protocol: "grpc"

接下来是如何创建session。
举个例子,如果像下面这样什么都不填,那么创建的是就是LocalSession。
  tensorflow::SessionOptions options;
  std::unique_ptr<Session> session(tensorflow::NewSession(options));
如果设置了target为grpc,那么创建的是就是GrpcSession。
  tensorflow::SessionOptions options;
  options.target="grpc://localhost:3000";
  std::unique_ptr<Session> session(tensorflow::NewSession(options));

此处注意,target应该是以"grpc://localhost:"开头。后面的端口号是本进程的grpc server所用的端口号。最好是直接使用ServerInterface::target()的返回值。

下面介绍tensorflow::NewSession内部做了什么。
Session永远是由SessionFactory创建的。
class SessionFactory {
 public:
  virtual Session* NewSession(const SessionOptions& options) = 0;
  virtual bool AcceptsOptions(const SessionOptions& options) = 0;
};

SessionOptions最重要的一个选项是target。
struct SessionOptions {
  std::string target;
  //...
};
target决定了采用什么样的SessionFactory来创建Session。SessionFactory有两种实现:GrpcSessionFactory和DirectSessionFactory。



前者的AcceptsOptions函数只接受以grpc://开头的target,而后者只接受target等于空的情况。这两个子类的instance都会被注册到SessionFactory这个基类的一个静态变量map中。

下面回头来介绍GrpcServer的内部构造。

ClusterDef中的每一个task,也就是说tensorflow集群中的每一个进程,都对应这一个GrpcServer。而GrpcServer内部又可以分为两部分,Master Service和Worker Serivce。一个session中,只有一个master,但是会有多个worker。Master Service负责Session的创建、销毁、run step,而Worker Service则是执行Master的Graph的一部分,或者干脆直接当key-value存储用。Worker Serivce相当于以前的parameter server,而Master Service相当于以前的worker。



GrpcServer中的master service和worker service各对应一个线程。这两个线程是在GrpcServer::Start()函数中创建。该线程负责运行network IO event loop,接收请求,执行请求。但具体的执行通常会被分发到线程池中。

下面是Master和Session的关系。



先要明白时序关系:先有GrpcServer,后有Session。GrpcServer必须先于Session创建。
上图Master的ownership在GrpcServer那。

每个GrpcSession会以unique_ptr的方式own一个MasterInterface。

MasterInterface是GRPC Master Service的client端的抽象接口。但是,由于绝大多数情况下,master就在本进程内,为了提高效率就不用走RPC了。所以Master Interface有两个实现,

  • GrpcRemoteMaster,用GRPC实现通信
  • LocalMaster,直接把Master对象的指针拿来用
LocalMaster是如何获得Master指针的:
MasterInterface实现实例的创建是在GrpcSession::Create函数中。Master是在GrpcServer::Init()中创建,在该函数的末尾,Master的这个实例会被注册到LocalMaster单件中。GrpcSession::Create函数里,会根据target去LocalMaster单件里查,如果查到了,就创建一个LocalMaster对象并返回,否则就只能走GrpcRemoteMaster了。

关于设备管理:
在单机版中,DeviceManager是在DirectSession::NewSession函数中创建,由DirectSession own。在分布式版本中,DeviceManager是在GrpcServer::Init()中创建,由GrpcServer的WorkerEnv own,用于实现worker service。
在运行的时候,每个device都有一个name,大概是这样的格式 "/job:ps/replica:0/task:0/cpu:0"。

关于SessionOptions:
实际上有两个地方都会用到SessionOptions。一个是在创建Session的时候,另一个是创建GrpcServer的时候。ServerDef中有一个ConfigProto类型的字段叫default_session_config。这个其实就是SessionOptions。这个只是在初始Devices的时候使用。

(写了这么多其实还没写到graph和tensor呢。明天继续吧)


此博客中的热门博文

少写代码,多读别人写的代码

在windows下使用llvm+clang