[项目总结] 使用 Redis PUSH-POP 机制实现消息队列

背景

越来越多的传统行业为提高管理效率、优化工作流程和团队组织结构,创业公司 ToB 创业项目引入了 SASS 的概念,用软件来规范和以其传统企业转型升级,以期更快更强地适应商业互联网化改造。17年四月博主以远程的协作方式受雇于一家做二手车 SASS 软件的创业公司,其公司前身曾使用 JAVA 开发出一套适用于车场的交易市场管理系统,另外现阶段维护一套适用于各大小车商多端(PC 和 APP)的车辆从入库到销售跟踪全面覆盖的 SASS 软件。就在八月底,公司和国内某大型汽车集团旗下子公司达成合作,我方把已有的两套系统加以集成和改造适应对方需求。从开工到上线只有一个月,工期如此之短,从技能到身心对团队的考验都是比较大的。

畸形的系统结构

项目的系统结构是:交易市场管理系统管理车场内所有车商的车辆数据及日常运营管理(JAVA 实现的系统),APP 端有车商版和交易市场版,使用的角色分别为车场管理员和车商管理员,一套 Rails 搭建 Rails 服务 的 Web 服务给 APP 提供 API 。基于项目进度和系统架构考虑,交易市场管理系统和 Rails 服务后端使用同一套数据库。两套系统间对应的数据库表如下:

交易市场(ERP) Rails 服务 备注
trade cars 车辆
market companies 公司
agency shop 车商
staff user 用户

交易市场的业务逻辑只是修改自己的数据表,Rails 服务亦然。但是两套系统间对应的表需要同步数据,比如在 Rails 服务的手机端保存车辆的信息草稿,交易市场车辆列表中需要看到新录入车辆的信息,Rails 服务 cars 表产生一条记录,同步程序会在 trade 表产生与 cars 表新建的记录信息一致的记录。除了同步数据之外,消息队列还充当交易市场和 Rails 服务间系统调用的作用,譬如在交易市场的运营管理中新建系统公告,需调用 Rails 服务给 APP 推送消息的功能。

MQ 开源框架之选

实现消息队列的开源软件很多,曾简单使用过 RabbitMQ 作为 Bitcoin-core 和 Web 服务之间消息发送(比特币热钱包中交易被确认的回调消息发送给 Rails 应用)所以它是待选的开源软件之一;接着心想着要不问问前同事会选用什么方式实现,他们提到了 Redis zi自带的 pub/sub 机制,即发布-订阅模式,故加入待选项;最后一种方案是甲方的 CTO 提出来的,他建议使用 Redis 的 PUSH/POP 机制,利用列表(list)数据结构,模式为生产者 lpush 消息,消费者 brpop 消息。综合考虑之后选择了最后一种方式。在知乎上有一个问答很好地解释了使用 Redis 作消息队列的利弊 redis怎么做消息队列?

既然选择了 Redis PUSH-POP 机制sh实现,控制列的 push 和 pop 来实现生产者和消费者也不难,为了不重复造轮子,还是到 Github 搜一把看有没有封装好的 Gem ,我找到了一个叫做 chasqui 的 Gem :

Chasqui adds persistent publish-subscribe (pub-sub) messaging capabilities to Sidekiq and Resque workers.

阅读其源码时发现它也是通过 Redis 的 PUSH-POP 机制给 sidekiq 和 Resque worker 实现了简单的消息队列,和项目的需求很吻合,遂选用之。

集成 chasqui 到 Rails 项目

配置

项目中已有 Sidekiq 作为 background job processing ,只需把处理消息队列的 Sidekiq worker 加到 config/initializers/sidekiq.rb 文件中即可

redis_config = if Rails.env.production?
{ url: ENV["REDIS_URL"], password: ENV["REDIS_PWD"] }
               else
{ url: ENV["REDIS_URL"] }
               end

Sidekiq.configure_server do |config|
  config.average_scheduled_poll_interval = 2
  config.redis = redis_config
end

Sidekiq.configure_client do |config|
  config.redis = redis_config
end

