ETL - Qualtrics (Staging Package)

Last Updated 5/24/2021

Overview
Location
Schedule
Details
 Master Packages
    qx_load_all_surveys
    qx_load_all_users
 Python Modules
    load_surveys.py
    load_users.py
    qualtrics_helper.py
    
dw_helper.py
Change Log
Troubleshooting Notes

Overview:

The Qualtrics/COVID ETL is primarily used to pull survey data from the Qualtrics tool and track responses on COVID Cheq, contact tracing, and supply level tracking. 

The ETL has two main components.  A python package that pulls down data from Qualtrics, and stored procedures that build the tables in dataMarq.

The primary purpose of this job is to populate data needed for COVID tracking and for powering the COVID Cheq tool in qualtrics.  Uses include:

  1. Populate staging table schemas with qualtrics data
  2. dataMarq procs to build covid related tables
    1. f_covid_cases
      qualtrics.contact_tracing provides self disclosures for covid cases
    2. f_covid_contacts
      qualtrics.contract_tracing provides close contacts for covid cases
    3. d_covidcheq_list
      qualtrics.covidcheq has the covidcheq responses and is used for delinquency and last response
    4. s_covid_call_tracking
      qualtrics.call_tracking is source
    5. s_covid_ppe_supply
      qualtrics.ppe_tracking is source
    6. s_covid_units
      qualtircs.test_supply is source
    7. Update users who can access dashboards in qualtrics based on a view (ops.v_covidcheq_users)
    8. Push data to Qualtrics to populate the directory used to send out survey data (ops.d_covidcheq_list)

 Schemas in staging that are populated with this data include:

  • Base schema: qualtrics
  • CDC schema: N/A

Location:

The solution 鈥 ETL-Qualtrics 鈥 contains all the packages for staging Qualtrics data and resides in the ETL-dataMarq folder in the SSIS catalog.

Schedule:

The jobs in this project run as part of multiple SQL Agent jobs (full dataMarq schedule).  

ETL 鈥 Qualrics Surveys - Daily 鈥 Everyday every 20 mins

ETL 鈥 BI-Integration 鈥 CovidCheq Full/Inc 鈥 Every 1 hr

Project and Package Details:

The packages in ETL 鈥 Qualtrics work with the Qualtircs API and don鈥檛 operate on our normal one package per table paradigm of other DB related loads. Instead, there are three main packages, one for loading survey data, one for loading/updating users, and one (decommissioned) for loading contact data .

Project Parameters

brand: This is either COVID or MU, to differentiate the two qualtrics instances we want to pull data from. These packages are designed to work with either instance, although we are only pulling from COVID now. Passed to python program as 鈥揵rand argument

dbEnvr: INT, STG, PRD 鈥 which DB should be loaded with the data. Passed to python program as 鈥揹bEvnr argument

dwConnStr: connection to datamarq

dwStgConnStr: connection to datamarq_staging

fileType: only using xml now, but there are a variety of ways survey data can be pulled from qualtrics and this identifies the file format that should be used. Passed to python program as 鈥揻ileType argument

fsEnvr: The file share environment for the landing files. We have both a DEV and PROD landing folder to differentiate when jobs are pulling in different environments and not to overlap source files. Passed to python program as 鈥揻ileEnvr argument

fullRefresh: 1 = Full Refresh, 0 = Incremental

gitEnvr: DEV or PROD. Because these jobs use python packages they python code is stored in our git folder on the BI shared drive.  For testing, we are able to point at the python code in DEV or in PROD.

logLevel: DEBUG, INFO, WARNING, ERROR, CRITICAL 鈥 This is passed to the python scripts as an argument to set the log level for the jobs

pythonPath: The path to the python executable 鈥 saved in the D:/ drive on servers but usually in C:/ if executing locally/testing.

refreshDays: How many days to look back to pull data for incremental refreshes

Master Packages

QX_LOAD_ALL_SURVEYS

Overview

This package uses an XML config file to pull data on the surveys that are identified for loading and loops through each survey, executing a python program to load the raw xml file to a 鈥渞aw鈥 staging table. It then executes a stored proc to transform the 鈥渞aw鈥 xml load to the base staging table format.

Package Parameters

endDate: For full refresh jobs, this is the end date passed to the python program

