【workerman源码分析】异步TCP连接AsyncTcpConnection.php
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

/**
 * This file is part of workerman.
 *
 * Licensed under The MIT License
 * For full copyright and license information, please see the MIT-LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @author    walkor<walkor@workerman.net>
 * @copyright walkor<walkor@workerman.net>
 * @link      http://www.workerman.net/
 * @license   http://www.opensource.org/licenses/mit-license.php MIT License
 */
namespace Workerman\Connection;

use Workerman\Events\EventInterface;
use Workerman\Lib\Timer;
use Workerman\Worker;
use Exception;

/**
 * Asynchronous TCP connection.
 *
 * Client-side connection that opens a socket with STREAM_CLIENT_ASYNC_CONNECT
 * and lets the global event loop report when the connection attempt has
 * succeeded or failed.
 */
class AsyncTcpConnection extends TcpConnection
{
    /**
     * Callback invoked after the socket connection has been established.
     * It is called with this connection as its only argument.
     *
     * @var callback
     */
    public $onConnect = null;

    /**
     * Transport layer protocol.
     *
     * @var string
     */
    public $transport = 'tcp';

    /**
     * Connection status (one of the self::STATUS_* constants).
     *
     * @var int
     */
    protected $_status = self::STATUS_INITIAL;

    /**
     * Remote host.
     *
     * @var string
     */
    protected $_remoteHost = '';

    /**
     * Remote port.
     *
     * @var int
     */
    protected $_remotePort = 80;

    /**
     * Time at which the current connection attempt started, as returned by
     * microtime(true). Used to report how long a failed attempt took.
     *
     * @var float
     */
    protected $_connectStartTime = 0;

    /**
     * Remote URI (path followed by the query string, if any).
     *
     * @var string
     */
    protected $_remoteURI = '';

    /**
     * Stream context options handed to stream_context_create() in connect().
     *
     * @var array
     */
    protected $_contextOption = null;

    /**
     * Value returned by Timer::add() for a pending delayed reconnect, or null
     * when no delayed reconnect is scheduled.
     *
     * @var int
     */
    protected $_reconnectTimer = null;

    /**
     * Transports built into PHP. A scheme found here is used as the transport;
     * any other scheme is treated as an application-layer protocol class name.
     *
     * @var array
     */
    protected static $_builtinTransports = array(
        'tcp'   => 'tcp',
        'udp'   => 'udp',
        'unix'  => 'unix',
        'ssl'   => 'ssl',
        'sslv2' => 'sslv2',
        'sslv3' => 'sslv3',
        'tls'   => 'tls'
    );

    /**
     * Constructor.
     *
     * Parses the remote address, resolves the transport or the protocol class
     * from its scheme, and registers the connection in static::$connections.
     * A malformed address is only reported through Worker::safeEcho().
     *
     * @param string $remote_address
     * @param array  $context_option
     * @throws Exception When the scheme is neither a built-in transport nor an existing protocol class.
     */
    public function __construct($remote_address, $context_option = null)
    {
        $address_info = parse_url($remote_address);
        if (!$address_info) {
            // array_pad() keeps list() from reading a missing offset when the address has no ':'.
            list($scheme, $this->_remoteAddress) = array_pad(explode(':', $remote_address, 2), 2, null);
            if (!$this->_remoteAddress) {
                Worker::safeEcho(new \Exception('bad remote_address'));
            }
        } else {
            if (!isset($address_info['host'])) {
                // Report a missing host the same way as an unparsable address
                // instead of reading an undefined array key below.
                Worker::safeEcho(new \Exception('bad remote_address'));
                $address_info['host'] = '';
            }
            if (!isset($address_info['port'])) {
                $address_info['port'] = 80;
            }
            if (!isset($address_info['path'])) {
                $address_info['path'] = '/';
            }
            if (!isset($address_info['query'])) {
                $address_info['query'] = '';
            } else {
                $address_info['query'] = '?' . $address_info['query'];
            }
            $this->_remoteAddress = "{$address_info['host']}:{$address_info['port']}";
            $this->_remoteHost    = $address_info['host'];
            $this->_remotePort    = $address_info['port'];
            $this->_remoteURI     = "{$address_info['path']}{$address_info['query']}";
            $scheme               = isset($address_info['scheme']) ? $address_info['scheme'] : 'tcp';
        }

        // Assign the connection id; wrap the recorder around before it overflows.
        $this->id = $this->_id = self::$_idRecorder++;
        if (PHP_INT_MAX === self::$_idRecorder) {
            self::$_idRecorder = 0;
        }

        // Check the application layer protocol class.
        if (!isset(self::$_builtinTransports[$scheme])) {
            $scheme         = ucfirst($scheme);
            $this->protocol = '\\Protocols\\' . $scheme;
            if (!class_exists($this->protocol)) {
                $this->protocol = "\\Workerman\\Protocols\\$scheme";
                if (!class_exists($this->protocol)) {
                    throw new Exception("class \\Protocols\\$scheme not exist");
                }
            }
        } else {
            $this->transport = self::$_builtinTransports[$scheme];
        }

        // Statistics and bookkeeping.
        self::$statistics['connection_count']++;
        $this->maxSendBufferSize         = self::$defaultMaxSendBufferSize;
        $this->_contextOption            = $context_option;
        static::$connections[$this->_id] = $this;
    }

