中年engineerの独り言 - crumbjp

LinuxとApacheの憂鬱

zeromqその1

zeromqを弄る機会があったのでメモと
 Guideのレシピに載っていない使い方の紹介など。。

環境設定

zeromqインストール

 色々な人が記事を書いてくれているので省略。
 ./configure はノーオプションでビルドしました。

PHPバインディング(php-zmq)インストール
  • ビルド
git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure --with-zmq=/usr/local --with-php-config=/usr/local/bin/php-config
make
sudo make install
  • php.iniに以下の行を追加
 extension=zmq.so

eromqの肝(自分が苦しんだ部分)

  • ソケットパターン(REQ,REP,等)によってそれぞれ固有のメッセージフォーマットがある。
  • メッセージフォーマットはソケットパターンによって固有。これらはマルチパートで構成されている。
  • 例えば、、
    • REQでsend("FOO")とすると、先頭パートに""が追加され、実際のメッセージは["","FOO"]となって送信される。
    • recv()に関しても同様。パートの追加/削除が行われる。
  • REQ/REP等、自然に通信できる組み合わせはこのメッセージフォーマットが合っている。
    • REPのrecv()は""が現れる迄のパートを全て削る。結果、REQから送信された["","FOO"]は"FOO"となる。
  • メッセージフォーマットが合わないと例外。PHPバインディングはcatchしてくれていない様で容赦なくcoredump(orz...)
    • REQ/XREPで簡単に起きる。
  • 逆に言うとメッセージフォーマットさえ合わせてやればREQ/XREPでも通信可能。
    • XREPの送信の際にsend("",SNDMORE);send("MSG");って感じ。
  • 以上の事はこのスレッドを読んで霧が晴れたように理解できた
  • より詳細は本家Guideを参照してください。
  • グニャラくんのwktk運営日記がよく纏まってるかも。。
  • メッセージパートを弄りきる覚悟が決まれば柔軟なソケットパターンが使い易くなってくる。

というわけで

viva XREP!!

なぜ?

  • XREQ,XREPはrecv()の際、パート削除を行わない。(送られて来たデータが全て見える)
  • これらは送信の際に余計なパート追加を行わない。(どのソケットタイプが相手でも合わせられる)
  • zeromqはロードバランシングが今一なので、XREQよりも送信先を指定できるXREPの方が応用を利かせやすい。
    • zeromqのLBは今後の課題なのでしょう。RRしか出来ないし、connect側でのLBの挙動が酷過ぎる。

宛先指定通信

CLIENT側
== client.php ==
<?php
// 手抜き1:本当はrecv()の際先頭パートを見て誰から来たかの特定が必要
$context = new ZMQContext();
$client  = $context->getSocket(ZMQ::SOCKET_XREP);
$client->bind("tcp://127.0.0.1:4444");
sleep(1);
// WORKER1と通信
for ( $i = 0 ; $i < 10 ; $i++ ) {
  $client->send("WORKER1",ZMQ::MODE_SNDMORE); // 先頭パート:送信先
  $client->send("SEND TO WORKER1 $i");        // 第2パート:メッセージパート
  $from = $client->recv();
  if ( $client->getSockOpt(ZMQ::SOCKOPT_RCVMORE) ) {
    $r = $client->recv(); // 手抜き2:最終パートがメッセージなので・・・
  }
  var_dump($r);
  usleep(300000);
}
// WORKER2と通信
for ( $i = 0 ; $i < 10 ; $i++ ) {
  $client->send("WORKER2",ZMQ::MODE_SNDMORE);
  $client->send("SEND TO WORKER2 $i");
  $from = $client->recv();
  if ( $client->getSockOpt(ZMQ::SOCKOPT_RCVMORE) ) {
    $r = $client->recv();
  }
  var_dump($r);
  usleep(300000);
}
WORKER1
== worker1.php ==
<?php
$context = new ZMQContext();
$worker  = $context->getSocket(ZMQ::SOCKET_XREP);
$worker->setSockOpt(ZMQ::SOCKOPT_IDENTITY,"WORKER1");//自分の名前を指定
$worker->connect("tcp://127.0.0.1:4444");
sleep(1);

$recvMsg = array();
while ( true ) {
  $r = $worker->recv();
  var_dump($r);
  // 最終パート(メッセージ)以外は返信時に送り返すので保存しておく
  $recvMsg[]=$r;
  if ( $worker->getSockOpt(ZMQ::SOCKOPT_RCVMORE) ) {
    continue;
  }
  // 最終パート( メッセージ)を除く
  $msg = array_pop($recvMsg);
  var_dump($msg);
  // メッセージフォーマットを送り返す
  foreach($recvMsg as $head){
    $worker->send($head,ZMQ::MODE_SNDMORE);
  }
  $worker->send("WORKER1 REPLY : $r");    // 最終パート:メッセージパート
  $recvMsg = array(); // リセット
}
WORKER2
<?php
== worker2.php ==
// ほとんどworker1.phpと同じです
$context = new ZMQContext();
$worker  = $context->getSocket(ZMQ::SOCKET_XREP);
$worker->setSockOpt(ZMQ::SOCKOPT_IDENTITY,"WORKER2"); // 名前はWORKER2
$worker->connect("tcp://127.0.0.1:4444");
sleep(1);

$recvMsg = array();
while ( true ) {
  $r = $worker->recv();
  $recvMsg[]=$r;
  if ( $worker->getSockOpt(ZMQ::SOCKOPT_RCVMORE) ) {
    continue;
  }
  $msg = array_pop($recvMsg);
  var_dump($msg);
  foreach($recvMsg as $head){
    $worker->send($head,ZMQ::MODE_SNDMORE);
  }
  $worker->send("WORKER2 REPLY : $r");    // メッセージもWORKER2
  $recvMsg = array(); // リセット
}
実行結果(CLIENT側)
string(33) "WORKER1 REPLY : SEND TO WORKER1 0"
string(33) "WORKER1 REPLY : SEND TO WORKER1 1"
string(33) "WORKER1 REPLY : SEND TO WORKER1 2"
string(33) "WORKER1 REPLY : SEND TO WORKER1 3"
string(33) "WORKER1 REPLY : SEND TO WORKER1 4"
string(33) "WORKER1 REPLY : SEND TO WORKER1 5"
string(33) "WORKER1 REPLY : SEND TO WORKER1 6"
string(33) "WORKER1 REPLY : SEND TO WORKER1 7"
string(33) "WORKER1 REPLY : SEND TO WORKER1 8"
string(33) "WORKER1 REPLY : SEND TO WORKER1 9"
string(33) "WORKER2 REPLY : SEND TO WORKER2 0"
string(33) "WORKER2 REPLY : SEND TO WORKER2 1"
string(33) "WORKER2 REPLY : SEND TO WORKER2 2"
string(33) "WORKER2 REPLY : SEND TO WORKER2 3"
string(33) "WORKER2 REPLY : SEND TO WORKER2 4"
string(33) "WORKER2 REPLY : SEND TO WORKER2 5"
string(33) "WORKER2 REPLY : SEND TO WORKER2 6"
string(33) "WORKER2 REPLY : SEND TO WORKER2 7"
string(33) "WORKER2 REPLY : SEND TO WORKER2 8"
string(33) "WORKER2 REPLY : SEND TO WORKER2 9"

という訳で動いています。

しかしこれでは、幾つか問題があります。

  1. WORKER1,WORKER2が冗長に組めません。
  2. CLIENTは一人ではないかも。

普通のソケットプログラムではここからの対応が大変なのですが
ここからが柔軟さがzeromqの真骨頂です。

長くなったので次回に続く・・・