startDate: For full refresh jobs, this is the start date passed to the python program

surveyFile: Path to XML survey config file \\marqnet.mu.edu\depts\ITS\BI\config\DEV\COVID\surveys.xml

Package Variables

delimIndex: FINDSTRING( @[User::idProc] , "|", 1 ) - The XML config file is set up using pipe delimiters. Gets the value of the delimiter so the two values (survey id | stored proc) can be separated.

endDate: The python program to download survey results takes a start date and an end date 鈥 this is set based on the fullRefresh and the refreshDays project parameters

id: The survey id of the survey to be downloaded, extracted from the XML config file after separation. Passed to python proc as the 鈥搒urveyId argument

idProc: The pre-separated string pulled directly from the XML config file (survey id | stored proc)

loadDate: GETDATE() 鈥 the run date used to load the data to the 鈥渞aw鈥 table and identify when data was loaded

proc: The stored proc that loads data from the 鈥渞aw鈥 table to the base staging table. Extracted from the XML config file

procSQL: The sql string that needs to execute the stored proc to load raw data to the base table. This is dynamically generated using the proc variable and the load date

startDate: The python program to download survey results takes a start date and an end date 鈥 this is set based on the fullRefresh and the refreshDays project parameters

XML Survey Config File Example

<Surveys>

                <Survey>

                   <fileName>disclosure</fileName>

                   <idProc>SV_b9qFHwnlHUSeUER|sp_disclosure_form</idProc>

                   <id>SV_b9qFHwnlHUSeUER</id>

                   <proc>sp_disclosure_form</proc>

                   <rawFileName>COVID-19 Voluntary Disclosure form</rawFileName>

                   <table>disclosure_form_raw</table>

                </Survey>

</Surveys>

Package Flow

  1. Loop through the XML and map idProc line to the variable idProc
  2. Find the delimiter separating id and proc
  3. Set the id and proc variables
  4. Set the sql proc variable
  5. Execute the python program to load the survey data to the 鈥渞aw鈥 table 鈥 this command is created dynamically as an expression in the execute process task. It is essentially calls the load_survey.py program (see details below) passing the required parameters as set by the project/package parameters and variables.  Example below.  Note the important placement of quotes around each parameter

    "\\marqnet.mu.edu\depts\ITS\BI\git\DEV\DW\ETL-Qualtrics\qualtrics_load\load_surveys.py" "--dbEnvr" "INT"   "--brand" "COVID"   "--surveyId" "SV_b9qFHwnlHUSeUER"   "--startDate" "5/11/2021 9:33:31 AM"  "--endDate"  "5/13/2021 9:33:31 AM"  "--fileType"  "xml"  "--fileEnvr"  "DEV" "--log" "DEBUG"

  6. Execute survey stored procedure to load 鈥渞aw鈥 data to base staging table
  7. Repeat until all surveys have been run

Package Snapshots

snapshot

Raw File Load Procedures

Data from Qualtrics is downloaded as xml file and stored in the BI shared drive in the raw_survey_files folder.  All of the files are labeled with a specific file name to the survey (as taken from the XML) into the BI shared drive.

These raw files are XML downloaded directly from Qualtrics and given a date stamp.  After download, they python program loads them to a raw staging table.  The raw tables all have the same structure with three columns: response_dt, raw_xml, stage_created.

A stored procedure then transforms and flattens the raw_xml into the final table structure.  The flow is below:

  1. Qualtrics File downloaded as XML for given day
  2. XML file bulk loaded to 鈥渞aw鈥 table (qualtrics.critical_units_raw)
  3. Stored proc loads raw xml to 鈥渇inal鈥 staging table (qualtrics.critical_units)

 

QX_LOAD_ALL_USERS

Overview

This package is nothing but an orchestration packages.  The only step in the packages is to execute the python program load_users.py from a SQL Agent job.

Package Parameters

None

Package Variables

None

Python Modules

Qualtrics data is accessed through a web service which dataMarq achieves through the use of python modules.  These all reside in the ETL-Qualtrics solution 鈥 and therefore in the git repository 鈥 alongside the SSIS packages, which call them.  They also require a config file that is stored in the BI shared drive, config folder. 

Qualtrics API resource:

