Spring Boot Integration with Hadoop: HDFS File Operations
1. The Main Classes for HDFS Operations

HDFS operations involve the following main classes:

Configuration: encapsulates the configuration information of the client or the server.

FileSystem: represents the file system; its methods perform the actual file operations. An instance is obtained through FileSystem's static get method, e.g. FileSystem hdfs = FileSystem.get(conf);

FSDataInputStream: the HDFS input stream, obtained through FileSystem's open method.

FSDataOutputStream: the HDFS output stream, obtained through FileSystem's create method.

A minimal sketch tying these four classes together follows this list.
2. Dependencies and Configuration
Maven
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hdfs</groupId>
    <artifactId>HadoopTest</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>HadoopTest</name>
    <url>http://maven.apache.org</url>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath />
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>cn.bestwu</groupId>
            <artifactId>ik-analyzers</artifactId>
            <version>5.1.0</version>
        </dependency>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Application.properties
# Tomcat max worker threads
server.tomcat.max-threads=1000
# Tomcat port
server.port=8900
# Session timeout (minutes)
server.session-timeout=60
spring.application.name=hadoop
spring.servlet.multipart.max-file-size=50MB
spring.servlet.multipart.max-request-size=50MB
hdfs.path=hdfs://localhost:9000
hdfs.username=linhaiy
logging.config=classpath:logback.xml
3. Developing the HDFS File Operation API
Config
package com.hadoop.config;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * HDFS configuration class
 * @author linhaiy
 * @date 2019.05.18
 */
@Configuration
public class HdfsConfig {

    @Value("${hdfs.path}")
    private String path;

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }
}
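Any Spring bean can consume this configuration through injection. The consumer below is a hypothetical sketch that is not part of the original project; it only illustrates the pattern (the HdfsService later in this article reads ${hdfs.path} directly via @Value instead).

package com.hadoop.config;

import org.springframework.stereotype.Component;

@Component
public class HdfsPathPrinter {

    private final HdfsConfig hdfsConfig;

    // Single-constructor injection; no @Autowired needed in Spring Boot 2.x
    public HdfsPathPrinter(HdfsConfig hdfsConfig) {
        this.hdfsConfig = hdfsConfig;
    }

    public void print() {
        // Prints e.g. hdfs://localhost:9000 from application.properties
        System.out.println("HDFS base path: " + hdfsConfig.getPath());
    }
}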
Entity
package com.hadoop.hdfs.entity;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * User entity
 * @author linhaiy
 * @date 2019.05.18
 */
public class User implements Writable {

    private String username;
    private Integer age;
    private String address;

    public User() {
        super();
    }

    public User(String username, Integer age, String address) {
        super();
        this.username = username;
        this.age = age;
        this.address = address;
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialize the object. writeUTF is required here so that
        // readFields can read the values back with readUTF.
        output.writeUTF(username);
        output.writeInt(age);
        output.writeUTF(address);
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialize the object back into memory
        username = input.readUTF();
        age = input.readInt();
        address = input.readUTF();
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    @Override
    public String toString() {
        return "User [username=" + username + ", age=" + age + ", address=" + address + "]";
    }
}
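Because Writable serialization is positional, write and readFields must use symmetric calls (writeUTF paired with readUTF, writeInt with readInt). The following in-memory round trip is a minimal sketch that verifies the contract; the sample values are arbitrary.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.hadoop.hdfs.entity.User;

public class UserSerializationDemo {
    public static void main(String[] args) throws Exception {
        User original = new User("zhangsan", 25, "Beijing");

        // Serialize into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize from the same bytes
        User restored = new User();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Prints: User [username=zhangsan, age=25, address=Beijing]
        System.out.println(restored);
    }
}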
Service
package com.hadoop.hdfs.service;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import com.hadoop.util.JsonUtil;

@Component
public class HdfsService {

    @Value("${hdfs.path}")
    private String path;

    @Value("${hdfs.username}")
    private String username;

    private static String hdfsPath;
    private static String hdfsName;
    private static final int bufferSize = 1024 * 1024 * 64;

    /**
     * Get the HDFS configuration
     */
    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", hdfsPath);
        return configuration;
    }

    /**
     * Get the HDFS file system object
     */
    public static FileSystem getFileSystem() throws Exception {
        // The HDFS client operates under a user identity. By default the client
        // API reads it from a JVM parameter (-DHADOOP_USER_NAME=hadoop);
        // it can also be passed explicitly when constructing the FileSystem,
        // which is what we do here.
        FileSystem fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
        return fileSystem;
    }

    /**
     * Create a directory on HDFS
     */
    public static boolean mkdir(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        boolean isOk = fs.mkdirs(srcPath);
        fs.close();
        return isOk;
    }

    /**
     * Check whether a file exists on HDFS
     */
    public static boolean existFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        boolean isExists = fs.exists(srcPath);
        return isExists;
    }

    /**
     * Read information about an HDFS directory
     */
    public static List<Map<String, Object>> readPathInfo(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path newPath = new Path(path);
        FileStatus[] statusList = fs.listStatus(newPath);
        List<Map<String, Object>> list = new ArrayList<>();
        if (null != statusList && statusList.length > 0) {
            for (FileStatus fileStatus : statusList) {
                Map<String, Object> map = new HashMap<>();
                map.put("filePath", fileStatus.getPath());
                map.put("fileStatus", fileStatus.toString());
                list.add(map);
            }
            return list;
        } else {
            return null;
        }
    }

    /**
     * Create a file on HDFS from an uploaded MultipartFile
     */
    public static void createFile(String path, MultipartFile file) throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return;
        }
        String fileName = file.getOriginalFilename();
        FileSystem fs = getFileSystem();
        // The file goes into the given directory; its original name is appended
        Path newPath = new Path(path + "/" + fileName);
        // Open an output stream
        FSDataOutputStream outputStream = fs.create(newPath);
        outputStream.write(file.getBytes());
        outputStream.close();
        fs.close();
    }

    /**
     * Read the content of an HDFS file
     */
    public static String readFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        FSDataInputStream inputStream = null;
        try {
            inputStream = fs.open(srcPath);
            // Read as UTF-8 to avoid garbled Chinese characters
            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
            String lineTxt = "";
            StringBuffer sb = new StringBuffer();
            while ((lineTxt = reader.readLine()) != null) {
                sb.append(lineTxt);
            }
            return sb.toString();
        } finally {
            // Null-safe close in case open() failed
            IOUtils.closeStream(inputStream);
            fs.close();
        }
    }

    /**
     * List the files under an HDFS path
     */
    public static List<Map<String, String>> listFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        // Recursively find all files
        RemoteIterator<LocatedFileStatus> filesList = fs.listFiles(srcPath, true);
        List<Map<String, String>> returnList = new ArrayList<>();
        while (filesList.hasNext()) {
            LocatedFileStatus next = filesList.next();
            String fileName = next.getPath().getName();
            Path filePath = next.getPath();
            Map<String, String> map = new HashMap<>();
            map.put("fileName", fileName);
            map.put("filePath", filePath.toString());
            returnList.add(map);
        }
        fs.close();
        return returnList;
    }

    /**
     * Rename an HDFS file
     */
    public static boolean renameFile(String oldName, String newName) throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        // Original path
        Path oldPath = new Path(oldName);
        // New path
        Path newPath = new Path(newName);
        boolean isOk = fs.rename(oldPath, newPath);
        fs.close();
        return isOk;
    }

    /**
     * Delete an HDFS file
     */
    public static boolean deleteFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (!existFile(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        // Delete immediately and recursively. deleteOnExit would only remove
        // the path when the FileSystem is closed, which is not what we want here.
        boolean isOk = fs.delete(srcPath, true);
        fs.close();
        return isOk;
    }

    /**
     * Upload a local file to HDFS
     */
    public static void uploadFile(String path, String uploadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // Local source path
        Path clientPath = new Path(path);
        // Target HDFS path
        Path serverPath = new Path(uploadPath);
        // The first argument controls whether the source is deleted
        // (true = delete; the default is false)
        fs.copyFromLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Download an HDFS file to the local file system
     */
    public static void downloadFile(String path, String downloadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // HDFS source path
        Path clientPath = new Path(path);
        // Local target path
        Path serverPath = new Path(downloadPath);
        // The first argument controls whether the source is deleted
        // (true = delete; the default is false)
        fs.copyToLocalFile(false, clientPath, serverPath);
        fs.close();
    }

    /**
     * Copy a file within HDFS
     */
    public static void copyFile(String sourcePath, String targetPath) throws Exception {
        if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // Source path
        Path oldPath = new Path(sourcePath);
        // Target path
        Path newPath = new Path(targetPath);
        FSDataInputStream inputStream = null;
        FSDataOutputStream outputStream = null;
        try {
            inputStream = fs.open(oldPath);
            outputStream = fs.create(newPath);
            IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
        } finally {
            // Null-safe close in case open/create failed
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
    }

    /**
     * Open an HDFS file and return its content as a byte array
     */
    public static byte[] openFileToBytes(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        try {
            FSDataInputStream inputStream = fs.open(srcPath);
            return IOUtils.readFullyToByteArray(inputStream);
        } finally {
            fs.close();
        }
    }

    /**
     * Open an HDFS file and deserialize it into a Java object
     */
    public static <T extends Object> T openFileToObject(String path, Class<T> clazz) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        String jsonStr = readFile(path);
        return JsonUtil.fromObject(jsonStr, clazz);
    }

    /**
     * Get the block locations of a file in the HDFS cluster
     */
    public static BlockLocation[] getFileBlockLocations(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // Target path
        Path srcPath = new Path(path);
        FileStatus fileStatus = fs.getFileStatus(srcPath);
        return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }

    @PostConstruct
    public void getPath() {
        hdfsPath = this.path;
    }

    @PostConstruct
    public void getName() {
        hdfsName = this.username;
    }

    public static String getHdfsPath() {
        return hdfsPath;
    }

    public String getUsername() {
        return username;
    }
}
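To exercise the service end to end, something like the following integration-test sketch can be used. It is hypothetical, not part of the original project: it assumes a NameNode reachable at the configured hdfs.path, and it relies on spring-boot-starter-test (already declared in the pom) for SpringRunner and MockMultipartFile. The /demo path and file content are arbitrary.

package com.hadoop.hdfs.service;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class HdfsServiceTest {

    @Test
    public void uploadAndReadBack() throws Exception {
        // Create a directory, write a small in-memory file into it, read it back.
        // Running inside the Spring context ensures the @PostConstruct methods
        // have populated the static hdfsPath/hdfsName fields.
        HdfsService.mkdir("/demo");

        MockMultipartFile file = new MockMultipartFile(
                "file", "hello.txt", "text/plain", "hello hdfs".getBytes("UTF-8"));
        HdfsService.createFile("/demo", file);

        System.out.println(HdfsService.readFile("/demo/hello.txt"));
    }
}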
Controller
package com.hadoop.hdfs.controller;

import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import com.hadoop.hdfs.entity.User;
import com.hadoop.hdfs.service.HdfsService;
import com.hadoop.util.Result;

@RestController
@RequestMapping("/hadoop/hdfs")
public class HdfsAction {

    private static Logger LOGGER = LoggerFactory.getLogger(HdfsAction.class);

    /**
     * Create a directory
     */
    @RequestMapping(value = "mkdir", method = RequestMethod.POST)
    @ResponseBody
    public Result mkdir(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            LOGGER.debug("Request parameter is empty");
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        // Create an empty directory
        boolean isOk = HdfsService.mkdir(path);
        if (isOk) {
            LOGGER.debug("Directory created");
            return new Result(Result.SUCCESS, "Directory created");
        } else {
            LOGGER.debug("Failed to create directory");
            return new Result(Result.FAILURE, "Failed to create directory");
        }
    }

    /**
     * Read information about an HDFS directory
     */
    @PostMapping("/readPathInfo")
    public Result readPathInfo(@RequestParam("path") String path) throws Exception {
        List<Map<String, Object>> list = HdfsService.readPathInfo(path);
        return new Result(Result.SUCCESS, "Read HDFS directory info", list);
    }

    /**
     * Get the block locations of an HDFS file in the cluster
     */
    @PostMapping("/getFileBlockLocations")
    public Result getFileBlockLocations(@RequestParam("path") String path) throws Exception {
        BlockLocation[] blockLocations = HdfsService.getFileBlockLocations(path);
        return new Result(Result.SUCCESS, "HDFS file block locations", blockLocations);
    }

    /**
     * Create a file
     */
    @PostMapping("/createFile")
    public Result createFile(@RequestParam("path") String path, @RequestParam("file") MultipartFile file)
            throws Exception {
        if (StringUtils.isEmpty(path) || null == file.getBytes()) {
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        HdfsService.createFile(path, file);
        return new Result(Result.SUCCESS, "File created");
    }

    /**
     * Read the content of an HDFS file
     */
    @PostMapping("/readFile")
    public Result readFile(@RequestParam("path") String path) throws Exception {
        String targetPath = HdfsService.readFile(path);
        return new Result(Result.SUCCESS, "HDFS file content", targetPath);
    }

    /**
     * Read an HDFS file as a byte array
     */
    @PostMapping("/openFileToBytes")
    public Result openFileToBytes(@RequestParam("path") String path) throws Exception {
        byte[] files = HdfsService.openFileToBytes(path);
        return new Result(Result.SUCCESS, "HDFS file as bytes", files);
    }

    /**
     * Read an HDFS file and convert it into a User object
     */
    @PostMapping("/openFileToUser")
    public Result openFileToUser(@RequestParam("path") String path) throws Exception {
        User user = HdfsService.openFileToObject(path, User.class);
        return new Result(Result.SUCCESS, "HDFS file as User object", user);
    }

    /**
     * List files
     */
    @PostMapping("/listFile")
    public Result listFile(@RequestParam("path") String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        List<Map<String, String>> returnList = HdfsService.listFile(path);
        return new Result(Result.SUCCESS, "File list read", returnList);
    }

    /**
     * Rename a file
     */
    @PostMapping("/renameFile")
    public Result renameFile(@RequestParam("oldName") String oldName, @RequestParam("newName") String newName)
            throws Exception {
        if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
            return new Result(Result.FAILURE, "Request parameter is empty");
        }
        boolean isOk = HdfsService.renameFile(oldName, newName);
        if (isOk) {
            return new Result(Result.SUCCESS, "File renamed");
        } else {
            return new Result(Result.FAILURE, "Failed to rename file");
        }
    }

    /**
     * Delete a file
     */
    @PostMapping("/deleteFile")
    public Result deleteFile(@RequestParam("path") String path) throws Exception {
        boolean isOk = HdfsService.deleteFile(path);
        if (isOk) {
            return new Result(Result.SUCCESS, "delete file success");
        } else {
            return new Result(Result.FAILURE, "delete file fail");
        }
    }

    /**
     * Upload a file
     */
    @PostMapping("/uploadFile")
    public Result uploadFile(@RequestParam("path") String path, @RequestParam("uploadPath") String uploadPath)
            throws Exception {
        HdfsService.uploadFile(path, uploadPath);
        return new Result(Result.SUCCESS, "upload file success");
    }

    /**
     * Download a file
     */
    @PostMapping("/downloadFile")
    public Result downloadFile(@RequestParam("path") String path, @RequestParam("downloadPath") String downloadPath)
            throws Exception {
        HdfsService.downloadFile(path, downloadPath);
        return new Result(Result.SUCCESS, "download file success");
    }

    /**
     * Copy a file within HDFS
     */
    @PostMapping("/copyFile")
    public Result copyFile(@RequestParam("sourcePath") String sourcePath, @RequestParam("targetPath") String targetPath)
            throws Exception {
        HdfsService.copyFile(sourcePath, targetPath);
        return new Result(Result.SUCCESS, "copy file success");
    }

    /**
     * Check whether a file exists
     */
    @PostMapping("/existFile")
    public Result existFile(@RequestParam("path") String path) throws Exception {
        boolean isExist = HdfsService.existFile(path);
        return new Result(Result.SUCCESS, "file isExist: " + isExist);
    }
}
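The endpoints can be smoke-tested without deploying to a standalone server by using Spring's MockMvc. The test below is a hypothetical sketch, not part of the original project: @SpringBootTest boots the full context (so HdfsService's @PostConstruct setters run), @AutoConfigureMockMvc wires a MockMvc instance, and a running HDFS is still required behind the service. The /demo path is arbitrary.

package com.hadoop.hdfs.controller;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;

@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureMockMvc
public class HdfsActionTest {

    @Autowired
    private MockMvc mockMvc;

    @Test
    public void mkdirReturnsOk() throws Exception {
        // POST /hadoop/hdfs/mkdir with the path request parameter,
        // matching the controller's @RequestParam("path")
        mockMvc.perform(post("/hadoop/hdfs/mkdir").param("path", "/demo"))
                .andExpect(status().isOk());
    }
}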
4. Test Results

(The original article showed screenshots of the test results here.)
This concludes this article on integrating Spring Boot with Hadoop for HDFS file operations.