package cn.gaojunliang;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.*;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;

public class OrcFileReader {
    private Configuration conf = null;
    private FileSystem fs = null;
    private String inputDir = null;
    private String outputDir = null;

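    // Sample ORC data file under a Hive partition; used as the test input for read() in run().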
    private static final String p = "/user/hive/warehouse/test.db/hdp_kfktostm_origincall_hotlog/dt=2019-02-22/20190304_031221_09431_girvj_10931bff-f4eb-487b-b3a0-96069782fc4a";

    /**
     * Initialize the Hadoop configuration and the file system handle.
     * @param dir1 input directory on HDFS
     * @param dir2 output directory on HDFS
     * @throws IOException if the FileSystem cannot be obtained
     */
    OrcFileReader(String dir1, String dir2) throws IOException {
        this.conf = new Configuration();
        // Points at a local single-node HDFS; adjust fs.defaultFS for other clusters.
        this.conf.set("fs.defaultFS", "hdfs://localhost:9000");
        this.conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.DefaultCodec");
        this.inputDir = dir1;
        this.outputDir = dir2;
        this.fs = FileSystem.get(conf);
    }

    // length: the requested length of the generated random string
    public static String getRandomString(int length) {
        String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        Random random = new Random();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length; i++) {
            int number = random.nextInt(str.length());
            sb.append(str.charAt(number));
        }
        return sb.toString();
    }

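    /**
     * Builds a target file name (timestamp + random suffix + UUID), defines the ORC
     * schema string for the table, lists the files under inputDir via the external
     * HadoopUtil helper (not shown here), and reads the sample file referenced by {@code p}.
     */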
    public void run() throws Exception {

        Date date = new Date();
        // HH (24-hour clock) instead of hh so the timestamp is unambiguous
        SimpleDateFormat ft = new SimpleDateFormat("yyyyMMddHHmmss_");
        String targetFile = ft.format(date) + getRandomString(6) + "_" + UUID.randomUUID();

        String schema = "struct<uuid:string,data:string,create_time:string,insert_time:string,idcard:string,applyid:string,mobileencrypted:string,hours:string>";

        // fileList is currently unused; read() is pointed at the fixed sample file p
        List<String> fileList = HadoopUtil.listDirectoryAsListOfString(inputDir, fs);

        read(p);

        // test write
        //write(schema, "/user/hive/warehouse/test.db/hdp_kfktostm_origincall_hotlog_1/dt=2019-02-22/"+targetFile);
    }

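    /**
     * Reads the ORC file at {@code path} batch by batch and prints the column
     * values of every row.
     */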
    public void read(String path) throws Exception {
        Reader reader = OrcFile.createReader(new Path(path),
                OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();

        VectorizedRowBatch batch = reader.getSchema().createRowBatch();

        while (rows.nextBatch(batch)) {
            System.out.println("batch size: " + batch.size);
            int colsNum = batch.cols.length;
            for (int r = 0; r < batch.size; ++r) {
                // Print every column of the current row
                for (int c = 0; c < colsNum; ++c) {
                    StringBuilder sb = new StringBuilder();
                    batch.cols[c].stringifyValue(sb, r);
                    System.out.println(sb.toString());
                }
                System.out.println("===============");
            }
        }
        rows.close();
    }

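    /**
     * Writes a demo ORC file with the given schema string to the given HDFS path.
     * Only exercised from the commented-out call in run().
     */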
    public void write(String sch, String path) throws Exception {
        // Define the schema, e.g. "struct<x:int,y:int>"
        TypeDescription schema = TypeDescription.fromString(sch);

        // Create the writer
        Writer writer = OrcFile.createWriter(new Path(path), OrcFile.writerOptions(conf).setSchema(schema));

        VectorizedRowBatch batch = schema.createRowBatch();
        BytesColumnVector col0 = (BytesColumnVector) batch.cols[0];
        BytesColumnVector col1 = (BytesColumnVector) batch.cols[1];
        BytesColumnVector col2 = (BytesColumnVector) batch.cols[2];
        BytesColumnVector col3 = (BytesColumnVector) batch.cols[3];
        BytesColumnVector col4 = (BytesColumnVector) batch.cols[4];
        BytesColumnVector col5 = (BytesColumnVector) batch.cols[5];
        BytesColumnVector col6 = (BytesColumnVector) batch.cols[6];
        BytesColumnVector col7 = (BytesColumnVector) batch.cols[7];

        // Write 300 records per file
        for (int r = 0; r < 300; ++r) {
            int row = batch.size++;
            col0.setVal(row, "aaa".getBytes());
            col1.setVal(row, "bbb".getBytes());
            // Placeholder values: every declared string column must be populated,
            // otherwise the writer hits null entries in the BytesColumnVectors.
            col2.setVal(row, "ccc".getBytes());
            col3.setVal(row, "ddd".getBytes());
            col4.setVal(row, "eee".getBytes());
            col5.setVal(row, "fff".getBytes());
            col6.setVal(row, "ggg".getBytes());
            col7.setVal(row, "hhh".getBytes());

            // Flush the batch every 50 records
            if (batch.size == 50) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        if (batch.size != 0) {
            writer.addRowBatch(batch);
            batch.reset();
        }
        writer.close();
    }
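    // Usage: OrcFileReader <inputDir> <outputDir>
    // args[0] = input directory on HDFS, args[1] = output directory (currently unused)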
    public static void main(String[] args) throws Exception {
        OrcFileReader orcFileReader = new OrcFileReader(args[0], args[1]);
        orcFileReader.run();
    }

}