Hadoop IPC的代码结构分析

与IPC相关的代码在org.apache.hadoop.ipc包下。共七个文件,其中4个辅助类:

RemoteException

Status

VersionedProtocol

ConnectionHeader

主要实现类3个:

Client

Server

RPC

客户端Client:

如上图:

与IPC连接相关的

  • Client.Connection
  • Client.ConnectionId
  • ConnectionHeader

与远程调用Call相关的

  • Client.Call
  • Client.ParallelCall
  • Client.ParallelResults

服务器端Server:

与IPC连接相关的

  • Server.Connection
  • ConnectionHeader

与远程调用Call相关的

  • Server.Call
  • Server.Responder
  • Server.Listener
  • Server.Handler

 

RPC

RPC是在Server及Client的基础上实现了Hadoop IPC。

与客户端相关的功能:

  • RPC.ClientCache
  • RPC.Invoker(继承java.lang.reflect.InvocationHandler)
  • RPC.Invocation

与服务端相关的功能:

  • RPC.Server

 

Connection

客户端与服务器端对连接的抽象不一样,所以有Server.Connection和Client.Connection。Hadoop远程调用采用TCP协议通信。

1)客户端Client.ConnectionId

连接复用:当多个IPC客户端的ConnectionId相同时,他们共享一个IPC连接。连接复用可以减少Hadoop Server、Client的资源占用,同时节省IPC连接时间。

2)ConnectionHeader

Server与Client间TCP连接建立后交换的第一条信息,包含ConnectionId.ticket(UserGroupInformation)用户信息和IPC接口信息,检验是否实现了IPC接口,以及该用户是否有权使用接口。

Call

建立连接后,即可以进行远程过程调用服务,即对IPC接口方法的调用,源码抽象为Call。

远程调用Client.Call对象和Server.Call对象,是一个IPC调用产生的,存在于IPC客户端(存根)和IPC服务端(骨架)中的实体。

Client.Call对象通过IPC连接到服务器后,自然会构成相应的Server.Call对象。

 

Client.Call何时产生以及如何产生?

如上图所示流程:

  1. 用户发起远程接口调用
  2. 动态代理,RPC.Invoker调用句柄捕获远程调用
  3. 根据invoke的输入参数method、args生成RPC.Invocation对象
  4. 并调用Client.call,call会将上一步的Invocation对象序列化并通过IPC连接发送到服务器。Client.call会等待服务端返回的结果。
  5. 服务器端Listener监听Client发来的连接请求和数据请求,并调用Server端的连接对象。
  6. 连接对象接收远程调用请求帧,反序列化,并将请求放于阻塞队列中,由Handler处理。
  7. Handler调用对应的IPC接口实现类,完成过程调用,将结果序列化。
  8. 如果此时连接的应答队列为空,返回给客户端。
  9. 否则,客户端比较忙,应答队列不为空,Handler将结果放入响应队列,由Responser通过IPC发送给客户端。

 

IPC连接

  • 连接建立
  • 连接上的数据读写
  • 连接维护
  • 连接关闭

 

服务器端的IPC连接代码分散在Listener和Server.Connection中。

Listener.run() 实现了NIO中的选择器循环。如下代码:

//Listener构造函数
public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      readPool = Executors.newFixedThreadPool(readThreads);
      for (int i = 0; i < readThreads; i++) {
        Selector readSelector = Selector.open();
        Reader reader = new Reader(readSelector);
        readers[i] = reader;
        readPool.execute(reader);
      }

Listener.run()开启选择器循环,并处理Accept请求,如下:

//Listener运行函数
public void run() {
      LOG.info(getName() + ": starting");
      SERVER.set(Server.this);
      while (running) {
        SelectionKey key = null;
        try {
          selector.select();
          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())
                  doAccept(key);
              }
            } catch (IOException e) {
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          // we can run out of memory if we have too many threads
          // log the event and sleep for a minute and give
          // some thread(s) a chance to finish
          LOG.warn("Out of Memory in server select", e);
          closeCurrentConnection(key, e);
          cleanupConnections(true);
          try { Thread.sleep(60000); } catch (Exception ie) {}
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
        cleanupConnections(false);
      }
      LOG.info("Stopping " + this.getName());

      synchronized (this) {
        try {
          acceptChannel.close();
          selector.close();
        } catch (IOException e) { }

        selector= null;
        acceptChannel= null;

        // clean up all connections
        while (!connectionList.isEmpty()) {
          closeConnection(connectionList.remove(0));
        }
      }
    }

doAccept()中通过server.accpet获取SocketChannel,并获取一个Reader对象,该对象包含一个Selector:readerSelector,通过reader.registerChannel,将SocketChannel注册到readerSelector下.并新建connection对象。

