A pattern for processing heterogeneous data sources

Published on Saturday, 07 March 2015

Introduction

Processing files and other data sources is a common issue in software development. Batch processing financial operations or enabling communication between systems and their stakeholders are just a couple of examples. If you're a developer it is very likely you will have to deal with massive input sources: validating them, filtering the content and processing it to satisfy business needs.

In this article we will see how to refactor a non scalable procedural piece of code into a set of components with well defined responsibilities, that are easier to extend and cheaper to maintain.

The code you're going to see here is quite simple. I wouldn't recommend applying the techniques described here in such a simple case, but keeping the problem small will help focus on the solution and ignore irrelevant noise. So please, think of the big picture.

If you want more detailed information about the refactors made, please look at the list of commits in Github.

Applying the pattern

The starting point

Look at the following code:

# Original version

module Loader
  def self.load(path)
    entries = []
    CSV.foreach(path) do |row|
      entries << OpenStruct.new({
        name: row[0].strip,
        balance: row[1].to_f,
        account_number: row[2]
      }) if row[1] =~ /[0-9]+\.[0-9]{2}/
    end

    entries
  end
end

This small piece of Ruby code is doing a bunch of things. It is reading a CSV file and creating instances with bank account information. It also discards rows where the balance is not numeric, and also processes the rest of the fields by manipulating the content, i.e., it removes trailing spaces in the name and converts the balance to float. By the way, I realise that the regular expression does not allow negative balances. For the sake of the readability of this whole post let's assume that it is always positive.

Lots of different concerns are mixed together. If we left this piece of code evolve the way it is written, we would probably find more and more conditionals and manipulations in it. We are going to separate some of these responsibilities out by using object oriented refactoring techniques.

Extract the balance validation

Our first step will be to move the validation step to another place. We'll do this by extracting a class to represent the field containing the balance, and then we'll move the validation there.

# extract the balance validation

module Loader
  module Fields
    class Balance
      def valid?(value)
        value =~ /[0-9]+\.[0-9]{2}/
      end
    end
  end

  def self.load(path)
    entries = []
    CSV.foreach(path) do |row|
      balance = Fields::Balance.new
      entries << OpenStruct.new({
        name: row[0].strip,
        balance: row[1].to_f,
        account_number: row[2]
      }) if balance.valid?(row[1])
    end

    entries
  end
end

With this extraction we've added more code and it doesn't seem easier to understand, right?. Lets keep working on it.

Extract the balance manipulation

Now that we have we have that new Fields::Balance class, we're going to put more behaviour on it. Let's make it responsible for casting the content to float.

# extract the balance manipulation

module Loader
  module Fields
    class Balance
      def extract(value)
        value.to_f 
      end

      def valid?(value)
        value =~ /[0-9]+\.[0-9]{2}/
      end
    end
  end

  def self.load(path)
    entries = []
    CSV.foreach(path) do |row|
      balance = Fields::Balance.new
      entries << OpenStruct.new({
        name: row[0].strip,
        balance: balance.extract(row[1]),
        account_number: row[2]
      }) if balance.valid?(row[1])
    end

    entries
  end
end

Four more lines and a zero readability gain. Patience, please!

Extract a name field

Let's do the same we did with the balance field for the name.

# extract the name field

module Loader
  module Fields
    class Balance
      # ...
    end

    class Name
      def extract(value)
        value.strip
      end

      def valid?(value)
        true
      end
    end
  end

  def self.load(path)
    entries = []
    CSV.foreach(path) do |row|
      name = Fields::Name.new
      balance = Fields::Balance.new
      entries << OpenStruct.new({
        name: name.extract(row[0]),
        balance: balance.extract(row[1]),
        account_number: row[2]
      }) if balance.valid?(row[1])
    end

    entries
  end
end

You'll notice that the Name class has a useless valid? method that always returns true. It is not that useless, it maintains consistence with the Field public interface. It also makes it easy to add complex validations or content processing in the future.

That makes 38 lines of code, 25 more than the original version and a little gain in maintainability. Let's keep going.

