今天意外翻到了以前整理的hbase连接超时的研究,在这里存一份。针对hbase0.96版本。
socket建连时间
默认值:
final private static String SOCKET_TIMEOUT = "ipc.socket.timeout"; final static int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
|
final private static String SOCKET_TIMEOUT = "ipc.socket.timeout" ; final static int DEFAULT_SOCKET_TIMEOUT = 20000 ; // 20 seconds |
生效位置:
RpcClient.Connection:575 protected synchronized void setupConnection() NetUtils.connect(this.socket, remoteId.getAddress(), getSocketTimeout(conf)); RpcClient.Connection:259 static int getSocketTimeout(Configuration conf) { return conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT); }
|
RpcClient . Connection : 575 protected synchronized void setupConnection ( ) NetUtils . connect ( this . socket , remoteId . getAddress ( ) , getSocketTimeout ( conf ) ) ; RpcClient . Connection : 259 static int getSocketTimeout ( Configuration conf ) { return conf . getInt ( SOCKET_TIMEOUT , DEFAULT_SOCKET_TIMEOUT ) ; } |
RpcClient.Connection:1246 this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
|
RpcClient . Connection : 1246 this . maxRetries = conf . getInt ( "hbase.ipc.client.connect.max.retries" , 0 ) ; |
生效位置:
RpcClient.Connection:582 catch (SocketTimeoutException toe) { /* The max number of retries is 45, * which amounts to 20s*45 = 15 minutes retries. */ handleConnectionFailure(timeoutFailures++, maxRetries, toe); } catch (IOException ie) { handleConnectionFailure(ioFailures++, maxRetries, ie); } private void handleConnectionFailure(int curRetries, int maxRetries, IOException ioe) throws IOException { closeConnection(); // throw the exception if the maximum number of retries is reached if (curRetries >= maxRetries) { throw ioe; } }
|
RpcClient . Connection : 582 catch ( SocketTimeoutException toe ) { /* The max number of retries is 45, * which amounts to 20s*45 = 15 minutes retries. */ handleConnectionFailure ( timeoutFailures ++ , maxRetries , toe ) ; } catch ( IOException ie ) { handleConnectionFailure ( ioFailures ++ , maxRetries , ie ) ; } private void handleConnectionFailure ( int curRetries , int maxRetries ,IOException ioe ) throws IOException { closeConnection ( ) ; // throw the exception if the maximum number of retries is reached if ( curRetries >= maxRetries ) { throw ioe ; } } |
注释里的45不知道怎么来的。
RpcClient.rpcTimeout:156 // thread-specific RPC timeout, which may override that of what was passed in. // This is used to change dynamically the timeout (for read only) when retrying: if // the time allowed for the operation is less than the usual socket timeout, then // we lower the timeout. This is subject to race conditions, and should be used with // extreme caution. private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() { @Override protected Integer initialValue() { return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; } };
|
RpcClient . rpcTimeout : 156 // thread-specific RPC timeout, which may override that of what was passed in. // This is used to change dynamically the timeout (for read only) when retrying: if // the time allowed for the operation is less than the usual socket timeout, then // we lower the timeout. This is subject to race conditions, and should be used with // extreme caution. private static ThreadLocal < Integer > rpcTimeout = new ThreadLocal < Integer > () { @ Override protected Integer initialValue ( ) { return HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ; } } ; |
这个值在RpcRetryingCaller:87被改写:
private void beforeCall() { int remaining = (int)(callTimeout - (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime)); if (remaining < MIN_RPC_TIMEOUT) { // If there is no time left, we're trying anyway. It's too late. // 0 means no timeout, and it's not the intent here. So we secure both cases by // resetting to the minimum. remaining = MIN_RPC_TIMEOUT; } RpcClient.setRpcTimeout(remaining); }
|
private void beforeCall ( ) { int remaining = ( int ) ( callTimeout - ( EnvironmentEdgeManager . currentTimeMillis ( ) - this . globalStartTime ) ) ; if ( remaining < MIN_RPC_TIMEOUT ) { // If there is no time left, we're trying anyway. It's too late. // 0 means no timeout, and it's not the intent here. So we secure both cases by // resetting to the minimum. remaining = MIN_RPC_TIMEOUT ; } RpcClient . setRpcTimeout ( remaining ) ; } |
calltimeout默认值:
this.callTimeout = conf.getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout"; /** Default HBase client operation timeout, which is tantamount to a blocking call */ public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
|
this . callTimeout = conf . getInt ( HConstants . HBASE_CLIENT_OPERATION_TIMEOUT , HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) ; /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT ="hbase.client.operation.timeout" ; /** Default HBase client operation timeout, which is tantamount to a blocking call */ public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT =Integer . MAX_VALUE ; |
每次调用时会把这个值跟default做比较,取小的那个
RpcClient.Connection:1612 /** * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given * default timeout. */ public static int getRpcTimeout(int defaultTimeout) { return Math.min(defaultTimeout, rpcTimeout.get()); }
|
RpcClient . Connection : 1612 /** * Returns the lower of the thread-local RPC time from {@link #setRpcTimeout(int)} and the given * default timeout. */ public static int getRpcTimeout ( int defaultTimeout ) { return Math . min ( defaultTimeout , rpcTimeout . get ( ) ) ; } |
传入的timeout是由HConnectionImplementation传下来的:
HConnectionManager:674 this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); /** * timeout for each RPC */ public static String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout”; /** * Default value of {@link #HBASE_RPC_TIMEOUT_KEY} */ public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
|
HConnectionManager : 674 this . rpcTimeout = conf . getInt ( HConstants . HBASE_RPC_TIMEOUT_KEY , HConstants . DEFAULT_HBASE_RPC_TIMEOUT ) ; /** * timeout for each RPC */ public static String HBASE_RPC_TIMEOUT_KEY = " hbase . rpc . timeout ” ; /** * Default value of {@link #HBASE_RPC_TIMEOUT_KEY} */ public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000 ; |
每次rpc调用的超时是hbase.client.operation.timeout和hbase.rpc.timeout里较小的那个,并且每次重试会重算hbase.client.operation.timeout,hbase.client.operation.timeout最小为2s。
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); /** * Parameter name for client pause value, used mostly as value to wait * before running a retry of a failed get, region lookup, etc. */ public static String HBASE_CLIENT_PAUSE = "hbase.client.pause”; /** * Default value of {@link #HBASE_CLIENT_PAUSE}. */ public static long DEFAULT_HBASE_CLIENT_PAUSE = 100;
|
this . failureSleep = conf . getLong ( HConstants . HBASE_CLIENT_PAUSE , HConstants . DEFAULT_HBASE_CLIENT_PAUSE ) ; /** * Parameter name for client pause value, used mostly as value to wait * before running a retry of a failed get, region lookup, etc. */ public static String HBASE_CLIENT_PAUSE = " hbase . client . pause ” ; /** * Default value of {@link #HBASE_CLIENT_PAUSE}. */ public static long DEFAULT_HBASE_CLIENT_PAUSE = 100 ; |
RpcRetryingCaller:70 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); /** * Parameter name for maximum retries, used as maximum for all retryable * operations such as fetching of the root region from root region server, * getting a cell's value, starting a row update, etc. */ public static String HBASE_CLIENT_RETRIES_NUMBER = "hbase.client.retries.number”; /** * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}. */ public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 31;
|
RpcRetryingCaller : 70 this . retries = conf . getInt ( HConstants . HBASE_CLIENT_RETRIES_NUMBER , HConstants . DEFAULT_HBASE_CLIENT_RETRIES_NUMBER ) ; /** * Parameter name for maximum retries, used as maximum for all retryable * operations such as fetching of the root region from root region server, * getting a cell's value, starting a row update, etc. */ public static String HBASE_CLIENT_RETRIES_NUMBER = " hbase . client .retries . number ” ; /** * Default value of {@link #HBASE_CLIENT_RETRIES_NUMBER}. */ public static int DEFAULT_HBASE_CLIENT_RETRIES_NUMBER = 31 ; |
RpcRetryingCaller:73 this.callTimeout = conf.getInt( HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout”; /** Default HBase client operation timeout, which is tantamount to a blocking call */ public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE; 在callWithRetries(RetryingCallable<T> callable, int callTimeout)里这个值被覆盖 this.callTimeout = callTimeout;
|
RpcRetryingCaller : 73 this . callTimeout = conf . getInt ( HConstants . HBASE_CLIENT_OPERATION_TIMEOUT , HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) ; /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT = " hbase .client . operation . timeout ” ; /** Default HBase client operation timeout, which is tantamount to a blocking call */ public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT =Integer . MAX_VALUE ; 在 callWithRetries ( RetryingCallable < T > callable , int callTimeout ) 里这个值被覆盖 this . callTimeout = callTimeout ; |
参数的callTimeout从HTable里传进来:
Htable:327 this.operationTimeout = tableName.isSystemTable() ? this.configuration.getInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT): this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT = "hbase.client.meta.operation.timeout”; public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE; public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout”;
|
Htable : 327 this . operationTimeout = tableName . isSystemTable ( ) ? this . configuration . getInt ( HConstants .HBASE_CLIENT_META_OPERATION_TIMEOUT , HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) : this . configuration . getInt ( HConstants .HBASE_CLIENT_OPERATION_TIMEOUT , HConstants . DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT ) ; public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT = "hbase.client.meta.operation.timeout”; public static final int DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT = Integer.MAX_VALUE; public static final String HBASE_CLIENT_OPERATION_TIMEOUT = " hbase .client . operation . timeout ” ; |
生效位置:
RpcRetryingCaller:142 // If, after the planned sleep, there won't be enough time left, we stop now. long duration = singleCallDuration(expectedSleep); if (duration > this.callTimeout) { String msg = "callTimeout=" + this.callTimeout + ", callDuration=" + duration + ": " + callable.getExceptionMessageAdditionalDetail(); throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t)); }
|
RpcRetryingCaller : 142 // If, after the planned sleep, there won't be enough time left, we stop now. long duration = singleCallDuration ( expectedSleep ) ; if ( duration > this . callTimeout ) { String msg = "callTimeout=" + this . callTimeout + ", callDuration=" + duration + ": " + callable . getExceptionMessageAdditionalDetail ( ) ; throw ( SocketTimeoutException ) ( new SocketTimeoutException ( msg ) .initCause ( t ) ) ; } |
ZKUtil:122 int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); int retry = conf.getInt("zookeeper.recovery.retry", 3); int retryIntervalMillis = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000); /** Configuration key for ZooKeeper session timeout */ public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout"; /** Default value for ZooKeeper session timeout */ public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000;
|
ZKUtil : 122 int timeout = conf . getInt ( HConstants . ZK_SESSION_TIMEOUT , HConstants . DEFAULT_ZK_SESSION_TIMEOUT ) ; int retry = conf . getInt ( "zookeeper.recovery.retry" , 3 ) ; int retryIntervalMillis = conf . getInt ( "zookeeper.recovery.retry.intervalmill" , 1000 ) ; /** Configuration key for ZooKeeper session timeout */ public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout" ; /** Default value for ZooKeeper session timeout */ public static final int DEFAULT_ZK_SESSION_TIMEOUT = 180 * 1000 ; |
这个值没仔细看究竟是干啥的
ZooKeeperRegistry:43 public void init(HConnection connection) { if (!(connection instanceof HConnectionManager.HConnectionImplementation)) { throw new RuntimeException("This registry depends on HConnectionImplementation"); } this.hci = (HConnectionManager.HConnectionImplementation)connection; } @Override public HRegionLocation getMetaRegionLocation() throws IOException { ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher(); try { if (LOG.isTraceEnabled()) { LOG.trace("Looking up meta region location in ZK," + " connection=" + this); } ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout); }
|
ZooKeeperRegistry : 43 public void init ( HConnection connection ) { if ( ! ( connection instanceof HConnectionManager . HConnectionImplementation )) { throw new RuntimeException ( "This registry depends on HConnectionImplementation" ) ; } this . hci = ( HConnectionManager . HConnectionImplementation ) connection ; } @ Override public HRegionLocation getMetaRegionLocation ( ) throws IOException { ZooKeeperKeepAliveConnection zkw = hci . getKeepAliveZooKeeperWatcher ( ) ; try { if ( LOG . isTraceEnabled ( ) ) { LOG . trace ( "Looking up meta region location in ZK," + " connection=" + this ) ; } ServerName servername = MetaRegionTracker . blockUntilAvailable ( zkw , hci .rpcTimeout ) ; } |
一次调用的时间轴大概是:(带*的为可缓存操作)
getconnection在初始化时完成,不考虑。
hConnection.getTable ->
*zk取meta(hci.rpcTimeout) ->
*meta ragion scan数据 ,超时与get类似,但callWithRetries里没有限制超时。
hTable.get ->RpcRetryingCaller.callWithRetries(最小为callable.call超时+hbase.client.pause,最大为Max((callable.call超时+hbase.client.pause),(callable.call超时+hbase.client.pause+hbase.client.operation.timeout)) ->
RpcClient.call中:
socket建连超时:(ipc.socket.timeout+hbase.client.pause)*hbase.ipc.client.connect.max.retries
socket超时:Min(hbase.rpc.timeout,Max(hbase.client.operation.timeout-已用时间,2000))