//Do_Accept
void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
      Connection c = null;
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        channel.configureBlocking(false);
        channel.socket().setTcpNoDelay(tcpNoDelay);
        Reader reader = getReader();
        try {
          reader.startAdd();
          SelectionKey readKey = reader.registerChannel(channel);
          c = new Connection(readKey, channel, System.currentTimeMillis());
          readKey.attach(c);
          synchronized (connectionList) {
            connectionList.add(numConnections, c);
            numConnections++;
          }
          if (LOG.isDebugEnabled())
            LOG.debug("Server connection from " + c.toString() +
                "; # active connections: " + numConnections +
                "; # queued calls: " + callQueue.size());
        } finally {
          reader.finishAdd();
        }

      }
    }
public synchronized SelectionKey registerChannel(SocketChannel channel)
                                                          throws IOException {
          return channel.register(readSelector, SelectionKey.OP_READ);
      }
时间: 12-20

Hadoop IPC的代码结构分析的相关文章

Hadoop HA HDFS启动错误之org.apache.hadoop.ipc.Client: Retrying connect to server问题解决

近日,在搭建Hadoop HA QJM集群的时候,出现一个问题,如本文标题. 网上有很多HA的博文,其实比较好的博文就是官方文档,讲的已经非常详细.所以,HA的搭建这里不再赘述. 本文就想给出一篇org.apache.hadoop.ipc.Client: Retrying connect to server错误的解决的方法. 因为在搜索引擎中输入了错误问题,没有找到一篇解决问题的.这里写一篇备忘,也可以给出现同样问题的朋友一个提示. 一.问题描述 HA按照规划配置好,启动后,NameNode不能

Hadoop中WordCount代码-直接加载hadoop的配置文件

Hadoop中WordCount代码-直接加载hadoop的配置文件 在Myeclipse中,直接编写WordCount代码,代码中直接调用core-site.xml,hdfs-site.xml,mapred-site.xml配置文件 package com.apache.hadoop.function; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import 

application master 持续org.apache.hadoop.ipc.Client: Retrying connect to server

一.问题现象 某一个nodemanager退出后,导致 application master中出现大量的如下日志,并且持续很长时间,application master才成功退出. 2016-06-24 09:32:35,596 INFO [ContainerLauncher #3] org.apache.hadoop.ipc.Client: Retrying connect to server: dchadoop206/192.168.1.199:32951. Already tried 1 

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException)

在运行hadoop的程序时,向hdfs中写文件时候,抛出异常信息如下: Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=Administrator, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x 原因:Hdfs中的/user

一张图解释Hadoop IPC

基于hadoop2.6.2.... 一张图Server启动,Client访问..... 使用hadoop ipc步骤: 1.定义RPC协议 2.实现RPC协议 3.构造和启动RPC SERVER 4.构造RPC Client并发送请求 参考: http://www.cnblogs.com/dycg/p/3934394.html http://www.cnblogs.com/dycg/p/rpc.html 我的字好丑!!!!

hive执行query语句时提示错误:org.apache.hadoop.ipc.RemoteException: java.io.IOException: java.io.IOException:

hive> select product_id, track_time from trackinfo limit 5; Total MapReduce jobs = 1 Launching Job 1 out of 1 Number of reduce tasks is set to 0 since there's no reduce operator org.apache.hadoop.ipc.RemoteException: java.io.IOException: java.io.IOEx

Hadoop分布式文件系统--HDFS结构分析

转自:http://blog.csdn.net/androidlushangderen/article/details/47377543 HDFS系列:http://blog.csdn.net/Androidlushangderen/article/category/5734703 前言 在Hadoop内部,具体实现了许多类的文件系统,当然最最被我们用到的就是他的分布式文件系统HDFS了.但是本篇文章不会讲HDFS的主从架构等东西,因为这些东西网上和资料书中都讲得很多了.所以,我决定以我个人的学

requirejs代码结构分析

一.函数入口函数. req = requirejs = function (deps, callback, errback, optional) { //Find the right context, use default var context, config, contextName = defContextName; //“_” //deps 是对象的话,则可能是config. // Determine if have config object in the call. if (!is

org.apache.hadoop.ipc.Client: Retrying connect to server异常的解决

检查发现是DataNode一直连接不到NameNode. 检查各个节点在etc/hosts中的配置是否有127.0.1.1 xxxxxx.如果有把其屏蔽或者删除,重启各节点即可. 原因:127.0.1.1是debian中的本地回环.这个造成了hadoop解析出现问题.这个设置应该是在做伪分布式的hadoop集群的时候,留下来的.   如果照上面的方法还是出现同样的问题,或者etc/hosts目录并没有127.0.1.1的设置的话. 那么可以在conf/core-site.xml (0.19.2版

jquery源码解析:代码结构分析

本系列是针对jquery2.0.3版本进行的讲解.此版本不支持IE8及以下版本. (function(){ (21, 94)     定义了一些变量和函数,   jQuery = function(){}; (96,283)   给jQuery对象添加一些属性和方法(实例方法,通过$("div")这类的jQuery实例对象来调用) (285,347)   extend : jQuery的继承方法 (349,817)   jQuery.extend():扩展一些工具方法(静态方法,直接通