activeMQ+stomp+php实现消息队列

一、ActiveMQ的安装与配置

1、安装JDK

2、安装ActiveMQ

wget http://mirror.esocc.com/apache/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.tar.gz

3、配置ActiveMQ,使其支持stomp

在配置文件/usr/local/activemq/conf/activemq.xml,添加

<transportConnectors>
<!– DOS protection, limit concurrent connections to 1000 and frame size to 100MB –>
<transportConnector name=”openwire” uri=”tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600″/>
<transportConnector name=”amqp” uri=”amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600″/>
<transportConnector name=”stomp” uri=”stomp://localhost:61613″/>
</transportConnectors>

4、启动ActiveMQ

/usr/local/activemq/bin/activemq start

二、安装php的stomp扩展

查看最新的stomp

http://pecl.php.net/package/stomp

wget http://pecl.php.net/get/stomp-1.0.5.tgz

tar -zxf  stomp-1.0.5.tgz

/usr/bin/phpize5

./configure –enable-stomp –with-php-config=/usr/bin/php-config5

make & make install

在php.ini中添加

extension = stomp.so

三、php—实现定时从消息队列里取出数据

从队列里取出数据(此程序以守护进行的方式运行),代码如下:

<?php
/**
* ActiveMQ Client
*
* @author yiluxiangbei<2498038528@qq.com>
*/

class MessageQueueApp extends BaseAppEx
{

/**
* Default prefetch size
*
* @var int
*/
public $prefetchSize = 1000;
/**
* sleep of time (unit sencond)
*
* @var int
*/
private $_sleepIntval = 60;

private $_stomp    = null;

private $_mysqli   = null;

public function run()
{
while(true){
try{
//connecting database
try {
} catch (Exception $ex) {
die(‘MySQL connection failed: ‘.$ex->getMessage());
}
//connecting activemq
try {
$this->_stomp = new Stomp($this->conf[‘MessageQueue’][‘stompUri’]);
} catch (StompException $e) {
die(‘Connection failed: ‘.$e->getMessage());
}

$this->_stomp->subscribe($this->conf[‘MessageQueue’][‘queueUri’], array(‘activemq.prefetchSize’ => $this->prefetchSize));

$frame = null;
$data = array();
while (TRUE == $this->_stomp->hasFrame())
{
$temp = array();
$frame = $this->_stomp->readFrame();
if (FALSE !== $frame)
{
//file_put_contents(ROOT_PATH.’/cache/MQ’, $frame->body.”\n”, FILE_APPEND);
//operate dataBase
$temp = json_decode($frame->body, true);

$data = array_merge($data, $temp);

$this->_stomp->ack($frame);

} else {
continue;
}
}

$result = $this->_handleData1($data);

}catch (Exception $e){
echo “Exception is: “.$e->getMessage().”\n\n”;
}
echo “Statistic Over, {$this->_sleepIntval} seconds to next statistics..\n\n”;
$this->_stomp->unsubscribe($this->conf[‘MessageQueue’][‘queueUri’]);
unset($this->_stomp);
$this->_mysqli->close();
sleep($this->_sleepIntval);
}
}

/*
* data to store(The three table is not related)
*
* @param data array
* @return result bool
*/
private function _handleData1($data)
{
$insert_1 = “INSERT INTO `msg1`(`info`, `time`) VALUES”;
$insert_2 = “INSERT INTO `msg2`(`relateId`, `info`, `time`) VALUES”;
$insert_3 = “INSERT INTO `msg3`(`relateId`, `info`, `time`) VALUES”;
$count = count($data);

if (!empty($data)) {
$i = 0;
foreach ($data as $randId => $val) {
if ($i == 0) {
$insert_1 = $insert_1.”(‘{$val[0][0]}’, ‘{$val[0][1]}’)”;
$insert_2 = $insert_2.”(2, ‘{$val[1][0]}’, ‘{$val[1][1]}’)”;
$insert_3 = $insert_3.”(3, ‘{$val[2][0]}’, ‘{$val[2][1]}’)”;
} else {
$insert_1 = $insert_1.”,(‘{$val[0][0]}’, ‘{$val[0][1]}’)”;
$insert_2 = $insert_2.”,(2, ‘{$val[1][0]}’, ‘{$val[1][1]}’)”;
$insert_3 = $insert_3.”,(3, ‘{$val[2][0]}’, ‘{$val[2][1]}’)”;
}

$i++;
}
}
//echo $insert_1,PHP_EOL;
//echo $insert_2,PHP_EOL;
//echo $insert_3,PHP_EOL;
$success = TRUE;
$this->_mysqli->autocommit(0);

$this->_mysqli->query($insert_1);
echo $this->_mysqli->affected_rows,PHP_EOL;
if ($count != $this->_mysqli->affected_rows) {
$success = FALSE;
}

$this->_mysqli->query($insert_2);
echo $this->_mysqli->affected_rows,PHP_EOL;
if ($count != $this->_mysqli->affected_rows) {
$success = FALSE;
}

$this->_mysqli->query($insert_3);
echo $this->_mysqli->affected_rows,PHP_EOL;
if ($count != $this->_mysqli->affected_rows) {
$success = FALSE;
}

if ($success) {
$this->_mysqli->commit();
} else {
$this->_mysqli->rollback();
}

$this->_mysqli->autocommit(1);

}

/*
* data to store(The three table is related)
*
* @param data array
* @return result bool
*/
private function _handleData($data)
{
$this->_mysqli->commit();
} else {
$this->_mysqli->rollback();
}

$this->_mysqli->autocommit(1);

}

