/** * queue : 当前操作的队列. 设置队列名称即可 * durable: 当前队列是否开启持久化. 如果为true.当前mq服务重启之后,队列仍然存在 * exclusive: 当前队列是否独占此连接 * autoDelete: 当前队列是否自动删除 * arguments: 队列参数 */ channel.queueDeclare(QUEUE,true,false,false,null);
/** * exchange: 交换机. 对于当前操作使用默认交换机 "" * routingKey: 路由key. 如果当前使用默认交换机, routingKey的值就是当前队列的名称 * props: 参数 * body: 消息体 */ String message = "hello rabbitmq"; channel.basicPublish("",QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
/** * queue : 队列名称 * autoAck: 是否自动应答 *callback: 消费者 */ channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
DefaultConsumer consumer = new DefaultConsumer(finalChannel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String value = new String(body,"utf-8"); System.out.println("开始调用发货功能!!!!!"); System.out.println("根据发货功能结果进行判断"); if (true){ //发货成功 //通知消息队列删除此消息 finalChannel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE,false,consumer);
DefaultConsumer consumer = new DefaultConsumer(finalChannel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String value = new String(body,"utf-8"); try{ //调用发货 } catch(Exception e){ //异常处理 } finally{ //通知消息队列删除此消息 finalChannel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE,false,consumer);
DefaultConsumer consumer = new DefaultConsumer(finalChannel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String value = new String(body,"utf-8"); try{ //调用发货 } catch(Exception e){ //异常处理 //第一个参数: 消息表示信息 //第二个参数:通知MQ服务当前消费者实例没有处理成功,让MQ服务将这个消息重新投递给其他消费者实例 //如果设置为了false,会导致就算MQ服务知道当前消费者实例没有处理成功, 但是依旧会删除这个消息. channel.basicNack(envelope.getDeliveryTag(),true) } finally{ //通知消息队列删除此消息 finalChannel.basicAck(envelope.getDeliveryTag(),false); } } }; channel.basicConsume(QUEUE,false,consumer);
channel.basicQos(10)//设置预抓取消息总数为10