Connection.php 10 KB
Newer Older
Carsten Brandt committed
1 2 3
<?php
/**
 * @link http://www.yiiframework.com/
4
 * @copyright Copyright (c) 2008 Yii Software LLC
Carsten Brandt committed
5 6 7
 * @license http://www.yiiframework.com/license/
 */

8
namespace yii\elasticsearch;
Carsten Brandt committed
9

10
use Yii;
Carsten Brandt committed
11 12
use yii\base\Component;
use yii\base\InvalidConfigException;
13
use yii\helpers\Json;
Carsten Brandt committed
14 15

/**
16 17
 * elasticsearch Connection is used to connect to an elasticsearch cluster version 0.20 or higher
 *
Qiang Xue committed
18 19 20
 * @property string $driverName Name of the DB driver. This property is read-only.
 * @property boolean $isActive Whether the DB connection is established. This property is read-only.
 *
Carsten Brandt committed
21 22 23
 * @author Carsten Brandt <mail@cebe.cc>
 * @since 2.0
 */
24
class Connection extends Component
Carsten Brandt committed
25 26 27 28 29 30 31
{
	/**
	 * @event Event an event that is triggered by [[initConnection()]] once a connection has been opened
	 */
	const EVENT_AFTER_OPEN = 'afterOpen';

	/**
	 * @var boolean whether [[open()]] should ask the first configured node for the list of cluster nodes
	 * and replace [[nodes]] with the result.
	 */
	public $autodetectCluster = true;
	/**
	 * @var array the cluster nodes. Every node is an array that must contain an `http_address` element.
	 * When [[autodetectCluster]] is true, this is populated with the result of a cluster nodes request.
	 * @see http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/cluster-nodes-info.html#cluster-nodes-info
	 */
	public $nodes = [
		['http_address' => 'inet[/127.0.0.1:9200]'],
	];
	/**
	 * @var mixed the key of the active node inside [[nodes]], or null while the connection is closed.
	 * It is selected randomly on [[open()]].
	 */
	public $activeNode;

	// TODO http://www.elasticsearch.org/guide/en/elasticsearch/client/php-api/current/_configuration.html#_example_configuring_http_basic_auth
	/**
	 * @var array authentication settings.
	 * NOTE(review): nothing in this class reads this property yet; see the TODO above.
	 */
	public $auth = [];
	/**
	 * @var float timeout to use for connecting to an elasticsearch node.
	 * This value is passed to curl as the `CURLOPT_CONNECTTIMEOUT` option.
	 * When left as null, no explicit timeout is configured for curl.
	 */
	public $connectionTimeout;
	/**
	 * @var float timeout to use when reading the response from an elasticsearch node.
	 * This value is passed to curl as the `CURLOPT_TIMEOUT` option.
	 * When left as null, no explicit timeout is configured for curl.
	 */
	public $dataTimeout;

Carsten Brandt committed
63 64 65

	/**
	 * Initializes the component by validating the configured [[nodes]].
	 * @throws InvalidConfigException if one of the nodes has no `http_address` element.
	 */
	public function init()
	{
		foreach ($this->nodes as $nodeConfig) {
			if (isset($nodeConfig['http_address'])) {
				continue;
			}
			throw new InvalidConfigException('Elasticsearch node needs at least a http_address configured.');
		}
	}

	/**
	 * Closes the connection before this component gets serialized.
	 * @return array the names of all object properties visible from within this class
	 */
	public function __sleep()
	{
		$this->close();
		$propertyNames = array_keys(get_object_vars($this));

		return $propertyNames;
	}

	/**
	 * Tells whether the connection is currently open, i.e. whether an active node is selected.
	 * @return boolean true if [[activeNode]] is set, false otherwise
	 */
	public function getIsActive()
	{
		$hasActiveNode = null !== $this->activeNode;

		return $hasActiveNode;
	}

	/**
	 * Establishes the connection to the elasticsearch cluster.
	 *
	 * It does nothing if a node is already active. When [[autodetectCluster]] is enabled, the first
	 * configured node is asked for the list of cluster nodes, which then replaces [[nodes]]. Afterwards
	 * one node is selected as the active node and [[initConnection()]] is called.
	 *
	 * @throws InvalidConfigException if [[nodes]] is empty
	 * @throws Exception if the connection fails or cluster autodetection does not report any node
	 */
	public function open()
	{
		if ($this->activeNode !== null) {
			return;
		}
		if (empty($this->nodes)) {
			throw new InvalidConfigException('elasticsearch needs at least one node to operate.');
		}
		if ($this->autodetectCluster) {
			$node = reset($this->nodes);
			$host = $node['http_address'];
			// Accept `inet[/ip:port]` as well as `inet[hostname/ip:port]`, the same way httpRequest() does.
			if (strncmp($host, 'inet[', 5) === 0) {
				$host = substr($host, 5, -1);
				if (($pos = strpos($host, '/')) !== false) {
					$host = substr($host, $pos + 1);
				}
			}
			$response = $this->httpRequest('GET', 'http://' . $host . '/_cluster/nodes');
			// httpRequest() returns false on a 404 response. Only replace the configured nodes once the
			// cluster actually reported some, so a failed detection does not wipe the configuration and
			// a later call to open() can still contact the configured nodes.
			if (!is_array($response) || empty($response['nodes'])) {
				throw new Exception('cluster autodetection did not find any active node.');
			}
			$this->nodes = $response['nodes'];
		}
		$this->selectActiveNode();
		Yii::trace('Opening connection to elasticsearch. Nodes in cluster: ' . count($this->nodes)
			. ', active node: ' . $this->nodes[$this->activeNode]['http_address'], __CLASS__);
		$this->initConnection();
	}

	/**
	 * Picks one of the keys of [[nodes]] at random and stores it in [[activeNode]].
	 */
	protected function selectActiveNode()
	{
		$nodeKeys = array_keys($this->nodes);
		$lastIndex = count($nodeKeys) - 1;
		$this->activeNode = $nodeKeys[rand(0, $lastIndex)];
	}

	/**
	 * Closes the currently active connection by resetting [[activeNode]].
	 * It does nothing if the connection is already closed.
	 */
	public function close()
	{
		// Without an active node there is nothing to close; looking up $this->nodes[null] for the
		// trace message below would only raise an "undefined index" notice (e.g. when called by __sleep()).
		if ($this->activeNode === null) {
			return;
		}
		Yii::trace('Closing connection to elasticsearch. Active node was: '
			. $this->nodes[$this->activeNode]['http_address'], __CLASS__);
		$this->activeNode = null;
	}

	/**
	 * Initializes the connection.
	 * This method is invoked by [[open()]] right after an active node has been selected.
	 * The default implementation triggers an [[EVENT_AFTER_OPEN]] event.
	 */
	protected function initConnection()
	{
		$this->trigger(self::EVENT_AFTER_OPEN);
	}

	/**
	 * Returns the name of the DB driver.
	 * @return string name of the DB driver, which is always `elasticsearch` for this connection
	 */
	public function getDriverName()
	{
		return 'elasticsearch';
	}
161

162 163 164 165 166 167
	/**
	 * Opens the connection if necessary and creates a command bound to it.
	 * @param array $config the configuration for the Command class; its `db` element is
	 * overwritten with this connection
	 * @return Command the DB command
	 */
	public function createCommand($config = [])
	{
		$this->open();
		$config['db'] = $this;

		return new Command($config);
	}

175
	/**
	 * Creates a new query builder for this connection.
	 * @return QueryBuilder a query builder instance constructed with this connection
	 */
	public function getQueryBuilder()
	{
		$builder = new QueryBuilder($this);

		return $builder;
	}

180
	/**
	 * Performs a GET request against the active node, opening the connection first if necessary.
	 * @param string|array $url the relative URL, either as a string or as an array of path segments
	 * @param array $options query parameters to append to the URL
	 * @param string $body the request body, or null to send none
	 * @param boolean $raw whether to return the response body without decoding it
	 * @return mixed the result of [[httpRequest()]]
	 * @throws Exception if the request fails
	 */
	public function get($url, $options = [], $body = null, $raw = false)
	{
		$this->open();
		$target = $this->createUrl($url, $options);

		return $this->httpRequest('GET', $target, $body, $raw);
	}

	/**
	 * Performs a HEAD request against the active node, opening the connection first if necessary.
	 * @param string|array $url the relative URL, either as a string or as an array of path segments
	 * @param array $options query parameters to append to the URL
	 * @param string $body the request body, or null to send none
	 * @return mixed the result of [[httpRequest()]]
	 * @throws Exception if the request fails
	 */
	public function head($url, $options = [], $body = null)
	{
		$this->open();
		$target = $this->createUrl($url, $options);

		return $this->httpRequest('HEAD', $target, $body);
	}

192
	/**
	 * Performs a POST request against the active node, opening the connection first if necessary.
	 * @param string|array $url the relative URL, either as a string or as an array of path segments
	 * @param array $options query parameters to append to the URL
	 * @param string $body the request body, or null to send none
	 * @param boolean $raw whether to return the response body without decoding it
	 * @return mixed the result of [[httpRequest()]]
	 * @throws Exception if the request fails
	 */
	public function post($url, $options = [], $body = null, $raw = false)
	{
		$this->open();
		$target = $this->createUrl($url, $options);

		return $this->httpRequest('POST', $target, $body, $raw);
	}

198
	/**
	 * Performs a PUT request against the active node, opening the connection first if necessary.
	 * @param string|array $url the relative URL, either as a string or as an array of path segments
	 * @param array $options query parameters to append to the URL
	 * @param string $body the request body, or null to send none
	 * @param boolean $raw whether to return the response body without decoding it
	 * @return mixed the result of [[httpRequest()]]
	 * @throws Exception if the request fails
	 */
	public function put($url, $options = [], $body = null, $raw = false)
	{
		$this->open();
		$target = $this->createUrl($url, $options);

		return $this->httpRequest('PUT', $target, $body, $raw);
	}

204
	/**
	 * Performs a DELETE request against the active node, opening the connection first if necessary.
	 * @param string|array $url the relative URL, either as a string or as an array of path segments
	 * @param array $options query parameters to append to the URL
	 * @param string $body the request body, or null to send none
	 * @param boolean $raw whether to return the response body without decoding it
	 * @return mixed the result of [[httpRequest()]]
	 * @throws Exception if the request fails
	 */
	public function delete($url, $options = [], $body = null, $raw = false)
	{
		$this->open();
		$target = $this->createUrl($url, $options);

		return $this->httpRequest('DELETE', $target, $body, $raw);
	}

	/**
	 * Builds the request target for the currently active node.
	 * @param string|array $path the relative URL. A string is used as is; otherwise every element is
	 * url-encoded (nested arrays are joined with `,` first) and the elements are joined with `/`.
	 * @param array $options query parameters to append to the URL
	 * @return array a two-element array: the `http_address` of the active node and the relative URL
	 */
	private function createUrl($path, $options = [])
	{
		if (is_string($path)) {
			$url = $path;
			// a string path may already carry a query string
			$separator = strpos($url, '?') === false ? '?' : '&';
		} else {
			$encodeSegment = function ($segment) {
				return urlencode(is_array($segment) ? implode(',', $segment) : $segment);
			};
			$url = implode('/', array_map($encodeSegment, $path));
			$separator = '?';
		}

		if (!empty($options)) {
			$url .= $separator . http_build_query($options);
		}

		$address = $this->nodes[$this->activeNode]['http_address'];

		return [$address, $url];
	}

228
	/**
	 * Performs an HTTP request against an elasticsearch node using curl.
	 *
	 * @param string $method the HTTP method, e.g. `GET`; it is converted to upper case
	 * @param string|array $url either a complete URL, or a two-element array of node address and
	 * relative URL as returned by [[createUrl()]]. Only requests given in array form are profiled.
	 * @param string $requestBody the request body, or null to send none
	 * @param boolean $raw whether to return the response body as is instead of decoding it as JSON
	 * @return mixed `true` for a successful HEAD request, `false` if the response code is 404, otherwise
	 * the JSON-decoded response body, or the undecoded body when `$raw` is true
	 * @throws Exception if the request fails, the response is incomplete or not JSON, or the response
	 * code is neither 2xx nor 404
	 */
	protected function httpRequest($method, $url, $requestBody = null, $raw = false)
	{
		$method = strtoupper($method);

		// response body and headers
		$headers = [];
		$body = '';

		$options = [
			CURLOPT_USERAGENT      => 'Yii Framework 2 ' . __CLASS__,
			CURLOPT_RETURNTRANSFER => false,
			CURLOPT_HEADER         => false,
			// http://www.php.net/manual/en/function.curl-setopt.php#82418
			CURLOPT_HTTPHEADER     => ['Expect:'],

			CURLOPT_WRITEFUNCTION  => function ($curl, $data) use (&$body) {
				$body .= $data;
				return mb_strlen($data, '8bit');
			},
			CURLOPT_HEADERFUNCTION => function ($curl, $data) use (&$headers) {
				foreach (explode("\r\n", $data) as $row) {
					if (($pos = strpos($row, ':')) !== false) {
						$headers[strtolower(substr($row, 0, $pos))] = trim(substr($row, $pos + 1));
					}
				}
				return mb_strlen($data, '8bit');
			},
			CURLOPT_CUSTOMREQUEST  => $method,
		];
		if ($this->connectionTimeout !== null) {
			$options[CURLOPT_CONNECTTIMEOUT] = $this->connectionTimeout;
		}
		if ($this->dataTimeout !== null) {
			$options[CURLOPT_TIMEOUT] = $this->dataTimeout;
		}
		if ($requestBody !== null) {
			$options[CURLOPT_POSTFIELDS] = $requestBody;
		}
		if ($method === 'HEAD') {
			$options[CURLOPT_NOBODY] = true;
			unset($options[CURLOPT_WRITEFUNCTION]);
		}

		if (is_array($url)) {
			list($host, $q) = $url;
			// node addresses may look like `inet[hostname/ip:port]`; keep only the `ip:port` part
			if (strncmp($host, 'inet[', 5) === 0) {
				$host = substr($host, 5, -1);
				if (($pos = strpos($host, '/')) !== false) {
					$host = substr($host, $pos + 1);
				}
			}
			$profile = $method . ' ' . $q . '#' . $requestBody;
			$url = 'http://' . $host . '/' . $q;
		} else {
			$profile = false;
		}

		Yii::trace("Sending request to elasticsearch node: $url\n$requestBody", __METHOD__);
		if ($profile !== false) {
			Yii::beginProfile($profile, __METHOD__);
		}

		$curl = curl_init($url);
		curl_setopt_array($curl, $options);
		$succeeded = curl_exec($curl) !== false;
		// Read everything that is needed from the handle before closing it, so that the handle is
		// released and the profiling block is ended on failure as well as on success.
		$errorCode = curl_errno($curl);
		$errorMessage = curl_error($curl);
		$responseCode = curl_getinfo($curl, CURLINFO_HTTP_CODE);
		curl_close($curl);

		if ($profile !== false) {
			Yii::endProfile($profile, __METHOD__);
		}

		if (!$succeeded) {
			throw new Exception('Elasticsearch request failed: ' . $errorCode . ' - ' . $errorMessage, [
				'requestMethod' => $method,
				'requestUrl' => $url,
				'requestBody' => $requestBody,
				'responseHeaders' => $headers,
				'responseBody' => $body,
			]);
		}

		if ($responseCode >= 200 && $responseCode < 300) {
			if ($method === 'HEAD') {
				return true;
			}
			if (isset($headers['content-length']) && ($len = mb_strlen($body, '8bit')) < $headers['content-length']) {
				throw new Exception("Incomplete data received from elasticsearch: $len < {$headers['content-length']}", [
					'requestMethod' => $method,
					'requestUrl' => $url,
					'requestBody' => $requestBody,
					'responseCode' => $responseCode,
					'responseHeaders' => $headers,
					'responseBody' => $body,
				]);
			}
			// The header may be missing entirely; fall back to an empty string so that building the
			// exception message below does not access an undefined array index.
			$contentType = isset($headers['content-type']) ? $headers['content-type'] : '';
			if (strncmp($contentType, 'application/json', 16) === 0) {
				return $raw ? $body : Json::decode($body);
			}
			throw new Exception('Unsupported data received from elasticsearch: ' . $contentType, [
				'requestMethod' => $method,
				'requestUrl' => $url,
				'requestBody' => $requestBody,
				'responseCode' => $responseCode,
				'responseHeaders' => $headers,
				'responseBody' => $body,
			]);
		} elseif ($responseCode == 404) {
			return false;
		} else {
			throw new Exception("Elasticsearch request failed with code $responseCode.", [
				'requestMethod' => $method,
				'requestUrl' => $url,
				'requestBody' => $requestBody,
				'responseCode' => $responseCode,
				'responseHeaders' => $headers,
				'responseBody' => $body,
			]);
		}
	}
348 349 350 351 352 353 354

	/**
	 * Sends a GET request with an empty path to the active node.
	 * @return mixed the result of [[get()]] for the empty path
	 */
	public function getNodeInfo()
	{
		return $this->get([]);
	}

	/**
	 * Sends a GET request for `_cluster/state` to the active node.
	 * @return mixed the result of [[get()]] for the cluster state path
	 */
	public function getClusterState()
	{
		$path = ['_cluster', 'state'];

		return $this->get($path);
	}
AlexGx committed
358
}