/*
* data to store(The three table is related)
*
* @param data array
* @return result bool
*/
private function _handleData($data)
{
$success = TRUE;

$this->_mysqli->autocommit(0);

$insert_1 = “INSERT INTO `msg1`(`info`, `time`) VALUES(‘{$data[0][0]}’, ‘{$data[0][1]}’)”;
$result1 = $this->_mysqli->query($insert_1);
if (!$result1 || $this->_mysqli->affected_rows!=1) {
$success = FALSE;
}

$relateId = $this->_mysqli->insert_id;

$insert_2 = “INSERT INTO `msg2`(`relateId`, `info`, `time`) VALUES({$relateId},'{$data[1][0]}’,'{$data[1][1]}’)”;
$result2  = $this->_mysqli->query($insert_2);
if (!$result2 || $this->_mysqli->affected_rows!=1) {
$success = FALSE;
}

$insert_3 = “INSERT INTO `msg3`(`relateId`, `info`, `time`) VALUES({$relateId},'{$data[2][0]}’,'{$data[2][1]}’)”;
$result3  = $this->_mysqli->query($insert_3);
if (!$result3 || $this->_mysqli->affected_rows!=1) {
$success = FALSE;
}

if ($success) {
$this->_mysqli->commit();
} else {
$this->_mysqli->rollback();
}

$this->_mysqli->autocommit(1);

return $success;
}
}
?>

将数据插入队列中

<?php
class DebugApp extends BaseAppEx
{
/**
* test
*/

public function index()
{

$randId = uniqid();
$time = date(‘Y-m-d H:i:s’, time());

$data = array();

$data[$randId][0] = array(‘msg1’, $time);
$data[$randId][1] = array(‘msg2’, $time);
$data[$randId][2] = array(‘msg3’, $time);

$data = json_encode($data);

$this->_sendMQ($data);
}
private function _sendMQ($data)
{
try {
$stomp = new Stomp($this->conf[‘MessageQueue’][‘stompUri’]);
} catch (StompException $e) {
//die(‘Connection failed: ‘.$e->getMessage());
throw new Exception($e->getMessage());
}

if (empty($data)) {
throw new Exception(‘Parameter must not be empty!’);
}

$isSucc = $stomp->send($this->conf[‘MessageQueue’][‘queueUri’], $data, array(‘persistent’ => ‘true’));
if (false == $isSucc)
{
throw new Exception(“$data send failed!”);
}

unset($stomp);
}
}
?>

四、参考资料

http://activemq.apache.org/

http://activemq.apache.org/version-5-getting-started.html

http://activemq.apache.org/examples.html

http://activemq.apache.org/contributing.html

ActiveMQ+In+Action.pdf

gearman + php

向一个机器添加Gearman需要两步:

1.构建并启动这个守护进程

2.构建与php(或Python等)版本相匹配的PHP扩展。

我安装版本是Gearman 1.1.5:

1. 安装依赖包:

sudo apt-get update

sudo apt-get upgrade
sudo apt-get install gcc autoconf bison flex libtool make libboost-all-dev libcurl4-openssl-dev curl libevent-dev memcached uuid-dev libsqlite3-dev libmysqlclient-dev

2.下载Gearman版本

wget https://launchpad.net/gearmand/1.2/1.1.5/+download/gearmand-1.1.5.tar.gz

3.解压、编译、安装源码包

tar xvzf gearmand-1.1.5.tar.gz
cd gearmand-1.1.5
./configure
make
make install

注:这个过程中可能无法编译成功,这时候要根据个人实际情况,看它报的是什么错,然后根据报错,更新或者安装共享库。

sudo apt-get install ***等。

4.为大多数最新的共享库创建必须的链接和缓存。其中可能报:

error: gearman: error while loading shared libraries: libgearman.so.6: cannot open shared object file: No such file or directory

解决办法:sudo ldconfig

5.通过pecl安装gearman

sudo apt-get install php-pear
sudo pecl install gearman
sudo gedit /etc/php5/cgi/php.ini

注:有些版本的php.ini可能不在/etc/php5/cgi/这个路径下,我的是在/etc/php5/cli/这个路径下。

另外,可能你的机器上没安装php5这个工具,如果没安装的话,可以直接通过在先安装:sudo apt-get install php5

如果上面的方法无法安装gearman的php扩展,可以尝试下下面方法:

