spark中pyspark :add

一、RDD的介绍(了解)

RDD:resilient distributed dataset(弹性分布式数据集合 ) spark的计算核心,spark采用rdd管理数据

  • RDD

    • RDD是spark的一种数据模型(规定数据的存储结构和计算方法)

    • python中的数据模型

      • list [] 可以重复存储数据 append

      • set{} 不允许重复存储

      • dict {k:v} get(key)

    • RDD的模型可以对内存数进行共享管理

  • 分布式

    • 数据可以在多台服务器上同时计算执行

  • 弹性

    • 可以根据计算的需求将数据进行分区拆分,本质就是将数据分成多份

二、RDD的特点(了解)

  • 分区

    • 可以将计算的海量数据分成多份,需要分成多少可分区可以通过方法指定

    • 每个分区都可以对应一个task线程执行计算

  • 只读

    • rdd中的数据不能直接修改,需要通过方法计算后得到一个新的rdd

    • rdd本身存储的数只能读取

  • 依赖

    • rdd之间是有依赖关系的

    • 新的rdd是通过旧的rdd计算得到

  • 缓存

    • 可以将计算的中结果缓存起来,如果后续计算错误时,可以从缓存位置重新计算

    • 将数据存储在内存或本地磁盘

    • 作用是容错

    • 缓存在执行计算任务程序结束后会释放删除

  • checkpoint

    • 作用和缓存一样

    • checkpoint可以将数据存储在分布式存储系统中,比如hdfs

三、创建RDD数据(掌握)

将需要计算的数据转为rdd的数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是有rdd提供的

rdd数据的转化方法是有sparkcontext提供的,所以需要先生成sparkcontext,sparkcontext中还包含资源申请和任务划分功能

SparkContext称为Spark的入口类

3-1 Python数据转化为rdd

# 导入sparkcontext
from pyspark import SparkContext
​
# 创建SparkContext对象
sc = SparkContext()
​
# 将Python数据转为rdd
# data_int = 10  # 数值类型不能转化rdd
# 能for循环遍历的数据都能转为rdd
data_str = 'abc'
data_list = [1, 2, 3, 4]
data_dict = {'a': 1, 'b': 2}
data_set = {1, 2, 3, 4}
data_tuple = (1, 2, 3, 4)
rdd = sc.parallelize(data_tuple)
​
# rdd的计算
​
​
# rdd的数据输出展示
# 获取所有rdd数据
res = rdd.collect()
print(res)

3-2 文件数据(hdfs)转化为rdd

# 将读取的hdfs文件数据转为rdd
from pyspark import SparkContext
​
# 生成SparkContext类对象
sc = SparkContext()
​
# 读取文件数据转为rdd
rdd1  = sc.textFile('hdfs://node1:8020/data')
rdd2  = sc.textFile('/data/words.txt')
​
# 查看数据
res = rdd1.collect()
print(res)
res = rdd2.collect()
print(res)
​

3-3 rdd的分区

  • python数据转发的分区数指定

# RDD分区使用
# 导入sparkcontext
from pyspark import SparkContext
​
# 创建SparkContext对象
sc = SparkContext()
​
# 创建生成rdd是可以指定分区数
# Python数据转为rdd指定
# numSlices 可以指定分区数
rdd_py = sc.parallelize([1,2,3,4,5,6],numSlices=10)
​
​
# rdd计算
​
# 查看rdd分区数据
res1  = rdd_py.glom().collect()
print(res1)
​
​
  • 读取的文件数据进行分区数指定

# RDD分区使用
# 导入sparkcontext
from pyspark import SparkContext
​
# 创建SparkContext对象
sc = SparkContext()
​
# 创建生成rdd是可以指定分区数
# file文件读取数据指定分区数据
# minPartitions 指定分区
# 文件大小/分区数  = 值 -----余数
# 余数/值 * 100%=百分比    百分比大于10% 会多创建一个分区
rdd_file = sc.textFile('hdfs://node1:8020/data',minPartitions=1)
# 在spark并行度部分会讲解如何根据资源设置分区数
​
# rdd计算
​
# 查看rdd分区数据
​
res2  = rdd_file.glom().collect()
print(res2)
​

