Monthly Archives: October 2012

Hadoop: why does it report "Retrying connect to server: localhost/127.0.0.1:8020"?

I clearly configured the port as 8021, so why does the client still try to connect to 8020?

Most likely your NameNode never actually came up. Run jps and check whether you see a line like this:

4245 NameNode

If it is not there, look at the NameNode logs under $HADOOP_INSTALL/logs. In my experience, if you are running in pseudo-distributed mode and hadoop.tmp.dir has not been explicitly set, the most likely cause is that your HDFS environment has been corrupted: by default HDFS keeps its files under /tmp, and /tmp is anything but reliable. In that case you should reformat the HDFS filesystem.
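To keep this from happening again, it can be worth pointing hadoop.tmp.dir at a directory that survives reboots. A minimal sketch of the idea, assuming pseudo-distributed mode; the path below is purely illustrative, and HDFS has to be reformatted after the change:

<!--conf/core-site.xml-->
<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/home/kent/hadoop-tmp</value>  <!--any durable local directory; illustrative path-->
  </property>
</configuration>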

Backing up the HDFS NameNode

According to the Elephant Book (Hadoop: The Definitive Guide), there are two approaches:

  1. Configure the NameNode directly so that every change it makes is also written to additional storage systems. This gives strong consistency (see the config sketch below).

  2. Use a Secondary NameNode, which copies the data periodically. Because it is only "periodic", some data is bound to be lost when the NameNode goes down.

More details to be added later.
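For option 1, the usual knob in Hadoop 1.x is dfs.name.dir, a comma-separated list of directories that the NameNode writes to synchronously. A minimal sketch; both paths are illustrative (the second would typically be an NFS mount on another machine):

<!--conf/hdfs-site.xml-->
<configuration>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/kent/dfs/name,/mnt/nfs/dfs/name</value>  <!--local disk plus a remote NFS mount-->
  </property>
</configuration>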

Interaction diagram of HDFS nodes

Basically copied from the Elephant Book; I only added a few lines. (The diagrams themselves are not reproduced here.)





Note: the NameNode only serves metadata; the actual data exchange happens directly between the client and the DataNodes, so that the NameNode does not become a bottleneck.



Note: by default, written data is kept in three replicas spread across two racks (don't put all your eggs in one basket).

Setting up a Hadoop pseudo-distributed mode environment

Just for copy-paste.

Edit the configuration files

<!--edit conf/core-site.xml-->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost/</value>  <!--the default filesystem is the local HDFS instance-->
  </property>
</configuration>
<!--edit conf/hdfs-site.xml-->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value> <!--only one DataNode in pseudo-distributed mode, so keep a single replica-->
  </property>
</configuration>

<!--edit conf/mapred-site.xml-->
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:8021</value>
  </property>
</configuration>

Enable passwordless SSH from this machine to itself

$ssh-keygen -t rsa -f ~/.ssh/id_rsa #generate a key pair if you don't have one yet
$cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys #authorize it for logging into localhost
$ssh-add #load the key into your ssh agent (needed if the key has a passphrase)
$ssh localhost #check that no password prompt appears

Format the HDFS filesystem

$hadoop namenode -format #in my test the filesystem was created under /tmp/hadoop-kent/dfs/name

Start the Hadoop daemons

$start-dfs.sh
$start-mapred.sh

Check status in a browser

http://localhost:50070/  (NameNode web UI)

http://localhost:50030/  (JobTracker web UI)

Play with some files in HDFS

$hadoop fs -copyFromLocal 1k.log hdfs://localhost/firsttry/1k.log
$hadoop fs -ls / #list the HDFS root directory
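A couple more commands for verifying the copy and cleaning up afterwards (the /firsttry path simply follows the example above):

$hadoop fs -cat hdfs://localhost/firsttry/1k.log #print the file we just copied in
$hadoop fs -rm hdfs://localhost/firsttry/1k.log #remove it again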

Stop the Hadoop daemons

$stop-dfs.sh
$stop-mapred.sh

HDFS API example code

Nothing substantial here; just for copy-paste.

import java.io.IOException;
import java.net.URI;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsExample {

	public static void main(String[] args) throws IOException {
		String dir = "/home/kent";
		String fileUrl = "hdfs://localhost" + dir + "/" + System.currentTimeMillis() + "hdfsExample.txt";

		FileSystem fs = FileSystem.get(URI.create(fileUrl), new Configuration());

		// create a file
		System.out.println("Creating hdfs file : " + fileUrl);
		Path path = new Path(fileUrl);
		FSDataOutputStream out = fs.create(path);

		// write to the file, then flush, sync and close it
		out.write("Hello, HDFS Example!".getBytes());
		out.flush();
		out.sync();
		out.close();

		// check the file's status
		FileStatus status = fs.getFileStatus(path);
		System.out.println("The example file's status is: " + ToStringBuilder.reflectionToString(status, ToStringStyle.SHORT_PREFIX_STYLE));

		// read the file and print its content to the console
		FSDataInputStream in = fs.open(path);
		System.out.println("The content of " + fileUrl + " is: ");
		IOUtils.copyBytes(in, System.out, 4096, false);
		in.close();
		System.out.println();

		// delete the file
		fs.delete(path, false);

		// list the parent dir to confirm the file is gone
		FileStatus[] listStatus = fs.listStatus(new Path("hdfs://localhost" + dir));
		System.out.println("Number of files now in dir " + dir + ": " + (listStatus == null ? 0 : listStatus.length));
	}
}
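One way to compile and run this against the pseudo-cluster from a shell; the jar names below are illustrative and should be adjusted to your installation (commons-lang is needed for ToStringBuilder):

$javac -cp $HADOOP_INSTALL/hadoop-core-1.0.4.jar:$HADOOP_INSTALL/lib/commons-lang-2.4.jar HdfsExample.java
$export HADOOP_CLASSPATH=.
$hadoop HdfsExample #bin/hadoop can run an arbitrary class name with the Hadoop classpath already set up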


Hadoop map-reduce starter example code

Nothing substantial at all; just for copy-paste.

What the program does:

  1. Parse an application's access log and count how many times each user ID accessed it. The log format is roughly: "2012-10-26 14:41:30,748  userNameId-777 from IP-10.232.25.144 invoked URL-http://xxx/hello.jsonp"

  2. Runs in standalone mode, but uses the Hadoop library that the Maven project depends on, so you don't need a separate Hadoop installation.

<!-- pom.xml -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.0.4</version>
</dependency>

//Mapper
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Coupon11LogMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

	// compile the regex once instead of on every input line
	private static final Pattern ACCESS_PATTERN = Pattern.compile(".*userNameId\\-(\\d+).*");

	@Override
	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
		String line = value.toString();

		// emit (userNameId, 1) for every line that records an access
		Matcher matcher = ACCESS_PATTERN.matcher(line);
		if (!matcher.find()) {
			return;
		}
		String userNameId = matcher.group(1);
		context.write(new Text(userNameId), new LongWritable(1L));
	}
}

//Reducer
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Coupon11LogReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	@Override
	protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
		// sum the per-access counts emitted by the mapper for this user ID
		long sum = 0;
		for (LongWritable value : values) {
			sum = sum + value.get();
		}
		context.write(key, new LongWritable(sum));
	}
}

//Job Runner
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Coupon11LogJobMain {

	public static void main(String[] args) throws Exception {

		String inputFile = "/home/kent/dev/hadoop/bigdata/coupon11/coupon11.log";
		String outDir = "/home/kent/dev/hadoop/bigdata/coupon11/output" + System.currentTimeMillis();

		Job job = new Job();
		job.setJarByClass(Coupon11LogJobMain.class);

		FileInputFormat.addInputPaths(job, inputFile);
		FileOutputFormat.setOutputPath(job, new Path(outDir));

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		job.setMapperClass(Coupon11LogMapper.class);
		job.setReducerClass(Coupon11LogReducer.class);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
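A sketch of how to run it in standalone mode and look at the result; the exec-maven-plugin invocation assumes the classes live in the default package as above, and part-r-00000 is the file a single reducer produces:

$mvn compile exec:java -Dexec.mainClass=Coupon11LogJobMain
$cat /home/kent/dev/hadoop/bigdata/coupon11/output*/part-r-00000 #one "userNameId<TAB>count" line per user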

Java regex "group" example


import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexGroupExample {
	public static void main(String[] args) {
		String accessRegex = ".*userNameId\\-(\\d+).*";
		String text = "2012-10-26 14:41:30,748  userNameId-777 from IP-10.232.25.144 invoked URL-http://xxx/hello.jsonp";

		Pattern pattern = Pattern.compile(accessRegex);
		Matcher matcher = pattern.matcher(text);
		if (matcher.find()) {
			System.out.println(matcher.group(1)); // prints 777
		}
	}
}