$ wget http://pecl.php.net/get/gearman-1.1.1.tgz
$ tar zxvf gearman-1.0.2.tgz
$ cd gearman-1.0.2/
$ phpize
$ make
$ make install
$ sudo echo “extension = gearman.so” > /etc/php5/conf.d/gearman.ini

6. 在php.ini文件末尾添加”extension=gearman.so”

7. 检测扩展是否安装成功

$ php –info | grep “gearman support”
gearman support => enabled

显示出:gearman support => enabled,就表示安装成功啦。

8 测试

1)sudo ldconfig

2)启动gearmand: gearmand -d &

这一步可能会遇到:

启动这个 agent,即 Gearman 守护程序:
/usr/local/sbin/gearmand –daemon
报错:Could not open log file “/usr/local/var/log/gearmand.log”, from “/usr/sbin”, switching to stderr. (No such file or directory)
解决:
mkdir -p /usr/local/var/log/
cd /usr/local/var/log/
touch gearmand.log
再次尝试启动:
/usr/local/sbin/gearmand –daemon
成功运行.查看进程:ps -ef | grep gearmand
root     19390     1  0 17:50 ?        00:00:00 gearmand –daemon
root     19403     1  0 17:54 ?        00:00:00 /usr/local/sbin/gearmand –daemon
root     19406  1556  0 17:54 pts/3    00:00:00 grep gearmand

3)查看gearmand是否在运行:ps auxw | grep [g]earmand

4)检查germand的任务检测端口4730:sudo lsof -i tcp:4730

9.例子:从PHP使用Gearman

=====================================================

从 PHP 使用 Gearman 类似于之前的示例,惟一的区别在于这里是在 PHP 内创建 producer 和 consumer。
每个 consumer 的工作均封装在一个或多个 PHP 函数内。
先用 PHP 编写的一个 Gearman worker。将这些代码保存在一个名为 worker.php 的文件中。
<?php
$worker= new GearmanWorker();
$worker->addServer();
$worker->addFunction(“title”, “title_function”);
while ($worker->work());

function title_function($job)
{
return ucwords(strtolower($job->workload()));
}
?>

再用 PHP 编写的一个 producer,或 client。将此代码保存在一个名为 client.php 的文件内。
<?php
$client= new GearmanClient();
$client->addServer();
print $client->do(“title”, “All The World’s a stage!”);
print “\n”;
?>

现在,可以用如下的命令行连接客户机与 worker 了:
php worker.php &
php client.php
结果:
All The World’s a stage!

10. 对上面的例子中代码的一些分析:

首先, PHP Gearman Extension 提供了一个名为 GearmanClient 的类别,它可以让程式安排工作给 Job Server 。而 addServer 方法表示要通知的是哪些 Job Server ,也就是说如果有多台 Job Server 的话,就可以透过 addServer 新增。然后我们将要呼叫哪个 Worker 以及该 Worker 所需要的资料,利用 GearmanClient 的 doBackground 方法传送过去。 doBackground 方法顾名思义就是在背景执行, Client 在丢出需求后就可以继续处理其他的程式,也就是我们常说的「射后不理」。
doBackground 方法的第一个参数是告诉 Job Server 要执行哪个功能,而这个功能则是由 Worker 提供的;要注意是,这个参数只是识别用的,并不是真正的函式名称。而第二个参数是要传给 Worker 的资料,它必须是个字串;因此如果要传送的是阵列的话,我们就要用 PHP 的 serialize 函式来对这些资料做序列化。
PHP 的 Gearman Extension 也提供了一个 GearmanWorker 类别,让我们可以实作 Worker 。而 GearmanWorker 类别也提供了addServer 方法,让所生成的 Worker 物件可以注册到 Job Server 中。
另外 GearmanWorker 类别也提供了 addFuncton 方法,告诉 Job Server 自己可以处理哪些工作。 addFunction 的第一个参数就是对应到 GearmanClient::doBackground 方法的第一个参数,也就是功能名称;这使得 Client 和 Worker 能透过这个名称来互相沟通。而第二个参数则是一个callback函式,它会指向真正应该要处理该工作的函式或类别方法等。
最后因为 Worker 因为要随时准备服务,是不能被中断的,因此我们透过一个无限迴圈来让它常驻在 Job Server 中。

 

 

 

已守护进程运行worker.php

# nohup php -c /usr/local/php/etc/php.ini worker.php >/dev/null 2>&1 &


这里,有几点需要说明一下:
1、这里直接用php cli方式运行,添加-c参数是为了加载php.ini配置文件,以加载gearman扩展
2、worker应该做成守护进程(CLI模式),可以开启多个,这样client发起的任务就会分发到各个worker分别来执行(自动负载均衡 )
这个例子由于太过简单,即使开启多个worker也无法看出效果,不过可以通过终止其中一个,可以看出系统自动切换到其他worker继续正常执行
3、同理,client也是可以开启多个的(模型请参考之前的那边日志)

4、同时,job也可以开启多个,以避免单点故障