    /**
     * Start the asynchronous connection attempt.
     *
     * Does nothing unless the status is INITIAL, CLOSING or CLOSED. On success
     * the socket is added to the global event loop and checkConnection() is
     * called once the attempt has finished; on immediate failure onError is
     * emitted.
     *
     * @return void
     */
    public function connect()
    {
        if ($this->_status !== self::STATUS_INITIAL && $this->_status !== self::STATUS_CLOSING &&
            $this->_status !== self::STATUS_CLOSED) {
            return;
        }
        $this->_status           = self::STATUS_CONNECTING;
        $this->_connectStartTime = microtime(true);
        if ($this->transport !== 'unix') {
            // Open the socket asynchronously. The context argument is only
            // passed when context options were supplied.
            if ($this->_contextOption) {
                $context       = stream_context_create($this->_contextOption);
                $this->_socket = stream_socket_client("tcp://{$this->_remoteHost}:{$this->_remotePort}",
                    $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT, $context);
            } else {
                $this->_socket = stream_socket_client("tcp://{$this->_remoteHost}:{$this->_remotePort}",
                    $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT);
            }
        } else {
            $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteAddress}", $errno, $errstr, 0,
                STREAM_CLIENT_ASYNC_CONNECT);
        }
        // If the socket could not be created, try to emit the onError callback.
        if (!$this->_socket) {
            $this->emitError(WORKERMAN_CONNECT_FAIL, $errstr);
            // The status is re-checked because onError may have changed it
            // (for example by calling reconnect()).
            if ($this->_status === self::STATUS_CLOSING) {
                $this->destroy();
            }
            if ($this->_status === self::STATUS_CLOSED) {
                $this->onConnect = null;
            }
            return;
        }
        // Add the socket to the global event loop and wait for the connection
        // to be established or to fail.
        Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection'));
        // For windows.
        if (DIRECTORY_SEPARATOR === '\\') {
            Worker::$globalEvent->add($this->_socket, EventInterface::EV_EXCEPT, array($this, 'checkConnection'));
        }
    }

    /**
     * Reconnect.
     *
     * Resets the status to INITIAL, re-registers the connection, cancels any
     * pending delayed reconnect, and then connects either immediately or after
     * the given delay.
     *
     * @param int $after Delay passed to Timer::add(); values <= 0 connect immediately.
     * @return void
     */
    public function reconnect($after = 0)
    {
        $this->_status                   = self::STATUS_INITIAL;
        static::$connections[$this->_id] = $this;
        if ($this->_reconnectTimer) {
            Timer::del($this->_reconnectTimer);
            // Forget the deleted timer so it is not deleted a second time later.
            $this->_reconnectTimer = null;
        }
        if ($after > 0) {
            $this->_reconnectTimer = Timer::add($after, array($this, 'connect'), null, false);
            return;
        }
        $this->connect();
    }

    /**
     * Cancel a pending delayed reconnect, if any.
     *
     * @return void
     */
    public function cancelReconnect()
    {
        if ($this->_reconnectTimer) {
            Timer::del($this->_reconnectTimer);
            // Forget the deleted timer so it is not deleted a second time later.
            $this->_reconnectTimer = null;
        }
    }

    /**
     * Get the remote host.
     *
     * @return string
     */
    public function getRemoteHost()
    {
        return $this->_remoteHost;
    }

    /**
     * Get the remote URI.
     *
     * @return string
     */
    public function getRemoteURI()
    {
        return $this->_remoteURI;
    }

    /**
     * Try to emit the onError callback.
     *
     * Sets the status to CLOSING and, if onError is set, calls it with
     * ($this, $code, $msg). Anything thrown by the callback is logged and the
     * process exits with status 250.
     *
     * @param int    $code
     * @param string $msg
     * @return void
     */
    protected function emitError($code, $msg)
    {
        $this->_status = self::STATUS_CLOSING;
        if ($this->onError) {
            try {
                call_user_func($this->onError, $this, $code, $msg);
            } catch (\Exception $e) {
                Worker::log($e);
                exit(250);
            } catch (\Error $e) {
                Worker::log($e);
                exit(250);
            }
        }
    }

    /**
     * Check whether the connection has been established.
     *
     * Registered by connect() as the event-loop callback for the connecting
     * socket. Arguments passed by the event loop are not used. On success the
     * socket is configured, the read listener is registered and the onConnect
     * callbacks run; on failure onError is emitted.
     *
     * @return void
     */
    public function checkConnection()
    {
        if ($this->_status != self::STATUS_CONNECTING) {
            return;
        }

        // Remove the EV_EXCEPT listener that connect() adds on Windows.
        if (DIRECTORY_SEPARATOR === '\\') {
            Worker::$globalEvent->del($this->_socket, EventInterface::EV_EXCEPT);
        }

        // Check the socket state: a peer name is only available once connected.
        if ($address = stream_socket_get_name($this->_socket, true)) {
            // Non-blocking.
            stream_set_blocking($this->_socket, 0);
            // Compatible with hhvm.
            if (function_exists('stream_set_read_buffer')) {
                stream_set_read_buffer($this->_socket, 0);
            }
            // Try to enable TCP keepalive and disable the Nagle algorithm.
            if (function_exists('socket_import_stream') && $this->transport === 'tcp') {
                $raw_socket = socket_import_stream($this->_socket);
                socket_set_option($raw_socket, SOL_SOCKET, SO_KEEPALIVE, 1);
                socket_set_option($raw_socket, SOL_TCP, TCP_NODELAY, 1);
            }

            // Remove the write listener used to detect the end of the connection attempt.
            Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);

            // SSL handshake.
            if ($this->transport === 'ssl') {
                $this->_sslHandshakeCompleted = $this->doSslHandshake($this->_socket);
            } else {
                // There is buffered data waiting to be sent.
                if ($this->_sendBuffer) {
                    Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
                }
            }

            // Register a listener waiting for read events.
            Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));

            $this->_status        = self::STATUS_ESTABLISHED;
            $this->_remoteAddress = $address;

            // Call the onConnect callback.
            if ($this->onConnect) {
                try {
                    call_user_func($this->onConnect, $this);
                } catch (\Exception $e) {
                    Worker::log($e);
                    exit(250);
                } catch (\Error $e) {
                    Worker::log($e);
                    exit(250);
                }
            }
            // Call protocol::onConnect.
            if (method_exists($this->protocol, 'onConnect')) {
                try {
                    call_user_func(array($this->protocol, 'onConnect'), $this);
                } catch (\Exception $e) {
                    Worker::log($e);
                    exit(250);
                } catch (\Error $e) {
                    Worker::log($e);
                    exit(250);
                }
            }
        } else {
            // Connection failed.
            $this->emitError(WORKERMAN_CONNECT_FAIL, 'connect ' . $this->_remoteAddress . ' fail after ' . round(microtime(true) - $this->_connectStartTime, 4) . ' seconds');
            // The status is re-checked because onError may have changed it
            // (for example by calling reconnect()).
            if ($this->_status === self::STATUS_CLOSING) {
                $this->destroy();
            }
            if ($this->_status === self::STATUS_CLOSED) {
                $this->onConnect = null;
            }
        }
    }
}
转载请注明:SuperIT » 【workerman源码分析】异步TCP连接AsyncTcpConnection.php