CALCULATION ENGINES BEYOND EXCEL: PART 6

In Part 5 we introduced an object-oriented approach as a solution to potential scalability issues that may emerge as functionality needs to expand. Another area that may be a good candidate for refinement is rate storage and retrieval. Our current solutions leverage csv rate files, but what if we expected rates to live in a database instead of flat files?

In this chapter we will add a simple SQLite database for rates to our project, load rates into it, and update our object-oriented solution to leverage the database. We will use SQLite primarily because it comes as part of Python with the sqlite3 package and does not require any additional IT setup.

By the end of this part, your code should look like this.

RATE DATABASE

At this point, your project should look something like the project here. Your local repository may also include a venv directory and a __pycache__ directory, but those are ignored by Git so they do not show up in the hosted repository.

So far we have been reading csv files to retrieve rates for our illustration engine, but it might be more practical to expect rates to live in a database of some kind. We will add support for a SQLite database where we can add and retrieve rates as needed. For simplicity, the rates will be stored in practically the same shape as they exist in the csv files.
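For reference, a policy-year-varying rate file such as unit_load.csv has columns along these lines (the numeric values here are made up purely for illustration; use your own rate files from the earlier parts):

Issue_Age,Policy_Year,Rate
35,1,2.50
35,2,2.50
35,3,1.25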

Before we get started, we need to update our .gitignore file so we do not add the database we are about to create to version control. Update your .gitignore so it looks like the below (note: the first three lines should already exist in your file):

.gitignore
venv/*
*pyc
__pycache__/*
*db

When we create our SQLite database we will give it a .db extension which is why we add this to our .gitignore. Now, in your objects.py file add an import for the sqlite3 package and add the following very rough class outline after Insured and before BaseProduct:

objects.py
...
import sqlite3
...


class SQLiteRateDatabase:

    def __init__(self, connection_path: str):
        self._connection = sqlite3.connect(connection_path)

    def import_csv(self, file_path: str):
        pass

    def read_table_for_insured(self, insured: Insured, table_name: str) -> list[float]:
        pass
...

When we create an instance of our new class SQLiteRateDatabase we require a path to the database, automatically connect to that database, and store that connection in a non-public attribute _connection. If the database does not exist, one will be created at the specified path. We intentionally use a non-public attribute for the connection because we do not want to encourage users to start injecting their own SQL; instead, we want to provide methods that we control.
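As a quick sketch of what this looks like in practice (the path here mirrors the data directory we use later, but any path works):

>>> from illustrator.objects import SQLiteRateDatabase
>>> db = SQLiteRateDatabase('./data/data.db')   # creates ./data/data.db if it does not already exist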

Right now we expect to provide two methods with the new class: a way to import data into the database and a way to retrieve it. We made an educated guess on what the method signatures might look like for now (e.g., it seems safe to assume that if we want to import a csv file we probably need to provide a path to the file, if not the file itself), but they may change as we implement the necessary logic. Let’s start with getting data into the database.

IMPORT_CSV

Before we can use our database to retrieve rate information, we need to populate it. We will make the following assumptions as part of our design:

  • Importing a file will create a new table where the table name will be the same as the file name without the extension

  • If the table already exists it will be replaced without warning

We will leverage the csv module like we have before and we will also use the os module to help generate the table names from the file names. Let’s start with the following outline for our code:

objects.py
...
import csv
import os
...


class SQLiteRateDatabase:
    ...
    def import_csv(self, file_path: str):
        # convert file name to table name
        # remove existing table if applicable
        # create table structure
        # add records to table
        pass
...

We can use the os module to convert the file path to a table name with a few steps, and removing an existing table in a SQLite database is a simple DROP TABLE command.

objects.py
...
import csv
import os
...


class SQLiteRateDatabase:
    ...
    def import_csv(self, file_path: str):
        # convert file name to table name
        file_name = os.path.split(file_path)[-1]
        file_ext = os.path.splitext(file_path)[-1]
        table_name = file_name.removesuffix(file_ext)
        table_name = table_name.replace(" ", "_")

        # remove existing table if applicable
        sql = f"DROP TABLE IF EXISTS [{table_name}]"
        self._connection.execute(sql)

        # create table structure

        # add records to table
...

First we grab the file name from the path, determine the file extension, strip the extension from the file name, then replace any spaces with underscores. Once we have the expected table name we can attempt to drop it from the database. Instead of first checking whether the table exists, we use DROP TABLE IF EXISTS, which does not error out when the table is missing, since we do not care whether the table was already there. Using an f-string allows us to incorporate Python variables in strings pretty seamlessly, which is what is happening on the line starting with “sql = “.
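To make those steps concrete, here is roughly what each piece produces for one of our rate files (shown in the REPL; the file path is just an example):

>>> import os
>>> file_path = "./data/unit_load.csv"
>>> os.path.split(file_path)[-1]
'unit_load.csv'
>>> os.path.splitext(file_path)[-1]
'.csv'
>>> "unit_load.csv".removesuffix(".csv").replace(" ", "_")
'unit_load'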

At this point we have some choices to make. We know our rate files come in a few different formats, so we could have helper methods that take care of each format or we could attempt to build something a bit more general up front. We’ll go with the latter for now, and our general approach will be:

  • open the file

  • iterate over the field names in the file to determine how the table in SQLite will look

  • compile our sql statement to create the necessary SQL table

  • create the SQL table

  • transfer data from the file to the database

Update your method to reflect these steps:

objects.py
...
import csv
import os
...


class SQLiteRateDatabase:
    ...
    def import_csv(self, file_path: str):
        ...
        # create table structure
        with open(file_path, "r") as f:
            reader = csv.DictReader(f)
            fields_and_types = {}
            for field in reader.fieldnames:
                if field == 'Rate':
                    fields_and_types[field] = "float"
                elif field in ["Issue_Age", "Attained_Age", "Policy_Year"]:
                    fields_and_types[field] = "int"
                else:
                    fields_and_types[field] = "varchar"
            sql = f"CREATE TABLE [{table_name}] ({','.join([k + ' ' + v for k, v in fields_and_types.items()])})"
            self._connection.execute(sql)

            # add records to table
            total_fields = len(fields_and_types)
            placeholders = ['?'] * total_fields
            sql = f"INSERT INTO [{table_name}] ({','.join(reader.fieldnames)}) VALUES ({','.join(placeholders)})"
            for row in reader:
                self._connection.execute(sql, list(row.values()))
            self._connection.commit()
...

As we iterate over reader.fieldnames we store the name of each field in a dictionary along with its data type (float, int, or varchar) and use this to build the SQL statement that creates our table. The type “varchar” is for variable-length character strings and avoids having values padded with blanks to a fixed length (e.g., if we set a field to a fixed length of 10 characters and provided “Rate” as a value, it would be stored as “Rate      ”, which could be a pain to deal with later on). We use some built-in string joining functionality to quickly compile our SQL statements here since we will not know up front exactly what the SQL will need to be.
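As a rough sketch of what those generated statements end up looking like for a table with Issue_Age, Policy_Year, and Rate columns (the table name is chosen for illustration):

>>> fields_and_types = {"Issue_Age": "int", "Policy_Year": "int", "Rate": "float"}
>>> f"CREATE TABLE [unit_load] ({','.join([k + ' ' + v for k, v in fields_and_types.items()])})"
'CREATE TABLE [unit_load] (Issue_Age int,Policy_Year int,Rate float)'
>>> f"INSERT INTO [unit_load] ({','.join(fields_and_types)}) VALUES ({','.join(['?'] * 3)})"
'INSERT INTO [unit_load] (Issue_Age,Policy_Year,Rate) VALUES (?,?,?)'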

There are a few more things to note about the design: we did not include an explicit primary key nor any indices, we are assuming all the data will be well-behaved, and we do not concern ourselves with possible duplicates. These typical data safeguards could be ensured upstream for us or we could build them into our database load process later.
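If we decide we want some of those safeguards later, they are ordinary SQLite DDL. For example, a unique index over the lookup columns of the coi table might look like the sketch below; the index name and exact column list are assumptions about our rate files, not something the engine currently requires:

# hypothetical: enforce one rate per lookup combination in the coi table
self._connection.execute(
    "CREATE UNIQUE INDEX IF NOT EXISTS idx_coi_lookup "
    "ON [coi] (Gender, Risk_Class, Issue_Age, Policy_Year)"
)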

In any case, with this method finished we should be able to build our database and add our rates to it. We can do this from the Python REPL in a command-line terminal:

>>> import illustrator.objects as iobj
>>> db = iobj.SQLiteRateDatabase('./data/data.db')
>>> db.import_csv('./data/coi.csv')
>>> db.import_csv('./data/interest_rate.csv')
>>> db.import_csv('./data/naar_discount.csv')
>>> db.import_csv('./data/policy_fee.csv')
>>> db.import_csv('./data/premium_load.csv')
>>> db.import_csv('./data/unit_load.csv')
>>> exit()

How can we make sure our data actually populated correctly? We’ll need to connect to and query our database to check. You can do this entirely through code or you could use another tool that helps visualize things. Let’s go over both.

CHECKING THE DATABASE THROUGH CODE

We can quickly check the database in the Python REPL by writing a few SELECT statements and looking at the results:

>>> import sqlite3
>>> connection = sqlite3.connect("./data/data.db")
>>> curs = connection.execute("SELECT * FROM premium_load")
>>> curs.description
(('Rate', None, None, None, None, None, None),)
>>> curs.fetchall()
[(0.06,)]
>>> connection.execute("SELECT * FROM policy_fee").fetchall()
[(120.0,)]
>>> curs = connection.execute("SELECT * FROM unit_load")
>>> curs.description
(('Issue_Age', None, None, None, None, None, None), ('Policy_Year', None, None, None, None, None, None), ('Rate', None, None, None, None, None, None))

We can query each table by using a simple SELECT statement. Then we review what columns exist in the table using the description attribute of the cursor and retrieve all the resulting records using the fetchall() method. Fetching all the data for the unit loads and COI rates and immediately printing it to the console may not be particularly helpful, so you may want to store the results in a variable. Alternatively, you can change how SQLite returns the data (by default it returns a list of tuples) by changing the row_factory. If we use the provided sqlite3.Row we can access values like we would a dictionary.

>>> connection.row_factory = sqlite3.Row
>>> curs = connection.execute("SELECT * FROM premium_load")
>>> for row in curs.fetchall():
...     for key in row.keys():
...         print(f"key: {key}\tvalue: {row[key]}")
...
key: Rate       value: 0.06
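One more quick sanity check from the same REPL session is to list every table we created by querying SQLite's built-in sqlite_master catalog; with row_factory still set to sqlite3.Row, you should see something like:

>>> curs = connection.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
>>> [row['name'] for row in curs.fetchall()]
['coi', 'interest_rate', 'naar_discount', 'policy_fee', 'premium_load', 'unit_load']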

CHECK THE DATABASE WITH A TOOL

An alternative to programmatically reviewing the database is to use a database tool, like DBeaver. This type of tool gives you an easily navigable GUI to connect to a database, open tables, write queries and review results, and more.

In either case, once you are satisfied that you have created and populated your database as expected it is time to move on to the next task: adding the ability to retrieve rates for a particular case.

READ_TABLE_FOR_INSURED

Now that we’ve populated our database, let’s turn our attention to retrieving information. Let’s take a look at our preliminary method signature again:

objects.py
...
class SQLiteRateDatabase:
    ...
    def read_table_for_insured(self, insured: Insured, table_name: str) -> list[float]:
        pass
...

Looking at our Product class, this somewhat aligns with how we are currently retrieving information, but there are two additional considerations:

  • the existing functionality distinguishes between rate file types (e.g., “read_flat_csv”, “read_ia_py_csv”, “read_gen_rc_ia_py_csv”) so we might need a way to distinguish between rate structures

  • the existing functionality supports a default input when rates cannot be found

For the first point, we could incorporate multiple read functions that take care of each specific case like we did for csvs, or we could use a single method. Either way, let’s assume we will know the rate structure ahead of time but still provide a default value option in case rates are missing. With this in mind, let’s update the method signature and outline what we think will happen.

objects.py
...
class SQLiteRateDatabase:
    ...
    def read_table_for_insured(self, insured: Insured, table_name: str, table_type: str, default: float) -> list[float]:
        # prepare default output
        # build SQL statement
        #   what filters do we need?
        #   what columns do we need to select?
        # execute SQL statement
        # translate query results
        # return final output
        pass
...

This looks like a reasonable outline for now. Let’s tackle the first two pieces.

objects.py
...
class SQLiteRateDatabase:
    ...
    def read_table_for_insured(self, insured: Insured, table_name: str, table_type: str, default: float) -> list[float]:
        # prepare default output
        output = [default for _ in range(120)]

        # build SQL statement
        ## determine headers of table
        sql = f"SELECT * FROM [{table_name}] WHERE 1 = 0"
        cursor = self._connection.execute(sql)
        headers = [f[0] for f in cursor.description]

        ## build WHERE clause
        where_clauses = []
        if "Issue_Age" in headers:
            where_clauses.append(f"Issue_Age = {insured.issue_age}")
        if "Gender" in headers:
            where_clauses.append(f"Gender = '{insured.gender}'")
        if "Risk_Class" in headers:
            where_clauses.append(f"Risk_Class = '{insured.risk_class}'")
        where = (" WHERE " + ' AND '.join(where_clauses)) if where_clauses else ""

        ## determine SELECT fields
        select_fields = []
        if table_type == 'Policy_Year':
            select_fields.append('Policy_Year')
        select_fields.append('Rate')

        sql = f"SELECT {','.join(select_fields)} FROM [{table_name}]{where}"

        # execute SQL statement

        # translate query results

        # return final output
...

Since we are not assuming anything about the categorical variables (e.g., gender, risk class, issue age) for the rate tables, we need to scan the columns to determine what information we need to filter on. We first run a dummy statement (the WHERE 1 = 0 condition returns no rows but still populates the cursor's description with the table's columns), then we iterate over those column names to determine our filter criteria (the WHERE clause). Once we have that figured out we can set which columns we will actually retrieve so we can translate the result into the expected format.
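For example, for a table that has Gender, Risk_Class, Issue_Age, and Policy_Year columns (like our coi rates) and a hypothetical 35-year-old insured with gender 'F' and risk class 'Preferred', the sql variable would end up holding something along these lines just before execution:

SELECT Policy_Year,Rate FROM [coi] WHERE Issue_Age = 35 AND Gender = 'F' AND Risk_Class = 'Preferred'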

Let’s finish up this method:

objects.py
...
class SQLiteRateDatabase:
    ...
    def read_table_for_insured(self, insured: Insured, table_name: str, table_type: str, default: float) -> list[float]:
        ...
        # execute SQL statement
        cursor = self._connection.execute(sql)
        results = cursor.fetchall()

        # translate query results
        if table_type == 'Flat':
            output = [results[0][0] for _ in range(120)]
        elif table_type == 'Policy_Year':
            for row in results:
                output[row[0] - 1] = row[1]
        else:
            raise ValueError(f"Unsupported table_type: {table_type}")

        # return final output
        return output
...

When the rates are expected to be the same for every duration, we can simply replace our entire output array as shown in the ‘Flat’ logic. When rates do vary by some form of time element we need to properly update the output array like we would have in the original functions that read csv files.
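To see the 'Policy_Year' translation on its own, here is a minimal standalone sketch with made-up query results:

>>> default = 0.0
>>> output = [default for _ in range(120)]
>>> results = [(1, 2.5), (2, 2.5), (3, 1.25)]   # (Policy_Year, Rate) rows, values made up
>>> for row in results:
...     output[row[0] - 1] = row[1]
...
>>> output[:5]
[2.5, 2.5, 1.25, 0.0, 0.0]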

At this point we are almost finished with changes. The last thing to do is update the Product class to use the new rate database. Let’s store the path to the database as an instance attribute.

objects.py
...
class Product(BaseProduct):

    def __init__(self):
        self.maturity_age = 121
        self._db_path = './data/data.db'

    def get_rates_for_insured(self, insured: Insured) -> Rates:
        db = SQLiteRateDatabase(self._db_path)
        premium_loads = db.read_table_for_insured(insured, 'premium_load', 'Flat', 0)
        policy_fees = db.read_table_for_insured(insured, 'policy_fee', 'Flat', 0)
        unit_loads = db.read_table_for_insured(insured, 'unit_load', 'Policy_Year', 0)
        naar_discounts = db.read_table_for_insured(insured, 'naar_discount', 'Flat', 1)
        coi_rates = db.read_table_for_insured(insured, 'coi', 'Policy_Year', 0)
        interest_rates = db.read_table_for_insured(insured, 'interest_rate', 'Flat', 0)
        rates = Rates(premium_loads, policy_fees, unit_loads, naar_discounts, coi_rates, interest_rates)
        return rates
...

With these changes we should be able to quickly test our code by running our default case:

(venv) ...\illustrator> python -m illustrator.objects
132184.0426761172

That’s it! We’ve successfully built a rate database and updated our object-oriented solution to leverage it without too much redesign.

At this point it would be a good idea to go back and add docstrings to the new class and anywhere else they may be missing. Update your README.md to reflect changes and don’t forget to save your files, commit to your Git repo, and push to your online repository!

If you are having difficulty with the code please refer to the v0.6 tagged version of this repository.
Have questions or feedback? Reach out to let us know!
