package test.hadoop.util;
import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import com.emar.adwiser.common.util.LogUtil;
import com.emar.adwiser.common.util.Logs;
/***
* HDFS 工具类
* @author zhaidw
*
*/
public class HDFSUtil {
private static Logs log = LogUtil.getLog(HDFSUtil.class);
public synchronized static FileSystem getFileSystem(String ip, int port) {
FileSystem fs = null;
String url = "hdfs://" + ip + ":" + String.valueOf(port);
Configuration config = new Configuration();
config.set("fs.default.name", url);
try {
fs = FileSystem.get(config);
} catch (Exception e) {
log.error("getFileSystem failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
return fs;
}
public synchronized static void listNode(FileSystem fs) {
DistributedFileSystem dfs = (DistributedFileSystem) fs;
try {
DatanodeInfo[] infos = dfs.getDataNodeStats();
for (DatanodeInfo node : infos) {
System.out.println("HostName: " + node.getHostName() + "/n"
+ node.getDatanodeReport());
System.out.println("--------------------------------");
}
} catch (Exception e) {
log.error("list node list failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}
/**
* 打印系统配置
*
* @param fs
*/
public synchronized static void listConfig(FileSystem fs) {
Iterator<Entry<String, String>> entrys = fs.getConf().iterator();
while (entrys.hasNext()) {
Entry<String, String> item = entrys.next();
log.info(item.getKey() + ": " + item.getValue());
}
}
/**
* 创建目录和父目录
*
* @param fs
* @param dirName
*/
public synchronized static void mkdirs(FileSystem fs, String dirName) {
// Path home = fs.getHomeDirectory();
Path workDir = fs.getWorkingDirectory();
String dir = workDir + "/" + dirName;
Path src = new Path(dir);
// FsPermission p = FsPermission.getDefault();
boolean succ;
try {
succ = fs.mkdirs(src);
if (succ) {
log.info("create directory " + dir + " successed. ");
} else {
log.info("create directory " + dir + " failed. ");
}
} catch (Exception e) {
log.error("create directory " + dir + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}
/**
* 删除目录和子目录
*
* @param fs
* @param dirName
*/
public synchronized static void rmdirs(FileSystem fs, String dirName) {
// Path home = fs.getHomeDirectory();
Path workDir = fs.getWorkingDirectory();
String dir = workDir + "/" + dirName;
Path src = new Path(dir);
boolean succ;
try {
succ = fs.delete(src, true);
if (succ) {
log.info("remove directory " + dir + " successed. ");
} else {
log.info("remove directory " + dir + " failed. ");
}
} catch (Exception e) {
log.error("remove directory " + dir + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}
/**
* 上传目录或文件
*
* @param fs
* @param local
* @param remote
*/
public synchronized static void upload(FileSystem fs, String local,
String remote) {
// Path home = fs.getHomeDirectory();
Path workDir = fs.getWorkingDirectory();
Path dst = new Path(workDir + "/" + remote);
Path src = new Path(local);
try {
fs.copyFromLocalFile(false, true, src, dst);
log.info("upload " + local + " to " + remote + " successed. ");
} catch (Exception e) {
log.error("upload " + local + " to " + remote + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}
/**
* 下载目录或文件
*
* @param fs
* @param local
* @param remote
*/
public synchronized static void download(FileSystem fs, String local,
String remote) {
// Path home = fs.getHomeDirectory();
Path workDir = fs.getWorkingDirectory();
Path dst = new Path(workDir + "/" + remote);
Path src = new Path(local);
try {
fs.copyToLocalFile(false, dst, src);
log.info("download from " + remote + " to " + local
+ " successed. ");
} catch (Exception e) {
log.error("download from " + remote + " to " + local + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}
/**
* 字节数转换
*
* @param size
* @return
*/
public synchronized static String convertSize(long size) {
String result = String.valueOf(size);
if (size < 1024 * 1024) {
result = String.valueOf(size / 1024) + " KB";
} else if (size >= 1024 * 1024 && size < 1024 * 1024 * 1024) {
result = String.valueOf(size / 1024 / 1024) + " MB";
} else if (size >= 1024 * 1024 * 1024) {
result = String.valueOf(size / 1024 / 1024 / 1024) + " GB";
} else {
result = result + " B";
}
return result;
}
/**
* 遍历HDFS上的文件和目录
*
* @param fs
* @param path
*/
public synchronized static void listFile(FileSystem fs, String path) {
Path workDir = fs.getWorkingDirectory();
Path dst;
if (null == path || "".equals(path)) {
dst = new Path(workDir + "/" + path);
} else {
dst = new Path(path);
}
try {
String relativePath = "";
FileStatus[] fList = fs.listStatus(dst);
for (FileStatus f : fList) {
if (null != f) {
relativePath = new StringBuffer()
.append(f.getPath().getParent()).append("/")
.append(f.getPath().getName()).toString();
if (f.isDir()) {
listFile(fs, relativePath);
} else {
System.out.println(convertSize(f.getLen()) + "/t/t"
+ relativePath);
}
}
}
} catch (Exception e) {
log.error("list files of " + path + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
} finally {
}
}
public synchronized static void write(FileSystem fs, String path,
String data) {
// Path home = fs.getHomeDirectory();
Path workDir = fs.getWorkingDirectory();
Path dst = new Path(workDir + "/" + path);
try {
FSDataOutputStream dos = fs.create(dst);
dos.writeUTF(data);
dos.close();
log.info("write content to " + path + " successed. ");
} catch (Exception e) {
log.error("write content to " + path + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}
public synchronized static void append(FileSystem fs, String path,
String data) {
// Path home = fs.getHomeDirectory();
Path workDir = fs.getWorkingDirectory();
Path dst = new Path(workDir + "/" + path);
try {
FSDataOutputStream dos = fs.append(dst);
dos.writeUTF(data);
dos.close();
log.info("append content to " + path + " successed. ");
} catch (Exception e) {
log.error("append content to " + path + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
}
public synchronized static String read(FileSystem fs, String path) {
String content = null;
// Path home = fs.getHomeDirectory();
Path workDir = fs.getWorkingDirectory();
Path dst = new Path(workDir + "/" + path);
try {
// reading
FSDataInputStream dis = fs.open(dst);
content = dis.readUTF();
dis.close();
log.info("read content from " + path + " successed. ");
} catch (Exception e) {
log.error("read content from " + path + " failed :"
+ ExceptionUtils.getFullStackTrace(e));
}
return content;
}
}
分享到:
相关推荐
A.2实验二:熟悉常用的HDFS操作 ...(3)熟悉HDFS操作常用的Java API。 A.2.2 实验平台 (1)操作系统:Linux(建议Ubuntu 16.04)。(2) Hadoop版本:2.7.1。 (3)JDK版本:1.7或以上版本。(4) Java IDE:Eclipse。
数据科学导论 实验2:熟悉常用的HDFS操作 1. 编程实现以下指定功能,并利用 Hadoop 提供的 Shell 命令完成相同任务: 2. 编程实现一个类“MyFSDataInputStream”,该类继承“org.apache.hadoop.fs.FSDataInputStream...
本实例依托于springmvc框架,采用maven作为jar包管理仓库,通过引用hadoop相关jar包,编写了hdfs操作工具类,能够在web端实现文件远程上传至hdfs。
第1章 HDFS HA及解决方案 1.1 HDFS系统架构 1.2 HA定义 1.3 HDFS HA原因分析及应对措施 1.3.1 可靠性 1.3.2 可维护性 1.4 现有HDFS HA解决方案 1.4.1 Hadoop的元数据备份方案 1.4.2 Hadoop的SecondaryNameNode方案 ...
01_hadoop_hdfs1分布式文件系统01 02_hadoop_hdfs1分布式文件系统02 03_hadoop_hdfs1分布式文件系统03 04_hadoop_hdfs1分布式文件系统04 05_hadoop_hdfs1分布式文件系统05 06_hadoop_hdfs1分布式文件系统06 07_...
036 HDFS Java API 两种方式介绍及使用URL API详解一 037 使用URL API详解二 038 使用HDFS FileSystem API 详解 039 HDFS文件系统读写流程及HDFS API两种方式读取文件 040 详解HDFS API之FileSystem方式基本操作二 ...
hadoop-hdfs Hadoop分布式文件系统hdfs代码分析目录介绍Datanode-数据块基本结构主要介绍了HDFS中第二关系块结构,数据块到数据中断的映射关系。退役-中断退款主要介绍了数据异步下线取消机制。INode-文件目录结构...
4、HDFS-java操作类HDFSUtil及junit测试(HDFS的常见操作以及HA环境的配置) 网址:https://blog.csdn.net/chenwewi520feng/article/details/130334620 本文编写了java对HDFS的常见操作,并且均测试通过。 其功能...
编写程序,实现在hdfs分布式文件系统平台上创建目录。编写任务,实现通过本地文件上传到hdfs分布式文件系统平台,如将本地文件/root/HelloWorld.txt文件上传到hdfs平台/javaApi目录下
基于hadoop的Hive数据仓库JavaAPI简单调用的实例,关于Hive的简介在此不赘述。hive提供了三种用户接口:CLI,JDBC/ODBC和 WebUI CLI,即Shell命令行 JDBC/ODBC 是 Hive 的Java,与使用传统数据库JDBC的方式类似 Web...
HDFS的Shell操作,bin/hadoop fs 具体命令 OR bin/hdfs dfs 具体命令 dfs是fs的实现类等等。
5.1.4 HDFS文件与操作方法 451 5.1.5 HDFS文件读写方法 452 5.2 文件读操作与输入流 452 5.2.1 打开文件 452 5.2.2 读操作――DFSInputStream实现 461 5.3 文件短路读操作 481 5.3.1 短路读共享内存 482...
Hdfs操作类-------->Hdfs.java HBase操作类------->HBaseUtil.java 短信操作类-------->Sms.java 联系人操作类------>Contact.java 文件操作类-------->MyFile.java 上面的Action都配置到Struts.xml中。 Client端...
现有的大数据平台Hadoop、Spark等都在处理文本数据方面具有很好的支持,并且效率也经过了各种优化,所以在利用分布式框架来处理日志类数据,工作难度往往是如何对这些数据进行逻辑上的处理。但是对于非结构化数据,...
Hadoop分布式文件系统;Hadoop的I/O、MapReduce应用程序开发;MapReduce的工作机制;MapReduce的类型和格式;MapReduce的特性;如何构建Hadoop集群,如何管理Hadoop;Pig简介;Hbase简介;Hive简介;ZooKeeper简介;...
第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据块 namenode和datanode 命令行接口 基本文件系统操作 Hadoop文件系统 接口 Java接口 从Hadoop URL中读取数据 通过FileSystem API读取数据 写入数据 目录 查询...
而进行海量计算需要一个稳定的,安全的数据容器,才有了Hadoop分布式文件系统(HDFS,Hadoop Distributed File System)。 HDFS通信部分使用org.apache.hadoop.ipc,可以很快使用RPC.Server.start()构造一个节点...
3.1 HDFS 文件操作 3.1.1 基本文件命令 3.1.2 编程读写HDFS 3.2 剖析MapReduce 程序 3.2.1 Hadoop数据类型 3.2.2 Mapper 3.2.3 Reducer 3.2.4 Partitioner:重定向Mapper输出 3.2.5 Combiner:本地reduce ...
这些操作是透明的,与普通的文件系统API没有区别。 MapReduce则是JobTracker节点为主,分配工作以及负责和用户程序通信。 HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算。 Hadoop也跟其他...
二、 课程的任务 通过本课程的学习,使学生学会搭建Hadoop完全分布式集群,掌握HDFS的原理和基础操作,掌握MapReduce原理架构、MapReduce程序的编写。为将来从事大数据挖掘研究工作以及后续课程的学习奠定基础。