Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
.DS_Store
.phpunit.result.cache
.ac-php-conf.json
build/
composer.lock
composer.phar
vendor/
vendor/
node_modules/
package-lock.json
7 changes: 2 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
[![Build Status](https://github.com/Textalk/websocket-php/actions/workflows/acceptance.yml/badge.svg)](https://github.com/Textalk/websocket-php/actions)
[![Coverage Status](https://coveralls.io/repos/github/Textalk/websocket-php/badge.svg?branch=master)](https://coveralls.io/github/Textalk/websocket-php)

## Archived project
## Description

This project has been archived and is no longer maintained. No bug fix and no additional features will be added.<br>
You won't be able to submit new issues or pull requests, and no additional features will be added

This library has been replaced by [sirn-se/websocket-php](https://github.com/sirn-se/websocket-php)
This project was forked from archived project [Textalk/websocket-php](https://github.com/Textalk/websocket-php) and upgraded with non-blocking reads. Primary goal is to use only the client for non-blocking reads (server is largely left unchanged, but the state is not checked anymore). Non-blocking reads is enabled for client by default and returns empty message immediately. This feature is useful, if one has a threading mechanism already and just want to check for received messages.

## Websocket Client and Server for PHP

Expand Down
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "textalk/websocket",
"name": "narenknn/websocket",
"description": "WebSocket client and server",
"license": "ISC",
"type": "library",
Expand All @@ -26,7 +26,7 @@
"phrity/net-uri": "^1.0",
"phrity/util-errorhandler": "^1.0",
"psr/log": "^1.0 | ^2.0 | ^3.0",
"psr/http-message": "^1.0"
"psr/http-message": "^2.0"
},
"require-dev": {
"phpunit/phpunit": "^9.0",
Expand Down
14 changes: 11 additions & 3 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Client implements LoggerAwareInterface
'persistent' => false,
'return_obj' => false,
'timeout' => 5,
'blocking' => false,
];

private $socket_uri;
Expand Down Expand Up @@ -157,6 +158,14 @@ public function pong(string $payload = ''): void
$this->send($payload, 'pong');
}

/**
* Returns true when equal pongs received for pings.
*/
public function pongs4pings(): bool
{
return $this->connection->pongs4pings();
}

/**
* Send message.
* @param string $payload Message to send.
Expand Down Expand Up @@ -233,7 +242,6 @@ public function receive()
return $return;
}


/* ---------- Connection functions ----------------------------------------------- */

/**
Expand Down Expand Up @@ -442,11 +450,11 @@ function ($key, $value) {
}

$keyAccept = trim($matches[1]);
$expectedResonse = base64_encode(
$expectedResponse = base64_encode(
pack('H*', sha1($key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))
);

if ($keyAccept !== $expectedResonse) {
if ($keyAccept !== $expectedResponse) {
$error = 'Server sent bad upgrade response.';
$this->logger->error($error);
throw new ConnectionException($error);
Expand Down
165 changes: 143 additions & 22 deletions lib/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ class Connection implements LoggerAwareInterface
private $read_buffer;
private $msg_factory;
private $options = [];
private $pings = 0;
private $pongs = 0;
private $nbstat = [
'state' => 0,
'data' => '',
'dlen' => 0,
'data0' => '',
'data1' => '',
'data2' => '',
'data3' => '',
];

protected $is_closing = false;
protected $close_status = null;
Expand All @@ -42,6 +53,9 @@ public function __construct($stream, array $options = [])
$this->setOptions($options);
$this->setLogger(new NullLogger());
$this->msg_factory = new Factory();
if (false == $this->options['blocking']) {
stream_set_blocking($this->stream, $this->options['blocking']);
}
}

public function __destruct()
Expand Down Expand Up @@ -97,6 +111,9 @@ public function close(int $status = 1000, string $message = 'ttfn'): void
// Push a message to stream
public function pushMessage(Message $message, bool $masked = true): void
{
if ('ping' == $message->getOpcode()) { /* keep count */
$this->pings++;
}
$frames = $message->getFrames($masked, $this->options['fragment_size']);
foreach ($frames as $frame) {
$this->pushFrame($frame);
Expand All @@ -118,6 +135,8 @@ public function pullMessage(): Message

if ($opcode == 'close') {
$this->close();
} else if ('pong' == $opcode) {
$this->pongs++;
}

// Continuation and factual opcode
Expand Down Expand Up @@ -156,15 +175,24 @@ public function pullMessage(): Message
return $message;
}


/* ---------- Frame I/O methods -------------------------------------------------- */

// Pull frame from stream
private function pullFrame(): array
{
/* take care of non-blocking case */
$nbret = [true/*final*/, ''/*payload*/, 'text'/*opcode*/, false/*masked*/];

// Read the fragment "header" first, two bytes.
$data = $this->read(2);
list ($byte_1, $byte_2) = array_values(unpack('C*', $data));
if (0 == $this->nbstat['state']) {
$this->nbstat['data0'] = $this->read(2);
if (strlen($this->nbstat['data0']) < 2) {
return $nbret;
}
// print ("header:" . bin2hex($this->nbstat['data0']) . "\n");
$this->nbstat['state'] = 1;
}
list ($byte_1, $byte_2) = array_values(unpack('C*', $this->nbstat['data0']));
$final = (bool)($byte_1 & 0b10000000); // Final fragment marker.
$rsv = $byte_1 & 0b01110000; // Unused bits, ignore

Expand All @@ -186,33 +214,62 @@ private function pullFrame(): array
// Payload length
$payload_length = $byte_2 & 0b01111111;

if ($payload_length > 125) {
if ($payload_length === 126) {
$data = $this->read(2); // 126: Payload is a 16-bit unsigned int
$payload_length = current(unpack('n', $data));
} else {
$data = $this->read(8); // 127: Payload is a 64-bit unsigned int
$payload_length = current(unpack('J', $data));
if (1 == $this->nbstat['state']) {
if ($payload_length > 125) {
if ($payload_length === 126) {
$this->nbstat['data1'] = $this->read(2); // 126: Payload is a 16-bit unsigned int
if (strlen($this->nbstat['data1']) < 2) {
return $nbret;
}
$payload_length = current(unpack('n', $this->nbstat['data1']));
} else {
$this->nbstat['data1'] = $this->read(8); // 127: Payload is a 64-bit unsigned int
if (strlen($this->nbstat['data1']) < 8) {
return $nbret;
}
$payload_length = current(unpack('J', $this->nbstat['data1']));
}
}
$this->nbstat['state'] = $masked ? 2 : 3;
} else { /* if entered again, I need to restore the $payload_length */
if ($payload_length > 125) {
if ($payload_length === 126) {
$payload_length = current(unpack('n', $this->nbstat['data1']));
} else {
$payload_length = current(unpack('J', $this->nbstat['data1']));
}
}
}

// Get masking key.
if ($masked) {
$masking_key = $this->read(4);
if (2 == $this->nbstat['state']) {
$this->nbstat['data2'] = $this->read(4);
if (strlen($this->nbstat['data2']) < 2) {
return $nbret;
}
$this->nbstat['state'] = 3;
}
$masking_key = $this->nbstat['data2'];

// Get the actual payload, if any (might not be for e.g. close frames.
if ($payload_length > 0) {
$data = $this->read($payload_length);
if (3 == $this->nbstat['state']) {
if ($payload_length > 0) {
$this->nbstat['data3'] .= $this->read($payload_length - strlen($this->nbstat['data3']));
if (strlen($this->nbstat['data3']) < $payload_length) {
return $nbret;
}

if ($masked) {
// Unmask payload.
for ($i = 0; $i < $payload_length; $i++) {
$payload .= ($data[$i] ^ $masking_key[$i % 4]);
if ($masked) {
// Unmask payload.
for ($i = 0; $i < $payload_length; $i++) {
$payload .= ($this->nbstat['data3'][$i] ^ $masking_key[$i % 4]);
}
} else {
$payload = $this->nbstat['data3'];
}
} else {
$payload = $data;
}
$this->nbstat['state'] = 0;
$this->nbstat['data3'] = '';
}

$this->logger->debug("[connection] Pulled '{opcode}' frame", [
Expand Down Expand Up @@ -263,6 +320,8 @@ private function pushFrame(array $frame): void
$data .= $payload;
}

// print("pushFrame:" . $frame[2] . " final:" . ($frame[0] ? "true" : "false") . " len:" . strlen($frame[1]) . " datalen:" . strlen($data) . " data:" . bin2hex($data) . "\n");

$this->write($data);

$this->logger->debug("[connection] Pushed '{$opcode}' frame", [
Expand Down Expand Up @@ -437,22 +496,71 @@ public function getLine(int $length, string $ending): string
*/
public function gets(int $length): string
{
stream_set_blocking($this->stream, true);

$line = fgets($this->stream, $length);
if ($line === false) {
$this->throwException('Could not read from stream');
}
$read = strlen($line);
$this->logger->debug("Read {$read} bytes of line.");

stream_set_blocking($this->stream, $this->options['blocking']);

return $line;
}

/**
* Non-blocking Read characters from stream.
* @param int $length Maximum number of bytes to read
* @return string when full data was read else will return an empty string
*/
private function nbread(int $len = 1): string
{
$data = '';
if ($this->nbstat['dlen'] < $len) {
$rdat = fread($this->stream, $len - $this->nbstat['dlen']);
if ($rdat) {
$this->nbstat['data'] .= $rdat;
$this->nbstat['dlen'] += strlen($rdat);
} else {
$meta = stream_get_meta_data($this->stream);
if (!empty($meta['timed_out'])) {
$message = 'Client read timeout';
$this->logger->error($message, $meta);
throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
}
}
}
if ($this->nbstat['dlen'] < $len)
return $data;
/* success, obtained required data */
$data = $this->nbstat['data'];
$this->nbstat['data'] = '';
$this->nbstat['dlen'] = 0;
return $data;
}

/**
* Returns true when equal pongs received for pings
*/
public function pongs4pings(): bool
{
return ($this->pings > 0) && ($this->pings == $this->pongs);
}

/**
* Read characters from stream.
* @param int $length Maximum number of bytes to read
* @return string Read data
*/
public function read(string $length): string
public function read(int $length): string
{
/* non-blocking */
if (false == $this->options['blocking']) {
return $this->nbread($length);
}
/* blocking */
$data = '';
while (strlen($data) < $length) {
$buffer = fread($this->stream, $length - strlen($data));
Expand All @@ -475,6 +583,7 @@ public function read(string $length): string
$read = strlen($data);
$this->logger->debug("Read {$read} of {$length} bytes.");
}

return $data;
}

Expand All @@ -485,7 +594,19 @@ public function read(string $length): string
public function write(string $data): void
{
$length = strlen($data);
$written = fwrite($this->stream, $data);
$loopcnt = 0;
do {
$length = strlen($data);
$written = fwrite($this->stream, $data);
if ((false == $this->options['blocking']) && ($written < $length)) {
/* when non-blocking, less data may be written, try few more times */
usleep(1000);
$data = substr($data, -($length-$written));
$loopcnt++;
// if ($loopcnt > 20) break;
continue;
}
} while ($written < $length);
if ($written === false) {
$this->throwException("Failed to write {$length} bytes.");
}
Expand Down
1 change: 1 addition & 0 deletions lib/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Server implements LoggerAwareInterface
'port' => 8000,
'return_obj' => false,
'timeout' => null,
'blocking' => true,
];

protected $port;
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"devDependencies": {
"websocket": "^1.0.35",
"ws": "^8.19.0"
}
}
8 changes: 8 additions & 0 deletions phpunit-integration.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" bootstrap="tests/bootstrap-integration.php" colors="true" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
<testsuites>
<testsuite name="Integration tests">
<file>tests/ClientIntegrationTest.php</file>
</testsuite>
</testsuites>
</phpunit>
2 changes: 2 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
<testsuites>
<testsuite name="Unit tests">
<directory suffix=".php">tests</directory>
<exclude>tests/ClientIntegrationTest.php</exclude>
<exclude>tests/bin</exclude>
</testsuite>
</testsuites>
</phpunit>
Loading