博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce的DBInputFormat使用
阅读量:5045 次
发布时间:2019-06-12

本文共 7655 字,大约阅读时间需要 25 分钟。

1 package com.mengyao.hadoop.mapreduce;  2   3 import java.io.DataInput;  4 import java.io.DataOutput;  5 import java.io.IOException;  6 import java.sql.PreparedStatement;  7 import java.sql.ResultSet;  8 import java.sql.SQLException;  9  10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.conf.Configured; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.io.LongWritable; 14 import org.apache.hadoop.io.Text; 15 import org.apache.hadoop.io.Writable; 16 import org.apache.hadoop.mapreduce.Job; 17 import org.apache.hadoop.mapreduce.Mapper; 18 import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; 19 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; 20 import org.apache.hadoop.mapreduce.lib.db.DBWritable; 21 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 22 import org.apache.hadoop.util.Tool; 23 import org.apache.hadoop.util.ToolRunner; 24  25 /** 26  * 使用DBInputFormat类读取数据库并将结果数据写到HDFS的/mapreduces/dboutput目录下, 27  *         /mapreduces/dboutput/_SUCCESS:_SUCCESS空文件表示作业执行成功 28  *         /mapreduces/dboutput/part-r-00000:文件表示作业的结果,内容如下: 29  *             0    1    New Balance 999复古跑鞋    ML999GY    ML999GY    999.0 30  *             1    2    New Balance 999复古跑鞋    ML999BB    ML999BB    999.0 31  *             2    3    New Balance 996复古跑鞋    MRL996DG    MRL996DG    819.0 32  *             3    4    New Balance 996复古跑鞋    MRL996EM    MRL996EM    819.0 33  *             4    5    New Balance 996复古跑鞋    MRL996ES    MRL996ES    819.0 34  * 这个作业没有Reducer类,在默认的MapReduce作业中,如果输出的key,value是默认的LongWritable, Text,则Reducer类可以省略,省略不写时则默认启动一个Reducer 35  * 36  * 一定要记住在使用MapReduce操作数据库时一定要添加JDBC驱动jar包到Hadoop的classpath中,否则会报无法加载JDBC Driver类异常,如下: 37  *       1、我这里添加到/usr/local/installs/hadoop/share/hadoop/mapreduce/lib/mysql-connector-java-5.1.26-bin.jar这里了,务必要重启集群使classpath生效。 38  *       2、将JDBC驱动jar包打包到这个MapReduce作业jar包中。 39  * 40  * @author mengyao 41  * 42  */ 43 public class DBInputFormatApp extends Configured implements Tool { 44  45     /** 46      * 这个JavaBean需要实现Hadoop的序列化接口Writable和与数据库交互时的序列化接口DBWritable 47      * 官方API中解释如下: 48      *         public class DBInputFormat
49 * extends InputFormat
implements Configurable 50 * 即Mapper的Key是LongWritable类型,不可改变;Value是继承自DBWritable接口的自定义JavaBean 51 * 52 * @author mengyao 53 * 54 */ 55 static class ProductWritable implements Writable, DBWritable { 56 57 private long id; // bigint(20) NOT NULL AUTO_INCREMENT, 58 private String name; // varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '商品名称', 59 private String model; // varchar(30) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '型号', 60 private String color; // varchar(10) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '颜色', 61 private double price; // decimal(10,0) DEFAULT NULL COMMENT '售价', 62 63 @Override 64 public void write(PreparedStatement ps) throws SQLException { 65 ps.setLong(1, id); 66 ps.setString(2, name); 67 ps.setString(3, model); 68 ps.setString(4, color); 69 ps.setDouble(5, price); 70 } 71 72 @Override 73 public void readFields(ResultSet rs) throws SQLException { 74 this.id = rs.getLong(1); 75 this.name = rs.getString(2); 76 this.model = rs.getString(3); 77 this.color = rs.getString(4); 78 this.price = rs.getDouble(5); 79 } 80 81 @Override 82 public void readFields(DataInput in) throws IOException { 83 this.id = in.readLong(); 84 this.name = in.readUTF(); 85 this.model = in.readUTF(); 86 this.color = in.readUTF(); 87 this.price = in.readDouble(); 88 } 89 90 @Override 91 public void write(DataOutput output) throws IOException { 92 output.writeLong(id); 93 output.writeUTF(name); 94 output.writeUTF(model); 95 output.writeUTF(color); 96 output.writeDouble(price); 97 } 98 99 @Override100 public String toString() {101 return id +"\t"+ name +"\t"+ model +"\t"+ color +"\t"+ price;102 }103 104 }105 106 static class DBInputFormatMapper extends Mapper
{107 108 private LongWritable outputKey;109 private Text outputValue;110 111 @Override112 protected void setup(113 Mapper
.Context context)114 throws IOException, InterruptedException {115 this.outputKey = new LongWritable();116 this.outputValue = new Text();117 }118 119 @Override120 protected void map(LongWritable key, ProductWritable value,121 Mapper
.Context context)122 throws IOException, InterruptedException {123 outputKey.set(key.get());124 outputValue.set(value.toString());125 context.write(outputKey, outputValue);126 }127 }128 129 @Override130 public int run(String[] args) throws Exception {131 Configuration conf = getConf();132 //在创建Configuration对象时紧跟着配置当前作业需要使用的JDBC配置133 DBConfiguration.configureDB(134 conf,135 "com.mysql.jdbc.Driver",136 "jdbc:mysql://192.168.1.10:3306/shops",137 "root",138 "123456");139 140 Job job = Job.getInstance(conf, DBInputFormatApp.class.getSimpleName());141 job.setJarByClass(DBInputFormatApp.class);142 143 job.setInputFormatClass(DBInputFormat.class);144 FileOutputFormat.setOutputPath(job, new Path(args[0]));145 146 job.setMapperClass(DBInputFormatMapper.class);147 job.setMapOutputKeyClass(LongWritable.class);148 job.setMapOutputValueClass(Text.class);149 150 job.setOutputKeyClass(LongWritable.class);151 job.setOutputValueClass(Text.class);152 //配置当前作业要查询的SQL语句和接收查询结果的JavaBean153 DBInputFormat.setInput(154 job,155 ProductWritable.class,156 "SELECT `id`,`name`,`model`,`color`,`price` FROM `product`",157 "SELECT COUNT(1) FROM `product`");158 159 return job.waitForCompletion(true)?0:1;160 }161 162 public static int createJob(String[] args) {163 Configuration conf = new Configuration();164 conf.set("dfs.datanode.socket.write.timeout", "7200000");165 conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");166 conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");167 int status = 0;168 169 try {170 status = ToolRunner.run(conf, new DBInputFormatApp(), args);171 } catch (Exception e) {172 e.printStackTrace();173 }174 175 return status;176 }177 178 public static void main(String[] args) {179 args = new String[]{"/mapreduces/dboutput"};180 if (args.length!=1) {181 System.out.println("Usage: "+DBInputFormatApp.class.getName()+" Input paramters
");182 } else {183 int status = createJob(args);184 System.exit(status);185 }186 187 }188 189 }

 

转载于:https://www.cnblogs.com/mengyao/archive/2013/02/05/4865565.html

你可能感兴趣的文章
Beta阶段总结
查看>>
嵌入式Linux开发环境搭建,问题ping、nfs的解决
查看>>
iOS之设置头像(访问系统相册、本地上传)
查看>>
iOS之限制TextField的输入长度
查看>>
webapi 安全验证与权限验证
查看>>
tomcat下部署多个项目
查看>>
Docker-Compose
查看>>
Python list知识内容
查看>>
C - A Plug for UNIX - poj 1087(最大流)
查看>>
UNICODE与ANSI的区别
查看>>
代码备份2
查看>>
工作流基本特性及说明
查看>>
高手给菜鸟学习Linux的10个建议
查看>>
洗牌算法Fisher_Yates原理
查看>>
functools 之 partial(偏函数)
查看>>
多线程2--毕向东基础视频教程学习笔记
查看>>
结对第二次作业
查看>>
jQuery 1.7的隐藏改动
查看>>
初学ant
查看>>
bzoj 4295 [PA2015]Hazard 贪心,暴力
查看>>