Is it possible to get a line number from TextIO.read()? If not, could you point me to an example of a custom I/O in Java that inherits from TextIO?
My problem is that I need to read a large file (150 GB+) and generate a line number for each row within 30 minutes.
With the code below, the Dataflow job does not scale: it stays on a single worker and takes about 5 hours.
```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.channels.Channels;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

public class ApplyRowIndexFn extends DoFn<FileIO.ReadableFile, KV<Integer, String>> {

  protected final PCollectionView<TransformSideInput> transformFnInput;

  public ApplyRowIndexFn(PCollectionView<TransformSideInput> transformFnInput) {
    this.transformFnInput = transformFnInput;
  }

  @ProcessElement
  public void processElement(ProcessContext context) {
    TransformSideInput input = context.sideInput(transformFnInput);
    String header = input.getHeader();
    // The whole file is read sequentially inside this single call.
    try (Reader reader =
            Channels.newReader(
                FileSystems.open(context.element().getMetadata().resourceId()), "UTF-8");
        BufferedReader buffer = new BufferedReader(reader)) {
      String row;
      int counter = 1;
      while ((row = buffer.readLine()) != null) {
        // Skip the header row; number every other row starting from 1.
        if (!row.equals(header)) {
          context.output(KV.of(counter, row));
          counter = counter + 1;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
```

https://stackoverflow.com/questions/67223056/apache-beam-textio-read-file-with-line-number
April 23, 2021 at 10:06AM
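To illustrate why the DoFn above cannot be spread over workers, and what a parallel-friendly numbering could look like, here is a minimal sketch in plain Java. It does not use the Beam API. It assumes the file has already been cut into chunks at line boundaries and that the chunk order is known; the class name `ChunkedLineNumbering` and both method names are hypothetical. The idea is to count the lines per chunk first, turn the counts into a starting line number per chunk with a running sum, and then number each chunk independently. A `long` counter is used on the assumption that a file of this size could exceed the `int` range.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ChunkedLineNumbering {

  // Given the number of lines in each chunk (in file order), returns the
  // 1-based line number of the first line of each chunk (a running sum).
  public static long[] firstLineNumbers(long[] lineCounts) {
    long[] first = new long[lineCounts.length];
    long next = 1;
    for (int i = 0; i < lineCounts.length; i++) {
      first[i] = next;
      next += lineCounts[i];
    }
    return first;
  }

  // Numbers all lines. Once the starting number of a chunk is known,
  // the chunk can be processed without looking at any other chunk.
  public static List<String> numberLines(List<List<String>> chunks) {
    long[] counts = chunks.stream().mapToLong(List::size).toArray();
    long[] first = firstLineNumbers(counts);
    return IntStream.range(0, chunks.size())
        .parallel() // chunks are independent, so they may run concurrently
        .boxed()
        .flatMap(i -> {
          List<String> chunk = chunks.get(i);
          return IntStream.range(0, chunk.size())
              .mapToObj(j -> (first[i] + j) + ":" + chunk.get(j));
        })
        .collect(Collectors.toList()); // keeps the original line order
  }

  public static void main(String[] args) {
    List<List<String>> chunks = Arrays.asList(
        Arrays.asList("a", "b", "c"),
        Arrays.asList("d", "e"),
        Arrays.asList("f"));
    System.out.println(Arrays.toString(firstLineNumbers(new long[] {3, 2, 1})));
    numberLines(chunks).forEach(System.out::println);
  }
}
```

In a real pipeline the chunks would correspond to byte ranges of the file rather than in-memory lists, and how to obtain those ranges in Beam is not shown here.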