/** * queue : 当前操作的队列. 设置队列名称即可 * durable: 当前队列是否开启持久化. 如果为true.当前mq服务重启之后,队列仍然存在 * exclusive: 当前队列是否独占此连接 * autoDelete: 当前队列是否自动删除 * arguments: 队列参数 */ channel.queueDeclare(QUEUE,true,false,false,null);
/**
* exchange: 交换机. 对于当前操作使用默认交换机 ""
* routingKey: 路由key. 如果当前使用默认交换机, routingKey的值就是当前队列的名称
* props: 参数
* body: 消息体
*/
String message = "hello rabbitmq";
channel.basicPublish("",QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
/** * queue : 队列名称 * autoAck: 是否自动应答 *callback: 消费者 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
DefaultConsumer consumer = new DefaultConsumer(finalChannel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String value = new String(body,"utf-8");
System.out.println("开始调用发货功能!!!!!");
System.out.println("根据发货功能结果进行判断");
if (true){
//发货成功
//通知消息队列删除此消息
finalChannel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE,false,consumer);
DefaultConsumer consumer = new DefaultConsumer(finalChannel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String value = new String(body,"utf-8");
try{
//调用发货
}
catch(Exception e){
//异常处理
}
finally{
//通知消息队列删除此消息
finalChannel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE,false,consumer);
DefaultConsumer consumer = new DefaultConsumer(finalChannel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String value = new String(body,"utf-8");
try{
//调用发货
}
catch(Exception e){
//异常处理
//第一个参数: 消息表示信息
//第二个参数:通知MQ服务当前消费者实例没有处理成功,让MQ服务将这个消息重新投递给其他消费者实例
//如果设置为了false,会导致就算MQ服务知道当前消费者实例没有处理成功, 但是依旧会删除这个消息.
channel.basicNack(envelope.getDeliveryTag(),true)
}
finally{
//通知消息队列删除此消息
finalChannel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
channel.basicConsume(QUEUE,false,consumer);
channel.basicQos(10)//设置预抓取消息总数为10