ETL - Qualtrics (Staging Package)
Last Updated 5/24/2021
Overview
Location
Schedule
Details
Master Packages
qx_load_all_surveys
qx_load_all_users
Python Modules
load_surveys.py
load_users.py
qualtrics_helper.py
dw_helper.py
Change Log
Troubleshooting Notes
Overview:
The Qualtrics/COVID ETL is primarily used to pull survey data from the Qualtrics tool and track responses on COVID Cheq, contact tracing, and supply level tracking.
The ETL has two main components: a python package that pulls data down from Qualtrics, and stored procedures that build the tables in dataMarq.
The primary purpose of this job is to populate the data needed for COVID tracking and for powering the COVID Cheq tool in Qualtrics. Uses include:
- Populate staging table schemas with Qualtrics data
- dataMarq procs to build COVID related tables:
  - f_covid_cases: qualtrics.contact_tracing provides self disclosures for COVID cases
  - f_covid_contacts: qualtrics.contact_tracing provides close contacts for COVID cases
  - d_covidcheq_list: qualtrics.covidcheq has the COVID Cheq responses and is used for delinquency and last response
  - s_covid_call_tracking: qualtrics.call_tracking is the source
  - s_covid_ppe_supply: qualtrics.ppe_tracking is the source
  - s_covid_units: qualtrics.test_supply is the source
- Update users who can access dashboards in Qualtrics based on a view (ops.v_covidcheq_users)
- Push data to Qualtrics to populate the directory used to send out survey data (ops.d_covidcheq_list)
Schemas in staging that are populated with this data include:
- Base schema: qualtrics
- CDC schema: N/A
Location:
The solution - ETL-Qualtrics - contains all the packages for staging Qualtrics data and resides in the ETL-dataMarq folder in the SSIS catalog.
Schedule:
The jobs in this project run as part of multiple SQL Agent jobs (full dataMarq schedule).
ETL - Qualtrics Surveys - Daily: every day, every 20 minutes
ETL - BI-Integration - CovidCheq Full/Inc: every 1 hour
Project and Package Details:
The packages in ETL - Qualtrics work with the Qualtrics API and don't follow the normal one-package-per-table paradigm of our other DB related loads. Instead, there are three main packages: one for loading survey data, one for loading/updating users, and one (decommissioned) for loading contact data.
Project Parameters
brand: Either COVID or MU, to differentiate the two Qualtrics instances we can pull data from. These packages are designed to work with either instance, although we are only pulling from COVID now. Passed to the python program as the --brand argument
dbEnvr: INT, STG, PRD - which DB should be loaded with the data. Passed to the python program as the --dbEnvr argument
dwConnStr: connection to datamarq
dwStgConnStr: connection to datamarq_staging
fileType: Only xml is used now, but survey data can be pulled from Qualtrics in a variety of formats and this identifies the file format that should be used. Passed to the python program as the --fileType argument
fsEnvr: The file share environment for the landing files. We have both a DEV and PROD landing folder so that jobs pulling in different environments do not overlap source files. Passed to the python program as the --fileEnvr argument
fullRefresh: 1 = Full Refresh, 0 = Incremental
gitEnvr: DEV or PROD. Because these jobs use python packages, the python code is stored in our git folder on the BI shared drive. For testing, we are able to point at the python code in DEV or in PROD.
logLevel: DEBUG, INFO, WARNING, ERROR, CRITICAL - passed to the python scripts as an argument to set the log level for the jobs
pythonPath: The path to the python executable - saved on the D:/ drive on servers but usually on C:/ if executing locally/testing.
refreshDays: How many days to look back to pull data for incremental refreshes
Master Packages
QX_LOAD_ALL_SURVEYS
Overview
This package uses an XML config file to pull data on the surveys that are identified for loading and loops through each survey, executing a python program to load the raw xml file to a "raw" staging table. It then executes a stored proc to transform the "raw" xml load to the base staging table format.
Package Parameters
endDate: For full refresh jobs, this is the end date passed to the python program
startDate: For full refresh jobs, this is the start date passed to the python program
surveyFile: Path to XML survey config file \\marqnet.mu.edu\depts\ITS\BI\config\DEV\COVID\surveys.xml
Package Variables
delimIndex: FINDSTRING( @[User::idProc] , "|", 1 ) - The XML config file is set up using pipe delimiters. Gets the position of the delimiter so the two values (survey id | stored proc) can be separated.
endDate: The python program to download survey results takes a start date and an end date - this is set based on the fullRefresh and refreshDays project parameters
id: The survey id of the survey to be downloaded, extracted from the XML config file after separation. Passed to the python program as the --surveyId argument
idProc: The pre-separated string pulled directly from the XML config file (survey id | stored proc)
loadDate: GETDATE() - the run date used to load the data to the "raw" table and identify when data was loaded
proc: The stored proc that loads data from the "raw" table to the base staging table. Extracted from the XML config file
procSQL: The sql string that executes the stored proc to load raw data to the base table. This is dynamically generated using the proc variable and the load date
startDate: The python program to download survey results takes a start date and an end date - this is set based on the fullRefresh and refreshDays project parameters
XML Survey Config File Example
<Surveys>
<Survey>
<fileName>disclosure</fileName>
<idProc>SV_b9qFHwnlHUSeUER|sp_disclosure_form</idProc>
<id>SV_b9qFHwnlHUSeUER</id>
<proc>sp_disclosure_form</proc>
<rawFileName>COVID-19 Voluntary Disclosure form</rawFileName>
<table>disclosure_form_raw</table>
</Survey>
</Surveys>
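For reference, the pipe-delimited idProc value can be split in plain python the same way the SSIS FINDSTRING/SUBSTRING expressions do it. This is an illustrative sketch only (the real parsing happens in the SSIS loop); the config XML is the example above:

```python
# Illustrative only: parse the survey config and split the pipe-delimited
# idProc value, mirroring what the SSIS expressions do in the package.
import xml.etree.ElementTree as ET

CONFIG_XML = """
<Surveys>
  <Survey>
    <fileName>disclosure</fileName>
    <idProc>SV_b9qFHwnlHUSeUER|sp_disclosure_form</idProc>
    <id>SV_b9qFHwnlHUSeUER</id>
    <proc>sp_disclosure_form</proc>
    <rawFileName>COVID-19 Voluntary Disclosure form</rawFileName>
    <table>disclosure_form_raw</table>
  </Survey>
</Surveys>
"""

surveys = []
for survey in ET.fromstring(CONFIG_XML).iter("Survey"):
    # idProc is "survey id | stored proc", pipe-delimited
    survey_id, proc = survey.findtext("idProc").split("|", 1)
    surveys.append({"id": survey_id, "proc": proc, "table": survey.findtext("table")})

print(surveys[0]["id"], surveys[0]["proc"])  # SV_b9qFHwnlHUSeUER sp_disclosure_form
```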
Package Flow
Package Snapshots
Raw File Load Procedures
Data from Qualtrics is downloaded as an xml file and stored on the BI shared drive in the raw_survey_files folder. Each file is labeled with the file name specific to its survey (as taken from the XML config).
These raw files are XML downloaded directly from Qualtrics and given a date stamp. After download, the python program loads them to a raw staging table. The raw tables all have the same structure with three columns: response_dt, raw_xml, stage_created.
A stored procedure then transforms and flattens the raw_xml into the final table structure. The flow is below:
- Qualtrics File downloaded as XML for given day
- XML file bulk loaded to "raw" table (qualtrics.critical_units_raw)
- Stored proc loads raw xml to "final" staging table (qualtrics.critical_units)
QX_LOAD_ALL_USERS
Overview
This package is purely an orchestration package. Its only step executes the python program load_users.py from a SQL Agent job.
Package Parameters
None
Package Variables
None
Python Modules
Qualtrics data is accessed through a web service which dataMarq achieves through the use of python modules. These all reside in the ETL-Qualtrics solution - and therefore in the git repository - alongside the SSIS packages, which call them. They also require a config file that is stored in the BI shared drive, config folder.
Qualtrics API resource:
load_surveys.py
This program is the main driver of the survey load. It leverages functions in the two "helper" modules to do the actual work, but provides the flow logic.
Parameters
dbEnvr - INT, STG, PRD
brand - The qualtrics brand being targeted, either MU or COVID
surveyId - the unique Qualtrics Id of the survey being downloaded
startDate - start date of responses to be downloaded
endDate - end date of responses to be downloaded
filetype - the file type to be downloaded from Qualtrics for survey results
fileEnvr - the high level path for landing the results (DEV, PROD)
logLevel - the level to log to the log folder for the job
Quasi code flow
- Set the config file path based on the dbEnvr
- Set loadDate variable as current date
- Get the brand apiToken from the config file
- Get the survey dict from the config file for given survey Id
- Run the load_survey_results function from the qualtrics_helper.py module to load survey result to the data warehouse
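The flow above can be sketched as a hypothetical python driver. The flags mirror the documented arguments, but the parse_args helper, defaults, and structure here are illustrative, not the actual load_surveys.py code:

```python
# Hypothetical driver skeleton for load_surveys.py (illustrative only).
import argparse
from datetime import date

def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Qualtrics survey load (sketch)")
    parser.add_argument("--dbEnvr", choices=["INT", "STG", "PRD"], required=True)
    parser.add_argument("--brand", choices=["MU", "COVID"], required=True)
    parser.add_argument("--surveyId", required=True)
    parser.add_argument("--startDate")
    parser.add_argument("--endDate")
    parser.add_argument("--fileType", default="xml")
    parser.add_argument("--fileEnvr", default="PROD")
    parser.add_argument("--logLevel", default="INFO")
    return parser.parse_args(argv)

args = parse_args(["--dbEnvr", "PRD", "--brand", "COVID",
                   "--surveyId", "SV_b9qFHwnlHUSeUER"])
load_date = date.today()  # "set loadDate variable as current date"
# ...then read the survey dict from the config for args.surveyId and call
# load_survey_results(...) from qualtrics_helper.py.
print(args.brand, args.fileType)  # COVID xml
```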
load_users.py
This program is the main driver for adding users to Qualtrics. Users are people who can access the dashboards that have been created, not people who can take surveys. It leverages functions in the two "helper" modules to do the actual work, but provides the flow logic.
Parameters
dbEnvr - INT, STG, PRD
brand - The qualtrics brand being targeted, either MU or COVID
logLevel - the level to log to the log folder for the job
Quasi code flow
- Set the config file path based on the dbEnvr
- Set loadDate variable as current date
- Get the brand apiToken from the config file
- Run the get_users_list to get the Qualtrics users list
- Run insert_users_dw function to insert Qualtrics users into DW staging table
- Run update_qx_users to compare the staged users list with the view and make additions/deletions in Qualtrics
load_contacts.py (DECOMMISSIONED - runs too slowly)
This program is the main driver to pull down contact info from Qualtrics. The API requires looping through each contact individually and this takes too long so this is not currently running.
Parameters
dbEnvr - INT, STG, PRD
brand - The qualtrics brand being targeted, either MU or COVID
logLevel - the level to log to the log folder for the job
Quasi code flow
- Set the config file path based on the dbEnvr
- Set loadDate variable as current date
- Get the brand apiToken from the config file
- Run the get_contacts_list to get the Qualtrics contacts list and all details on contacts
- Run insert_contacts_dw function to insert Qualtrics contacts into DW staging table
qualtrics_helper.py
All of the functions used to interact with the Qualtrics APIs and download data.
get_response
Parameters:
url - url for the web service request
headers - headers to be passed to the get call
Function: Generic function to get a json response from a url. Customized for the specific format of the response json
Output: responseJson - url response in JSON format
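A minimal sketch of what a generic JSON GET helper like get_response might look like. The urllib call and the "result" unwrapping are assumptions based on the public Qualtrics v3 API response envelope, not the verified module code:

```python
# Sketch of a generic JSON GET helper. The "result" envelope unwrapping is
# an assumption based on the Qualtrics v3 API response shape.
import json
import urllib.request

def get_response(url, headers):
    """GET a url and return the parsed JSON body."""
    req = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))

def unwrap_result(response_json):
    """Pull the payload out of the response envelope, if present."""
    return response_json.get("result", response_json)

# Demo on a canned response so the example runs without network access:
sample = json.loads(
    '{"result": {"elements": [{"id": "UR_123"}]}, "meta": {"httpStatus": "200 - OK"}}'
)
print(unwrap_result(sample)["elements"][0]["id"])  # UR_123
```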
get_user
Parameters:
apiToken - Qualtrics api token
userId - specific Id of the user to get data on
Function: Takes a userId and returns a JSON output of the user data from Qualtrics
Output: userJson - url response from the user request endpoint in JSON format
get_users_list
Parameters:
apiToken - Qualtrics api token
Function: Takes the api token and returns a list of all the users with basic info, one user per list item
Output: usersList - python list of users with basic info like username, email, id, etc.
get_contacts_list (not currently used in production - too slow)
Parameters:
apiToken - Qualtrics api token
directoryId - Qualtrics Id for a specific directory
Function: Takes the directory Id and builds a dictionary of all members of the directory with the contactId as the key. Then loops through the dictionary and calls get_contact on each id to build out information on each contact into a list. The list is intended to be loaded into a staging table in the data warehouse
Output: contactList - python list of directory contacts with embedded data and other stats available through the Qualtrics API
get_contact
Parameters:
apiToken - Qualtrics api token
directoryId - Qualtrics Id for a specific directory
contactId - Qualtrics Id for a directory contact
Function: Takes a contactId and returns a JSON output of the contact data from Qualtrics
Output: userJson - url response from the contact request endpoint in JSON format
insert_users_dw
Parameters:
dbEnvr - INT, STG, PRD
usersList - python list of user data
USERS_TBL - the table name for the users table taken from the config file
USERS_FIELDS - the field list from the users table taken from the config file
Function: Calls functions from the dw_helper module to insert the output of the get_users_list into the users table in datamarq staging.
Output: None
insert_contacts_dw
Parameters:
dbEnvr - INT, STG, PRD
usersList - python list of contact data
CONTACTS_TBL - the table name for the contacts table taken from the config file
CONTACTS_FIELDS - the field list from the contacts table taken from the config file
Function: Calls functions from the dw_helper module to insert the output of the get_contacts_list into the contacts table in datamarq staging.
Output: None
create_user
Parameters:
apiToken - Qualtrics api token
user - JSON payload of the data needed to create a user in Qualtrics
Function: Posts data to Qualtrics using the users API to create a new user.
Output: None
delete_user
Parameters:
apiToken - Qualtrics api token
userId - Qualtrics ID for an individual user
Function: Sends a delete request to Qualtrics using the users API to delete a user.
Output: None
update_qx_users
Parameters:
dbEnvr - INT, STG, PRD
apiToken - Qualtrics api token
Function: This function calls a dw_helper function (get_user_compare) that gets a list of Qualtrics users and users who need to be added/deleted. It puts these updates into a changes list and then loops through this list calling either create_user or delete_user depending on what needs to happen.
Output: Number added, Number deleted, Number with role changes
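The compare-and-change logic can be illustrated with plain sets; the usernames below are made up, and the real lists come from get_user_compare and the Qualtrics users API:

```python
# Illustrative compare: usernames in Qualtrics vs. usernames the DW view says
# should exist. The names here are fabricated stand-ins.
qx_users = {"doe123", "smith456"}       # currently in Qualtrics
should_have = {"smith456", "jones789"}  # from ops.v_covidcheq_users

changes = (
    [("create_user", u) for u in sorted(should_have - qx_users)]
    + [("delete_user", u) for u in sorted(qx_users - should_have)]
)
for action, user in changes:
    print(action, user)
# create_user jones789
# delete_user doe123
```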
get_surveys (not currently used - survey list is derived from the config file)
Parameters:
apiToken - Qualtrics api token
Function: This function calls the surveys endpoint in Qualtrics to get a list of all the surveys.
Output: surveysList - a list containing id, name, and updated dates
get_survey_results
Parameters:
apiToken - Qualtrics api token
survey - a python dictionary stored in the config file for each survey, with the survey ID as the key; the embeddeddata element in this dict is passed in the headers of the survey results download
startDate - the first date of results to retrieve
endDate - the last date of results to retrieve
fileEnvr - the BI file share location to store the downloaded files (DEV or PROD)
filetype - passed to the Qualtrics endpoint to indicate file format (xml, csv, etc.)
Function: This function takes a survey dict from the config file and a start and end date, creates a survey request, and then gets the file from the survey request (a two part sequence in Qualtrics to download results). Note that this can run for multiple days, but it is designed to process only one day at a time and store that day's results in the raw survey files folder with the date appended, to clearly indicate which day's survey results are included in the file
Output: newfilePath - the path to the downloaded XML file
load_survey_results
Parameters:
dbEnvr - INT, STG, PRD
apiToken - Qualtrics api token
survey - a python dictionary stored in the config file for each survey, with the survey ID as the key; the embeddeddata element in this dict is passed in the headers of the survey results download
startDate - the first date of results to retrieve
endDate - the last date of results to retrieve
fileEnvr - the BI file share location to store the downloaded files (DEV or PROD)
filetype - passed to the Qualtrics endpoint to indicate file format (xml, csv, etc.)
Function: This is a wrapper around the get_survey_results function. It takes a start/end date, creates a list of the dates in between, and then downloads survey results day by day by calling get_survey_results for each of those days. After each file is downloaded, it takes the returned file path and calls the dw_helper function bulk_load_xml, which loads the xml into the "raw" datamarq staging table
Output: None
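The day-by-day wrapper loop can be sketched as follows; date_range is a hypothetical helper for illustration, not a function from the real module:

```python
# Sketch of the day-by-day wrapper loop; date_range is a hypothetical helper.
from datetime import date, timedelta

def date_range(start, end):
    """Yield each date from start to end, inclusive."""
    d = start
    while d <= end:
        yield d
        d += timedelta(days=1)

days = [d.isoformat() for d in date_range(date(2021, 5, 22), date(2021, 5, 24))]
print(days)  # ['2021-05-22', '2021-05-23', '2021-05-24']
# For each day: path = get_survey_results(...); then bulk_load_xml(conn, table, day, path)
```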
get_qfs_files
Parameters:
apiToken - Qualtrics api token
automationId - The id for an automation
Function: qfs files are the Qualtrics file system files, where we push files to be loaded into Qualtrics to add people and/or embedded data to the contact directory. The contact directory is the main list of survey recipients, and we heavily leverage the embedded data there for survey logic in COVID Cheq. This function gets a list of files that are currently out in qfs to be processed - mostly so we can delete them if we need to clean them out for some reason.
Output: JSON output of files
delete_all_qfs_files
Parameters:
apiToken - Qualtrics api token
automationId - The id for an automation
Function: Deletes all files outstanding on the qualtrics file service
Output: None
delete_qfs_files
Parameters:
apiToken - Qualtrics api token
automationId - The id for an automation
fileid - the specific file id
Function: Deletes a specific file from qfs
Output: None
dw_helper.py
All of the functions used to interact with the datamarq database.
insert_rows
Parameters:
connection - the python connection object to datamarq
table - the table name for the insert
fieldList - the list of fields to be inserted
valueList - the list of values to be inserted
Function: Generic function that inserts values into a given table and fields
Output: s - number of rows successfully inserted, f - number of failed rows caught in the exception handler
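A self-contained sketch of the insert loop and its success/failure counters; StubCursor is a fabricated stand-in for the real database cursor so the example runs without a database, and the SQL shape is an assumption:

```python
# Sketch of insert_rows with success (s) / failure (f) counting.
# StubCursor fakes a DB cursor; a None value simulates a constraint failure.
class StubCursor:
    def execute(self, sql, params):
        if any(p is None for p in params):  # simulate a NOT NULL violation
            raise ValueError("NULL not allowed")

def insert_rows(cursor, table, field_list, value_list):
    placeholders = ", ".join("?" for _ in field_list)
    sql = f"INSERT INTO {table} ({', '.join(field_list)}) VALUES ({placeholders})"
    s = f = 0
    for row in value_list:
        try:
            cursor.execute(sql, row)
            s += 1
        except Exception:
            f += 1  # failed row caught in exception, as described above
    return s, f

s, f = insert_rows(StubCursor(), "qualtrics.users", ["user_id", "email"],
                   [("UR_1", "a@marquette.edu"), ("UR_2", None)])
print(s, f)  # 1 1
```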
get_user_compare
Parameters:
connection - the python connection object to datamarq
Function: Executes a query in datamarq to get the userCompareList. The query is specified within the function but essentially pulls from the view ops.v_covidcheq_users
Output: userCompareList - a list of users
bulk_load_xml
Parameters:
connection - the python connection object to datamarq
table - the staging table for loading
responseDate - the date of responses to be loaded, responses are loaded one day at a time
xmlPath - the path to the raw XML file
Function: Executes two queries. First it deletes the given response date's data from the staging table. Second it uses an OPENROWSET command to load the xml as a blob into the "raw" table. The raw table has two columns (response_dt and raw_xml)
Output: None
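The two statements can be sketched as generated SQL. The OPENROWSET SINGLE_BLOB pattern matches the description above, but the file path is fabricated and the exact statements in dw_helper.py may differ:

```python
# Sketch of the delete + OPENROWSET bulk-load statements bulk_load_xml issues.
# Table/column names follow this document; the UNC path is illustrative.
def build_raw_load_sql(table, response_dt, xml_path):
    delete_sql = f"DELETE FROM {table} WHERE response_dt = '{response_dt}'"
    insert_sql = (
        f"INSERT INTO {table} (response_dt, raw_xml) "
        f"SELECT '{response_dt}', BulkColumn "
        f"FROM OPENROWSET(BULK '{xml_path}', SINGLE_BLOB) AS x"
    )
    return [delete_sql, insert_sql]

stmts = build_raw_load_sql("qualtrics.critical_units_raw", "2021-05-24",
                           r"\\share\raw_survey_files\critical_units_2021-05-24.xml")
for stmt in stmts:
    print(stmt)
```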
exec_survey_proc
Parameters:
connection - the python connection object to datamarq
survey - a python dict taken from the config file that contains detailed info on specific surveys
loadDate - the execution date of the run - passed from the SSIS package variable
Function: Executes the stored procedure specified in the survey dict in the config file. This takes data from the raw xml table and flattens it into the base staging table.
Output: None
update_hash
Parameters:
connection - the python connection object to datamarq
table - the table name
fieldList - list of fields in the table
Function: Takes a field list, builds a hash statement to get a unique row hash, then runs an update statement
Output: None
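A sketch of how such a hash statement might be built. The HASHBYTES/ISNULL concatenation is a common SQL Server row-hash pattern and is an assumption here, not the verified statement from dw_helper.py; the table, field, and row_hash column names are illustrative:

```python
# Assumed pattern: concatenate the fields with a delimiter and hash with
# HASHBYTES. Illustrative of the technique, not the actual dw_helper SQL.
def build_hash_sql(table, field_list, hash_col="row_hash"):
    concat = " + '|' + ".join(
        f"ISNULL(CAST({f} AS VARCHAR(MAX)), '')" for f in field_list
    )
    return f"UPDATE {table} SET {hash_col} = HASHBYTES('SHA2_256', {concat})"

sql = build_hash_sql("qualtrics.users", ["user_id", "email"])
print(sql)
```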
cursor_list
Parameters:
cursor - a python sql cursor object (usually a select statement)
name - the name of the tuple for each output row of the cursor
fields - list of fields in the output row
Function: Creates a result set (python list) from a cursor execution, with each row a named tuple with the defined fields
Output: resultSet - python list of named tuples, one per row
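A runnable sketch of the namedtuple conversion; the rows are hard-coded stand-ins for cursor.fetchall() so the example runs without a live cursor:

```python
# Sketch of cursor_list: turn result rows into named tuples with the given fields.
from collections import namedtuple

def cursor_list(rows, name, fields):
    Row = namedtuple(name, fields)
    return [Row(*r) for r in rows]

result_set = cursor_list([("UR_1", "doe123"), ("UR_2", "smith456")],
                         "UserRow", ["user_id", "username"])
print(result_set[0].username, result_set[1].user_id)  # doe123 UR_2
```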
update_audit
Parameters:
connection - the python connection object to datamarq
table - the table name
stageSource - The string to be put into the stage_source field in the staging table
Function: Runs commands to update the audit fields in a staging table with the default getdate() for the modified and created date
Output: None
update_dt
Parameters:
connection - the python connection object to datamarq
table - the table name
dt_field - The date to be entered into a date field
Function: Runs commands to update the given date field to the given date
Output: None
alter_index
Parameters:
connection - the python connection object to datamarq
table - the table name
method - The method to be run on the index (disable, rebuild)
Function: Runs commands to alter all indexes on the table for the given method
Output: None
trunc_table
Parameters:
connection - the python connection object to datamarq
table - the table name
Function: Runs command to truncate the given table
Output: None
Change Log:
Date | Developer | Change Record |
5/24/2021 | N/A | Initial Documentation |
Troubleshooting Notes:
- Occasional Failures Due to Qualtrics API
Calls to the Qualtrics web service will occasionally fail. This is not due to a problem with the code but to rate limiting of calls, as we are running these quite frequently. The only issue would be if the job fails repeatedly; this has not happened with the stage load, only with the downstream procs.
- Changes to old data in a survey (old meaning more than the 2 days we refresh)
This mostly happens with the contact tracing survey, where the Medical Clinic enters a close contact - or someone enters a self disclosure - and then finds out it shouldn't have been entered. When this happens within two days of the response it is not a problem, as the incremental refresh picks up the last two days and will alter the response. If it is older than two days, the remedy is to DELETE the staging row with the bad response. This is preferable to running the survey load job with different start/end dates.