在 Elasticsearch 中,可以使用 Painless 腳本來實現一些非標準的處理結果。這些腳本可以直接嵌入到數據處理管道中,但為了使腳本與管道相互獨立,還可以將腳本單獨存儲在 Elasticsearch 中,并在數據攝取管道(Ingest pipeline)中按需調用它們。
這種存儲腳本的方式,咱們之前也有過介紹,Elasticsearch 中有個專有術語名詞與之對應,叫:stored script 存儲腳本。通過 stored script 方式,可以在不同的地方重復使用同一段腳本,而無需復制代碼。
在Elasticsearch中使用 stored script 存儲腳本是一種高效且靈活的方法,特別適用于那些需要在多個數據處理場景中重復使用相同邏輯的場合。通過這種方式,可以構建更加模塊化、易于管理的數據處理管道。
如下腳本的目的是將源數據中的字段從Base64格式轉換為解碼后的文本。
PUT /_scripts/decodebase64{ "script": { "description": "Decode base64", "lang": "painless", "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();" }}
腳本解讀如下:
這個腳本可以在Elasticsearch的攝取管道中使用,用于在數據索引之前動態地對字段進行Base64解碼。
如下腳本僅驗證,實戰中可忽略。
GET /_scripts/decodebase64
召回結果如下:
{ "_id": "decodebase64", "found": true, "script": { "lang": "painless", "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; ctx[target]=src.decodeBase64();" }}
注意:之前咱們很少這么用。看細節,上面的召回結果有 "_id": "decodebase64", 你關注一下,一會就能用到!
PUT /_ingest/pipeline/decodebase64{ "description": "Decode hash values", "processors": [ { "script": { "id": "decodebase64", "params": { "field": "name_base64", "target_field": "name" } } } ]}
上述代碼創建了一個名為 decodebase64 的 Elasticsearch 攝取管道,其功能是使用存儲的腳本 decodebase64 將字段 name_base64 中的 Base64 編碼值解碼,并將解碼后的文本存儲到 name 字段中。
和咱們之前講的不同的地方、靈活的地方在于:field 和 target_field 變成了變量了,可以靈活按照項目需求替換之。
POST /fruits/_bulk?pipeline=decodebase64{"index":{"_id":"1"}}{"name_base64":"QXBwbGU="}{"index":{"_id":"2"}}{"name_base64":"QW5hbmFz"}{"index":{"_id":"3"}}{"name_base64":"Q2hlcnJ5"}
如上 bulk 批量寫入的時候指定 pipeline 的方式,咱們之前也少有講解。
GET fruits/_search
結果如下圖所示:
圖片
我們清晰的看到,咱們寫入的 name_base64 字段借助我們創建的管道、基于存儲腳本解碼為 name字段值。
不著急下結論,咱們再看一組例子。
步驟參見第2部分,咱們只講重點。
如下存儲腳本的目的:在Elasticsearch中創建并存儲一個名為decodehex的腳本,該腳本用于將HEX(十六進制)編碼的字符串轉換為普通文本。
PUT /_scripts/decodehex{ "script": { "description": "Decode HEX", "lang": "painless", "source": "def src=ctx[params['field']]; if (src == null) { return; } def target=params['target_field']; StringBuilder sb = new StringBuilder(); for (int i = 0; i < src.length(); i += 2) { String byteStr = src.substring(i, i + 2); char byteChar = (char) Integer.parseInt(byteStr, 16); sb.append(byteChar) } ctx[target] = sb.toString();" }}
腳本解讀如下:
如上腳本可以在Elasticsearch的攝取管道中使用,用于在數據索引之前動態地對字段進行 HEX 解碼。
如下腳本僅驗證,實戰中可忽略。
GET /_scripts/decodehex
召回結果如下:
圖片
PUT /_ingest/pipeline/decodehex{ "description": "Decode hash values", "processors": [ { "script": { "id": "decodehex", "params": { "field": "color_hex", "target_field": "color" } } } ]}
該管道的功能是使用存儲的腳本 decodehex 來處理數據:它會取 color_hex 字段中的HEX(十六進制)編碼字符串,將其解碼成普通文本,并將解碼后的結果存儲到 color 字段中。這個過程主要用于在將數據索引到 Elasticsearch 之前自動進行數據轉換和預處理。
同樣,靈活的地方在于:field、target_field 是變量。
POST /fruits_ext/_bulk?pipeline=decodehex{"index":{"_id":"1"}}{"color_hex":"477265656e"}{"index":{"_id":"2"}}{"color_hex":"59656c6c6f77"}{"index":{"_id":"3"}}{"color_hex":"526564"}
如上 bulk 批量寫入的時候指定 pipeline 的方式,咱們之前也少有講解。
GET fruits_ext/_search
結果如下圖所示:
圖片
當然,第2部分、第3部分的存儲腳本使用可以靈活的整合為一部分,如下所示。
PUT /_ingest/pipeline/decodehashes{ "description": "Decode hash values", "processors": [ { "script": { "id": "decodebase64", "params": { "field": "name_base64", "target_field": "name" } } }, { "script": { "id": "decodehex", "params": { "field": "color_hex", "target_field": "color" } } } ]}
批量構建數據結果:
POST /fruits_all/_bulk?pipeline=decodehashes{"index":{"_id":"1"}}{"name_base64":"QXBwbGU=","color_hex":"477265656e"}{"index":{"_id":"2"}}{"name_base64":"QW5hbmFz","color_hex":"59656c6c6f77"}{"index":{"_id":"3"}}{"name_base64":"Q2hlcnJ5","color_hex":"526564"}
執行檢索效果:
圖片
我們一起探索了如何在Elasticsearch中創建并存儲腳本,以及如何檢索這些腳本,以確認它們的 id 和內容。我們還學習了如何在數據處理的攝取管道中調用這些存儲的腳本。
通過這種方法,你可以有效地節省存儲空間,并減少因重復編寫相同腳本而可能出現的錯誤。簡而言之,你只需編寫和存儲一次腳本,就可以在多個地方反復使用,這無疑提高了工作效率,同時也使得數據處理過程更加流暢和可靠。
本文鏈接:http://www.www897cc.com/showinfo-26-57396-0.htmlElasticsearch 8.X 小技巧:使用存儲腳本優化數據索引與轉換過程
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
下一篇: 記錄業務系統操作日志方案實踐