Extract the account field

# extract the account field

module Loader
  module Fields
    class Balance
      # ...
    end

    class Name
      # ...
    end

    class AccountNumber
      def extract(value)
        value
      end

      def valid?(value)
        true
      end
    end
  end

  def self.load(path)
    entries = []
    CSV.foreach(path) do |row|
      name = Fields::Name.new
      balance = Fields::Balance.new
      account_number = Fields::AccountNumber.new
      entries << OpenStruct.new({
        name: name.extract(row[0]),
        balance: balance.extract(row[1]),
        account_number: account_number.extract(row[2])
      }) if balance.valid?(row[1])
    end

    entries
  end
end

Finally we are done with the fields. There is a consistent, albeit verbose way to handle the fields in the file. But we still have a long road to walk before we make the code better than it was.

Making the row the unit of work

The code above validates each field one by one, in this step we're going to change it and apply the validation to the entire line. We'll do that with a simple refactoring technique: Extract Method.

# validate entire rows

module Loader
  module Fields
    # ...
  end

  def self.load(path)
    entries = []
    CSV.foreach(path) do |row|
      name = Fields::Name.new
      balance = Fields::Balance.new
      account_number = Fields::AccountNumber.new
      if self.row_valid?(row)
        extracted_fields = {
          name: name.extract(row[0]),
          balance: balance.extract(row[1]),
          account_number: account_number.extract(row[2])
        }
        entries << OpenStruct.new(extracted_fields)
      end
    end

    entries
  end

  def self.row_valid?(row)
    layout = [
      Fields::Name.new,
      Fields::Balance.new,
      Fields::AccountNumber.new
    ]
    layout.zip(row).all? do |field, value|
      field.valid?(value)
    end
  end
end

You'll see that I've replicated the construction of the field instances. I will understand if you give up at this point and stop reading this article. If you are still here, I feel obliged to tell you this is going to get even worse.

Extract fields by row

Now we are going to carry out the extraction taking the full row into consideration instead of doing it field by field, the same way we did with the validation in the previous step. In order to do this, we need to add a bit more stuff to the fields classes.

# extract entire rows

module Loader
  module Fields
    class Balance
      def id
        :balance
      end

      # ...
    end

    class Name
      def id
        :name
      end

      # ...
    end

    class AccountNumber
      def id
        :account_number
      end

      # ...
    end
  end

  def self.load(path)
    entries = []
    CSV.foreach(path) do |row|
      if self.row_valid?(row)
        entries << OpenStruct.new(extract_fields(row))
      end
    end

    entries
  end

  def self.row_valid?(row)
    layout = [
      Fields::Name.new,
      Fields::Balance.new,
      Fields::AccountNumber.new
    ]
    layout.zip(row).all? do |field, value|
      field.valid?(value)
    end
  end

  def self.extract_fields(row)
    layout = [
      Fields::Name.new,
      Fields::Balance.new,
      Fields::AccountNumber.new
    ]

    fields = {}
    layout.zip(row).each do |field, value|
      fields[field.id] = field.extract(value)
    end

    fields
  end
end

We have added an id() method to the field classes that give us semantics to build the hash for the OpenStruct. We have moved the extraction to a method, in a way similar to the validation. And we have introduced clear replication we can start working with.

Extract a Layout class

# extract Layout class

module Loader
  module Fields
    # ...
  end

  class Layout
    HEADERS = [
      Fields::Name.new,
      Fields::Balance.new,
      Fields::AccountNumber.new
    ]

    def valid?(row)
      HEADERS.zip(row).all? do |field, value|
        field.valid?(value)
      end
    end

    def extract(row)
      fields = {}
      HEADERS.zip(row).each do |field, value|
        fields[field.id] = field.extract(value)
      end

      fields
    end
  end

  def self.load(path)
    entries = []
    layout = Layout.new
    CSV.foreach(path) do |row|
      entries << OpenStruct.new(layout.extract(row)) if layout.valid?(row)
    end

    entries
  end
