zeromqその2
前回のつづき
WORKER側の冗長化
WORKERを直接CLIENTに繋げるのでは無く一段ゲートウェイを噛ませるだけです。
構成のイメージ
|= [WORKER1_GW] <= [WORKER1] | <= [WORKER1] [CLIENT] <= <= [WORKER1] | : |= [WORKER2_GW] <= [WORKER2] <= [WORKER2] <= [WORKER2] :
WORKER1をちょっと変更
== worker1.php == <?php $context = new ZMQContext(); $worker = $context->getSocket(ZMQ::SOCKET_XREP); // 名乗るのを止めます(ゲートウェイに名乗って貰うので) // $worker->setSockOpt(ZMQ::SOCKOPT_IDENTITY,"WORKER1");//自分の名前を指定 // 繋ぎ先をGWへ変更 // $worker->connect("tcp://127.0.0.1:4444"); $worker->connect("ipc:///tmp/worker1"); sleep(1); // 後は一緒 $recvMsg = array(); while ( true ) { $r = $worker->recv(); $recvMsg[]=$r; if ( $worker->getSockOpt(ZMQ::SOCKOPT_RCVMORE) ) { continue; } $msg = array_pop($recvMsg); var_dump($msg); foreach($recvMsg as $head){ $worker->send($head,ZMQ::MODE_SNDMORE); } $worker->send("WORKER1 REPLY : $r"); $recvMsg = array(); }
WORKER1 GATEWAY worker1.phpの代わりにClientに接続
== worker1gw.php == <?php // クライアント側ソケット $side_client = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_XREP); $side_client->setSockOpt(ZMQ::SOCKOPT_IDENTITY,"WORKER1"); // 僕がWORKER1です。 $side_client = $side_client->connect("tcp://127.0.0.1:4444"); // ワーカー側ソケット(RRバランシング) $side_worker = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_XREQ); $side_worker = $side_worker->bind("ipc:///tmp/worker1"); sleep(1); $dev = new ZMQDevice(ZMQ::DEVICE_QUEUE,$side_client,$side_worker);
WORKER2側は省略
たったこれだけで終わりです。
スバラシイ!!
CLIENT側を冗長化
CLIENT側も同様にゲートウェイを噛ませます。
構成のイメージ
[CLIENT] =| |= [WORKER1_GW] <= [WORKER1] [CLIENT] =| | <= [WORKER1] [CLIENT] =| => [CLIENT_GW] <= <= [WORKER1] [CLIENT] =| | : [CLIENT] =| |= [WORKER2_GW] <= [WORKER2] [CLIENT] =| <= [WORKER2] [CLIENT] =| <= [WORKER2] : :
CLIENTをちょっと変更
== client.php == <?php $context = new ZMQContext(); // 送信先を決めるのはゲートウェイになるのでXREQへ変更 // $client = $context->getSocket(ZMQ::SOCKET_XREP); $client = $context->getSocket(ZMQ::SOCKET_XREQ); // 繋ぎ先をゲートウェイへ変更 // $client->bind("tcp://127.0.0.1:4444"); $client->connect("ipc:///tmp/client"); sleep(1); // 後は一緒 for ( $i = 0 ; $i < 10 ; $i++ ) { $client->send("WORKER1",ZMQ::MODE_SNDMORE); $client->send("SEND TO WORKER1 $i"); $from = $client->recv(); if ( $client->getSockOpt(ZMQ::SOCKOPT_RCVMORE) ) { $r = $client->recv(); } var_dump($r); usleep(300000); } for ( $i = 0 ; $i < 10 ; $i++ ) { $client->send("WORKER2",ZMQ::MODE_SNDMORE); $client->send("SEND TO WORKER2 $i"); $from = $client->recv(); if ( $client->getSockOpt(ZMQ::SOCKOPT_RCVMORE) ) { $r = $client->recv(); } var_dump($r); usleep(300000); }
CLIENT GATEWAY client.phpの代わりにworker-gateway(worker)に接続
== clientgw.php == <?php // クライアント側ソケット $context = new ZMQContext(); $side_client = $context->getSocket(ZMQ::SOCKET_XREQ); $side_client->bind("ipc:///tmp/client"); sleep(1); // ワーカー側ソケット(RRバランシング) $side_worker = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_XREP); $side_worker = $side_worker->bind("tcp://127.0.0.1:4444"); sleep(1); $dev = new ZMQDevice(ZMQ::DEVICE_QUEUE,$side_client,$side_worker);
こっち側も大したこと無いですね。
因みにこのゲートウェイ達はZMQDeviceを利用しているのでC言語の世界で動作しているので高速です。(zeroコピーらしい)
こんな感じで結構柔軟に構成変更が出来るのでzeromqはお勧めです。
気が向いたら不満な点も書くかもしれません。
(上でちょっと触れたバランシングの件)
今回のソースはこちら