Is there any way to index our CSV file into Elastic Search using Java , I was doing it using logstash earlier but now I need to code it in Java and run dynamically .. I tried with Index API but it doesn't work for my condition Can someone help me with that .. My CSV Data looks something like this this is just a sample I have this object in bulk ..
sample CSV Data is something like this ..
id profile_id hier_name attri_name item 1 1 CUSTOMER CUSTOMER C001 2 1 CUSTOMER CUSTOMER C002 3 1 CUSTOMER CUSTOMER C003
This is what I was trying for Bulk Insertion but it doesnt seem to be working with my current version of Elastic Search 7.12.0 ,
package com.javadeveloperzone; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.InetAddress; import java.util.Iterator; import java.util.concurrent.ExecutionException; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.transport.client.PreBuiltTransportClient; public class ESBulkIndexingExample { String indexName, indexTypeName; TransportClient client = null; public static void main(String[] args) { ESBulkIndexingExample esExample = new ESBulkIndexingExample(); try { esExample.initEStransportClinet(); System.out.println("init done"); esExample.CSVbulkImport(true); System.out.println("bulkimport done"); esExample.refreshIndices(); esExample.search(); } catch (Exception e) { e.printStackTrace(); } finally { esExample.closeTransportClient(); // close transport client } } public ESBulkIndexingExample() { indexName = "document"; indexTypeName = "bulkindexing"; } public boolean initEStransportClinet() { try { client = new PreBuiltTransportClient(Settings.EMPTY) .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300)); return true; } catch (Exception ex) { ex.printStackTrace(); return false; } } public void CSVbulkImport(boolean isHeaderIncluded) throws IOException, ExecutionException, InterruptedException { BulkRequestBuilder bulkRequest = client.prepareBulk(); File file = new File( "/home/niteshb/Documents/workspace-spring-tool-suite-4-4.10.0.RELEASE/ElasticSearchService/src/main/resources/elasticdata.csv"); BufferedReader bufferedReader = new BufferedReader(new FileReader(file)); String line = null; int count = 0, noOfBatch = 1; if (bufferedReader != null && isHeaderIncluded) { bufferedReader.readLine(); } while ((line = bufferedReader.readLine()) != null) { if (line.trim().length() == 0) { continue; } String data[] = line.split(","); if (data.length == 3) { try { XContentBuilder xContentBuilder = jsonBuilder().startObject().field("tenant_id", data[0]) .field("hierarchy_name", data[1]).field("attribute_name", data[2]).field("item_pk", data[3]) .endObject(); BulkRequestBuilder add = bulkRequest .add(client.prepareIndex(indexName, indexTypeName, data[0]).setSource(xContentBuilder)); System.out.println(add); if ((count + 1) % 500 == 0) { count = 0; addDocumentToESCluser(bulkRequest, noOfBatch, count); noOfBatch++; } } catch (Exception e) { e.printStackTrace(); } } else { System.out.println("Invalid data : " + line); } count++; } bufferedReader.close(); addDocumentToESCluser(bulkRequest, noOfBatch, count); } public void addDocumentToESCluser(BulkRequestBuilder bulkRequest, int noOfBatch, int count) { if (count == 0) { return; } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { System.out.println("Bulk Indexing failed for Batch : " + noOfBatch); int numberOfDocFailed = 0; Iterator<BulkItemResponse> iterator = bulkResponse.iterator(); while (iterator.hasNext()) { BulkItemResponse response = iterator.next(); if (response.isFailed()) { numberOfDocFailed++; } } System.out.println("Out of " + count + " documents, " + numberOfDocFailed + " documents failed"); System.out.println(bulkResponse.buildFailureMessage()); } else { System.out.println("Bulk Indexing Completed for batch : " + noOfBatch); } } public void refreshIndices() { client.admin().indices().prepareRefresh(indexName).get(); } public void search() { SearchResponse response = client.prepareSearch(indexName).setTypes(indexTypeName).get(); System.out.println("Total Hits : " + response.getHits().getTotalHits()); System.out.println(response); } public void closeTransportClient() { if (client != null) { client.close(); } } }
getting here error as
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added; at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:15) at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:425) at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:31) at org.elasticsearch.client.transport.TransportProxyClient.lambda$execute$0(TransportProxyClient.java:44)
can someone help me with this ?
https://stackoverflow.com/questions/67319441/how-to-index-our-csv-file-into-elastic-search-using-java April 29, 2021 at 10:30PM
没有评论:
发表评论