3-4 小文件数据读取

300M 3个块 对应三个分区

在一个目录下,有多个文件,如果文件的大小不够一个块的大小,一个文件就对应一个分区,文件超过一个块,那就一个block(128M)块对应一个分区。

目录下都是小文件,那么读取目录下的文件数据,会对应很多个分区


一个分区对应一个task线程,当小文件过多时,会占用大量的线程,造成资源浪费

使用wholeTextFiles方法可以解决

该方法会现将读取到的数据合并在一起,然后重新进行分区

# 导入sparkcontext
from pyspark import SparkContext
​
# 创建SparkContext对象
sc = SparkContext(master='yarn')
# rdd = sc.textFile('hdfs://node1:8020/data')
# rdd计算
# wholeTextFiles 会合并小文件数据
# minPartitions 指定分区数
rdd_mini = sc.wholeTextFiles('hdfs://node1:8020/data',minPartitions=1)
​
# 展示数据
# res1 = rdd.glom().collect()
# print(res1)
​
res2 = rdd_mini.glom().collect()
print(res2)

java.lang.NoSuchMethodError

java包类的冲突

node1操作 同步到node2和node3

四、常用RDD算子(掌握)

将数据转化为rdd之后,就需要进行rdd的计算了,rdd提供了计算方法

rdd的方法又称为rdd算子

4-1 算子(方法)介绍

rdd中封装了各种算子方便进行计算,主要分为两类

  • transformation

    • 转化算子 对rdd数据进行转化计算得到新的rdd ,定义了一个线程任务

  • action

    • 执行算子 触发计算任务,让计算任务进行执行,得到结果

    • 触发线程执行的

rdd的转化算子大部分都是从rdd中读取元素数据(rdd中每条数据),具体计算需要开发人员编写函数传递到rdd算子中

rdd的执行算子则大部分是用来获取数据 collect方法就是触发算子

4-2 常用transformation算子(掌握)

  • map

    • rdd.map(lambda 参数:参数计算)

    • 参数接受每个元素数据

# 转化算子map的使用
from pyspark import SparkContext
​
# 创建SparkContext对象
sc = SparkContext()
​
# 生成rdd
data = [1, 2, 3, 4]
rdd = sc.parallelize(data)
​
​
# 对rdd进行计算
# 转化算子map使用
# 将处理数据函数当成参数传递给map
# 定义函数只需要一个接受参数
def func(x):
    """
        数据计算逻辑函数
    :param x: 接收每一个rdd的元素数据
    :return:
    """
    return x + 1
​
​
def func2(x):
    """
        数据计算逻辑函数
    :param x: 接收每一个rdd的元素数据
    :return:
    """
    return str(x)
​
​
# 转化算子执行后会返回新的rdd
rdd_map = rdd.map(func)
rdd_map2 = rdd.map(func2)
rdd_map3 = rdd_map2.map(lambda x: [x])
​
# 对rdd数据结果展示
# 使用rdd的触发算子,collect获取是所有的rdd元素数据
res = rdd_map.collect()
print(res)
​
res2 = rdd_map2.collect()
print(res2)
​
​
res3 = rdd_map3.collect()
print(res3)
​
  • flatMap

    • 处理的是二维嵌套列表数据 [[1,2,3],[4,5,6],[7,8,9]] [1,2,3,4]

    • rdd.flatMap(lambda 参数:[参数计算])

from pyspark import SparkContext
​
# 创建SparkContext对象
sc = SparkContext()
​
# 生成rdd
data = [[1, 2], [3, 4]]
data2 = ['a,b,c','d,f,g']  # 将数据转为['a','b','c','d','f','g']
rdd = sc.parallelize(data)
rdd2 = sc.parallelize(data2)
​
# rdd计算
# flatMap算子使用  将rdd元素中的列表数依次遍历取出对应的值放入新的rdd [1,2,3,4]
# 传递一个函数,函数接受一个参数
rdd_flatMap = rdd.flatMap(lambda x: x)
​
rdd_map = rdd2.map(lambda x:x.split(','))
rdd_flatMap2 = rdd_map.flatMap(lambda x:x)
​
# 输出展示数据
# 使用执行算子
res  = rdd_flatMap.collect()
print(res)
​
res2  = rdd_map.collect()
print(res2)
​
res3 = rdd_flatMap2.collect()
print(res3)
  • fliter

    • rdd.filter(lambda 参数:参数条件过滤)

    • 条件过滤的书写和Python中if判断一样

