yar笔记


Yar 是什么

Yar是并行的RPC框架(Concurrent RPC framework)。

  • 支持多种打包协议(msgpack, json, php)。
  • 并行rpc调用

相关链接

yar安装

windows版本下载对应的扩展放到ext目录并更新php.ini:

[yar]
extension=php_yar.dll

Linux版本下载扩展的源码进行编译,将编译出来的so动态库放到extensions目录(例如/usr/local/php/lib/php/extensions/no-debug-non-zts-20151012/)并更新php.ini:

[yar]
extension=/usr/local/php/lib/php/extensions/no-debug-non-zts-20151012/yar.so

PECL扩展

pecl install msgpack
pecl install yar

Linux中编译Yar

$phpize
$configure --with-php-config=/path/to/php-config/ --enable-msgpack
$make && make install

$/path/to/phpize
$./configure --with-php-config=/path/to/php-config/
$make && make install

Yar使用

Server端示例:

<?php

class YarDemo
{
    /**
     * @param $msg 消息
     * @param string $reserve 保留
     * @return string 返回值
     */
    public function demo($msg, $reserve='')
    {
        return 'YarDemo->demo:' . $msg . $reserve;
    }
}

$service = new Yar_Server(new YarDemo);
$service->handle();

Client端示例:

  • Synchronous call(同步调用)
$client = new Yar_Client('http://laravel.pythonschoolrc.com/rpc/rpc.php');
$client->SetOpt(YAR_OPT_CONNECT_TIMEOUT, 1000);
$result = $client->demo('<h1>msg</h1>');
var_dump($result);
  • 并行化调用
<?php

function callback($retval, $callinfo)
{
    var_dump($retval);
}

function error_callback($type, $error, $callinfo)
{
    error_log($error);
}

function demo_1($msg)
{
    echo $msg;
}

Yar_Concurrent_Client::call("http://laravel.pythonschoolrc.com/rpc/rpc.php", "demo", ["parameters", " reserve1"], "callback");
// if the callback is not specificed,
// callback in loop will be used
Yar_Concurrent_Client::call("http://laravel.pythonschoolrc.com/rpc/rpc.php", "demo", ["parameters", " reserve2"]);
Yar_Concurrent_Client::call("http://laravel.pythonschoolrc.com/rpc/rpc.php",
    "demo",
    ["parameters", " reserve3"],
    "callback",
    "error_callback",
    [YAR_OPT_PACKAGER => "json"]);
Yar_Concurrent_Client::call("http://laravel.pythonschoolrc.com/rpc/rpc.php",
    "demo",
    ["parameters", " reserve4"],
    "callback",
    "error_callback",
    [YAR_OPT_TIMEOUT => 1]);
//send the requests,
//the error_callback is optional
Yar_Concurrent_Client::loop("demo_1", "error_callback");

Yar为了方便开发, 把文档和接口绑定到了一起, 对于上面的例子, 如果我们是简单的GET请求这个接口地址的话, 我们就会看到如下的信息页面:

这样, 我们可以在注释中,把接口的信息标注好, 就可以让文档和接口在一起了.

并行化调用

Server端

<?php
class API {
    /**
     * the doc info will be generated automatically into service info page.
     * @params
     * @return
     */
    public function api($parameter, $option = "foo") {
    }

    protected function client_can_not_see() {
    }
}

$service = new Yar_Server(new API());
$service->handle();
?>

Client端串行调用

