package com.mengyao.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Reads rows from a MySQL table via {@link DBInputFormat} and writes the result to HDFS under
 * {@code /mapreduces/dboutput}:
 * <pre>
 * /mapreduces/dboutput/_SUCCESS      - empty marker file indicating the job succeeded
 * /mapreduces/dboutput/part-r-00000  - job output, e.g.:
 *     0    1    New Balance 999 retro running shoe    ML999GY    ML999GY    999.0
 *     1    2    New Balance 999 retro running shoe    ML999BB    ML999BB    999.0
 *     2    3    New Balance 996 retro running shoe    MRL996DG   MRL996DG   819.0
 *     3    4    New Balance 996 retro running shoe    MRL996EM   MRL996EM   819.0
 *     4    5    New Balance 996 retro running shoe    MRL996ES   MRL996ES   819.0
 * </pre>
 * This job has no Reducer class: when the output key/value types are the defaults
 * ({@link LongWritable}, {@link Text}) the Reducer may be omitted, in which case the
 * framework runs a single identity Reducer.
 *
 * <p>NOTE: when using MapReduce against a database the JDBC driver jar MUST be on Hadoop's
 * classpath, otherwise the job fails with a "cannot load JDBC Driver class" error. Either:
 * <ol>
 *   <li>copy it to e.g.
 *       {@code /usr/local/installs/hadoop/share/hadoop/mapreduce/lib/mysql-connector-java-5.1.26-bin.jar}
 *       and restart the cluster so the classpath change takes effect, or</li>
 *   <li>bundle the JDBC driver jar inside this job's jar.</li>
 * </ol>
 *
 * @author mengyao
 */
public class DBInputFormatApp extends Configured implements Tool {

    /**
     * Bean representing one row of the {@code product} table. It must implement both
     * Hadoop's serialization interface {@link Writable} and the database-interaction
     * interface {@link DBWritable}.
     *
     * <p>Per the DBInputFormat contract, the Mapper's input key is always
     * {@link LongWritable} (the row offset) and the input value is this
     * {@link DBWritable} bean.
     *
     * @author mengyao
     */
    static class ProductWritable implements Writable, DBWritable {

        private long id;       // bigint(20) NOT NULL AUTO_INCREMENT
        private String name;   // varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'product name'
        private String model;  // varchar(30) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'model'
        private String color;  // varchar(10) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'color'
        private double price;  // decimal(10,0) DEFAULT NULL COMMENT 'price'

        /** Binds this bean's fields to a SQL statement (used when writing to the DB). */
        @Override
        public void write(PreparedStatement ps) throws SQLException {
            ps.setLong(1, id);
            ps.setString(2, name);
            ps.setString(3, model);
            ps.setString(4, color);
            ps.setDouble(5, price);
        }

        /** Populates this bean from one row of the query's ResultSet. Column order must
         *  match the SELECT list configured in {@code run()}. */
        @Override
        public void readFields(ResultSet rs) throws SQLException {
            this.id = rs.getLong(1);
            this.name = rs.getString(2);
            this.model = rs.getString(3);
            this.color = rs.getString(4);
            this.price = rs.getDouble(5);
        }

        /** Hadoop deserialization; field order must mirror {@link #write(DataOutput)}. */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readLong();
            this.name = in.readUTF();
            this.model = in.readUTF();
            this.color = in.readUTF();
            this.price = in.readDouble();
        }

        /** Hadoop serialization; field order must mirror {@link #readFields(DataInput)}. */
        @Override
        public void write(DataOutput output) throws IOException {
            output.writeLong(id);
            output.writeUTF(name);
            output.writeUTF(model);
            output.writeUTF(color);
            output.writeDouble(price);
        }

        /** Tab-separated row representation; becomes the job's output value text. */
        @Override
        public String toString() {
            return id + "\t" + name + "\t" + model + "\t" + color + "\t" + price;
        }

    }

    /**
     * Identity-style mapper: emits the row offset as key and the tab-separated row
     * as a {@link Text} value.
     */
    static class DBInputFormatMapper
            extends Mapper<LongWritable, ProductWritable, LongWritable, Text> {

        private LongWritable outputKey;
        private Text outputValue;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // Reuse a single key/value instance across map() calls (standard Hadoop idiom).
            this.outputKey = new LongWritable();
            this.outputValue = new Text();
        }

        @Override
        protected void map(LongWritable key, ProductWritable value, Context context)
                throws IOException, InterruptedException {
            outputKey.set(key.get());
            outputValue.set(value.toString());
            context.write(outputKey, outputValue);
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args args[0] is the HDFS output directory
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Configure the JDBC connection immediately after obtaining the Configuration.
        // NOTE(review): credentials are hard-coded; consider externalizing them.
        DBConfiguration.configureDB(
                conf,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.1.10:3306/shops",
                "root",
                "123456");

        Job job = Job.getInstance(conf, DBInputFormatApp.class.getSimpleName());
        job.setJarByClass(DBInputFormatApp.class);

        job.setInputFormatClass(DBInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        job.setMapperClass(DBInputFormatMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // Configure the data query, the record-count query, and the bean that receives rows.
        DBInputFormat.setInput(
                job,
                ProductWritable.class,
                "SELECT `id`,`name`,`model`,`color`,`price` FROM `product`",
                "SELECT COUNT(1) FROM `product`");

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Builds the job Configuration and runs the tool.
     *
     * @param args command-line arguments forwarded to {@link #run(String[])}
     * @return 0 on success, non-zero on failure (including thrown exceptions)
     */
    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.socket.write.timeout", "7200000");
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
        int status = 0;

        try {
            status = ToolRunner.run(conf, new DBInputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
            // Bug fix: previously the status stayed 0 (success) when the job threw.
            status = 1;
        }

        return status;
    }

    public static void main(String[] args) {
        // Bug fix: the original unconditionally overwrote args with the default path,
        // making the usage check dead code and ignoring any user-supplied argument.
        // Now a supplied output directory is honored; otherwise fall back to the default.
        if (args.length != 1) {
            System.out.println("Usage: " + DBInputFormatApp.class.getName() + " Input paramters ");
            args = new String[]{"/mapreduces/dboutput"};
        }
        int status = createJob(args);
        System.exit(status);
    }

}