(Insightful) Ramblings by batasrki

No subtitle needed

CRDT for Real Data in Riak and Ruby


Intro

In the previous post, I briefly introduced CRDTs in Ruby and showed a basic use case for one Conflict-Free Replicated Data Type. Tonight, we will explore a slightly more complicated use case, one that shows up in real applications in the wild.

Use case

For the purpose of this post, imagine we’re running a Netflix clone. We have accounts and we have movies. To drive recommendations, we want to store which movies each account holder has viewed. As is the case with Netflix, an account may be signed in and used on more than one device at once, by holders of differing tastes. Since we don’t really want to restrict an account to a single signed-in session, we will record statistics from each session.

As you can see, this may lead to write conflicts, since any account holder may watch any movie at any time. For the sake of efficiency, we will only store the ID of each movie in the account’s movies watched collection.

The data type used

Out of the currently known CRDTs, the one that seems to fit this problem best is the GSet, or Grow-only Set. As its name suggests, this CRDT only ever grows in size; it never shrinks. Since recommendation engines thrive on tonnes of data, having an ever-increasing set of records is very beneficial.
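The reason a grow-only set merges without conflict is that set union does not care about order or repetition. Here is a minimal sketch using only Ruby's standard Set; the merge_replicas helper and the replica names are made up for illustration and are not part of meangirls.

```ruby
require 'set'

# Merge any number of grow-only replicas by taking their union.
def merge_replicas(*replicas)
  replicas.reduce(Set.new) { |merged, replica| merged | replica }
end

replica_a = Set['a', 'b']
replica_b = Set['b', 'c']
replica_c = Set['d']

# Order does not matter (union is commutative and associative)...
one_order     = merge_replicas(replica_a, replica_b, replica_c)
another_order = merge_replicas(replica_c, replica_a, replica_b)
puts one_order == another_order  # prints true

# ...and merging a state that was already merged changes nothing (idempotent).
puts merge_replicas(one_order, replica_a) == one_order  # prints true
```

Whichever replica reaches the store first, the merged result ends up the same.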

The basic layout of a GSet is as follows:

{
  'type': 'g-set',
  'e': ['a', 'b', 'c']
}

The type information is there so that the meangirls library can invoke the correct operational semantics. Other than that, it’s just a simple list of data, always growing in size. However, the meangirls library does not include a class for a GSet. It has a class for TwoPhaseSet, which is a combination of two GSets, but it doesn’t implement a GSet by itself.

That’s not a big deal; we will implement one ourselves.

GSet implementation

Since its requirements are fairly simple, let’s go straight to the GSet class. I’ll explain parts of it after the code listing.

gset.rb
require 'set'
# Assumes the meangirls lib directory is on the load path.
require 'meangirls'

class GSet < Meangirls::Set
  attr_accessor :a

  def initialize(hash=nil)
    if hash
      raise ArgumentError, "hash must contain a" unless hash['a']

      @a = Set.new hash['a']
    else
      # Empty set
      @a = Set.new
    end
  end

  def <<(e)
    @a << e
    self
  end
  alias add <<

  def ==(other)
    other.kind_of?(self.class) && a == other.a
  end

  def as_json
    {
      'type' => type,
      'a' => a.to_a
    }
  end

  def clone
    c = super
    c.a = a.clone
    c
  end

  def merge(other)
    unless other.kind_of? self.class
      raise ArgumentError, "other must be a #{self.class}"
    end

    self.class.new('a' => (a | other.a))
  end

  def include?(e)
    @a.include? e
  end

  def to_set
    @a
  end

  def type
    'g-set'
  end
end

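If you peek at the meangirls implementation of a TwoPhaseSet, you will notice that we copied most of its code here, save for the remove set logic. That is also why the elements are stored under the 'a' key rather than the 'e' key shown in the layout above. Let’s go through the important parts.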

def initialize(hash=nil)
  if hash
    raise ArgumentError, "hash must contain a" unless hash['a']

    @a = Set.new hash['a']
  else
    # Empty set
    @a = Set.new
  end
end

Our constructor may or may not take a hash. This is so that we can reconstruct a GSet from JSON data, such as a Riak record.
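To make the round trip concrete, here is a small sketch that serializes a set to a JSON string and rebuilds it from the parsed hash. SketchGSet is a hypothetical stand-in that mirrors the constructor and as_json above but drops the meangirls dependency so it can run on its own.

```ruby
require 'json'
require 'set'

# Hypothetical stand-in for the GSet above, without the meangirls dependency.
class SketchGSet
  attr_reader :a

  def initialize(hash = nil)
    if hash
      raise ArgumentError, "hash must contain a" unless hash['a']
      @a = Set.new(hash['a'])
    else
      # Empty set
      @a = Set.new
    end
  end

  def as_json
    { 'type' => 'g-set', 'a' => a.to_a }
  end
end

# Serialize to a JSON string, as it would be stored, then rebuild from it.
original = SketchGSet.new('a' => ['movie-1', 'movie-2'])
stored   = JSON.generate(original.as_json)
restored = SketchGSet.new(JSON.parse(stored))

puts restored.a == original.a  # prints true
```

The extra 'type' key in the parsed hash is simply ignored by the constructor, which only looks at 'a'.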

def <<(e)
  @a << e
  self
end
alias add <<

We only ever add elements to our GSet. Additional guarantees, such as requiring every element to be an Integer, can be added here.
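As an illustration of such a guarantee, the sketch below rejects anything that is not an Integer. IntegerGSet is a hypothetical class written for this example. In this post the movie IDs are Riak keys, which are strings, so a real check here would more likely test for String.

```ruby
require 'set'

# Hypothetical grow-only set that only accepts Integer elements.
class IntegerGSet
  def initialize
    @a = Set.new
  end

  def <<(e)
    # Validate before touching the set, so a bad element never gets in.
    raise ArgumentError, "element must be an Integer" unless e.is_a?(Integer)
    @a << e
    self
  end
  alias add <<

  def to_a
    @a.to_a
  end
end

ids = IntegerGSet.new
ids << 1 << 2
ids.add(2)             # duplicates are ignored by the underlying Set
puts ids.to_a.inspect  # prints [1, 2]
```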

def merge(other)
  unless other.kind_of? self.class
    raise ArgumentError, "other must be a #{self.class}"
  end

  self.class.new('a' => (a | other.a))
end

When merging, we must first ensure that we’re merging a GSet with a GSet. Otherwise, we would get into weird situations and breakage. After that, a simple set union over our two sets of IDs ensures that the result only ever holds unique ID values. Note that merge returns a new GSet and leaves both operands unchanged.
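The following sketch exercises the same merge logic in isolation. MergeSketch is a hypothetical, stripped-down stand-in for the GSet above; it takes a plain list of elements instead of a hash to keep the example short.

```ruby
require 'set'

# Hypothetical stand-in for the GSet above, reduced to what merge needs.
class MergeSketch
  attr_reader :a

  def initialize(elements = [])
    @a = Set.new(elements)
  end

  def merge(other)
    unless other.kind_of?(self.class)
      raise ArgumentError, "other must be a #{self.class}"
    end

    # Set#| builds a new Set, so neither operand is modified.
    self.class.new(a | other.a)
  end
end

left  = MergeSketch.new(%w[movie-1 movie-2])
right = MergeSketch.new(%w[movie-2 movie-3])
both  = left.merge(right)

puts both.a.to_a.inspect  # prints ["movie-1", "movie-2", "movie-3"]
puts left.a.size          # prints 2
```

The shared ID appears once in the result, and passing anything other than a MergeSketch raises an ArgumentError.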

Setup

With that done, let’s add models for the Account and Movie records. The models know nothing about persistence, which is handled elsewhere. This time, we will store records through irb.

account.rb
require 'json'

class Account
  attr_accessor :username, :email, :movies_watched

  def initialize(hash=nil)
    if hash
      @username = hash["username"]
      @email = hash["email"]
      @movies_watched = GSet.new(hash["movies_watched"])
    else
      @movies_watched = GSet.new
    end
  end

  def as_json
    {
      "username" => username,
      "email" => email,
      "movies_watched" => movies_watched.as_json
    }
  end

  def to_json
    as_json.to_json
  end
end
movie.rb
require 'json'

class Movie
  attr_accessor :title

  def initialize(hash=nil)
    if hash
      @title = hash["title"]
    end
  end

  def as_json
    {
      "title" => title
    }
  end

  def to_json
    as_json.to_json
  end
end

Let’s store a few arbitrary movies, keep their keys in memory and cause a write conflict. We will then see how we can resolve this conflict through the usage of our CRDT.

irb
movie_bucket = client.bucket("movies")
 => #<Riak::Bucket {movies}>

movie_record_keys = []

m1 = Movie.new("title" => "Test Movie")
 => #<Movie:0x007ffa0be21348 @title="Test Movie">

m2 = Movie.new("title" => "Another test movie")
 => #<Movie:0x007ffa0be43ba0 @title="Another test movie">

r1 = movie_bucket.new
 => #<Riak::RObject {movies} [#<Riak::RContent [application/json]:nil>]>

r1.data = m1.as_json
 => {"title"=>"Test Movie"}

r1.store
 => #<Riak::RObject {movies,CAQ3ewkn0vVI5q8Kxt4LPcrFYk9} [#<Riak::RContent [application/json]:{"title"=>"Test Movie"}>]>

movie_record_keys << r1.key

r2 = movie_bucket.new
 => #<Riak::RObject {movies} [#<Riak::RContent [application/json]:nil>]>

r2.data = m2.as_json
 => {"title"=>"Another test movie"}

r2.store
 => #<Riak::RObject {movies,aU6uCCZJ10xGIPJonpSYftZ4H0w} [#<Riak::RContent [application/json]:{"title"=>"Another test movie"}>]>

movie_record_keys << r2.key
 => ["CAQ3ewkn0vVI5q8Kxt4LPcrFYk9", "aU6uCCZJ10xGIPJonpSYftZ4H0w"]

OK, now we have two movies, so let’s simulate simultaneous access to one account by two holders, each of whom is watching a movie. As shown in the previous post, storing values at the same time will cause a write conflict, resulting in the creation of siblings. As usual, the accounts bucket should have allow_mult set to true, so that Riak doesn’t discard one of the writes.

Let’s first create an account.

irb
accounts_bucket = client.bucket("accounts")
 => #<Riak::Bucket {accounts}>

accounts_bucket.allow_mult = true
 => true

account = Account.new
 => #<Account:0x007ffa0ce89758 @movies_watched=#<GSet:0x007ffa0ce89730 @a=#<Set: {}>>>

account.username = "an-user"
 => "an-user"
account.email = "anuser@email.com"
 => "anuser@email.com"

record = accounts_bucket.new
 => #<Riak::RObject {accounts} [#<Riak::RContent [application/json]:nil>]>

record.data = account.as_json
 => {"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>[]}}

record.store
 => #<Riak::RObject {accounts,6z4xJjAFMJVHWn6Ni2MF5J3IBBq} [#<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>[]}}>]>

OK, with the record stored, we retrieve it twice without updating it in between. Both copies therefore carry the same vclock value, which gives us the opportunity to cause a write conflict.
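To see why two reads of the same version lead to siblings, here is a toy in-memory model. It is an illustration only and not how Riak implements vector clocks: ToyStore is a made-up class, and a single integer stands in for the causal context that a vclock carries.

```ruby
# Toy in-memory model of sibling creation. This is NOT how Riak implements
# vector clocks; a single integer stands in for the causal context.
class ToyStore
  Entry = Struct.new(:version, :siblings)

  def initialize
    @entries = {}
  end

  # Returns [version, siblings] as seen by the reader.
  def get(key)
    entry = @entries.fetch(key)
    [entry.version, entry.siblings.dup]
  end

  # A write based on the current version replaces all siblings;
  # a write based on a stale version is kept alongside them.
  def put(key, value, based_on = nil)
    entry = @entries[key]
    if entry.nil? || based_on == entry.version
      @entries[key] = Entry.new((entry ? entry.version : 0) + 1, [value])
    else
      entry.siblings << value
      entry.version += 1
    end
  end
end

store = ToyStore.new
store.put("account", { "a" => [] })

# Two readers fetch the same version before either of them writes.
version_one, _ = store.get("account")
version_two, _ = store.get("account")

store.put("account", { "a" => ["movie-1"] }, version_one)
store.put("account", { "a" => ["movie-2"] }, version_two)  # stale, becomes a sibling

puts store.get("account").last.size  # prints 2
```

The second write is based on a version that is no longer current, so the toy store keeps both values instead of picking a winner.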

irb
one_access = accounts_bucket.get "6z4xJjAFMJVHWn6Ni2MF5J3IBBq"
 => #<Riak::RObject {accounts,6z4xJjAFMJVHWn6Ni2MF5J3IBBq} [#<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>[]}}>]>

# same as above
second_access = accounts_bucket.get "6z4xJjAFMJVHWn6Ni2MF5J3IBBq"

first_instance = Account.new(one_access.data)
 => #<Account:0x007ffa0cf2b710 @movies_watched=#<GSet:0x007ffa0cf2b670 @a=#<Set: {}>>, @username="an-user", @email="anuser@email.com">

second_instance = Account.new(second_access.data)

Let’s now add the ID of the first created movie to one instance of this Account record and the ID of the second movie to the other one.

irb
first_instance.movies_watched.add(movie_record_keys.first)
 => #<GSet:0x007ffa0cf2b670 @a=#<Set: {"CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"}>>

second_instance.movies_watched << movie_record_keys.last
 => #<GSet:0x007ffa0cf31f70 @a=#<Set: {"aU6uCCZJ10xGIPJonpSYftZ4H0w"}>>

Let’s now try storing these objects back into Riak and see what happens.

irb
one_access.data = first_instance.as_json
 => {"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"]}}

one_access.store
 => #<Riak::RObject {accounts,6z4xJjAFMJVHWn6Ni2MF5J3IBBq} [#<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"]}}>]>

second_access.data = second_instance.as_json
 => {"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["aU6uCCZJ10xGIPJonpSYftZ4H0w"]}}

second_access.store
 => #<Riak::RObject {accounts,6z4xJjAFMJVHWn6Ni2MF5J3IBBq} [#<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["aU6uCCZJ10xGIPJonpSYftZ4H0w"]}}>, #<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"]}}>]>

And there we go. We have successfully (again) caused a conflict. Let’s write some code to resolve it. The trick to resolving conflicts like this is that you must not call the data or raw_data methods on the conflicted object. These will throw an exception and mess up your day.

Instead, let’s introduce a class method in the Account model that will handle conflict resolution for us.

account.rb
class Account
  # existing methods
  #
  #
  #
  def self.from_persistence(obj)
    if obj.conflict?
      resolved = {}
      watched_lists = []
      obj.siblings.each do |sibling|
        watched_lists << sibling.data.delete("movies_watched")
        resolved.merge!(sibling.data)
      end

      first_one = GSet.new(watched_lists.shift)
      watched_lists.each {|list| first_one = first_one.merge(GSet.new(list))}

      resolved.merge!("movies_watched" => first_one.as_json)
      new(resolved)
    else
      new(obj.data)
    end
  end
end

In this method, we first check whether the raw record coming from Riak has the conflict? flag set, which indicates that there’s been a write conflict somewhere. If it does, we loop through all the siblings and pull out the movies_watched lists. The rest of the record should not be subject to conflict resolution, so we accept whatever values are there; since merge! overwrites existing keys, the last sibling’s values win.

We then build a GSet from the first list and merge each of the remaining lists into it. We add the result to the hash of resolved values and then instantiate our Account model with it.

The second part of this conflict resolution is storing the resolved object back into Riak. Again, as in the previous post, we do that by creating a new Riak::RObject instance, setting its key to the conflicted one, and setting the siblings property of the conflicted instance to an array containing only that new instance. We store that and we’re done.
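The same resolution can be sketched on plain hashes, with no Riak involved. The resolve_siblings helper below is hypothetical and takes an array of hashes shaped like sibling.data. Unlike the method above, it does not delete keys from its inputs.

```ruby
require 'set'

# Resolve plain sibling hashes (hypothetical stand-ins for sibling.data).
# Scalar fields: the last sibling wins. movies_watched: union of all siblings.
def resolve_siblings(sibling_hashes)
  watched = sibling_hashes.reduce(Set.new) do |merged, data|
    merged | data.fetch("movies_watched").fetch("a")
  end

  scalars = sibling_hashes.reduce({}) do |merged, data|
    merged.merge(data.reject { |key, _| key == "movies_watched" })
  end

  scalars.merge("movies_watched" => { "type" => "g-set", "a" => watched.to_a })
end

siblings = [
  { "username" => "an-user", "movies_watched" => { "type" => "g-set", "a" => ["movie-2"] } },
  { "username" => "an-user", "movies_watched" => { "type" => "g-set", "a" => ["movie-1"] } }
]

puts resolve_siblings(siblings)["movies_watched"]["a"].inspect  # prints ["movie-2", "movie-1"]
```

Because the watched list is a union, the result contains the same elements whichever order the siblings arrive in; only the scalar fields depend on which sibling comes last.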

irb
record = accounts_bucket.get "6z4xJjAFMJVHWn6Ni2MF5J3IBBq"
 => #<Riak::RObject {accounts,6z4xJjAFMJVHWn6Ni2MF5J3IBBq} [#<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["aU6uCCZJ10xGIPJonpSYftZ4H0w"]}}>, #<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"]}}>]>

a = Account.from_persistence(record)
 => #<Account:0x007ff318320c68 @username="an-user", @email="anuser@email.com", @movies_watched=#<GSet:0x007ff318320bc8 @a=#<Set: {"aU6uCCZJ10xGIPJonpSYftZ4H0w", "CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"}>>>

resolved = accounts_bucket.new
 => #<Riak::RObject {accounts} [#<Riak::RContent [application/json]:nil>]>

resolved.data = a.as_json
 => {"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["aU6uCCZJ10xGIPJonpSYftZ4H0w", "CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"]}}

resolved.key = record.key
 => "6z4xJjAFMJVHWn6Ni2MF5J3IBBq"

record.siblings = [resolved]
 => [#<Riak::RObject {accounts,6z4xJjAFMJVHWn6Ni2MF5J3IBBq} [#<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["aU6uCCZJ10xGIPJonpSYftZ4H0w", "CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"]}}>]>]

record.store
 => #<Riak::RObject {accounts,6z4xJjAFMJVHWn6Ni2MF5J3IBBq} [#<Riak::RContent [application/json]:{"username"=>"an-user", "email"=>"anuser@email.com", "movies_watched"=>{"type"=>"g-set", "a"=>["aU6uCCZJ10xGIPJonpSYftZ4H0w", "CAQ3ewkn0vVI5q8Kxt4LPcrFYk9"]}}>]>

Conclusion

Hopefully, this has shed a bit more light on how CRDTs can be used in real applications. The key point is that the need for conflict resolution must be anticipated during data model design, and the data, or parts of it, must then be structured as a CRDT. Once that is done, merging conflicted records is pretty easy and availability is preserved.

Until next time.
