一、示例代码
我们从一句常用的sql开始分析
import spark.implicits._
import spark.sql
sql("SELECT * FROM ods.personal_info limit 10").show()
其中包含两个函数调用,sql()和show(),我们依次来分析下
二、sql()
1、SparkSession
它是使用Dataset和DataFrame API对Spark编程的入口点
//使用Spark执行SQL查询,将结果作为“DataFrame”返回。
def sql(sqlText: String): DataFrame = {
Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
这里要分析来两处:
1、sessionState.sqlParser.parsePlan(sqlText)
2、Dataset.ofRows()
我们先看下第1处,它是通过ParserInterface调用其子类AbstractSqlParser来实现
2、AbstractSqlParser
//为给定的SQL字符串创建逻辑计划
override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
//获取将ParseTree转换为AST的构建器(访问者)
//这里用到了ANTLR的知识
astBuilder.visitSingleStatement(parser.singleStatement()) match {
case plan: LogicalPlan => plan
case _ =>
val position = Origin(None, None)
throw QueryParsingErrors.sqlStatementUnsupportedError(sqlText, position)
}
}
protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
logDebug(s"Parsing command: $command")
//1、将sql转换成字符流
//2、将字符流全部转换成大写,这大大简化了对流的词法分析,同时我们可以保持原始命令
//3、词法分析是基于Hive的org.apache.hadoop.hive.ql.parse.ParseDriver.ANTLRNoCaseStringStream
//4、Hive词法分析是基于ANTLR4
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
//使用指定的令牌源和默认令牌通道 构造一个新的 CommonTokenStream
val tokenStream = new CommonTokenStream(lexer)
//解析
val parser = new SqlBaseParser(tokenStream)
parser.addParseListener(PostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
//spark.sql.legacy.setopsPrecedence.enabled 默认false
//当设置为true并且括号未指定求值顺序时,设置操作将在查询中从左向右执行。当设置为false且括号未指定求值顺序时,INTERSECT操作将在任何UNION、EXCEPT和MINUS操作之前执行。
parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
//spark.sql.legacy.exponentLiteralAsDecimal.enabled 默认值 false
//当设置为true时,具有指数的文字(例如1E-30)将被解析为Decimal而不是Double。
parser.legacy_exponent_literal_as_decimal_enabled = conf.exponentLiteralAsDecimalEnabled
//spark.sql.ansi.enabled 默认 false
//如果为true,Spark SQL将使用符合ANSI的方言,而不是符合Hive的方言。
//
parser.SQL_standard_keyword_behavior = conf.ansiEnabled
try {
try {
// 首先,尝试使用可能更快的SLL模式进行解析
parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
toResult(parser)
}
catch {
case e: ParseCancellationException =>
// 如果失败,则使用LL模式解析
tokenStream.seek(0) // 倒带输入流
parser.reset()
// 重试
parser.getInterpreter.setPredictionMode(PredictionMode.LL)
toResult(parser)
}
}
catch {
//......
}
}
接下来我们看看第2步:Dataset.ofRows()
3、Dataset.ofRows()
private[sql] object Dataset {
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
//根据逻辑计划获取一个QueryExecution
//使用Spark执行关系查询的主要工作流程。旨在让开发人员轻松访问查询执行的中间阶段。
val qe = sparkSession.sessionState.executePlan(logicalPlan)
//断言分析
qe.assertAnalyzed()
//最后构建一个row类型的Dataset也就是DataFrame返回
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
}
我们先整体看看这个逻辑:
1、数据方面:根据执行计划获取QueryExecution,并对该执行计划进行分析(只是分析这一步时懒惰的,需要真正触发执行时才校验)
2、schema:根据数据构建RowEncoder将数据进行类型转换,适配程序
接下来我们看看qe.assertAnalyzed()都做了什么
4、QueryExecution
def assertAnalyzed(): Unit = {
// Analyzer在try块外调用,以避免在下面的catch块内再次调用它
//它是一个懒执行的方法,只有触发action算子时才会执行
analyzed
try {
//这里会对sql做分析校验
sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
} catch {
case e: AnalysisException =>
//解析异常,这里会对sql进行解析并根据不同规则约束抛出不同的异常
}
}
lazy val analyzed: LogicalPlan = {
SparkSession.setActiveSession(sparkSession)
sparkSession.sessionState.analyzer.execute(logical)
}
接下来我们看看是如何对sql进行检查的
5、CheckAnalysis
当sql无法分析时,抛出面向用户的错误。
def checkAnalysis(plan: LogicalPlan): Unit = {
//我们对规则进行升级和排序,以捕捉第一个可能的失败,而不是级联解决失败的结果。
//这里就不展开了,这里列举几个
//1、跳过已分析的子计划
//2、逻辑计划不应具有char/varchar类型的输出
//3、Namespace 、Table、View、Hint等不存在
//4、将 Table 的操作用在了 View 上
//5、表没有分区、不支持分区等等
//......
plan.foreachUp (......)
//度量指标操作
checkCollectedMetrics(plan)
//覆盖以提供额外检查以进行正确分析。这些规则将在我们的内置检查规则之后进行评估。
extendedCheckRules.foreach(_(plan))
//如果有解析异常直接将错误抛给用户
plan.foreachUp {
case o if !o.resolved =>
failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
case _ =>
}
//递归地将此计划树中的所有节点标记为已分析
plan.setAnalyzed()
}
在构建Dataset时还需要构建一个RowEncoder,下面我们就来看看它
6、RowEncoder
它用来处理Spark SQL类型与其允许的外部类型之间的映射,比如:
BooleanType -> java.lang.Boolean
ByteType -> java.lang.Byte
ShortType -> java.lang.Short
IntegerType -> java.lang.Integer
FloatType -> java.lang.Float
DoubleType -> java.lang.Double
StringType -> String
DecimalType -> java.math.BigDecimal or scala.math.BigDecimal or DecimalDateType -> java.sql.Date if spark.sql.datetime.java8API.enabled is false
DateType -> java.time.LocalDate if spark.sql.datetime.java8API.enabled is trueTimestampType -> java.sql.Timestamp if spark.sql.datetime.java8API.enabled is false
TimestampType -> java.time.Instant if spark.sql.datetime.java8API.enabled is trueTimestampNTZType -> java.time.LocalDateTime
DayTimeIntervalType -> java.time.Duration
YearMonthIntervalType -> java.time.PeriodBinaryType -> byte array
ArrayType -> scala.collection.Seq or Array
MapType -> scala.collection.Map
StructType -> org.apache.spark.sql.Row
三、show()
sql()方法会返回一个DataFrame(其实也是一个Dataset),因此show()也是Dataset身上的。
1、Dataset
//以表格形式显示数据集的前20行。超过20个字符的字符串将被截断,所有单元格将向右对齐。
def show(): Unit = show(20)
//数据样例:
//year month AVG('Adj Close) MAX('Adj Close)
//1980 12 0.503218 0.595103
//1981 01 0.523289 0.570307
//1982 02 0.436504 0.475256
//1983 03 0.410516 0.442194
//1984 04 0.450090 0.483521
def show(numRows: Int): Unit = show(numRows, truncate = true)
//truncate 含义:
//是否截断长字符串。如果为true,则超过20个字符的字符串将被截断,所有单元格将正确对齐
def show(numRows: Int, truncate: Boolean): Unit = if (truncate) {
println(showString(numRows, truncate = 20))
} else {
println(showString(numRows, truncate = 0))
}
//将查询到的数据构造成控制台可展示的字符串
private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
//可展示的最大行数
val numRows = _numRows.max(0)
//从这里就看出来,toDF()函数执行就已经将数据拿到了,因此才可以take出前21条来
//我们后面重点看toDF()做了什么
val takeResult = toDF().take(numRows + 1)
val hasMoreData = takeResult.length > numRows
val data = takeResult.take(numRows)
lazy val timeZone =
DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone)
// 对于数组值,将Seq和array替换为方括号。
//对于超出“truncate”字符的单元格,将其替换为第一个“truncate-3”和“…”
val rows: Seq[Seq[String]] = schema.fieldNames.toSeq +: data.map { row =>
row.toSeq.map { cell =>
val str = cell match {
case null => "null"
case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
case array: Array[_] => array.mkString("[", ", ", "]")
case seq: Seq[_] => seq.mkString("[", ", ", "]")
case d: Date =>
DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
case ts: Timestamp =>
DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(ts), timeZone)
case _ => cell.toString
}
if (truncate > 0 && str.length > truncate) {
// 对于长度小于4个字符的字符串,不要显示省略号。
if (truncate < 4) str.substring(0, truncate)
else str.substring(0, truncate - 3) + "..."
} else {
str
}
}: Seq[String]
}
val sb = new StringBuilder
val numCols = schema.fieldNames.length
// 将每列的宽度初始化为最小值“3”
val colWidths = Array.fill(numCols)(3)
// 计算每列的宽度
for (row <- rows) {
for ((cell, i) <- row.zipWithIndex) {
colWidths(i) = math.max(colWidths(i), cell.length)
}
}
// 创建分隔线
val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()
//列名填充
rows.head.zipWithIndex.map { case (cell, i) =>
if (truncate > 0) {
StringUtils.leftPad(cell, colWidths(i))
} else {
StringUtils.rightPad(cell, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")
sb.append(sep)
// 数据填充
rows.tail.map {
_.zipWithIndex.map { case (cell, i) =>
if (truncate > 0) {
StringUtils.leftPad(cell.toString, colWidths(i))
} else {
StringUtils.rightPad(cell.toString, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")
}
sb.append(sep)
// 对于具有多个“numRows”记录的数据
if (hasMoreData) {
val rowsString = if (numRows == 1) "row" else "rows"
sb.append(s"only showing top $numRows $rowsString\n")
}
sb.toString()
}
2、构建一个新的Dataset对象
sql()返回的是一个新的Dataset吗,并不是,而是自带的伴生对象Dataset
而调用了toDF()后,真的会new一个Dataset出来
def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))
那这个时候我们就很有必要看看Dataset类中有什么属性和自动调起的方法了
class Dataset[T] private[sql](
@transient val sparkSession: SparkSession,
@DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
encoder: Encoder[T])
extends Serializable {
//sql()中的那个断言解析是在这里调用的
queryExecution.assertAnalyzed()
def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
//根据逻辑计划创建一个queryExecution
this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
}
//这个应该是为了向前兼容
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
this(sqlContext.sparkSession, logicalPlan, encoder)
}
@transient private[sql] val logicalPlan: LogicalPlan = {
// 对于各种命令(如DDL)和具有副作用的查询,我们强制立即执行查询,让这些副作用迅速发生。
queryExecution.analyzed match {
case c: Command =>
//根据需要插入shuffle操作和内部行格式转换,为执行准备一个计划的[[SparkPlan]]
//SparkPlan 再调用 executeCollect() 运行此查询,将结果作为数组返回。
//最后还是调用RDD的collect(),运行一个Job来执行sql
LocalRelation(c.output, queryExecution.executedPlan.executeCollect())
case u @ Union(children) if children.forall(_.isInstanceOf[Command]) =>
LocalRelation(u.output, queryExecution.executedPlan.executeCollect())
case _ =>
queryExecution.analyzed
}
}
//目前[[ExpressionEncoder]]是[[Encoder]]的唯一实现,
//在这里我们显式地将传入的编码器转换为[[ExpressionEncoder]],并隐式标记它,
//以便我们在构建具有相同对象类型(可能会解析为不同模式)的新Dataset对象时使用它。
private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)
//编码器主要用作Dataset中serde表达式的容器。
//我们通过这些serde表达式构建逻辑计划,并在查询框架内执行。
//但是,出于性能原因,我们可能希望使用编码器作为函数,将内部行反序列化为自定义对象,
//例如collect。在这里,我们解析并绑定编码器,以便稍后调用它的`fromRow`方法。
private val boundEnc =
exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)
//sqlContext必须为val,因为导入隐式时需要一个稳定的标识符
@transient lazy val sqlContext: SQLContext = sparkSession.sqlContext
//将数据集的内容表示为“T”的“RDD”
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
}
四、总结
源码已经大概型的过了一遍,下面我们把SparkSQL执行的整个过程来捋一下
sql()
构建逻辑计划:
1、将sql字符串转换成大写的字符流
2、用ANTLR4对其进行词法分析
3、构建一个CommonTokenStream并进行解析
4、对有歧义的情况做设置,比如1E-30应该被解析为Decimal还是Double
5、首先,尝试使用可能更快的SLL模式进行解析,如果失败,则使用LL模式解析
构建DataFrame:
1、根据逻辑计划创建QueryExecution
2、断言分析sql预计的异常情况(如表、视图、库是否存在等)
3、new一个RowEncoder,为DataFrame准备schema
4、返回由预期数据(QueryExecution)和schema组成的DataFrame
show()
1、用户可以设置展示多少行结果,默认是20行
2、每列结果最多显示20个字符串,用户可以设置是截断还是...代替
3、调用toDF() new一个新的Dataset 这里面会做两件事情(1、规则优化。2、转化为RDD进行任务提交)如果任务执行成功,最终会获取到结果数据
4、3获取的是全量数据,需要根据用户设置的显示行数做截取
5、设置每列的宽度(最小值为3个字符)、分割线、表头和数据
6、控制台展示结果