# 把处理消息队列的 worker 绑定到对应的 channel
# worker 作为消费者的角色,监听着 Chasqui 守护进程分发过来的消息
Chasqui.subscribe do
  on 'company_sync', DataSync::CompanyWorker
  on 'shop_sync', DataSync::ShopWorker
  on 'user_sync', DataSync::UserWorker
  on 'car_sync', DataSync::CarWorker
  on 'announcement_sync', DataSync::AnnouncementWorker
  on 'pc_token_sync', DataSync::PcTokenWorker
end

on 在这里的第一个参数是 channel 名称,每种类型的数据我都起了一个 channel 处理;第二参数是普通的 Sidekiq worker ,指定该 worker 处理对应 channel 发送过来的消息。

最后还需要给 chasqui 配置连接 redis 的设置

# config/initializers/chasqui.rb
Chasqui.configure do |c|
  c.redis = if Rails.env.production?
              Redis.new(url: ENV["REDIS_URL"], password: ENV["REDIS_PWD"])
            else
              Redis.new(url: ENV["REDIS_URL"])
            end
end

订阅者和业务逻辑实现

接下来我们只解释交易市场数据同步的过程。在 ERP 中新建一个交易市场记录时(Market),需要发送消息给队列,数据同步程序需要在 Rails 服务新建一条公司记录(companies)和交易市场管理员用户的记录;管理员用户又得在 ERP 的 staff 表和 Rails 服务的 user 表产生两条记录,staffs 表的记录在 ERP 中登录用到( JAVA 实现的交易市场系统)users 表的记录在 Rails 服务的 APP 登录中用到。来看看 CompanyWorker 的代码:

class DataSync::CompanyWorker
include Sidekiq::Worker
sidekiq_options retry: 1, queue: :data_sync

# {"channel"=>"company_sync", "syn_source_id"=>"1", "to"=>"chelaike"}
def perform(body, *args)
  to = body.try(:fetch, "to")
  syn_source_id = body.try(:fetch, "syn_source_id")

  Rails.logger.info ">>>>> company_sync json #{body}"

  syn_source = (to == "chelaike") ? (Erp::Market.find(syn_source_id)) : (Company.find(syn_source_id))
  DataSynService::Company.new(syn_source).execute
end
end

同步数据的 worker 统一使用 data_sync queue ,这中类型的队列只部署在一台应用服务器上。我规范了消息队列的 json 的内容格式,例如

{
"channel"=>"company_sync",
"syn_source_id"=>"1",
"to"=>"chelaike"
}

channel 是消息类型、syn_source_id 是需要同步的数据库记录 ID 、to 为哪个系统需要同步数据;例如在 ERP 中新建交易市场,那么 channel 就是 company_sync , syn_source_id 就是 Market 表中新建的记录 ID,to 的值为 chelaike 表示需要往 Rails 服务系统中同步数据。那么,JAVA 系统中往 Redis 中 push 数据的格式应该是:

redis.lpush 'chasqui:chasqui-inbox',
{
  "channel": "company_sync",
  "syn_source_id": "1",
  "to": "chelaike"
}.to_json

同步程序的业务逻辑抽象出来放在 service 目录下,来看看 service/data_sync/company.rb

module DataSynsService
class Company < Base
  def execute
    syn_obj
  end

  private
  # 需要同步的记录
  # 例如:@syn_source 为 chelaike 的 Company 记录, 则 @obj 为 erp 的 Market 的记录
  def syn_obj
    ActiveRecord::Base.transaction do
      case @syn_source.class.to_s
      when "Company"
        Rails.logger.info "Company sync >>>> chelaike to erp"
        obj = @syn_source.erp_market
        obj.blank? ? (::Erp::Market.create(erp_params)) : (obj.update_attributes(erp_params))
        users = @syn_source.users  # 用来判断是否要同步用户数据
        Rails.logger.info "新建与公司对应的市场成功: #{obj.try(:market_name)}"
      when "Erp::Market"
        Rails.logger.info "Company sync >>>> erp to chelaike"
        obj = @syn_source.chelaike_company
        obj.blank? ? (obj = ::Company.create(chelaike_params)) : (obj.update_attributes(chelaike_params))
        obj.update_attributes(erp_market_id: @syn_source.id) if obj.present?
        staffs = @syn_source.staffs # 用来判断是否要同步用户数据
        Rails.logger.info "新建与市场对应的公司成功: #{obj.try(:name)}"
      end
      DataSynService::SyncWithUser.execute(@syn_source) unless (staffs.present? || users.present?)
    end
  end

  def chelaike_params
    {
      name: @syn_source.market_name,
      contact: @syn_source.market_linkman_name,
      contact_mobile: @syn_source.market_linkman_mobile,
      company_state: @syn_source.market_status,
      erp_market_id: @syn_source.id
    }
  end

  def erp_params
    {
      market_name: @syn_source.name,
      market_linkman_name: @syn_source.contact,
      market_linkman_mobile: @syn_source.contact_mobile,
      market_status: @syn_source.company_state
    }
  end
