SQLServer数据库之DataFrame与数据库的相互转化
小标 2018-10-19 来源 : 阅读 2399 评论 0

摘要:本文主要向大家介绍了SQLServer数据库之DataFrame与数据库的相互转化,通过具体的内容向大家展现,希望对大家学习SQLServer数据库有所帮助。

本文主要向大家介绍了SQLServer数据库之DataFrame与数据库的相互转化,通过具体的内容向大家展现,希望对大家学习SQLServer数据库有所帮助。


在Spark中,Dataframe简直可以称为内存中的文本文件。
就像在电脑上直接操作txt、 csv、 json文件一样简单。
 
val sparkConf = new SparkConf().setAppName("df2db").setMaster("local[1]")
val sc = new SparkContext(sparkConf)
val sqlContext : SQLContext = new SQLContext(sc)
val df = sqlContext.read.format("csv").option("header","true").load("D:\\spark test\\123")
val snapTable = "env0y"
df.registerTempTable(snapTable)
 
 
以上寥寥数语就把一个csv文件转为DataFrame并注册为一张临时表了,这时候就可以像操作数据库表一样操作这个snapTable了:
 
val sql = "SELECT * FROM " + snapTable
val dfTmp = this.sqlContext.sql(sql)
 
 
这样写代码方便简单,但可惜的是DataFrame毕竟仅仅存在于内存中,我们业务代码只会输出算法里规定的结果
 
也就是说,假如结果出错,不好定位到底是DataFrame本身数据有误,还是代码中的SQL写错了。。。
 
假如能随时随地操作DataFrame就好了,怎么办呢?
把DataFrame保存到真实的数据库去:
 
import java.util.Properties
val connectionUrl = "jdbc:sqlserver://10.119.46.153:1433"
val table = "Nettransmit.dbo.df2mssql"
val prop = new Properties()
prop.put("JDBC.Driver","com.microsoft.sqlserver.jdbc.SQLServerDriver")
prop.put("user", "sa")
prop.put("password", "ibas.1597")
val dfWriter = df.write.mode(SaveMode.Overwrite).jdbc(connectionUrl, table, prop)
 
这下好了,如果计算出错了,我们直接连上数据库几条sql就能debug个八九不离十。
唯一要注意的是,DataFrame to Database不是业务要求,所以上面的代码只能在开发模式或者测试模式的时候存在,正式发布版不应该出现 ?
 
既然可以写进去,自然也可以读出来:
//SqlServer 2 Dataframe
val dfviatable = sqlContext.read.jdbc(connectionUrl,table,prop)
dfviatable.show(10)
 
以上,DataFrame和数据库之间的极简交互就完成了,但如果业务中真的有读写数据库的需求了,性能问题可能会成为瓶颈,要注意的。
 
接下来是那么一点点优化。
从csv到DataFrame,我们使用df.printSchema()语句可以在控制台看到类似下面的输出:
root
 |-- IMSI: string (nullable = true)
|-- UserType: string (nullable = true)
 |-- Total PS Traffic(KB): string (nullable = true)
 |-- Total Online Time(s): string (nullable = true)
 |-- Total CS Traffic (ERL): string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Series: string (nullable = true)
 |-- OS: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- FDD LTE: string (nullable = true)
 |-- TDD LTE: string (nullable = true)
|-- Only Report 3G Capability: string (nullable = true)
 
也就是说,写入到数据库之后每个字段的类型都是string,这显然是一种浪费。
 
而且很多值完全可以使用int或者double或者bool类型。
 
怎么办呢?得修改数据库的“方言”,就像在c++中std::locale 建立本地规则一样。
 
为了方便起见,封装一下:
import java.io.{File, FileInputStream}
import java.util.Properties
 
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SaveMode}
 
/**
  * Created by env0y on 2017/11/24.
  */
object dataframe2db {
  def df2db(df: DataFrame,table: String,properties: String) = {
    try{
      val is = new FileInputStream(new File(properties))
      val prop = new Properties()
      prop.load(is)
      val url = String.valueOf(prop.get("url"))//
      JdbcDialects.registerDialect(SQLServerDialect)
      df.write.mode(SaveMode.Overwrite).jdbc(url,table,prop)
      is.close()
    }
  }
 
  val SQLServerDialect = new JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")
 
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
      case StringType      => Some(JdbcType("NVARCHAR(128)", java.sql.Types.VARCHAR))
      case BooleanType     => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
      case IntegerType     => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
      case LongType        => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
      case DoubleType      => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
      case FloatType       => Some(JdbcType("REAL", java.sql.Types.REAL))
      case ShortType       => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
      case ByteType        => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
      case BinaryType      => Some(JdbcType("BINARY", java.sql.Types.BINARY))
      case DateType        => Some(JdbcType("DATE", java.sql.Types.DATE))
      case TimestampType   => Some(JdbcType("DATE", java.sql.Types.DATE))
      //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
      case t: DecimalType  => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
      case _ => throw new IllegalArgumentException(s"Don‘t know how to save ${dt.json} to JDBC")
    }
  }
 
}
 
然后像这样调用:
dataframe2db.df2db(df,"Nettransmit.dbo.df2dbff","D:\\ database.properties")
 
第三个参数是数据库的属性配置文件,内容类似以下:
#\u5F00\u53D1\u6570\u636E\u5E93
driver=com.microsoft.sqlserver.jdbc.SQLServerDriver
url=jdbc:sqlserver://10.119.46.153:1433;databaseName=TspManagement
username=sa
password=ibas.1597
 
这时候再去观察从DataFrame写入到数据库中表会发现,字段属性都变成NVARCHAR(128)了~~
 
 
 
另外,直接修改DataFrame里面的Schema类型也很简单:
val df1 = df.withColumn("Only Report 3G Capability",col("Only Report 3G Capability").cast(DataTypes.FloatType))
df1.printSchema()
 
 
就这些,以上Spark的版本是1.6. 涉及的数据库是sqlServer.

本文由职坐标整理并发布,希望对同学们学习SQL Server有所帮助,更多内容请关注职坐标数据库SQL Server数据库频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程