2021年5月2日星期日

How to Index our CSV file into Elastic Search using Java?

Is there any way to index a CSV file into Elasticsearch using Java? I was doing it with Logstash earlier, but now I need to code it in Java and run it dynamically. I tried the Index API, but it doesn't work for my case — can someone help me with that? My CSV data looks something like the sample below; this is just a sample, and I have these records in bulk.

sample CSV Data is something like this ..

 id  profile_id  hier_name       attri_name     item    1   1          CUSTOMER        CUSTOMER        C001    2   1          CUSTOMER        CUSTOMER        C002    3   1          CUSTOMER        CUSTOMER        C003  

This is what I was trying for bulk insertion, but it doesn't seem to work with my current version of Elasticsearch (7.12.0):

     package com.javadeveloperzone;    import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;    import java.io.BufferedReader;  import java.io.File;  import java.io.FileReader;  import java.io.IOException;  import java.net.InetAddress;  import java.util.Iterator;  import java.util.concurrent.ExecutionException;    import org.elasticsearch.action.bulk.BulkItemResponse;  import org.elasticsearch.action.bulk.BulkRequestBuilder;  import org.elasticsearch.action.bulk.BulkResponse;  import org.elasticsearch.action.search.SearchResponse;  import org.elasticsearch.client.transport.TransportClient;  import org.elasticsearch.common.settings.Settings;  import org.elasticsearch.common.transport.TransportAddress;  import org.elasticsearch.common.xcontent.XContentBuilder;  import org.elasticsearch.transport.client.PreBuiltTransportClient;    public class ESBulkIndexingExample {        String indexName, indexTypeName;      TransportClient client = null;        public static void main(String[] args) {          ESBulkIndexingExample esExample = new ESBulkIndexingExample();          try {              esExample.initEStransportClinet();              System.out.println("init done");              esExample.CSVbulkImport(true);              System.out.println("bulkimport done");              esExample.refreshIndices();                esExample.search();          } catch (Exception e) {              e.printStackTrace();          } finally {              esExample.closeTransportClient(); // close transport client          }      }        public ESBulkIndexingExample() {          indexName = "document";          indexTypeName = "bulkindexing";      }        public boolean initEStransportClinet() {          try {              client = new PreBuiltTransportClient(Settings.EMPTY)                      .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));                return true;          } catch (Exception ex) {              
ex.printStackTrace();              return false;          }      }        public void CSVbulkImport(boolean isHeaderIncluded) throws IOException, ExecutionException, InterruptedException {            BulkRequestBuilder bulkRequest = client.prepareBulk();            File file = new File(                  "/home/niteshb/Documents/workspace-spring-tool-suite-4-4.10.0.RELEASE/ElasticSearchService/src/main/resources/elasticdata.csv");          BufferedReader bufferedReader = new BufferedReader(new FileReader(file));            String line = null;          int count = 0, noOfBatch = 1;          if (bufferedReader != null && isHeaderIncluded) {              bufferedReader.readLine();          }          while ((line = bufferedReader.readLine()) != null) {                if (line.trim().length() == 0) {                  continue;              }              String data[] = line.split(",");              if (data.length == 3) {                    try {                      XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[0])                              .field("hierarchy_name", data[1]).field("attribute_name", data[2]).field("item_pk", data[3])                              .endObject();                        BulkRequestBuilder add = bulkRequest                              .add(client.prepareIndex(indexName, indexTypeName, data[0]).setSource(xContentBuilder));                        System.out.println(add);                      if ((count + 1) % 500 == 0) {                          count = 0;                          addDocumentToESCluser(bulkRequest, noOfBatch, count);                          noOfBatch++;                      }                  } catch (Exception e) {                      e.printStackTrace();                  }              } else {                  System.out.println("Invalid data : " + line);              }              count++;          }          bufferedReader.close();          addDocumentToESCluser(bulkRequest, 
noOfBatch, count);        }        public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) {            if (count == 0) {              return;          }          BulkResponse bulkResponse = bulkRequest.execute().actionGet();          if (bulkResponse.hasFailures()) {              System.out.println("Bulk Indexing failed for Batch : " + noOfBatch);                int numberOfDocFailed = 0;              Iterator<BulkItemResponse> iterator = bulkResponse.iterator();              while (iterator.hasNext()) {                  BulkItemResponse response = iterator.next();                  if (response.isFailed()) {                      numberOfDocFailed++;                  }              }              System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed");              System.out.println(bulkResponse.buildFailureMessage());          } else {              System.out.println("Bulk Indexing Completed for batch : " + noOfBatch);          }      }        public void refreshIndices() {          client.admin().indices().prepareRefresh(indexName).get();      }        public void search() {            SearchResponse response = client.prepareSearch(indexName).setTypes(indexTypeName).get();          System.out.println("Total Hits : " + response.getHits().getTotalHits());          System.out.println(response);      }        public void closeTransportClient() {          if (client != null) {              client.close();          }      }  }  

I am getting this error:

org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;  at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15)  at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:425)  at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:31)  at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:44)  

Can someone help me with this?

https://stackoverflow.com/questions/67319441/how-to-index-our-csv-file-into-elastic-search-using-java April 29, 2021 at 10:30PM

没有评论:

发表评论