Using Kubernetes Jobs for One-Off Ingestion of CSVs
This post comes from wanting a repeatable way to get a one-off CSV into Postgres as the first step of a machine learning pipeline, with everything running on Kubernetes locally. I set up a local Kubernetes cluster with Docker for Mac, install Postgres and the dashboard with Helm, and then use a Kubernetes job with pgfutter to ingest the CSV into the database. It is overkill for a single CSV, but the point is to have a standardized pipeline I can reuse and later extend to model training.
Setting up Kubernetes locally might seem like overkill for a one-off ingestion task, but it has a few advantages:
- Creates a consistent development environment that mirrors production
- Lets you test Kubernetes configurations before deploying to the cloud
- Lets you develop microservices in isolation
- Gives a foundation for scaling out the ML pipeline
I’m using Docker for Mac with its built-in Kubernetes support (v1.9.8), which gives a straightforward setup with fairly modern tooling while staying compatible with cloud deployments. Docker Swarm and Compose are options too, but I find Kubernetes a better platform for building and managing the kind of data pipelines I want here.
Let’s start with the local environment and the tools we need.
Kubernetes Dashboard
Most cloud providers set this up for you, but it’s useful to have when you’re running locally. Luckily Helm has come a long way. To install it, run brew install kubernetes-helm, followed by helm init and then helm repo update. It’s also possible to apply the dashboard.yaml config from https://github.com/kubernetes/dashboard directly, but installing through Helm means not having to worry about the nuances of the docker-for-desktop setup:
helm install stable/kubernetes-dashboard
This might seem trivial, but in my experience the fewer Kubernetes config files you have to worry about, the better (more so when you are using preconfigured services). Any Kubernetes config file you do keep should be one you have read and understood yourself. Most of the basic apps you may want already have preconfigured Helm charts, which makes the Kubernetes ecosystem much more straightforward while still allowing a considerable amount of configuration through Helm.
Sidenote: Running kubernetes locally
The directions in the official docs are unlikely to work as-is when running a local Kubernetes cluster. To fix this, it’s necessary to create a tiller service account and a ClusterRoleBinding that grants it permissions at the cluster level and in all namespaces:
kubectl create serviceaccount --namespace kube-system tiller
kubectl create clusterrolebinding tiller-cluster-rule --clusterrole=cluster-admin --serviceaccount=kube-system:tiller
helm init --service-account tiller
I’ll put this into the Kubernetes config YAML for clarity and so it can be scripted in the future, even though it isn’t required for every Kubernetes application.
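As a rough illustration of what that config could contain, here is a minimal Python sketch that emits a JSON manifest equivalent to the two kubectl create commands (kubectl apply accepts JSON as well as YAML). The function name and the idea of generating the manifest from a script are my own assumptions for illustration, not part of the original setup:

```python
import json


def tiller_rbac_manifest(namespace="kube-system", name="tiller"):
    """Build a List manifest mirroring the two kubectl create commands."""
    service_account = {
        "apiVersion": "v1",
        "kind": "ServiceAccount",
        "metadata": {"name": name, "namespace": namespace},
    }
    cluster_role_binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRoleBinding",
        "metadata": {"name": "tiller-cluster-rule"},
        # Bind the built-in cluster-admin ClusterRole to the service account.
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": "cluster-admin",
        },
        "subjects": [
            {"kind": "ServiceAccount", "name": name, "namespace": namespace}
        ],
    }
    return {
        "apiVersion": "v1",
        "kind": "List",
        "items": [service_account, cluster_role_binding],
    }


if __name__ == "__main__":
    print(json.dumps(tiller_rbac_manifest(), indent=2))
```

Saved as a script (say, a hypothetical tiller_rbac.py), the output can be piped into kubectl apply -f - before running helm init --service-account tiller.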
With that done, use helm to create the dashboard for the cluster: helm install stable/kubernetes-dashboard
View the dashboard
Helm supports both simple installs and installs driven by a values file (which we did not use here). Either way, the install output prints explicit directions for proxying to the dashboard. For fish it is just
export POD_NAME=(kubectl get pods -n default -l "app=kubernetes-dashboard,release=interested-marsupial" -o jsonpath="{.items[0].metadata.name}")
kubectl -n default port-forward $POD_NAME 8443:8443
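at which point you will be able to reach the dashboard (which some cloud providers give you by default) at https://localhost:8443. The release name in the label selector (interested-marsupial here) is generated by Helm, so yours will differ.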
Side-note: when doing this sort of development, it becomes useful to start writing kubernetes/docker functions in your shell. I have a ton I’ve started using, but an example for this use case in fish shell would be:
function kube_portforward
    # Usage: kube_portforward <value of the pod's app label>
    set -l DEPLOYMENT_NAME $argv
    # Look up the first matching pod and the first port its first container exposes
    set -l POD_NAME (kubectl get pods -n default -l "app=$DEPLOYMENT_NAME" -o jsonpath="{.items[0].metadata.name}")
    set -l POD_PORT (kubectl get pods -n default -l "app=$DEPLOYMENT_NAME" -o jsonpath="{.items[0].spec.containers[0].ports[0].containerPort}")
    echo "Forwarding to https://localhost:$POD_PORT"
    kubectl -n default port-forward $POD_NAME $POD_PORT
end
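To make the two jsonpath expressions in that function concrete, here is a rough Python sketch that pulls the same fields out of kubectl get pods -o json output. The sample document is heavily trimmed and made up for illustration (the pod name is hypothetical); only the field paths correspond to the fish function.

```python
import json


def first_pod_name_and_port(pods_json):
    """Return (name, containerPort) of the first pod, like the jsonpath queries."""
    pods = json.loads(pods_json)
    first = pods["items"][0]                  # {.items[0]...}
    name = first["metadata"]["name"]          # .metadata.name
    # .spec.containers[0].ports[0].containerPort
    port = first["spec"]["containers"][0]["ports"][0]["containerPort"]
    return name, port


# Hypothetical, trimmed stand-in for: kubectl get pods -l app=<name> -o json
SAMPLE = json.dumps({
    "items": [{
        "metadata": {"name": "example-dashboard-abc12"},
        "spec": {"containers": [{"ports": [{"containerPort": 8443}]}]},
    }]
})

print(first_pod_name_and_port(SAMPLE))
```

Note that both lookups take the first pod and the first declared port, so a deployment with several pods or several ports would need a more specific selector.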
Installing Postgres
Storing the data in Postgres might seem unnecessary for a small dataset you could just read straight from a CSV, but in this Kubernetes setup it makes sense. It lets me build a standardized pipeline that can handle both the initial ingestion and the later model-training workflows.
The installation is straightforward using Helm:
helm install -f k8s/helm/postgres_values.yaml stable/postgresql
Infrastructure Ready for Machine Learning
With Postgres running in the cluster, the basic infrastructure for the ML pipeline is in place. From here I can:
- Ingest new datasets through Kubernetes jobs
- Train models using transfer learning techniques
- Deploy specialized models based on a general-purpose pretrained model
- Configure and tune models for specific use cases
A later post will cover actually implementing these, starting with a simple example of the core workflow.
Data Ingestion
The data itself is quite simple and will contain something along the lines of these columns:
- 'call_id'
- 'status'
- 'first_name'
- 'id'
- 'content'
- 'filename'
- 'outcome_ids'
- 'occurred_at'
- 'call_type_name'
- 'call_length_sec'
- 'is_customer'
- 'start_time'
- 'end_time'
- 'tag'
- 'prices'
- 'category_label'
- 'rep_group_type'
- 'ranker'
- 'call_start_time'
- 'nchars'
- 'nwords'
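Before handing a file to the ingestion job, it can be handy to confirm its header actually has these columns. A minimal sketch, assuming the CSV has a header row and that a simple presence check is enough (the helper name is hypothetical):

```python
import csv
import io

EXPECTED_COLUMNS = [
    "call_id", "status", "first_name", "id", "content", "filename",
    "outcome_ids", "occurred_at", "call_type_name", "call_length_sec",
    "is_customer", "start_time", "end_time", "tag", "prices",
    "category_label", "rep_group_type", "ranker", "call_start_time",
    "nchars", "nwords",
]


def missing_columns(csv_text, expected=EXPECTED_COLUMNS):
    """Return the expected column names that are absent from the CSV header."""
    # Only the first row is read; an empty file yields an empty header.
    header = next(csv.reader(io.StringIO(csv_text)), [])
    present = {name.strip() for name in header}
    return [name for name in expected if name not in present]


# A file with every expected column reports nothing missing.
full_header = ",".join(EXPECTED_COLUMNS) + "\n"
print(missing_columns(full_header))
```

An empty list means every expected column is present; extra columns in the file are ignored by this check.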
Example of data to ingest
An example of what this will look like (only some of the columns and rows are shown):
| first_name | id | content | occurred_at | call_type_name | is_customer | start_time | end_time | tag | rep_group_type | ranker |
|---|---|---|---|---|---|---|---|---|---|---|
| Henry D… | 7525982 | No. We just bought some more property. And about to be soon renovation on the house […] look like, […] the vacation this year. | 2018-01-10 13:25:38 | subscription | t | 66120 | 79170.0 | outbound call | Middle Reps | 14 |
| Henry D… | 7525983 | Okay. I got you. | 2018-01-10 13:25:38 | subscription | f | 72360 | 79170.0 | outbound call | Middle Reps | 15 |
| Henry D… | 7525984 | We’re doing for the whole month on the floor and until we got a… | 2018-01-10 13:25:38 | subscription | t | 72360 | 79170.0 | outbound call | Middle Reps | 16 |
| Henry D… | 7525985 | Wow. Wow. | 2018-01-10 13:25:38 | subscription | f | 79170 | 83790.0 | outbound call | Middle Reps | 17 |
| Henry D… | 7525986 | Yeah. | 2018-01-10 13:25:38 | subscription | t | 79170 | 83790.0 | outbound call | Middle Reps | 18 |
The data in this example is already cleaned, so I can focus on the pipeline itself rather than preprocessing. The next section walks through building the ingestion pipeline and wiring it into the model.
There are a lot of ways to get data into Postgres, but here I’m just handling CSV files. The goal is a setup that can:
- Load data through Kubernetes jobs
- Trigger model training automatically
- Handle deployment of trained models
For the ingestion I’m using pgfutter, a simple tool that handles CSV imports with minimal configuration:
pgfutter csv dataset.csv
This might seem over-engineered for small CSV files, but it has a few advantages:
- Standardized data storage across different data types and sources
- Built-in monitoring through the Kubernetes dashboard
- Easy integration with additional services and pipelines
- Scalability from local development to cloud deployment
Why are you using Postgres?
The idea is that regardless of how the data comes in, we have a standardized pipeline to ingest it and create models. Using the CSV directly would be adequate for this machine learning model, but the data might come from another service down the line (for instance, with a phone call and a transcription hook via the Speech-to-Text API, I could have another job that transcribes the call and imports the data into the DB).
For this task, since we already have a CSV, I’m using pgfutter. It’s a super useful tool for this sort of pipeline since it gives a streamlined way to ingest the data without worrying about data munging up front. Being a Go executable also means I can simply create a job that ingests the data and cascades it into the ML pipeline, rather than reaching for the other tools that get frequently recommended.
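For trying the import outside the cluster first, the pgfutter call can be assembled from a script. This is only a sketch: it assumes pgfutter is on the PATH and picks up its connection settings from the DB_HOST, DB_NAME, DB_USER, and DB_PASS environment variables (the same names this post passes to the job), and the helper name and connection values are hypothetical.

```python
import os


def pgfutter_invocation(csv_path, host, dbname, user, password, delimiter=","):
    """Return (argv, env) for a pgfutter CSV import without running it."""
    argv = ["pgfutter", "csv", "-d", delimiter, csv_path]
    # Start from the current environment so PATH and friends are preserved.
    env = dict(os.environ)
    env.update({
        "DB_HOST": host,
        "DB_NAME": dbname,
        "DB_USER": user,
        "DB_PASS": password,
    })
    return argv, env


# Hypothetical local connection details.
argv, env = pgfutter_invocation(
    "dataset.csv", "localhost", "postgres", "postgres", "password"
)
print(" ".join(argv))
```

To actually run the import, the pair can be passed to subprocess.run(argv, env=env, check=True).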
To do this in Kubernetes I’ll use a job definition that combines a ConfigMap for the settings and a job spec for the actual ingestion. Keeping it as a job plus a config file makes it configurable and reusable, and much easier to translate into a helm chart or template later on.
For example, the ConfigMap would be:
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-csv-ingest
  namespace: default
data:
  PGFUTTER_REPO: "github.com/lukasmartinelli/pgfutter"
  DB_HOST: postgres
  DB_NAME: postgres
  DB_USER: postgres
  DB_PASS: password
  CSV_LOCATION: "/data.csv"
  DATA_URL: "https://storage.googleapis.com/neural-niche/raw_text/csv/898f0e56-6a36-491a-9c43-3fed49d6a3d0/calldata.csv"
This exposes the settings to the job as environment variables, although values like DATA_URL would normally be passed in when the job is created. The job itself is:
apiVersion: batch/v1
kind: Job
metadata:
  name: ingest
spec:
  template:
    metadata:
      name: ingest
      labels:
        datatype: csv
    spec:
      containers:
        - name: pgfutter
          image: golang:latest
          envFrom:
            - configMapRef:
                name: config-csv-ingest
          command: ["/bin/sh", "-c"]
          args:
            # Kubernetes expands $(VAR) from the container env before the shell runs
            - |
              go get $(PGFUTTER_REPO)
              wget -O $(CSV_LOCATION) $(DATA_URL)
              pgfutter csv -d "," $(CSV_LOCATION)
      restartPolicy: Never
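Since values like DATA_URL are meant to be supplied when the job is created, one way to sketch that is to generate the Job manifest per dataset. The Python below is a minimal sketch, not the setup used in this post: it mirrors the job definition, refers to the repository through the ConfigMap's PGFUTTER_REPO key, and overrides DATA_URL through an explicit env entry (Kubernetes gives env entries precedence over envFrom values with the same key). The function name and the example URL are hypothetical.

```python
import json


def ingest_job(data_url, name="ingest", config_map="config-csv-ingest"):
    """Build a Job manifest for one CSV, with DATA_URL set per job."""
    script = (
        "go get $(PGFUTTER_REPO); "
        "wget -O $(CSV_LOCATION) $(DATA_URL); "
        'pgfutter csv -d "," $(CSV_LOCATION)'
    )
    container = {
        "name": "pgfutter",
        "image": "golang:latest",
        "envFrom": [{"configMapRef": {"name": config_map}}],
        # An explicit env entry wins over the same key coming from envFrom.
        "env": [{"name": "DATA_URL", "value": data_url}],
        "command": ["/bin/sh", "-c"],
        "args": [script],
    }
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "template": {
                "metadata": {"name": name, "labels": {"datatype": "csv"}},
                "spec": {"containers": [container], "restartPolicy": "Never"},
            }
        },
    }


if __name__ == "__main__":
    # Hypothetical dataset URL, for illustration only.
    job = ingest_job("https://example.com/calls.csv", name="ingest-calls")
    print(json.dumps(job, indent=2))
```

The printed JSON can be piped to kubectl apply -f -, and giving each job its own name avoids clashing with a finished job of the same name.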
This is a simple example, but it shows how a Kubernetes job can handle the ingestion and gives a foundation for automating the rest of the pipeline (model training, deployment, and so on) later on.