元类

元类

janice No Comments

python世界里,一切都是对象.即使定义的一个class,也是一个对象.
我们所熟知的type()函数的作用是返回一个参数的类型,但是实际上,它也有一种完全不同的能力,即接受一个类的一些描述作为参数,然后返回一个类。
举个栗子:

class test(object):
tt = True
def aa(x):
return x
# 就等价于
type("test", (object,), {"tt":True, "aa":lambda x: x})
# (属性和方法本质上都是方法)

元类是什么?元类实际上就是用来创建类的东西。python里面一切都是对象,即使你写的一个class,也会被python创建成一个对象.(Django里面的{model}.objects是不是感觉很怪?实际上就是这个元类的作用)

问题来了,既然类也是一个对象,那这个对象是怎么产生的呢?通过上面代码,就知道实际上是type这个玩意搞的鬼.type是一个元类(metaclass)的基础,在python世界里默认都是用type这个元类来创建类对象.

具体的就不说太多了,放个详细讲解链接:https://blog.csdn.net/weixin_35955795/article/details/52985170

yarn调度策略

janice No Comments

1.先进先出调度器

2.容量调度器

3.公平调度器

三种调度器的基础都是队列。每种调度器选择队列的任务已经队列任务等待有所不同。

先进先出调度器(FIFO Scheduler):

把提交的应用顺序排成一个队列,在资源分配的时候,先给队列中最前的应用进行分配资源。(可能导致大应用把资源都占满,导致其他应用等待时间长)

容量调度器(Capacity Scheduler):

有一个专门的队列运行小任务,为小任务分配会预先占用一定的集群资源。(相当于多个队列的FIFO Scheduler.可以弹性设置占用其他队列的资源)

公平调度器(Fair Scheduler):

动态调整资源,当程序运行大应用并且有小应用进来时,动态调整大应用资源,使资源公平分配给小应用。等小应用执行完毕,大应用又可以获取所有的资源。


操作系统任务调度:

1.先来先到

(1)维护一个后备作业的队列,该算法用队列头中获取一个或者多个作业进入内存,分配必要资源,创建进程放入到就绪队列中。

(2)此算法会被大作业所阻塞,使得后来的作业拥有很长的等待时间。

2.短作业优先

(1)也是维护一个队列,算法优先从队列中选取小作业进入内存,分配资源。

(2)当有大量的小作业提交时,算法会占满资源,导致大作业得不到资源。

3.优先级

(1)从队列中选择高优先级的作业进行资源分配。

(2)剥夺式和非剥夺式,剥夺式会干掉正在运行的低优先级任务,非剥夺式会等待低优先级任务执行完毕。

(3)静态优先级以及动态优先级,根据运行期间对CPU的占有动态调整优先级。

4.高相应比优先

(1)根据等待时间以及运行时间进行综合考虑的一种调度。

公式:(等待时间+要求服务时间) / (要求服务时间)

(2)对于短作业,要求服务时间短,则值会高,对于长作业,等待时间长了,值也会增高。

5.时间片轮转

(1)把就绪进程排成一个队列,按照先进先出对进程进行执行一定时间片,不论进程是否执行完毕。

(2)时间片的设置要得当, 如果时间片太长则退化成了FIFO,如果时间片太短,进程切换开销大。

6.多级反馈队列调度

(1)是优先级以及时间轮片的结合发展,能动态调整优先级以及时间片的大小,并且不必估计进程的执行时间。

(2)设计思想

  • 设置多个队列,每个队列优先级不同,一级队列优先级最高,二级次之,优先级随队列序号增大而减小。
  • 下级时间片比上级时间片增大一倍。序号越大,时间片越长。

(3)调度方式

  • 新进程会进入到一级队列,并且按照FIFO进行一级队列执行,如果进程在时间片内执行完,则清除,如果执行不完,则扔到二级队列中。仅当一级队列为空再执行二级队列,以此内推。
  • 如果在处理第I级队列进程时,I-1级队列前有进程进入,则触发抢占机制,优先处理I-1级前的任务。

(4)优势

  • 短作业优先处理完
  • 相对时间片轮转,短作业减少了切换开销
  • 相对优先级,长作业不会长时间得不到处理

spark内存说明

janice No Comments

单个executor内存:

SPARK_EXECUTOR_MEMORY+ spark.yarn.executor.memoryOverhead

内存说明:

1、ExecutorMemory为JVM进程的Java堆区域。大小通过属性spark.executor.memory设置。用于缓存RDD数据的memoryStore位于这一区域。 一个Executor用于存储RDD的空间=(ExecutorMemory– MEMORY_USED_BY_RUNTIME) * spark.storage.memoryFraction *spark.storage.safetyFraction 参数说明:
spark.storage.memoryFraction该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。
spark.storage.safetyFraction:Spark1.5.1进程的默认堆空间是1g,为了安全考虑同时避免OOM,Spark只允许利用90%的堆空间,spark中使用spark.storage.safetyFraction用来配置该值(默认是0.9).
(spark.shuffle.memoryFraction该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。)

2.MemoryOverhead是JVM进程中除Java堆以外占用的空间大小,包括方法区(永久代)、Java虚拟机栈、本地方法栈、JVM进程本身所用的内存、直接内存(Direct Memory)等。 通过spark.yarn.executor.memoryOverhead设置,单位MB。如果Java堆或者永久代的内存不足,则会产生各种OOM异常,executor会被结束。 在Java堆以外的JVM进程内存占用较多的情况下,应该将MemoryOverhead设置为一个足够大的值,以防JVM进程因实际占用的内存超标而被kill。
当在YARN上运行Spark作业,每个Spark executor作为一个YARN容器运行。Spark可以使得多个Tasks在同一个容器里面运行。 同样,实际运行过程中ExecutorMemory+MemoryOverhead之和(JVM进程总内存)超过container的容量。YARN会直接杀死container。


Overhead大小

Executor端:由spark.yarn.executor.memoryOverhead设置,spark1.5.1默认值executorMemory * 0.10与384的最大值。
Driver端:由spark.yarn.driver.memoryOverhead设置,spark1.5.1默认值driverMemory * 0.10与384的最大值。


注意:

  • --executor-memory/spark.executor.memory 控制 executor 的堆的大小,但是 JVM 本身也会占用一定的堆空间,比如内部的 String 或者直接 byte buffer, spark.yarn.XXX.memoryOverhead 属性决定向 YARN 请求的每个 executor 或dirver或am 的额外堆内存大小,默认值为 max(384, 0.07 * spark.executor.memory )
  • 在 executor 执行的时候配置过大的 memory 经常会导致过长的GC延时,64G是推荐的一个 executor 内存大小的上限。
  • HDFS client 在大量并发线程时存在性能问题。大概的估计是每个 executor 中最多5个并行的 task 就可以占满写入带宽。

另外,因为任务是提交到YARN上运行的,所以YARN中有几个关键参数,参考YARN的内存和CPU配置:

  • yarn.app.mapreduce.am.resource.mb :AM能够申请的最大内存,默认值为1536MB
  • yarn.nodemanager.resource.memory-mb :nodemanager能够申请的最大内存,默认值为8192MB
  • yarn.scheduler.minimum-allocation-mb :调度时一个container能够申请的最小资源,默认值为1024MB
  • yarn.scheduler.maximum-allocation-mb :调度时一个container能够申请的最大资源,默认值为8192MB

spark reduceByKey和groupbyByKey区别

janice No Comments

reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。

groupByKey也是对每个key进行操作,但只生成一个sequence。最后还是要利用map操作对这些sequence进一步操作.

多重校验插件(pytest-assume)

janice No Comments

前言

pytest中可以用python的assert断言,也可以写多个断言,但一个失败,后面的断言将不再执行

 

安装插件

pip3 install pytest-assume -i http://pypi.douban.com/simple/ --trusted-host pypi.douban.com

 

 

assert多重断言

def test_add1():
    assert 1 + 4 == 5
    assert 1 + 3 == 3
    assert 2 + 5 == 7
    assert 2 + 5 == 9
    print("测试完成")

执行结果

结论

可以看到,第二行断言失败之后,后面的断言也不会执行,包括正常的代码

 

pytest.assume多重断言

def test_add2():
    pytest.assume(1 + 4 == 5)
    pytest.assume(1 + 3 == 3)
    pytest.assume(2 + 5 == 7)
    pytest.assume(2 + 5 == 9)
    print("测试完成")

执行结果

结论

  • 可以看到,第二行即使断言失败,后面的断言还是会继续执行
  • 这有助于我们分析和查看到底一共有哪些断言是失败的
  • 而且最后的代码也还会正常执行,比直接用assert更高效

 

分类目录

关于我

 Janice 詹

QQ   455899417

janice2014@foxmail.com