国产传媒18精品免费观看,欧美人妻精品一区二区三区,999热线在线观看,www四虎最新成人永久网站

【ES三周年】使用 Ingest Pipeline 在 Elasticsearch 中對(duì)數(shù)據(jù)進(jìn)行預(yù)處理 世界熱點(diǎn)

首頁(yè) > 探索 > > 正文

日期:2023-02-20 08:58:45    來(lái)源:騰訊云    

Ingest pipeline 允許文檔在被索引之前對(duì)數(shù)據(jù)進(jìn)行預(yù)處理,將數(shù)據(jù)加工處理成我們需要的格式。例如,可以使用 ingest pipeline添加或者刪除字段,轉(zhuǎn)換類型,解析內(nèi)容等等。Pipeline 由一組處理器 Processor 構(gòu)成,每個(gè)處理器依次運(yùn)行,對(duì)傳入的文檔進(jìn)行特定的更改。Ingest pipeline 和 Logstash 中的 filter 作用相似,并且更加輕量和易于調(diào)試。


(資料圖片)

要使用 ingest pipeline,集群中必須至少有一個(gè)具有 ingest 角色的節(jié)點(diǎn)。對(duì)于大量攝取負(fù)載,建議設(shè)置專用的 ingest 節(jié)點(diǎn),要?jiǎng)?chuàng)建專用的 ingest 節(jié)點(diǎn),請(qǐng)?jiān)O(shè)置:

node.roles: [ ingest ]

1 Ingest Pipeline 的基本用法

1.1 創(chuàng)建和使用 Ingest Pipeline

接下來(lái)介紹一下 ingest pipeline 的創(chuàng)建與使用,如下所示,使用 ingest API 創(chuàng)建一個(gè)名為 my-pipeline的 ingest pipeline,在 processors 參數(shù)中指定了兩個(gè)處理器,set 處理器為文檔添加一個(gè)新的字段 location,設(shè)置值為 China;lowercase 處理器將 name 字段的所有字母轉(zhuǎn)換為小寫(xiě)。

PUT _ingest/pipeline/my-pipeline{  "description": "My first Ingest Pipeline",  "processors": [    {      "set": {        "description": "Add a new field",        "field": "location",        "value": "China"      }    },    {      "lowercase": {        "description": "Lowercase name",        "field": "name"      }    }  ]}

然后往索引 my-index 中寫(xiě)入一條數(shù)據(jù),通過(guò) pipeline 參數(shù)指定使用剛剛創(chuàng)建的 my-pipeline。

PUT my-index/_doc/1?pipeline=my-pipeline{  "name": "Tom",  "age": 18}

查看 id 為 1 的文檔,可以看到 name 字段由 Tom 轉(zhuǎn)換為 tom,并且新增了 location 字段,說(shuō)明 my-pipeline管道成功處理了攝入的數(shù)據(jù)。

GET my-index/_doc/1# 返回結(jié)果{  "_index" : "my-index",  "_type" : "_doc",  "_id" : "1",  "_version" : 1,  "_seq_no" : 0,  "_primary_term" : 1,  "found" : true,  "_source" : {    "name" : "tom",    "location" : "China",    "age" : 18  }}

1.2 使用 Simulate API 測(cè)試 Pipeline

為了讓開(kāi)發(fā)者更好地了解和使用 pipeline 中的處理器,Elasticsearch 提供了 simulate API 接口,方便我們對(duì) pipeline 進(jìn)行測(cè)試。如下所示,我們對(duì) 1.1 創(chuàng)建和使用 Ingest Pipeline 章節(jié)中創(chuàng)建的 my-pipeline進(jìn)行測(cè)試,在 docs 列表中我們可以填寫(xiě)多個(gè)原始文檔。

POST _ingest/pipeline/my-pipeline/_simulate{  "docs": [    {      "_source": {        "name": "Tom",        "age": 18      }    }  ]}

返回結(jié)果如下,可以看到模擬的結(jié)果和實(shí)際創(chuàng)建的文檔一致,只不過(guò) simulate API 并不會(huì)真正地創(chuàng)建這個(gè)文檔。

{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "name" : "tom",          "location" : "China",          "age" : 18        },        "_ingest" : {          "timestamp" : "2022-03-03T14:04:15.941884826Z"        }      }    }  ]}

除了在請(qǐng)求路徑中指定 pipeline,我們還可以在請(qǐng)求體中定義 pipeline 進(jìn)行模擬,這樣就不用預(yù)先創(chuàng)建好 pipeline,而是等到測(cè)試成功后再去創(chuàng)建 pipeline。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "set": {          "description": "Add a new field",          "field": "location",          "value": "China"        }      },      {        "lowercase": {          "description": "Lowercase name",          "field": "name"        }      }    ]  },  "docs": [    {      "_source": {        "name": "Tom",        "age": 18      }    }  ]}

1.3 異常處理

當(dāng)我們使用 pipeline 處理一個(gè)文檔的時(shí)候,有時(shí)并不是所有的文檔都很規(guī)范,這個(gè)時(shí)候可能就會(huì)出現(xiàn)文檔不能被正確解析或者發(fā)生異常的情況,此時(shí) Elasticsearch 會(huì)返回給客戶端一個(gè)錯(cuò)誤的信息,表明文檔不能被正確地處理。pipeline 中的處理器(processor)按照順序依次執(zhí)行,默認(rèn)情況下,當(dāng)處理器發(fā)生錯(cuò)誤或者異常時(shí),將會(huì)停止后續(xù)的處理。

在 ingest pipeline 中,異常處理可以分為 3 種情況:

在處理器中設(shè)置 ignore_failure: true,當(dāng)該處理器發(fā)生異常時(shí),允許忽略異常,繼續(xù)執(zhí)行后續(xù)的處理器。通過(guò) on_failure參數(shù)定義發(fā)生異常時(shí)執(zhí)行的處理器列表,該參數(shù)可以在 processor 級(jí)別中定義,也可以在 pipeline 級(jí)別中定義。使用 fail 處理器主動(dòng)拋出異常。

下面將會(huì)分別對(duì)上述 3 種情況進(jìn)行演示,首先模擬 2 個(gè)異常:

convert 處理器將 id 字段轉(zhuǎn)換為 long 類型,由于傳入文檔的 id 字段值設(shè)置為 S123456,無(wú)法轉(zhuǎn)換成 long 類型的數(shù)字,會(huì)產(chǎn)生 number_format_exception 的異常。date 處理器解析 timestamp 字段的日期格式,formats 參數(shù)要求輸入的格式是 yyyy-MM-dd HH:mm:ss,例如 2022-03-03 15:22:11,解析出日期對(duì)應(yīng)的年月日信息,以 yyyy/MM/dd 的格式輸出到 date 字段中,例如 2022/03/03。由于傳入文檔的 timestamp 字段的格式是 20220303 15:22:11,并不滿足 formats 參數(shù)要求的日期格式,因此會(huì)產(chǎn)生 date_time_parse_exception 的異常。

執(zhí)行以下 pipeline 測(cè)試語(yǔ)句,在請(qǐng)求路徑中加上 verbose 可以看到每個(gè)處理器的執(zhí)行情況。

