admin管理员组

文章数量:1516870

前面我们已经实现了[rabbitMQ的helloWorld]参见( ),
这篇中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。

工作队列的主要任务是:避免立刻执行资源密集型任务,然后必须等待其完成。相反地,我们进行任务调度:我们把任务封装为消息发送给队列。工作进行在后台运行并不断的从队列中取出任务然后执行。当你运行了多个工作进程时,任务队列中的任务将会被工作进程共享执行。
这样的概念在web应用中极其有用,当在很短的HTTP请求间需要执行复杂的任务。

一 准备工作

1.1 发送端

我们使用Thread.sleep来模拟耗时的任务。我们在发送到队列的消息的末尾添加一定数量的点,每个点代表在工作线程中需要耗时1秒,例如hello…将会需要等待3秒。
NewTask.java

package com.gta.goldnock.mq.task;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
publicclassNewTask {
   
   privatefinalstatic String TASK_QUEUE_NAME = "task_queue";
    publicstaticvoidmain(String[] args) throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        //发送10条消息,依次在消息后面附加1-10个点  for (int i = 0; i < 10; i++)  
        {  
            String dots = "";  
            for (int j = 0; j <= i; j++)  
            {  
                dots += ".";  
            }  
            String message = "helloworld" + dots+dots.length();  
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes());  
            System.out.println(" [x] Sent '" + message + "'");
        }  
        channel.close();
        connection.close();
    }
}
1.2 接收端

Worker.java

package com.gta.goldnock.mq.task;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * 
* @ClassName: Worker
* @Description: TODO(消息接收类)
* @author yuhuan.gao
* @date 2017年1月20日 上午11:23:48
*
 */publicclassWorker {
   
   //定义一个接收消息队列privatestaticfinal String TASK_QUEUE_NAME = "task_queue";
      publicstaticvoidmain(String[] argv) throws Exception {
        //创建连接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        //申明接收消息队列
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//      channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {
             @OverridepublicvoidhandleDelivery(String consumerTag, Envelope envelope,
                     AMQP.BasicProperties properties, byte[] body) throws IOE

本文标签: 进阶指南系统编程