load_surveys.py

 This program is the main driver of the survey load.  It leverage functions in the two 鈥渉elper鈥 modules to do the actual work, but provides the flow logic.

 Parameters

dbEnvr 鈥 INT, STG, PRD

brand 鈥 The qualtrics brand being targeted, either MU or COVID

surveyId 鈥 the unique Qualtrics Id of the survey being downloaded

startDate 鈥 start date of responses to be downloaded

endDate 鈥 end date of responses to be downloaded

filetype 鈥 the file type to be downloaded from Qualtrics for survey results

fileEnvr 鈥 the high level path for landing the results (DEV, PROD)

logLevel 鈥 the level to log to the log folder for the job

 Quasi code flow

  1. Set the config file path based on the dbEnvr
  2. Set loadDate variable as current date
  3. Get the brand apiToken from the config file
  4. Get the survey dict from the config file for given survey Id
  5. Run the load_survey_results function from the qualtrics_helper.py module to load survey result to the data warehouse

load_users.py

 This program is the main driver to add users to users to Qualtrics. Users are people who can access the dashboards that have been created, not people who can take surveys.  It leverage functions in the two 鈥渉elper鈥 modules to do the actual work, but provides the flow logic.

 Parameters

dbEnvr 鈥 INT, STG, PRD

brand 鈥 The qualtrics brand being targeted, either MU or COVID

logLevel 鈥 the level to log to the log folder for the job

 Quasi code flow

  1. Set the config file path based on the dbEnvr
  2. Set loadDate variable as current date
  3. Get the brand apiToken from the config file
  4. Run the get_users_list to get the Qualtrics users list
  5. Run insert_users_dw function to insert Qualtrics users into DW staging table
  6. Run update_qx_users to compare staged users list with view and make additions/deletions in Qualtircs

load_contacts.py (DECOMMISSIONED AS RUNS TOO SLOW)

 This program is the main driver to pull down contact info from Qualtrics.  The API requires looping through each contact individually and this takes too long so this is not currently running. 

Parameters

dbEnvr 鈥 INT, STG, PRD

brand 鈥 The qualtrics brand being targeted, either MU or COVID

logLevel 鈥 the level to log to the log folder for the job

 Quasi code flow

  1. Set the config file path based on the dbEnvr
  2. Set loadDate variable as current date
  3. Get the brand apiToken from the config file
  4. Run the get_contacts_list to get the Qualtrics contacts list and all details on contacts
  5. Run insert_contacts_dw function to insert Qualtrics contacts into DW staging table

qualtrics_helper.py

 All of the functions that are used to interact with the qualtrics APIs and download data

get_response

Parameters:

url 鈥 url for web service response

headers 鈥 headers to be passed to get call

Function: Generic function to get a json response from a url payload. Customized for the specific format of the response json

Output:  responseJson -  Url response in JSON format

 get_user

Parameters:

apiToken 鈥 Qualtrics api token

userId 鈥 specific Id of user to get data on

Function: Takes a userid and returns a Json output of the user data from Qualtrics

Output:  userJson -  Url response from the user request endpoint in JSON format

 get_users_list

Parameters:

apiToken 鈥 Qualtrics api token

Function: Takes the api token and returns a list of all the users with basic info, one user per list item

Output:  usersList -  python List of users with basic info like username, email, id, etc.

get_contacts_list (not currently used in production 鈥 too slow)

Parameters:

apiToken 鈥 Qualtrics api token

directoryId 鈥 Qualtrics Id for the a specific directory

Function: Takes the directory Id and builds a dictionary with all members of the directory with the contactId as the key.  Then loops through the dictionary and calls get_contact on each id to build out information on each contact into a list.   The list is intended to be loaded into a staging table in the data warehouse

Output:  contactList -  python List of directory contacts with embedded data and other stats available through Qualtrics API 

get_contact

Parameters:

apiToken 鈥 Qualtrics api token

directoryId 鈥 Qualtrics Id for the a specific directory

contactId 鈥 Qualtrics Id for a directory contact

Function: Takes a contactId and returns a Json output of the user data from Qualtrics

Output:  userJson -  Url response from the contact request endpoint in JSON format 

insert_users_dw

Parameters:

dbEnvr 鈥 INT, STG, PRD