POST _ingest/pipeline/_simulate?verbose{  "pipeline": {    "processors": [      {        "convert": {          "field": "id",          "type": "long"        }      },      {        "date": {          "field": "timestamp", // 解析的字段          "formats": [            "yyyy-MM-dd HH:mm:ss" // 解析的格式          ],          "output_format": "yyyy/MM/dd",  // 輸出的格式          "target_field": "date"  // 輸出的字段        }      }    ]  },  "docs": [    {      "_source": {        "id": "S123456",        "timestamp": "20220303 15:22:11",        "message": "User login successfully"      }    }  ]}

返回結(jié)果如下,盡管我們?nèi)藶橹圃炝?2 個(gè)異常,但是只看到了 convert 處理器的異常報(bào)錯(cuò),這是因?yàn)楫?dāng)處理器發(fā)生錯(cuò)誤或者異常時(shí),將會(huì)停止后續(xù)的處理,直接向客戶端返回錯(cuò)誤信息。

{  "docs" : [    {      "processor_results" : [        {          "processor_type" : "convert",          "status" : "error",          "error" : {            "root_cause" : [              {                "type" : "illegal_argument_exception",                "reason" : "unable to convert [S123456] to long"              }            ],            "type" : "illegal_argument_exception",            "reason" : "unable to convert [S123456] to long",            "caused_by" : {              "type" : "number_format_exception",              "reason" : "For input string: \"S123456\""            }          }        }      ]    }  ]}

1.3.1 ignore_failure 忽略異常

在處理器中設(shè)置 ignore_failure 參數(shù)為 true,當(dāng)該處理器發(fā)生異常時(shí),允許忽略異常,繼續(xù)執(zhí)行后續(xù)的處理器。

POST _ingest/pipeline/_simulate?verbose{  "pipeline": {    "processors": [      {        "convert": {          "field": "id",          "type": "long",          "ignore_failure": true // 忽略異常        }      },      {        "date": {          "field": "timestamp",          "formats": [            "yyyy-MM-dd HH:mm:ss"          ],          "output_format": "yyyy/MM/dd",          "target_field": "date"        }      }    ]  },  "docs": [    {      "_source": {        "id": "S123456",        "timestamp": "2022/03/03 15:22:11",        "message": "User login successfully"      }    }  ]}

這次在返回結(jié)果中可以看到有 2 個(gè)異常信息,其中 convert 處理器的 status 的值為 error_ignored,表示該異常被忽略了,在 doc 中可以看到該處理器處理完畢后的結(jié)果,可以看到 id 字段的內(nèi)容保留不變。接著 pipeline 繼續(xù)往后執(zhí)行,當(dāng)執(zhí)行到 date 處理器時(shí),再次發(fā)生異常,由于 date 處理器中未對(duì)異常進(jìn)行處理,此時(shí)向客戶端返回異常信息。

{  "docs" : [    {      "processor_results" : [        {          "processor_type" : "convert",            "status" : "error_ignored", // 第 1 個(gè)異常,忽略異常          "ignored_error" : {             "error" : {                "root_cause" : [                {                  "type" : "illegal_argument_exception",                  "reason" : "unable to convert [S123456] to long"                }              ],              "type" : "illegal_argument_exception",              "reason" : "unable to convert [S123456] to long",              "caused_by" : {                "type" : "number_format_exception",                "reason" : "For input string: \"S123456\""              }            }          },          "doc" : {            "_index" : "_index",            "_type" : "_doc",            "_id" : "_id",            "_source" : {              "id" : "S123456",  // 跳過(guò) convert 處理器對(duì) id 字段的處理              "message" : "User login successfully",              "timestamp" : "2022/03/03 15:22:11"            },            "_ingest" : {              "pipeline" : "_simulate_pipeline",              "timestamp" : "2022-03-04T02:48:13.562353005Z"            }          }        },        {          "processor_type" : "date",           "status" : "error",  // 第 2 個(gè)異常          "error" : {            "root_cause" : [              {                "type" : "illegal_argument_exception",                "reason" : "unable to parse date [2022/03/03 15:22:11]"              }            ],            "type" : "illegal_argument_exception",            "reason" : "unable to parse date [2022/03/03 15:22:11]",            "caused_by" : {              "type" : "illegal_argument_exception",              "reason" : "failed to parse date field [2022/03/03 15:22:11] with format [yyyy-MM-dd HH:mm:ss]",              "caused_by" : {                "type" : "date_time_parse_exception",                "reason" : "Text "2022/03/03 15:22:11" could not be parsed at index 4"              }            }          }        }      ]    }  ]}

1.3.2 on_failure 處理異常

使用 on_failure 參數(shù)可以定義發(fā)生異常時(shí)執(zhí)行的處理器列表,該參數(shù)允許在 processor 和 pipeline 級(jí)別中定義。在 pipeline 級(jí)別定義時(shí),on_failure 捕獲整個(gè) pipeline 發(fā)生的任何異常,當(dāng)產(chǎn)生異常時(shí)直接執(zhí)行 on_failure 中定義的處理器列表,不會(huì)再執(zhí)行后續(xù)的處理器。

在 processor 級(jí)別定義時(shí),on_failure 參數(shù)可以針對(duì)單個(gè)處理器進(jìn)行異常處理,會(huì)繼續(xù)執(zhí)行后續(xù)的處理器。

on_failure 參數(shù)可以同時(shí)在 pipeline 和 processor 中定義,這兩者并不沖突,比較推薦的做法是,針對(duì)某些處理器設(shè)置 processor 級(jí)別的 on_failure 處理規(guī)則,另外設(shè)置 pipeline 級(jí)別的 on_failure 處理規(guī)則作為一條兜底的規(guī)則,當(dāng) processor 級(jí)別的 on_failure 處理規(guī)則也發(fā)生異常時(shí)或者沒(méi)有設(shè)置異常處理的處理器發(fā)生異常時(shí),就可以應(yīng)用這條兜底的規(guī)則,這樣做的好處就是可以盡可能地保證我們的 ingest pipeline 的健壯性。

如下所示,在 pipeline 級(jí)別設(shè)置了 on_failure 的處理規(guī)則,注意這里的 on_failure 參數(shù)和 processors 參數(shù)是處于同一層級(jí)的。當(dāng)發(fā)生異常時(shí),on_failure 會(huì)執(zhí)行里面的 set 處理器,將索引名改為 failure-index,該索引專門用于記錄 pipeline 處理異常的文檔。之后我們就可以在 failure-index 索引中去查看哪些文檔在預(yù)處理時(shí)發(fā)生了異常,方便后續(xù)實(shí)施相應(yīng)的補(bǔ)救措施。

PUT _ingest/pipeline/failure-test-pipeline{  "processors": [    {      "convert": {        "field": "id",        "type": "long"      }    },    {      "date": {        "field": "timestamp",        "formats": [          "yyyy-MM-dd HH:mm:ss"        ],        "output_format": "yyyy/MM/dd",        "target_field": "date"      }    }  ],  "on_failure": [ // 發(fā)生異常時(shí)執(zhí)行的處理器列表    {      "set": {        "field": "_index", // 通過(guò) _index 元數(shù)據(jù)字段,可以改變寫(xiě)入的索引        "value": "failure-index"      }    }  ]}

然后往 my-index 索引中插入一條有錯(cuò)誤的文檔,將文檔 _id設(shè)置為 1。從返回結(jié)果來(lái)看,并沒(méi)有異常報(bào)錯(cuò),文檔成功寫(xiě)入了。但是仔細(xì)觀察可以發(fā)現(xiàn),文檔并沒(méi)有寫(xiě)入 my-index 索引,而是寫(xiě)入了我們記錄異常的索引 failure-index。

PUT my-index/_doc/1?pipeline=failure-test-pipeline{  "id": "S123456",  "timestamp": "2022/03/03 15:22:11",  "message": "User login successfully"}# 返回結(jié)果{  "_index" : "failure-index", // 寫(xiě)入了記錄異常的索引  "_type" : "_doc",  "_id" : "1",  "_version" : 1,  "result" : "created",  "_shards" : {    "total" : 2,    "successful" : 1,    "failed" : 0  },  "_seq_no" : 0,  "_primary_term" : 1}

查詢 my-index 文檔,確實(shí)沒(méi)有找到 _id 為 1 的這條文檔。

GET my-index/_doc/1# 返回結(jié)果{  "error" : {    "root_cause" : [      {        "type" : "index_not_found_exception",        "reason" : "no such index [my-index]",        "resource.type" : "index_expression",        "resource.id" : "my-index",        "index_uuid" : "_na_",        "index" : "my-index"      }    ],    "type" : "index_not_found_exception",    "reason" : "no such index [my-index]",    "resource.type" : "index_expression",    "resource.id" : "my-index",    "index_uuid" : "_na_",    "index" : "my-index"  },  "status" : 404}

查詢 failure-index 索引可以找到這條處理異常的文檔。

GET failure-index/_doc/1# 返回結(jié)果{  "_index" : "failure-index",  "_type" : "_doc",  "_id" : "1",  "_version" : 1,  "_seq_no" : 0,  "_primary_term" : 1,  "found" : true,  "_source" : {    "id" : "S123456",    "message" : "User login successfully",    "timestamp" : "2022/03/03 15:22:11"  }}

對(duì)于我們來(lái)說(shuō),目前 failure-index 索引記錄的信息十分有限,根據(jù)以上內(nèi)容我們無(wú)法知道是哪個(gè)處理器在執(zhí)行時(shí)產(chǎn)生了異常。在 on_failure 中提供了以下 4 個(gè)元數(shù)據(jù)字段方便我們進(jìn)行故障定位:

on_failure_pipeline:產(chǎn)生異常的 pipeline 類型的處理器中引用的 pipeline。ingest pipeline 中有一個(gè) pipeline 類型的處理器,該處理器也可以指定使用其他的 pipeline,這里注意區(qū)分 pipeline 類型的處理器和 pipeline 管道。on_failure_message:報(bào)錯(cuò)的內(nèi)容。on_failure_processor_type:產(chǎn)生異常的處理器的標(biāo)簽,標(biāo)簽可以在處理器中通過(guò) tag 參數(shù)指定。當(dāng) pipeline 中使用了多個(gè)相同類型的處理器時(shí),根據(jù)指定的標(biāo)簽可以方便我們進(jìn)行區(qū)分。on_failure_processor_tag:產(chǎn)生異常的處理器的類型。

如下所示,我們?cè)?on_failure 參數(shù)新增了一個(gè) set 處理器,將錯(cuò)誤信息寫(xiě)入 failure-index 的 failure 字段中。

PUT _ingest/pipeline/failure-test-pipeline{  "processors": [    {      "convert": {        "tag": "my-index-convert", // 設(shè)置處理器的標(biāo)簽,方便定位問(wèn)題        "field": "id",        "type": "long"      }    },    {      "date": {        "tag": "my-index-date", // 設(shè)置處理器的標(biāo)簽,方便定位問(wèn)題        "field": "timestamp",        "formats": [          "yyyy-MM-dd HH:mm:ss"        ],        "output_format": "yyyy/MM/dd",        "target_field": "date"      }    }  ],  "on_failure": [    {      "set": {        "field": "_index",        "value": "failure-index"      }    },    {      "set": {        "field": "failure",        "value": {          "on_failure_pipeline": "{{ _ingest.on_failure_pipeline }}",          "on_failure_message": "{{_ingest.on_failure_message}}",          "on_failure_processor_type": "{{_ingest.on_failure_processor_type}}",          "on_failure_processor_tag": "{{ _ingest.on_failure_processor_tag }}"        }      }    }  ]}

然后往 my-index 索引中插入一條有錯(cuò)誤的文檔,將文檔 _id設(shè)置為 2。

PUT my-index/_doc/2?pipeline=failure-test-pipeline{  "id": "S123456",  "timestamp": "2022/03/03 15:22:11",  "message": "User login successfully"}

查看 failure-index 索引記錄的錯(cuò)誤信息,可以得知打了 my-index-convert 標(biāo)簽的 convert 類型的處理器在處理 S123456 字符串時(shí)引發(fā)了異常。細(xì)心的同學(xué)可能會(huì)注意到, 在返回結(jié)果中 on_failure_pipeline 的內(nèi)容為空,這是由于異常并不是由 pipeline 類型的處理器產(chǎn)生的,所以這里的結(jié)果是空值。如果只是想獲取客戶端直接調(diào)用的 ingest pipeline,那么可以通過(guò) _ingest.pipeline來(lái)獲取。

{  "_index" : "failure-index",  "_type" : "_doc",  "_id" : "2",  "_version" : 1,  "_seq_no" : 1,  "_primary_term" : 1,  "found" : true,  "_source" : {    "failure" : {      "on_failure_pipeline" : "", // 產(chǎn)生異常的 pipeline 類型的處理器中引用的 pipeline      "on_failure_message" : "For input string: \\\"S123456\\\"", // 報(bào)錯(cuò)的內(nèi)容      "on_failure_processor_tag" : "my-index-convert", // 產(chǎn)生異常的處理器的標(biāo)簽      "on_failure_processor_type" : "convert" // 產(chǎn)生異常的處理器的類型    },    "id" : "S123456",    "message" : "User login successfully",    "timestamp" : "2022/03/03 15:22:11"  }}

上面的示例介紹了 on_failure 參數(shù)在 pipeline 級(jí)別的處理,現(xiàn)在介紹下 on_failure 參數(shù)如何在 processor 級(jí)別進(jìn)行處理。如下所示,在 convert 和 date 處理器中分別通過(guò) on_failure 參數(shù)設(shè)置了發(fā)生異常時(shí)執(zhí)行的處理器列表:當(dāng)convert 進(jìn)行類型轉(zhuǎn)換發(fā)生異常時(shí),將當(dāng)前時(shí)間的毫秒數(shù)設(shè)置 id 字段的值;當(dāng) date 處理器解析時(shí)間發(fā)生異常時(shí),使用 ingest 攝取時(shí)間的日期戳作為 date 字段的值。

PUT _ingest/pipeline/failure-test-pipeline{  "processors": [    {      "convert": {        "field": "id",        "type": "long",        "on_failure": [ // 發(fā)生異常時(shí)將當(dāng)前時(shí)間的毫秒數(shù)設(shè)置 id 字段的值          {            "script": {                "source": """                 long timeNow = Calendar.getInstance().getTimeInMillis();                 ctx.id = timeNow;              """            }          }        ]      }    },    {      "date": {        "field": "timestamp",        "formats": [          "yyyy-MM-dd HH:mm:ss"        ],        "output_format": "yyyy/MM/dd",        "target_field": "date",        "on_failure": [  // 發(fā)生異常時(shí)使用 ingest 攝取時(shí)間的日期戳作為 date 字段的值          {            "set": {              "field": "date",              "value": "{{_ingest.timestamp}}"            }          },          {            "date": {              "field": "date",              "formats": [                "yyyy-MM-dd"T"HH:mm:ss.SSSZ"              ],              "output_format": "yyyy/MM/dd",              "target_field": "date"            }          }        ]      }    }  ]}

然后往 my-index 索引中插入一條有錯(cuò)誤的文檔,將文檔 _id設(shè)置為 3。文檔正常寫(xiě)入 my-index 中,沒(méi)有返回報(bào)錯(cuò)信息。

PUT my-index/_doc/3?pipeline=failure-test-pipeline{  "id": "S123456",  "timestamp": "2022/03/03 15:22:11",  "message": "User login successfully"}# 返回結(jié)果{  "_index" : "my-index",  "_type" : "_doc",  "_id" : "3",  "_version" : 1,  "result" : "created",  "_shards" : {    "total" : 2,    "successful" : 1,    "failed" : 0  },  "_seq_no" : 0,  "_primary_term" : 1}

獲取 my-index 索引中 _id為 3 的文檔,可以看到 id 字段的值并不是傳入的原始文檔中的 S123456,而是當(dāng)前時(shí)間對(duì)應(yīng)的毫秒值;date 字段的值被設(shè)置為了 ingest 攝取時(shí)間的日期。

GET my-index/_doc/3# 返回結(jié)果{  "_index" : "my-index",  "_type" : "_doc",  "_id" : "3",  "_version" : 1,  "_seq_no" : 0,  "_primary_term" : 1,  "found" : true,  "_source" : {    "date" : "2022/03/03",    "id" : 1646349731000,    "message" : "User login successfully",    "timestamp" : "2022/03/03 15:22:11"  }}

1.3.3 fail 主動(dòng)拋出異常

和 ignore_failure, on_failure 兩種處理異常的方式不同,使用 fail 處理器可以基于某些條件主動(dòng)拋出異常,當(dāng)你想要主動(dòng)讓 pipeline 失敗并且返回特定的報(bào)錯(cuò)信息給請(qǐng)求者時(shí),可以使用這種方式。如下所示,當(dāng) tags 字段中不包含 production 時(shí),fail 處理器會(huì)主動(dòng)拋出異常,在 message 參數(shù)中可以自定義相應(yīng)的報(bào)錯(cuò)信息。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "fail": {          "if": "ctx.tags.contains("production") != true",          "message": "The production tag is not present, found tags: {{{tags}}}"        }      }    ]  },  "docs": [    {      "_source": {        "tags": ["development"]      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "error" : {        "root_cause" : [          {            "type" : "fail_processor_exception",            // 自定義的報(bào)錯(cuò)信息            "reason" : "The production tag is not present, found tags: {0=development}"          }        ],        "type" : "fail_processor_exception",        "reason" : "The production tag is not present, found tags: {0=development}"      }    }  ]}

1.4 執(zhí)行條件判斷

每種類型的處理器中都支持 if 參數(shù)判斷執(zhí)行處理器的條件,在 if 參數(shù)中使用 painless腳本進(jìn)行邏輯判斷,當(dāng) if 的判斷結(jié)果為 true 時(shí),相應(yīng)的處理器才會(huì)執(zhí)行。如下所示,創(chuàng)建了 if-test-pipeline,我們只想日志級(jí)別是 error 的消息,當(dāng) level 字段的值是 notice 時(shí),丟棄該文檔。

PUT _ingest/pipeline/if-test-pipeline{  "processors": [    {      "drop": {        "description": "Drop documents with level of notice",        "if": "ctx.level == "notice""      }    }  ]}

然后往 log-index 索引中寫(xiě)入兩條文檔,指定使用 if-test-pipeline,其中一條文檔的 level 值等于 notice,另一條的 level 值等于 error。

POST log-index/_doc?pipeline=if-test-pipeline{  "level": "notice",  "message": "this is a notice log"}POST log-index/_doc?pipeline=if-test-pipeline{  "level": "error",  "message": "this is a error log"}

查詢 log-index 索引,只返回了 1 條文檔,level 等于 notice 的文檔被丟棄了。

GET log-index/_search# 返回結(jié)果{  "took" : 1,  "timed_out" : false,  "_shards" : {    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0  },  "hits" : {    "total" : {      "value" : 1,      "relation" : "eq"    },    "max_score" : 1.0,    "hits" : [      {        "_index" : "log-index",        "_type" : "_doc",        "_id" : "fV9ET38BKRZVqZj9X8yC",        "_score" : 1.0,        "_source" : {          "level" : "error",          "message" : "this is a error log"        }      }    ]  }}

接下來(lái)介紹一種高級(jí)的用法,將一個(gè) pipeline 作為多個(gè)不同的索引或者數(shù)據(jù)流默認(rèn)的 pipeline,在這個(gè) pipeline 中創(chuàng)建多個(gè) pipeline 類型的處理器,每個(gè)處理器根據(jù)傳入的文檔選擇后臺(tái)真正要執(zhí)行的 pipeline。這樣做的好處就是,如果要更改后臺(tái)使用的 pipeline,只需要修改默認(rèn)的 pipeline 中引用的 pipeline 即可,客戶端的代碼或者索引中的設(shè)置無(wú)需修改,可以做到業(yè)務(wù)無(wú)感知的切換。如下所示,先創(chuàng)建兩個(gè) pipeline,其中 httpd_pipeline 用于處理 http 相關(guān)的日志,syslog_pipeline 用于處理 syslog 相關(guān)的日志。

PUT _ingest/pipeline/httpd_pipeline{  "processors": [    {      "set": {        "field": "message",        "value": "this is a apache_httpd log"      }    }  ]}PUT _ingest/pipeline/syslog_pipeline{  "processors": [    {      "set": {        "field": "message",        "value": "this is a syslog log"      }    }  ]}

接著創(chuàng)建一個(gè) default_pipeline,使用 if 參數(shù)進(jìn)行判斷,當(dāng) service 字段的值等于 apache_httpd 時(shí),執(zhí)行 httpd_pipeline,當(dāng) service 字段的值等于 syslog 時(shí),執(zhí)行 syslog_pipeline。

PUT _ingest/pipeline/default_pipeline{  "processors": [    {      "pipeline": {        "description": "If "service" is "apache_httpd", use "httpd_pipeline"",        "if": "ctx.service == "apache_httpd"",        "name": "httpd_pipeline"      }    },    {      "pipeline": {        "description": "If "service" is "syslog", use "syslog_pipeline"",        "if": "ctx.service == "syslog"",        "name": "syslog_pipeline"      }    }  ]}

使用 simulate API 進(jìn)行驗(yàn)證,可以看到由于傳入的文檔的 service 字段的值是 syslog,因此這條文檔被交給 syslog_pipeline 進(jìn)行處理。

POST _ingest/pipeline/default_pipeline/_simulate{  "docs": [    {      "_source": {        "service": "syslog"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "message" : "this is a syslog log", // syslog_pipeline 添加的內(nèi)容          "service" : "syslog"        },        "_ingest" : {          "timestamp" : "2022-03-04T07:18:53.531846541Z"        }      }    }  ]}

2 Processor 處理器

下表列出了 Elasticsearch 所有 processor 處理器的類型,并且根據(jù)各個(gè)處理器的用途作了相應(yīng)的分類。下面的小節(jié)中僅會(huì)演示說(shuō)明一些常用的處理器,未介紹到的部分讀者可以自行查閱官方文檔。

類別

處理器

作用

數(shù)組處理

append

添加元素

數(shù)組處理

sort

對(duì)數(shù)組中的元素進(jìn)行排序

數(shù)組處理

join

將數(shù)組中的每個(gè)元素拼接成單個(gè)字符串

數(shù)組處理

foreach

遍歷處理數(shù)組中的元素

結(jié)構(gòu)化數(shù)據(jù)處理

json

將 json 字符串轉(zhuǎn)換為結(jié)構(gòu)化的 json 對(duì)象

結(jié)構(gòu)化數(shù)據(jù)處理

kv

以鍵值對(duì)的方式提取字段

結(jié)構(gòu)化數(shù)據(jù)處理

csv

從單個(gè)文本字段中提取 CSV 行中的字段

匹配處理

gsub

替換字符串中指定的內(nèi)容,支持正則表達(dá)式匹配

匹配處理

grok

使用正則表達(dá)式提取字段,grok 處理器內(nèi)置預(yù)定義的表達(dá)式

匹配處理

dissect

和 grok 處理器類似,語(yǔ)法比 grok 簡(jiǎn)單,不使用正則表達(dá)式。可以使用修飾符控制解析方式

字符串處理

lowercase

將字符串轉(zhuǎn)換為小寫(xiě)

字符串處理

uppercase

將字符串轉(zhuǎn)換為大寫(xiě)

字符串處理

split

指定分隔符將字符串拆分為數(shù)組

字符串處理

html_strip

刪除字符串中的 HTLM 標(biāo)簽

字符串處理

trim

去掉字符串中的前后空格

字段處理

rename

重命名字段

字段處理

remove

刪除字段

字段處理

set

為字段賦值

字段處理

script

處理復(fù)雜的邏輯,可以執(zhí)行內(nèi)聯(lián)或者存儲(chǔ)腳本

字段處理

dot_expander

將帶有點(diǎn)的字段擴(kuò)展為對(duì)象字段

文檔處理

drop

刪除文檔

文檔處理

fingerprint

計(jì)算文檔內(nèi)容的哈希值

網(wǎng)絡(luò)處理

network_direction

根據(jù)給定的源 IP 地址、目標(biāo) IP 地址和內(nèi)部網(wǎng)絡(luò)列表下計(jì)算網(wǎng)絡(luò)請(qǐng)求的方向

網(wǎng)絡(luò)處理

community_id

計(jì)算網(wǎng)絡(luò)流數(shù)據(jù)中的 community id, 可以使用 community id 來(lái)關(guān)聯(lián)與單個(gè)流相關(guān)的網(wǎng)絡(luò)事件

網(wǎng)絡(luò)處理

registered_domain

從完全限定域名 (FQDN) 中提取注冊(cè)域(也稱為有效頂級(jí)域或 eTLD)、子域和頂級(jí)域。

HTTP 處理

urldecode

URL 解碼

HTTP 處理

user_agent

從 user_agent 中提取詳細(xì)信息, 例如操作系統(tǒng), 瀏覽器版本等等

HTTP 處理

uri_parts

從 URI 中提取詳細(xì)信息, 例如域名, 端口, 路徑等等

外部結(jié)合

pipeline

執(zhí)行另一個(gè) ingest pipeline

外部結(jié)合

enrich

添加來(lái)自另一個(gè)索引的數(shù)據(jù),類似關(guān)系型數(shù)據(jù)庫(kù)中的 join 關(guān)聯(lián)查詢

外部結(jié)合

geoip

根據(jù)來(lái)自 Maxmind 數(shù)據(jù)庫(kù)的數(shù)據(jù)添加有關(guān) IP 地址地理位置的信息

外部結(jié)合

set_security_user

獲取索引文檔用戶的詳細(xì)信息,例如 username, roles, email, full_name, metadata

外部結(jié)合

inference

使用預(yù)訓(xùn)練的數(shù)據(jù)分析模型來(lái)處理數(shù)據(jù),用于機(jī)器學(xué)習(xí)領(lǐng)域

時(shí)間處理

date_index_name

根據(jù)文檔中的時(shí)間戳字段將文檔寫(xiě)入基于時(shí)間的索引

時(shí)間處理

date

從字段中解析日期作為文檔的時(shí)間戳

類型處理

convert

字段類型轉(zhuǎn)換,例如 "1234" -> 1234

類型處理

byte

將人類可讀的字節(jié)值轉(zhuǎn)換為字節(jié)的數(shù)值,例如 1kb -> 1024

異常處理

fail

主動(dòng)拋出異常

圖形處理

circle

將圓形轉(zhuǎn)換為近似多邊形

2.1 Lowercase & Uppercase

lowercase 處理器可以將字符串轉(zhuǎn)換為其等效的小寫(xiě)字母。如果該字段是一個(gè)字符串?dāng)?shù)組,則該數(shù)組的所有成員都將被轉(zhuǎn)換。uppercase 處理器和 lowercase 相反,將字符串轉(zhuǎn)換為大寫(xiě)字母。如下所示,使用 lowercase 處理器將 name 字段轉(zhuǎn)換為小寫(xiě)字母。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "lowercase": {          "field": "name"        }      }    ]  },  "docs": [    {      "_source": {        "name": "Tom"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "name" : "tom"        },        "_ingest" : {          "timestamp" : "2022-02-27T10:43:11.718792423Z"        }      }    }  ]}

2.2 Split

split 處理器可以根據(jù)指定的分隔符,將字符串拆分為數(shù)組。如下所示,以 _符號(hào)作為分隔符,將 num 字段拆分為數(shù)組。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "split": {          "field": "num",          "separator": "_"        }      }    ]  },  "docs": [    {      "_source": {        "num": "111_222_333_444"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "num" : [            "111",            "222",            "333",            "444"          ]        },        "_ingest" : {          "timestamp" : "2022-02-27T11:10:25.249883405Z"        }      }    }  ]}

2.3 Trim

trim 處理器可以去掉字符串頭尾的空格。如下所示,使用 trim 處理器去掉 message 字段頭尾的空格。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "trim": {          "field": "message"        }      }    ]  },  "docs": [    {      "_source": {        "message": "    Elasticsearch is the distributed search and analytics engine    "      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "message" : "Elasticsearch is the distributed search and analytics engine"        },        "_ingest" : {          "timestamp" : "2022-02-27T11:12:26.952402786Z"        }      }    }  ]}

2.4 Join

join 處理器可以將數(shù)組中的每個(gè)元素拼接成單個(gè)字符串。如下所示,使用 _符號(hào)作為分隔符,將 animal 字段中的元素拼接成單個(gè)字符串。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "join": {          "field": "animal",          "separator": "-"        }      }    ]  },  "docs": [    {      "_source": {        "animal": ["dog", "cat", "monkey"]      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "animal" : "dog-cat-monkey"        },        "_ingest" : {          "timestamp" : "2022-02-27T10:33:39.63520118Z"        }      }    }  ]}

2.5 Foreach

使用 foreach 處理器可以遍歷數(shù)組,對(duì)其中的每個(gè)元素進(jìn)行處理,使用 processor 參數(shù)指定一個(gè)處理器來(lái)處理數(shù)組中元素。在 foreach 處理器內(nèi)引用的處理通過(guò) _ingest._value鍵來(lái)獲取數(shù)組中每個(gè)元素的值。如下所示,將 values 字段中的每個(gè)元素轉(zhuǎn)換為大寫(xiě)字母。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "foreach": {          "field": "values",          "processor": {            "uppercase": {              "field": "_ingest._value"            }          }        }      }    ]  },  "docs": [    {      "_source": {         "values" : ["foo", "bar", "baz"]      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "values" : [            "FOO",            "BAR",            "BAZ"          ]        },        "_ingest" : {          "_value" : null,          "timestamp" : "2022-02-27T10:06:44.235660464Z"        }      }    }  ]}

2.6 KV

kv 處理器可以以鍵值對(duì)的方式提取字段。如下所示,以空格作為不同鍵值對(duì)的分隔符,以 =拆分每組鍵值對(duì)的鍵和值。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "kv": {          "field": "message",          "field_split": " ", // 拆分鍵值對(duì)          "value_split": "=" // 拆分鍵值對(duì)的鍵和值        }      }    ]  },  "docs": [    {      "_source": {        "message": "ip=1.2.3.4 error=REFUSED"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "message" : "ip=1.2.3.4 error=REFUSED",          "error" : "REFUSED",          "ip" : "1.2.3.4"        },        "_ingest" : {          "timestamp" : "2022-02-27T10:40:31.072140367Z"        }      }    }  ]}

2.7 CSV

csv 處理器會(huì)將字段中的內(nèi)容看作 csv 文本的一行,根據(jù) separator 參數(shù)指定的分隔符,將拆分后的值賦值給 target_fields 列表中定義的字段。如下所示,將 person 字段按照 |符號(hào)進(jìn)行拆分,依次賦值給 name, age, country 字段。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "csv": {          "field": "person",          "target_fields": [ // 指定每列的字段值            "name",            "age",            "country"          ],          "separator": "|" // 字段間的分隔符        }      }    ]  },  "docs": [    {      "_source": {        "person": "zhangsan|18|china"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "country" : "china",          "person" : "zhangsan|18|china",          "name" : "zhangsan",          "age" : "18"        },        "_ingest" : {          "timestamp" : "2022-02-24T09:39:48.708832221Z"        }      }    }  ]}

2.8 Grok

grok 處理器可以使用正則表達(dá)式來(lái)提取字段,并且內(nèi)置了許多常用的表達(dá)式,可以直接通過(guò)表達(dá)式別名進(jìn)行使用。可以使用以下命令獲取所有 grok 內(nèi)置的表達(dá)式。

GET _ingest/processor/grok?s

返回結(jié)果如下,例如我們想匹配 IP 地址就可以直接使用 %{IP}進(jìn)行匹配,想匹配 MAC 地址可以使用 %{MAC}進(jìn)行匹配。表達(dá)式別名還可以引用其他的表達(dá)式別名,比如表達(dá)式別名 IP 就引用了IPV4 和 IPV6 兩個(gè)別名。

接下來(lái)我們嘗試使用 grok 處理器解析一條日志。%{:}表示將表達(dá)式匹配的值賦值到指定的 field 字段中,表達(dá)式可以是我們自定義的表達(dá)式,也可以是表達(dá)式別名;%{}表示只匹配不賦值。

55.3.244.1 GET /index.html 15824 0.043"

在上面的日志中:

55.3.244.2 是客戶端的 IP 地址,使用 %{IP:client}匹配 IP 地址,賦值到 client 字段中;GET 是 HTTP 的請(qǐng)求方法,使用 %{WORD:method}匹配數(shù)字和字母,賦值到 method 字段中;/index.html 是請(qǐng)求的 URI 路徑,使用 %{URIPATHPARAM:request}匹配 URI 路徑和參數(shù),賦值到 request 字段中;15824 是請(qǐng)求的字節(jié)大小,使用 %{NUMBER:bytes:int}匹配數(shù)字,賦值到 bytes 字段中,并且將字段設(shè)置為 int 類型;0.043 是請(qǐng)求的處理時(shí)間,使用 %{NUMBER:duration:double}匹配數(shù)字,賦值到 duration字段中,并且將字段設(shè)置為 double 類型。

上面用到的 IP, WORD, URIPATHPARAM, NUMBER 表達(dá)式都是 grok 內(nèi)置的表達(dá)式別名,可以直接拿來(lái)使用。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "grok": {          "field": "message",          "patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes:int} %{NUMBER:duration:double}"]        }      }    ]  },  "docs":[    {      "_source": {        "message": "55.3.244.1 GET /index.html 15824 0.043"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "duration" : 0.043,          "request" : "/index.html",          "method" : "GET",          "bytes" : 15824,          "client" : "55.3.244.1",          "message" : "55.3.244.1 GET /index.html 15824 0.043"        },        "_ingest" : {          "timestamp" : "2022-03-01T03:33:57.627169176Z"        }      }    }  ]}

除了使用 grok 內(nèi)置的表達(dá)式以外,grok 處理器也允許我們自定義表達(dá)式??梢栽?pattern_definitions參數(shù)中進(jìn)行設(shè)置,其中鍵是我們自定義表達(dá)式的別名,值是具體的正則表達(dá)式。如下所示,我們定義了兩個(gè)表達(dá)式別名:FAVORITE_DOG 使用正則表達(dá)式 \w+, 匹配數(shù)字和字母,注意這里需要額外使用一個(gè) \來(lái)進(jìn)行轉(zhuǎn)義;RGB 可以匹配 RED,GREEN,BLUE 3 種顏色。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "grok": {          "field": "message",          "patterns": [            "my %{FAVORITE_DOG:dog} is colored %{RGB:color}"          ],          "pattern_definitions": { // 自定義表達(dá)式            "FAVORITE_DOG": "\\w+",  // 匹配數(shù)字和字母            "RGB": "RED|GREEN|BLUE"  // 匹配 3 個(gè)顏色          }        }      }    ]  },  "docs": [    {      "_source": {        "message": "my beagle is colored BLUE"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "message" : "my beagle is colored BLUE",          "color" : "BLUE",          "dog" : "beagle"        },        "_ingest" : {          "timestamp" : "2022-03-01T03:34:33.933398405Z"        }      }    }  ]}

有時(shí)候一種匹配規(guī)則可能難以匹配所有的內(nèi)容,我們可以在正則表達(dá)式中通過(guò)或的邏輯進(jìn)行判斷,但是這樣會(huì)使得寫(xiě)出來(lái)的表達(dá)式難以閱讀。這里還有一種更好的方法,在 grok 處理器中,patterns 參數(shù)允許填寫(xiě)多個(gè)表達(dá)式,這樣我們的匹配規(guī)則看上去就一目了然,處理器會(huì)使用最先匹配到的表達(dá)式。如下所示,我們?cè)O(shè)置了 FAVORITE_DOG 和 FAVORITE_CAT 兩個(gè)表達(dá)式都用于解析 pet 字段,如果想要知道是哪個(gè)表達(dá)式匹配了內(nèi)容,可以設(shè)置參數(shù) "trace_match": true,這樣在返回結(jié)果的 _grok_match_index 字段中可以看到匹配了哪個(gè)表達(dá)式,其中 1 表示匹配了第二個(gè)表達(dá)式。

POST _ingest/pipeline/_simulate{  "pipeline": {    "description": "parse multiple patterns",    "processors": [      {        "grok": {          "field": "message",          "patterns": [  // patterns 是數(shù)組, 可以填寫(xiě)多個(gè)表達(dá)式            "%{FAVORITE_DOG:pet}",            "%{FAVORITE_CAT:pet}"          ],          "pattern_definitions": {            "FAVORITE_DOG": "beagle",            "FAVORITE_CAT": "burmese"          },          "trace_match": true // 顯示匹配了哪一個(gè)表達(dá)式, 第一個(gè)從 0 開(kāi)始        }      }    ]  },  "docs": [    {      "_source": {        "message": "I love burmese cats!"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "message" : "I love burmese cats!",          "pet" : "burmese"        },        "_ingest" : {          "_grok_match_index" : "1", // 匹配了第 2 個(gè)表達(dá)式          "timestamp" : "2022-03-01T03:35:05.490483581Z"        }      }    }  ]}

在 Kibana 的界面上還提供了 Grok Debugger 方便我們調(diào)試 grok 表達(dá)式。點(diǎn)擊 Management -> Dev Tools -> Grok Gebugger進(jìn)入調(diào)試界面。

從上圖可以看到,調(diào)試界面分為以下 4 個(gè)部分:

Sample Data: 填寫(xiě)測(cè)試的文本。Grok Pattern:填寫(xiě) grok 表達(dá)式,相當(dāng)于 grok 處理器中 patterns 定義的內(nèi)容。Custom Patterns:自定義表達(dá)式,相當(dāng)于 grok 處理器中 pattern_definitions 定義的內(nèi)容。在 Custom Patterns 中每行表示一個(gè)自定義表達(dá)式,最左邊的字符串表示我們自定義的表達(dá)式別名,右邊內(nèi)容是表達(dá)式的內(nèi)容,不需要進(jìn)行符號(hào)轉(zhuǎn)義。Structured Data:處理完的結(jié)果。

我們將示例中的內(nèi)容按照上面的說(shuō)明填寫(xiě)到相應(yīng)的位置,點(diǎn)擊 Simulate,就可以看到解析完成后的結(jié)構(gòu)化數(shù)據(jù)了。

2.9 Dissect

dissect 和 grok 處理器類似,都是用于從單個(gè)文本字段中提取結(jié)構(gòu)化字段。與 grok 相比,dissect 最大的優(yōu)勢(shì)就是簡(jiǎn)單和快速,dissect 在解析時(shí)不使用正則表達(dá)式,這使得 dissect 的語(yǔ)法更加簡(jiǎn)單,并且執(zhí)行速度比 grok 更快。當(dāng)然 grok 也有自己的獨(dú)到之處,grok 可以同時(shí)使用多個(gè) patterns 來(lái)對(duì)內(nèi)容來(lái)進(jìn)行匹配,這是 dissect 所不具備的能力。接下來(lái)首先介紹一下 dissect 處理器簡(jiǎn)單的使用方法,如下所示,我們要對(duì)一行日志內(nèi)容進(jìn)行解析,%{}表示將匹配到的字符串作為 field 字段的值。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "dissect": {          "field": "message",          "pattern": "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}"        }      }    ]  },  "docs": [    {      "_source": {        "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "request" : "/english/venues/cities/images/montpellier/18.gif",          "auth" : "-",          "ident" : "-",          "verb" : "GET",          "message" : """1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] "GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0" 200 3171""",          "@timestamp" : "30/Apr/1998:22:00:52 +0000",          "size" : "3171",          "clientip" : "1.2.3.4",          "httpversion" : "1.0",          "status" : "200"        },        "_ingest" : {          "timestamp" : "2022-03-01T06:37:23.791866312Z"        }      }    }  ]}

在 dissect 中可以使用修飾符改變默認(rèn)的匹配規(guī)則,例如可以指定 dissect 忽略某些字段、拼接多個(gè)字符等等。dissect 的修飾符說(shuō)明如下表所示。

修飾符

用途

位置

示例

->

跳過(guò) -> 右邊重復(fù)的字符

最右邊

%{keyname1->}

將多個(gè)結(jié)果附加到一起作為輸出

左邊

%{+keyname} %{+keyname}

和 /n

指定附加結(jié)果的順序

號(hào)在左邊,/n 放在右邊,n 是順序的數(shù)字

%{+keyname/2} %{+keyname/1}

?

跳忽略匹配項(xiàng)

左邊

%{?keyname}

和 &

輸出鍵設(shè)置為 * 的值,輸出值設(shè)置為 & 的值

左邊

%{*key} %{&value}

dissect 默認(rèn)的匹配算法非常嚴(yán)格,要求 pattern 中的所有字符都與源字符串完全匹配。例如 %{a} %只能匹配“字符串1 字符串2”(中間 1 個(gè)空格),將無(wú)法匹配“字符串1 字符串2”(中間 5 個(gè)空格)。要處理這種情況就可以使用 ->修飾符跳過(guò)箭頭右邊重復(fù)的字符,例如 %{a->} %就可以跳過(guò)字符串1 和字符串 2 中間的多個(gè)空格,只對(duì)空格匹配一次。要跳過(guò)的字符我們可以自由設(shè)置,如下所示,使用 ->修飾符跳過(guò)重復(fù)的 ~字符。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "dissect": {          "field": "message",          "pattern": "%{ts->}~%{level}" // 跳過(guò)重復(fù)的 ~ 字符        }      }    ]  },  "docs": [    {      "_source": {        "message": "1998-08-10T17:15:42,466~~~~~~~WARN"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "message" : "1998-08-10T17:15:42,466~~~~~~~WARN",          "level" : "WARN",          "ts" : "1998-08-10T17:15:42,466"        },        "_ingest" : {          "timestamp" : "2022-03-01T06:38:20.328535452Z"        }      }    }  ]}

假如我們想將多個(gè)匹配的字符拼接為一個(gè)字段,可以使用 +修飾符,append_separator 參數(shù)可以指定分隔符, 默認(rèn)以空格作為分隔符。如下所示,我們將匹配的多個(gè)字符串拼接為 name 字段,使用 ,作為分隔符。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "dissect": {          "field": "message",          "pattern": "%{+name} %{+name} %{+name} %{+name}",          "append_separator": ","        }      }    ]  },  "docs": [    {      "_source": {        "message": "john jacob jingleheimer schmidt"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "name" : "john,jacob,jingleheimer,schmidt",          "message" : "john jacob jingleheimer schmidt"        },        "_ingest" : {          "timestamp" : "2022-03-02T13:41:40.058126802Z"        }      }    }  ]}

如果我們想改變字符串拼接的順序,可以同時(shí)使用 +/n修飾符指定順序,其中 n 是順序的數(shù)字。如下所示,可以看到返回結(jié)果中的 name 字段按照我們指定的順序拼接。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "dissect": {          "field": "message",          "pattern": "%{+name/2} %{+name/4} %{+name/3} %{+name/1}",          "append_separator": ","        }      }    ]  },  "docs": [    {      "_source": {        "message": "john jacob jingleheimer schmidt"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "name" : "schmidt,john,jingleheimer,jacob",          "message" : "john jacob jingleheimer schmidt"        },        "_ingest" : {          "timestamp" : "2022-03-02T13:47:44.332086601Z"        }      }    }  ]}

前面提到過(guò),dissect 要求 pattern 中的所有字符都與源字符串完全匹配,否則解析將不會(huì)成功。如果我們僅僅想讓某些字符串在匹配時(shí)充當(dāng)“占位”的角色,并不想讓它出現(xiàn)在最終的文檔中,那么就可以使用 ?修飾符來(lái)忽略最終結(jié)果中的匹配項(xiàng)。除了使用 ?修飾符以外,還可以用一個(gè)空鍵 %{}實(shí)現(xiàn)相同的效果,但是為了便于閱讀,建議還是使用 %{?}的方式。如下所示,ident 和 auth 字段都不會(huì)出現(xiàn)在最終的結(jié)果中,僅用于字符串匹配。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "dissect": {          "field": "message",          "pattern": "%{clientip} %{?ident} %{?auth} [%{@timestamp}]"        }      }    ]  },  "docs": [    {      "_source": {        "message": "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "@timestamp" : "30/Apr/1998:22:00:52 +0000",          "message" : "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000]",          "clientip" : "1.2.3.4"        },        "_ingest" : {          "timestamp" : "2022-03-02T13:50:56.099402273Z"        }      }    }  ]}

*&修飾符可以用于解析包含鍵值對(duì)的內(nèi)容,其中輸出鍵設(shè)置為 *的值,輸出值設(shè)置為 &的值。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "dissect": {          "field": "message",          "pattern": "[%{ts}] [%{level}] %{*p1}:%{&p1} %{*p2}:%{&p2}"        }      }    ]  },  "docs": [    {      "_source": {        "message": "[2018-08-10T17:15:42,466] [ERR] ip:1.2.3.4 error:REFUSED"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "level" : "ERR",          "ip" : "1.2.3.4",          "message" : "[2018-08-10T17:15:42,466] [ERR] ip:1.2.3.4 error:REFUSED",          "error" : "REFUSED",          "ts" : "2018-08-10T17:15:42,466"        },        "_ingest" : {          "timestamp" : "2022-03-02T14:00:54.96982616Z"        }      }    }  ]}

2.10 Rename

rename 處理器用于重命名現(xiàn)有字段。如果該字段不存在或者重命名的字段已存在,則會(huì)引發(fā)異常。如下所示,將 provider 字段重命名為 cloud.provider。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "rename": {          "field": "provider",          "target_field": "cloud.provider"        }      }    ]  },  "docs": [    {      "_source": {        "provider": "Aliyun"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "cloud" : {            "provider" : "Aliyun"          }        },        "_ingest" : {          "timestamp" : "2022-02-27T10:57:47.821558199Z"        }      }    }  ]}

2.11 Remove

remove 處理器用于刪除現(xiàn)有字段。如果刪除的字段不存在,則會(huì)引發(fā)異常。如下所示,使用 remove 處理器刪除文檔中的 name 和 location 字段。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "remove": {          "field": ["age", "location"]        }      }    ]  },  "docs": [    {      "_source": {        "name": "tom",        "age": 18,        "location": "United States"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "name" : "tom"        },        "_ingest" : {          "timestamp" : "2022-02-27T10:56:05.119755281Z"        }      }    }  ]}

2.12 Set

set 處理器用于為字段賦值,并且在賦值的時(shí)候還可以使用 {{{ }}}符號(hào)從其他字段復(fù)制值,然后和指定字符串進(jìn)行拼接。如下所示,將 version 字段的值設(shè)置為 2,host.os.name 字段的值為 copy from 字符串拼接 os 字段的結(jié)果。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "set": {          "field": "host.os.name",          "value": "copy from {{{os}}}" // 從 os 字段復(fù)制值進(jìn)行拼接        }      },      {        "set": {          "field": "version",          "value": "2" // 設(shè)置靜態(tài)值        }      }    ]  },  "docs": [    {      "_source": {        "os": "Ubuntu"      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "host" : {            "os" : {              "name" : "copy from Ubuntu"            }          },          "os" : "Ubuntu",          "version" : "2"        },        "_ingest" : {          "timestamp" : "2022-02-28T13:39:31.035666829Z"        }      }    }  ]}

2.13 Script

對(duì)于復(fù)雜的處理邏輯,如果使用 Elasticseach 其他自帶的處理器無(wú)法實(shí)現(xiàn),那么可以嘗試在 script 處理器中編寫(xiě)腳本進(jìn)行處理。在 script 處理器中通過(guò) lang 參數(shù)可以指定腳本語(yǔ)言,通常我們使用 painless 作為腳本語(yǔ)言,這也是 Elasticsearch 中默認(rèn)的腳本語(yǔ)言。在 script 處理器中,腳本在 ingest 上下文中運(yùn)行,我們可以通過(guò) ctx["field"]或者ctx.field語(yǔ)法來(lái)訪問(wèn)文檔中的字段。如下所示,傳入的文檔中有一個(gè)數(shù)字類型的參數(shù) num,我們?cè)谀_本中通過(guò) if else 條件語(yǔ)句進(jìn)行判斷,當(dāng) num 等于 7 時(shí),將 result 的值設(shè)置為 happy;當(dāng) num 等于 4 時(shí),將 result 的結(jié)果設(shè)置為 sad;當(dāng) num 是其他值時(shí),將 result 的結(jié)果設(shè)置為 normal。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "script": {          "lang": "painless",          "source": """              if(ctx.num == 7){                ctx.result = "happy"              }else if(ctx.num == 4){                ctx.result = "sad"              }else {                ctx.result = "normal"              }          """        }      }    ]  },  "docs": [    {      "_source": {        "num": 7      }    }  ]}# 返回結(jié)果{  "docs" : [    {      "doc" : {        "_index" : "_index",        "_type" : "_doc",        "_id" : "_id",        "_source" : {          "result" : "happy",          "num" : 7        },        "_ingest" : {          "timestamp" : "2022-03-02T14:20:27.776240111Z"        }      }    }  ]}

2.14 Drop

drop 處理器可以根據(jù)條件刪除指定的文檔。如下所示,刪除 name 字段值為 tom 的文檔。

POST _ingest/pipeline/_simulate{  "pipeline": {    "processors": [      {        "drop": {          "if": "ctx.name == "tom""        }      }    ]  },  "docs": [    {      "_source": {        "name": "tom",        "age": 18      }    }  ]}# 返回結(jié)果{  "docs" : [    null  ]}

3 Ingest Pipeline 應(yīng)用場(chǎng)景

Ingest Pipeline 主要有以下 4 類應(yīng)用場(chǎng)景:

寫(xiě)入時(shí)指定 pipeline,單條寫(xiě)入或者使用 _bulk API 批量寫(xiě)入時(shí)都可以使用。更新時(shí)指定 pipeline。定義索引或者模板時(shí)指定 pipeline,有兩個(gè)相關(guān)的參數(shù):- index.default_pipeline參數(shù)可以定義default pipeline(默認(rèn)執(zhí)行的 pipeline),當(dāng)請(qǐng)求中沒(méi)有指定 pipeline 時(shí)執(zhí)行;- index.final_pipeline參數(shù)可以定義final pipeline(最終執(zhí)行的 pipeline),在所有 pipeline 執(zhí)行完后再執(zhí)行。reindex 時(shí)指定 pipeline,在重建索引或者數(shù)據(jù)遷移時(shí)使用。

3.1 寫(xiě)入時(shí)指定 Pipeline

首先創(chuàng)建一個(gè)名為 lowercase-pipeline的 pipeline,它的作用是將 name 字段轉(zhuǎn)換為小寫(xiě)字母。

PUT _ingest/pipeline/lowercase-pipeline{  "processors": [    {      "lowercase": {        "field": "name"      }    }  ]}

單條寫(xiě)入或者通過(guò) _bulkAPI 批量寫(xiě)入時(shí)都可以通過(guò) pipeline參數(shù)指定使用的 pipeline。

# 寫(xiě)入單條數(shù)據(jù)時(shí)指定 pipelienPOST index-1/_doc?pipeline=lowercase-pipeline{  "name": "Tom",  "age": 20}# _bulk 寫(xiě)入多條文檔時(shí)指定 pipelinePUT index-1/_bulk?pipeline=lowercase-pipeline{"index":{ }}{"name":"Peter","age":17}{"index":{}}{"name":"Mary","age":19}

查看寫(xiě)入的文檔,可以看到所有文檔的 name 字段都轉(zhuǎn)換為了小寫(xiě)字母。

GET index-1/_search# 返回結(jié)果{  "_index" : "index-1",  "_type" : "_doc",  "_id" : "g196X38BKRZVqZj9rsyn",  "_score" : 1.0,  "_source" : {    "name" : "tom",    "age" : 20  }},{  "_index" : "index-1",  "_type" : "_doc",  "_id" : "hF96X38BKRZVqZj9scwO",  "_score" : 1.0,  "_source" : {    "name" : "peter",    "age" : 17  }},{  "_index" : "index-1",  "_type" : "_doc",  "_id" : "hV96X38BKRZVqZj9scwO",  "_score" : 1.0,  "_source" : {    "name" : "mary",    "age" : 19  }}

3.2 更新時(shí)指定 Pipeline

使用 _update_by_query API可以批量更新索引中的文檔,通常會(huì)結(jié)合pipeline 來(lái)對(duì)文檔進(jìn)行更新。以下示例中我們對(duì)索引中的所有文檔進(jìn)行更新,也可以在 _update_by_query API中使用 DSL 語(yǔ)句過(guò)濾出需要更新的文檔。

# 往源索引中插入數(shù)據(jù)PUT index-2/_doc/1{  "name": "Smith",  "age": 18}PUT index-2/_doc/1{  "name": "Mike",  "age": 16}# 使用 update_by_query 進(jìn)行更新,可以寫(xiě) DSL 語(yǔ)句過(guò)濾出需要更新的文檔POST index-2/_update_by_query?pipeline=lowercase-pipeline

3.3 定義索引或者模板時(shí)指定 Pipeline

在定義索引或者模板時(shí)可以使用 index.default_pipeline 參數(shù)指定 default pipeline(默認(rèn)執(zhí)行的 pipeline),index.final_pipeline 參數(shù)指定 final pipeline(最終執(zhí)行的 pipeline)。default pipeline 與 final pipeline 實(shí)際上都是普通的 ingest pipeline,只是和一般的 pipeline 執(zhí)行時(shí)機(jī)不同;default pipeline 執(zhí)行的時(shí)機(jī)是當(dāng)前寫(xiě)入請(qǐng)求沒(méi)有指定 pipeline 時(shí),final pipeline 執(zhí)行的時(shí)機(jī)是在所有 pipeline 執(zhí)行完畢后。

如上圖所示,如果當(dāng)前的寫(xiě)入或者更新請(qǐng)求中指定了 pipeline,則會(huì)先執(zhí)行自定義的 pipeline,當(dāng)所有的 pipeline 執(zhí)行完畢后再執(zhí)行 final pipeline(如果索引顯式設(shè)置了index.final_pipeline);如果當(dāng)前的寫(xiě)入或者更新請(qǐng)求中沒(méi)有指定 pipeline,并且索引顯式設(shè)置了 index.default_pipeline 參數(shù)時(shí),則會(huì)先執(zhí)行 default pipeline,最后再執(zhí)行 final pipeline。

為了完成下面的演示,在前面 lowercase-pipeline 的基礎(chǔ)上,現(xiàn)在再創(chuàng)建兩個(gè) pipeline,其中 uppercase-pipeline 的作用是 name 字段轉(zhuǎn)換為小寫(xiě)字母,set-pipeline 的作用是為文檔添加一個(gè) message 字段。

PUT _ingest/pipeline/uppercase-pipeline{  "processors": [    {      "uppercase": {        "field": "name"      }    }  ]}PUT _ingest/pipeline/set-pipeline{  "processors": [    {      "set": {        "field": "message",        "value": "set by final pipeline"      }    }  ]}

接下來(lái)創(chuàng)建一個(gè)索引 index-3,在 settings 中指定索引的 default_pipeline 為 lowercase-pipeline,final_pipeline 為 set-pipeline。

PUT index-3{  "settings": {    "index": {      "default_pipeline": "lowercase-pipeline", // 默認(rèn)執(zhí)行的 pipeline      "final_pipeline": "set-pipeline" // 最終執(zhí)行的 pipeline    }  }}

然后往索引中插入兩條文檔,其中 _id為 1 的文檔在寫(xiě)入時(shí)不指定 pipeline,_id為 2 的文檔在寫(xiě)入時(shí)指定使用 uppercase-pipeline。

PUT index-3/_doc/1{  "name": "Lisa",  "age": 18}# 在寫(xiě)入時(shí)指定 pipeline 覆蓋 default_pipelinePUT index-3/_doc/2?pipeline=uppercase-pipeline{  "name": "Jerry",  "age": 21}

查詢最終保存的文檔,可以看到 final pipeline 始終會(huì)執(zhí)行,2 個(gè)文檔都添加了 message 字段;由于寫(xiě)入 _id為 2 的文檔時(shí)指定使用了 uppercase-pipeline,所以該文檔沒(méi)有執(zhí)行 default pipeline,而是執(zhí)行了 uppercase-pipeline 將字母轉(zhuǎn)換為大寫(xiě)。

GET index-3/_search# 返回結(jié)果{  "_index" : "index-3",  "_type" : "_doc",  "_id" : "1",  "_score" : 1.0,  "_source" : {    "name" : "lisa",    "message" : "set by final pipeline",    "age" : 18  }},{  "_index" : "index-3",  "_type" : "_doc",  "_id" : "2",  "_score" : 1.0,  "_source" : {    "name" : "JERRY",    "message" : "set by final pipeline",    "age" : 21  }}

3.4 Reindex 時(shí)指定 Pipeline

Elasticsearch 提供了 reindex API 用于將文檔從源索引復(fù)制到目標(biāo)索引,在 reindex 時(shí)可以指定 pipeline 對(duì)復(fù)制的文檔進(jìn)行加工處理。如下所示,先創(chuàng)建源索引 source-index,并插入 1 條文檔。

PUT source-index/_doc/1{  "name": "Jack",  "age": 18}

然后在 reindex 時(shí)指定使用 lowercase-pipeline,目標(biāo)索引名設(shè)置為 dest-index。

POST _reindex{  "source": {    "index": "source-index"  },  "dest": {    "index": "dest-index",    "pipeline": "lowercase-pipeline"  }}

查看目標(biāo)索引,name 字段已經(jīng)成功轉(zhuǎn)換為了小寫(xiě)字母。

GET dest-index/_search# 返回結(jié)果{  "took" : 1,  "timed_out" : false,  "_shards" : {    "total" : 1,    "successful" : 1,    "skipped" : 0,    "failed" : 0  },  "hits" : {    "total" : {      "value" : 1,      "relation" : "eq"    },    "max_score" : 1.0,    "hits" : [      {        "_index" : "dest-index",        "_type" : "_doc",        "_id" : "1",        "_score" : 1.0,        "_source" : {          "name" : "jack",          "age" : "18"        }      }    ]  }}

4 總結(jié)

Ingest pipeline 是 Elasticsearch 的一個(gè)非常實(shí)用的功能,它能夠幫助用戶在數(shù)據(jù)進(jìn)入 Elasticsearch 索引之前對(duì)其進(jìn)行預(yù)處理,從而提高搜索和分析的效率和準(zhǔn)確性。

本文向讀者介紹了如何有效地創(chuàng)建,管理和測(cè)試 ElasticSearch Ingest Pipeline。在第一小節(jié)中首先說(shuō)明了 ingest pipeline 的基本用法,包括創(chuàng)建和使用 ingest pipeline,使用 simulate API 對(duì) pipeline 進(jìn)行測(cè)試,以及如何處理 pipeline 中的異常;在第二小節(jié)中,將 ingest pipeline 中的 processor 處理器根據(jù)用途作了分類說(shuō)明,并通過(guò)示例展示了常見(jiàn)的幾個(gè) processor 的用法;在最后一個(gè)小節(jié)中歸納了 ingest pipeline 的 4 個(gè)應(yīng)用場(chǎng)景。

關(guān)鍵詞: 數(shù)據(jù)處理 正則表達(dá)式 編程算法

下一篇:2023必富的生肖 橫財(cái)纏身的4大生肖 世界資訊
上一篇:最后一頁(yè)

科技

 
久久婷婷五月综合97色直播| 奶头被民工吸的又大又黑| 最新亚洲人成网站在线观看| 中文字幕乱码人妻无码久久| 欧美性人人天天夜夜摸| 甜蜜惩罚我是看守专用宠物| 一本一道av无码中文字幕﹣百度| 欧美rapper潮水抽筋| 丰满妇女强制高潮18XXXX| 亚洲综合色一区二区三区| 51久久成人国产精品麻豆| 亚洲av永久无码一区二区三区| 亚洲av片一区二区三区| 色偷偷色噜噜狠狠成人免费视频| 51国偷自产一区二区三区| 国产成人午夜高潮毛片| 婷婷成人丁香七月综合激情| 国产精品无码一区二区三区免费 | 色欲av午夜一区二区三区| 贵妇情欲按摩a片| 一边亲一面膜下| 2012中文字幕高清在线中文字幕| 少妇蹲下买菜露出毛| 又长又粗又爽又黄少妇毛片| 蜜桃视频直播app| 我和闺蜜在ktv被八人伦| 欧美牲交a欧牲交aⅴ久久| 国产看黄网站又黄又爽又色| 他含着她的乳奶揉搓揉视频捏| 亚洲日韩精品av成人波多野| 久久精品人人做人人爽电影蜜桃| sm抽打调教女人光屁股的视频| 人妻办公室出轨上司hd院线| 亚洲精品久久激情国产片| 优优人体大尺大尺无毒不卡| 国产性色强伦免费视频| 沈阳45老熟女高潮喷水亮点| 国精品无码一区二区三区左线| 999热线在线观看| 教室停电h嗯啊好硬好湿| 欧洲精品码一区二区三区免费看|