Spark JDBC操作MySQL方式详细讲解

 更新时间:2023年02月01日 11:46:37   作者:CarveStone  
这篇文章主要介绍了Spark JDBC操作MySQL方式,Spark SQL可以通过JDBC从传统的关系型数据库中读写数据,读取数据后直接生成的是DataFrame,然后再加上借助于Spark SQL丰富的API来进行各种操作

JDBC操作MySQL

在实际的企业级开发环境中,如果数据规模特S别大,此时采用传统的SQL语句去处理的话一般需要分成很多批次处理,而且很容易造成数据库服务宕机,且实际的处理过程可能会非常复杂,通过传统的Java EE等技术可能很难或者不方便实现处理算法,此时采用SparkSQL进行分布式分析处理就可以非常好的解决该问题,在生产环境下,一般会在Spark SQL和具体要操作的DB之间加上一个缓冲层次,例如中间使用Redis或者Kafka。

Spark SQL可以通过JDBC从传统的关系型数据库中读写数据,读取数据后直接生成的是DataFrame,然后再加上借助于Spark SQL丰富的API来进行各种操作。从计算数据规模的角度去讲,集群并行访问数据库数据,调用Data Frame Reader的Format(“JDBC”)的方式说明Spark SQL操作的数据来源是通过JDBC获得,JDBC后端一般都是数据库,例如MySQL、Oracle等。

JDBC读取数据方式

单Partition(无并发)

调用函数格式:def jdbc(url: String, table: String, properties: Properties): DataFrame

  • url:代表数据库的JDBC链接地址;
  • table:具体要链接的数据库;

这种方法是将所有的数据放在一个Partition中进行操作(即并发度为1),意味着无论给的资源有多少,只有一个Task会执行任务,执行效率比较慢,并且容易出现OOM。使用如下,在spark-shell中执行:

/*此为代码格式,实际中使用应替换相应字段中的内容*/
val url = "jdbc:mysql://localhost:/database"        
val tableName = "table"
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")  //实际使用中替换username为相应的用户名
prop.setProperty("password","pwd")   //实际使用中替换pwd为相应的密码

根据Long类型字段分区

/*此为代码格式,实际中使用应替换相应字段中的内容*/
def jdbc(
url: String,
table: String,
columnName: String, // 根据该字段分区,需要为整型,比如 id 等
lowerBound: Long, // 分区的下界
upperBound: Long, // 分区的上界
numPartitions: Int,  //分区的个数
connectionProperties: Properties): DataFrame

根据字段将数据进行分区,放进不同的Partition中,执行效率较快,但是只能根据数据字段作为分区关键字。使用如下:

/*此为代码格式,实际中使用应替换相应字段中的内容*/
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
val columnName = "colName"
val lowerBound = 1,
val upperBound = 10000000,
val numPartitions = 10,
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")

将字段 colName 中发 1~10000000 条数据分区到 10 个 Partition 中。

根据任意类型字段分区

/*此为代码格式,实际中使用应替换相应字段中的内容*/
jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame

以下使用时间字段进行分区:

/*此为代码格式,实际中使用应替换相应字段中的内容*/
val url = "jdbc:mysql://mysqlHost:3306/database"
val tableName = "table"
// 设置连接用户&密码
val prop = new java.util.Properties
prop.setProperty("user","username")
prop.setProperty("password","pwd")
/**
* 将 9 月 16-12 月 15 三个月的数据取出,按时间分为 6 个 partition
* 为了减少事例代码,这里的时间都是写死的
* modified_time 为时间字段
*/
val predicates =
Array(
"2015-09-16" -> "2015-09-30",
"2015-10-01" -> "2015-10-15",
"2015-10-16" -> "2015-10-31",
"2015-11-01" -> "2015-11-14",
"2015-11-15" -> "2015-11-30",
"2015-12-01" -> "2015-12-15"
).map {
case (start, end) =>
s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time
as date) <= date '$end'"
}

这种方法可以使用任意字段进行分区,比较灵活,适用于各种场景。以MySQL 3000W数据量为例,如果单分区count,若干分钟就会报OOM;如果分成5~20个分区后,count操作只需要2s,效率会明显提高,这里就凸显出JDBC高并发的优势。Spark高并发度可以大幅度提高读取以及处理数据的速度,但是如果设置过高(大量的Partition同时读取)也可能会将数据源数据库宕掉。

JDBC读取MySQL数据

下面来进行实际操作,首先需要配置MySQL

  • 免密登陆:mysql -uroot
  • 查看数据库:show databases;
  • 使用MySQL数据库:use mysql;

修改表格的权限,目的是为了使其他主机可以远程连接 MySQL,通过此命令可以查看访问用户允许的主机名。

  • 查看所有用户及其host:select host, user from user;
  • 将相应用户数据表中的host字段改成’%':update user set host="%" where user="root";
  • 刷新修改权限flush privileges;