# 3、过滤算子
rdd7 = sc.parallelize([1, 2, 3, 4])
rdd8 = sc.parallelize(['a', 'b', 'c', 'a'])
# filter算子,可以接受rdd中每个元素数据,然后传递给函数进行过滤
# lambda需要有一个接收值x,x接收到每个元素数据后,如何进行过滤需要写判断逻辑
​
# 判断条件的书写逻辑和if的判断逻辑一样
filter_rdd = rdd7.filter(lambda x: x > 2)
filter_rdd2 = rdd8.filter(lambda x: x == 'a')
  • distinct 去重

    • 不需要lambda rdd.distinct

# 4、去重
# distinct 会对rdd中的重复数据进行去重,去重后会返回一个新的rdd
distinct_rdd = rdd8.distinct()
  • groupBy 分组

    • rdd.groupBy(lambda 参数:根据参数编写分组条件)

    • mapValues(list)

# 5、对数据进行分组
# groupBy是分组算子,会读取rdd中每个元素数据,传递给函数使用
# lambda需要一个接收值x,接收groupBy传递的元素数据,然后指定分组规则
# hash(x) % 2  对x中的元素数据进行hash取余,将数据分成两组,余数相同的数据会放在一起
# groupBy返回一个新的rdd,rdd的结构形式是  [(key,value),(k,v)]
groupBy_rdd = rdd8.groupBy(lambda x: hash(x) % 2)

# 6、对kv形式的数据进行取值处理
# mapValues,可以获取kv中的value值部分传递给函数进行使用
# mapValues返回一个新的rdd数据
mapValues_rdd = groupBy_rdd.mapValues(lambda x: list(x))
  • k-v数据 [(k,v),(k1,v1)]

    • groupByKey()

      • rdd.groupByKey()

    • reduceByKey()

      • rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)

    • sortByKey()

      • rdd.sortByKey()

# 6-2 对kv形式的数据进行分组,系统key值得数据会放在一起
rdd9 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])
# 不需要传递处理函数,返回一个新的rdd
groupByKey_rdd = rdd9.groupByKey()
mapValues_rdd2 = groupByKey_rdd.mapValues(lambda x: list(x))

# 6-3 对kv形式的数据先进行分组,在进行聚合计算
# reduceByKey会将相同key的数据放在一起,然后对每个key中对应的valeu进行累加计算
# reduceByKey会将分组后的数据,按照key值传递个函数进行计算
# lambda需要接受两个参数,后面编写累加计算  x=0  y=3  x+y=3    x=3,y=0  res=x+y=3
reduceByKey_rdd = rdd9.reduceByKey(lambda x, y: x + y)

# 6-4 对kv形式的数据先进行排序
# 不需要指定函数,按照key排序,默认升序
sortByKey_rdd= rdd9.sortByKey()
sortByKey_rdd2= rdd9.sortByKey(ascending=False)
  • sortBy() 排序

    • rdd.sortBy(lambda x:x,ascending=False)

# 排序算子
# sortBy 可以指定按照哪个数据进行排序
# sortBy会将rdd中的元素数据传递给函数使用
# lambda 需要一个接受值x,接受rdd中每个元素
# 如果元素是kv类型可以通过下标方式指定按照那种排序 x[0] 代表key x[1] 代表value值
# 默认升序
sortBy_rdd = rdd9.sortBy(lambda x: x[1])
# 降序
sortBy_rdd2 = rdd9.sortBy(lambda x: x[1],ascending=False)

