Thursday, April 22, 2021

Apache BEAM TextIO read file with line number

Is it possible to get a line number from TextIO.read()? If not, could you point me to an example of a custom I/O that extends TextIO in Java?

My problem is that I need to read a large file (150 GB+) and assign a line number to each line, all within 30 minutes.

With the code below, the Dataflow job does not scale: it stays on a single worker and takes about 5 hours.

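```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.channels.Channels;

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

// TransformSideInput is my own class; it carries the header row.
public class ApplyRowIndexFn extends DoFn<FileIO.ReadableFile, KV<Integer, String>> {

    protected final PCollectionView<TransformSideInput> transformFnInput;

    public ApplyRowIndexFn(PCollectionView<TransformSideInput> transformFnInput) {
        this.transformFnInput = transformFnInput;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
        TransformSideInput input = context.sideInput(transformFnInput);
        String header = input.getHeader();
        // The whole file is one element, so it is read sequentially in a single call.
        try (Reader reader = Channels.newReader(
                FileSystems.open(context.element().getMetadata().resourceId()), "UTF-8");
             BufferedReader buffer = new BufferedReader(reader)) {
            String row;
            int counter = 1;
            while ((row = buffer.readLine()) != null) {
                // Skip header rows; every other row gets the next index.
                if (!row.equals(header)) {
                    context.output(KV.of(counter, row));
                    counter = counter + 1;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```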
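For reference, here is a minimal plain-Java sketch of one way line numbering can be made splittable. It is not Beam API and not a confirmed solution to this question. The idea is two passes over fixed-size byte ranges: first count the newlines in each range (each count is independent of the others), then take a running sum so every range knows how many newlines precede it and can number its own lines without reading the earlier ranges. The class name `LineNumberSketch`, its method names, the in-memory `byte[]` input, and the `number:text` output format are all assumptions made for illustration; header skipping is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: an in-memory stand-in for a file split into byte ranges.
class LineNumberSketch {

    /** Pass 1: count '\n' bytes in data[start, end). Independent per range. */
    static long countNewlines(byte[] data, int start, int end) {
        long n = 0;
        for (int i = start; i < end; i++) {
            if (data[i] == '\n') {
                n++;
            }
        }
        return n;
    }

    /** Exclusive running sum: result[i] = newlines in all ranges before range i. */
    static long[] newlinesBefore(long[] counts) {
        long[] before = new long[counts.length];
        long running = 0;
        for (int i = 0; i < counts.length; i++) {
            before[i] = running;
            running += counts[i];
        }
        return before;
    }

    /** Pass 2: number every line whose first byte lies in data[start, end). */
    static List<String> numberRange(byte[] data, int start, int end, long newlinesBefore) {
        List<String> out = new ArrayList<>();
        long lineNo = newlinesBefore + 1; // number of the line that contains offset `start`
        int pos = start;
        // If the range starts mid-line, that line belongs to an earlier range: skip it.
        if (start > 0 && data[start - 1] != '\n') {
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++; // step past the newline
            lineNo++;
        }
        while (pos < end && pos < data.length) {
            int eol = pos;
            // A line that starts in this range may run past `end`; read it to its newline.
            while (eol < data.length && data[eol] != '\n') {
                eol++;
            }
            out.add(lineNo + ":" + new String(data, pos, eol - pos, StandardCharsets.UTF_8));
            lineNo++;
            pos = eol + 1;
        }
        return out;
    }

    /** Runs both passes over ranges of rangeSize bytes (sequentially here). */
    static List<String> numberLines(byte[] data, int rangeSize) {
        int rangeCount = (data.length + rangeSize - 1) / rangeSize;
        long[] counts = new long[rangeCount];
        for (int i = 0; i < rangeCount; i++) {
            int start = i * rangeSize;
            counts[i] = countNewlines(data, start, Math.min(data.length, start + rangeSize));
        }
        long[] before = newlinesBefore(counts);
        List<String> out = new ArrayList<>();
        for (int i = 0; i < rangeCount; i++) {
            int start = i * rangeSize;
            int end = Math.min(data.length, start + rangeSize);
            out.addAll(numberRange(data, start, end, before[i]));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbeta\ngamma\ndelta\n".getBytes(StandardCharsets.UTF_8);
        // Ranges of 8 bytes deliberately cut through the middle of lines.
        numberLines(data, 8).forEach(System.out::println);
    }
}
```

The two loops in `numberLines` run sequentially here, but neither depends on the result of another range, only on the running sum between them. In a pipeline, the per-range work would presumably map onto separate parallel steps, with the running sum passed along as a small side input.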
https://stackoverflow.com/questions/67223056/apache-beam-textio-read-file-with-line-number April 23, 2021 at 10:06AM
