设为首页收藏本站

小牛社区-大数据学习交流社区|大数据免费学习资源

 找回密码
 立即注册!

QQ登录

只需一步,快速开始

扫一扫,访问微社区

查看: 37|回复: 0

checkpoint的实现

[复制链接]

165

主题

0

帖子

26

积分

吃土小白

Rank: 1

积分
26
发表于 7 天前 | 显示全部楼层 |阅读模式
刘彬同学准备写一系列spark实战系列,本文是第二篇,checkpoint的实现!赞!推荐给大家,希望大家喜欢和支持!
系列文章
SparkContext 初始化内部原理
checkpoint 检查点是很多分布式系统的常用容错容灾机制,其本质就是将系统运行时的内存数据结构和状态持久化到磁盘上,在需要的时候对这些数据进行读取,然后重新构造出运行时的状态。在Spark中使用检查点就是为了将RDD的状态保存下来,在重新执行时就不需要计算,直接从检查点读取即可。
1Metadata checkpointing
保存流计算的定义信息到容错存储系统中,比如:HDFS(分布式存储系统),用来恢复应用程序中运行worker的节点故障,其中元数据包括:

  • Configuration:创建Spark Streaming应⽤用程序的配置信息
  • DStream operations:定义Streaming应⽤用程序的操作集合
  • Incomplete batches:操作存在队列中的未完成的批次
2Data checkpointing
保存生成的RDD到高可靠的存储系统中,这在有状态transformation中是必须的,在这样一个transformation中,生成的RDD依赖于之前批的RDD,随着时间的推移,这个依赖链长度会持续增长,在恢复过程中,为了避免无限增长,有状态的transformation的中间RDD会定期地存储到可靠的存储系统中,以截断这个依赖链
在Spark中checkpoint主要通过CheckpointRDD和RDDCheckpointData实现的,下来就来看⼀一下这两个的实现
http://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
CheckpointRDD : ⽤用来从存储中恢复检查点的RDD

checkpointRDD重写了RDD的五个方法:分别为doCheckpoint , checkpoint , localCheckpoint , getPartition , compute
doCheckpoint :用来保存检查点
checkpoint :用来读取检查点数据
localcheckpoint:本地的检查点
getPartition :获取检查点的分区数组
compute:用来从检查点中恢复数据
checkpointRDD为抽象类,有两个子类继承了它,分别为LocalCheckPoint和ReliableCheckpoint,但是这两个类都没有完全实现checkpointRDD
3ReliableCheckpoint
ReliableCheckpoint中定义了如下属性:
sc:即sparkcontext
checkpointPath:检查点的目录
_partitioner:调用方指定的分区计算器
hadoopConf:SparkContext的_hadoopConfiguration属性,即hadoop配置信息
cpath:类型为org.apache.hadoop.fs.path表⽰示checkpoint path对应的hadoop⽂文件系统中的路径
fs:使⽤用hadoopConf得到的org.apache.hadoop.fs.filesystem
broadcastedConf:调用SparkContext的broadcast方法对hadoopconf进行广播后返回的Broadcase对象
partitioner:ReliableCheckpointRDD的分区计算器,优先采用_partitioner指定的,否则调用ReliableCheckpointRDD的伴⽣生对象的readCheckpointedPartitionerFile方法从checkpoint path指定的检
查点目录下读取分区计算器
ReliableCheckpointRDD还提供了很多工具方法,比如:
writePartitionToCheckpointFile方法用于将RDD分区的数据写入到检查点目录下的⽂文件中
writePartitionerToCheckpointDir:用于将分区计算器的数据写入到检查点的目录下
writeRDDTocheckpointDirectory:用于将RDD的数据写入检查点目录

1)调用SparkContext的runJob方法将数据写入到检查点目录,将数据写入磁盘的函数是ReliableCheckpointRDD的伴生对象的writePartitionToCheckpointFile方法
2)如果RDD有分区计算器,那么调用ReliableCheckpointRDD的伴生对象writePartitionerToCheckpointDir将分区计算器的信息写入到检查点目录
3)创建并返回ReliableCheckpointRDD
readCheckpointFile 方法用于从检查点读取RDD的数据,代码如下:

4RDDCheckpointData的实现
RDDCheckpointData用于保存与检查点相关的信息,每个RDDCheckpointData实例都与一个RDD实例相关
RDDCheckpointData中一共有三个属性,分别为rdd,cpState,cpRDDcpState : 检查点的状态,cpstate的值来自于枚举类型Checkpoint-State,CheckpointState中定义了检查点的状态,包括初始化完成Initialed,正在保存检查点( CheckpointInProgress )和保存检查
点完毕( Checkpointed )
cpRDD : 保存检查点的RDD,即CheckpointRDD的实现类
RDDCheckpointData中也定义了一些方法:
isCheckpoint⽅方法⽤用于判断是否已经为RDDCheckpointData关联的RDD保存了检查点数据

checkpoint⽅方法用于将RDDCheckpointData关联的RDD数据保存到检查点的模版方法中

如果检查点状态时Initialized,那么将cpState设置为CheckpointInProgress,否则返回。
调用doCheckpoint方法保存检查点并生成CheckpointRDD
由cpRDD持有刚⽣生成的checkpointRDD,然后将cpState设置成Checkpointed,最后调⽤用RDD的markCheckpointed⽅方法,清空依赖,清空依赖是因为现在已经有了,之前就不需要了
checkpointRDD :checkpointRDD方法用于获取cpRDD持有的CheckpointRDD
getPartition:用来获取CheckpointRDD的分区数组

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册!

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册!

本版积分规则

快速回复 返回顶部 返回列表