4-3 常用action算子(掌握)

  • collect() 取出rdd中所有值

    • rdd.collect()

  • reduce() 非k-v类型数据累加 [1,2,3,4,6]

    • rdd.reduce(lambda 参数1,参数2:两个参数计算)

  • count() 统计rdd元素个数

    • rdd.count()

  • take() 取出指定数量值

    • rdd.take(数量)

# 执行算子的使用
from pyspark import SparkContext

sc = SparkContext()

# python转为rdd
rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# transformation算的转化计算
# map,flatMap,fliter等

# 触发计算
# action算子计算完成返回的是计算结果,不在是rdd了,不能在进行rdd操作了
# collect方法,触发计算获取是所有计算结果
res = rdd.collect()
print(res)
# reduce方法,传递一个计算逻辑,对元素数据进行累加计算
# 可以不需要转化算直接累加计算,但是不能处理kv形式数据
res  = rdd.reduce(lambda x,y:x+y)
# x初始为0 y一次获取元素数据   x=0,y=1 x= x+y=1
# x=1,y=2  x+y=3
# x=3,y=3  x+y=6
print(res)

# count 获取rdd元素个数
res = rdd.count()
print(res)

# take取指定数量的元素数据
res=rdd.take(3)
print(res)

4-4 词频统计案例(掌握)

# 词频统计
# 导入sparkcontext
from pyspark import SparkContext

# 创建SparkContext对象
sc = SparkContext()

# 将hdfs的文件数据读取后转为rdd
# 第一个参数 指定读取的文件路径
# rdd = sc.textFile('hdfs://node1:8020/data')
# 简写  有的会读取错误,当错误是就写完整
# rdd = sc.textFile('/data')
# 读取某单独文件
rdd = sc.textFile('hdfs://node1:8020/data/words.txt')
# rdd计算
# 对读取到的rdd中的每行数据,先进行切割获取每个单词的数据
# rdd_map = rdd.map(lambda x: x.split(','))
rdd_flatMap= rdd.flatMap(lambda x: x.split(','))

# 将单词数据转化为k-v结构数据   [(k,v),(k1,v1)]   给每个单词的value一个初始值1
rdd_map_kv = rdd_flatMap.map(lambda x:(x,1))

# 对kv数据进行聚合计算  hive:[1,1]  求和  求平均数  求最大值  求最小值
rdd_reduceByKey =  rdd_map_kv.reduceByKey(lambda x,y:x+y)  # 现将相同key值的数据放在一起,然后对相同key值内的进行累加


# 展示数据
res = rdd.collect()
print(res)

# res2 = rdd_map.collect()
# print(res2)

res3 = rdd_flatMap.collect()
print(res3)

res4 = rdd_map_kv.collect()
print(res4)

res5 = rdd_reduceByKey.collect()
print(res5)
# [('hadoop',1),('flink',1),('spark',2),('hive',2)]

4-5 其他高级算子

  • 多个rdd的方法

    • union 合并两个rdd 不去重

    • join k-v类型数据 通过key进行关联

# 多个rdd操作
from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([5,6,7,4])

rdd_kv1 = sc.parallelize([('a',1),('b',2),('c',3)])
rdd_kv2 = sc.parallelize([('c',4),('d',5),('e',6)])

# rdd之间的合并
# rdd1和并rdd2,合并后会返回先的rdd
union_rdd = rdd1.union(rdd2)
# rdd3和并rdd4,合并后会返回先的rdd
union_kv_rdd = rdd_kv1.union(rdd_kv2)


# kv形式rdd进行join关联  通过key关联
# 内关联  相同key的数据会保留下来
join_rdd = rdd_kv1.join(rdd_kv2)
# 左关联   左边rdd的数据会被保留下来,如果右边rdd有对应的key值数据会显示,没有对应key值会显示为空
leftOuterJoin_rdd = rdd_kv1.leftOuterJoin(rdd_kv2)
# 右关联   右边rdd的数据会被保留下来,如果左边rdd有对应的key值数据会显示,没有对应key值会显示为空
rightOuterJoin_rdd = rdd_kv1.rightOuterJoin(rdd_kv2)

