The company’s Jupyter environment supports PySpark, which makes it easy to connect to Hive and query data directly from a notebook. Since I had no prior exposure to Spark at all, I put together some reference material.

Spark Context

The core module in PySpark is SparkContext (sc for short), and the most important data structure is the RDD. An RDD is like a NumPy array or a Pandas Series and can be regarded as an ordered collection of items; however, these items do not live in the driver’s memory. Instead, they are divided into many partitions, and each partition’s data lives in the memory of a cluster executor.

SparkContext is the main entry point of Spark. If you think of the Spark cluster as the server, the Spark driver is the client and SparkContext is the core of that client. As its docstring says, SparkContext is used to connect to the Spark cluster and to create RDDs, accumulators, and broadcast variables; it plays the role of the application’s main function.

Only one active SparkContext can exist in each JVM, and you must call stop() to close the previous SparkContext before creating a new one.
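As a minimal sketch (the master URL and sample data below are just placeholders), this is what the SparkContext/RDD relationship looks like in PySpark:

from pyspark import SparkConf, SparkContext

# Only one SparkContext may be active per JVM, so getOrCreate() reuses an existing one.
conf = SparkConf().setMaster("local[2]").setAppName("sc-demo")   # placeholder master and app name
sc = SparkContext.getOrCreate(conf)

# parallelize() splits the local list into partitions held by the executors
rdd = sc.parallelize(range(10), numSlices=4)
print(rdd.getNumPartitions())                 # 4
print(rdd.map(lambda x: x * x).collect())     # the action pulls results back to the driver

sc.stop()                                     # must be called before creating a new SparkContext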

Spark Session

Before Spark 2.0, SparkContext was the entry point for all Spark functionality, and the driver connected to the cluster (via the resource manager) through SparkContext, because before 2.0 the RDD was the foundation of Spark. To create a SparkContext you first build a SparkConf, which holds the configuration for the SparkContext.

Since Spark 2.0, SparkSession is also an entry point for Spark. It was introduced for the DataFrame and Dataset APIs while retaining the functionality of the original SparkContext; if you want to use the Hive, SQL, or Streaming APIs, SparkSession is the entry point you need.

SparkSession not only provides access to all the Spark functionality that SparkContext has, but also provides APIs for working with DataFrames and Datasets.

Here’s how to create a SparkSession.

val spark = SparkSession
  .builder()
  .appName("Sparktest")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
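Since these notes are about PySpark, here is the same pattern in Python (the config key and value are placeholders carried over from the Scala example):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Sparktest")
         .config("spark.some.config.option", "some-value")   # placeholder option
         .getOrCreate())

sc = spark.sparkContext   # the underlying SparkContext is still reachable through the session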

The following are the parameters of SparkContext.

  • master - The URL of the cluster to connect to.
  • appName - The name of your job.
  • sparkHome - The Spark installation directory.
  • pyFiles - The .zip or .py files to send to the cluster and add to the PYTHONPATH.
  • environment - Worker node environment variables.
  • batchSize - The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically select batch size based on object size, or -1 to use unlimited batch size.
  • serializer - The RDD serializer.
  • conf - A SparkConf object used to set all Spark properties.
  • gateway - Use an existing gateway and JVM, otherwise initialize a new JVM.
  • jsc - The JavaSparkContext instance.
  • profiler_cls - A custom class of Profiler used for performance analysis (default is profiler.BasicProfiler).

Of these parameters, master and appName are the ones most commonly used.
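For example, a minimal sketch passing just those two (the master URL and app name are placeholders; use whatever your cluster expects):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[*]")          # placeholder; e.g. "yarn" or "spark://host:7077" on a cluster
        .setAppName("my-pyspark-job"))  # the name shown in the resource manager UI
sc = SparkContext(conf=conf)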

Here’s how to create a SparkSession with Hive support.

val spark = SparkSession
  .builder()
  .appName("SparkHivetest")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()
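And the PySpark equivalent (warehouse_location below is a placeholder path; point it at your own Hive warehouse directory):

from pyspark.sql import SparkSession

warehouse_location = "/user/hive/warehouse"   # placeholder path

spark = (SparkSession.builder
         .appName("SparkHivetest")
         .config("spark.sql.warehouse.dir", warehouse_location)
         .enableHiveSupport()                  # lets spark.sql() read and write Hive tables
         .getOrCreate())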

RDD, Dataset and DataFrame

RDD

An RDD is an immutable collection of your data’s elements, distributed across the nodes of a cluster, that can be processed in parallel through a low-level API offering transformations and actions.
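A small sketch of that transformation/action style (the words and counts are made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rdd-demo").getOrCreate()
sc = spark.sparkContext

# Transformations (map, reduceByKey, ...) are lazy; actions (collect, take, ...) trigger execution.
rdd = sc.parallelize(["spark", "hive", "spark", "rdd"])
counts = (rdd.map(lambda w: (w, 1))
             .reduceByKey(lambda a, b: a + b))   # runs on the partitions in parallel
print(counts.collect())                          # e.g. [('spark', 2), ('hive', 1), ('rdd', 1)]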

Scenarios for using RDDs:

  • You want to be able to perform the most basic transformations, processing and control of your data set.
  • Your data is unstructured, such as streaming media or character streams.
  • You want to process your data through functional programming rather than domain-specific representations.
  • You don’t want to impose a schema, i.e. a columnar format where data attributes are processed or accessed by name or field.
  • You can forgo the optimization and performance benefits that DataFrame and Dataset offer for processing structured and semi-structured data.

Advantages.

  • Powerful, with many built-in operations (group, map, filter, etc.) that make it convenient to handle structured or unstructured data
  • Object-oriented programming style: JVM objects are stored directly, and type conversions are safe

Disadvantages.

  • Because it is as general-purpose as Hadoop, there is no optimization for particular scenarios; for example, processing structured data is far more cumbersome than with SQL
  • Java serialization is used by default, which produces relatively large serialized output, and data is stored in the JVM heap, leading to more frequent GC

DataFrame

A DataFrame is a distributed data set built on top of RDDs, similar to a two-dimensional table in a traditional database. The DataFrame introduces a schema.

RDD and DataFrame comparison.

  • Similarities: Both are immutable distributed elastic datasets.
  • Differences: DataFrame datasets are stored by specified columns, i.e. structured data. Similar to a table in a traditional database.

Consider an RDD[Person] side by side with the equivalent DataFrame; the difference becomes clear.

  • In the RDD[Person], Person is the type parameter, but the Spark framework itself knows nothing about the internal structure of the Person class. The DataFrame, by contrast, carries detailed structure information, so Spark SQL knows exactly which columns the dataset contains and what each column’s name and type are. In other words, the DataFrame carries the schema of the data.
  • An RDD is a distributed collection of JVM objects; a DataFrame is a distributed collection of Row objects.
  • A DataFrame provides richer operators than an RDD, but the more important benefit is execution efficiency: less data is read and the execution plan is optimized, e.g. filter pushdown and column pruning.
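An illustrative sketch (assuming a SparkSession named spark; the names and ages are made up): building a DataFrame from Row objects gives Spark the schema that a plain RDD lacks.

from pyspark.sql import Row

people = spark.createDataFrame([
    Row(name="Alice", age=30),
    Row(name="Bob", age=25),
])
people.printSchema()   # Spark SQL knows every column's name and type
# root
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
people.filter(people.age > 26).show()   # columns can be referenced by name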

Advantages.

  • Very convenient for structured data processing; supports sources such as Avro, CSV, Elasticsearch and Cassandra, as well as traditional tables such as Hive and MySQL
  • Targeted optimizations: since Spark already keeps the schema metadata, serialized data does not need to carry it, which greatly reduces its size; data can also be kept in off-heap memory, reducing GC
  • Hive compatible: supports HQL, UDFs, etc.

Disadvantages.

  • No compile-time type-safety check; type problems only surface at runtime
  • Not friendly to domain objects: an RDD stores JVM objects directly, whereas a DataFrame stores Row objects in memory and cannot hold custom objects

DataSet

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel by functional or relational operations. Each Dataset has an untyped view called a DataFrame, which is a dataset of rows, i.e. a Dataset of type Row: Dataset[Row].

You can think of a DataFrame as an alias for a collection of some generic object Dataset[Row], and a row is a generic untyped JVM object. In contrast, a Dataset is a collection of JVM objects with explicit type definitions, specified by the Case Class you define in Scala or the Class in Java.

Datasets are “lazy”, triggering computation only when an action is performed. Essentially, a dataset represents a logical plan that describes the computation required to produce the data. When an action is performed, Spark’s query optimizer optimizes the logical plan and generates an efficient parallel and distributed physical plan.

The main difference between a Dataset and an RDD is that a Dataset is a collection of domain-specific objects, whereas an RDD is a collection of arbitrary objects. The Dataset API is always strongly typed, and Spark can optimize using the declared types, which it cannot do for an RDD.

Advantages.

  • Dataset combines the advantages of RDD and DataFrame, supporting both structured and unstructured data
  • Like an RDD, it supports storing custom objects
  • Like a DataFrame, it supports SQL queries over structured data
  • Off-heap memory storage, GC friendly
  • Type-safe conversions, friendlier code
  • The officially recommended API

Spark DataFrame and Pandas DataFrame

Origin of DataFrame

The earliest “DataFrame” (originally called a “data frame”) comes from the S language developed at Bell Labs. The data frame was released in 1990, and its concepts are detailed in Chapter 3 of “Statistical Models in S”, which highlights the matrix origin of the data frame: the book describes it as looking very much like a matrix and supporting matrix-like operations, while at the same time looking very much like a relational table.

The R language, an open-source counterpart of S, released its first stable version in 2000 and implemented data frames. pandas, started in 2009, brought the DataFrame concept to Python. These DataFrames all share the same lineage, semantics and data model.

DataFrame Data Model

The need for a DataFrame comes from viewing data as both a matrix and a table. However, a matrix holds only one data type, which is too restrictive, and a relational table requires a schema to be defined up front. A DataFrame’s column types can be inferred at runtime and do not need to be declared in advance, nor are all columns required to share one type. Thus, a DataFrame can be thought of as a combination of a relational table, a matrix, and even a spreadsheet program (typically Excel).
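For example, pandas infers a dtype per column at construction time, and different columns can hold different types:

import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "name": ["a", "b", "c"], "score": [0.5, 0.7, None]})
print(df.dtypes)
# id         int64
# name      object
# score    float64
# dtype: object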

Compared to relational systems, DataFrames have several particularly interesting properties that make DataFrames unique.

Guaranteed order, column and row symmetry

First, DataFrames are ordered in both row and column directions; and rows and columns are first-class citizens and are not treated differently.

Take pandas for example: when a DataFrame is created, the data is ordered in both rows and columns, so you can select data by position along either axis.

In [1]: import pandas as pd                                                     

In [2]: import numpy as np                                                      

In [3]: df = pd.DataFrame(np.random.rand(5, 4))                                 

In [4]: df                                                                      
Out[4]: 
          0         1         2         3
0  0.736385  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262

In [5]: df.iat[2, 2]  # the element at row index 2, column index 2
Out[5]: 0.40278182653648853

# Because rows and columns are symmetric, aggregations can run along either direction by specifying axis.
In [6]: df.sum()  # default axis=0: aggregate over the rows, so the result has 4 elements (one per column)
Out[6]: 
0    2.138135
1    2.961325
2    2.508257
3    2.458257
dtype: float64

In [7]: df.sum(axis=1)  # axis=1: aggregate over the columns, so the result has 5 elements (one per row)
Out[7]: 
0    2.874434
1    2.266533
2    1.454032
3    2.201998
4    1.268976
dtype: float64

Anyone familiar with NumPy (the numerical computing library built around multidimensional arrays and matrices) will recognize this behavior, which shows the matrix side of the DataFrame.

Rich API

The DataFrame API is very rich, spanning relational (e.g. filter, join), linear algebra (e.g. transpose, dot) and spreadsheet-like (e.g. pivot) operations.

Again using pandas as an example, a DataFrame can be transposed to swap its rows and columns.

In [8]: df.transpose()                                                          
Out[8]: 
          0         1         2         3         4
0  0.736385  0.319533  0.440825  0.300279  0.341113
1  0.271232  0.891928  0.500724  0.483571  0.813870
2  0.940270  0.471176  0.402782  0.639299  0.054731
3  0.926548  0.583895  0.109702  0.778849  0.059262
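The relational and spreadsheet-like parts of the API are just as direct; a small sketch with made-up sales data:

import pandas as pd

sales = pd.DataFrame({
    "city":   ["SH", "SH", "BJ", "BJ"],
    "month":  ["Jan", "Feb", "Jan", "Feb"],
    "amount": [10, 12, 7, 9],
})

# relational-style filter
print(sales[sales["amount"] > 8])

# spreadsheet-like pivot: months become columns
print(sales.pivot(index="city", columns="month", values="amount"))
# month  Feb  Jan
# city
# BJ       9    7
# SH      12   10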

Intuitive syntax for interactive analysis

Users can explore DataFrame data iteratively, query results can be reused in subsequent queries, and very complex operations can easily be composed programmatically, which makes DataFrames well suited for interactive analysis.

Heterogeneous data allowed in columns

The DataFrame type system allows heterogeneous data within a column; for example, an int column may contain string values, which may be dirty data. This makes DataFrames very flexible.

In [10]: df2 = df.copy()                                                        

In [11]: df2.iloc[0, 0] = 'a'                                                   

In [12]: df2                                                                    
Out[12]: 
          0         1         2         3
0         a  0.271232  0.940270  0.926548
1  0.319533  0.891928  0.471176  0.583895
2  0.440825  0.500724  0.402782  0.109702
3  0.300279  0.483571  0.639299  0.778849
4  0.341113  0.813870  0.054731  0.059262

Data Model

We can now formally define what a DataFrame really is.

A DataFrame consists of a two-dimensional array of mixed types, row labels, column labels, and types (or domains). On each column the type is optional and can be inferred at runtime. Viewed by rows, a DataFrame is a mapping from row labels to rows, with a guaranteed order between rows; viewed by columns, it is a mapping from column labels to typed columns, again with a guaranteed order between columns.

The existence of row labels and column labels makes it very convenient to select data.

In [13]: df.index = pd.date_range('2020-4-15', periods=5)                       

In [14]: df.columns = ['c1', 'c2', 'c3', 'c4']                                  

In [15]: df                                                                     
Out[15]: 
                  c1        c2        c3        c4
2020-04-15  0.736385  0.271232  0.940270  0.926548
2020-04-16  0.319533  0.891928  0.471176  0.583895
2020-04-17  0.440825  0.500724  0.402782  0.109702
2020-04-18  0.300279  0.483571  0.639299  0.778849
2020-04-19  0.341113  0.813870  0.054731  0.059262

In [16]: df.loc['2020-4-16': '2020-4-18', 'c2': 'c3']  # note that label-based slicing is inclusive on both ends
Out[16]: 
                  c2        c3
2020-04-16  0.891928  0.471176
2020-04-17  0.500724  0.402782
2020-04-18  0.483571  0.639299

Here index and columns are the row and column labels respectively. We can easily select a period of time (row selection) and several columns (column selection) of data. Of course, this is based on the fact that the data is stored sequentially.

This sequential storage makes DataFrame very suitable for statistical work.

In [17]: df3 = df.shift(1)  # shift the data down by one row; the row and column labels stay unchanged

In [18]: df3                                                                    
Out[18]: 
                  c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16  0.736385  0.271232  0.940270  0.926548
2020-04-17  0.319533  0.891928  0.471176  0.583895
2020-04-18  0.440825  0.500724  0.402782  0.109702
2020-04-19  0.300279  0.483571  0.639299  0.778849

In [19]: df - df3  # subtraction aligns by label automatically, so this computes the day-over-day change
Out[19]: 
                  c1        c2        c3        c4
2020-04-15       NaN       NaN       NaN       NaN
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587

In [21]: (df - df3).bfill()  # fill the NaNs in the first row from the next row
Out[21]: 
                  c1        c2        c3        c4
2020-04-15 -0.416852  0.620697 -0.469093 -0.342653
2020-04-16 -0.416852  0.620697 -0.469093 -0.342653
2020-04-17  0.121293 -0.391205 -0.068395 -0.474194
2020-04-18 -0.140546 -0.017152  0.236517  0.669148
2020-04-19  0.040834  0.330299 -0.584568 -0.719587

As the example shows, because the data is stored in order, we can keep the index unchanged and shift the whole frame down one row, so that yesterday’s data lands on today’s row. When we subtract the shifted data from the original, the DataFrame automatically aligns by label, so for each date we are subtracting the previous day’s value from that day’s value, which gives us a period-over-period calculation. This is incredibly convenient; in a relational system you would have to find a column to join on and then do the subtraction. Finally, empty cells can be filled from the previous row (ffill) or the next row (bfill). Achieving the same effect in a relational system would take far more work.

Spark’s DataFrame

Spark brought the concept of the DataFrame to the big-data space. A Spark DataFrame carries only the semantics of a relational table: the schema must be fixed, and the data is not guaranteed to be ordered.
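A sketch of what that looks like in PySpark (assuming a SparkSession named spark; the schema and rows are made up):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age",  IntegerType(), True),
])

sdf = spark.createDataFrame([("Alice", 30), ("Bob", 25)], schema=schema)
sdf.printSchema()
# Unlike a pandas DataFrame, there is no positional row index: the rows form an unordered, distributed collection.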

Difference between Pandas DataFrame and Spark DataFrame

TODO

DataFrameReader class and DataFrameWriter class

DataFrameReader class

This class reads data from an external storage system and returns a DataFrame object; it is usually accessed through SparkSession.read. The common pattern is to call format() to specify the input data format and then load() to load the data from the source and return the DataFrame.

df = spark.read.format('json').load('python/test_support/sql/people.json')

For specific formats, the DataFrameReader class also has dedicated functions to load the data.

df_csv = spark.read.csv('python/test_support/sql/ages.csv')
df_json = spark.read.json('python/test_support/sql/people.json')
df_txt = spark.read.text('python/test_support/sql/text-test.txt')
df_parquet = spark.read.parquet('python/test_support/sql/parquet_partitioned')

# read a table as a DataFrame
df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
df.createOrReplaceTempView('tmpTable')
spark.read.table('tmpTable')

A DataFrame can also be constructed from a JDBC source via jdbc():

jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
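For example (the JDBC URL, table name, credentials and driver class below are all placeholders):

# Read a table over JDBC; the column/bounds/numPartitions options split the read across executors.
df_jdbc = spark.read.jdbc(
    url="jdbc:mysql://db-host:3306/mydb",          # placeholder URL
    table="orders",                                # placeholder table
    column="order_id", lowerBound=1, upperBound=1000000, numPartitions=8,
    properties={"user": "reader", "password": "secret", "driver": "com.mysql.jdbc.Driver"},
)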

DataFrameWriter class

Used to write a DataFrame to an external storage system, accessible via DataFrame.write.

(df.write.format('parquet')  
    .mode("overwrite")
    .saveAsTable('bucketed_table'))

Function notes.

  • format(source): Specify the format of the underlying output data source
  • mode(saveMode): Specify the behavior of the data storage when the data or table already exists. Save modes are: append, overwrite, error, and ignore.
  • saveAsTable(name, format=None, mode=None, partitionBy=None, **options): store the DataFrame as a table
  • save(path=None, format=None, mode=None, partitionBy=None, **options): store the DataFrame to the data source

For specific formats, the DataFrameWriter class also has dedicated functions to write the data.

df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.text(os.path.join(tempfile.mkdtemp(), 'data'))

# write data to an external database via jdbc
df.write.jdbc(url, table, mode=None, properties=None)

Storing the DataFrame content to the source.

df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))

To store the contents of a DataFrame into a table.

df.write.saveAsTable(name='db_name.table_name',format='delta')

PySpark Interaction with Hive Database

# Create a DataFrame from a SQL query
df = spark.sql("SELECT field1 AS f1, field2 AS f2 FROM table1")

# Write the DataFrame contents directly into the target Hive table
df.write.mode("overwrite").saveAsTable("tableName")
df.select("col1", "col2").write.mode("overwrite").saveAsTable("schemaName.tableName")
df.write.mode("overwrite").saveAsTable("dbName.tableName")
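A temporary view also works in the other direction, letting SQL run over a DataFrame before writing it back to Hive (the view, column and table names here are hypothetical):

# Expose the DataFrame to Spark SQL, aggregate it, and write the result into a Hive table
df.createOrReplaceTempView("tmp_view")
result = spark.sql("SELECT f1, COUNT(*) AS cnt FROM tmp_view GROUP BY f1")
result.write.mode("overwrite").saveAsTable("db_name.summary_table")   # hypothetical target table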

Spark DataFrame operations

A Spark DataFrame is immutable, so every operation returns a new DataFrame.

Column operation

# add a new column
data = data.withColumn("newCol", data.oldCol + 1)

# replace an old column with a new one
data = data.withColumn("oldCol", data.newCol)

# rename a column
data = data.withColumnRenamed("oldName", "newName")

# change a column's data type
data = data.withColumn("oldColumn", data.oldColumn.cast("integer"))

Conditional filtering of data

# filter data by passing a string expression
temp1 = data.filter("col > 1000")

# filter data by passing a boolean Column expression
temp2 = data.filter(data.col > 1000)

Select data

# select based on column names
temp1 = data.select("col1", "col2")
temp1 = data.selectExpr("col1 * 100 as newCol1")

# select based on Column objects
temp2 = data.select(data.col1, data.col2)
temp2 = data.select((data.col1 + 1).alias("newCol1"))

Aggregate functions

# get the minimum value of a column
data.groupBy().min("col1")

# group by a certain column and do a calculation on another
data.groupBy("col1").max("col2")

# agg function
import pyspark.sql.functions as F
data.groupBy("a","b").agg(F.stddev("c"))

Joining DataFrames

newData = data1.join(data2, on = "col", how = "leftouter")
newData = data1.join(data2, data1['col1'] == data2['col2'])