<?php
$client = new Yar_Client("http://host/api/");
$result = $client->api("parameter);
?>

并行化调用

<?php
function callback($retval, $callinfo) {
     var_dump($retval);
}

Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
Yar_Concurrent_Client::call("http://host/api/", "api", array("parameters"), "callback");
Yar_Concurrent_Client::loop(); //send
?>

这样, 所有的请求会一次发出, 只要有任何一个请求完成, 回调函数”callback”就会被立即调用.

这里还有一个细节, Yar见缝插针的不会浪费任何时间, 在这些请求发送完成以后, Yar会调用一次callback, 和普通的请求返回回调不同, 这次的调用的$callinfo参数为空.

这样一来, 我们就可以先发送请求, 然后再第一次回调, 继续做我们当前进程的工作, 等所有工作结束以后, 再交给Yar去获取并行RPC的响应.

<?php
function callback($retval, $callinfo) {
    if ($callinfo == NULL) {
       //做本地的逻辑
       return TRUE;
    }

     //RPC请求返回, 返回值在$retval
}

有了这些, 我们就可以把一个Web应用中, 多个数据源并行处理, 从而也能把这些逻辑解耦, 分开部署…

示例

server端

<?php

class API {
    /**
     * the doc info will be generated automatically into service info page.
     * @params
     * @return
     */
    public function test1() {
        sleep(1);
        return 'test1';

    }

    public function test2() {
        sleep(3);
        return 'test2';
    }

}

$service = new Yar_Server(new API());
$service->handle();

Client端

<?php

function callback($retval, $callinfo) {
    //var_dump($retval);

    error_log(time().':callinfo:'.json_encode($callinfo).PHP_EOL, 3, 't.log');

    if ($callinfo == NULL) {
        //做本地的逻辑
        //return TRUE;
        error_log(time().':'.'send req success'.PHP_EOL, 3, 't.log');
    }else{
        error_log(time().':'.$retval.PHP_EOL, 3, 't.log');
    }
}

function callback2($retval, $callinfo) {

}

function error_callback($type, $error, $callinfo) {
    error_log($error);
}

//并行调用:
//1、所有请求发送成功,Yar会调用一次callback,其中$callinfo为null
//2、每个请求执行完成,获取到了结果,也会去调用callback,其中$callinfo不为null
$res = Yar_Concurrent_Client::call("http://laravel.pythonschoolrc.com/rpc/rpc.php", "test1");
$res1 = Yar_Concurrent_Client::call("http://laravel.pythonschoolrc.com/rpc/rpc.php", "test2");
$res2 = Yar_Concurrent_Client::loop("callback", "error_callback");  //send

日志t.log

1493881383:callinfo:null
1493881383:send req success
1493881385:callinfo:{"sequence":1,"uri":"http:\/\/laravel.pythonschoolrc.com\/rpc\/rpc.php","method":"test1"}
1493881385:test1
1493881388:callinfo:{"sequence":2,"uri":"http:\/\/laravel.pythonschoolrc.com\/rpc\/rpc.php","method":"test2"}
1493881388:test2

log验证了yar的执行过程。那么,实际应用中,我们就可以先发送请求, 请求发送完毕, 然后得到第一次回调($callinfo为null), 继续做我们当前进程的工作; 等所有工作结束以后, 再交给Yar去获取并行RPC的响应:

yar构造函数

实际项目里,Server端里为避免每次实例化当前类,可以写个父类:

<?php

/**
 *Yar控制器类
 */
class YarAction{

    /**
     * 架构函数
     * @access public
     */
    public function __construct() {

        //判断扩展是否存在
        if(!extension_loaded('yar'))
            die('yar not support');
        //实例化Yar_Server
        $server     =   new Yar_Server($this);
        // 启动server
        $server->handle();
    }
}

php 安装yar扩展

git:https://github.com/laruence/yar

先克隆 如果没有 git 需要先安装

yum install git

然后 克隆

git clone https://github.com/laruence/yar.git

然后 进入yar 目录

cd yar

开始编译安装

phpize
 ./configure --with-php-config=/usr/bin/php-config
make && make install

然后把扩展添加到 php配置文件中

cd /etc/php.d
vim yar.ini 怎么感觉有问题应该是php.ini吧

写入:

extension=yar.so

保存退出。

重启php nginx

nginx -s reload

RPC调用的流程

  • 1)服务消费方(client)调用以本地调用方式调用服务;
  • 2)client stub接收到调用后负责将方法、参数等组装成能够进行网络传输的消息体;
  • 3)client stub找到服务地址,并将消息发送到服务端;
  • 4)server stub收到消息后进行解码;
  • 5)server stub根据解码结果调用本地的服务;
  • 6)本地服务执行并将结果返回给server stub;
  • 7)server stub将返回结果打包成消息并发送至消费方;
  • 8)client stub接收到消息,并进行解码;
  • 9)服务消费方得到最终结果。