Evolving schemas with Parquet files in Hive

Different tools (Presto, Spark, Hive) may bundle different versions of Parquet and handle schema changes slightly
differently, which causes a lot of headaches.

Parquet essentially supports only the addition of new columns. But what if we need a change such as:
- renaming a column
- changing the type of a column, including from scalar to array
- keeping the type but changing the meaning of a column
(for instance, switching from pounds to kilos)

We want compatibility in both directions, forward and backward.

That is, after we upgrade the schema, we should have:
 1. Backward compatibility: old queries still run and return correct results
 2. Forward compatibility: old data works with new queries

Demonstration

-- schema, v1
DROP TABLE IF EXISTS people_data_v1;
CREATE TABLE people_data_v1 (
  name string,
  weight double,
  gender string
)
PARTITIONED BY (ingest_date string)
STORED AS PARQUET;

-- some test values
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE people_data_v1 PARTITION(ingest_date='2018-12-24') VALUES
('Eddard Stark', 180.0, 'Male'),
('Rob Stark', 126.0, 'Male');

INSERT INTO TABLE people_data_v1 PARTITION(ingest_date='2018-12-25') VALUES
('Arya Stark', 60.0, 'Female'),
('Sansa Stark', 80.0, 'Female');

select * from people_data_v1 where ingest_date='2018-12-24';

We are going to create a new table for each schema version, to make sure we don’t unintentionally mix data with
different schemata. We are going to name:
- the tables with the pattern people_data_v??
- the views with the pattern people_v??

with the implicit contract that under a view version, data looks like that version. This way we can
keep backward compatibility. We can also keep an extra top-level view (the master view) called people
that points to the most recent view.

Create a master view that always points to the latest version

DROP VIEW IF EXISTS people;
CREATE VIEW IF NOT EXISTS people( name, weight, gender, ingest_date)
PARTITIONED ON (ingest_date) AS select * from people_data_v1;

-- Check data
select * from people;

Note that the table is partitioned by date. For partition pruning to work in Hive, the views must be aware of the partitioning scheme of the underlying tables. When a query filters on the partition column, Hive goes through the views and uses the partitioning information to limit the amount of data it reads from disk.

Schema alteration

Let’s suppose that a new column, age, is to be added.

-- schema, v2
DROP TABLE IF EXISTS people_data_v2;
CREATE TABLE people_data_v2 (
  name string,
  weight double,
  gender string,
  age int
)
PARTITIONED BY (ingest_date string)
STORED AS PARQUET;

-- some test values
INSERT INTO TABLE people_data_v2 PARTITION(ingest_date='2018-12-26') VALUES
('Aegon Targaryen', 200.0, 'Male', 60),
('Daenerys Targaryen', 90.0, 'Female', 24),
('Jon Snow', 135.0, 'Male', 24);

INSERT INTO TABLE people_data_v2 PARTITION(ingest_date='2018-12-27') VALUES
('Tywin Lannister', 160.0, 'Male', 65),
('Jamie Lannister', 140.0, 'Male', 41),
('Tyrion Lannister', 60.0, 'Male', 38),
('Cersei Lannister', 80.0, 'Female', 40);

select * from people_data_v2 where ingest_date='2018-12-27';

Schema Evolution for backward and forward compatibility

Create a new partitioned view for backward compatibility

The objective is to make new data look like old data. Old consumers will see the new rows but not the new columns.

-- create view for backward compatibility
-- make new data look like old data
DROP VIEW IF EXISTS people_v1;
CREATE VIEW people_v1 (name, weight, gender, ingest_date)
PARTITIONED ON (ingest_date) AS
SELECT * from people_data_v1
UNION ALL
SELECT name, weight, gender, ingest_date from people_data_v2;

-- Check
select * from people_v1;

This also satisfies our rule that under a view version, data looks like that version: under view
people_v1, data looks like people_data_v1.

Create a new partitioned view for forward compatibility

The objective is to make old data look like new data. New consumers will see the new data along with the old data,
with the newer fields set to a default value (NULL here).

-- forward compatibility
-- old data will look like new data
DROP VIEW IF EXISTS people_v2;
CREATE VIEW people_v2 (name, weight, gender, age, ingest_date)
PARTITIONED ON (ingest_date) AS
SELECT name, weight, gender, NULL AS age, ingest_date from people_data_v1
UNION ALL
SELECT * from people_data_v2;

-- Check
select * from people_v2;

This also satisfies our rule that under a view version, data looks like that version: under view
people_v2, data looks like people_data_v2.
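
The contract behind the two views can also be modelled outside Hive. The following is a minimal plain-Scala sketch, not Hive code: the case classes and function names (PersonV1, PersonV2, downgrade, upgrade) are made up for illustration, and Option stands in for a nullable column.

```scala
// Plain-Scala model of the versioned views; illustrative only, not Hive code.
object VersionedViews {
  // Row shapes for each schema version (field names mirror the tables above).
  final case class PersonV1(name: String, weight: Double, gender: String, ingestDate: String)
  final case class PersonV2(name: String, weight: Double, gender: String, age: Option[Int], ingestDate: String)

  // people_v1 direction: a new row is projected down to the old columns.
  def downgrade(p: PersonV2): PersonV1 =
    PersonV1(p.name, p.weight, p.gender, p.ingestDate)

  // people_v2 direction: an old row gains an empty age, like NULL AS age.
  def upgrade(p: PersonV1): PersonV2 =
    PersonV2(p.name, p.weight, p.gender, None, p.ingestDate)

  // Each view is the union of both tables, expressed in a single version.
  def peopleV1(v1: Seq[PersonV1], v2: Seq[PersonV2]): Seq[PersonV1] =
    v1 ++ v2.map(downgrade)

  def peopleV2(v1: Seq[PersonV1], v2: Seq[PersonV2]): Seq[PersonV2] =
    v1.map(upgrade) ++ v2
}
```

Every further schema version would add one more conversion in each direction, which mirrors the extra SELECT branch each view needs.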

Lastly, point the master view to the most recent one.

DROP VIEW IF EXISTS people;
CREATE VIEW IF NOT EXISTS people(name, weight, gender, age, ingest_date)
PARTITIONED ON (ingest_date) AS select * from people_v2;

-- Check
select * from people;

Using views to abstract schema changes is cheap, and it is easy to roll back if your users are not ready to upgrade to the new schema.

Summary

  • Create versioned data tables to reflect the point-in-time schema
  • Create sub-views to accommodate forward and backward compatibility
  • Create a master view, and update it whenever the schema changes

Reference

This post is just a modified version of : this post

 

Coursier : Blazing fast Artifact Fetching

Tired of waiting endlessly for dependencies to download from the central repo? Already on your third cup of coffee and getting frustrated? Well, worry no more: a new project, coursier, attempts to solve this very problem.

Coursier is written 100% in Scala. It fetches dependencies from Maven / Ivy repositories. However, unlike Maven, which downloads dependencies sequentially, coursier downloads all of the artifacts in parallel. If all your dependencies are in the cache, chances are coursier will not even try to connect to remote repositories.

Coursier has first-class integration with SBT. To use it, add it as a global SBT plugin by adding the following line:

addSbtPlugin("io.get-coursier" % "sbt-coursier" % "<version>")

to the ~/.sbt/0.13/plugins/build.sbt file. As of writing, the latest version is 1.0.0-RC14.

If the file build.sbt or the folder plugins does not exist, simply create it under the .sbt folder located in your home directory.

Because you have added it as a global plugin, it is now enabled for all your SBT projects. Only the first time may you have to wait a bit (time to get that coffee brewing :P) while the dependencies of coursier itself get downloaded. Thereafter, every time your project needs additional dependencies, you can clearly see the speed difference (as illustrated on the GitHub page).

But it’s still early days, and there may be some compatibility issues & glitches. I have personally used it with sbt 0.13.8 and have not faced any major issues yet. Nevertheless it is an interesting project to keep an eye on as it evolves over time, especially for Scala developers.

Field validation using Spark DataFrame

Problem

You have a Spark DataFrame, and you want to validate some of its fields.

Solution

While working with the DataFrame API, the schema of the data is not known at compile time. This makes working with individual fields somewhat challenging. In this solution, we shall see how to deconstruct a Row object and manipulate its underlying data.

Schema in a Spark DataFrame is represented using the StructType object, which contains one or more StructField objects. You can get the schema of a data set simply by calling the schema method on a DataFrame. So, the first thing we shall do is to load the data and extract its schema.

// Load your base data
val input = <<your input dataframe>>
//Extract the schema of your base data
val originalSchema = input.schema

Suppose you have a data set in which you want to validate the following fields:

Schema Details

  1. EMPID – (NUMBER)
  2. ENAME – (STRING)
  3. GENDER – (STRING)

To validate on these fields let’s write a simple Scala function that takes one Row as its input. Here we’ll do the following :

  • Extract the fields on which we want to do validation by the field name
  • Write our validation logic
  • Introduce new fields to hold the result of the validation

The implementation of the validation logic is not shown here. However, we are introducing three new fields to hold the outcome of the validation:

  1. ERROR_COLUMN ( The column having the error )
  2. ERROR_VALUE ( The error code or error value )
  3. ERROR_DESCRIPTION ( Description of the error )

Note: This is a very simple solution that assumes an error can occur on only one field. But the logic can easily be extended to multiple fields by modelling the above three fields as another StructType and then nesting it into the main StructType.

Once you compute the results of the validation, store the results in these three fields, and create a new Row object by merging the new and old columns.

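import org.apache.spark.sql.Row

def validateColumns(row: Row): Row = {
  // Validation outcome; the fields stay null when the row is valid
  var err_col: String = null
  var err_val: String = null
  var err_desc: String = null

  // Extract the fields to validate by name
  val empId = row.getAs[String]("EMPID")
  val ename = row.getAs[String]("ENAME")
  val gender = row.getAs[String]("GENDER")

  // do checking here and populate (err_col, err_val, err_desc) with values if applicable

  // Append the three result fields to the original row
  Row.merge(row, Row(err_col), Row(err_val), Row(err_desc))
}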
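
The checking step is left open above. As one possible shape for it, here is a minimal plain-Scala sketch that returns the first failing check. The rules themselves (numeric EMPID, non-blank ENAME, GENDER restricted to two values) and the names ValidationSketch and firstError are assumptions made up for illustration; they are not part of the original solution.

```scala
// Hypothetical validation rules; replace them with your own checks.
object ValidationSketch {
  // A failed check as (ERROR_COLUMN, ERROR_VALUE, ERROR_DESCRIPTION).
  type ValidationError = (String, String, String)

  // Assumed rule: only these GENDER values are accepted.
  private val allowedGenders = Set("Male", "Female")

  // Returns the first failing check, or None when all three fields pass.
  def firstError(empId: String, ename: String, gender: String): Option[ValidationError] =
    if (empId == null || empId.isEmpty || !empId.forall(_.isDigit))
      Some(("EMPID", empId, "EMPID must be a non-empty number"))
    else if (ename == null || ename.trim.isEmpty)
      Some(("ENAME", ename, "ENAME must not be blank"))
    else if (gender == null || !allowedGenders.contains(gender))
      Some(("GENDER", gender, "GENDER must be Male or Female"))
    else
      None
}
```

Inside validateColumns, the result could be unpacked with something like firstError(empId, ename, gender).foreach { case (c, v, d) => err_col = c; err_val = v; err_desc = d }, which leaves the three variables null for a valid row.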

Since our custom function introduces some new fields, we have to modify the original schema to reflect this. Therefore, add the new fields to the original schema.


// Modify your existing schema with your additional metadata fields
val newSchema = originalSchema
  .add("ERROR_COLUMN", StringType, true)
  .add("ERROR_VALUE", StringType, true)
  .add("ERROR_DESCRIPTION", StringType, true)

After writing our custom function for validating the desired fields, we have to call it. The function works on individual Row objects, and applying it with map yields an RDD. We then convert this RDD (of Row objects) back to a DataFrame using the modified schema from the previous step.

// Call your custom validation function on every row.
// Going through .rdd makes the result an RDD[Row] explicitly.
val validateDF = input.rdd.map { row => validateColumns(row) }
// Reconstruct the DataFrame with the additional columns
val checkedDf = sqlContext.createDataFrame(validateDF, newSchema)

Now all that remains is to divide our data set into two distinct parts :

  1. Records having errors
  2. Records having no error

// The $"..." column syntax requires: import sqlContext.implicits._

// Keep the rows that have errors
val errorDf = checkedDf.filter($"ERROR_COLUMN".isNotNull && $"ERROR_VALUE".isNotNull && $"ERROR_DESCRIPTION".isNotNull)

// Keep the rows that have no errors
val errorFreeDf = checkedDf.filter($"ERROR_COLUMN".isNull && $"ERROR_VALUE".isNull && $"ERROR_DESCRIPTION".isNull)

One caveat of this approach is that, since we are using our own function, the Catalyst optimizer cannot do anything to speed up the operation. Hence, run some performance tests and release to production only if you are satisfied with the results.

This article is just an explanation of a solution I posted on stackoverflow. If you are interested you can have a look here.

 

 

 

 

Gracefully terminate a data pipeline with a Spark job generating no output

Problem

You have a data pipeline in which the last stage is a Sqoop job, which takes the result produced by a Spark job as its input. You want the data pipeline to terminate gracefully when the Spark job does not generate an output.

Solution

Use an empty DataFrame.

A common data pipeline in a data science or analytics project is a series of jobs chained together, with the final output exported to a data warehouse or data mart.

One such case is a Spark job that writes its result to HDFS, followed by a Sqoop job that exports the processed result. However, the Spark job may or may not produce any output. Either way, you want your pipeline to terminate successfully.

One solution to this is to emulate the absence of output with an empty DataFrame.
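
The fallback decision itself can be sketched in plain Scala, independent of Spark. In this minimal sketch, a Seq stands in for the DataFrame, and the names FallbackSketch and resultOrEmpty are made up for illustration.

```scala
import scala.util.Try

// Plain-Scala sketch of the fallback; a Seq stands in for a DataFrame.
object FallbackSketch {
  // Runs the processing step and falls back to an empty result
  // when the step throws a non-fatal exception.
  def resultOrEmpty[A](process: => Seq[A]): Seq[A] =
    Try(process).getOrElse(Seq.empty[A])
}
```

Keep in mind that Spark evaluates transformations lazily, so with a real DataFrame a failure may only surface when the result is written. A wrapper of this kind would then need to surround the write step as well.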

Case 1 : When schema is not required

This is an elementary approach, and is helpful in cases where the output data is not partitioned and there is no requirement to merge data.

import org.apache.spark.sql.{ DataFrame, Row }
import com.databricks.spark.avro._

val dataInputPath = "/data/input"
val dataOutputPath = "/data/output"
val input = sqlContext.read.avro(dataInputPath)

// Get the schema from the input data.
// The processing itself does not need the schema, but it is
// required to create the empty DataFrame.
val inputSchema = input.schema

// Do processing ...

// If processing does not yield an outcome or an exception is thrown, the output
// can be emulated by an empty DataFrame

/*
 * Create an empty DataFrame adhering to some schema
 */
val empty = sqlContext.createDataFrame(sc.emptyRDD[Row], inputSchema)
empty.write.mode("overwrite").avro(dataOutputPath)

Writing an empty DataFrame will result in a _SUCCESS file.

[Figure: Without Schema]

Case 2 : When you want to store schema

A better approach is to store the schema of the data along with the _SUCCESS file. The advantage is that even if you read the empty directory at a later point in time, no exception is thrown.

Merging data across multiple snapshots is quite common, and this approach makes it seamless even when some partitions contain no data. The only downside, perhaps, is that the limit operation could take some time if the data set is unusually big. But this trade-off is one you can easily live with, considering the benefit it confers.


import org.apache.spark.sql.{ DataFrame, Row }
import com.databricks.spark.avro._

val dataInputPath = "/data/input"
val dataOutputPath = "/data/output"
val input = sqlContext.read.avro(dataInputPath)

// Do processing ...

// If processing does not yield an outcome or an exception is thrown, the output
// can be emulated by an empty DataFrame

/*
 * Do a limit 0 on the input data. This has the effect of extracting
 * the schema from the data set.
 *
 * If you want to add additional columns, use withColumn with default
 * values, and then use the limit function.
 */
val empty = input.limit(0)
empty.write.mode("overwrite").avro(dataOutputPath)

The output will now contain an additional data file, which contains the metadata.

[Figure: With Schema]

As a result of writing the _SUCCESS file, the terminating Sqoop job exports no records and the data pipeline finishes gracefully.

 

 

Replicating a Spark DataFrame Row N-times

Problem

You want to replicate a Spark Row N times.

Solution

We will use the explode function for this purpose : explode(Column e)

Here the column must be of type array or map.

The explode function creates a new row for each element in the given array or map column.
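
On ordinary collections, the same idea is a flatMap over a repeated element. The following is a minimal plain-Scala analogy, not Spark code: the Record class, its rep field and the two function names are made up for illustration.

```scala
// Plain-Scala analogy for replicating rows; illustrative only, not Spark code.
object ReplicateSketch {
  // Hypothetical record: rep says how many copies of it are wanted.
  final case class Record(id: String, rep: Int)

  // Fixed factor: every record is repeated n times.
  def replicate(records: Seq[Record], n: Int): Seq[Record] =
    records.flatMap(r => Seq.fill(n)(r))

  // Per-record factor: each record is repeated according to its own rep field.
  def replicateByField(records: Seq[Record]): Seq[Record] =
    records.flatMap(r => Seq.fill(r.rep)(r))
}
```

In this sketch a record with rep 0 produces no output at all, because Seq.fill(0) is empty.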

Case 1 : When the repetition factor is static

You can add a column holding a literal array of size 100, and then use explode to make each of its elements create its own row; then just drop this “dummy” column.

Here, we want to repeat each row 100 times:


import org.apache.spark.sql.functions._
val result = df.withColumn("dummy", explode(array((0 until 100).map(lit): _*))).drop("dummy")

Case 2 : When the repetition is dynamic

Since the repetition parameter is not fixed, for each row we must create a column of type array whose length will be determined by some other column.

Step 1: For each row, create an array the length of which is determined by a column in the Dataframe


// Build an array whose length is given by the column value
def createArrayFromColValue = (colValue: String) => {
  val index = colValue.toInt
  (1 to index).toArray
}

val createArrayFromColValueUDF = udf(createArrayFromColValue)

val temp = normalize_run.withColumn("dummy", createArrayFromColValueUDF($"REP"))

Step 2 : Explode the array field


// One output row per element of the array
val exploded = temp.withColumn("dummy", explode($"dummy"))