How Databricks with AWS and Stitch Data Loader can help deliver sales and marketing analytics

Sales and marketing analytics is one of the most high-profile focus areas for organizations today, and for good reason – it’s the place to look to boost revenues. Savvy executives are already on board with the impact – in a survey of senior sales and marketing executives, McKinsey finds that “extensive users of customer analytics are significantly more likely to outperform the market.”

For organizations thinking seriously about revenue growth, teams of data scientists and analysts are tasked with transforming raw data into insights.

Data scientists and analytics teams are not only poring over ever-increasing volumes of customer data, but also identifying more and more data sources relevant to their efforts.

So for today’s topic, let’s take a look at how you can quickly access the myriad of customer data sources with Talend’s Stitch Data Loader and get that data into an advanced analytics platform – in this case, Databricks running on AWS, with a Lambda function automating a common manual task.

Companies like Databricks, with their Unified Data Analytics Platform, provide powerful tooling for data teams to deliver actionable insights and predictions to the business. Stitch is our web-based ingestion service that allows users to connect to sources in minutes and schedule ongoing ingestion processes to move that data into common cloud data services.

Stitch is also very extensible (see the Singer open-source project) and to date offers more than 100 sources, most of which integrate with sales and marketing platforms, along with eight of the most widely used cloud data analytics platforms as destinations.

The Ingestion Problem

Databricks is a Unified Data Analytics platform, widely used by data scientists, that provides powerful tooling for data teams to deliver actionable insights and predictions to the business. It allows users to store, explore, analyze, and process enormous volumes of data quickly. But first, data needs to get into the environment, and that is the very first obstacle anyone needs to tackle before they can begin analyzing their data.

Enter Stitch. By using Stitch together with a small AWS Lambda function, we can ingest data from many different sales and marketing SaaS applications and land it in an AWS S3 bucket. From there, the Lambda function automatically moves the data into the Databricks File System, so the data now sits in the foundational storage layer of the Databricks platform. From this point we can run some Python code to pick up the data loaded into that directory and model or visualize it until we are happy with the results. Once completed, we can save the DataFrame as a global table in the cluster for others to use, or write it out in Delta format and save it as a Delta Lake table.

 

Example (Create table from data in directory):

# Directory that the Lambda function loads Stitch files into
dataPath = "/FileStore/tables"

# Read every CSV file in that directory, using the header row and inferring the schema
data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(dataPath)

# Save the result as a global table so it persists beyond this notebook session
data.write.mode("overwrite").saveAsTable("MOCKDATA")

 

Let’s examine the small bit of Python code above. The first line defines a variable, dataPath, that points to the directory the Lambda function (described above) loads data into. The next line reads all of the data in that directory, telling Spark the file format, that the files have a header row, and that the schema should be inferred from the files.

Finally, we tell the cluster to write the data as a global table so that it persists beyond the execution of this code block. We also set the write mode to overwrite so the table is replaced when we process new data loads. We give it the name ‘MOCKDATA’, and now anyone can easily go into Databricks and issue a simple SQL statement such as ‘select * from MOCKDATA’, or connect with the ETL tool of their choice (Talend) and process the data like any regular result set.
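
As a quick illustration (assuming a notebook attached to the same cluster), the global table can be queried directly:

results = spark.sql("SELECT * FROM MOCKDATA")  # query the global table by name
results.show(10)                               # preview the first 10 rows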

Additionally, we can create a Delta Lake table very quickly with the following SQL statement:

CREATE TABLE MOCKDATA_DELTA USING DELTA LOCATION '/delta2' AS SELECT * FROM MOCKDATA

 

Here we create a Delta Lake table called MOCKDATA_DELTA and store it in a directory in our Databricks File System called /delta2. The query against the batch table runs and its results are loaded into the newly created table.
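
As a quick sanity check (assuming the same notebook session), the new table can be read back either by name or straight from its Delta location:

delta_by_name = spark.table("MOCKDATA_DELTA")               # read the Delta table by name
delta_by_path = spark.read.format("delta").load("/delta2")  # or read it directly from its location
delta_by_name.count()                                       # row count should match MOCKDATA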

How To

For starters, we will need an AWS S3 bucket and a Stitch account. I am not going to discuss how to set up S3 in this blog, but there are a number of tutorials on the AWS website on how to do this. For the Stitch account, you just have to go to the Stitch website, sign up for a free account, and create a new integration.

The Lambda Function

Creating a Lambda function is not all that difficult, and the code for this example is short and to the point. However, the purpose of this blog is to provide you with simple, configurable artifacts so you can quickly duplicate them and get to work. The first thing I did was wrap all of the Databricks FS (DBFS) REST APIs, as there are not that many of them and, to be honest, they are very simple and easy to work with. So, why not?
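
The wrapper itself isn’t reproduced here, but the DBFS REST endpoints it relies on are just mkdirs, create, add-block, and close. The sketch below shows roughly what those calls look like; it is written in Python with the requests library purely for illustration (the actual Lambda code is Java), and the workspace host and token values are placeholders.

import base64
import requests

# Placeholder workspace URL and personal access token
HOST = "https://<your-workspace>.cloud.databricks.com"
HEADERS = {"Authorization": "Bearer <DB_TOKEN>"}

def dbfs_mkdirs(path):
    # Create a DBFS directory (and any missing parent directories)
    requests.post(f"{HOST}/api/2.0/dbfs/mkdirs", headers=HEADERS,
                  json={"path": path}).raise_for_status()

def dbfs_create(path):
    # Open a streaming handle for a new DBFS file and return the handle
    r = requests.post(f"{HOST}/api/2.0/dbfs/create", headers=HEADERS,
                      json={"path": path, "overwrite": True})
    r.raise_for_status()
    return r.json()["handle"]

def dbfs_add_block(handle, chunk):
    # Append a base64-encoded block of bytes to the open handle
    requests.post(f"{HOST}/api/2.0/dbfs/add-block", headers=HEADERS,
                  json={"handle": handle,
                        "data": base64.b64encode(chunk).decode("ascii")}).raise_for_status()

def dbfs_close(handle):
    # Close the handle once all blocks have been written
    requests.post(f"{HOST}/api/2.0/dbfs/close", headers=HEADERS,
                  json={"handle": handle}).raise_for_status()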

Let’s discuss in a bit of detail the Lambda function I wrote to take the data Stitch lands in the S3 bucket and stream it into Databricks.

this.dbfs = DBFS.getInstance(System.getenv("DB_REGION"), System.getenv("DB_TOKEN"));
String path = System.getenv("DB_PATH");

Our Lambda function looks for three environment variables to execute properly. If you look at the code above, you will see the following:

  • DB_REGION: The region that your Databricks workspace is deployed in
  • DB_TOKEN: The token that will authenticate your requests
  • DB_PATH: The DBFS directory path that you want the files to be streamed into

 

Inside the AWS Lambda function’s handler, this is what it looks like:

 

// Log the triggering S3 event and pull out its records
context.getLogger().log("Received event: " + event);
List<S3EventNotificationRecord> records = event.getRecords();
try {
    // Make sure the target DBFS directory exists
    dbfs.mkdirs(path);
} catch (DBFSException e) {
    context.getLogger().log(e.getMessage());
}

In the above lines of code, we log the event that triggered our Lambda function and retrieve the list of records from that event. Then we attempt to create the full path given in the DB_PATH environment variable. If it already exists, the catch block will log this; if not, the entire path will be created on the Databricks FS.

 

 

for (S3EventNotificationRecord record : records)
{
    // Fetch the newly written object from S3
    String filename = record.getS3().getObject().getKey();
    AmazonS3 client = new AmazonS3Client();
    S3Object object = client.getObject(new GetObjectRequest(record.getS3().getBucket().getName(), filename));

    context.getLogger().log("FileName : " + filename);
    String xpath = path + "/" + filename;

    try {
        if (!paths.contains(xpath)) {
            // Open a DBFS handle, stream the S3 object into it, then close the handle
            context.getLogger().log("Creating Path " + xpath);
            int handle = dbfs.create(xpath);
            context.getLogger().log("Handle: " + handle);
            processFile(object.getObjectContent(), context, handle);
            dbfs.close(handle);
            context.getLogger().log("Closing Handle: " + handle);
        } else {
            // Skip files that have already been loaded into DBFS
            context.getLogger().log(xpath + " already exists!");
        }
    } catch (DBFSException e)
    {
        context.getLogger().log(e.getMessage());
    }
}

In the preceding block of code, we loop over every record from the event to get the file name and build the variable xpath, which combines the DB_PATH environment variable with the name of the file that was just written to your S3 bucket.

Next, we make a call to Databricks to create the file and have Databricks return the handle to this file. The handle will be used going forward to write data into the Databricks FS. The processFile function takes the S3 Object and the Databricks File handle and loops through the file until it has written the entire file into Databricks. Finally, we close the Databricks file by passing in the handle and calling the close function.
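
processFile itself isn’t shown in this post; conceptually it just reads the S3 object’s input stream in fixed-size chunks and pushes each chunk to DBFS through the add-block endpoint until the stream is exhausted. Here is a rough sketch of that loop, continuing the hypothetical Python helpers from earlier (the real helper in the Lambda is Java):

def process_file(stream, handle, chunk_size=1024 * 1024):
    # Read the S3 object stream in chunks and append each chunk to the open DBFS handle
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        dbfs_add_block(handle, chunk)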

Now that our data is flowing into the Databricks File System, the data scientist or data engineer has full control to do whatever they want with it.

 

Next steps

If you would like to reproduce the steps from this article, you can do so from the following:

 

Good luck and happy loading!