end
end

其实也很简单,就是两个表之间,根据需要同步的目标系统从源数据中复制数据匹配到对应的字段。以上是交易市场和Rails 服务后端两个系统间 market 和 company 之间的数据同步,数据同步其他 worker 和 service 都大同小异,在这就不一一列举,读者举一反三即可。

chasqui dispatch

在 chasqui 文档中,对 dispatch 的描述如下:

The broker is a ruby daemon that listens for events (messages) published to channels (topics) and forwards those events to registered subscribers. In order to work, your broker must use the same Redis database as your Sidekiq/Resque workers.

意思就是需开启一个守护进程监听生产者往通道发送消息的事件,并把消息转发给于 channel 绑定的对应订阅者 (sidekiq worker) ,并且需要注意的是:** 消费者和生产者连接的必须是同一个 Redis 的同一 DB 。通过命令行(chasqui -r redis://localhost:6379/0)启动 chasqui broker ,但是我发现如果 redis 设置了带有特殊字符的密码,通过 bash 识别不了指令,故把启动 broker 放到了 Rails 的 bin 目录启动,启动脚本如下:

#!/usr/bin/env ruby
require 'chasqui'
require 'redis'
redis_url = "redis://redis_url:port"
redis_pwd = "pwd"
Chasqui.configure do |c|
  c.redis = Redis.new(url: redis_url, password: redis_pwd)
end
Chasqui::Broker.start

以上就是 chasqui 利用 Redis PUSH-POP 实现了一个简单 MQ 的全部,并投入了生产环境。当初选用通过 MQ 实现数据同步也是为项目快速上线的无奈之举,以后应该会有所修改或者弃掉这个功能。

部署之坑

Rails 服务的后端服务使用 capistrano 部署,上文提到 chasqui Broker 启动脚本在 bin 目录下,故业务把启动 broker 的脚本放到 cap 中自动化部署。思路很简单,就是 cd 到项目的根目录再通过 nohup 执行一个 Ruby 可执行文件。但是遇到了 nohup 没执行完 ssh 就退出的问题,具体我已再 Ruby-China 上写出 请问 capistrano 如何执行 Rails 项目 bin 目录下的可执行文件在 deploy.rb 正确的脚本片段如下:

after :restart, :clear_cache do
on roles(:app), in: :groups, limit: 3, wait: 10 do
end

on roles(:chasqui) do
  execute "for i in $( ps ax | awk '/chasqui_start/ {print $1}' ); do kill ${i}; done"
  execute "bash -l -c 'cd #{deploy_to}/current && (nohup rvm use 2.2.4 do ruby bin/chasqui_start 2>&1 &) && sleep 2 && ps -ef | grep chasqui_start' "
end
end

同时还需要把 chasqui 这个 role 放进 config/deploy/production.rb 中:

server "server_a_ip", user: "deploy", roles: %w{app db web lina migration etl car_publisher data_sync chasqui}
server "server_b_ip", user: "deploy", roles: %w{app db web lina migration etl car_publisher}

写在最后,关于创业

创业就是给自己找罪受,利用这次和甲方合作通过技术换取更多的非技术资源。同时也希望团队为期一个月的集中开发功不唐捐。

0 条评论
您想说点什么吗?