end

Our code is now 79 lines long while the original one was 13. But we have extracted some of the responsibilities to the load() method. With the separation of concerns the readability has improved significantly, but we can still improve the code.

Extract CSVSource class

The title of this post contains the word heterogeneous for a good reason. One of the strengths of the pattern I'm developing here is that it will make it easier to switch between different data sources: CSV, XML, XLS, Databases or remote services. First, we'll need an abstraction to let us build a common interface shared among these different sources.

# extract CSVSource class

module Loader
  module Fields
    # ...
  end

  class Layout
    # ...
  end

  class CSVSource
    def initialize(path)
      @path = path
    end

    def rows
      CSV.read(@path)
    end
  end

  def self.load(path)
    entries = []
    layout = Layout.new
    rows = CSVSource.new(path).rows
    rows.each do |row|
      entries << OpenStruct.new(layout.extract(row)) if layout.valid?(row)
    end

    entries
  end
end

There is something to notice here. The load() method asks the CSVSource for the rows, and then it passes them to the layout. So it is only mediating between both classes. Unless it is obviously beneficial, I try to avoid these situations by removing the mediation and making the classes talk to each other directly.

Break the mediation

# implement the crawl method

module Loader
  module Fields
    # ...
  end

  class Layout
    def crawl(source)
      entries = []
      source.rows.each do |row|
        entries << OpenStruct.new(extract(row)) if valid?(row)
      end

      entries
    end

    private

    # ...
  end

  class CSVSource
    # ..
  end

  def self.load(path)
    layout = Layout.new
    source = CSVSource.new(path)
    layout.crawl(source)
  end
end

Using heterogeneous data sources

Once the pattern is implemented, it is really easy to switch to using different data source without changing a single line of code. Let's say that a second customer, another bank maybe, wants us to access to their database instead of sending us the file. Given that we have an abstraction for data sources, we can write the database equivalent for the CSVSource.

module Loader
  class DatabaseSource
    def rows
      rows = @connection.execute('SELECT * FROM `transfers`')
      sort_rows_for_layout
    end
  end

  def self.load(path)
    layout = Layout.new
    source = DatabaseSource.new(path)
    layout.crawl(source)
  end
end

The only thing we have to worry about is the order of the columns in the matrix provided by the rows() method. They must be compatible with the headers specified in the Layout class. This shared knowledge is one of the weaknesses of this pattern, since every time you need to change the Layout, you've to modify all the data sources to match the changes. When talking about patterns there are always tradeoffs.

What this pattern is not

The main target of this pattern is to separate concerns. On one side we decouple the concrete artefact that will give us the data. Then there is the layout, then the validation and manipulation of the fields, then the resulting objects construction. How these objects are consumed by the system is outside the scope.

No business logic should be found in the components described in this post. The layout is not responsible for interpreting the relations between the data. For example, if some columns are related, if for some reason some operation should be applied between two cells in a row, it must be done elsewhere. If the rows must be grouped by some criteria, it is someone else who should be in charge of that.

Problems

We commented above that there is a problem with the shared knowledge about the order of the fields between the Layout and the DataSource. Apart from that, the field objects have two different responsibilities: they not only clean the data they provide, but they also perform validation. There are probably better ways for doing this. I'm open to suggestions and will be happy to receive feedback from you.

Another thing to note is that you don't necessarily need that much granularity for the field classes. Let's imagine a Layout that contains two float fields, you could write a generic FloatField that receives the id in construction:

# generic fields

class FloatField
  attr_reader :id

  def initialize(id)
    @id = id
  end

  def valid?(value)
    # ...
  end

  def extract(value)
    # ...
  end
end


headers = [
  FloatField.new(:balance),
  FloatField.new(:interest_rate),
  # ...
  ]

Note for Ruby ninjas

I'm relatively new to the Ruby language so if you're an expert Ruby programmer you'll surely know better ways to process arrays and stuff like that. Don't hesitate to send me any Pull Request with improvements to the examples.