usersList 鈥 python list of user data

USERS_TBL 鈥 the table name for the users table taken from the config file

USERS_FIELDS 鈥 the field list from the users table taken from the config file

Function: Calls functions from the dw_helper module to insert the output of the get_users_list into the users table in datamarq staging.

Output:  None

insert_contacts_dw

Parameters:

dbEnvr 鈥 INT, STG, PRD

usersList鈥 python list of user data

CONTACTS_TBL 鈥 the table name for the contacts table taken from the config file

CONTACTS_FIELDS 鈥 the field list from the contacts table taken from the config file

Function: Calls functions from the dw_helper module to insert the output of the get_contacts_list into the contacts table in datamarq staging.

Output:  None

create_user

Parameters:

apiToken 鈥 Qualtrics api token

user 鈥 JSON payload of data needed to create a user in Qualtrics

Function: Posts data to qualtrics using the users API to create a new user.  

Output:  None

delete_user

Parameters:

apiToken 鈥 Qualtrics api token

userId 鈥 Quatlrics ID for individual user

Function: Sends a delete request to qualtrics using the users API to delete a user. 

Output:  None

update_qx_users

Parameters:

dbEvnr 鈥 INT, STG, PRD

apiToken 鈥 Qualtrics api token

Function: This function calls a dw_helper function (get_user_compare) that gets a list of qualtrics users and users who need to be added/deleted. It puts these updates into a changes list and then loops through this list calling either create_user or delete_user depending on what needs to happen.

Output:  Number added, Number deleted, Number with role changes

get_surveys (not currently used 鈥 survey list is derived from config file)

Parameters:

apiToken 鈥 Qualtrics api token

Function: This function calls the surveys endpoint in Qualtrics to get a list of all the surveys.

Output:  surveysList 鈥 a list containing id, name, and updated dates

get_survey_results

Parameters:

apiToken 鈥 Qualtrics api token

survey 鈥 a python dictionary stored in the config file for each survey with ID as the key, the embeddeddata element in this dict are passed in the headers to the survey results download

startDate 鈥 the first date of results to retrieve

endDate 鈥 the last date of results to retrieve

fileEnvr 鈥 the BI file share location to store the downloaded files (DEV or PROD)

filetype 鈥 This is passed to the qualtrics endpoint to indicate file format (xml, cvs, etc.)

Function: This function takes a survey dict from the config file and a start and end date and creates a survey request and then gets the file from the survey request (two part sequence in Qualtrics to download results).  Note that this can run for multiple days, but is designed to only process one day at a time and store that day鈥檚 results in the raw survey files folder with the date appended to clearly indicate which days survey results are included in the file

Output:  newfilePath 鈥 the path to the downloaded XML files

load_survey_results

Parameters:

dbEvnr 鈥 INT, STG, PRD

apiToken 鈥 Qualtrics api token

survey 鈥 a python dictionary stored in the config file for each survey with ID as the key, the embeddeddata element in this dict are passed in the headers to the survey results download

startDate 鈥 the first date of results to retrieve

endDate 鈥 the last date of results to retrieve

fileEnvr 鈥 the BI file share location to store the downloaded files (DEV or PROD)

filetype 鈥 This is passed to the qualtrics endpoint to indicate file format (xml, cvs, etc.)

Function: This function is a wrapper function that wraps the get_surey_results function.  It takes a start/end date and creates a list of dates in between that start and end date, then downloads survey results day by day for each one of those days by calling the get_survey_results function. After the file is downloaded it take the file path returned and calls the dw_helper function bulk_load_xml which loads the xml into the 鈥渞aw鈥 datamarq staging table

Output:  None

get_qfs_files

Parameters:

apiToken 鈥 Qualtrics api token

automationId 鈥 The id for an automation

Function: qfs file are the Qualtircs file system files and where we push files to be loaded into qualtrics to add people and/or embedded data to the contact directory.  The contact directory is the main list of survey recepients and we heavily leverage the embedded data there for survey logic in COVID Cheq.  This function gets a list of files that are currently out in qfs to be processed 鈥 mostly so we can delete them if we need to clean them out for some reason.

Output:  JSON output of files
 

delete_all_qfs_files

Parameters:

apiToken 鈥 Qualtrics api token

