A pipeline is a definition of a series of processors that are executed in the order in which they are declared.
Think of a processor as a set of instructions that is applied to each incoming document before it is indexed.
In this post we are going to create a pipeline to add a field named doc_timestamp to all the documents that are added to the index.
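To make the idea of "processors executed in the order they are declared" concrete, here is a toy model in plain Python. It is not Elasticsearch code, only a sketch of the concept; `set_field` and `run_pipeline` are hypothetical names made up for this illustration.

```python
from datetime import datetime, timezone

def set_field(field, value_fn):
    """Return a toy processor that sets `field` to value_fn(ingest_meta)."""
    def processor(doc, ingest_meta):
        doc[field] = value_fn(ingest_meta)
        return doc
    return processor

def run_pipeline(processors, doc):
    """Apply each processor, in declaration order, to a copy of `doc`."""
    # Stand-in for the ingest metadata Elasticsearch exposes as _ingest.timestamp.
    ingest_meta = {"timestamp": datetime.now(timezone.utc).isoformat()}
    result = dict(doc)
    for processor in processors:
        result = processor(result, ingest_meta)
    return result

# A one-processor pipeline, mirroring the doc_timestamp pipeline in this post.
pipeline = [set_field("doc_timestamp", lambda meta: meta["timestamp"])]
indexed = run_pipeline(pipeline, {"account_number": 2000})
print(indexed)
```

Because the processors run in order, a later processor sees (and can overwrite) whatever an earlier one wrote.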
Creating a pipeline
With the PUT request below, we create a pipeline named doc_timestamp. This pipeline has only one processor, set, which adds a field named doc_timestamp and sets its value to the timestamp at which the document is ingested, that is, added to the index.
curl -X PUT http://localhost:9200/_ingest/pipeline/doc_timestamp?pretty -H 'Content-Type: application/json' -d '
{
  "description": "pipeline to add timestamp to documents",
  "processors": [
    {
      "set": {
        "field": "_source.doc_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}'

{
  "acknowledged" : true
}
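Once the pipeline exists, it can be dry-run with the ingest simulate endpoint (POST _ingest/pipeline/<id>/_simulate), which runs sample documents through the pipeline without indexing anything. Below is a minimal Python sketch that builds such a request. It assumes the same local cluster as the curl examples; `ES_URL`, `build_simulate_request`, and `send` are hypothetical helper names, and the sample document is a cut-down version of the one indexed later in this post.

```python
import json
import urllib.request

ES_URL = "http://localhost:9200"  # assumption: same local cluster as the curl examples

def build_simulate_request(pipeline_id, sources):
    """Build (but do not send) a POST to the ingest simulate endpoint."""
    body = {"docs": [{"_source": source} for source in sources]}
    return urllib.request.Request(
        url=f"{ES_URL}/_ingest/pipeline/{pipeline_id}/_simulate",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def send(request):
    """Send the request and decode the JSON reply (needs a running cluster)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)

request = build_simulate_request("doc_timestamp", [{"account_number": 2000}])
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Calling send(request) against a running cluster should return each simulated document, with doc_timestamp filled in, under the docs key of the response.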
Attach the pipeline to an index
Here we are attaching the pipeline doc_timestamp to the account_v2 index by marking it as the default_pipeline for the index.
curl -X PUT http://localhost:9200/account_v2/_settings?pretty -H 'Content-Type: application/json' -d '
{
  "index.default_pipeline": "doc_timestamp"
}'

{
  "acknowledged" : true
}
Now that the pipeline is attached to the index, anytime a document is added to the index a new field doc_timestamp will be added to the document. This doesn’t affect any of the existing documents in the index.
Let’s look up an existing document. As expected, it does not have the doc_timestamp field.
curl -X GET localhost:9200/account_v2/_doc/735?pretty

{
  "_index" : "account_v2",
  "_type" : "_doc",
  "_id" : "735",
  "_version" : 1,
  "_seq_no" : 344,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "account_number" : 735,
    "balance" : 3984,
    "firstname" : "Loraine",
    "lastname" : "Willis",
    "age" : 32,
    "gender" : "F",
    "address" : "928 Grove Street",
    "employer" : "Gadtron",
    "email" : "lorainewillis@gadtron.com",
    "city" : "Lowgap",
    "state" : "NY"
  }
}
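If the existing documents should get the field too, one option (not part of the walkthrough in this post) is to reprocess them through the pipeline with the update-by-query endpoint and its pipeline query parameter. The sketch below only builds that request; `ES_URL` and `build_backfill_request` are hypothetical names, and the cluster address is assumed to match the curl examples. Keep in mind that the timestamp written this way is the time of reprocessing, not the time the document was originally indexed.

```python
import urllib.parse
import urllib.request

ES_URL = "http://localhost:9200"  # assumption: same local cluster as the curl examples

def build_backfill_request(index, pipeline_id):
    """Build (but do not send) an update-by-query call that reruns documents through a pipeline."""
    query = urllib.parse.urlencode({"pipeline": pipeline_id})
    return urllib.request.Request(
        url=f"{ES_URL}/{index}/_update_by_query?{query}",
        method="POST",
    )

request = build_backfill_request("account_v2", "doc_timestamp")
print(request.get_method(), request.full_url)
```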
Add a new document to the index
Let’s add a new document to the index with id 2000.
curl -XPUT http://localhost:9200/account_v2/_doc/2000?pretty -H 'Content-Type: application/json' -d '
{
  "account_number": 2000,
  "balance": 16418,
  "firstname": "Elinor",
  "lastname": "Ratliff",
  "age": 36,
  "gender": "M",
  "address": "282 Kings Place",
  "employer": "Scentric",
  "email": "elinorratliff@scentric.com",
  "city": "Ribera",
  "state": "WA"
}'

{
  "_index" : "account_v2",
  "_type" : "_doc",
  "_id" : "2000",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "_seq_no" : 993,
  "_primary_term" : 1
}
Now that the document is added, let’s look it up. The response includes the new field doc_timestamp, set to the timestamp at which the document was added to the index.
curl -X GET localhost:9200/account_v2/_doc/2000?pretty

{
  "_index" : "account_v2",
  "_type" : "_doc",
  "_id" : "2000",
  "_version" : 1,
  "_seq_no" : 993,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "account_number" : 2000,
    "firstname" : "Elinor",
    "address" : "282 Kings Place",
    "gender" : "M",
    "city" : "Ribera",
    "lastname" : "Ratliff",
    "balance" : 16418,
    "employer" : "Scentric",
    "state" : "WA",
    "age" : 36,
    "email" : "elinorratliff@scentric.com",
    "doc_timestamp" : "2020-11-19T20:39:33.639398617Z"
  }
}
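Note that the doc_timestamp value above carries nine fractional digits (nanoseconds), which is more than Python's datetime can store. If you read the field from client code, a small parser along these lines could help; `parse_ingest_timestamp` is a hypothetical helper, and the sketch assumes the value always ends in Z (UTC), as it does in the response above.

```python
import re
from datetime import datetime, timezone

def parse_ingest_timestamp(value):
    """Parse a UTC timestamp like the doc_timestamp value shown in the response above."""
    match = re.fullmatch(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z", value)
    if match is None:
        raise ValueError(f"unexpected timestamp format: {value!r}")
    seconds, fraction = match.groups()
    # datetime stores microseconds at most, so keep only the first six fractional digits.
    micros = int((fraction or "").ljust(6, "0")[:6])
    parsed = datetime.strptime(seconds, "%Y-%m-%dT%H:%M:%S")
    return parsed.replace(microsecond=micros, tzinfo=timezone.utc)

ts = parse_ingest_timestamp("2020-11-19T20:39:33.639398617Z")
print(ts.isoformat())  # 2020-11-19T20:39:33.639398+00:00
```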