# 查看结果
res = union_rdd.collect()
print(f'union合并结果:{res}')
res2 = union_kv_rdd.collect()
print(f'kv_union合并结果:{res2}')
res3 = join_rdd.collect()
print(f'join内关联结果:{res3}')
res4 = leftOuterJoin_rdd.collect()
print(f'左关联结果:{res4}')
res5 = rightOuterJoin_rdd.collect()
print(f'右关联结果:{res5}')
  • 重分区

# 1、导入sparkcontext类
from pyspark import SparkContext

# 2、初始化SparkContext类型
# 没有指定master参数,默认使用本机资源
sc = SparkContext()

# 3、将数据转为rdd数据
# 转化Python数据
data_list = [1, 2, 3, 4, 5, 6]

rdd = sc.parallelize(data_list,numSlices=10)

map_rdd = rdd.map(lambda x:x+1)

# 修改rdd分区信息
# 指定修改的分区数  返回得到新的rdd
# repartition 在进行使用时更多进行的是减少分区数
repartition_rdd = map_rdd.repartition(4)

repartition_rdd2 = map_rdd.repartition(3)

# 查看结果信息
# glom() 查看当前rdd的分区信息
res = map_rdd.glom().collect()
print(res)
res2 = repartition_rdd.glom().collect()
print(res2)

res3 = repartition_rdd2.glom().collect()
print(res3)
  • 数据保存

# 1、导入sparkcontext类
from pyspark import SparkContext

# 2、初始化SparkContext类型
# 没有指定master参数,默认使用本机资源
sc = SparkContext()

# 3、将数据转为rdd数据
# 转化Python数据
data_list = [1, 2, 3, 4, 5, 6]

rdd = sc.parallelize(data_list,numSlices=3)

rdd.saveAsTextFile('/itcast111')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/744661.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker 下载与安装以及配置

