Ingest pipeline 允許文檔在被索引之前對(duì)數(shù)據(jù)進(jìn)行預(yù)處理,將數(shù)據(jù)加工處理成我們需要的格式。例如,可以使用 ingest pipeline添加或者刪除字段,轉(zhuǎn)換類型,解析內(nèi)容等等。Pipeline 由一組處理器 Processor 構(gòu)成,每個(gè)處理器依次運(yùn)行,對(duì)傳入的文檔進(jìn)行特定的更改。Ingest pipeline 和 Logstash 中的 filter 作用相似,并且更加輕量和易于調(diào)試。
(資料圖片)
要使用 ingest pipeline,集群中必須至少有一個(gè)具有 ingest 角色的節(jié)點(diǎn)。對(duì)于大量攝取負(fù)載,建議設(shè)置專用的 ingest 節(jié)點(diǎn),要?jiǎng)?chuàng)建專用的 ingest 節(jié)點(diǎn),請(qǐng)?jiān)O(shè)置:
node.roles: [ ingest ]
1 Ingest Pipeline 的基本用法
1.1 創(chuàng)建和使用 Ingest Pipeline
接下來(lái)介紹一下 ingest pipeline 的創(chuàng)建與使用,如下所示,使用 ingest API 創(chuàng)建一個(gè)名為 my-pipeline
的 ingest pipeline,在 processors 參數(shù)中指定了兩個(gè)處理器,set 處理器為文檔添加一個(gè)新的字段 location,設(shè)置值為 China;lowercase 處理器將 name 字段的所有字母轉(zhuǎn)換為小寫(xiě)。
PUT _ingest/pipeline/my-pipeline{ "description": "My first Ingest Pipeline", "processors": [ { "set": { "description": "Add a new field", "field": "location", "value": "China" } }, { "lowercase": { "description": "Lowercase name", "field": "name" } } ]}
然后往索引 my-index 中寫(xiě)入一條數(shù)據(jù),通過(guò) pipeline 參數(shù)指定使用剛剛創(chuàng)建的 my-pipeline
。
PUT my-index/_doc/1?pipeline=my-pipeline{ "name": "Tom", "age": 18}
查看 id 為 1 的文檔,可以看到 name 字段由 Tom 轉(zhuǎn)換為 tom,并且新增了 location 字段,說(shuō)明 my-pipeline
管道成功處理了攝入的數(shù)據(jù)。
GET my-index/_doc/1# 返回結(jié)果{ "_index" : "my-index", "_type" : "_doc", "_id" : "1", "_version" : 1, "_seq_no" : 0, "_primary_term" : 1, "found" : true, "_source" : { "name" : "tom", "location" : "China", "age" : 18 }}
1.2 使用 Simulate API 測(cè)試 Pipeline
為了讓開(kāi)發(fā)者更好地了解和使用 pipeline 中的處理器,Elasticsearch 提供了 simulate API 接口,方便我們對(duì) pipeline 進(jìn)行測(cè)試。如下所示,我們對(duì) 1.1 創(chuàng)建和使用 Ingest Pipeline 章節(jié)中創(chuàng)建的 my-pipeline
進(jìn)行測(cè)試,在 docs 列表中我們可以填寫(xiě)多個(gè)原始文檔。
POST _ingest/pipeline/my-pipeline/_simulate{ "docs": [ { "_source": { "name": "Tom", "age": 18 } } ]}
返回結(jié)果如下,可以看到模擬的結(jié)果和實(shí)際創(chuàng)建的文檔一致,只不過(guò) simulate API 并不會(huì)真正地創(chuàng)建這個(gè)文檔。
{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "tom", "location" : "China", "age" : 18 }, "_ingest" : { "timestamp" : "2022-03-03T14:04:15.941884826Z" } } } ]}
除了在請(qǐng)求路徑中指定 pipeline,我們還可以在請(qǐng)求體中定義 pipeline 進(jìn)行模擬,這樣就不用預(yù)先創(chuàng)建好 pipeline,而是等到測(cè)試成功后再去創(chuàng)建 pipeline。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "set": { "description": "Add a new field", "field": "location", "value": "China" } }, { "lowercase": { "description": "Lowercase name", "field": "name" } } ] }, "docs": [ { "_source": { "name": "Tom", "age": 18 } } ]}
1.3 異常處理
當(dāng)我們使用 pipeline 處理一個(gè)文檔的時(shí)候,有時(shí)并不是所有的文檔都很規(guī)范,這個(gè)時(shí)候可能就會(huì)出現(xiàn)文檔不能被正確解析或者發(fā)生異常的情況,此時(shí) Elasticsearch 會(huì)返回給客戶端一個(gè)錯(cuò)誤的信息,表明文檔不能被正確地處理。pipeline 中的處理器(processor)按照順序依次執(zhí)行,默認(rèn)情況下,當(dāng)處理器發(fā)生錯(cuò)誤或者異常時(shí),將會(huì)停止后續(xù)的處理。
在 ingest pipeline 中,異常處理可以分為 3 種情況:
在處理器中設(shè)置ignore_failure: true
,當(dāng)該處理器發(fā)生異常時(shí),允許忽略異常,繼續(xù)執(zhí)行后續(xù)的處理器。通過(guò) on_failure
參數(shù)定義發(fā)生異常時(shí)執(zhí)行的處理器列表,該參數(shù)可以在 processor 級(jí)別中定義,也可以在 pipeline 級(jí)別中定義。使用 fail 處理器主動(dòng)拋出異常。下面將會(huì)分別對(duì)上述 3 種情況進(jìn)行演示,首先模擬 2 個(gè)異常:
convert 處理器將 id 字段轉(zhuǎn)換為 long 類型,由于傳入文檔的 id 字段值設(shè)置為 S123456,無(wú)法轉(zhuǎn)換成 long 類型的數(shù)字,會(huì)產(chǎn)生 number_format_exception 的異常。date 處理器解析 timestamp 字段的日期格式,formats 參數(shù)要求輸入的格式是 yyyy-MM-dd HH:mm:ss,例如 2022-03-03 15:22:11,解析出日期對(duì)應(yīng)的年月日信息,以 yyyy/MM/dd 的格式輸出到 date 字段中,例如 2022/03/03。由于傳入文檔的 timestamp 字段的格式是 20220303 15:22:11,并不滿足 formats 參數(shù)要求的日期格式,因此會(huì)產(chǎn)生 date_time_parse_exception 的異常。執(zhí)行以下 pipeline 測(cè)試語(yǔ)句,在請(qǐng)求路徑中加上 verbose 可以看到每個(gè)處理器的執(zhí)行情況。
POST _ingest/pipeline/_simulate?verbose{ "pipeline": { "processors": [ { "convert": { "field": "id", "type": "long" } }, { "date": { "field": "timestamp", // 解析的字段 "formats": [ "yyyy-MM-dd HH:mm:ss" // 解析的格式 ], "output_format": "yyyy/MM/dd", // 輸出的格式 "target_field": "date" // 輸出的字段 } } ] }, "docs": [ { "_source": { "id": "S123456", "timestamp": "20220303 15:22:11", "message": "User login successfully" } } ]}
返回結(jié)果如下,盡管我們?nèi)藶橹圃炝?2 個(gè)異常,但是只看到了 convert 處理器的異常報(bào)錯(cuò),這是因?yàn)楫?dāng)處理器發(fā)生錯(cuò)誤或者異常時(shí),將會(huì)停止后續(xù)的處理,直接向客戶端返回錯(cuò)誤信息。
{ "docs" : [ { "processor_results" : [ { "processor_type" : "convert", "status" : "error", "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long" } ], "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long", "caused_by" : { "type" : "number_format_exception", "reason" : "For input string: \"S123456\"" } } } ] } ]}
1.3.1 ignore_failure 忽略異常
在處理器中設(shè)置 ignore_failure 參數(shù)為 true,當(dāng)該處理器發(fā)生異常時(shí),允許忽略異常,繼續(xù)執(zhí)行后續(xù)的處理器。
POST _ingest/pipeline/_simulate?verbose{ "pipeline": { "processors": [ { "convert": { "field": "id", "type": "long", "ignore_failure": true // 忽略異常 } }, { "date": { "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ] }, "docs": [ { "_source": { "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully" } } ]}
這次在返回結(jié)果中可以看到有 2 個(gè)異常信息,其中 convert 處理器的 status 的值為 error_ignored,表示該異常被忽略了,在 doc 中可以看到該處理器處理完畢后的結(jié)果,可以看到 id 字段的內(nèi)容保留不變。接著 pipeline 繼續(xù)往后執(zhí)行,當(dāng)執(zhí)行到 date 處理器時(shí),再次發(fā)生異常,由于 date 處理器中未對(duì)異常進(jìn)行處理,此時(shí)向客戶端返回異常信息。
{ "docs" : [ { "processor_results" : [ { "processor_type" : "convert", "status" : "error_ignored", // 第 1 個(gè)異常,忽略異常 "ignored_error" : { "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long" } ], "type" : "illegal_argument_exception", "reason" : "unable to convert [S123456] to long", "caused_by" : { "type" : "number_format_exception", "reason" : "For input string: \"S123456\"" } } }, "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "id" : "S123456", // 跳過(guò) convert 處理器對(duì) id 字段的處理 "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }, "_ingest" : { "pipeline" : "_simulate_pipeline", "timestamp" : "2022-03-04T02:48:13.562353005Z" } } }, { "processor_type" : "date", "status" : "error", // 第 2 個(gè)異常 "error" : { "root_cause" : [ { "type" : "illegal_argument_exception", "reason" : "unable to parse date [2022/03/03 15:22:11]" } ], "type" : "illegal_argument_exception", "reason" : "unable to parse date [2022/03/03 15:22:11]", "caused_by" : { "type" : "illegal_argument_exception", "reason" : "failed to parse date field [2022/03/03 15:22:11] with format [yyyy-MM-dd HH:mm:ss]", "caused_by" : { "type" : "date_time_parse_exception", "reason" : "Text "2022/03/03 15:22:11" could not be parsed at index 4" } } } } ] } ]}
1.3.2 on_failure 處理異常
使用 on_failure 參數(shù)可以定義發(fā)生異常時(shí)執(zhí)行的處理器列表,該參數(shù)允許在 processor 和 pipeline 級(jí)別中定義。在 pipeline 級(jí)別定義時(shí),on_failure 捕獲整個(gè) pipeline 發(fā)生的任何異常,當(dāng)產(chǎn)生異常時(shí)直接執(zhí)行 on_failure 中定義的處理器列表,不會(huì)再執(zhí)行后續(xù)的處理器。
在 processor 級(jí)別定義時(shí),on_failure 參數(shù)可以針對(duì)單個(gè)處理器進(jìn)行異常處理,會(huì)繼續(xù)執(zhí)行后續(xù)的處理器。
on_failure 參數(shù)可以同時(shí)在 pipeline 和 processor 中定義,這兩者并不沖突,比較推薦的做法是,針對(duì)某些處理器設(shè)置 processor 級(jí)別的 on_failure 處理規(guī)則,另外設(shè)置 pipeline 級(jí)別的 on_failure 處理規(guī)則作為一條兜底的規(guī)則,當(dāng) processor 級(jí)別的 on_failure 處理規(guī)則也發(fā)生異常時(shí)或者沒(méi)有設(shè)置異常處理的處理器發(fā)生異常時(shí),就可以應(yīng)用這條兜底的規(guī)則,這樣做的好處就是可以盡可能地保證我們的 ingest pipeline 的健壯性。
如下所示,在 pipeline 級(jí)別設(shè)置了 on_failure 的處理規(guī)則,注意這里的 on_failure 參數(shù)和 processors 參數(shù)是處于同一層級(jí)的。當(dāng)發(fā)生異常時(shí),on_failure 會(huì)執(zhí)行里面的 set 處理器,將索引名改為 failure-index,該索引專門用于記錄 pipeline 處理異常的文檔。之后我們就可以在 failure-index 索引中去查看哪些文檔在預(yù)處理時(shí)發(fā)生了異常,方便后續(xù)實(shí)施相應(yīng)的補(bǔ)救措施。
PUT _ingest/pipeline/failure-test-pipeline{ "processors": [ { "convert": { "field": "id", "type": "long" } }, { "date": { "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ], "on_failure": [ // 發(fā)生異常時(shí)執(zhí)行的處理器列表 { "set": { "field": "_index", // 通過(guò) _index 元數(shù)據(jù)字段,可以改變寫(xiě)入的索引 "value": "failure-index" } } ]}
然后往 my-index 索引中插入一條有錯(cuò)誤的文檔,將文檔 _id
設(shè)置為 1。從返回結(jié)果來(lái)看,并沒(méi)有異常報(bào)錯(cuò),文檔成功寫(xiě)入了。但是仔細(xì)觀察可以發(fā)現(xiàn),文檔并沒(méi)有寫(xiě)入 my-index 索引,而是寫(xiě)入了我們記錄異常的索引 failure-index。
PUT my-index/_doc/1?pipeline=failure-test-pipeline{ "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully"}# 返回結(jié)果{ "_index" : "failure-index", // 寫(xiě)入了記錄異常的索引 "_type" : "_doc", "_id" : "1", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 0, "_primary_term" : 1}
查詢 my-index 文檔,確實(shí)沒(méi)有找到 _id 為 1 的這條文檔。
GET my-index/_doc/1# 返回結(jié)果{ "error" : { "root_cause" : [ { "type" : "index_not_found_exception", "reason" : "no such index [my-index]", "resource.type" : "index_expression", "resource.id" : "my-index", "index_uuid" : "_na_", "index" : "my-index" } ], "type" : "index_not_found_exception", "reason" : "no such index [my-index]", "resource.type" : "index_expression", "resource.id" : "my-index", "index_uuid" : "_na_", "index" : "my-index" }, "status" : 404}
查詢 failure-index 索引可以找到這條處理異常的文檔。
GET failure-index/_doc/1# 返回結(jié)果{ "_index" : "failure-index", "_type" : "_doc", "_id" : "1", "_version" : 1, "_seq_no" : 0, "_primary_term" : 1, "found" : true, "_source" : { "id" : "S123456", "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }}
對(duì)于我們來(lái)說(shuō),目前 failure-index 索引記錄的信息十分有限,根據(jù)以上內(nèi)容我們無(wú)法知道是哪個(gè)處理器在執(zhí)行時(shí)產(chǎn)生了異常。在 on_failure 中提供了以下 4 個(gè)元數(shù)據(jù)字段方便我們進(jìn)行故障定位:
on_failure_pipeline
:產(chǎn)生異常的 pipeline 類型的處理器中引用的 pipeline。ingest pipeline 中有一個(gè) pipeline 類型的處理器,該處理器也可以指定使用其他的 pipeline,這里注意區(qū)分 pipeline 類型的處理器和 pipeline 管道。on_failure_message
:報(bào)錯(cuò)的內(nèi)容。on_failure_processor_type
:產(chǎn)生異常的處理器的標(biāo)簽,標(biāo)簽可以在處理器中通過(guò) tag 參數(shù)指定。當(dāng) pipeline 中使用了多個(gè)相同類型的處理器時(shí),根據(jù)指定的標(biāo)簽可以方便我們進(jìn)行區(qū)分。on_failure_processor_tag
:產(chǎn)生異常的處理器的類型。如下所示,我們?cè)?on_failure 參數(shù)新增了一個(gè) set 處理器,將錯(cuò)誤信息寫(xiě)入 failure-index 的 failure 字段中。
PUT _ingest/pipeline/failure-test-pipeline{ "processors": [ { "convert": { "tag": "my-index-convert", // 設(shè)置處理器的標(biāo)簽,方便定位問(wèn)題 "field": "id", "type": "long" } }, { "date": { "tag": "my-index-date", // 設(shè)置處理器的標(biāo)簽,方便定位問(wèn)題 "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ], "on_failure": [ { "set": { "field": "_index", "value": "failure-index" } }, { "set": { "field": "failure", "value": { "on_failure_pipeline": "{{ _ingest.on_failure_pipeline }}", "on_failure_message": "{{_ingest.on_failure_message}}", "on_failure_processor_type": "{{_ingest.on_failure_processor_type}}", "on_failure_processor_tag": "{{ _ingest.on_failure_processor_tag }}" } } } ]}
然后往 my-index 索引中插入一條有錯(cuò)誤的文檔,將文檔 _id
設(shè)置為 2。
PUT my-index/_doc/2?pipeline=failure-test-pipeline{ "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully"}
查看 failure-index 索引記錄的錯(cuò)誤信息,可以得知打了 my-index-convert 標(biāo)簽的 convert 類型的處理器在處理 S123456 字符串時(shí)引發(fā)了異常。細(xì)心的同學(xué)可能會(huì)注意到, 在返回結(jié)果中 on_failure_pipeline 的內(nèi)容為空,這是由于異常并不是由 pipeline 類型的處理器產(chǎn)生的,所以這里的結(jié)果是空值。如果只是想獲取客戶端直接調(diào)用的 ingest pipeline,那么可以通過(guò) _ingest.pipeline
來(lái)獲取。
{ "_index" : "failure-index", "_type" : "_doc", "_id" : "2", "_version" : 1, "_seq_no" : 1, "_primary_term" : 1, "found" : true, "_source" : { "failure" : { "on_failure_pipeline" : "", // 產(chǎn)生異常的 pipeline 類型的處理器中引用的 pipeline "on_failure_message" : "For input string: \\\"S123456\\\"", // 報(bào)錯(cuò)的內(nèi)容 "on_failure_processor_tag" : "my-index-convert", // 產(chǎn)生異常的處理器的標(biāo)簽 "on_failure_processor_type" : "convert" // 產(chǎn)生異常的處理器的類型 }, "id" : "S123456", "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }}
上面的示例介紹了 on_failure 參數(shù)在 pipeline 級(jí)別的處理,現(xiàn)在介紹下 on_failure 參數(shù)如何在 processor 級(jí)別進(jìn)行處理。如下所示,在 convert 和 date 處理器中分別通過(guò) on_failure 參數(shù)設(shè)置了發(fā)生異常時(shí)執(zhí)行的處理器列表:當(dāng)convert 進(jìn)行類型轉(zhuǎn)換發(fā)生異常時(shí),將當(dāng)前時(shí)間的毫秒數(shù)設(shè)置 id 字段的值;當(dāng) date 處理器解析時(shí)間發(fā)生異常時(shí),使用 ingest 攝取時(shí)間的日期戳作為 date 字段的值。
PUT _ingest/pipeline/failure-test-pipeline{ "processors": [ { "convert": { "field": "id", "type": "long", "on_failure": [ // 發(fā)生異常時(shí)將當(dāng)前時(shí)間的毫秒數(shù)設(shè)置 id 字段的值 { "script": { "source": """ long timeNow = Calendar.getInstance().getTimeInMillis(); ctx.id = timeNow; """ } } ] } }, { "date": { "field": "timestamp", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "output_format": "yyyy/MM/dd", "target_field": "date", "on_failure": [ // 發(fā)生異常時(shí)使用 ingest 攝取時(shí)間的日期戳作為 date 字段的值 { "set": { "field": "date", "value": "{{_ingest.timestamp}}" } }, { "date": { "field": "date", "formats": [ "yyyy-MM-dd"T"HH:mm:ss.SSSZ" ], "output_format": "yyyy/MM/dd", "target_field": "date" } } ] } } ]}
然后往 my-index 索引中插入一條有錯(cuò)誤的文檔,將文檔 _id
設(shè)置為 3。文檔正常寫(xiě)入 my-index 中,沒(méi)有返回報(bào)錯(cuò)信息。
PUT my-index/_doc/3?pipeline=failure-test-pipeline{ "id": "S123456", "timestamp": "2022/03/03 15:22:11", "message": "User login successfully"}# 返回結(jié)果{ "_index" : "my-index", "_type" : "_doc", "_id" : "3", "_version" : 1, "result" : "created", "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 }, "_seq_no" : 0, "_primary_term" : 1}
獲取 my-index 索引中 _id
為 3 的文檔,可以看到 id 字段的值并不是傳入的原始文檔中的 S123456,而是當(dāng)前時(shí)間對(duì)應(yīng)的毫秒值;date 字段的值被設(shè)置為了 ingest 攝取時(shí)間的日期。
GET my-index/_doc/3# 返回結(jié)果{ "_index" : "my-index", "_type" : "_doc", "_id" : "3", "_version" : 1, "_seq_no" : 0, "_primary_term" : 1, "found" : true, "_source" : { "date" : "2022/03/03", "id" : 1646349731000, "message" : "User login successfully", "timestamp" : "2022/03/03 15:22:11" }}
1.3.3 fail 主動(dòng)拋出異常
和 ignore_failure, on_failure 兩種處理異常的方式不同,使用 fail 處理器可以基于某些條件主動(dòng)拋出異常,當(dāng)你想要主動(dòng)讓 pipeline 失敗并且返回特定的報(bào)錯(cuò)信息給請(qǐng)求者時(shí),可以使用這種方式。如下所示,當(dāng) tags 字段中不包含 production 時(shí),fail 處理器會(huì)主動(dòng)拋出異常,在 message 參數(shù)中可以自定義相應(yīng)的報(bào)錯(cuò)信息。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "fail": { "if": "ctx.tags.contains("production") != true", "message": "The production tag is not present, found tags: {{{tags}}}" } } ] }, "docs": [ { "_source": { "tags": ["development"] } } ]}# 返回結(jié)果{ "docs" : [ { "error" : { "root_cause" : [ { "type" : "fail_processor_exception", // 自定義的報(bào)錯(cuò)信息 "reason" : "The production tag is not present, found tags: {0=development}" } ], "type" : "fail_processor_exception", "reason" : "The production tag is not present, found tags: {0=development}" } } ]}
1.4 執(zhí)行條件判斷
每種類型的處理器中都支持 if 參數(shù)判斷執(zhí)行處理器的條件,在 if 參數(shù)中使用 painless腳本進(jìn)行邏輯判斷,當(dāng) if 的判斷結(jié)果為 true 時(shí),相應(yīng)的處理器才會(huì)執(zhí)行。如下所示,創(chuàng)建了 if-test-pipeline,我們只想日志級(jí)別是 error 的消息,當(dāng) level 字段的值是 notice 時(shí),丟棄該文檔。
PUT _ingest/pipeline/if-test-pipeline{ "processors": [ { "drop": { "description": "Drop documents with level of notice", "if": "ctx.level == "notice"" } } ]}
然后往 log-index 索引中寫(xiě)入兩條文檔,指定使用 if-test-pipeline,其中一條文檔的 level 值等于 notice,另一條的 level 值等于 error。
POST log-index/_doc?pipeline=if-test-pipeline{ "level": "notice", "message": "this is a notice log"}POST log-index/_doc?pipeline=if-test-pipeline{ "level": "error", "message": "this is a error log"}
查詢 log-index 索引,只返回了 1 條文檔,level 等于 notice 的文檔被丟棄了。
GET log-index/_search# 返回結(jié)果{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "log-index", "_type" : "_doc", "_id" : "fV9ET38BKRZVqZj9X8yC", "_score" : 1.0, "_source" : { "level" : "error", "message" : "this is a error log" } } ] }}
接下來(lái)介紹一種高級(jí)的用法,將一個(gè) pipeline 作為多個(gè)不同的索引或者數(shù)據(jù)流默認(rèn)的 pipeline,在這個(gè) pipeline 中創(chuàng)建多個(gè) pipeline 類型的處理器,每個(gè)處理器根據(jù)傳入的文檔選擇后臺(tái)真正要執(zhí)行的 pipeline。這樣做的好處就是,如果要更改后臺(tái)使用的 pipeline,只需要修改默認(rèn)的 pipeline 中引用的 pipeline 即可,客戶端的代碼或者索引中的設(shè)置無(wú)需修改,可以做到業(yè)務(wù)無(wú)感知的切換。如下所示,先創(chuàng)建兩個(gè) pipeline,其中 httpd_pipeline 用于處理 http 相關(guān)的日志,syslog_pipeline 用于處理 syslog 相關(guān)的日志。
PUT _ingest/pipeline/httpd_pipeline{ "processors": [ { "set": { "field": "message", "value": "this is a apache_httpd log" } } ]}PUT _ingest/pipeline/syslog_pipeline{ "processors": [ { "set": { "field": "message", "value": "this is a syslog log" } } ]}
接著創(chuàng)建一個(gè) default_pipeline,使用 if 參數(shù)進(jìn)行判斷,當(dāng) service 字段的值等于 apache_httpd 時(shí),執(zhí)行 httpd_pipeline,當(dāng) service 字段的值等于 syslog 時(shí),執(zhí)行 syslog_pipeline。
PUT _ingest/pipeline/default_pipeline{ "processors": [ { "pipeline": { "description": "If "service" is "apache_httpd", use "httpd_pipeline"", "if": "ctx.service == "apache_httpd"", "name": "httpd_pipeline" } }, { "pipeline": { "description": "If "service" is "syslog", use "syslog_pipeline"", "if": "ctx.service == "syslog"", "name": "syslog_pipeline" } } ]}
使用 simulate API 進(jìn)行驗(yàn)證,可以看到由于傳入的文檔的 service 字段的值是 syslog,因此這條文檔被交給 syslog_pipeline 進(jìn)行處理。
POST _ingest/pipeline/default_pipeline/_simulate{ "docs": [ { "_source": { "service": "syslog" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "this is a syslog log", // syslog_pipeline 添加的內(nèi)容 "service" : "syslog" }, "_ingest" : { "timestamp" : "2022-03-04T07:18:53.531846541Z" } } } ]}
2 Processor 處理器
下表列出了 Elasticsearch 所有 processor 處理器的類型,并且根據(jù)各個(gè)處理器的用途作了相應(yīng)的分類。下面的小節(jié)中僅會(huì)演示說(shuō)明一些常用的處理器,未介紹到的部分讀者可以自行查閱官方文檔。
類別 | 處理器 | 作用 |
---|---|---|
數(shù)組處理 | append | 添加元素 |
數(shù)組處理 | sort | 對(duì)數(shù)組中的元素進(jìn)行排序 |
數(shù)組處理 | join | 將數(shù)組中的每個(gè)元素拼接成單個(gè)字符串 |
數(shù)組處理 | foreach | 遍歷處理數(shù)組中的元素 |
結(jié)構(gòu)化數(shù)據(jù)處理 | json | 將 json 字符串轉(zhuǎn)換為結(jié)構(gòu)化的 json 對(duì)象 |
結(jié)構(gòu)化數(shù)據(jù)處理 | kv | 以鍵值對(duì)的方式提取字段 |
結(jié)構(gòu)化數(shù)據(jù)處理 | csv | 從單個(gè)文本字段中提取 CSV 行中的字段 |
匹配處理 | gsub | 替換字符串中指定的內(nèi)容,支持正則表達(dá)式匹配 |
匹配處理 | grok | 使用正則表達(dá)式提取字段,grok 處理器內(nèi)置預(yù)定義的表達(dá)式 |
匹配處理 | dissect | 和 grok 處理器類似,語(yǔ)法比 grok 簡(jiǎn)單,不使用正則表達(dá)式。可以使用修飾符控制解析方式 |
字符串處理 | lowercase | 將字符串轉(zhuǎn)換為小寫(xiě) |
字符串處理 | uppercase | 將字符串轉(zhuǎn)換為大寫(xiě) |
字符串處理 | split | 指定分隔符將字符串拆分為數(shù)組 |
字符串處理 | html_strip | 刪除字符串中的 HTLM 標(biāo)簽 |
字符串處理 | trim | 去掉字符串中的前后空格 |
字段處理 | rename | 重命名字段 |
字段處理 | remove | 刪除字段 |
字段處理 | set | 為字段賦值 |
字段處理 | script | 處理復(fù)雜的邏輯,可以執(zhí)行內(nèi)聯(lián)或者存儲(chǔ)腳本 |
字段處理 | dot_expander | 將帶有點(diǎn)的字段擴(kuò)展為對(duì)象字段 |
文檔處理 | drop | 刪除文檔 |
文檔處理 | fingerprint | 計(jì)算文檔內(nèi)容的哈希值 |
網(wǎng)絡(luò)處理 | network_direction | 根據(jù)給定的源 IP 地址、目標(biāo) IP 地址和內(nèi)部網(wǎng)絡(luò)列表下計(jì)算網(wǎng)絡(luò)請(qǐng)求的方向 |
網(wǎng)絡(luò)處理 | community_id | 計(jì)算網(wǎng)絡(luò)流數(shù)據(jù)中的 community id, 可以使用 community id 來(lái)關(guān)聯(lián)與單個(gè)流相關(guān)的網(wǎng)絡(luò)事件 |
網(wǎng)絡(luò)處理 | registered_domain | 從完全限定域名 (FQDN) 中提取注冊(cè)域(也稱為有效頂級(jí)域或 eTLD)、子域和頂級(jí)域。 |
HTTP 處理 | urldecode | URL 解碼 |
HTTP 處理 | user_agent | 從 user_agent 中提取詳細(xì)信息, 例如操作系統(tǒng), 瀏覽器版本等等 |
HTTP 處理 | uri_parts | 從 URI 中提取詳細(xì)信息, 例如域名, 端口, 路徑等等 |
外部結(jié)合 | pipeline | 執(zhí)行另一個(gè) ingest pipeline |
外部結(jié)合 | enrich | 添加來(lái)自另一個(gè)索引的數(shù)據(jù),類似關(guān)系型數(shù)據(jù)庫(kù)中的 join 關(guān)聯(lián)查詢 |
外部結(jié)合 | geoip | 根據(jù)來(lái)自 Maxmind 數(shù)據(jù)庫(kù)的數(shù)據(jù)添加有關(guān) IP 地址地理位置的信息 |
外部結(jié)合 | set_security_user | 獲取索引文檔用戶的詳細(xì)信息,例如 username, roles, email, full_name, metadata |
外部結(jié)合 | inference | 使用預(yù)訓(xùn)練的數(shù)據(jù)分析模型來(lái)處理數(shù)據(jù),用于機(jī)器學(xué)習(xí)領(lǐng)域 |
時(shí)間處理 | date_index_name | 根據(jù)文檔中的時(shí)間戳字段將文檔寫(xiě)入基于時(shí)間的索引 |
時(shí)間處理 | date | 從字段中解析日期作為文檔的時(shí)間戳 |
類型處理 | convert | 字段類型轉(zhuǎn)換,例如 "1234" -> 1234 |
類型處理 | byte | 將人類可讀的字節(jié)值轉(zhuǎn)換為字節(jié)的數(shù)值,例如 1kb -> 1024 |
異常處理 | fail | 主動(dòng)拋出異常 |
圖形處理 | circle | 將圓形轉(zhuǎn)換為近似多邊形 |
2.1 Lowercase & Uppercase
lowercase 處理器可以將字符串轉(zhuǎn)換為其等效的小寫(xiě)字母。如果該字段是一個(gè)字符串?dāng)?shù)組,則該數(shù)組的所有成員都將被轉(zhuǎn)換。uppercase 處理器和 lowercase 相反,將字符串轉(zhuǎn)換為大寫(xiě)字母。如下所示,使用 lowercase 處理器將 name 字段轉(zhuǎn)換為小寫(xiě)字母。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "lowercase": { "field": "name" } } ] }, "docs": [ { "_source": { "name": "Tom" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "tom" }, "_ingest" : { "timestamp" : "2022-02-27T10:43:11.718792423Z" } } } ]}
2.2 Split
split 處理器可以根據(jù)指定的分隔符,將字符串拆分為數(shù)組。如下所示,以 _
符號(hào)作為分隔符,將 num 字段拆分為數(shù)組。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "split": { "field": "num", "separator": "_" } } ] }, "docs": [ { "_source": { "num": "111_222_333_444" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "num" : [ "111", "222", "333", "444" ] }, "_ingest" : { "timestamp" : "2022-02-27T11:10:25.249883405Z" } } } ]}
2.3 Trim
trim 處理器可以去掉字符串頭尾的空格。如下所示,使用 trim 處理器去掉 message 字段頭尾的空格。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "trim": { "field": "message" } } ] }, "docs": [ { "_source": { "message": " Elasticsearch is the distributed search and analytics engine " } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "Elasticsearch is the distributed search and analytics engine" }, "_ingest" : { "timestamp" : "2022-02-27T11:12:26.952402786Z" } } } ]}
2.4 Join
join 處理器可以將數(shù)組中的每個(gè)元素拼接成單個(gè)字符串。如下所示,使用 _
符號(hào)作為分隔符,將 animal 字段中的元素拼接成單個(gè)字符串。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "join": { "field": "animal", "separator": "-" } } ] }, "docs": [ { "_source": { "animal": ["dog", "cat", "monkey"] } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "animal" : "dog-cat-monkey" }, "_ingest" : { "timestamp" : "2022-02-27T10:33:39.63520118Z" } } } ]}
2.5 Foreach
使用 foreach 處理器可以遍歷數(shù)組,對(duì)其中的每個(gè)元素進(jìn)行處理,使用 processor 參數(shù)指定一個(gè)處理器來(lái)處理數(shù)組中元素。在 foreach 處理器內(nèi)引用的處理通過(guò) _ingest._value
鍵來(lái)獲取數(shù)組中每個(gè)元素的值。如下所示,將 values 字段中的每個(gè)元素轉(zhuǎn)換為大寫(xiě)字母。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "foreach": { "field": "values", "processor": { "uppercase": { "field": "_ingest._value" } } } } ] }, "docs": [ { "_source": { "values" : ["foo", "bar", "baz"] } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "values" : [ "FOO", "BAR", "BAZ" ] }, "_ingest" : { "_value" : null, "timestamp" : "2022-02-27T10:06:44.235660464Z" } } } ]}
2.6 KV
kv 處理器可以以鍵值對(duì)的方式提取字段。如下所示,以空格作為不同鍵值對(duì)的分隔符,以 =
拆分每組鍵值對(duì)的鍵和值。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "kv": { "field": "message", "field_split": " ", // 拆分鍵值對(duì) "value_split": "=" // 拆分鍵值對(duì)的鍵和值 } } ] }, "docs": [ { "_source": { "message": "ip=1.2.3.4 error=REFUSED" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "ip=1.2.3.4 error=REFUSED", "error" : "REFUSED", "ip" : "1.2.3.4" }, "_ingest" : { "timestamp" : "2022-02-27T10:40:31.072140367Z" } } } ]}
2.7 CSV
csv 處理器會(huì)將字段中的內(nèi)容看作 csv 文本的一行,根據(jù) separator 參數(shù)指定的分隔符,將拆分后的值賦值給 target_fields 列表中定義的字段。如下所示,將 person 字段按照 |
符號(hào)進(jìn)行拆分,依次賦值給 name, age, country 字段。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "csv": { "field": "person", "target_fields": [ // 指定每列的字段值 "name", "age", "country" ], "separator": "|" // 字段間的分隔符 } } ] }, "docs": [ { "_source": { "person": "zhangsan|18|china" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "country" : "china", "person" : "zhangsan|18|china", "name" : "zhangsan", "age" : "18" }, "_ingest" : { "timestamp" : "2022-02-24T09:39:48.708832221Z" } } } ]}
2.8 Grok
grok 處理器可以使用正則表達(dá)式來(lái)提取字段,并且內(nèi)置了許多常用的表達(dá)式,可以直接通過(guò)表達(dá)式別名進(jìn)行使用。可以使用以下命令獲取所有 grok 內(nèi)置的表達(dá)式。
GET _ingest/processor/grok?s
返回結(jié)果如下,例如我們想匹配 IP 地址就可以直接使用 %{IP}
進(jìn)行匹配,想匹配 MAC 地址可以使用 %{MAC}
進(jìn)行匹配。表達(dá)式別名還可以引用其他的表達(dá)式別名,比如表達(dá)式別名 IP 就引用了IPV4 和 IPV6 兩個(gè)別名。
接下來(lái)我們嘗試使用 grok 處理器解析一條日志。%{
表示將表達(dá)式匹配的值賦值到指定的 field 字段中,表達(dá)式可以是我們自定義的表達(dá)式,也可以是表達(dá)式別名;%{
表示只匹配不賦值。
55.3.244.1 GET /index.html 15824 0.043"
在上面的日志中:
55.3.244.2 是客戶端的 IP 地址,使用%{IP:client}
匹配 IP 地址,賦值到 client 字段中;GET 是 HTTP 的請(qǐng)求方法,使用 %{WORD:method}
匹配數(shù)字和字母,賦值到 method 字段中;/index.html 是請(qǐng)求的 URI 路徑,使用 %{URIPATHPARAM:request}
匹配 URI 路徑和參數(shù),賦值到 request 字段中;15824 是請(qǐng)求的字節(jié)大小,使用 %{NUMBER:bytes:int}
匹配數(shù)字,賦值到 bytes 字段中,并且將字段設(shè)置為 int 類型;0.043 是請(qǐng)求的處理時(shí)間,使用 %{NUMBER:duration:double}
匹配數(shù)字,賦值到 duration字段中,并且將字段設(shè)置為 double 類型。上面用到的 IP, WORD, URIPATHPARAM, NUMBER 表達(dá)式都是 grok 內(nèi)置的表達(dá)式別名,可以直接拿來(lái)使用。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "grok": { "field": "message", "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"] } } ] }, "docs":[ { "_source": { "message": "55.3.244.1 GET /index.html 15824 0.043" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "duration" : 0.043, "request" : "/index.html", "method" : "GET", "bytes" : 15824, "client" : "55.3.244.1", "message" : "55.3.244.1 GET /index.html 15824 0.043" }, "_ingest" : { "timestamp" : "2022-03-01T03:33:57.627169176Z" } } } ]}
除了使用 grok 內(nèi)置的表達(dá)式以外,grok 處理器也允許我們自定義表達(dá)式??梢栽?pattern_definitions
參數(shù)中進(jìn)行設(shè)置,其中鍵是我們自定義表達(dá)式的別名,值是具體的正則表達(dá)式。如下所示,我們定義了兩個(gè)表達(dá)式別名:FAVORITE_DOG 使用正則表達(dá)式 \w+
, 匹配數(shù)字和字母,注意這里需要額外使用一個(gè) \
來(lái)進(jìn)行轉(zhuǎn)義;RGB 可以匹配 RED,GREEN,BLUE 3 種顏色。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "grok": { "field": "message", "patterns": [ "my %{FAVORITE_DOG:dog} is colored %{RGB:color}" ], "pattern_definitions": { // 自定義表達(dá)式 "FAVORITE_DOG": "\\w+", // 匹配數(shù)字和字母 "RGB": "RED|GREEN|BLUE" // 匹配 3 個(gè)顏色 } } } ] }, "docs": [ { "_source": { "message": "my beagle is colored BLUE" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "my beagle is colored BLUE", "color" : "BLUE", "dog" : "beagle" }, "_ingest" : { "timestamp" : "2022-03-01T03:34:33.933398405Z" } } } ]}
有時(shí)候一種匹配規(guī)則可能難以匹配所有的內(nèi)容,我們可以在正則表達(dá)式中通過(guò)或的邏輯進(jìn)行判斷,但是這樣會(huì)使得寫(xiě)出來(lái)的表達(dá)式難以閱讀。這里還有一種更好的方法,在 grok 處理器中,patterns 參數(shù)允許填寫(xiě)多個(gè)表達(dá)式,這樣我們的匹配規(guī)則看上去就一目了然,處理器會(huì)使用最先匹配到的表達(dá)式。如下所示,我們?cè)O(shè)置了 FAVORITE_DOG 和 FAVORITE_CAT 兩個(gè)表達(dá)式都用于解析 pet 字段,如果想要知道是哪個(gè)表達(dá)式匹配了內(nèi)容,可以設(shè)置參數(shù) "trace_match": true
,這樣在返回結(jié)果的 _grok_match_index 字段中可以看到匹配了哪個(gè)表達(dá)式,其中 1 表示匹配了第二個(gè)表達(dá)式。
POST _ingest/pipeline/_simulate{ "pipeline": { "description": "parse multiple patterns", "processors": [ { "grok": { "field": "message", "patterns": [ // patterns 是數(shù)組, 可以填寫(xiě)多個(gè)表達(dá)式 "%{FAVORITE_DOG:pet}", "%{FAVORITE_CAT:pet}" ], "pattern_definitions": { "FAVORITE_DOG": "beagle", "FAVORITE_CAT": "burmese" }, "trace_match": true // 顯示匹配了哪一個(gè)表達(dá)式, 第一個(gè)從 0 開(kāi)始 } } ] }, "docs": [ { "_source": { "message": "I love burmese cats!" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "I love burmese cats!", "pet" : "burmese" }, "_ingest" : { "_grok_match_index" : "1", // 匹配了第 2 個(gè)表達(dá)式 "timestamp" : "2022-03-01T03:35:05.490483581Z" } } } ]}
在 Kibana 的界面上還提供了 Grok Debugger 方便我們調(diào)試 grok 表達(dá)式。點(diǎn)擊 Management -> Dev Tools -> Grok Gebugger進(jìn)入調(diào)試界面。
從上圖可以看到,調(diào)試界面分為以下 4 個(gè)部分:
Sample Data: 填寫(xiě)測(cè)試的文本。Grok Pattern:填寫(xiě) grok 表達(dá)式,相當(dāng)于 grok 處理器中 patterns 定義的內(nèi)容。Custom Patterns:自定義表達(dá)式,相當(dāng)于 grok 處理器中 pattern_definitions 定義的內(nèi)容。在 Custom Patterns 中每行表示一個(gè)自定義表達(dá)式,最左邊的字符串表示我們自定義的表達(dá)式別名,右邊內(nèi)容是表達(dá)式的內(nèi)容,不需要進(jìn)行符號(hào)轉(zhuǎn)義。Structured Data:處理完的結(jié)果。我們將示例中的內(nèi)容按照上面的說(shuō)明填寫(xiě)到相應(yīng)的位置,點(diǎn)擊 Simulate,就可以看到解析完成后的結(jié)構(gòu)化數(shù)據(jù)了。
2.9 Dissect
dissect 和 grok 處理器類似,都是用于從單個(gè)文本字段中提取結(jié)構(gòu)化字段。與 grok 相比,dissect 最大的優(yōu)勢(shì)就是簡(jiǎn)單和快速,dissect 在解析時(shí)不使用正則表達(dá)式,這使得 dissect 的語(yǔ)法更加簡(jiǎn)單,并且執(zhí)行速度比 grok 更快。當(dāng)然 grok 也有自己的獨(dú)到之處,grok 可以同時(shí)使用多個(gè) patterns 來(lái)對(duì)內(nèi)容來(lái)進(jìn)行匹配,這是 dissect 所不具備的能力。接下來(lái)首先介紹一下 dissect 處理器簡(jiǎn)單的使用方法,如下所示,我們要對(duì)一行日志內(nèi)容進(jìn)行解析,%{
表示將匹配到的字符串作為 field 字段的值。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}" } } ] }, "docs": [ { "_source": { "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "request" : "/english/venues/cities/images/montpellier/18.gif", "auth" : "-", "ident" : "-", "verb" : "GET", "message" : """1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] "GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0" 200 3171""", "@timestamp" : "30/Apr/1998:22:00:52 +0000", "size" : "3171", "clientip" : "1.2.3.4", "httpversion" : "1.0", "status" : "200" }, "_ingest" : { "timestamp" : "2022-03-01T06:37:23.791866312Z" } } } ]}
在 dissect 中可以使用修飾符改變默認(rèn)的匹配規(guī)則,例如可以指定 dissect 忽略某些字段、拼接多個(gè)字符等等。dissect 的修飾符說(shuō)明如下表所示。
修飾符 | 用途 | 位置 | 示例 |
---|---|---|---|
-> | 跳過(guò) -> 右邊重復(fù)的字符 | 最右邊 | %{keyname1->} |
將多個(gè)結(jié)果附加到一起作為輸出 | 左邊 | %{+keyname} %{+keyname} | |
和 /n | 指定附加結(jié)果的順序 | 號(hào)在左邊,/n 放在右邊,n 是順序的數(shù)字 | %{+keyname/2} %{+keyname/1} |
? | 跳忽略匹配項(xiàng) | 左邊 | %{?keyname} |
和 & | 輸出鍵設(shè)置為 * 的值,輸出值設(shè)置為 & 的值 | 左邊 | %{*key} %{&value} |
dissect 默認(rèn)的匹配算法非常嚴(yán)格,要求 pattern 中的所有字符都與源字符串完全匹配。例如 %{a} %
只能匹配“字符串1 字符串2”(中間 1 個(gè)空格),將無(wú)法匹配“字符串1 字符串2”(中間 5 個(gè)空格)。要處理這種情況就可以使用 ->
修飾符跳過(guò)箭頭右邊重復(fù)的字符,例如 %{a->} %
就可以跳過(guò)字符串1 和字符串 2 中間的多個(gè)空格,只對(duì)空格匹配一次。要跳過(guò)的字符我們可以自由設(shè)置,如下所示,使用 ->
修飾符跳過(guò)重復(fù)的 ~
字符。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{ts->}~%{level}" // 跳過(guò)重復(fù)的 ~ 字符 } } ] }, "docs": [ { "_source": { "message": "1998-08-10T17:15:42,466~~~~~~~WARN" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "message" : "1998-08-10T17:15:42,466~~~~~~~WARN", "level" : "WARN", "ts" : "1998-08-10T17:15:42,466" }, "_ingest" : { "timestamp" : "2022-03-01T06:38:20.328535452Z" } } } ]}
假如我們想將多個(gè)匹配的字符拼接為一個(gè)字段,可以使用 +
修飾符,append_separator 參數(shù)可以指定分隔符, 默認(rèn)以空格作為分隔符。如下所示,我們將匹配的多個(gè)字符串拼接為 name 字段,使用 ,
作為分隔符。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{+name} %{+name} %{+name} %{+name}", "append_separator": "," } } ] }, "docs": [ { "_source": { "message": "john jacob jingleheimer schmidt" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "john,jacob,jingleheimer,schmidt", "message" : "john jacob jingleheimer schmidt" }, "_ingest" : { "timestamp" : "2022-03-02T13:41:40.058126802Z" } } } ]}
如果我們想改變字符串拼接的順序,可以同時(shí)使用 +
和 /n
修飾符指定順序,其中 n 是順序的數(shù)字。如下所示,可以看到返回結(jié)果中的 name 字段按照我們指定的順序拼接。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{+name/2} %{+name/4} %{+name/3} %{+name/1}", "append_separator": "," } } ] }, "docs": [ { "_source": { "message": "john jacob jingleheimer schmidt" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "schmidt,john,jingleheimer,jacob", "message" : "john jacob jingleheimer schmidt" }, "_ingest" : { "timestamp" : "2022-03-02T13:47:44.332086601Z" } } } ]}
前面提到過(guò),dissect 要求 pattern 中的所有字符都與源字符串完全匹配,否則解析將不會(huì)成功。如果我們僅僅想讓某些字符串在匹配時(shí)充當(dāng)“占位”的角色,并不想讓它出現(xiàn)在最終的文檔中,那么就可以使用 ?
修飾符來(lái)忽略最終結(jié)果中的匹配項(xiàng)。除了使用 ?
修飾符以外,還可以用一個(gè)空鍵 %{}
實(shí)現(xiàn)相同的效果,但是為了便于閱讀,建議還是使用 %{?
的方式。如下所示,ident 和 auth 字段都不會(huì)出現(xiàn)在最終的結(jié)果中,僅用于字符串匹配。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "%{clientip} %{?ident} %{?auth} [%{@timestamp}]" } } ] }, "docs": [ { "_source": { "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "@timestamp" : "30/Apr/1998:22:00:52 +0000", "message" : "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]", "clientip" : "1.2.3.4" }, "_ingest" : { "timestamp" : "2022-03-02T13:50:56.099402273Z" } } } ]}
*
和 &
修飾符可以用于解析包含鍵值對(duì)的內(nèi)容,其中輸出鍵設(shè)置為 *
的值,輸出值設(shè)置為 &
的值。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "dissect": { "field": "message", "pattern": "[%{ts}] [%{level}] %{*p1}:%{&p1} %{*p2}:%{&p2}" } } ] }, "docs": [ { "_source": { "message": "[2018-08-10T17:15:42,466] [ERR] ip:1.2.3.4 error:REFUSED" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "level" : "ERR", "ip" : "1.2.3.4", "message" : "[2018-08-10T17:15:42,466] [ERR] ip:1.2.3.4 error:REFUSED", "error" : "REFUSED", "ts" : "2018-08-10T17:15:42,466" }, "_ingest" : { "timestamp" : "2022-03-02T14:00:54.96982616Z" } } } ]}
2.10 Rename
rename 處理器用于重命名現(xiàn)有字段。如果該字段不存在或者重命名的字段已存在,則會(huì)引發(fā)異常。如下所示,將 provider 字段重命名為 cloud.provider。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "rename": { "field": "provider", "target_field": "cloud.provider" } } ] }, "docs": [ { "_source": { "provider": "Aliyun" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "cloud" : { "provider" : "Aliyun" } }, "_ingest" : { "timestamp" : "2022-02-27T10:57:47.821558199Z" } } } ]}
2.11 Remove
remove 處理器用于刪除現(xiàn)有字段。如果刪除的字段不存在,則會(huì)引發(fā)異常。如下所示,使用 remove 處理器刪除文檔中的 name 和 location 字段。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "remove": { "field": ["age", "location"] } } ] }, "docs": [ { "_source": { "name": "tom", "age": 18, "location": "United States" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "name" : "tom" }, "_ingest" : { "timestamp" : "2022-02-27T10:56:05.119755281Z" } } } ]}
2.12 Set
set 處理器用于為字段賦值,并且在賦值的時(shí)候還可以使用 {{{ }}}
符號(hào)從其他字段復(fù)制值,然后和指定字符串進(jìn)行拼接。如下所示,將 version 字段的值設(shè)置為 2,host.os.name 字段的值為 copy from 字符串拼接 os 字段的結(jié)果。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "set": { "field": "host.os.name", "value": "copy from {{{os}}}" // 從 os 字段復(fù)制值進(jìn)行拼接 } }, { "set": { "field": "version", "value": "2" // 設(shè)置靜態(tài)值 } } ] }, "docs": [ { "_source": { "os": "Ubuntu" } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "host" : { "os" : { "name" : "copy from Ubuntu" } }, "os" : "Ubuntu", "version" : "2" }, "_ingest" : { "timestamp" : "2022-02-28T13:39:31.035666829Z" } } } ]}
2.13 Script
對(duì)于復(fù)雜的處理邏輯,如果使用 Elasticseach 其他自帶的處理器無(wú)法實(shí)現(xiàn),那么可以嘗試在 script 處理器中編寫(xiě)腳本進(jìn)行處理。在 script 處理器中通過(guò) lang 參數(shù)可以指定腳本語(yǔ)言,通常我們使用 painless 作為腳本語(yǔ)言,這也是 Elasticsearch 中默認(rèn)的腳本語(yǔ)言。在 script 處理器中,腳本在 ingest 上下文中運(yùn)行,我們可以通過(guò) ctx["field"]
或者ctx.field
語(yǔ)法來(lái)訪問(wèn)文檔中的字段。如下所示,傳入的文檔中有一個(gè)數(shù)字類型的參數(shù) num,我們?cè)谀_本中通過(guò) if else 條件語(yǔ)句進(jìn)行判斷,當(dāng) num 等于 7 時(shí),將 result 的值設(shè)置為 happy;當(dāng) num 等于 4 時(shí),將 result 的結(jié)果設(shè)置為 sad;當(dāng) num 是其他值時(shí),將 result 的結(jié)果設(shè)置為 normal。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "script": { "lang": "painless", "source": """ if(ctx.num == 7){ ctx.result = "happy" }else if(ctx.num == 4){ ctx.result = "sad" }else { ctx.result = "normal" } """ } } ] }, "docs": [ { "_source": { "num": 7 } } ]}# 返回結(jié)果{ "docs" : [ { "doc" : { "_index" : "_index", "_type" : "_doc", "_id" : "_id", "_source" : { "result" : "happy", "num" : 7 }, "_ingest" : { "timestamp" : "2022-03-02T14:20:27.776240111Z" } } } ]}
2.14 Drop
drop 處理器可以根據(jù)條件刪除指定的文檔。如下所示,刪除 name 字段值為 tom 的文檔。
POST _ingest/pipeline/_simulate{ "pipeline": { "processors": [ { "drop": { "if": "ctx.name == "tom"" } } ] }, "docs": [ { "_source": { "name": "tom", "age": 18 } } ]}# 返回結(jié)果{ "docs" : [ null ]}
3 Ingest Pipeline 應(yīng)用場(chǎng)景
Ingest Pipeline 主要有以下 4 類應(yīng)用場(chǎng)景:
寫(xiě)入時(shí)指定 pipeline,單條寫(xiě)入或者使用 _bulk API 批量寫(xiě)入時(shí)都可以使用。更新時(shí)指定 pipeline。定義索引或者模板時(shí)指定 pipeline,有兩個(gè)相關(guān)的參數(shù):-index.default_pipeline
參數(shù)可以定義default pipeline(默認(rèn)執(zhí)行的 pipeline),當(dāng)請(qǐng)求中沒(méi)有指定 pipeline 時(shí)執(zhí)行;- index.final_pipeline
參數(shù)可以定義final pipeline(最終執(zhí)行的 pipeline),在所有 pipeline 執(zhí)行完后再執(zhí)行。reindex 時(shí)指定 pipeline,在重建索引或者數(shù)據(jù)遷移時(shí)使用。3.1 寫(xiě)入時(shí)指定 Pipeline
首先創(chuàng)建一個(gè)名為 lowercase-pipeline
的 pipeline,它的作用是將 name 字段轉(zhuǎn)換為小寫(xiě)字母。
PUT _ingest/pipeline/lowercase-pipeline{ "processors": [ { "lowercase": { "field": "name" } } ]}
單條寫(xiě)入或者通過(guò) _bulk
API 批量寫(xiě)入時(shí)都可以通過(guò) pipeline
參數(shù)指定使用的 pipeline。
# 寫(xiě)入單條數(shù)據(jù)時(shí)指定 pipelienPOST index-1/_doc?pipeline=lowercase-pipeline{ "name": "Tom", "age": 20}# _bulk 寫(xiě)入多條文檔時(shí)指定 pipelinePUT index-1/_bulk?pipeline=lowercase-pipeline{"index":{ }}{"name":"Peter","age":17}{"index":{}}{"name":"Mary","age":19}
查看寫(xiě)入的文檔,可以看到所有文檔的 name 字段都轉(zhuǎn)換為了小寫(xiě)字母。
GET index-1/_search# 返回結(jié)果{ "_index" : "index-1", "_type" : "_doc", "_id" : "g196X38BKRZVqZj9rsyn", "_score" : 1.0, "_source" : { "name" : "tom", "age" : 20 }},{ "_index" : "index-1", "_type" : "_doc", "_id" : "hF96X38BKRZVqZj9scwO", "_score" : 1.0, "_source" : { "name" : "peter", "age" : 17 }},{ "_index" : "index-1", "_type" : "_doc", "_id" : "hV96X38BKRZVqZj9scwO", "_score" : 1.0, "_source" : { "name" : "mary", "age" : 19 }}
3.2 更新時(shí)指定 Pipeline
使用 _update_by_query API
可以批量更新索引中的文檔,通常會(huì)結(jié)合pipeline 來(lái)對(duì)文檔進(jìn)行更新。以下示例中我們對(duì)索引中的所有文檔進(jìn)行更新,也可以在 _update_by_query API
中使用 DSL 語(yǔ)句過(guò)濾出需要更新的文檔。
# 往源索引中插入數(shù)據(jù)PUT index-2/_doc/1{ "name": "Smith", "age": 18}PUT index-2/_doc/1{ "name": "Mike", "age": 16}# 使用 update_by_query 進(jìn)行更新,可以寫(xiě) DSL 語(yǔ)句過(guò)濾出需要更新的文檔POST index-2/_update_by_query?pipeline=lowercase-pipeline
3.3 定義索引或者模板時(shí)指定 Pipeline
在定義索引或者模板時(shí)可以使用 index.default_pipeline 參數(shù)指定 default pipeline(默認(rèn)執(zhí)行的 pipeline),index.final_pipeline 參數(shù)指定 final pipeline(最終執(zhí)行的 pipeline)。default pipeline 與 final pipeline 實(shí)際上都是普通的 ingest pipeline,只是和一般的 pipeline 執(zhí)行時(shí)機(jī)不同;default pipeline 執(zhí)行的時(shí)機(jī)是當(dāng)前寫(xiě)入請(qǐng)求沒(méi)有指定 pipeline 時(shí),final pipeline 執(zhí)行的時(shí)機(jī)是在所有 pipeline 執(zhí)行完畢后。
如上圖所示,如果當(dāng)前的寫(xiě)入或者更新請(qǐng)求中指定了 pipeline,則會(huì)先執(zhí)行自定義的 pipeline,當(dāng)所有的 pipeline 執(zhí)行完畢后再執(zhí)行 final pipeline(如果索引顯式設(shè)置了index.final_pipeline);如果當(dāng)前的寫(xiě)入或者更新請(qǐng)求中沒(méi)有指定 pipeline,并且索引顯式設(shè)置了 index.default_pipeline 參數(shù)時(shí),則會(huì)先執(zhí)行 default pipeline,最后再執(zhí)行 final pipeline。
為了完成下面的演示,在前面 lowercase-pipeline 的基礎(chǔ)上,現(xiàn)在再創(chuàng)建兩個(gè) pipeline,其中 uppercase-pipeline 的作用是 name 字段轉(zhuǎn)換為小寫(xiě)字母,set-pipeline 的作用是為文檔添加一個(gè) message 字段。
PUT _ingest/pipeline/uppercase-pipeline{ "processors": [ { "uppercase": { "field": "name" } } ]}PUT _ingest/pipeline/set-pipeline{ "processors": [ { "set": { "field": "message", "value": "set by final pipeline" } } ]}
接下來(lái)創(chuàng)建一個(gè)索引 index-3,在 settings 中指定索引的 default_pipeline 為 lowercase-pipeline,final_pipeline 為 set-pipeline。
PUT index-3{ "settings": { "index": { "default_pipeline": "lowercase-pipeline", // 默認(rèn)執(zhí)行的 pipeline "final_pipeline": "set-pipeline" // 最終執(zhí)行的 pipeline } }}
然后往索引中插入兩條文檔,其中 _id
為 1 的文檔在寫(xiě)入時(shí)不指定 pipeline,_id
為 2 的文檔在寫(xiě)入時(shí)指定使用 uppercase-pipeline。
PUT index-3/_doc/1{ "name": "Lisa", "age": 18}# 在寫(xiě)入時(shí)指定 pipeline 覆蓋 default_pipelinePUT index-3/_doc/2?pipeline=uppercase-pipeline{ "name": "Jerry", "age": 21}
查詢最終保存的文檔,可以看到 final pipeline 始終會(huì)執(zhí)行,2 個(gè)文檔都添加了 message 字段;由于寫(xiě)入 _id
為 2 的文檔時(shí)指定使用了 uppercase-pipeline,所以該文檔沒(méi)有執(zhí)行 default pipeline,而是執(zhí)行了 uppercase-pipeline 將字母轉(zhuǎn)換為大寫(xiě)。
GET index-3/_search# 返回結(jié)果{ "_index" : "index-3", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "name" : "lisa", "message" : "set by final pipeline", "age" : 18 }},{ "_index" : "index-3", "_type" : "_doc", "_id" : "2", "_score" : 1.0, "_source" : { "name" : "JERRY", "message" : "set by final pipeline", "age" : 21 }}
3.4 Reindex 時(shí)指定 Pipeline
Elasticsearch 提供了 reindex API 用于將文檔從源索引復(fù)制到目標(biāo)索引,在 reindex 時(shí)可以指定 pipeline 對(duì)復(fù)制的文檔進(jìn)行加工處理。如下所示,先創(chuàng)建源索引 source-index,并插入 1 條文檔。
PUT source-index/_doc/1{ "name": "Jack", "age": 18}
然后在 reindex 時(shí)指定使用 lowercase-pipeline,目標(biāo)索引名設(shè)置為 dest-index。
POST _reindex{ "source": { "index": "source-index" }, "dest": { "index": "dest-index", "pipeline": "lowercase-pipeline" }}
查看目標(biāo)索引,name 字段已經(jīng)成功轉(zhuǎn)換為了小寫(xiě)字母。
GET dest-index/_search# 返回結(jié)果{ "took" : 1, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "dest-index", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "name" : "jack", "age" : "18" } } ] }}
4 總結(jié)
Ingest pipeline 是 Elasticsearch 的一個(gè)非常實(shí)用的功能,它能夠幫助用戶在數(shù)據(jù)進(jìn)入 Elasticsearch 索引之前對(duì)其進(jìn)行預(yù)處理,從而提高搜索和分析的效率和準(zhǔn)確性。
本文向讀者介紹了如何有效地創(chuàng)建,管理和測(cè)試 ElasticSearch Ingest Pipeline。在第一小節(jié)中首先說(shuō)明了 ingest pipeline 的基本用法,包括創(chuàng)建和使用 ingest pipeline,使用 simulate API 對(duì) pipeline 進(jìn)行測(cè)試,以及如何處理 pipeline 中的異常;在第二小節(jié)中,將 ingest pipeline 中的 processor 處理器根據(jù)用途作了分類說(shuō)明,并通過(guò)示例展示了常見(jiàn)的幾個(gè) processor 的用法;在最后一個(gè)小節(jié)中歸納了 ingest pipeline 的 4 個(gè)應(yīng)用場(chǎng)景。
關(guān)鍵詞: 數(shù)據(jù)處理 正則表達(dá)式 編程算法