public class RabbitMQTransactionDemo { private static final String QUEUE_NAME = "transaction_queue"; public static void main(String[] args) { try { // 創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 創建連接 Connection connection = factory.newConnection(); // 創建信道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); try { // 開啟事務 channel.txSelect(); // 發送消息 String message = "Hello, RabbitMQ!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 提交事務 channel.txCommit(); } catch (Exception e) { // 事務回滾 channel.txRollback(); e.printStackTrace(); } // 關閉信道和連接 channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } }}
public class RocketMQTransactionDemo { public static void main(String[] args) throws Exception { // 創建事務消息生產者 TransactionMQProducer producer = new TransactionMQProducer("group_name"); producer.setNamesrvAddr("localhost:9876"); // 設置事務監聽器 producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 執行本地事務邏輯,根據業務邏輯結果返回相應的狀態 // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交 // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾 // 返回 LocalTransactionState.UNKNOW 表示事務狀態未知 } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 根據消息的狀態,來判斷本地事務的最終狀態 // 返回 LocalTransactionState.COMMIT_MESSAGE 表示事務提交 // 返回 LocalTransactionState.ROLLBACK_MESSAGE 表示事務回滾 // 返回 LocalTransactionState.UNKNOW 表示事務狀態未知 } }); // 啟動事務消息生產者 producer.start(); // 構造消息 Message msg = new Message("topic_name", "tag_name", "Hello, RocketMQ!".getBytes()); // 發送事務消息 TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("Send Result: " + sendResult); // 關閉事務消息生產者 producer.shutdown(); }}
public class KafkaTransactionDemo { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id"); Producer<String, String> producer = new KafkaProducer<>(props); // 初始化事務 producer.initTransactions(); try { // 開啟事務 producer.beginTransaction(); // 發送消息 ProducerRecord<String, String> record = new ProducerRecord<>("topic_name", "Hello, Kafka!"); producer.send(record); // 提交事務 producer.commitTransaction(); } catch (ProducerFencedException e) { // 處理異常情況 producer.close(); } finally { producer.close(); } }}
RabbitMQ使用ACK(消息確認)機制來確保消息的可靠傳遞。消費者收到消息后,需要向RabbitMQ發送ACK來確認消息的處理狀態。只有在收到ACK后,RabbitMQ才會將消息標記為已成功傳遞,否則會將消息重新投遞給其他消費者或者保留在隊列中。
以下是RabbitMQ ACK的Java示例:
public class RabbitMQAckDemo { public static void main(String[] args) throws Exception { // 創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 創建連接 Connection connection = factory.newConnection(); // 創建信道 Channel channel = connection.createChannel(); // 聲明隊列 String queueName = "queue_name"; channel.queueDeclare(queueName, false, false, false, null); // 創建消費者 String consumerTag = "consumer_tag"; boolean autoAck = false; // 關閉自動ACK // 消費消息 channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // 消費消息 String message = new String(body, "UTF-8"); System.out.println("Received message: " + message); try { // 模擬處理消息的業務邏輯 processMessage(message); // 手動發送ACK確認消息 long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); } catch (Exception e) { // 處理消息異常,可以選擇重試或者記錄日志等操作 System.out.println("Failed to process message: " + message); e.printStackTrace(); // 手動發送NACK拒絕消息,并可選是否重新投遞 long deliveryTag = envelope.getDeliveryTag(); boolean requeue = true; // 重新投遞消息 channel.basicNack(deliveryTag, false, requeue); } } }); } private static void processMessage(String message) { // 模擬處理消息的業務邏輯 }}
RocketMQ的ACK機制由消費者控制,消費者從消息隊列中消費消息后,可以手動發送ACK確認消息的處理狀態。只有在收到ACK后,RocketMQ才會將消息標記為已成功消費,否則會將消息重新投遞給其他消費者。
public class RocketMQAckDemo { public static void main(String[] args) throws Exception { // 創建消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("localhost:9876"); // 訂閱消息 consumer.subscribe("topic_name", "*"); // 注冊消息監聽器 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { for (MessageExt message : msgs) { try { // 消費消息 String msgBody = new String(message.getBody(), "UTF-8"); System.out.println("Received message: " + msgBody); // 模擬處理消息的業務邏輯 processMessage(msgBody); // 手動發送ACK確認消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 處理消息異常,可以選擇重試或者記錄日志等操作 System.out.println("Failed to process message: " + new String(message.getBody())); e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); // 啟動消費者 consumer.start(); } private static void processMessage(String message) { // 模擬處理消息的業務邏輯 }}
Kafka的ACK機制用于控制生產者在發送消息后,需要等待多少個副本確認才視為消息發送成功。這個機制可以通過設置acks參數來進行配置。
下面是一個使用Java編寫的Kafka生產者示例代碼:
public class KafkaProducerDemo { public static void main(String[] args) { // 配置Kafka生產者的參數 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // Kafka集群的地址和端口 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 鍵的序列化器 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 值的序列化器 props.put("acks", "all"); // 設置ACK機制為所有副本都確認 // 創建生產者實例 KafkaProducer<String, String> producer = new KafkaProducer<>(props); // 構造消息 String topic = "my_topic"; String key = "my_key"; String value = "Hello, Kafka!"; // 創建消息記錄 ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value); // 發送消息 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { System.err.println("發送消息出現異常:" + exception.getMessage()); } else { System.out.println("消息發送成功!位于分區 " + metadata.partition() + ",偏移量 " + metadata.offset()); } } }); // 關閉生產者 producer.close(); }}
本文鏈接:http://www.www897cc.com/showinfo-26-10561-0.htmlMQ黃金三劍客 Rabbit Rocket Kafka深入解密常見問題及功能對比指南
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: Github的一個奇技淫巧,你學會了嗎?
下一篇: 圖解「正向代理」的原理 + 實踐應用