Apache Spark Getting Started
Explore the basics of Apache Spark, an analytics engine used for big data processing. It's an open-source, cluster computing framework built on top of Hadoop. Discover how it allows operations on data with both its own library methods and with SQL, while delivering great performance. Learn the characteristics, components, and functions of Spark, Hadoop, RDDs, the SparkSession, and master and worker nodes. Install PySpark. Then, initialize a Spark Context and create a Spark DataFrame from the contents of an RDD. Configure a DataFrame with the map() function. Retrieve and transform data. Finally, convert Spark DataFrames to Pandas DataFrames and vice versa.
Table of Contents
- Course Overview
- Introduction to Spark and Hadoop
- Resilient Distributed Datasets (RDDs)
- RDD Operations
- Spark DataFrames
- Spark Architecture
- Spark Installation
- Working with RDDs
- Creating DataFrames from RDDs
- Contents of a DataFrame
- The SQLContext
- The map() Function of an RDD
- Accessing the Contents of a DataFrame
- DataFrames in Spark and Pandas
- Exercise: Working with Spark
Course Overview
[Video description begins] Topic title: Course overview. Your host for this session is Kishan Iyer, a software engineer and big data expert. [Video description ends] Hi, and welcome to this course, An Introduction to Spark. My name is Kishan Iyer, and I will be your instructor for this course. A little about myself first. I have a Master's degree in Computer Science from Columbia University, and have previously worked in companies such as Deutsche Bank and WebMD in New York. I presently work for Loonycorn, a studio for high-quality video content.
Spark is an analytics engine which is used by data scientists all over the world for big-data processing. It is built on top of Hadoop, and can process both batch as well as streaming data. Hadoop is a framework for distributed computing, which splits up our data across multiple nodes in a cluster and uses off-the-shelf computing resources to process this data in parallel. The fact that it is both free and delivers great performance is why it is used extensively for big-data processing. For this course, we assume that our students are very comfortable programming in Python and, although this isn't strictly a prerequisite, having a basic understanding of distributed computing will help you get the best out of this course.
We will start off by trying to understand the fundamental components that make up Hadoop, and these include MapReduce, YARN, and HDFS, and will understand how Spark uses these components. We shall then move on to understanding Resilient Distributed Datasets, or RDDs, which are a fundamental data structure of Spark, and then figure out how these are used to parallelize data-processing operations. We shall then talk about Spark DataFrames, which are high-level abstractions of RDDs, and then write some code to work with these objects. By the end of this course, you will become familiar with the basics of Spark, and shall become comfortable with some of the fundamental operations which can be performed on RDDs and DataFrames. You will learn to think differently about data processing and know how to work with data across multiple nodes in parallel using the high-level abstractions offered by Spark.
Introduction to Spark and Hadoop
[Video description begins] Topic title: Introduction to Spark and Hadoop. Your host for this session is Kishan Iyer. [Video description ends] Hello, and welcome to this course, DataFrames in Spark 2. We begin by covering some of the prerequisites for this course. First, you should be comfortable programming in the Python 3 language since all the demos in this course will be coded in that. Additionally, it helps if you're familiar with the Jupyter Notebooks dev environment, because that is the one we will be using. And, finally, to get the best out of this course, it will help if you have some basic knowledge of distributed computing as well. [Video description begins] Or, understand the basics of distributed computing. [Video description ends]
We begin by taking a look at what exactly Spark is. Spark can be thought of as a unified analytics engine for big-data processing, and this is built on top of Hadoop. This is why Spark is considered part of the Hadoop ecosystem, and it's worth our while taking a look at what exactly Hadoop is. Hadoop is one of the first frameworks meant for distributed computing where the nodes which formed the Hadoop cluster were made of off-the-shelf components. The job of Hadoop was to run any big-data processes on these components and then return the result to the user. The components of Hadoop include MapReduce, which is a framework to define a data-processing task. [Video description begins] So MapReduce is used to define parallel processing tasks to munge data. [Video description ends]
This is where we specify all the transformations which we need to apply on our distributed data. The next component is YARN, which stands for Yet Another Resource Negotiator, and it allows us to execute our MapReduce tasks by allocating the resources which it requires, and then scheduling and running the processes. And then there is the Hadoop Distributed File System, which actually stores the data which we are working with across multiple nodes. [Video description begins] Abbreviated to HDFS, it can store data distributed across multiple cluster machines. [Video description ends] So, here is the typical life cycle of a task which is defined on Hadoop. [Video description begins] A graphical representation of the co-ordination between Hadoop Blocks displays. [Video description ends]
We have MapReduce, where we specify exactly what the task will do, that is, what transformations it will apply on the data. [Video description begins] According to the diagram, MapReduce is used to write code for Map and Reduce Operations. [Video description ends] The MapReduce task binaries are then fed into YARN, which will ensure that the task is executed and then stored on the Hadoop Distributed File System. [Video description begins] Also known as HDFS. [Video description ends] To be precise, the MapReduce operation will pass on its task definition on to YARN, which will ensure that the resources necessary for the task are allocated and that all the processes are scheduled and then executed. And, finally, YARN will then pass on the results of the task on to the Hadoop Distributed File System, where it will be stored. [Video description begins] So MapReduce accesses data stored on HDFS to process it. [Video description ends]
This information about Hadoop is relevant for Spark, because Spark works with some of these components directly, specifically YARN and HDFS. [Video description begins] A diagram illustrating the components of Apache Spark displays the Spark Core at the center of it. [Video description ends] When we use Spark, we will be using the libraries which are defined in the Spark Core, which is a general-purpose computing engine which interacts with the Hadoop components. So, while Spark provides a layer of abstraction over Hadoop, there are libraries which are even built on top of Spark itself. These include Spark SQL, which allows us to work with Spark using SQL queries. Then there is Spark Streaming, which is meant to work with streaming data. MLlib allows us to build machine-learning models using our distributed data and, finally, GraphX is a library for processing graph data. [Video description begins] So the four libraries are: Spark SQL, Spark Streaming, MLlib, and GraphX. [Video description ends] So, while all of these specialized libraries are built on top of Spark, Spark itself is an abstraction on top of Hadoop.
Our interactions with Spark will involve the use of the Spark Core, which is a general-purpose computing engine, and the use of Spark libraries is considered by many to be much simpler than using Hadoop's MapReduce. This is because it allows us to work with our distributed data as though they were all residing on a single node. And, although this will be transparent to the user, under the hood, Spark is in fact interacting with YARN or some form of cluster manager which can be plugged in. In our demos, we will be using the Spark standalone cluster manager and, while we will be using high-level abstractions to work with our data, they will, in fact, be residing in the Hadoop Distributed File System. So, this is just a quick overview of how one can have various big-data libraries which are, in fact, built on top of Spark and Hadoop.
So, by virtue of being a layer of abstraction on top of Hadoop, Spark natively supports many use cases which are simply not available using bare-bones Hadoop. For one, Spark supports real-time as well as batch processing and, from Spark 2 onwards, both using the same API. [Video description begins] So, it supports batch processing as well as stream processing. [Video description ends] Spark also provides a REPL environment, that is, a Read-Evaluate-Print Loop, which will allow us to quickly prototype our applications and speed up application development. [Video description begins] So, it is an interactive shell to help prototype applications. [Video description ends] Spark is also integrated with Jupyter Notebooks, which we shall see during the demos of this course, and there are Spark libraries available for several high-level programming languages including Python, Java, Scala, and R.
Resilient Distributed Datasets (RDDs)
[Video description begins] Topic title: Resilient Distributed Datasets (RDDs). Your host for this session is Kishan Iyer. [Video description ends] We now explore Resilient Distributed Datasets, or RDDs for short, which are the fundamental building blocks on which Spark is built. So, what exactly is an RDD? Well, all the data which we use when using Spark is actually stored in these RDD data structures. [Video description begins] So the data operated on using Spark is stored in collections called RDDs. [Video description ends] An important thing to note is that RDDs are in-memory objects, and are not stored on disks, which is why any operations in Spark, involving RDDs, are very quick. [Video description begins] So RDDs are in-memory collections, not stored on files in disks. [Video description ends]
And what exactly is in these RDDs? Well, you can think of them as a collection of entities and these entities could be anything, such as integers, records, or even raw objects or, more generally speaking, any logical grouping of data. Java developers can think of RDDs as being similar to Java collections, which are in-memory objects which can be assigned to a variable and then operated upon. When applying operations to Spark RDDs, we say that we apply transformations to them. [Video description begins] So entities in an RDD are operated on using transformations. [Video description ends] These transformations will not modify the underlying RDD, because it is immutable, but will, in fact, generate a new RDD with those transformations applied on it. [Video description begins] In other words, transformations modify the entities and produce a new RDD with these modified entities. [Video description ends]
So, we move along now to some of the important features of Spark RDDs, and one of these is that an RDD is partitioned across multiple nodes. So, while the use of Spark will abstract away a lot of this partitioning from us, we need to know that RDDs will, in fact, be stored across the nodes in the Spark cluster. [Video description begins] RDDs are not stored in a single location, but across multiple nodes in the Spark cluster. [Video description ends] Another significant feature is that Spark RDDs are immutable. As we have touched upon, any transformations which we specify on a Spark RDD will, in fact, generate a new RDD with those modifications applied. [Video description begins] So RDDs cannot be modified or updated without the transformations resulting in the creation of new RDDs with updated data. [Video description ends] And, finally, Spark RDDs are resilient, that is, even if one of the nodes on which the RDD resides crashes, then it can be reconstructed using its metadata.
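As a minimal sketch of these features (not part of the original demos), assuming a SparkContext named sc is already available, as set up in the installation demo later in this course, a transformation leaves the original RDD untouched and produces a new one:

```python
# Minimal sketch: an RDD is an immutable, in-memory, partitioned collection.
# Assumes an existing SparkContext named `sc`.
numbers = sc.parallelize([1, 2, 3, 4, 5, 6])   # distribute a Python list as an RDD

doubled = numbers.map(lambda x: x * 2)         # transformation: returns a NEW RDD

print(numbers.collect())   # [1, 2, 3, 4, 5, 6] -- the original RDD is unchanged
print(doubled.collect())   # [2, 4, 6, 8, 10, 12]
```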
RDD Operations
[Video description begins] Topic title: RDD Operations. Your host for this session is Kishan Iyer. [Video description ends] Now that we have covered some of the basics of RDDs in Spark, we will explore how these are stored, and how they can be operated upon. So, imagine then that an RDD consists of all these rows of information. [Video description begins] RDD collections are split across nodes in a cluster. This is illustrated as a four-column table with six rows. The first column contains the row headers, ranging between 1 and 6. [Video description ends]
So, we have six rows and four columns of data, and these will all be stored in memory and, if you assume that we have three nodes in our cluster, then these six rows will be evenly divided. Rows 1 and 2 will fall into the first node, [Video description begins] The first two rows of the table are indicated. [Video description ends] and then rows 3 and 4 in the second, and the remaining two in the third node of the cluster. [Video description begins] The third and fourth rows of the table are indicated. The fifth and sixth rows of the table are indicated. [Video description ends] The splitting of our data across all the nodes is referred to as partitioning of the data. [Video description begins] Data is partitioned and each partition is on a different machine in the cluster. [Video description ends]
So what are the benefits of this partitioning? For that, let us imagine these three nodes in our cluster, which have each partition of the data. So, we now have our data distributed across all these machines in our Hadoop cluster and then, if we would like to perform any operations on our data, then we can do so simultaneously, that is, Spark is able to parallelize all operations across the nodes in the cluster. So, the combined effect of having all these RDDs stored in memory and distributed across all the nodes in a cluster environment, and then having operations performed in parallel, is that any processing of this data is extremely quick. [Video description begins] Spark jobs parallelize the operations on data. [Video description ends]
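To see the partitioning for yourself, here is a small sketch (not from the demos), again assuming a SparkContext named sc; the three partitions mirror the three-node example above:

```python
# Sketch: inspect how an RDD's data is split into partitions.
rows = sc.parallelize(range(6), numSlices=3)   # ask Spark for 3 partitions

print(rows.getNumPartitions())   # 3
print(rows.glom().collect())     # [[0, 1], [2, 3], [4, 5]] -- one list per partition
```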
So, we have previously touched upon the fact that Spark RDDs are immutable, that is, once they have been created, they cannot be modified. [Video description begins] RDDs are stored in memory – this is what makes processing so fast in Spark. [Video description ends] Any operations which we perform on our RDDs, in fact, will end up creating a brand-new RDD which, in turn, will be stored across the nodes in the cluster. [Video description begins] So RDDs apply operations to modify data, and the modified data is then stored in new RDDs. [Video description ends]
Which brings us to the question, what exactly are these operations which we can perform on RDDs? Well, they fall into two categories, the first one being transformation operations. This is where the data in one RDD is taken, transformed, and then stored in a new RDD. [Video description begins] So transformation produces another RDD with the modified entities. [Video description ends] The other type of operation is called an action, and this is where we take part of or all of an RDD and then either display it onscreen or store it in a file. [Video description begins] An action gets the result of an RDD to display it on screen or store it in a file. [Video description ends]
Let us zoom in on the transformation operations which we can perform on an RDD, and for that we look at an example. So, taking the six rows and four columns of data we have seen previously, we can transform this into two columns of data. So, what are the specific transformations here? Well, we loaded all the data into an RDD, and then we dropped the unnecessary columns, that is the first and second columns, [Video description begins] The first column contained the row index numbers 1 through 6. The second column contained a five-digit value, such as 12345 or 45356. [Video description ends] and then, for the third column, we capitalized all the letters, and then, for the fourth and final column, we prepended a dollar sign to it. [Video description begins] So the third column, which becomes the new first column, had values in sentence case which are now in uppercase. And the values in the last column are now dollar amounts such as $400 and $50. [Video description ends]
All these transformations did not modify the original RDD in any way. They simply created this new RDD on the right, containing these two columns. So, when we define these four transformations on our RDD, how exactly are they applied? Well, one important thing to note is that, when transformations are defined, they're not executed immediately. In fact, they are lazily executed. So, at the point of definition of the transformations, they are simply recorded in Spark [Video description begins] So transformations are recorded when applied. [Video description ends] and an RDD has some metadata which is used to track any transformations which have been applied to the data within it.
So, the actual transformation of the data in the RDD is not executed at this point. In fact, the execution is performed lazily. The reason for this lazy evaluation is mainly to improve performance, that is, the data is actually transformed only when someone requests it. [Video description begins] So transformations are only applied once you request a result. [Video description ends] So, when it comes to transforming the data within RDDs in Spark, the approach generally is: be prepared for exactly what you need to do, but then sit back and relax until someone actually asks you for something.
So, how exactly can we make a request to an RDD in Spark which will cause the transformations to be applied? Well, this brings us to the other operation which can be performed on Spark RDDs, which is the action. [Video description begins] Just two operations can be applied to an RDD: transformations and actions. [Video description ends] So, an action in a Spark RDD can simply be defined as applying the transformations, and then getting a result from it. For example, an action could be, get me the first 100 rows of an RDD, or you could simply ask, how many elements are present in this RDD? You could even perform an aggregate operation on it, such as a sum or an average. All of these three are examples of actions which will cause some transformation to be applied on your data, and then the results of that will either be printed to screen or they could be written to a file. We now explore lazy evaluation in a little more detail.
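First, though, here is a minimal sketch of both kinds of operation, using illustrative rows modeled on the table from the slides (the values are stand-ins, not from the actual demos); note that nothing is computed until the actions at the end run:

```python
# Sketch: transformations are recorded lazily; actions trigger execution.
rows = sc.parallelize([
    (1, 12345, "Shoes",  100),
    (2, 45356, "Shirts",  50),
])

# Transformation: drop the first two columns, uppercase the name, prepend "$".
# At this point the work is only recorded, not executed.
cleaned = rows.map(lambda r: (r[2].upper(), "$" + str(r[3])))

# Actions: these trigger the actual execution and return results.
print(cleaned.take(2))                   # [('SHOES', '$100'), ('SHIRTS', '$50')]
print(cleaned.count())                   # 2
print(rows.map(lambda r: r[3]).sum())    # aggregate action: 150
```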
So, lazy evaluation means that, when a transformation is defined, it is simply recorded. The actual execution of the transformation takes place only when absolutely needed, that is, when someone makes a request for it. So, how exactly are these transformations recorded? Well, they are stored in the form of a directed acyclic graph. Spark is then able to analyze this graph in order to decide what the most efficient means of execution is in order to deliver the best performance. One effect of having all these transformations recorded in the form of a graph is that it is always possible to reconstruct an RDD. This is what makes Spark RDDs resilient to any node failures. [Video description begins] A four-column table with six rows of data displays as an example of an RDD. [Video description ends]
To understand how this is the case, consider that an RDD can be created in two different ways. One, where you read in the contents of a file, and then the other is through the transformation of another existing RDD. So, lazy evaluation ensures that the new RDD will not be created yet. However, all the transformations on it are recorded in a directed acyclic graph. So, if an RDD has a record for what its original source is, along with a record for all the transformations which will be applied on it, then one can simply combine the original source, apply the transformations, and get the RDD. [Video description begins] RDDs store the history of transformations. [Video description ends]
So, you must keep in mind when working with Spark that all RDDs know their lineage, that is, they know exactly where they came from and how they got to where they are. This not only enables lazy evaluation, but also makes it easy for Spark RDDs to recover in the case of a node failure, and this is where the resilience of Spark RDDs comes in. [Video description begins] Transformations are only applied when asked for. In summary, partitioned RDDs are not stored in a single location, but across multiple nodes in the Spark cluster. Because they're immutable, they cannot be modified or updated; transformations create new RDDs with updated data. And they're resilient in that they are not lost when a node crashes, and can be rebuilt with the metadata available. [Video description ends]
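You can actually inspect an RDD's recorded lineage; as a small sketch (not shown in the demos), PySpark's toDebugString() prints the chain of transformations that Spark would replay to rebuild the RDD:

```python
# Sketch: every RDD knows its lineage, which Spark uses to rebuild lost partitions.
source = sc.parallelize(range(10))
derived = source.map(lambda x: x + 1).filter(lambda x: x % 2 == 0)

print(derived.toDebugString().decode())   # the recorded lineage (a directed acyclic graph)
```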
Spark DataFrames
[Video description begins] Topic title: Spark DataFrames. Your host for this session is Kishan Iyer. [Video description ends] Now that we have covered RDDs in Spark, it is time to look at another data structure which is built on top of them, and this is the Spark DataFrame. [Video description begins] DataFrames represent tabular data in rows and columns. [Video description ends] It is easiest to think of DataFrames as a tabular rendering of the data in an RDD. To understand, we revisit the table which we saw previously. So, this is one which contains six rows and four columns of data, [Video description begins] The column headers are: Num, ID, ProductName, and Price. [Video description ends] and this is exactly what a Spark DataFrame looks like.
So, for those of you who are familiar with the Pandas library, DataFrames in both Spark and Pandas are very similar. So here, one record of data in a Spark DataFrame is the equivalent of one row in the table, [Video description begins] For example, one of the records has the following values: Num: 1, ID: 12345, ProductName: Shoes, Price: 100. [Video description ends] and a field in a DataFrame corresponds to a column. So, why exactly do we use DataFrames instead of just plain RDDs? Well, this is because it's usually easier and more intuitive to work on structured data, such as a DataFrame, than with raw RDDs. So, DataFrames were not available in the initial versions of Spark, and were only added in version 1.3.
Since they are built on top of RDDs, DataFrames are also immutable and distributed across the nodes in the cluster. One feature of DataFrames is that the columns within them have names, and we can reference them using those column names or headers and, for those who are familiar with relational databases, it is often easier to imagine a DataFrame as being conceptually similar to a relational database table. [Video description begins] So a DataFrame is conceptually equivalent to a table in an RDBMS. [Video description ends] One thing to keep in mind when using Spark DataFrames is that there is no type safety at compile time so, unless you're careful, you may run into runtime errors when creating DataFrames.
And, finally, Spark DataFrames are available in all the supported languages, that is Python, Scala, Java, and R. And also, since DataFrames are simply higher-level abstractions on RDDs, they share the same qualities, that is, they can also be partitioned across the nodes in a cluster, [Video description begins] They are not stored in a single location but across multiple nodes in the Spark cluster. [Video description ends] they are immutable, and they are just as resilient as RDDs. [Video description begins] They cannot be modified or updated; transformations create new RDDs with updated data. They are not lost when the node crashes; and can be rebuilt with the metadata available. [Video description ends]
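As a minimal sketch of what this looks like in code (not part of the demos), assuming a SparkSession named spark is available, a DataFrame carries named columns which can be referenced directly; the rows echo the table from the slides:

```python
# Sketch: a DataFrame is tabular data with named columns.
df = spark.createDataFrame(
    [(1, 12345, "Shoes", 100), (2, 45356, "Shirts", 50)],
    ["Num", "ID", "ProductName", "Price"],
)

df.show()                                   # render the rows as a table
df.select("ProductName", "Price").show()    # reference columns by their names
```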
Spark Architecture
[Video description begins] Topic title: Spark Architecture. Your host for this session is Kishan Iyer. [Video description ends] In this clip, we will explore the overall architecture of Spark. We will take a look at its basic components and see what each of their functions are, and how all of them fit in together. So, the overall architecture for Spark is a typical master/worker configuration, where you have a master which coordinates the work across a number of workers. [Video description begins] A diagram displays to illustrate this process. First, there is a driver, which is an application that uses the Spark Context. The sparkContext then refers to a Cluster Manager to interact with the various Workers. Each Worker consists of one or more Tasks and an Executor. On-screen text reads: Spark 2.x is similar to Spark 1.x. [Video description ends]
For those of you who are familiar with version 1 of Spark, you may notice that the architecture for version 2 is not that different, and this is definitely the case in terms of the overall setup of the cluster and the way in which jobs are processed. In both versions 1 and 2, the master node runs a driver program which is its own separate JVM process. In Spark version 1, the driver program also contains the SparkContext, which serves as a gateway or a bridge to the underlying Spark environment. In Spark 2, however, instead of the SparkContext, we will end up using the SparkSession which, in fact, encapsulates the SparkContext and also some of the other contexts, such as the HiveContext and the SQLContext. Now, of all the components in this particular architecture diagram, we will focus on the driver program. So, as mentioned previously, the driver program runs on the master node of a Spark environment and it runs, in fact, as a separate JVM process. [Video description begins] Which is a Java process. [Video description ends]
Its responsibilities include launching all the tasks which will be executed by the workers on the subsets of RDDs which they have on them. It is also the job of the driver program to set up a SparkSession so that any Spark application can interact with the underlying environment. It is the driver program which actually schedules the jobs which are executed across the nodes in the cluster, and it will also take care of allocating the resources which are required for these jobs. In addition, the driver will also need to manage the cluster, to make sure all the nodes are up and running and to handle any failures. [Video description begins] This is done by running several services. [Video description ends]
And, finally, it is the responsibility of the driver program to communicate all the tasks to the workers, and then retrieve and compile the results from them. It is within the driver program where you instantiate the Spark application which you want to run. So, a Spark application uses the SparkSession as an entry point and the SparkSession, if you recall, encapsulates the SparkContext, the SQLContext, as well as the HiveContext, and it serves as a bridge to the underlying environment. The application will create a directed acyclic graph for all the operations to perform on this underlying data. [Video description begins] This can be abbreviated to DAG. [Video description ends]
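As a sketch of that entry point (the demos in this course instead create a SparkContext first and wrap it, but both approaches end up with the same kind of object), the usual way to obtain a SparkSession is through its builder:

```python
# Sketch: obtaining a SparkSession, the Spark 2 entry point that wraps the
# SparkContext, SQLContext, and HiveContext.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")            # local standalone mode, as in the demos
         .appName("IntroducingSpark")
         .getOrCreate())

sc = spark.sparkContext                 # the underlying SparkContext is still accessible
```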
With all these transformations recorded, the Spark application will create a set of stages, that is, a physical execution plan on how to apply the transformations [Video description begins] This is done internally. [Video description ends] and then, each of these stages is, in turn, divided into a number of tasks. [Video description begins] So each stage is split into operations on data partitions, called Tasks. [Video description ends] These are logical operations which can be performed on Spark RDDs and, when required, these tasks can then be executed. While there is not too much variation between Spark 1 and 2 in terms of the overall architecture, it is in the execution where things change. In version 1 of Spark, the executions resembled the query optimizations you might find in a traditional relational database. [Video description begins] So the execution in Spark 1.x resembles the optimizations on traditional RDBMS. [Video description ends]
This is known as the Volcano Iterator Model, and it says that any atomic operation can only operate on one row of data at a time. Aside from this, there were several optimizations at both the code and compiler level which were missed in version 1. However, these were all addressed in Spark 2, which ended up using the second-generation Tungsten engine, and this resulted in massively improved performance. [Video description begins] And improved execution. [Video description ends] There were many improvements and optimizations which were carried out. Some of these include the elimination of virtual function calls, and it stored frequently accessed data in CPU registers rather than in memory or cache, which is much slower. [Video description begins] Spark 2.x uses the Tungsten Engine to store data in registers, not RAM. [Video description ends]
In addition, there were also several compiler optimizations, such as loop unrolling and pipelining. All of these had the combined effect of making Spark 2 executions about 10x faster than they were in Spark 1. We now move along to one more component in the Spark architecture, and this is the Cluster Manager. So, the SparkSession in Spark 2 and the SparkContext in Spark 1 will interact with the cluster manager, whose job it is to orchestrate all the executions among all the nodes in the cluster. The cluster manager is something which can be plugged into Spark, and is typically Hadoop's YARN. However, it can also be Apache Mesos or Spark Standalone. So, it is the responsibility of the cluster manager, whether it is YARN, Mesos, or Spark Standalone, to distribute the tasks among all the worker nodes and coordinate all executions between them. Now that we have covered all the pieces which run on a Spark master, we switch our attention to the workers in the Spark environment. So, the workers in a Spark setting are the compute nodes in your cluster, and this is where the processes are executed.
These are the processes which are created from your Spark application code, so it helps to imagine that it is the workers which are running your code while all their work is coordinated by the master. [Video description begins] So the master allocates the worker resources. [Video description ends] This coordination, performed by the master, involves not just the assigning of specific tasks, but also all the resources which are required in order to complete them. The workers are in constant touch with the master through the cluster manager, and the cluster manager ensures the communication with the workers by means of these executor agents, which are running on every worker node. It is these distributed agents which are responsible for the execution of specific tasks, [Video description begins] Worker nodes use executors to run jobs. [Video description ends] and it is these tasks which are the basic units of execution, that is, the transformations and actions which you have specified in your code.
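As a sketch with purely illustrative values (not from the demos), the cluster manager to use and the resources requested for each executor are typically expressed as configuration when the application starts:

```python
# Sketch: pointing the driver at a cluster manager and sizing the executors.
# The values here are illustrative, not recommendations.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[*]")               # could instead be a YARN or Mesos master URL
        .setAppName("ArchitectureExample")   # illustrative application name
        .set("spark.executor.memory", "2g")  # memory per executor (the agent on each worker)
        .set("spark.executor.cores", "2"))   # CPU cores per executor

sc = SparkContext(conf=conf)
```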
Spark Installation
[Video description begins] Topic title: Spark Installation. Your host for this session is Kishan Iyer. [Video description ends] All the demos in this course will be coded in Python 3, and the IDE we will use is Jupyter Notebooks. A Jupyter Notebook is a web-based interactive development environment which is very popular among Python developers. It allows you to execute just a section of your application's code and view the results immediately, within the IDE. For this demo, I have created a workspace containing a code and a datasets directory, and then launched the Jupyter Notebook session from there. [Video description begins] Jupyter is open on the Files tabbed page. The code and datasets directories are listed. [Video description ends]
So, let us explore this workspace now. First, we can navigate into the datasets directory and, within that, there are a number of datasets in the form of CSV files, which we can load into our Spark applications in order to perform some analysis. [Video description begins] Kishan accesses the datasets directory. Its contents are listed, which are seven CSV files. [Video description ends] So, we head back into the root directory of the workspace now [Video description begins] He accesses the code directory. It is empty. [Video description ends] and then head into the code directory, where we can begin creating new Jupyter Notebooks in order to develop our Spark applications. So, hit New in the top right and then select the kernel which you would like your Jupyter Notebook to use. [Video description begins] He clicks New on the Jupyter toolbar. A drop-down list opens, which includes the options: Python 3, Text File, Folder, and Terminal. [Video description ends]
So, I'm going to choose Python 3 and then, once the new notebook comes up, [Video description begins] An Untitled new Jupyter notebook opens. [Video description ends] the first thing I'm going to do is to set a name for it. [Video description begins] He clicks Untitled in the title bar. A Rename Notebook dialog box opens, which includes one text field, Enter a new notebook name. [Video description ends] So, I'll set it to something more meaningful, [Video description begins] He replaces the default notebook title of Untitled with IntroducingSpark2.0. [Video description ends] and this is going to be called IntroducingSpark. Once that is done, the first thing we're going to do is to check the version of Java. [Video description begins] He clicks the Rename button. The notebook title updates to IntroducingSpark2.0. He adds the code: ! java -version. [Video description ends]
This is just to show you which version I'm running, in case you happen to run a different version and run into some compatibility issues with Spark. So, in order to check the Java version, make sure you use the java -version command, but precede it with an exclamation point, which is required when you run any shell command from a Jupyter Notebook cell. So, I'm running version 1.8 over here. [Video description begins] He executes the code. The output reads: java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java Hotspot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode). [Video description ends] Next, we can similarly check the version of Python [Video description begins] He adds the code: !python --version. [Video description ends] and I'm using version 3.6.5 over here. [Video description begins] He executes the code. The output reads: Python 3.6.5 :: Anaconda, Inc. [Video description ends]
And now, we are ready to go ahead and install PySpark, [Video description begins] He adds the code: !pip install pyspark. [Video description ends] so we just use the pip install pyspark command and I get the notification that I already have this installed. [Video description begins] He executes the code. The first line of output reads: Requirement already satisfied: pyspark in /anaconda3/lib/python3.6/site-packages (2.3.2). [Video description ends] If you don't have PySpark already, then you will see the installation steps here. All right, with that, we are now ready to begin some coding, and the first thing we will do is to import the PySpark library into our Jupyter Notebook. [Video description begins] He adds the code: import pyspark. [Video description ends]
So, this has gone through smoothly, which shows that PySpark installation was a success. And the next thing we do is import all the classes which we will require for this demo. [Video description begins] He adds code, which reads: from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql import SparkSession. [Video description ends] This includes the SparkContext, as well as the SQLContext, and the SparkSession classes. With that done, we can now initialize our SparkContext, [Video description begins] He adds the code: sc = SparkContext() sc. [Video description ends] and remember that a SparkContext serves as a bridge between the Spark application and the underlying environment.
And, on examining the SparkContext object, we see a few details here. [Video description begins] He executes the code. The SparkContext details are output. They are SparkContext: Spark UI, Version: v.2.3.2, Master: local(*), AppName: pyspark-shell. [Video description ends] This includes the Spark Version which, in my case, is version 2.3.2. [Video description begins] The version number is highlighted. [Video description ends] There is also a link to the Spark UI which allows us to see various details about our Spark cluster. In this particular case, since we are using Spark Standalone, there is not much of interest over there. Also, the Spark Master is now running on my localhost, and there is a default AppName which has been set as well. And, with that, we have now successfully installed PySpark and then initialized the SparkContext.
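Gathering the notebook cells from this demo in one place (shell commands are prefixed with an exclamation point because they run from Jupyter cells):

```python
# !java -version        # the demo runs Java 1.8
# !python --version     # the demo runs Python 3.6.5
# !pip install pyspark  # installs PySpark if it is not already present

import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

sc = SparkContext()   # the bridge between the application and the Spark environment
sc                    # in a notebook cell, this displays the version, UI link, master, and app name
```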
Working with RDDs
[Video description begins] Topic title: Working with RDDs. Your host for this session is Kishan Iyer. [Video description ends] Now that we have set up our SparkContext, we can initialize a SparkSession using the SparkContext object. [Video description begins] A notebook called IntroducingSpark2.0 is open in Jupyter, a code editor application. Kishan has added some code, which reads: import pyspark from pyspark import SparkContext from pyspark.sql import SQLContext from pyspark.sql import SparkSession sc = SparkContext() sc spark = SparkSession(sc). [Video description ends] The SparkSession encapsulates a lot of different contexts in Spark, including the SparkContext, as well as the SQLContext and the HiveContext, and so on.
So here, we simply set up the SparkSession using the SparkContext. And, after that, to begin coding our application, we will import a couple of other objects which we need. [Video description begins] He adds code, which reads: from pyspark.sql.types import Row from datetime import datetime. [Video description ends] So, this includes a Row object, which we will use when creating our RDD, and then also the datetime object. Now, we are ready to create our first RDD and, for that, we make use of the SparkContext parallelize function. [Video description begins] He adds the code: simple_data = sc.parallelize([1, "Nissan Versa", 12]) simple_data. [Video description ends] This allows us to create an RDD using a Python list.
Our list, in this case, contains three elements: the number 1, the text Nissan Versa, and the number 12. So, once we load this, we create our first simple_data RDD [Video description begins] He executes the code. The output reads: ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:194. [Video description ends] which is, in fact, of type ParallelCollectionRDD, which conveys the fact that operations on this RDD can be performed in parallel, across all the nodes in our cluster. Now, let us examine how many elements this particular RDD has, [Video description begins] He adds the code: simple_data.count(). [Video description ends] and we see here that this corresponds to the number of elements in our Python list, which is 3. [Video description begins] He executes the code. The output reads: 3. The list, consisting of the numbers 1 and 12 and the text "Nissan Versa," is highlighted in the previous code block. [Video description ends]
So, we have made use of the RDD's count function to get the number of elements, and do note that this is one of the actions which can be performed on an RDD. We can now make use of the first function, in order to retrieve the first element in the RDD, so this returns to us the number 1. [Video description begins] He adds the code: simple_data.first(). He executes the code. The output reads: 1. [Video description ends] Rather than just the first element, if you'd like to retrieve the first n elements from the RDD, then we can make use of the take function. [Video description begins] He adds code, which reads: simple_data.take(2). [Video description ends] So here, we specify, as an argument, the number of elements which we would like to get from the start of the RDD.
So, in this case, this returns the first two elements. [Video description begins] He executes the code. The output reads: [1, 'Nissan Versa']. [Video description ends] And, if you'd like to retrieve every single thing within your RDD, then you can make use of the collect function. [Video description begins] He adds the code: simple_data.collect(). [Video description ends] So, this gives us the entire list which we have initialized the RDD with. [Video description begins] He executes the code. The output reads: [1, 'Nissan Versa', 12]. [Video description ends] So, now that we have created our first RDD and then performed some actions on it, let us try to create our first DataFrame and, for that, we can make use of an RDD's toDF function. [Video description begins] He adds the code: df = simple_data.toDF(). [Video description ends]
So, this will convert the contents of an RDD into a DataFrame, and a DataFrame, you will remember, is a representation of tabular data. So, what will happen if you try to convert this RDD, which was initialized with a one-dimensional list, to a tabular data structure? Well, it ends up throwing an error, [Video description begins] He executes the code. The last line of output reads: TypeError: can not infer schema for type: <class 'int'>. On-screen text reads: This RDD does not have columns, so cannot be represented as a tabular dataframe. [Video description ends] and this is because this RDD does not have any columns. So the RDD is currently one dimensional, and we need two-dimensional data in order to create a DataFrame.
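Gathered as one sketch, the cells from this demo (and from the end of the previous one) look like this:

```python
spark = SparkSession(sc)    # SparkSession built from the existing SparkContext

from pyspark.sql.types import Row
from datetime import datetime

simple_data = sc.parallelize([1, "Nissan Versa", 12])

simple_data.count()      # 3 -- count() is an action
simple_data.first()      # 1
simple_data.take(2)      # [1, 'Nissan Versa']
simple_data.collect()    # [1, 'Nissan Versa', 12]

# df = simple_data.toDF()   # raises TypeError: this one-dimensional RDD has no
#                           # columns, so it cannot become a tabular DataFrame
```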
Creating DataFrames from RDDs
[Video description begins] Topic title: Creating DataFrames from RDDs. Your host for this session is Kishan Iyer. [Video description ends] So, while you were not able to create a DataFrame from an RDD containing one dimension of data, let us see what happens if we initialize an RDD with a two-dimensional list, and then try to create a DataFrame from that. So, once again, we make use of the SparkContext parallelize function in order to load two rows of data into an RDD. [Video description begins] A notebook called IntroducingSpark2.0 is open in Jupyter, a code editor application. Kishan adds some code: records = sc.parallelize([[1, "Nissan Versa", 12], [2, "Ford Fiesta", 9]]) records. [Video description ends] So, we use a two-dimensional list over here in order to define those two rows of data, and we have information for two vehicles here.
So, once we create this RDD, [Video description begins] He executes the code. The output reads: ParallelCollectionRDD[8] at parallelize at PythonRDD.scala:194. [Video description ends] we see, once again, that this is a ParallelCollectionRDD and, when we check for the number of elements using the count function, [Video description begins] He adds the code: records.count(). [Video description ends] on this occasion we see that there are two elements in this RDD. [Video description begins] He executes the code. The output reads: 2. The two elements defined in the previous code block are highlighted, which are [1, "Nissan Versa", 12] and [2, "Ford Fiesta", 9] respectively. [Video description ends]
So, even though this contains more data than the previous RDD, this one only contains two elements because each element over here is one row in the RDD, whereas our previous RDD effectively contained half the data, but split into three elements. So, let us view this entire RDD using the collect function, [Video description begins] He adds the code: records.collect(). He executes the code. The output reads: [[1, 'Nissan Versa', 12], [2, 'Ford Fiesta', 9]]. [Video description ends] and we see here that this returns the two rows of data we initialized it with. And, if we make use of the first function this time, [Video description begins] He adds the code: records.first(). [Video description ends] this returns the contents of the first row. [Video description begins] He executes the code. The output reads: [1, 'Nissan Versa', 12]. [Video description ends]
So, clearly, this time each element in our RDD is a composite data structure like a list rather than a primitive type such as an integer or string. And, if we now make use of the take function to retrieve the first two elements in this RDD, this gives us the two rows of data. [Video description begins] He adds the code: records.take(2). He executes the code. The output reads: [[1, 'Nissan Versa', 12], [2, 'Ford Fiesta', 9]]. [Video description ends] So now, if we were to convert this RDD to a DataFrame, given that it contains two-dimensional data, will that work? [Video description begins] He adds the code: df = records.toDF(). [Video description ends] And the answer is yes, so there was no error thrown this time.
So, let us try to view this DataFrame, [Video description begins] He adds the code: df. [Video description ends] and we can see here that this is indeed our DataFrame object, [Video description begins] He executes the code. The output reads: DataFrame[_1: bigint, _2: string, _3: bigint]. [Video description ends] and the types of each of the three elements within each row of this DataFrame have been inferred. So, the first and third elements have been inferred to be of type bigint and the middle element of type string. We can now verify the type of this df, [Video description begins] He adds the code: type(df). [Video description ends] and we see that this is a pyspark.sql.dataframe.DataFrame object. [Video description begins] He executes the code. The output reads: pyspark.sql.dataframe.DataFrame. [Video description ends]
And, if you would like to view the contents of a DataFrame, [Video description begins] He adds the code: df.show(). [Video description ends] we can use the show function which belongs to the DataFrame object. So now, this returns to us a tabular structure [Video description begins] He executes the code. A three-column table is output. The table has two rows, one for the values of Nissan Versa and Ford Fiesta each. [Video description ends] which contains three columns of data and our two rows within it. So, this is what a DataFrame looks like in Spark, and the show function which we ran right now is somewhat similar to the take or collect function on an RDD, that is, it performed an action on the RDD which is underlying this DataFrame.
Now that we have successfully created our first Spark DataFrame, we notice that there is something missing in this one, specifically, there are no column headers. So, let us recreate this DataFrame but, this time, we will initialize it with column headers. [Video description begins] He adds the code: data = sc.parallelize([Row(index = 1, vehicle_name = "Nissan Versa", vehicle_stock = 12)]) data. [Video description ends] So, we once again make use of the SparkContext parallelize function. So, rather than define our RDD using a list of lists or a 2D list, this time we create a list of Row objects. So currently, there is just a single Row object in our list, and the arguments to the Row object correspond to the column names and the values in the corresponding cells for that row. So here, there will be three columns called index, vehicle_name, and vehicle_stock, and the values of this particular row for those columns are specified as well.
So, let us now create this RDD [Video description begins] He executes the code. The output reads: ParallelCollectionRDD[24] at parallelize at PythonRDD.scala: 194. [Video description ends] and then, when we use the count function to check for the number of elements, [Video description begins] He adds the code: data.count(). [Video description ends] this time we get a value of 1 representing the single row in our RDD. [Video description begins] He executes the code. The output reads: 1. On-screen text reads: There is one Row in this RDD. [Video description ends] And, if we view the contents of this RDD using the collect function, [Video description begins] He adds the code: data.collect(). [Video description ends] our row of data is now visible over here. [Video description begins] He executes the code. The output reads: [Row(index=1, vehicle_name='Nissan Versa', vehicle_stock=12)]. [Video description ends]
So, can we convert this to a DataFrame now, [Video description begins] He adds the code: df = data.toDF() df.show(). [Video description ends] given that it was essentially the same data which we had loaded previously and then encountered an error when converting to a DataFrame? Well, the answer is yes, we can convert to a DataFrame, [Video description begins] He executes the code. A three-column table with column headers index, vehicle_name, and vehicle_stock is output. [Video description ends] and this is because our RDD was created using a Row object, given that it was, essentially, a two-dimensional data structure, and we can see in this particular DataFrame as well that it contains column headers corresponding to the values defined in our Row object.
All right, we now move along and we create another RDD, [Video description begins] He adds code, which reads: data = sc.parallelize([Row(index = 1, vehicle_name = 'Nissan Versa', vehicle_stock = 12), Row(index = 2, vehicle_name = 'Ford Fiesta', vehicle_stock = 9), Row(index = 3, vehicle_name = 'Hyundai Accent', vehicle_stock = 8)]) data. [Video description ends] but this one contains three rows of data. So, once again, note that the parallelize function, we are passing a list of Row objects, and our first row is similar to what we had previously, corresponding to the Nissan Versa. [Video description begins] The first row's code is highlighted. [Video description ends] And we have a Ford Fiesta as well, in addition to a Hyundai Accent. [Video description begins] The second row's code is highlighted. The third row's code is highlighted. [Video description ends]
So, we have data corresponding to three vehicles in this RDD, [Video description begins] He executes the code. The output reads: ParallelCollectionRDD[36] at parallelize at PythonRDD.scala:194. [Video description ends] and note that the column headers which we defined in each row are exactly the same. So, once this RDD has been initialized, we will convert it to a DataFrame [Video description begins] He adds code, which reads: df = data.toDF() df.show(). [Video description ends] and, this time, our DataFrame does look like a real table, which contains three rows and three columns, and has column headers as well. [Video description begins] The column headers are: index, vehicle_name, and vehicle_stock. There are three rows of data, corresponding to each of the three cars: Nissan Versa, Ford Fiesta, and Hyundai Accent. [Video description ends] So, we have now learned how we can create a Spark DataFrame using Row objects in our RDD, and also using a list of lists.
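The Row-based version of this demo, gathered as one sketch:

```python
data = sc.parallelize([
    Row(index=1, vehicle_name="Nissan Versa",   vehicle_stock=12),
    Row(index=2, vehicle_name="Ford Fiesta",    vehicle_stock=9),
    Row(index=3, vehicle_name="Hyundai Accent", vehicle_stock=8),
])

df = data.toDF()   # works: each Row supplies both the column names and the values
df.show()          # a three-row table with headers index, vehicle_name, vehicle_stock
```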
Contents of a DataFrame
[Video description begins] Topic title: Contents of a DataFrame. Your host for this session is Kishan Iyer. [Video description ends] So the DataFrames we have created so far have involved both integer as well as string elements. We will now explore how we can have different primitive types in our DataFrame, in addition to composite types. [Video description begins] A notebook called IntroducingSpark2.0 is open in Jupyter, a code editor application. [Video description ends] So, we now create a new RDD called complex_data, once again using the parallelize function [Video description begins] Kishan adds code, which reads: complex_data = sc.parallelize([Row(col_string = "Alice", col_double = 3.14, col_integer = 20)]). [Video description ends] and this, again, contains one row of data within it. And, within the row, we have three different types. We have a string, a double, and an integer within our row, [Video description begins] These are col_string, col_double, and col_integer respectively. [Video description ends] and we have named the columns to represent the type of data which each contains.
So, once we create this RDD, let us convert it to a DataFrame and then view its elements, [Video description begins] He adds the code: complex_data_df = complex_data.toDF() complex_data_df.show(). [Video description ends] and this does indeed contain the three columns as we defined them, [Video description begins] He executes the code. A three-column table is output. The column headers are: col_double, col_integer, and col_string. [Video description ends] and we have demonstrated that a DataFrame is capable of holding these three different types of elements within it. So, while we view the contents as a double, an integer, and a string, we did not explicitly define the types, so the types have been inferred by Spark, but as what exactly?
For that, we just take a look at our DataFrame object, [Video description begins] He adds the code: complex_data_df. [Video description ends] and we see here that the types have indeed been interpreted as double, integer, and string. [Video description begins] He executes the code. The output reads: DataFrame[col_double: double, col_integer: bigint, col_string: string]. [Video description ends] So, these are three of the types which are supported by Spark for sure. We move along now, and then include some other types in our DataFrame. [Video description begins] He adds code, which reads: complex_data = sc.parallelize([Row(col_string = "Alice", col_double = 3.14, col_integer = 20, col_boolean = True, col_list = [2,4,6])]). [Video description ends]
And, for that, we create an RDD, once again containing one row of data, and this contains two additional columns, [Video description begins] The two additional columns, col_boolean and col_list, are highlighted in the code. [Video description ends] one containing a Boolean value and the other one has a composite type, specifically, a list. So, will this work? So, we just create a DataFrame from it once more [Video description begins] He adds the code: complex_data_df = complex_data.toDF() complex_data_df.show(). [Video description ends] and this conversion is successful, and we see the values which we expect. [Video description begins] He executes the code. A five-column table is output. The column headers are: col_boolean, col_double, col_integer, col_list, and col_string. The table has one row. [Video description ends]
So, our Boolean value, as well as the list, is shown over here, but does Spark treat them as a Boolean value and a list, or did it convert them to some other object? Well, we examine this DataFrame once more, [Video description begins] He adds the code: complex_data_df. [Video description ends] and we see here that Spark has inferred the Boolean column to be of type boolean, and the list is, in fact, of a type array of bigint. [Video description begins] He executes the code. The output reads: DataFrame[col_boolean: boolean, col_double: double, col_integer: bigint, col_list: array<bigint>, col_string: string]. [Video description ends]
So, Spark data structures such as RDDs and DataFrames support a number of different primitive types, and also a composite type, such as an array. Well, let us move along now, and this time we create one more DataFrame, and the RDD we initialize it with contains three rows of data [Video description begins] He adds code, which reads: complex_data = sc.parallelize([Row(col_list = [2,4,6], col_dict = {"a1":0}, col_row = Row(x=15, y=25, z=35), col_time = datetime(2018, 7, 1, 14, 1, 2)), Row(col_list = [2, 4, 6, 8, 10], col_dict = {"a1": 0, "a2": 1}, col_row = Row(x=45, y=55, z=65), col_time = datetime(2018, 7, 2, 14, 1, 3)), Row(col_list = [2, 4, 6, 8, 10, 12, 14], col_dict = {"a1": 0, "a2": 2 }, col_row = Row(x=75, y=85, z=95), col_time = datetime(2018, 7, 3, 14, 1, 4))]). [Video description ends] and, within each row, there are some more complex data types. [Video description begins] The col_dict and col_time objects are highlighted in the code. [Video description ends]
So, our rows now include a dictionary as well as a datetime object. And there is, in fact, a Row object, which is one of the cells in our rows. In fact, all of the rows in this particular RDD will contain the dictionary and the datetime object and each of these rows, in fact, includes another row within it. So, will this work? Well, there's only one way to find out. [Video description begins] He adds code, which reads: complex_data_df = complex_data.toDF() complex_data_df.show(). [Video description ends] So, once we create the DataFrame, there are no errors thrown and we see here that our four columns of data have been created [Video description begins] He executes the code. A four-column data table is output, which has the column headers: col_dict, col_list, col_row, and col_time. [Video description ends] and, visually, everything seems to appear exactly as it should. But what are the types which Spark has inferred for each of the columns? [Video description begins] He adds the code: complex_data_df. [Video description ends]
Well, we get the answer over here, [Video description begins] He executes the code. The output reads: DataFrame[col_dict: map<string,bigint>, col_list: array<bigint>, col_row: struct<x:bigint, y:bigint, z:bigint>, col_time: timestamp]. [Video description ends] and we see that a Python dictionary corresponds to a map object in Spark, and this is a map containing a string key and a bigint value. Our Row object is represented as a struct data structure over here, and this contains three bigint fields. And, finally, the datetime object has been mapped to a timestamp object in our Spark DataFrame. So, we have shown that the contents of a Spark DataFrame can include primitive types as well as complex data types. This is often achieved by mapping a Python data structure to some corresponding Spark data structure. For example, a Python list corresponds to an array while a Python dictionary corresponds to a map object.
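For reference, here is a minimal sketch of this type-inference demo that you could run on your own, assuming a local PySpark installation; the SparkContext and SQLContext are obtained in the same way as elsewhere in the notebook, and the SQLContext is created up front so that the toDF() method is attached to RDDs.

# A minimal sketch of Spark's type inference for complex Python values.
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)   # creating this also makes rdd.toDF() available

complex_data = sc.parallelize([
    Row(col_list=[2, 4, 6],
        col_dict={"a1": 0},
        col_row=Row(x=15, y=25, z=35),
        col_time=datetime(2018, 7, 1, 14, 1, 2))
])

complex_data_df = complex_data.toDF()

# printSchema() shows the inferred Spark types: array<bigint>, map<string,bigint>,
# struct<x,y,z> and timestamp.
complex_data_df.printSchema()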
The SQLContext
[Video description begins] Topic title: The SQLContext. Your host for this session is Kishan Iyer. [Video description ends] We have now covered how DataFrames are essentially tabular data structures, and how they can be initialized using RDDs which have been created using the SparkContext. Now, given that DataFrames seem very similar to relational database tables, is it possible to treat them as such in some way? [Video description begins] A notebook called IntroducingSpark2.0 is open in Jupyter, a code editor application. [Video description ends]
And then, rather than go through the intermediate step of initializing an RDD and then creating a DataFrame from it, can we not load data directly into a DataFrame? Well, the answer to both of these questions is yes and, to do that, we need to make use of the SQLContext, which is our bridge to Spark SQL. [Video description begins] Kishan adds code, which reads: sqlContext = SQLContext(sc). [Video description ends] Spark SQL allows us to work with structured data such as DataFrames as though they were relational database tables, and we can also query them using SQL.
So, we initialize a SQLContext object over here and, for that, again we use the SparkContext, and this will initialize our gateway from the application to Spark SQL. So, what exactly does this SQLContext look like? [Video description begins] He adds the code: sqlContext. [Video description ends] Well, you can see here that it is a SQLContext object which can be found within pyspark.sql.context [Video description begins] He executes the code. The output reads: <pyspark.sql.context.SQLContext at 0x10739b0b8>. [Video description ends] and, using this, we can load data directly into a DataFrame. [Video description begins] He adds the code: df = sqlContext.range(4) df. On-screen text reads: This will create a column of 4 integers – 0, 1, 2, 3. [Video description ends]
So here, we initialize the DataFrame using the sqlContext.range function. So, similar to a Python range function, which will return a list containing a certain number of elements, the range function of the SQLContext will return the same elements, but they will be formatted as the elements of a particular column in a DataFrame. To see what that looks like, we just execute the cell, [Video description begins] He executes the code. The output reads: DataFrame[id: bigint]. [Video description ends] and then we can see here that this has created a DataFrame object which includes a single column called id, and the type of its contents is bigint.
So, let us try to visualize the contents now by running the show function, [Video description begins] He adds the code: df.show(). [Video description ends] and we see here that this has created four rows of data containing the elements which were returned by the range function. [Video description begins] He executes the code. A single-column table with four rows is output. The column header is: id. The row values are 0, 1, 2, and 3. [Video description ends] So, we have now successfully loaded data into our DataFrame without going through the intermediate step of creating an RDD.
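As a quick reference, here is a minimal sketch of loading data straight into a DataFrame with the range function, assuming the sc and sqlContext objects are set up as earlier in the notebook:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# range(4) produces a single bigint column named 'id' with the values 0 to 3.
df = sqlContext.range(4)
df.show()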
Now, let us examine this DataFrame which has been created, first, by counting the number of elements within it. [Video description begins] He adds the code: df.count(). [Video description ends] So, this contains four elements corresponding to the four rows of data, [Video description begins] He executes the code. The output reads: 4. [Video description ends] but a DataFrame such as this one is not quite meaningful. We want our rows to convey more information. So, for that, let us define another data structure [Video description begins] He adds code, which reads: data = [('Nissan Versa', 12), ('Ford Fiesta', 9), ('Hyundai Accent', 8)]. [Video description ends] and this data object is, in fact, a list of tuples. Each tuple should correspond to one row in our DataFrame.
To do that, we make use of the createDataFrame function of the sqlContext object, and to that we pass in our list of tuples. [Video description begins] He adds the code: sqlContext.createDataFrame(data).show(). [Video description ends] That should create a new DataFrame for us, containing three rows of data corresponding to the tuples, and then we run the show function to see what the DataFrame looks like. And we see here that this does indeed include our three rows of data, [Video description begins] He executes the code. A two-column table with three rows is output. The column headers are _1 and _2. [Video description ends] but this one does not contain any column headers.
We can fix that [Video description begins] He adds the code: sqlContext.createDataFrame(data, ['vehicle_name', 'vehicle_stock']).show(). [Video description ends] and, this time, we make use of the createDataFrame function once more but, in addition to passing our data, which is the list of tuples, we pass in a second argument, which again is a list, and this one contains all the column headers. So the columns in this DataFrame will be vehicle_name and vehicle_stock, which represents the number of vehicles we have in our inventory. And, with that, we see here that the same DataFrame has been created but, this time, with the column headers. [Video description begins] He executes the code. A two-column table with three rows is output. The column headers are now vehicle_name and vehicle_stock. [Video description ends]
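Here is a compact sketch of the same createDataFrame demo, with and without explicit column headers; sc and sqlContext are assumed to be set up as before:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

data = [('Nissan Versa', 12), ('Ford Fiesta', 9), ('Hyundai Accent', 8)]

# Without headers, Spark falls back to the default column names _1 and _2.
sqlContext.createDataFrame(data).show()

# Passing a list of column names as the second argument sets the headers.
sqlContext.createDataFrame(data, ['vehicle_name', 'vehicle_stock']).show()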
So, the SQLContext allows us to load data directly into a DataFrame, if they're fed in the form of a list of tuples. But the tuples we have used so far contained simple data types, such as a string and an int, so let us now try to load it with some more complex data. [Video description begins] He adds code, which reads: complex_data = [(1.0, 12, "Nissan Versa", True, [2, 4, 6], {"a1": 0}, Row(x=1, y=2, z=3), datetime(2018, 7, 1, 14, 1, 2)), (2.0, 13, "Ford Fiesta", True, [2, 4, 6, 8, 10], {"a1": 0, "a2": 1}, Row(x=1, y=2, z=3), datetime(2018, 7, 2, 14, 1, 3)), (3.0, 15, "Hyundai Accent", True, [2, 4, 6, 8, 10, 12, 14], {"a1": 0, "a2": 1, "a3": 2}, Row(x=1, y=2, z=3), datetime(2018, 7, 3, 14, 1, 4))]. [Video description ends] So, in addition to some of the primitive data types, our rows will now contain a list, a dictionary, a Row object, and also a datetime object. [Video description begins] He highlights each of the objects listed in the code. [Video description ends]
So, we have represented a total of three rows of data over here and this is in the form of a list of tuples once more. [Video description begins] He adds the code: sqlContext.createDataFrame(complex_data).show(). [Video description ends] So, we once again make use of the createDataFrame function and load it with our list of tuples, and we see here that the DataFrame has been created without any issues. [Video description begins] He executes the code. An eight-column table is output. The table has three rows. The column headers are _1, _2, _3, _4, _5, _6, _7, and _8. [Video description ends] The contents of each cell look exactly as we expect them to, but there are no column headers here.
So, we recreate this DataFrame [Video description begins] He adds the code: complex_data_df = sqlContext.createDataFrame(complex_data, ['col_int', 'col_double', 'col_string', 'col_bool', 'col_array', 'col_dictionary', 'col_row', 'col_date_time']) complex_data_df.show(). [Video description ends] but, in addition to our complex data which is a list of tuples, we pass in a list of column headers as well and, this time, our DataFrame has been recreated with the column headers. [Video description begins] He executes the code. An eight-column table is output. The table has three rows. The column headers are col_int, col_double, col_string, col_bool, col_array, col_dictionary, col_row, and col_date_time. [Video description ends]
The map() Function of an RDD
[Video description begins] Topic title: The map() Function of an RDD. Your host for this session is Kishan Iyer. [Video description ends] We have previously seen how we can create multiple rows of data in an RDD using the Row object. In order for the tabular data represented in our RDD to have column headers, we have explicitly set the column headers within each Row object, and that is perfectly fine when we are working with three or four rows of data. However, it can become cumbersome when we have say thousands, or even millions, of rows of data, which is pretty much going to be the default case when working with a big-data tool such as Spark.
One way to solve this is to have each of our row definitions contain only the data each of the rows will be loaded with, and then set the column headers later. [Video description begins] A notebook called IntroducingSpark2.0 is open in Jupyter, a code editor application. Kishan has added code, which reads: data = sc.parallelize([Row(1, "Nissan Versa", 12), Row(2, "Ford Fiesta", 9), Row(3, "Hyundai Accent", 8)]). [Video description ends] So here, we make use of the SparkContext parallelize function once more to create an RDD with a list of rows, and then each of these rows does not include any column headers. They only include the data. Do note that the schema for each of these rows is identical; that is, each of them contains an int, followed by a string, followed by another int.
And, when we create a DataFrame out of this, [Video description begins] He adds the code: data.toDF().show(). [Video description ends] this contains the three rows of data which we defined but no column headers. [Video description begins] He executes the code. A three-column table with three rows is output. The column headers are the defaults, which are _1, _2, and _3. [Video description ends] So, can we add column headers after the fact? Well, the answer is yes. So here, we define one more Row object, [Video description begins] He adds the code: column_names = Row('index', 'vehicle_name', 'vehicle_stock'). [Video description ends] and this one only contains the column headers which we wish to use for our DataFrame, so we call this column_names, and then we make use of the RDD's map function. [Video description begins] He adds code, which reads: cars = data.map(lambda r: column_names(*r)) cars. [Video description ends]
So, what we wish to do here is to explicitly set the column headers for each of the Row objects in our RDD. Now, the map function is something which will apply to every element within an RDD, the elements of our RDD here being the rows. So, the function which will be applied to every row is going to be this lambda function and here we say that, for each row, apply the column_names function, and the arguments which we pass to it are the contents of each row. These are passed as positional arguments, which are denoted by the *r over here, [Video description begins] The r: and (*r) elements are highlighted in the code. [Video description ends] so this will have the effect of applying the column_names function to every row in our RDD. [Video description begins] He executes the code. The output reads: PythonRDD[139] at RDD at PythonRDD.scala:52. [Video description ends] And then the arguments to this function include every element of our row.
So this map operation will apply a transformation on the RDD and, since RDDs are immutable, another RDD, which is of type PythonRDD, has been created from this transformation. However, the transformation will not actually be executed unless we invoke some action, and that is exactly what we do by calling the collect function, [Video description begins] He adds the code: cars.collect(). [Video description ends] and the result shows that our column headers have now been applied to every single row in our RDD. [Video description begins] He executes the code. The output reads: [Row(index=1, vehicle_name='Nissan Versa', vehicle_stock=12), Row(index=2, vehicle_name='Ford Fiesta', vehicle_stock=9), Row(index=3, vehicle_name='Hyundai Accent', vehicle_stock=8)]. [Video description ends]
Now, we can use this particular RDD and then create a DataFrame from it using the sqlContext.createDataFrame. [Video description begins] He adds code, which reads: cars_df = sqlContext.createDataFrame(cars) cars_df. [Video description ends] So, while we have previously used the toDF function of an RDD to create a DataFrame from it, and have used the sqlContext to create a DataFrame from a list of tuples, we have not used the sqlContext to create a DataFrame from an RDD. Well, this is exactly what we do this time, and our DataFrame has been created, and the types for each of the columns have been inferred as well. [Video description begins] He executes the code. The output reads: DataFrame[index: bigint, vehicle_name: string, vehicle_stock: bigint]. [Video description ends]
We now do a final sanity check and examine the contents of this DataFrame, [Video description begins] He adds the code: cars_df.show(). [Video description ends] and we see that it contains exactly the data and the column headers which we had defined. [Video description begins] He executes the code. A three-column, three-row table is output. The column headers are: index, vehicle_name, and vehicle_stock. [Video description ends]
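The whole pattern from this demo, attaching column headers with map() and then building a DataFrame from the RDD, boils down to a short sketch like the following, again assuming the usual sc and sqlContext setup:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# Rows that carry only data, no column headers.
data = sc.parallelize([Row(1, "Nissan Versa", 12),
                       Row(2, "Ford Fiesta", 9),
                       Row(3, "Hyundai Accent", 8)])

# A Row holding just the headers we want to apply.
column_names = Row('index', 'vehicle_name', 'vehicle_stock')

# map() applies column_names to every row; *r unpacks a row's values as positional arguments.
cars = data.map(lambda r: column_names(*r))

cars_df = sqlContext.createDataFrame(cars)
cars_df.show()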
Accessing the Contents of a DataFrame
[Video description begins] Topic title: Accessing the Contents of a DataFrame. Your host for this session is Kishan Iyer. [Video description ends] In this demo, we will see how we can access specific elements of a DataFrame. These include individual rows as well as individual cells. We have previously accessed the first few elements of an RDD by using the take or the first functions. However, we have not accessed any data from DataFrames yet. So, how exactly can we do that? Well, for that we will make use of the complex_data DataFrame, which we have created a bit earlier [Video description begins] A notebook called IntroducingSpark2.0 is open in Jupyter, a code editor application. Kishan has added code, which reads: complex_data_df.first(). [Video description ends] and, just as we did with an RDD, we can use the first function of a DataFrame as well. [Video description begins] The first() function is highlighted in the code. [Video description ends]
Since a DataFrame is built on top of an RDD, this operation is possible and then, when we run it, we see that this, in fact, returns the first row of our DataFrame, just as we would expect. [Video description begins] He executes the code. The output reads: Row(col_int=1.0, col_double=12, col_string='Nissan Versa', col_bool=True, col_array=[2, 4, 6], col_dictionary={'a1': 0}, col_row=Row(x=1, y=2, z=3), col_date_time=datetime.datetime(2018, 7, 1, 14, 1, 2)). [Video description ends] And, similar to that, we can make use of the take function as well of the underlying RDD. [Video description begins] He adds the code: complex_data_df.take(2). [Video description ends]
So here, we wish to get the first two elements of our DataFrame, and this returns to us the first two rows [Video description begins] He executes the code. The first and second rows are output. The first row's output is identical to the previous output. The second row's output reads: Row(col_int=2.0, col_double=13, col_string='Ford Fiesta', col_bool=True, col_array=[2, 4, 6, 8, 10], col_dictionary={'a1': 0, 'a2': 1}, col_row=Row(x=1, y=2, z=3), col_date_time=datetime.datetime(2018, 7, 2, 14, 1, 3)). [Video description ends] and note that this is, in fact, a list of rows.
So we have now been able to access the initial rows in a DataFrame, but what if we want to access individual cells? Well, for that, we can simply make use of the DataFrame's collect function, [Video description begins] He adds code, which reads: cell_string = complex_data_df.collect()[0][2] cell_string. [Video description ends] which will return to us a list of rows and then we can treat the returned list as a two-dimensional array. [Video description begins] The [0][2] code is highlighted. [Video description ends] So, by using this double index, we say that we want to retrieve the row at index 0 and, from that, we wish to get at the element which is in the third column, that is, at index 2 of that row. So, this gives us the value of Nissan Versa, which is exactly what we expect. [Video description begins] He executes the code. The output reads: 'Nissan Versa'. [Video description ends]
Next, from the same row of data, we retrieve the element which is at column with the index of 4. [Video description begins] He adds code, which reads: cell_list = complex_data_df.collect()[0][4] cell_list. [Video description ends] So this corresponds to a list or an array and, once we get that, [Video description begins] He executes the code. The output reads: [2, 4, 6]. [Video description ends] let us see what happens if we were to modify this array, that is, is the returned list an independent copy of the array in the DataFrame, or is it linked to it? [Video description begins] He adds the code: cell_list.append(100) cell_list. [Video description ends] So, once we modify this cell_list variable, we see that it now does, in fact, contain four elements as opposed to the original three. [Video description begins] He executes the code. The output reads: [2, 4, 6, 100]. The previous output of [2, 4, 6] is also highlighted. [Video description ends]
So, in addition to the elements 2, 4, and 6, it contains the number 100 at the end. And then, following that, let us examine the DataFrame to see if that has been modified in any way, [Video description begins] He adds the code: complex_data_df.show(). [Video description ends] and the answer is no. [Video description begins] He executes the code. An eight-column table with three rows is output. The value in the first row of the col_array column is highlighted: [2, 4, 6]. On-screen text reads: The cell_list variable is an independent copy created from this array. [Video description ends] So, what was returned to us was in fact an independent copy of this particular element in the DataFrame.
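To keep these access patterns in one place, here is a small self-contained sketch; the DataFrame below is a trimmed-down stand-in for complex_data_df, with hypothetical values chosen purely for illustration, and sc and sqlContext are assumed as before:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(
    [(1.0, 12, 'Nissan Versa', [2, 4, 6]),
     (2.0, 13, 'Ford Fiesta', [2, 4, 6, 8, 10])],
    ['col_int', 'col_double', 'col_string', 'col_array'])

df.first()                          # the first Row of the DataFrame
df.take(2)                          # a list containing the first two Rows

cell_string = df.collect()[0][2]    # row 0, column index 2 -> 'Nissan Versa'
cell_list = df.collect()[0][3]      # an independent copy of the list in row 0

cell_list.append(100)               # modifying the copy...
df.show()                           # ...leaves the DataFrame itself unchanged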
So, that is one way to access individual cells in our DataFrame. Now, what if we wish to access only specific columns? So, this DataFrame contains eight columns in total, but we are only really interested in two of them, specifically, the string column and the dictionary one. [Video description begins] He adds code, which reads: complex_data_df.rdd\ .map(lambda x: (x.col_string, x.col_dictionary))\ .collect(). On-screen text reads: Create a new RDD with only some columns of complex_data_df. [Video description ends] And, not only that, we want the return type to be an RDD.
Well, for that, we can access the underlying RDD of a DataFrame by using its .rdd property and then, from that, we would make use of the map function to define exactly what we want from each row. [Video description begins] The highlighted code reads: .map(lambda x: (x.col_string, x.col_dictionary))\. [Video description ends] So, this is where we define the transformation which we wish to apply, and our transformation is that we only wish to include two of the columns, the col_string and the col_dictionary ones. So, by specifying those as a tuple over here, the map function will ensure that this particular operation applies to every single row in the RDD and that only these two columns will be returned from each row, and the result gives us a list of tuples which contain the two columns we want from each row. [Video description begins] He executes the code. The output reads: [('Nissan Versa', {'a1': 0}), ('Ford Fiesta', {'a1': 0, 'a2': 1}), ('Hyundai Accent', {'a1': 0, 'a2': 1, 'a3': 2})]. [Video description ends]
So the map function ensured that a single tuple was returned from each row in our RDD, giving us a list of tuples. Now, this is not the most intuitive way of retrieving specific columns from a DataFrame, especially for those who have a background using relational database tables, where the operation can be performed by constructing a rather simple SELECT statement and running it against the table. Well, for exactly that reason, we have the select function available for a Spark DataFrame whose arguments are exactly the same as the column names you would use in a SELECT statement. [Video description begins] He adds code, which reads: complex_data_df.select('col_string', 'col_array', 'col_date_time').show(). He executes the code. A three-row, three-column table is output. The column headers are: col_string, col_array, and col_date_time. [Video description ends]
And the result shows us that this indeed worked exactly as it would in a relational database table, and the three columns which we have specified have been returned. So, we have merely returned the specific columns from our DataFrame as is. What if we would like to apply an additional transformation? Well, let us examine the RDD approach first [Video description begins] He adds code, which reads: complex_data_df.rdd\ .map(lambda x: ("2018 " + x.col_string))\ .collect(). [Video description ends] and, for that, we make use of the map operation once more and, this time, we do select a single column, specifically, the string one, but we will prepend it with the string 2018. [Video description begins] The highlighted code reads: .map(lambda x: ("2018 " + x.col_string))\. [Video description ends]
So, since our string column represents specific models of cars, this would make them look like 2018 versions of that particular model and, in fact, that is exactly what occurs, [Video description begins] He executes the code. The output reads: ['2018 Nissan Versa', '2018 Ford Fiesta', '2018 Hyundai Accent']. [Video description ends] and a new RDD has been created with this transformed column. So, we now do something similar with our SQLContext [Video description begins] He adds code, which reads: complex_data_df.select('col_int', 'col_double')\ .withColumn("col_sum", complex_data_df.col_int + complex_data_df.col_double)\ .show(). [Video description ends] but, rather than only returning a transformed column, our new DataFrame will include two of the columns from the original DataFrame without any modifications, and then a new column will be created which includes a transformation applied using those two columns.
So here, we make use of the int and double columns and then we make use of the withColumn function to create a new column called col_sum, which pretty much contains the sum of those two columns, and our resultant DataFrame shows us exactly what we expect. [Video description begins] He executes the code. A three-column, three-row table is output. The column headers are: col_int, col_double, and col_sum. [Video description ends] So, each row contains an int, a double, and the sum column contains the sum of the int and the double. [Video description begins] He highlights the values in the first row, which are col_int: 1.0, col_double: 12, and col_sum: 13.0. [Video description ends]
So, the transformations we have applied so far include a string concatenation, as well as a mathematical operation. We will now make use of a Boolean operation in order to apply a transformation. [Video description begins] He adds code, which reads: complex_data_df.select('col_bool')\ .withColumn("col_opposite", complex_data_df.col_bool == False )\ .show(). [Video description ends] So here, we make use of the Boolean column in our DataFrame and then create a column called col_opposite which negates that Boolean value, [Video description begins] The highlighted code reads: "col_opposite", complex_data_df.col_bool == False )\. [Video description ends] and our new DataFrame includes this Boolean transformation as well. [Video description begins] He executes the code. A two-column, three-row table is output. The column headers are col_bool and col_opposite. The value in the first row is col_bool:true, col_opposite: false. [Video description ends]
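Here is a consolidated sketch of the selection and transformation patterns from the last few demos, using a smaller stand-in DataFrame with hypothetical values; sc and sqlContext are assumed to be set up as before:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(
    [(1.0, 12, 'Nissan Versa', True), (2.0, 13, 'Ford Fiesta', True)],
    ['col_int', 'col_double', 'col_string', 'col_bool'])

# Selecting columns through the underlying RDD versus the select() function.
df.rdd.map(lambda x: (x.col_string, x.col_bool)).collect()
df.select('col_string', 'col_bool').show()

# A string transformation applied with map().
df.rdd.map(lambda x: "2018 " + x.col_string).collect()

# Computed columns with withColumn(): an arithmetic and a Boolean transformation.
df.select('col_int', 'col_double') \
  .withColumn('col_sum', df.col_int + df.col_double) \
  .show()
df.select('col_bool') \
  .withColumn('col_opposite', df.col_bool == False) \
  .show()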
So, there will be cases when you want to select the data from a particular column, but you don't particularly like the column header. So, there are two ways of dealing with that. First, we can make use of the withColumnRenamed function of a DataFrame in order to recreate the DataFrame but with a different name for a specific column. [Video description begins] He adds code, which reads: complex_data_df.withColumnRenamed("col_dictionary", "col_map").show(). [Video description ends] So here, we wish to create a new DataFrame which contains the exact same contents as complex_data_df, but the col_dictionary column will be renamed to col_map and, on running this, that is exactly what we get and the new column name has been applied. [Video description begins] He executes the code. An eight-column table is output. The col_dictionary column header has been renamed to col_map. [Video description ends]
Another way to set a new name for a column is to make use of the select function [Video description begins] He adds code, which reads: complex_data_df.select(complex_data_df.col_string.alias("vehicle_name")).show(). [Video description ends] and then, when referencing the columns which you wish to include in your resultant DataFrame, you can make use of the alias function in order to set a new alias for that column. So here we wish to select the string column in our DataFrame, and then give it a more meaningful name, such as vehicle_name, and our resultant DataFrame gives us exactly what we need. [Video description begins] He executes the code. A single-column table is output with column header: vehicle_name. The table has three rows. [Video description ends]
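Both renaming approaches fit in a couple of lines. The DataFrame in this sketch is a small stand-in with a string and a dictionary column, and sc and sqlContext are assumed as before:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame(
    [('Nissan Versa', {'a1': 0}), ('Ford Fiesta', {'a1': 0, 'a2': 1})],
    ['col_string', 'col_dictionary'])

# Recreate the DataFrame with one column renamed.
df.withColumnRenamed('col_dictionary', 'col_map').show()

# Or rename on the fly while selecting, using alias().
df.select(df.col_string.alias('vehicle_name')).show()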
DataFrames in Spark and Pandas
[Video description begins] Topic title: DataFrames in Spark and Pandas. Your host for this session is Kishan Iyer. [Video description ends] For those of you who have used Pandas before, you will see that Pandas DataFrames are, in fact, quite similar to Spark DataFrames, and you must be wondering whether there is a way to integrate the two. Such integration is possible and, in this demo, we will cover how we can convert a Spark DataFrame into a Pandas DataFrame, and vice versa.
We begin by importing the Pandas library so, if you do not have it already, you will need to install it. [Video description begins] A notebook called IntroducingSpark2.0 is open in Jupyter, a code editor application. Kishan has added code, which reads: import pandas. [Video description ends] And, following that, we will see how we can create a Pandas DataFrame from a Spark DataFrame. And this is, in fact, a really simple task. We just need to call the toPandas function associated with a Spark DataFrame, and this will return a Pandas DataFrame to us. [Video description begins] He adds code, which reads: df_pandas = complex_data_df.toPandas() df_pandas. [Video description ends]
With that done, we can examine the contents, and we see here that our Spark DataFrame is now represented as a Pandas DataFrame. [Video description begins] He executes the code. An eight-column table with three rows is output. The column headers are: col_int, col_double, col_string, col_bool, col_array, col_dictionary, col_row, and col_date_time. [Video description ends] Okay, so that was simple enough, but what about the reverse operation? Can we create a Spark DataFrame easily from a Pandas DataFrame? And the answer is a resounding yes. [Video description begins] He adds code, which reads: df_spark = sqlContext.createDataFrame(df_pandas) df_spark.show(). [Video description ends]
We can simply make use of the sqlContext.createDataFrame function once more, and we can initialize it by making use of the Pandas DataFrame which we have. [Video description begins] The code, df_pandas, is highlighted. [Video description ends] And, with that, our Spark DataFrame has been created out of a Pandas DataFrame. [Video description begins] He executes the code. The same eight-column table with three rows is output. [Video description ends] So, you can see here that this is exactly the same data as in the Pandas DataFrame. So, if you are receiving your data in the form of a Pandas DataFrame and you wish to create a Spark DataFrame out of the same content in order to perform an analysis of your data in Spark, then you now know that this transformation is quite simple.
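As a reference, here is a minimal sketch of the round trip between Spark and Pandas DataFrames; it assumes pandas is installed and that sc and sqlContext are set up as before:

import pandas
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

spark_df = sqlContext.createDataFrame(
    [('Nissan Versa', 12), ('Ford Fiesta', 9)], ['vehicle_name', 'vehicle_stock'])

df_pandas = spark_df.toPandas()                    # Spark DataFrame -> Pandas DataFrame
df_spark = sqlContext.createDataFrame(df_pandas)   # Pandas DataFrame -> Spark DataFrame
df_spark.show()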
Exercise: Working with Spark
[Video description begins] Topic title: Exercise: Working with Spark. Your host for this session is Kishan Iyer. [Video description ends] In this exercise, you will think of the different techniques which allow Spark to deliver great performance on operations involving big datasets. This is specific to version 2 of Spark, so you can think of the improvements when compared to version 1, and also compared with other tools which can be used to analyze data, such as relational databases. You'll also remind yourself of the two different types of operations which are permitted on Spark's Resilient Distributed Datasets, or RDDs, [Video description begins] So, which two types of operations can be performed on a Spark RDD? [Video description ends] and, finally, you will recall how Spark RDDs enable lazy evaluation. To be precise, you will think of the operations which are put on hold to be executed later, and exactly how that is accomplished. Do think of these on your own.
[Video description begins] The solution commences. [Video description ends] The first task in the exercise was to recall the different techniques which allow Spark to deliver great performance, so we focus here on why RDDs are so efficient when we perform operations on them. For one, the data in RDDs is split across multiple nodes, which allows us to perform several operations in parallel. In addition, all the RDDs are in-memory, that is, each segment of the RDD which lies on a particular node in the cluster is stored within the memory of that node.
This makes operations far more efficient than if the data had been on disk. Also, when compared to Spark version 1, the Tungsten engine of Spark 2 includes numerous optimizations which greatly improve performance. For one, frequently used data is stored within the registers of the CPUs rather than in memory. Virtual function calls have also been eliminated in Spark 2, which allows more efficient use of the available resources and thus improves performance. And, finally, the Tungsten engine also includes various compiler optimizations, such as loop unrolling, pipelining, and so on. And the combined effect of all of these means that Spark 2 is between 10 and 20 times faster than Spark 1.
The next task involved recalling the types of operations which are permitted on RDDs. So, these include transformations, where you define how the content of an RDD will change. This could involve filtering out specific rows and columns or even adding a new column by using the data from some of the others. Once you define a transformation, it is not executed immediately. In fact, transformations are only applied once you invoke the other type of permitted operation on an RDD, which is an action. An action is where you specify what you would like to view from an RDD. This will have the effect of applying the transformations which were defined, and then displaying the data which was requested in the action. Do keep in mind, though, that RDDs in Spark are immutable so, when you define a transformation and apply it, Spark will in fact create a new RDD with that transformation applied.
The final task was to recall how lazy evaluation is implemented in Spark RDDs. The way it is done is that the transformations which are defined are simply recorded at first. These are stored in the form of a Directed Acyclic Graph, which you can find in the RDD metadata. [Video description begins] This can be abbreviated to DAG. [Video description ends] At this point, Spark knows exactly what to do in order to transform an RDD. However, the actual transformation only takes place when something is requested from the RDD or, to be more precise, when an action is invoked. Since Spark does not perform transformations immediately, but just waits until it is actually required to perform them, this is known as lazy evaluation.
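A tiny sketch makes the lazy-evaluation point concrete; the numbers RDD here is purely illustrative, and sc is obtained as in the earlier demos:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

numbers = sc.parallelize([1, 2, 3, 4])

# A transformation: only recorded in the RDD's lineage (its DAG); nothing runs yet.
doubled = numbers.map(lambda n: n * 2)

# An action: this is what actually triggers the computation across the cluster.
print(doubled.collect())   # [2, 4, 6, 8]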