Pulsar3.0 是 Pulsar 社區(qū)推出的第一個(gè) LTS 長(zhǎng)期支持版本。
圖片
如圖所示,LTS 版本會(huì)最長(zhǎng)支持到 36 個(gè)月,而 Feature 版本最多只有六個(gè)月;類似于我們使用的 JDK11,17,21 都是可以長(zhǎng)期使用的;所以也推薦大家都升級(jí)到 LTS 版本。
作為首個(gè) LTS 版本,3.0 自然也是自帶了許多新特性,這個(gè)會(huì)在后續(xù)介紹。
先來看看升級(jí)指南:
圖片
在官方的兼容表中會(huì)發(fā)現(xiàn):不推薦跨版本升級(jí)。
也就是說如果你現(xiàn)在還在使用的是 2.10.x,那么推薦是先升級(jí)到 2.11.x 然后再升級(jí)到 3.0.x.
而且根據(jù)我們的使用經(jīng)驗(yàn)來看,首個(gè)版本是不保險(xiǎn)的,即便是 LTS 版本;所以不推薦直接升級(jí)到 3.0.0,而是更推薦 3.0.1+,這個(gè)小版本會(huì)修復(fù) 3.0 所帶來的一些 bug。
先講一下我們的升級(jí)流程,大家可以用做參考。
根據(jù)我們的使用場(chǎng)景,為了以防萬一,首先需要將我們的插件依賴升級(jí)到對(duì)應(yīng)的版本。
圖片
其實(shí)簡(jiǎn)單來說就是更新下依賴,然后再重新打包,在后續(xù)的流程進(jìn)行測(cè)試。
之后是預(yù)熱鏡像,我們使用 harbor 搭建了自己的 docker 鏡像倉庫,這樣在升級(jí)重啟鏡像的時(shí)候可以更快的從內(nèi)網(wǎng)拉取鏡像。
畢竟一個(gè) pulsar-all 的鏡像也不小,盡量的縮短啟動(dòng)時(shí)間。
預(yù)熱的過程也很簡(jiǎn)單:
docker pull apachepulsar/pulsar-all:3.0.1docker tag apachepulsar/pulsar-all:3.0.1 harbor-private.xx.com/pulsar/pulsar-all:3.0.1docker image push harbor-private.xx.com/pulsar/pulsar-all:3.0.1
之后升級(jí)的時(shí)候就可以使用私服的鏡像了。
我這邊有寫了一個(gè) cli 可以幫我快速創(chuàng)建或升級(jí)一個(gè)集群,然后觸發(fā)我所編寫的功能測(cè)試。
./pulsar-upgrade-cli upgrade pulsar-test ./charts/pulsar --version x.x.x -f charts/pulsar/values.yaml -n pulsar-test
這個(gè) cli 很簡(jiǎn)單,一共就做三件事:
之后的效果如下:
圖片
主要就是覆蓋了我們的使用場(chǎng)景,都跑通過之后才會(huì)走后續(xù)的流程。
圖片
之后會(huì)啟動(dòng)一個(gè) 200 左右的并發(fā)生產(chǎn)和消費(fèi)數(shù)據(jù),模擬線上的使用情況,會(huì)一直讓這個(gè)任務(wù)跑著,大概一晚上就可以了,第二天通過監(jiān)控查看:
組件的升級(jí)步驟這里參考了官方指南:https://pulsar.apache.org/docs/3.1.x/administration-upgrade/#upgrade-zookeeper-optional
圖片
只要一步步按照這個(gè)流程走,問題不大,哪一步出現(xiàn)問題后需要及時(shí)回滾,回滾流程參考下面的回滾部分。
同時(shí)在升級(jí)過程中需要一直查看 broker 的 error 日志,如果有明顯的不符合預(yù)期的日志一定要注意。
在升級(jí) bookkeeper 的時(shí)候,broker 可能會(huì)出現(xiàn) bk 連接失敗的異常,這個(gè)可以不用在意。
都升級(jí)完后就是線上業(yè)務(wù)驗(yàn)證環(huán)節(jié)了:
當(dāng)出現(xiàn)異常的時(shí)候需要立即回滾,這里的異常一般就是消息收發(fā)異常,客戶端掉線等。
經(jīng)過我的測(cè)試 3.0.x 的存儲(chǔ)和之前的版本是兼容的,所以 bookkeeper 都能降級(jí)其他的組件就沒啥可擔(dān)心的了。
需要降級(jí)時(shí)直接將所有組件降級(jí)為上一個(gè)版本即可。
因?yàn)槭菑?2.x 升級(jí)到 3.x 也是涉及到了跨大版本,所以也準(zhǔn)備了災(zāi)難恢復(fù)的方案。
比如極端情況下升級(jí)失敗,所有數(shù)據(jù)丟失的情況。
整個(gè)災(zāi)難恢復(fù)的主要目的就是恢復(fù)后的集群對(duì)外提供的域名不發(fā)生變化,同時(shí)所有的客戶端可以自動(dòng)重連上來,也就是最壞的情況下所有的數(shù)據(jù)丟了可以接受,但不能影響業(yè)務(wù)正常使用。
所以我們的流程如下:
@SneakyThrows @Test void backup(){ List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList("tenant/namespace"); log.info("topic size={}",topicList.size()); // create a custom thread pool CopyOnWriteArrayList<TopicMeta> dataList = new CopyOnWriteArrayList<>(); ExecutorService customThreadPool = Executors.newFixedThreadPool(10); for (String topicName : topicList) { customThreadPool.execute(()-> { PartitionedTopicMetadata metadata; try { metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topicName); TopicMeta topicMeta = new TopicMeta(); // backup topic topicMeta.setName(topicName); topicMeta.setPartition(metadata.partitions); // backup permission Map<String, Set<AuthAction>> permissions = pulsarAdmin.topics().getPermissions(topicName); topicMeta.setPermissions(permissions); // back sub List<String> subscriptions = new ArrayList<>(); PartitionedTopicStats topicStats = pulsarAdmin.topics().getPartitionedStats(topicName, true); topicStats.getSubscriptions().forEach((k,v)-> subscriptions.add(k)); topicMeta.setSubscriptions(subscriptions); dataList.add(topicMeta); } catch (PulsarAdminException e) { throw new RuntimeException(e); } }); } customThreadPool.shutdown(); while (!customThreadPool.isTerminated()) { } log.info("{}",dataList.size()); log.info("{}",JSONUtil.toJsonStr(dataList)); }// TopicMetaData@Data public class TopicMeta { private String name; private int partition; Map<String, Set<AuthAction>> permissions; List<String> subscriptions = new ArrayList<>(); }
第一步是備份 topic:
因?yàn)槲覀兛蛻舳耸褂昧?JWT 驗(yàn)證,所有為了使得恢復(fù)的 Pulsar 集群可以讓客戶端無縫切換到新集群,因此必須得使用相同的公私鑰。
這個(gè)其實(shí)比較簡(jiǎn)單,我們使用的是 helm 安裝的集群,所以只需要備份好 Secret 即可。
apiVersion: v1 data: PRIVATEKEY: XXX PUBLICKEY: XXX kind: Secret metadata: name: pulsar-token-asymmetric-key namespace: pulsar type: Opaque # 還有幾個(gè) superUser 的 Secret
首先使用 helm 重新創(chuàng)建一個(gè)新集群:
./scripts/pulsar/prepare_helm_release.sh -n pulsar -k pulsarhelm install / --values charts/pulsar/values.yaml / --set namespace=pulsar/ --set initialize=true / pulsar ./charts/pulsar -n pulsar
直接使用剛才備份的公私鑰覆蓋到新集群即可。
進(jìn)入 toolset pod 創(chuàng)建需要使用的 tenant/namespace
k exec -it pulsar-toolset-0 -n pulsar bashbin/pulsar-admin tenants create tenantbin/pulsar-admin namespaces create tenant/namespace
之后便是最重要的元數(shù)據(jù)恢復(fù)了:
@SneakyThrows @Test void restore() { PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://url:8080") .authentication(AuthenticationFactory.token(token)) .build(); Path filePath = Path.of("restore-ns.json"); String fileContent = Files.readString(filePath); List<TopicMeta> topicMetaList = JSON.parseArray(fileContent, TopicMeta.class); ExecutorService customThreadPool = Executors.newFixedThreadPool(50); for (TopicMeta topicMeta : topicMetaList) { customThreadPool.execute(() -> { // Create topic try { pulsarAdmin.topics().createPartitionedTopic(topicMeta.getName(), topicMeta.getPartition()); } catch (PulsarAdminException e) { log.error("Create topic error"); } // Create sub for (String subscription : topicMeta.getSubscriptions()) { try { pulsarAdmin.topics().createSubscription(topicMeta.getName(), subscription, MessageId.latest); } catch (PulsarAdminException e) { log.error("createSubscription error"); } } // Grant permission topicMeta.getPermissions().forEach((role, authActions) -> { permission(pulsarAdmin, topicMeta.getName(), role, authActions); }); log.info("topic:{} restore success", topicMeta.getName()); }); } customThreadPool.shutdown(); while (!customThreadPool.isTerminated()) { } log.info("restore success"); }private synchronized void permission(PulsarAdmin pulsarAdmin, String topic, String role, Set<AuthAction> authActions) { try { pulsarAdmin.topics().grantPermission(topic, role, authActions); } catch (PulsarAdminException e) { log.error("grantPermission error", e); } }
流程和備份類似:
因?yàn)槭跈?quán)接口限制了并發(fā)調(diào)用,所有需要加鎖,導(dǎo)致整個(gè)恢復(fù)的流程就會(huì)比較慢。
8000 topic 的 namespace 大概恢復(fù)時(shí)間為 40min 左右。
之后依次恢復(fù)其他 namespace 即可。
admin.namespaces().setNamespaceMessageTTL("tenant/namespace", 3600 * 6);admin.namespaces().setBacklogQuota("tenant/namespace", BacklogQuota)
如果之前的集群有設(shè)置 TTL 或者是 backlogQuota 時(shí)都需要手動(dòng)恢復(fù)。
以上就是整個(gè)升級(jí)和災(zāi)難恢復(fù)的流程,當(dāng)然災(zāi)難恢復(fù)希望大家不要碰到。
我會(huì)在下一篇詳細(xì)介紹 Pulsar 3.0 的新功能以及所碰到的一些坑。
本文鏈接:http://www.www897cc.com/showinfo-26-53341-0.htmlPulsar3.0 升級(jí)指北,你學(xué)會(huì)些什么?
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識(shí),若有侵權(quán)等問題請(qǐng)及時(shí)與本網(wǎng)聯(lián)系,我們將在第一時(shí)間刪除處理。郵件:2376512515@qq.com