automationId 鈥 The id for an automation

Function: Deletes all files outstanding on the qualtrics file service

Output:  None

delete_qfs_files

Parameters:

apiToken 鈥 Qualtrics api token

automationId 鈥 The id for an automation

fileid 鈥 the specific file id

Function: Deletes a specific file from qfs

Output:  None

 

dw_helper.py

All of the functions that are used to interact with the datamarq database

insert_rows

Parameters:

connection 鈥 the python connection object to datamarq

table 鈥 the table name for insert

fieldList 鈥 the list of fields to be inserted

valueLIst 鈥 the list of values to be inserted

Function: Generic function that inserts values into a given table and fields

Output:  s 鈥 number of successful rows inserted, f 鈥 number of failed rows caught in exception

get_user_compare

Parameters:

connection 鈥 the python connection object to datamarq

Function: Executes a query in datamarq to get the userCompareList. The query is specified within the function but essentially pulls from the view ops.v_covidcheq_users

Output:  userCompareList 鈥 a list of users

bulk_load_xml

Parameters:

connection 鈥 the python connection object to datamarq

table 鈥 the staging table for loading

responseDate 鈥 the date of responses to be loaded, responses are loaded one day at a time

xmlPath 鈥 the path to the raw XML file

Function: Executes two queries. First it deletes the given response date data from the staging table. Second it uses an OPENROWSET command to load the xml as a blob into the 鈥渞aw鈥 table.   The raw table has two columns (response_dt and raw_xml)

Output:  None

exec_survey_proc

Parameters:

connection 鈥 the python connection object to datamarq

survey 鈥 a python dict taken from the config file that contains detailed info on specific surveys

loadDate 鈥 the exec date of the time this procedure is run 鈥 passed from the SSIS packages variable

Function: Executes the stored procedure specified in the survey dict in the config file.  This takes data from the raw xml table and flattens it into the base staging table.  

Output:  None

update_hash

Parameters:

connection 鈥 the python connection object to datamarq

table 鈥 the table name

fieldList鈥揕ist of fields in the table

Function: takes a field list and builds a hash statement to get a unique row hash then runs an update statement

Output:  None

cursor_list

Parameters:

cursor 鈥 a python sql cursor object (select statement usually)

name 鈥 the name of a tuple for each output row of the cursor

fields鈥搇ist of fields in the output row

Function: Creates a result set (python list) from a cursor execution as a each row a named tupled with defined fields

Output:  resultSet 鈥 python list of named tuples as each row
 

update_audit

Parameters:

connection 鈥 the python connection object to datamarq

table 鈥 the table name

stageSource 鈥 The string to be put into the stage_source field in the staging table

Function: Runs commands to update the audit fields in a staging table with the default getdate() for the modified and created date

Output:  None

update_dt

Parameters:

connection 鈥 the python connection object to datamarq

table 鈥 the table name

dt_field 鈥 The date to be entered into a datefield

Function: Runs commands to update the given date field to the given date

Output:  None

alter_index

Parameters:

connection 鈥 the python connection object to datamarq

table 鈥 the table name

method 鈥 The method to be run on the index (disable, rebuild)

Function: Runs commands to alter all indexes on the table for the given method

Output:  None

trunc_table

Parameters:

connection 鈥 the python connection object to datamarq

table 鈥 the table name

Function: Runs command to truncate the given table

Output:  None

  

Change Log:

Date Developer Change Record
5/24/2021 N/A Initial Documentation


Troubleshooting Notes:

  1. Occasional Failures Due to Qualtrics API

    Calls to the Qualtrics web service will occasionally fail. This is not due to a problem with the code, but a limiting of calls as we are running these quite frequently. The only issue would be if the job fails repeatedly.  This has not happened with the stage load, only with the downstream procs
  2. Changes to old data in a survey (old would be more than the 2 days we refresh)

    This mostly happens with the contact tracing survey where the Medical Clinic enters in a close contact 鈥 or someone enters a self disclosure 鈥 and then finds out it shouldn鈥檛 have been. When this happens within two days of the response it is not a problem as the incremental refresh picks up the last two days and will alter the response. If it is older than two days the remedy is to DELETE the staging row with the bad response. This is preferrable to running the survey load job with different start/end dates which can be