Let me automate that for you II, Electric Bugaloo

Removing pain points around removing pain points
Siobhan Sabino

Improving our original, embedded SQL generator and some related scripts by converting them to a better, long-term, stand-alone SQL producer that’s faster, more reliable, and more obvious.

About seventeen years ago, in 2019, I published my blog post “Let me automate that for you” about a design for automating the creation of warehouse tables based on the schemas of new event data. The idea was that when our ETL system couldn’t load waiting data into a warehouse table (as there was no table to be found), it would look up the schema for that data, convert the schema to a SQL statement, then issue a PR to the repo where SQL migrations for such needs are kept. Eventually creating tables made a friend, updating tables when there was a mismatch between the schema of the data we were loading and the schema of the table in the warehouse, and then a third buddy joined the party, optimizing a table to improve its performance.

The system had some absolutely great qualities: it automated acting on errors it saw, it generated great documentation in the PR and the SQL statement (with comments for discussions and places to review more closely), and it posted to Slack to let engineers know that there was something for them to do a final review on.

However… it wasn’t perfect.

Reading is going toward something that is about to be, and no one yet knows what it will be. [1]

Let me take you through the evolution of our embedded SQL generator to stand-alone SQL producer.

Limitations of previous implementation

While the SQL generator eased so much work for so many different people in the company, it had some… strange caveats, shall we say. Some were more noticeable than others but all were, in their own way, just that little bit too grating to live with long term.

Systems program building is an entropy-decreasing process, hence inherently metastable. Program maintenance is an entropy-increasing process, and even its most skillful execution only delays the subsidence of the system into unfixable obsolescence. [6]

The most obvious was that the SQL generator was reactive. It might take hours for the loader to hit a problem that causes it to try generating a SQL migration. This long turnaround was painful for the data team, painful for the engineers making the upstream changes — it was just too unpredictable and drawn out for us to ever feel comfortable. Nothing like a PR showing up at 2am on Saturday because your robot engineer doesn’t have a sense of boundaries and working hours!

On top of that, the SQL generator wasn’t always invoked when it should be, especially around updates to schemas. We require, with help from the Schema Registry, that all of our schemas be backwards compatible, which has an odd quirk that means the loader can still load the new schema’s data… into the old schema — good for sending data, not for warehouse table design! The issue here was we had the code to generate updates, but we didn’t have the code to trigger updates every time. Paired with the long turnaround time for “will it/won’t it,” the system still required a lot of hands-on attention, from triggering the update manually to finding gaps that went unnoticed.

We were also pushing the boundaries of Python; it just was no longer the right implementation language for this system. The most disturbing yet hilarious example was being able to tell the difference between “the default value is null” and “there is no default value,” both of which — in Python — are None. We ended up using a magic string of "∅⦰⦱⦲⦳⦴" to try to differentiate between these types of emptiness, but we all knew this indicated we had gone too far with this set of tools. We needed something better, something designed to work together instead of three related but separate mini systems that needed constant supervision.

Opportunities to build it better

With our new data pipeline out the door, we had an opportunity. You see, the Schema Registry writes all its schemas to Kafka. This actually means you can subscribe to schema changes from a Kafka consumer without a lot of fuss. Get updates within a few seconds or minutes of a new or updated schema instead of hours or days? Uh, yes please! That is a much more reasonable turnaround time and removed the problem of not updating for every changed schema.

With the valve’s Kafka consumer already set up in Scala, that presented a companion opportunity to switch implementation languages to one that could better represent the strong typing of the two systems we were converting between (Avro and SQL), including different forms of emptiness! :tada: It’s the simple wins in life sometimes that give you hope, and being able to delete "∅⦰⦱⦲⦳⦴" as a mission-critical part of a system was truly a win.
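
To give a flavor of that win, here’s a minimal sketch (illustrative only, not our production code) of how Scala can model “no default at all” versus “the default is null” without any magic strings:

sealed trait FieldDefault
case object NoDefault extends FieldDefault                     // the schema declares no default at all
case object NullDefault extends FieldDefault                   // the schema's default is literally null
final case class DefaultValue(value: Any) extends FieldDefault // an actual default value

object FieldDefault {
  // How each kind of emptiness might render in a column definition, just to show the distinction.
  def describe(default: FieldDefault): String = default match {
    case NoDefault           => "no DEFAULT clause"
    case NullDefault         => "DEFAULT NULL"
    case DefaultValue(value) => s"DEFAULT '$value'"
  }
}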

Thus we had a new plan: move the embedded SQL generator to a stand-alone SQL producer in Scala that consumed from Kafka, opening up the chance for faster turnaround, better representation of the data, easy access to the official Avro library (so we wouldn’t have to reimplement their logic), and a better setup for testing nitty gritty, hard-to-spot edge cases in both the short and long term.

I saw and heard, and knew at last

The How and Why of all things, past,

and present, and forevermore. [7]

It just made sense.

Building blocks of a stand-alone SQL producer

(Re)implementing a system like this required tackling it both from the foundation as well as from the high level, “what will be the final output?” view, to ensure the two met somewhere reasonable; the previous system had grown organically but we really needed to replace it all at once, mostly for our own sanity but also to not have the two competing against each other. We scraped together implementations from the valve and other one-off scripts we had to form the basis of starting code that wasn’t unique to the SQL producer: things like producing to and consuming from Kafka, connecting to Consul or the Schema Registry, and talking with Redshift, especially about the metadata of our warehouse tables. Then we looked at what our Python implementation hinted at the existence of but never fully explored: which data structures or stand-alone functions could take care of a specific task without outside help? What could we do to better leverage this new implementation language to make the code as obvious as possible?

Don’t tell me what I’m doing; I don’t want to know. [2]

A good place for us to start was, since we’d be combining multiple services within this one system to do specialized work, how could we talk about all of their output collectively? They each produce one or more migrations, after all, so… could we start with that?

trait Migration {
  def schema: String
  def table: String

  def hasChange: Boolean

  def migrations: Array[SqlStatement]

  def branchName: String = s"bot-${prType.toString}-$schema-$table"

  def prType: PullRequestType
  def prTitle: String = s"Migration to ${prType.toString} $schema.$table"
  def prDescription: Markdown
}

Internally, each data structure that extends the Migration type does a lot of work to produce its unique array of one or more migrations and its detailed PR writeup, but hiding that complex code away allows each to be self-contained. Ultimately, when we’re “done” with a service processing its request, we just need to be able to publish the migration to Github and ping Slack about it. The above exposes just what we need and nothing more.
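
To make the trait concrete, here’s a hypothetical implementation; I’m assuming SqlStatement and Markdown are String aliases and that PullRequestType has a member for creating tables, and the real CreateTable, UpdateTable, and OptimizeTable types do far more work to assemble their statements and writeups than this:

final case class CreateTableMigration(
  schema: String,
  table: String,
  statements: Array[SqlStatement], // the generated CREATE TABLE and friends
  notes: Markdown                  // the detailed PR writeup
) extends Migration {
  def hasChange: Boolean = statements.nonEmpty // nothing to publish if no SQL was generated
  def migrations: Array[SqlStatement] = statements
  def prType: PullRequestType = PullRequestType.Create // assumption: an enum-style member for creates
  def prDescription: Markdown = notes
}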

Which, of course, meant that another foundational building block would be publishing migrations:

object Publish {
  def apply(migrations: Set[Migration]): Unit = migrations.foreach(migration => {
    if (!migration.hasChange) Log.info(s"No changes were found for `${migration.schema}.${migration.table}`")
    else if (recentMigrationAlready(migration)) Log.info(s"There's already a recent migration for `${migration.schema}.${migration.table}` so not going to publish")
    else {
      Log.info(s"Migration for `${migration.schema}.${migration.table}` has changes which we're going to publish")
      val branchName = github.commit(migration)
      val prUrl = issuePr(branchName, migration)
      val channelsPostTo = channelsToNotify(migration)
      notifyHumans(channelsPostTo, migration, prUrl)
      if (shouldUpdateDeduplication(migration)) updateDeduplication(branchName, migration)
      Log.info("Migration has been created, issued, and shared")
    }
  })
}

Here I’ve included only the main block of orchestration logic but you can already see how we can build complex flows from such a simple input as a Migration. For any set of migrations, so long as there are changes and we haven’t already recently issued a migration for it, we’ll commit it to Github (more in Appendix B), issue the PR, get the Slack channels to notify, let the humans know, and maybe even go back in to update other files like our JSON of deduplication rules for our loader. Configuration files have never been treated so well!

Another grouping of foundational items we needed were converters, translating from one language to another, for example from Avro types to Redshift types:

object ColumnDefinition {
  val defaultStringColumn = "CHARACTER VARYING(24)"

  def avroToWarehouse(schemaField: ProcessedField): String = schemaField.`type` match {
    case Schema.Type.STRING if schemaField.name.endsWith("id") => "CHARACTER(36)"
    case Schema.Type.STRING if schemaField.name.endsWith("ts") => "TIMESTAMP"
    case Schema.Type.STRING if schemaField.name.contains("email") => "CHARACTER VARYING(256)"
    case _ if schemaField.name.contains("latitude") => "DECIMAL(8,6)"
    case _ if schemaField.name.contains("longitude") => "DECIMAL(9,6)"
    case Schema.Type.STRING => defaultStringColumn
    case Schema.Type.BOOLEAN => "BOOLEAN"
    case Schema.Type.INT => "INTEGER"
    case Schema.Type.LONG => "BIGINT"
    case Schema.Type.FLOAT => "NUMERIC"
    case Schema.Type.UNION => avroToWarehouse(
      schemaField.copy(
        `type` = schemaField
          .unionTypeValues
          .get
          .filterNot(_.getName == "null")
          .head
          .getType
      )
    )
    case _ => defaultStringColumn
  }
}

This has a combination of simple translation using the Avro library’s built-in types along with business logic, for example that every perceived identity field will be a UUID and thus exactly 36 characters in length. I also default string fields to a small number of characters, so that humans have to review them and consider what’s a more appropriate length. Emails, though, we let those get wild at 256 characters.

These sorts of conversions existed in our Python implementation but were nowhere near as easy to reason about nor as readable. While the above match statement is massive, it’s super obvious what it’s doing and super easy to add to if we, say, had a new specialized type like phone number that we wanted to handle. It’s a great example of “could a human do this?” Yes. “Would a human do anything different than a machine in doing this?” Not really; we’d just go look up the translation and go through a similar flow to find the right one. The system doesn’t get every case right every time but the ones it misses require human judgement anyway and are a great opportunity for someone new to say, “I think I have a rule for how to automate this.”
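
For instance, that hypothetical phone number rule would just be one more guard near the top of the match (the length here is a guess for illustration, not a researched standard):

case Schema.Type.STRING if schemaField.name.contains("phone") => "CHARACTER VARYING(32)"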

Dans la nature rien ne se crée, rien ne se perd, tout change.

In nature nothing is created, nothing is lost, everything changes. [5]

The last grouping of foundational items fell into a sort of “expert decision making” category. These functions don’t replace the average engineer looking at an Avro schema and saying an INT becomes an INTEGER in Redshift; they replaced a data engineer saying, “Sort keys should follow this pattern, distribution keys should follow this pattern, here’s what optimized types look like.” This is where the difficult decisions and need for deep knowledge become embedded in the system, which both helps make the attention of data engineers less scarce (if they’re in a meeting, you can always look up what they have the expert system do for an idea of what they would tell you) while also ensuring humans don’t accidentally forget something minor along the way (which is 100% what I would do all the time when I tried to optimize tables by hand, omg the number of small things to check became wild and you litter typos everywhere). So long as the experts have implemented and tested the rules, then all the cases they would know how to handle are handled, and other ones can be added as they’re discovered.

case class ColumnEncoding(column: String, recommendedEncoding: String, savings: Double) {
  lazy val reason: String = s"Switch to recommended encoding with savings of $savings%"
}

object FixEncoding {
  def apply(definition: Set[ColumnDefinition], encodings: Set[ColumnEncoding]): Option[ProposedChange] = {
    var notes: Map[String, String] = Map()
    val changes = encodings.map(recommendation => {
      definition.filter(column => column.name == recommendation.column).head match {
        case column if column.sortKey && column.encoding != "RAW" =>
          notes += (column.name -> "Sort key should have a `raw` encoding")
          column.copy(encoding = "RAW")
        case column if !column.sortKey && column.encoding != recommendation.recommendedEncoding && recommendation.savings > 1.0 =>
          notes += (column.name -> recommendation.reason)
          column.copy(encoding = recommendation.recommendedEncoding)
        case column => column
      }
    })

    if (definition != changes) Some(
      ProposedChange(
        definition,
        changes,
        "Corrected encodings that were mismatched, such as using tightest compression or not encoding the sort key.",
        Some(notes)
      )
    )
    else None
  }
}

The encoding example is probably the easiest to read (though I realize it’s still a touch wild) but has one of the most nuanced caveats in the system: we want to use the recommended encoding for all columns except the sort key. Why? Well, the tighter the compression, the less reading from disk Redshift has to do, which is one of the slowest acts it has to perform. However if you encode the sort key (which Redshift will make recommendations for), then you actually cause Redshift to need to perform more reads from disk to find the blocks of data it’s looking for. I would have no expectation that a randomly selected engineer in the office would remember that — it’s a deep bit of knowledge for data engineers, as the specialists in this area, to know and care about. But hey, if I’m on vacation, you can come look at the code and see that "Sort key should have a raw encoding". Sometimes, that’s enough.

Joining the human needs with the computer’s logic

Instead of showing what I built with these foundational pieces next, let me show you the entry point for the SQL producer: I think this will give you a better idea of how the bridge from high-level entry point to small, dedicated blocks of foundational code was built and, even better, how those blocks can be changed, extended, or added to over time, depending on what we need.

We are what we repeatedly do. Excellence, therefore, is not an act, but a habit. [4]

Our driver is incredibly simple:

object Driver {
  val topics = Set(
    schemaTopic,
    optimizeTableTopic,
    optimizeSchemaTopic
  )

  def process(topic: String, messages: List[Message]): Unit = topic match {
    case _ if topic == schemaTopic => schemaChanges(messages)
    case _ if topic == optimizeTableTopic => optimizeTables(messages)
    case _ if topic == optimizeSchemaTopic => optimizeSchema
    case _ => Log.warn(s"Topic $topic does not have any supported actions.")
  }

  def main(args: Array[String]): Unit = PipelineConsumer(serviceName, topics, process)
}
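
The PipelineConsumer at the bottom comes from our shared Kafka code and is covered properly in the valve writeup; roughly, it wraps a consumer poll loop and hands each topic’s batch of messages to the function you pass in. A sketch of the idea, where the configuration, deserialization, and the real Message type are all assumptions rather than our actual wrapper:

import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object PipelineConsumerSketch {
  def run(groupId: String, topics: Set[String], process: (String, List[String]) => Unit): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumption: ours comes from Consul
    props.put("group.id", groupId)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(topics.asJava)

    while (true) {
      val records = consumer.poll(Duration.ofSeconds(5))
      // Hand each topic's batch to the orchestrating function, then mark the batch as consumed.
      topics.foreach { topic =>
        val batch = records.records(topic).asScala.map(_.value).toList
        if (batch.nonEmpty) process(topic, batch)
      }
      consumer.commitSync()
    }
  }
}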

Similar to my writeup of our valve system, we make use of a Kafka consumer that we can pass in a function to execute against for each batch of messages it receives. This consumer, however, actually acts on multiple topics: one for Schema Registry changes (either new or updated schemas), one for optimizing a specific table, and one for optimizing a specific schema. The function we pass in to the Kafka consumer, then, is essentially just an orchestrator that immediately moves each batch of messages to the processor that’s designed for its topic. So, what does that processor look like?

object Processor {
  private def warehouseSchema = Consul.valueFor("data-warehouse", "schema")

  private def start(action: String, about: String, metric: String): Unit = {
    Log.info(s"Going to $action `$about`")
    Metric.attempting(metric, about)
  }

  private def finish(action: String, about: String, metric: String, results: Try[Unit]): Unit = results match {
    case Success(_) =>
      Log.info(s"Able to $action `$about`")
      Metric.succeeded(metric, s"$about")
    case Failure(exception) =>
      Log.error(s"Unable to $action `$about`", exception)
      Metric.failed(metric, s"$about")
      Slack(
        Publish.channelsDefault,
        s":x: Unable to $action `$about` because of `${exception.getMessage}`",
        ":dna:",
        s"SQL producer ($environment)"
      )
  }

  val schemaTopic: Topic = "__schema"
  private[service] def createTable(schema: SchemaMessage): Unit = {
    val action = "create table"
    val metric = "create.table"

    start(action, s"$warehouseSchema.${schema.topic}", metric)
    val migrations = CreateTable(warehouseSchema, schema.topic)
    val results = Try(Publish(migrations))
    finish(action, s"$warehouseSchema.${schema.topic}", metric, results)
  }
  private[service] def updateTable(schema: SchemaMessage): Unit = {
    val action = "update table"
    val metric = "update.table"

    start(action, s"$warehouseSchema.${schema.topic}", metric)
    val migrations = UpdateTable(warehouseSchema, schema.topic, schema.version)
    val results = Try(Publish(migrations))
    finish(action, s"$warehouseSchema.${schema.topic}", metric, results)
  }
  def schemaChanges(messages: List[Message]): Unit = {
    val (newSchemas, updatedSchemas) = messages
      .map(_.asInstanceOf[SchemaMessage])
      .partition(_.isNew) // if schema version == 1
    newSchemas.foreach(schema => createTable(schema))
    updatedSchemas.foreach(schema => updateTable(schema))
  }

  val optimizeTableTopic: Topic = "_optimize_table"
  private[service] def optimizeTable(table: Message): Unit = {
    val action = "optimize table"
    val metric = "optimize.table"

    start(action, s"$warehouseSchema.${table.topic}", metric)
    val migrations = OptimizeTable(warehouseSchema, table.topic)
    val results = Try(Publish(migrations))
    finish(action, s"$warehouseSchema.${table.topic}", metric, results)
  }
  def optimizeTables(messages: List[Message]): Unit = messages.foreach(message => optimizeTable(message))

  val optimizeSchemaTopic: Topic = "_optimize_schema"
  def optimizeSchema: Unit = {
    val action = "optimize schema"
    val metric = "optimize.schema"

    start(action, s"$warehouseSchema", metric)
    val results = Try(OptimizeSchema(warehouseSchema))
    finish(action, s"$warehouseSchema", metric, results)
  }
}

There are essentially five main groups of code within the processor:

  • There’s getting the main warehouse schema our SQL producer is in charge of, which comes from Consul. The reason it’s a function that keeps getting the value is in case we change the schema; the long-lived SQL producer instance handles staying up to date so no one has to think to refresh it.
  • There’s starting and finishing processing: log what it’s doing, log what happened, increment the correct metric, and possibly reach out on Slack to let humans know there was a problem, so we can act on bugs as soon as possible.
  • There’s the processing block for schema changes, which includes figuring out if the schema is new or updated then acting on each accordingly before publishing any changes found.
  • There’s the processing block for table optimizations, which checks the warehouse for any improvements to be made for the specified table and publishes what it finds.
  • There’s the processing block for schema optimizations, which walks all the tables available in the schema to find any that can be improved before putting such requests into the pipeline to be consumed by the SQL producer later on to optimize each table.

As you can imagine, this high level orchestration hides a lot of nitty-gritty complexity, but that is by design. The complex logic of what each input maps to as output is handled in either the small, foundational items or in the middle level of dedicated logic, both of which are heavily tested for every edge case we can think of or have encountered in the wild. Thus the orchestration is simple to read, simple to test (both automatically and manually, as live has its own set of problems), and easy to drill into if there’s a bug to be tackled. Want to add a new service? It’s very clear how to do it.

(I should state this code was recently refactored so its tidiness is due to that: if you build your own custom SQL producer and it looks much more messy, believe me ours was a mess too, thus the refactor. It just hasn’t had time to grow organically again quite yet.)

What you’ll notice is that each function essentially starts the action, hands off processing to a dedicated bit of logic that generates migrations, then publishes the migrations and finishes its work. The reason it ended up like this is that while the input and output for each service is nearly identical, the way the input is used to generate outputs varies wildly. Maintenance wise, this is a nice win, as we can choose to focus on either what all the services share or one specific service at a time in keeping the system up to date.

But that does rather leave, ya know, the complex marriage of each input to its output still to implement.

Detailed breakdown of the services available

Walking you through each service in detail would be not just worthy of a blog post for each one, but possibly multiple blog posts for each! Instead I’ll run you through the logic for each service, which is pretty unique to each technical landscape a SQL producer would be needed in. You might have different rules or opinions about, for example, a standard sort key than we do, and that’s fine: the point is just to get those rules or opinions into the code, so the system handles them for you.

Creating a table

The simplest service is, truly, the most foundational.

  1. Figure out if the table already exists. If it does, you’re done.

  2. Translate the Avro schema, in particular each field, to a Redshift table, in particular the columns.

A good rule of thumb for encodings in a new table is to set everything except the sort key and booleans to ZSTD; leave those two exceptions as RAW. Later on you can optimize the encodings, once there’s data in the system, but until then this will work well enough.
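
As a sketch of how those pieces can come together (hypothetical names, and only a fraction of what the real CreateTable handles), the translated columns plus that rule of thumb are enough to assemble the statement:

object CreateTableSketch {
  final case class NewColumn(name: String, sqlType: String, isSortKey: Boolean)

  def createTableSql(schema: String, table: String, columns: Seq[NewColumn], sortKey: String): String = {
    val columnLines = columns.map { column =>
      val encoding =
        if (column.isSortKey || column.sqlType == "BOOLEAN") "RAW" // leave these unencoded for now
        else "ZSTD"                                                // good enough until there's data to optimize with
      s"  ${column.name} ${column.sqlType} ENCODE $encoding"
    }
    s"""CREATE TABLE $schema.$table (
       |${columnLines.mkString(",\n")}
       |)
       |COMPOUND SORTKEY($sortKey);""".stripMargin
  }
}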

Updating a table

In my opinion this is the most complex service; it is difficult for both humans and the system to get this sort of update right, which is why having the system helps: it might take a while to implement but then humans don’t have to worry about doing it themselves.

  1. Figure out if the table already exists. If it doesn’t, you’re done or create it, your choice.
  2. Get a copy of what it currently looks like in Redshift, what its previous Avro schema looked like, and what its current Avro schema looks like.
  3. Find the difference between the Redshift table and previous Avro schema compared to the current Avro schema.
  4. For each change, translate the difference into a block of SQL statements. You may want to issue multiple migrations for each block, depending on how you run migrations and what you feel comfortable with.

By not just comparing the two schemas but also looking at the Redshift table, you find a lot of edge cases that are super easy to miss. There’s also certain changes in Avro that aren’t really as dramatic in Redshift, so you might be able to discard certain changes as not actually having any impact on Redshift.
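
A very simplified sketch of steps 3 and 4, assuming both schemas have already been reduced to name and type pairs (the real UpdateTable also folds in the live Redshift definition and handles type changes, defaults, and more):

object UpdateTableSketch {
  final case class FieldDef(name: String, sqlType: String)

  def updateSql(schema: String, table: String, previous: Set[FieldDef], current: Set[FieldDef]): Seq[String] = {
    val added   = current.filterNot(field => previous.exists(_.name == field.name))
    val removed = previous.filterNot(field => current.exists(_.name == field.name))

    added.toSeq.map(field => s"ALTER TABLE $schema.$table ADD COLUMN ${field.name} ${field.sqlType};") ++
      removed.toSeq.map(field => s"ALTER TABLE $schema.$table DROP COLUMN ${field.name};")
  }
}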

Optimizing a table

Honestly, this is the most fun service, both in terms of writing it and, most importantly, in terms of benefiting from it. When you create or update a table, you’re making an educated guess on what to set the columns, sort, and distribution to be, but being able to go back and review those guesses when you have more information is fantastic. This is especially helpful if you have an existing warehouse with tables in a variety of states from a little out of whack to what the hell is happening here.

  1. Grab yourself a whole lot of metadata about the table in Redshift: what its column definitions look like, what its recommended encodings are, what its skew is, just about everything. (Appendix A contains more details about how to do this.)
  2. Using each bit of metadata, find each change you want to make.
  3. For each change, translate the update into a block of SQL statements. As with updating a table, you might want to issue multiple migrations. You can also recreate the table from scratch, moving the data from the old table to the new one, if you find it easier. (We do!)

Obviously this service, unlike the Schema Registry centric ones, can be triggered by a human wanting to see if a table can be made better, for example a data analyst who is working with a table that’s super slow. We hooked our workflow system up to produce a message for this service whenever a human has a particular interest; otherwise, it tends to be requested by its companion…

Optimizing a schema

This was the next step up from optimizing a table. Sure, an out-of-whack table should be optimized, but how do you find an out-of-whack table to optimize?

Another flaw in the human character is that everybody wants to build and nobody wants to do maintenance. [8]

Our workflow system, every week, triggers checking our main schema and picking up to a set number of tables to optimize for us. At first this produced the max number of migrations every time, but now we’ll go weeks without any optimizations, because the tables are kept so up to date and pristine.

(The reason for limiting how many tables are optimized is purely so that humans aren’t flooded with too many pull requests, especially when we knew our warehouse had a lot of old tables that needed a lot of work.)

  1. Get yourself a list of tables to focus on. The way we do that is:
    1. Get all tables in the schema.
    2. Cross reference all of these tables with the Schema Registry, to verify they’re of interest to us and not a table in the wrong schema.
    3. Do some light metadata checking for if they’re poorly optimized (see Appendix A for detailed instructions on this). A deeper check will come later.
    4. If we have enough poorly optimized tables, focus on those; otherwise, take the list of all tables to do a more random dive.
  2. Shuffle the possible tables to focus on (so you don’t have a bias towards those early in the alphabet) and take twice the max number of tables you want to end up with. This limit is purely to speed up the system so you can change it as you’d like.
  3. Only keep tables that have at least 90 days worth of data. This is to ensure we don’t prematurely optimize a table.
  4. Check each table’s metadata more deeply, such as for incorrect sort keys, missing encodings, or skew. Only keep those that have some deep issue we think we can correct.
  5. Of all remaining tables, take the max number.
  6. For each table, produce a message to Kafka for the optimize table service about the table.
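
Condensed into code, that selection walk looks roughly like the following, with the metadata checks passed in as functions (in our real implementation they’re the Appendix A queries, and “enough” poorly optimized tables is a judgement call):

object OptimizeSchemaSketch {
  import scala.util.Random

  def tablesToOptimize(
    allTables: Seq[String],                  // already cross-referenced with the Schema Registry
    looksPoorlyOptimized: String => Boolean, // the light metadata check
    daysOfData: String => Int,
    deepCheckFindsIssue: String => Boolean,  // sort keys, encodings, skew
    maxTables: Int
  ): Seq[String] = {
    val poorly = allTables.filter(looksPoorlyOptimized)
    val pool   = if (poorly.size >= maxTables) poorly else allTables // otherwise, the more random dive

    Random.shuffle(pool)
      .take(maxTables * 2)                      // cap early, purely to keep the walk fast
      .filter(table => daysOfData(table) >= 90) // don't optimize a table prematurely
      .filter(deepCheckFindsIssue)
      .take(maxTables)
  }
}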

While there is overlap in the metadata that the optimize table and optimize schema service review, breaking them down is both mentally easier to reason about and keeps the optimize schema request (which might issue some long running queries) moving along without timing out or making Kafka think it failed to consume a message. Like it did that one time where it spent all night issuing like a hundred PRs for the same table… yeah don’t do that, make sure it can complete within the amount of time Kafka is giving it to say it’s done.

Troubleshooting live

Sixty years ago I knew everything; now I know nothing; education is a progressive discovery of our own ignorance. [4]

As I alluded to above, no matter how much you test automatically, live has its own problems. Sometimes a new case for evolving a schema shows up, so you have to add in support to capture that in the future. Sometimes migrations make sense at the individual statement level but ultimately don’t add anything to the system, like making an already nullable column nullable, so you teach the system to drop such a migration since it has no actual “change” to suggest. Sometimes Avro default types show up really heckin funny compared to what you thought they’d be, so you need to change the comparison logic to convert Avro’s NULL constant to a JVM null value. There will always be gaps — that’s fine.
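
That last one, concretely: the Avro Java library hands back a null default as the JsonProperties.NULL_VALUE sentinel rather than a JVM null, so the comparison needed a small normalization step first. A minimal sketch of the idea:

import org.apache.avro.JsonProperties

object AvroDefaults {
  // Map Avro's "this default is null" sentinel to an actual JVM null before comparing
  // defaults between schema versions; everything else passes through untouched.
  def normalizeDefault(value: AnyRef): AnyRef =
    if (value == JsonProperties.NULL_VALUE) null else value
}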

Because the Schema Registry only sees new or updated schemas so often, it’s not as easy to test live as, say, walking a schema to find some tables to optimize, which we could hammer in both our lower and upper environments to see what happened. What I’d recommend for those schema-dependent services is: take every change that does happen and every little “hmm” the migration or PR puts in you, and really ask yourself, “Should I do something here?” Even if it’s just a ticket you throw at the bottom of your backlog, having the example of here’s what happened, here’s what I’d expected to happen, here’s how this can be fixed — you’ll probably see this problem again, so you’ll be grateful you captured it. Those sorts of bugs might also be a great onboarding item for people new to the system who want to play around and get exposure to it.

Optimizing a schema or tables, though, you can get wild! Since it has human triggers, and for us at least only posts to Slack for our team, we can run it whenever we want and then discuss very particular cases we either set up or found to figure out, “What is better here? How do we keep this data useful?”

Invite feedback from others as well! We had an optimization for one of our largest tables, with its very thorough writeup in the PR, when fellow GCer Matt C pointed out that, if we had notes from the PR writeup in the SQL migration, we could comment on them specifically to have a deeper discussion. Brilliant! We have that now, just a little comment at the end of each changed line giving the reason for the change. The PR presents the full writeup; the SQL comments give you a place to drill in and figure out if this was the right decision.

@gamechanger/data

:wave: I've automatically created this migration for `public.sample_table` because I noticed it could be improved. :tada:

However I can't do everything a human can, so I've noted the parts that need verification and possibly updating below along with what I did.

I hope I did a good job! :blush:

## Table schema

### Description

>This is a sample table for testing.

### Before

* Sort: `test_column`
* Distribution: `test_column`

 column | type  | nullable | encoding | comment |
 ------ | ----- | :------: | -------- | ------- |
 `row_id` | `BIGINT` |  | `ZSTD` | ∅ |
 `event_ts` | `TIMESTAMP` |  | `RAW` | ∅ |
 `test_column` | `CHARACTER VARYING(36)` |  | `LZO` | This is a test of the emergency broadcasting system. |
 `empty_column` | `CHARACTER VARYING(256)` | ✓ | `ZSTD` | ∅ |

### After

* Sort: `event_ts`
* Distribution: `test_column`

 column | type  | nullable | encoding | comment |
 ------ | ----- | :------: | -------- | ------- |
 `test_column` | `CHARACTER(36)` |  | `ZSTD` | This is a test of the emergency broadcasting system. |
 `row_id` | `BIGINT` |  | `AZ64` | ∅ |
 `event_ts` | `TIMESTAMP` |  | `RAW` | ∅ |

## What changed?

### Type and length

 column | before | after | rationale |
 ------ | ------ | ----- | --------- |
 `test_column` | `CHARACTER VARYING(36)` | `CHARACTER(36)` | Max and min are same length |

### Dropped columns

 column | type | rationale |
 ------ | ---- | --------- |
 `empty_column` | `CHARACTER VARYING(256)` | Column contains no data |

### Encoding

 column | before | after | savings |
 ----------- | ------ | ----- | ------ |
 `row_id` | `ZSTD` | `AZ64` | Switch to recommended encoding with savings of 10.0% |
 `test_column` | `LZO` | `ZSTD` | Switch to recommended encoding with savings of 27.0% |

### Sort

* Before: `test_column`
* After: `event_ts`
* Reason: Made `event_ts` the only sort key.

### Distribution

* Before: `test_column`
* After: `test_column`
* Reason: Currently there is no logic to automatically change the distribution if required.

## When reviewing, please focus on:

* Types and lengths changed, impacting
	* `test_column`
* Columns were dropped, impacting
	* `empty_column`
* Encodings changed, impacting
	* `row_id`
	* `test_column`
* Distribution key, **human intervention is required**
* Sort key changed, impacting
	* `event_ts`
	* `test_column`
	

Sample PR writeup

CREATE TABLE public.sample_table_temp (
  test_column CHARACTER(36) NOT NULL ENCODE ZSTD, -- Corrected encodings that were mismatched, such as using tightest compression or not encoding the sort key. Optimize character column so it's just the size it needs to be.
  row_id BIGINT identity(0, 1) PRIMARY KEY NOT NULL ENCODE AZ64, -- Corrected encodings that were mismatched, such as using tightest compression or not encoding the sort key.
  event_ts TIMESTAMP NOT NULL ENCODE RAW
)
DISTSTYLE KEY
DISTKEY(test_column) -- Currently there is no logic to automatically change the distribution if required.
COMPOUND SORTKEY(event_ts); -- Made `event_ts` the only sort key.

INSERT INTO public.sample_table_temp (
  test_column,
  event_ts
)
(SELECT
  test_column,
  event_ts
FROM public.sample_table);

DROP TABLE public.sample_table;
ALTER TABLE public.sample_table_temp RENAME TO sample_table;

DELETE FROM metadata.comments WHERE schema_name = 'public' AND table_name = 'sample_table' AND column_name = 'empty_column';

GRANT ALL ON public.sample_table TO GROUP human_users;
GRANT SELECT ON public.sample_table TO GROUP system_users;

ANALYZE public.sample_table;

Sample SQL migration

And as always, do be sure to include a wide variety of emojis in your PRs. The PR might be from some code but that code is still, in this instance, a teammate doing their best.

Final thoughts

Life can only be understood backwards; but it must be lived forwards. [3]

Converting the embedded SQL generator to a stand alone SQL producer probably struck outside people as a weird thing to give attention to: after all, the current thing works fine enough, so like… who cares?

Well, “works fine enough” isn’t the same as “works.” We were relying on it more and more as a company, all while it became harder to maintain and missed more edge cases. The long turnaround was causing ongoing confusion. The Hack Day project in Python that the SQL generator had started out as needed to, finally, become a true production-ready system.

It’s a big system, bigger than the valve; its Python implementations hid how complex it was. I like to say that while the valve is complicated to explain, it’s got a simple implementation — the SQL producer is the reverse. You really become aware of how much you know and how many heuristic rules you use to do this sort of work once you start getting it down into code with numerous tests to verify everything. Even within the team, there were differences in what we looked for and how we decided what to do with the same information.

But it’s a great system: it’s a second example of Scala and Kafka consumers, it reacts quickly (great for inspiring more streaming ideas), and it allows humans to not even have to think about it or the problems it addresses. If you’re needed, a PR will tag you and Slack will have a message; otherwise, you keep doing your thing.

Truthfully, it’s been one of my favorite systems to work on, even when it aggravates me to no end. It combines so many different pieces (Kafka, Avro, Schema Registry, Redshift, SQL) in a way that makes sense and relieves the burden of work on me. I used to spend a lot of time creating, updating, and optimizing tables, which led to lots of mistakes no one caught or lots of tradeoffs because I didn’t have the time — no more! :tada: And it shows how the implementation language can impact the implementation you produce: you might start off picking what everyone is most comfortable with but ultimately you’ll need to use what’s the right language or framework or set of tools for the problem at hand, otherwise you’ll have friends for "∅⦰⦱⦲⦳⦴". You don’t want friends for "∅⦰⦱⦲⦳⦴".

You do, however, want automated PRs with emojis. Trust me, it’ll make you smile every time.


Appendix A: Redshift optimization queries

Please read my crash course to Redshift for a more dedicated walkthrough of Redshift basics and early optimizations to focus on. A lot of the queries included below are described there in more detail for newer Redshift users.

Our code uses the AWS Redshift JDBC driver for Java without the AWS SDK, but any Postgres connector should work. I’m providing the queries as Scala strings with basic interpolation, so it’s obvious what values need to be passed in from the service running these queries. Parameterize your queries however you see fit for production systems, though.

Also, because the JDBC driver returns truly the funkiest data structure anyone has seen, here’s the Stack Exchange answer you’d probably end up searching the Internet for about turning JDBC results into a Scala map, along with the version of it we use; you’re welcome. (Yes, we do link to the answer directly in our code, you should too.) Assuming your JDBC results are stored in a results variable:

// https://codereview.stackexchange.com/questions/64662/realizing-a-sql-resultset-into-a-map-in-scala
Iterator
  .continually(results.next)
  .takeWhile(identity)
  .map(_ =>
    (for (column <- columns) yield column -> results.getObject(column)).toMap
  )
  .toSet

This returns a set of maps, where each element in the set is a row and each map is the column to value of that row. I highly recommend setting type WarehouseRecord = Map[String, Any] and type WarehouseResults = Set[WarehouseRecord] to make it just that bit more obvious, even if Scala doesn’t yet have opaque type aliasing.
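
The aliases themselves, for the copy-and-paste inclined (wrapped in an object so they compile as-is):

object WarehouseTypes {
  type WarehouseRecord  = Map[String, Any]     // one row: column name -> value
  type WarehouseResults = Set[WarehouseRecord] // every row a query returned
}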

Schema of a table

s"""SELECT *
FROM pg_table_def
WHERE
	schemaname = '$schema'
	AND tablename = '$table'"""

While this query is great to get an overview of what the table currently looks like, we’ve also found it helpful in seeing if a human already updated a table ahead of the system or if the “revised” table the system will suggest a migration for is actually that different from the table right now.

Metadata about a table on disk

First execute

s"ANALYZE $schema.$table;"

to refresh Redshift’s metadata, then execute

s"""SELECT
    results.rows AS numRows,
    tableInfo.unsorted AS percentUnsorted,
    tableInfo.size AS sizeOnDiskInMB,
    tableInfo.max_varchar AS maxVarCharColumn,
    tableInfo.encoded AS encodingDefinedAtLeastOnce,
    tableInfo.diststyle AS distStyle,
    tableInfo.sortkey_num AS numSortKeys,
    tableInfo.sortkey1 AS sortKeyFirstColumn
FROM SVV_TABLE_INFO AS tableInfo
LEFT JOIN STL_ANALYZE AS results
ON results.table_id = tableInfo.table_id
WHERE
    tableInfo.schema = '$schema'
    AND tableInfo.table = '$table'
ORDER BY results.endtime DESC
LIMIT 1;"""

to get the latest metadata for yourself. The results tell you things like if you’re missing encodings (bad), the size on disk (to determine how much of an impact tweaking this table might have), and what your sort and distribution currently look like. Great for both “what do we fix?” and “what is the benefit of doing the fix?”

Skew of a table across the cluster

s"""SELECT skew_rows
FROM svv_table_info
WHERE
	schema = '$schema'
	AND "table" = '$table';"""

This is a handy one I learned while looking for ways to automate distribution suggestions. Skew can be particularly hard to spot as the table needs time to accumulate data before a bad distribution style or key becomes evident. Ideal skew is 1.0; we choose to recommend distribution optimization on any table with skew of 3.0 or higher. Like golf, lower is better here.
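
The check itself is as small as it sounds; the 3.0 threshold is simply our choice:

object SkewCheck {
  val skewThreshold = 3.0 // ideal is 1.0; at 3.0 or above we recommend revisiting the distribution

  def skewIndicatesOptimize(skewRows: Double): Boolean = skewRows >= skewThreshold
}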

Recommended encodings for a table

s"ANALYZE COMPRESSION $schema.$table"

I have seen Redshift recommend we bounce a particular column between two encoding types, over and over, so we tend to only use a recommendation if there are other changes we’re making or the change will save us a minimum amount of space on disk. You can combine this with metadata about the table’s size on disk to figure out if there’s enough savings to make it worth it:

def encodingsIndicateOptimize(schema: String, table: String, eventKey: String, diskSavingsMinimum: Double): Boolean = {
  Redshift.recommendedEncodingsFor(schema, table) match {
    case Success(results) =>
      val sizeOnDisk = Redshift
        .execute(
          s"""SELECT tableInfo.size AS sizeOnDiskInMB
FROM SVV_TABLE_INFO AS tableInfo
LEFT JOIN STL_ANALYZE AS results
ON results.table_id = tableInfo.table_id
WHERE
    tableInfo.schema = '$schema'
    AND tableInfo.table = '$table'
ORDER BY results.endtime DESC
LIMIT 1;""")
        .get
        .head("sizeOnDiskInMB".toLowerCase)
        .asInstanceOf[Long]
      val savings = results
        .filter(_.column != "row_id") // our surrogate primary key
        .filter(_.column != eventKey) // our standard sort key
        .map(result => result.savings / 100.0 * sizeOnDisk)
        .fold(0.0)(_ + _)
      savings >= diskSavingsMinimum
    case Failure(_) => false
  }
}

We look for at least 25 GB of savings typically, to ensure doing the work is worth it, but we might drop the amount soon, as all of our really poorly encoded tables have already been found.

(For a really thrilling/terrifying warehouse, you might want to start higher to focus on the biggest wins possible with encodings, especially if you’re trying to build an argument for spending time optimizing tables by hand or building out your own automation. Tweaking two tables for us one time saved us terabytes of data and sped just about every query in the warehouse up.)

Max and min length of varying character columns

s"""SELECT
	MAX(OCTET_LENGTH(${column.name})) AS max,
	MIN(OCTET_LENGTH(${column.name})) AS min
FROM $schema.$table;"""

This query actually lets us do a couple of things:

  • if both lengths are 0, the column is empty so can possibly be dropped

  • if both lengths are the same, we can convert a VARYING CHARACTER column to a CHARACTER column

  • if the max is under the limit the schema indicates, we can lower it to something more realistic

We use powers of 2 to make a recommendation, such as a column with a max value length of 92 characters being set to allow a max of 128 characters instead of 256 or 1024 characters. This is less for performance and more for, when a human looks at a column, having a vague idea of how much shtuff each value contains. A field called “name” that’s 1024 characters wide is a weird thing to find in the wild; a field called “name” that’s 64 characters wide makes more sense mentally.

(If you’re wondering why we use OCTET_LENGTH in this query: emojis.)
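
The power-of-two rounding is only a few lines; the floor of 16 below is an assumption for illustration, and our real version also respects the limit the schema declares:

object VarcharSizing {
  // Round the observed max length up to the next power of two, so a max of 92 characters
  // becomes CHARACTER VARYING(128) instead of 256 or 1024.
  def recommendedLength(observedMax: Int, floor: Int = 16): Int = {
    var length = floor
    while (length < observedMax) length *= 2
    length
  }
}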

Tables that truly need your love and attention

I’m not going to pretend to fully understand the following query; the Redshift Advisor suggested it for finding what they considered poorly optimized tables. What is helpful about this query (which I’m sure AWS has an explanation for somewhere though I’ve tweaked it a bit) is that it surfaces tables that truly need your love and attention as soon as you can give it to them. Even if you’re not going to have your SQL producer optimize tables, this is helpful for a human to use to find where to look in Redshift and put attention.

s"""SELECT DISTINCT ti."table" AS "table"
FROM svv_table_info AS ti
LEFT JOIN(
    SELECT
        tbl AS table_id,
        COUNT(*) AS size
    FROM stv_blocklist
    WHERE (tbl, col) IN (
        SELECT
            attrelid,
            attnum - 1
        FROM pg_attribute
        WHERE
             attencodingtype IN (0,128)
             AND attnum > 0
             AND attsortkeyord != 1
    )
    GROUP BY  tbl
) AS raw_size USING (table_id)
WHERE
    raw_size.size IS NOT NULL
    AND (
      raw_size.size > 100
      OR skew_rows > 3.0
    )
    AND ti.schema = '$schema'
;"""

Appendix B: select Github logic

We use this Github Java driver for interacting with the Github API but others are available, both natively in Scala and in Java. The Github API has a lot of power but can be hard for a new person to wrap their head around, which is why I am providing our code essentially as-is. (Also shoutout to GC alumni Hesham, now at Github, who helped me debug my problems and make my ideas a reality!) With this base, you should be able to tweak anything to match your needs while also finding other functionality to add following a similar pattern.

Our setup involves connecting to a specific repo using an access token but you can make it more generic if necessary. We also use some established values like defaultBaseBranch and pathToMigrations (since this system explicitly puts out migrations) which can be easily swapped out for your specific needs or, again, made more generic.

class Github(accessToken: String, repoName: String) {
  private val repo = new GitHubBuilder()
    .withOAuthToken(accessToken, organization)
    .build
    .getRepository(s"$repoName")

  def getBranch(branchName: String): GHRef = repo.getRef(s"heads/$branchName")

  private def makeBranch(branchName: String): GHRef = {
    val base = repo.getRef(s"heads/$defaultBaseBranch")
    val baseSha = base.getObject.getSha
    repo.createRef(s"refs/heads/$branchName", baseSha)
  }

  /** If the branch does not yet exist, create it. If it does exist, it can be created again or returned as is. */
  def createBranch(branchName: String, deleteIfExists: Boolean = false): GHRef = {
    if (deleteIfExists) deleteBranch(branchName)

    Try(getBranch(branchName)) match {
      case Success(branch) => branch
      case Failure(_) => makeBranch(branchName)
    }
  }

  def deleteBranch(branchName: String): Unit = Try(getBranch(branchName)).map(branch => branch.delete)

  private def getMaxMigrationNumber: Int = repo
    .getDirectoryContent(pathToMigrations)
    .asScala
    .map(content => content.getName)
    .filter(_.startsWith("V"))
    .map(_.stripPrefix("V"))
    .map(_.split("__")(0))
    .map(_.toInt)
    .max

  def commit(migration: Migration, deleteBranchIfExists: Boolean = true): String = {
    val branch = createBranch(migration.branchName, deleteBranchIfExists)
    var nextMigrationNumber = getMaxMigrationNumber + 1

    migration
      .migrations
      .foreach(sql => {
        repo
          .createContent
          .branch(migration.branchName)
          .path(s"${pathToMigrations}V${nextMigrationNumber}__${migration.prType.toString}_${migration.schema}_${migration.table}.sql")
          .content(sql)
          .message(s"Creating migration to ${migration.prType.toString} ${migration.schema}.${migration.table} in warehouse")
          .commit
        nextMigrationNumber += 1
      })

    migration.branchName
  }

  /** If a branch has had commits since a given datetime. */
  def hasCommitsSince(branchName: String, since: Date): Boolean = repo
    .queryCommits
    .from(branchName)
    .since(since)
    .list
    .toList
    .size > 0

  def makePullRequest(branchName: String, migration: Migration): URL = new URL(
    repo
      .createPullRequest(s"${migration.prTitle} ($environment)", branchName, defaultBaseBranch, migration.prDescription, true)
      .getIssueUrl
      .toString
      .replace("api.", "")
      .replace("repos/", "")
      .replace("issues", "pull")
  )

  def getContentsOf(branchName: String, path: String): String = repo
    .getFileContent(path, s"heads/$branchName")
    .getContent

  def updateContentsOf(branchName: String, path: String, newContent: String, commitMessage: String): Unit = repo
    .getFileContent(path, s"heads/$branchName")
    .update(newContent, commitMessage, branchName)
}

[Image: an orange kitten in a field of grass]

I felt bad there were no images in this post so here’s a kitten, thank you for making it this far. :bowing_woman:


Footnotes

  1. Italo Calvino, If on a Winter’s Night a Traveler.
  2. Federico Fellini.
  3. Søren Kierkegaard’s journals.
  4. Will Durant.
  5. Antoine-Laurent de Lavoisier, Elements of Chemistry.
  6. Frederick P. Brooks Jr., The Mythical Man-Month.
  7. Edna St. Vincent Millay, Renascence and Other Poems.
  8. Kurt Vonnegut, Hocus Pocus.