Class: WCC::Contentful::Store::PostgresStore

Inherits:
Base
  • Object
show all
Includes:
Instrumentation
Defined in:
lib/wcc/contentful/store/postgres_store.rb

Overview

Implements the store interface where all Contentful entries are stored in a JSONB table.

Defined Under Namespace

Classes: Query

Constant Summary collapse

@@schema_mutex =

This is intentionally a class var so that all subclasses share the same mutex

Mutex.new

Constants included from Interface

Interface::INTERFACE_METHODS

Instance Attribute Summary collapse

Attributes included from Instrumentation

#_instrumentation

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Instrumentation

#_instrumentation_event_prefix, instrument

Methods inherited from Base

#ensure_hash, #execute, #find_by, #index, #index?

Methods included from Interface

#find_by, #index, #index?

Constructor Details

#initialize(configuration = nil, connection_options = nil, pool_options = nil) ⇒ PostgresStore

Returns a new instance of PostgresStore.



20
21
22
23
24
25
26
27
28
# File 'lib/wcc/contentful/store/postgres_store.rb', line 20

def initialize(configuration = nil, connection_options = nil, pool_options = nil)
  super(configuration)
  @schema_ensured = false
  connection_options ||= { dbname: 'postgres' }
  pool_options ||= {}
  @connection_pool = PostgresStore.build_connection_pool(connection_options, pool_options)
  @dirty = Concurrent::AtomicBoolean.new
  @mutex = Mutex.new
end

Instance Attribute Details

#connection_poolObject (readonly)



17
18
19
# File 'lib/wcc/contentful/store/postgres_store.rb', line 17

def connection_pool
  @connection_pool
end

#loggerObject



18
19
20
# File 'lib/wcc/contentful/store/postgres_store.rb', line 18

def logger
  @logger
end

Class Method Details

.build_connection_pool(connection_options, pool_options) ⇒ Object

rubocop:disable Style/ClassVars



326
327
328
329
330
331
332
333
334
335
336
337
# File 'lib/wcc/contentful/store/postgres_store.rb', line 326

def build_connection_pool(connection_options, pool_options)
  ConnectionPool.new(pool_options) do
    PG.connect(connection_options).tap do |conn|
      unless schema_ensured?(conn)
        @@schema_mutex.synchronize do
          ensure_schema(conn) unless schema_ensured?(conn)
        end
      end
      prepare_statements(conn)
    end
  end
end

.ensure_schema(conn) ⇒ Object



350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/wcc/contentful/store/postgres_store.rb', line 350

def ensure_schema(conn)
  result =
    begin
      conn.exec('SELECT version FROM wcc_contentful_schema_version ' \
                'ORDER BY version DESC')
    rescue PG::UndefinedTable
      []
    end
  1.upto(EXPECTED_VERSION).each do |version_num|
    next if result.find { |row| row['version'].to_s == version_num.to_s }

    conn.exec(File.read(File.join(__dir__, "postgres_store/schema_#{version_num}.sql")))
  end
end

.prepare_statements(conn) ⇒ Object



314
315
316
317
318
319
320
321
# File 'lib/wcc/contentful/store/postgres_store.rb', line 314

def prepare_statements(conn)
  conn.prepare('upsert_entry', 'SELECT * FROM fn_contentful_upsert_entry($1,$2,$3)')
  conn.prepare('select_entry', 'SELECT * FROM contentful_raw WHERE id = $1')
  conn.prepare('select_ids', 'SELECT id FROM contentful_raw')
  conn.prepare('delete_by_id', 'DELETE FROM contentful_raw WHERE id = $1 RETURNING *')
  conn.prepare('refresh_views_concurrently',
    'REFRESH MATERIALIZED VIEW CONCURRENTLY contentful_raw_includes_ids_jointable')
end

.schema_ensured?(conn) ⇒ Boolean

Returns:

  • (Boolean)


339
340
341
342
343
344
345
346
347
348
# File 'lib/wcc/contentful/store/postgres_store.rb', line 339

def schema_ensured?(conn)
  result = conn.exec('SELECT version FROM wcc_contentful_schema_version ' \
                     'ORDER BY version DESC LIMIT 1')
  return false if result.num_tuples == 0

  result[0]['version'].to_i >= EXPECTED_VERSION
rescue PG::UndefinedTable
  # need to run v1 schema migration
  false
end

Instance Method Details

#delete(key) ⇒ Object



76
77
78
79
80
81
82
83
84
85
# File 'lib/wcc/contentful/store/postgres_store.rb', line 76

def delete(key)
  result =
    _instrument 'delete_by_id', key: key do
      @connection_pool.with { |conn| conn.exec_prepared('delete_by_id', [key]) }
    end

  return if result.num_tuples == 0

  JSON.parse(result.getvalue(0, 1))
end

#exec_query(statement, params = []) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/wcc/contentful/store/postgres_store.rb', line 108

def exec_query(statement, params = [])
  if @dirty.true?
    # Only one thread should call refresh_views_concurrently but all should wait for it to finish.
    @mutex.synchronize do
      # We have to check again b/c another thread may have gotten here first
      if @dirty.true?
        _instrument 'refresh_views' do
          @connection_pool.with { |conn| conn.exec_prepared('refresh_views_concurrently') }
        end
        # Mark that the views have been refreshed.
        @dirty.make_false
      end
    end
  end

  logger&.debug("[PostgresStore] #{statement} #{params.inspect}")
  _instrument 'exec' do
    @connection_pool.with { |conn| conn.exec(statement, params) }
  end
end

#find(key, **_options) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/wcc/contentful/store/postgres_store.rb', line 87

def find(key, **_options)
  result =
    _instrument 'select_entry', key: key do
      @connection_pool.with { |conn| conn.exec_prepared('select_entry', [key]) }
    end
  return if result.num_tuples == 0

  JSON.parse(result.getvalue(0, 1))
rescue PG::ConnectionBad
  nil
end

#find_all(content_type:, options: nil) ⇒ Object



99
100
101
102
103
104
105
106
# File 'lib/wcc/contentful/store/postgres_store.rb', line 99

def find_all(content_type:, options: nil)
  Query.new(
    self,
    content_type: content_type,
    options: options,
    configuration: @configuration
  )
end

#keysObject



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/wcc/contentful/store/postgres_store.rb', line 63

def keys
  result =
    _instrument 'select_ids' do
      @connection_pool.with { |conn| conn.exec_prepared('select_ids') }
    end

  arr = []
  result.each { |r| arr << r['id'].strip }
  arr
rescue PG::ConnectionBad
  []
end

#set(key, value) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/wcc/contentful/store/postgres_store.rb', line 30

def set(key, value)
  ensure_hash value

  result =
    _instrument 'upsert_entry' do
      @connection_pool.with do |conn|
        conn.exec_prepared('upsert_entry', [
                             key,
                             value.to_json,
                             quote_array(extract_links(value))
                           ])
      end
    end

  previous_value =
    if result.num_tuples == 0
      nil
    else
      val = result.getvalue(0, 0)
      JSON.parse(val) if val
    end

  if views_need_update?(value, previous_value)
    # Mark the views as needing to be refreshed, they will be refreshed on the next query.
    was_dirty = @dirty.make_true
    # Send out an instrumentation event if we are the thread that marked it dirty
    # (make_true returns true if the value changed)
    _instrument 'mark_dirty' if was_dirty
  end

  previous_value
end