通过命令修改host为%,表示任意IP地址都可以登录。出现ERROR 1062 (23000): Duplicate entry '%-root' for key 'PRIMARY',是因为 user+host 是主键,不能重复,可以不用理会。也可通过以下命令删除user 为空的内容来解决:delete from user where user='';

在MySQL创建数据库和表格,插入数据,查看:

create database test;    //创建数据库test
use test;        //进入数据库test
create table people( name varchar(12), age int);         //创建表格people并构建结构
insert into people values ("Andy",30),("Justin",19),("Dela",25),("Magi",20),("Pule",21),("Mike",12);        //向people表中插入数据
select * from people;         //输出people表中全部数据

编写代码读取MySQL表中数据:

//导入依赖环境
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import java.util.Properties
val url = "jdbc:mysql://localhost/test"       //MySQL地址及数据库
val username = "root"       //用户名
val sqlContext = new SQLContext(sc)       
sc.setLogLevel("WARN")
val uri = url + "?user=" + username + "&useUnicode=true&characterEncoding=UTF-8"           //设置读取路径及用户名
val properties = new Properties()      //创建JDBC连接信息
properties.put("user","root")
properties.put("driver", "com.mysql.jdbc.Driver")
val df_test: DataFrame = spark.sqlContext.read.jdbc(uri, "people", properties)  //读取数据
df_test.select("name","age").collect().foreach(row => {         //输出数据
	println("name " + row(0) + ", age" + row(1))
})
df_test.write.mode("append").jdbc(uri,"people",properties)     //向people表中写入读出的数据,相当于people表中有两份一样的数据

到此这篇关于Spark JDBC操作MySQL方式详细讲解的文章就介绍到这了,更多相关Spark JDBC操作MySQL内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot集成drools的实现示例

    SpringBoot集成drools的实现示例

    本文主要介绍了SpringBoot集成drools的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-05-05
  • spring boot写java web和接口

    spring boot写java web和接口

    这篇文章主要介绍了spring boot写java web和接口,Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程,该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置,下面详细内容需要的小伙伴可以参考一下
    2022-01-01
  • JAVA工程中引用本地jar的3种常用简单方式

    JAVA工程中引用本地jar的3种常用简单方式

    Jar文件的全称是Java Archive File即Java归档文件,主要是对class文件进行压缩,是一种压缩文件,和常见的zip压缩文件兼容,下面这篇文章主要给大家介绍了关于JAVA工程中引用本地jar的3种常用简单方式,需要的朋友可以参考下
    2024-03-03
  • java 数据结构之栈与队列

    java 数据结构之栈与队列

    这篇文章主要介绍了java 数据结构之栈与队列的相关资料,这里对java中的栈和队列都做出实现实例来帮助大家理解学习数据结构,需要的朋友可以参考下
    2017-07-07
  • SpringCloud微服务架构升级汇总

    SpringCloud微服务架构升级汇总

    这篇文章主要介绍了SpringCloud微服务架构升级汇总,它提倡将单一应用程序划分成一组小的服务,服务之间互相协调、互相配合,为用户提供最终价值,需要的朋友可以参考下
    2019-06-06
  • MyEclipse整合ssh三大框架环境搭载用户注册源码下载

    MyEclipse整合ssh三大框架环境搭载用户注册源码下载

    这篇文章主要为大家详细介绍了如何使用MyEclipse整合ssh三大框架进行环境搭载,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-10-10
  • java Struts2框架下实现文件上传功能

    java Struts2框架下实现文件上传功能

    这篇文章主要为大家详细介绍了java Struts2框架下实现文件上传功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-10-10
  • Netty进阶之ChannelPoolMap源码解析

    Netty进阶之ChannelPoolMap源码解析

    这篇文章主要介绍了Netty进阶之ChannelPoolMap源码解析,ChannelPoolMap是用来存储ChannelPool和指定key的一个集合Map,实际的应用场景就是服务器端是一个分布式集群服务,拥有多个配置地址,这样我们就可以配置多个服务地址,减轻单台服务器的压力,需要的朋友可以参考下
    2023-11-11
  • SpringMVC中的HandlerMapping详解

    SpringMVC中的HandlerMapping详解

    这篇文章主要介绍了SpringMVC中的HandlerMapping详解,HandlerMapping是请求映射处理器,也就是通过请求的url找到对应的逻辑处理单元(Controller),注意这里只是建立请求与Controller的映射关系,最终的处理是通过HandlerAdapt来进行处理的,需要的朋友可以参考下
    2023-09-09
  • JavaWeb实现文件上传与下载实例详解

    JavaWeb实现文件上传与下载实例详解

    在Web应用程序开发中,文件上传与下载功能是非常常用的功能,下面通过本文给大家介绍JavaWeb实现文件上传与下载实例详解,对javaweb文件上传下载相关知识感兴趣的朋友一起学习吧
    2016-02-02

最新评论