安装yum工具 yum install -y yum-ulits配置yum源 阿里云源 yum-config-manager --add-repo https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo安装Docker 17.03后为两个版本: 社区版(Community Edition,缩写为 CE&#x…

内网一键部署k8s-kubeshpere,1.22.12版本

1.引言 本文档旨在指导读者在内网环境中部署 Kubernetes 集群。Kubernetes 是一种用于自动化容器化应用程序部署、扩展和管理的开源平台,其在云原生应用开发和部署中具有广泛的应用。然而,由于一些安全或网络限制,一些组织可能选择在内部网络…

【踩坑】修复循环设置os.environ[‘CUDA_VISIBLE_DEVICES‘]无效

转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,欢迎[点赞、收藏、关注]哦~ 问题示例 for gpus in [0, 1, 2, 3, 4, 5, 6, 7]:os.environ[CUDA_VISIBLE_DEVICES] gpusprint(torch.cuda.get_device_name(0)) 始终将使用第…

专业技能篇---计算机网络

文章目录 前言计算机网络基础一、网络分层模型 HTTP一、从输入URL到页面显示发生了什么?二、Http的状态码有哪些?三、 HTTP与HTTPS有什么区别?四、URI 和 URL 的区别是什么?五、Cookie和Session有什么区别?六、GET与POST WebSock…

期货投机的操作

期货投机是一种高风险、高回报的投资方式,吸引着众多投资者参与。将深入探讨期货专业投机的操作秘诀,帮助投资者掌握必要的知识和技巧,在期货市场中驰骋。 一、期货专业投机的本质 期货投机是利用期货合约进行买卖,以赚取差价的一…

Diffusion Mamba:用于CT到MRI转换的Mamba扩散模型

Diffusion Mamba:用于CT到MRI转换的Mamba扩散模型 提出背景拆解左侧:整体框架中间:Mamba块的细节右侧:螺旋扫描的细节 提出背景 论文:https://arxiv.org/pdf/2406.15910 代码:https://github.com/wongzbb…

JAVA【案例5-2】模拟默认密码自动生成

【模拟默认密码自动生成】 1、案例描述 本案例要求编写一个程序,模拟默认密码的自动生成策略,手动输入用户名,根据用户名自动生成默认密码。在生成密码时,将用户名反转即为默认的密码。 2、案例目的 (1&#xff09…

超简单的nodejs使用log4js保存日志到本地(可直接复制使用)

引入依赖 npm install log4js 新建配置文件logUtil.js const log4js require(log4js);// 日志配置 log4js.configure({appenders: {// 控制台输出consoleAppender: { type: console },// 文件输出fileAppender: {type: dateFile,filename: ./logs/default, //日志文件的存…

【详述】BP神经网络建模流程一步一步详述

本文来自《老饼讲解-BP神经网络》https://www.bbbdata.com/ 目录 一、BP神经网络的建模流程二、BP神经网络的建模分步讲解2.1.数据归一化2.2.数据划分2.3.网络结构设置2.4.网络训练2.5.训练效果评估 本文梳理BP神经网络的建模流程,供大家建模时进行借鉴。 一、BP神经…

循环神经网络——RNN

循环神经网络 在之前NLP基础章节-语言模型中我们介绍了 n n n 元语法,其中单词 x t x_t xt​ 在时间步 t t t 的条件概率仅取决于前面 n n n 个单词,若是想要将之前单词的影响也加入那么模型参数数量会指数级增长。但是可能之前的单词存在重要的信息…

进阶篇08——MySQL管理

系统数据库 常用工具 mysql 客户端工具 mysqladmin 执行管理操作 mysqlbinlog 数据库二进制日志转成文本 mysqlshow 数据库查找 mysqldump 数据库备份 mysqlimport/source 数据库导入

LLM大语言模型-AI大模型全面介绍

简介: 大语言模型(LLM)是深度学习的产物,包含数十亿至数万亿参数,通过大规模数据训练,能处理多种自然语言任务。LLM基于Transformer架构,利用多头注意力机制处理长距离依赖,经过预训…

Python-爬虫 下载天涯论坛帖子

为了爬取的高效性,实现的过程中我利用了python的threading模块,下面是threads.py模块,定义了下载解析页面的线程,下载图片的线程以及线程池 import threading import urllib2 import Queue import re thread_lock threading.RL…

宝塔计划任务调用node程序时,log4js日志保存本地位置会发生变化

接我上一篇文章的情况 超简单的nodejs使用log4js保存日志到本地(可直接复制使用)-CSDN博客 原本应当保存在node项目目录下的日志文件,如果使用宝塔的计划任务来定时执行的话,日志保存路径会发生变化到如下图的位置: 如…

JFrame和JScrollPanel布局初步使用

还不是很了解,做了几个程序; import java.awt.Container; import java.awt.Color; import javax.swing.JFrame; import javax.swing.JScrollPane; import javax.swing.border.EmptyBorder;public class pa1 {public static void main(String[] agrs){JF…

AWS在国内的持续受欢迎:探究背后的原因

亚马逊云(AWS)作为全球领先的云计算服务提供商,在国内市场仍然保持着强劲的竞争力和广泛的用户群。尽管国内也有一些本土云计算服务提供商的崛起,但AWS在国内仍然有大量的用户在使用。我们九河云,一直致力AWS云相关服务…

如何使用 Swift 中的 GraphQL

文章目录 前言基础知识ApolloGraphQL结论前言 我一直在分享关于类型安全和在 Swift 中构建健壮 API 的更多内容。今天,我想继续探讨类型安全的话题,介绍 GraphQL。GraphQL 是一种用于 API 的查询语言。本周,我们将讨论 GraphQL 的好处,并学习如何在 Swift 中使用它。 基础…

面试-JMM的内存可见性

1.JAVA内存模型 分析: 由于JVM运行程序的实体是线程,而每个线程创建时,JVM都会 为其创建一个工作内存(栈空间),用于存储线程私有的数据。而java内存模型中规定所有变量都存储在主内存中。主内存是共享内存区域,所有线程都可以访问…

国密SSL证书提升网络安全

随着数字化时代的到来,网络安全已经成为全球关注的焦点。在这种背景下,SSL证书作为保护数据传输安全的重要工具,其重要性日益凸显。 数字证书产品有以下几种类别: 单域名SSL证书:为单一网站提供安全保护。 多域名SS…