使用 Beanstalk 实现微信支付的异步通知
编辑于 2021-09-25 13:17:38 阅读 1434
Beanstalk介绍
Beanstalk是一个基于内存的(binlog持久化到硬盘),事件驱动(libevent),简单、快速的任务队列,支持大部分编程语言,将前台的任务转为后台异步处理,为web开发提供更高弹性。它可以支持多个server(客户端支持),一个任务只会被投递到一台server,一个任务只会被一个消费者获取(Reverse)。
使用Beanstalk任务队列提升PHP异步处理能力,降低程序耦合度,使前台更专注,后台处理耗时、扩展性任务(也可以使用其他语言开发),使得web架构更具扩展性。
相比RabbitMQ,Beanstalk作为一个任务队列,设计比较简单,支持以下特性:
- 优先级(priority),可以对任务进行优先处理(或降级),越小的值优先级越高(0~4,294,967,295),默认按先进先出(FIFO)
- 延迟执行(delay),一个任务创建完成并稍后再执行(比如等待主从同步)
- 超时重试(TTR),一个任务没有在指定时间内完成,将会被重新投递,由其他客户端处理。客户端也可以主动进行延时(touch)或重新入队(release)
- 隐藏(bury),一个任务执行失败了,可以先隐藏,隐藏的任务可以被重新激活(kick).
应用场景
对接过微信支付的应该会知道,用户支付成功后,微信会给我们发一个异步通知,如果我们没有正确处理,这个通知会发多次,直到我们返回正确的标识。
今天我们就用 Beanstalk 实现一下这个通知(通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 总计 24h4m)
先看下结果,如下图,15s/15s/30s/3m都是正常的,第10m出现了1s误差,这个也算正常。后面的就不展示了,时间太长
目录结构
测试
composer up -d
访问 producer.php,向队列中推一条任务
执行 php consumer.php,结果如上图
代码
docker-compose.yml
version: '3'
networks:
web-network:
services:
docker-beanstalkd:
# registry.cn-hangzhou.aliyuncs.com/cuiw/beanstalkd:20210923
image: "beanstalkd:20210923"1️⃣
restart: always
tty: true
volumes:
- ./beanstalkd/data:/var/lib/beanstalkd
ports:
- "11300:11300"
networks:
- web-network
composer.json
{
"require": {
"pda/pheanstalk": "^4.0"
}
}
beanstalkd.php
<?php
require __DIR__ . '/../../vendor/autoload.php';
use Pheanstalk\Pheanstalk;
class beanstalkd{
public $conf=[
'host'=>'docker-beanstalkd',
'port'=>11300,
'timeout'=>10,
];
public static function factory(){
return new self();
}
public function handle(): Pheanstalk {
return Pheanstalk::create($this->conf['host'], $this->conf['port'], $this->conf['timeout']);
}
}
producer.php
<?php
require './beanstalkd.php';
$pheanstalk = beanstalkd::factory()->handle();
$re=$pheanstalk
->useTube('testtube')
->put(
json_encode(['test' => 'data'], JSON_UNESCAPED_UNICODE),
1024,
30,
60
);
echo json_encode(['id'=>$re->getId(), 'data'=>json_decode($re->getData(), true)], JSON_UNESCAPED_UNICODE);
consumer.php
<?php
require './beanstalkd.php';
$pheanstalk = beanstalkd::factory()->handle();
$pheanstalk->watch('testtube');
while (true) {
$job = $pheanstalk->reserve();
//echo $job->getData().PHP_EOL;
//处理任务
exec('php result.php', $re, $status);
$data=json_decode($re[0], true);
if ($data['err']==0) {
//删除任务
$pheanstalk->delete($job);
} else {
$stats = $pheanstalk->statsJob($job);
echo date("Y-m-d H:i:s").':'.json_encode($stats)."\n".PHP_EOL;
if ($stats['releases'] >=0 && $stats['releases'] <15) {
//15次以下延时返回队列,通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h - 总计 24h4m
$timer = [15,15,30,180,600,1200,1800,1800,1800,3600,10800,10800,10800,21600,21600];
$pheanstalk->release($job, 0, $timer[$stats['releases']]);
}else{
//错误次数过多时 bury
$pheanstalk->bury($job);
}
}
}
result.php
<?php
//处理过程,err==0为成功,
echo json_encode(['err'=>1, 'data'=>[]]);
其他
1️⃣ 构建 beanstalkd 容器 我已经build一个并上传到阿里云,可以直接使用:registry.cn-hangzhou.aliyuncs.com/cuiw/beanstalkd:20210923
git clone git@github.com:beanstalkd/beanstalkd.git
cd beanstalkd
docker build -t beanstalkd:20210923 .