1. 摘要

今天意外翻到了以前整理的HBase连接超时的研究,在这里存一份。以下分析针对HBase 0.96版本。

2. 参数分析

final private static String SOCKET_TIMEOUT = "ipc.socket.timeout";
    final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds

final private static String SOCKET_TIMEOUT = "ipc.socket.timeout" ;

final static int DEFAULT_SOCKET_TIMEOUT = 20000 ; // 20 seconds

生效位置:

RpcClient.Connection:575
protected synchronized void setupConnection()
         NetUtils.connect(this.socket, remoteId.getAddress(),
             getSocketTimeout(conf));
RpcClient.Connection:259
static int getSocketTimeout(Configuration conf) {
   return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
 }

RpcClient . Connection : 575

protected synchronized void setupConnection ( )

NetUtils . connect ( this . socket , remoteId . getAddress ( ) ,

getSocketTimeout ( conf ) ) ;

RpcClient . Connection : 259

static int getSocketTimeout ( Configuration conf ) {

return conf . getInt ( SOCKET_TIMEOUT , DEFAULT_SOCKET_TIMEOUT ) ;

}

RpcClient.Connection:1246
   this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);

RpcClient . Connection : 1246

this . maxRetries = conf . getInt ( "hbase.ipc.client.connect.max.retries" , 0 ) ;

生效位置:

RpcClient.Connection:582
    catch (SocketTimeoutException toe) {
         /* The max number of retries is 45,
          * which amounts to 20s*45 = 15 minutes retries.
          */
         handleConnectionFailure(timeoutFailures++, maxRetries, toe);
       } catch (IOException ie) {
         handleConnectionFailure(ioFailures++, maxRetries, ie);
       }
  private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe)
   throws IOException {

     closeConnection();

     // throw the exception if the maximum number of retries is reached
     if (curRetries >= maxRetries) {
       throw ioe;
     }
}

RpcClient . Connection : 582

catch ( SocketTimeoutException toe ) {

/* The max number of retries is 45,

* which amounts to 20s*45 = 15 minutes retries.

*/

handleConnectionFailure ( timeoutFailures ++ , maxRetries , toe ) ;

} catch ( IOException ie ) {

handleConnectionFailure ( ioFailures ++ , maxRetries , ie ) ;

}

private void handleConnectionFailure ( int curRetries , int maxRetries ,IOException ioe )

throws IOException {

closeConnection ( ) ;

// throw the exception if the maximum number of retries is reached

if ( curRetries >= maxRetries ) {

throw ioe ;

}

}

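注释里的45和HBase的默认值对不上。这段注释应是沿用自Hadoop IPC Client:在Hadoop中,ipc.client.connect.max.retries.on.timeouts的默认值就是45,45次×20s≈15分钟。而在HBase 0.96里,hbase.ipc.client.connect.max.retries的默认值为0,所以默认情况下第一次建连失败(curRetries=0 >= maxRetries=0)就会直接抛出异常,注释与实际默认行为并不一致。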

RpcClient.rpcTimeout:156
 // thread-specific RPC timeout, which may override that of what was passed in.
 // This is used to change dynamically the timeout (for read only) when retrying: if
 //  the time allowed for the operation is less than the usual socket timeout, then
 //  we lower the timeout. This is subject to race conditions, and should be used with
 //  extreme caution.
 private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
   @Override
   protected Integer initialValue() {
     return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
   }
 };

RpcClient . rpcTimeout : 156

// thread-specific RPC timeout, which may override that of what was passed in.

// This is used to change dynamically the timeout (for read only) when retrying: if

//  the time allowed for the operation is less than the usual socket timeout, then

//  we lower the timeout. This is subject to race conditions, and should be used with

//  extreme caution.

private static ThreadLocal < Integer > rpcTimeout = new ThreadLocal < Integer > () {

@ Override

protected Integer initialValue ( ) {

return HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ;

}

} ;

这个值在RpcRetryingCaller:87被改写:

private void beforeCall() {
   int remaining = (int)(callTimeout -
     (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime));
   if (remaining < MIN_RPC_TIMEOUT) {
     // If there is no time left, we're trying anyway. It's too late.
     // 0 means no timeout, and it's not the intent here. So we secure both cases by
     // resetting to the minimum.
     remaining = MIN_RPC_TIMEOUT;
   }
   RpcClient.setRpcTimeout(remaining);
 }

private void beforeCall ( ) {

int remaining = ( int ) ( callTimeout -

( EnvironmentEdgeManager . currentTimeMillis ( ) - this . globalStartTime ) ) ;

if ( remaining < MIN_RPC_TIMEOUT ) {

// If there is no time left, we're trying anyway. It's too late.

// 0 means no timeout, and it's not the intent here. So we secure both cases by

// resetting to the minimum.

remaining = MIN_RPC_TIMEOUT ;

}

RpcClient . setRpcTimeout ( remaining ) ;

}

callTimeout默认值:

this.callTimeout = conf.getInt(
       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
 /** Parameter name for HBase client operation timeout, which overrides RPC timeout */
 public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";
 /** Default HBase client operation timeout, which is tantamount to a blocking call */
 public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;

this . callTimeout = conf . getInt (

HConstants . HBASE_CLIENT_OPERATION_TIMEOUT ,

HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) ;

/** Parameter name for HBase client operation timeout, which overrides RPC timeout */

public static final String HBASE_CLIENT_OPERATION_TIMEOUT ="hbase.client.operation.timeout" ;

/** Default HBase client operation timeout, which is tantamount to a blocking call */

public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT =Integer . MAX_VALUE ;

每次调用时会把这个线程局部的值跟传入的defaultTimeout做比较,取较小的那个:

RpcClient.Connection:1612
 /**
  * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given
  * default timeout.
  */
 public static int getRpcTimeout(int defaultTimeout) {
   return Math.min(defaultTimeout, rpcTimeout.get());
 }

RpcClient . Connection : 1612

/**

* Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given

* default timeout.

*/

public static int getRpcTimeout ( int defaultTimeout ) {

return Math . min ( defaultTimeout , rpcTimeout . get ( ) ) ;

}

传入的timeout是由HConnectionImplementation传下来的:

HConnectionManager:674
     this.rpcTimeout = conf.getInt(
         HConstants.HBASE_RPC_TIMEOUT_KEY,
         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
 /**
  * timeout for each RPC
  */
 public static String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
 /**
  * Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
  */
 public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;

HConnectionManager : 674

this . rpcTimeout = conf . getInt (

HConstants . HBASE_RPC_TIMEOUT_KEY ,

HConstants . DEFAULT_HBASE_RPC_TIMEOUT ) ;

/**

* timeout for each RPC

*/

public static String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout" ;

/**

* Default value of {@link #HBASE_RPC_TIMEOUT_KEY}

*/

public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000 ;

每次RPC调用的超时,是hbase.rpc.timeout与剩余时间(hbase.client.operation.timeout减去本次操作已耗时间)中较小的那个。每次重试前(beforeCall)都会重新计算这个剩余时间,且剩余时间最小取MIN_RPC_TIMEOUT(2s)。

this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
       HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
 /**
  * Parameter name for client pause value, used mostly as value to wait
  * before running a retry of a failed get, region lookup, etc.
  */
 public static String HBASE_CLIENT_PAUSE = "hbase.client.pause";
 /**
  * Default value of {@link #HBASE_CLIENT_PAUSE}.
  */
 public static long DEFAULT_HBASE_CLIENT_PAUSE = 100;

this . failureSleep = conf . getLong ( HConstants . HBASE_CLIENT_PAUSE ,

HConstants . DEFAULT_HBASE_CLIENT_PAUSE ) ;

/**

* Parameter name for client pause value, used mostly as value to wait

* before running a retry of a failed get, region lookup, etc.

*/

public static String HBASE_CLIENT_PAUSE = "hbase.client.pause" ;

/**

* Default value of {@link #HBASE_CLIENT_PAUSE}.

*/

public static long DEFAULT_HBASE_CLIENT_PAUSE = 100 ;

RpcRetryingCaller:70
   this.retries =
       conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
 /**
  * Parameter name for maximum retries, used as maximum for all retryable
  * operations such as fetching of the root region from root region server,
  * getting a cell's value, starting a row update, etc.
  */
 public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number";
 /**
  * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}.
  */
 public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 31;

RpcRetryingCaller : 70

this . retries =

conf . getInt ( HConstants . HBASE_CLIENT_RETRIES_NUMBER ,

HConstants . DEFAULT_HBASE_CLIENT_RETRIES_NUMBER ) ;

/**

* Parameter name for maximum retries, used as maximum for all retryable

* operations such as fetching of the root region from root region server,

* getting a cell's value, starting a row update, etc.

*/

public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number" ;

/**

* Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}.

*/

public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 31 ;

RpcRetryingCaller:73
   this.callTimeout = conf.getInt(
       HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

 /** Parameter name for HBase client operation timeout, which overrides RPC timeout */
 public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";

 /** Default HBase client operation timeout, which is tantamount to a blocking call */
 public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;

在callWithRetries(RetryingCallable<T> callable, int callTimeout)里这个值被覆盖
this.callTimeout = callTimeout;

RpcRetryingCaller : 73

this . callTimeout = conf . getInt (

HConstants . HBASE_CLIENT_OPERATION_TIMEOUT ,

HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) ;

/** Parameter name for HBase client operation timeout, which overrides RPC timeout */

public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout" ;

/** Default HBase client operation timeout, which is tantamount to a blocking call */

public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT =Integer . MAX_VALUE ;

在 callWithRetries ( RetryingCallable < T > callable , int callTimeout ) 里这个值被覆盖

this . callTimeout = callTimeout ;

参数的callTimeout从HTable里传进来:

HTable:327
   this.operationTimeout = tableName.isSystemTable() ?
     this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT,
       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT):
     this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
       HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);

 public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT =
   "hbase.client.meta.operation.timeout";
 public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE; 
 public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout";

HTable : 327

this . operationTimeout = tableName . isSystemTable ( ) ?

this . configuration . getInt ( HConstants .HBASE_CLIENT_META_OPERATION_TIMEOUT ,

HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) :

this . configuration . getInt ( HConstants .HBASE_CLIENT_OPERATION_TIMEOUT ,

HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) ;

public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT =

"hbase.client.meta.operation.timeout";

public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;

public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout" ;

生效位置:

RpcRetryingCaller:142
// If, after the planned sleep, there won't be enough time left, we stop now.
       long duration = singleCallDuration(expectedSleep);
       if (duration > this.callTimeout) {
         String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration +
             ": " + callable.getExceptionMessageAdditionalDetail();
         throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
       }

RpcRetryingCaller : 142

// If, after the planned sleep, there won't be enough time left, we stop now.

long duration = singleCallDuration ( expectedSleep ) ;

if ( duration > this . callTimeout ) {

String msg = "callTimeout=" + this . callTimeout + ", callDuration=" + duration +

": " + callable . getExceptionMessageAdditionalDetail ( ) ;

throw ( SocketTimeoutException ) ( new SocketTimeoutException ( msg ) .initCause ( t ) ) ;

}

ZKUtil:122
   int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
       HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
  int retry = conf.getInt("zookeeper.recovery.retry", 3);
   int retryIntervalMillis =
     conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);

 /** Configuration key for ZooKeeper session timeout */
 public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout";

 /** Default value for ZooKeeper session timeout */
 public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;

ZKUtil : 122

int timeout = conf . getInt ( HConstants . ZK_SESSION_TIMEOUT ,

HConstants . DEFAULT_ZK_SESSION_TIMEOUT ) ;

int retry = conf . getInt ( "zookeeper.recovery.retry" , 3 ) ;

int retryIntervalMillis =

conf . getInt ( "zookeeper.recovery.retry.intervalmill" , 1000 ) ;

/** Configuration key for ZooKeeper session timeout */

public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout" ;

/** Default value for ZooKeeper session timeout */

public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000 ;

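这几个值没仔细看究竟是干啥的。从名字推测:zookeeper.session.timeout是客户端与ZooKeeper之间的会话超时(默认180s);zookeeper.recovery.retry和zookeeper.recovery.retry.intervalmill应是ZK操作失败后的重试次数(默认3)和重试间隔(毫秒,默认1000)。有待对照源码确认。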

ZooKeeperRegistry:43
public void init(HConnection connection) {
   if (!(connection instanceof HConnectionManager.HConnectionImplementation)) {
     throw new RuntimeException("This registry depends on HConnectionImplementation");
   }
   this.hci = (HConnectionManager.HConnectionImplementation)connection;
 }

 @Override
 public HRegionLocation getMetaRegionLocation() throws IOException {
   ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();

   try {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
     }
     ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout);
}

ZooKeeperRegistry : 43

public void init ( HConnection connection ) {

if ( ! ( connection instanceof HConnectionManager . HConnectionImplementation )) {

throw new RuntimeException ( "This registry depends on HConnectionImplementation" ) ;

}

this . hci = ( HConnectionManager . HConnectionImplementation ) connection ;

}

@ Override

public HRegionLocation getMetaRegionLocation ( ) throws IOException {

ZooKeeperKeepAliveConnection zkw = hci . getKeepAliveZooKeeperWatcher ( ) ;

try {

if ( LOG . isTraceEnabled ( ) ) {

LOG . trace ( "Looking up meta region location in ZK," + " connection=" + this ) ;

}

ServerName servername = MetaRegionTracker . blockUntilAvailable ( zkw , hci .rpcTimeout ) ;

}

3. 总结:

一次调用的时间轴大概是:(带*的为可缓存操作)

  1. getconnection在初始化时完成,不考虑。

  2. hConnection.getTable ->

    *zk取meta(hci.rpcTimeout) ->

    *meta region scan数据,超时与get类似,但callWithRetries里没有限制超时。

  3. hTable.get -> RpcRetryingCaller.callWithRetries(最小为callable.call超时+hbase.client.pause,最大为Max((callable.call超时+hbase.client.pause),(callable.call超时+hbase.client.pause+hbase.client.operation.timeout))) ->

    RpcClient.call中:

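    socket建连超时:约为 ipc.socket.timeout*(hbase.ipc.client.connect.max.retries+1) + hbase.client.pause*hbase.ipc.client.connect.max.retries。失败计数从0开始,计数达到max.retries时抛出异常,所以总共会尝试max.retries+1次,两次尝试之间等待hbase.client.pause。默认max.retries=0,即只连一次,最长约20s。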

    socket超时:Min(hbase.rpc.timeout,Max(hbase.client.operation.timeout-已用时间,2000))