[redis] More connection pooling changes

This commit is contained in:
Fosco Marotto 2021-01-17 17:36:20 -05:00
parent d2d381eb90
commit 002441af1f
11 changed files with 89 additions and 49 deletions

@ -9,15 +9,19 @@ class ActivityTracker
def increment(prefix)
key = [prefix, current_week].join(':')
redis.incrby(key, 1)
redis.expire(key, EXPIRE_AFTER)
redis.with do |conn|
conn.incrby(key, 1)
conn.expire(key, EXPIRE_AFTER)
end
end
def record(prefix, value)
key = [prefix, current_week].join(':')
redis.pfadd(key, value)
redis.expire(key, EXPIRE_AFTER)
redis.with do |conn|
conn.pfadd(key, value)
conn.expire(key, EXPIRE_AFTER)
end
end
private

@ -4,7 +4,7 @@ require 'singleton'
class FeedManager
include Singleton
include Redisable
# include Redisable
MAX_ITEMS = 150

@ -19,17 +19,24 @@ class PotentialFriendshipTracker
key = "interactions:#{account_id}"
weight = WEIGHTS[action]
redis.zincrby(key, weight, target_account_id)
redis.zremrangebyrank(key, 0, -MAX_ITEMS)
redis.expire(key, EXPIRE_AFTER)
redis.with do |conn|
conn.zincrby(key, weight, target_account_id)
conn.zremrangebyrank(key, 0, -MAX_ITEMS)
conn.expire(key, EXPIRE_AFTER)
end
end
def remove(account_id, target_account_id)
redis.zrem("interactions:#{account_id}", target_account_id)
redis.with do |conn|
conn.zrem("interactions:#{account_id}", target_account_id)
end
end
def get(account_id, limit: 10, offset: 0)
account_ids = redis.zrevrange("interactions:#{account_id}", offset, limit)
account_ids = []
redis.with do |conn|
account_ids = conn.zrevrange("interactions:#{account_id}", offset, limit)
end
return [] if account_ids.empty?
Account.searchable.where(id: account_ids).local
end

@ -10,11 +10,16 @@ class VerifiedSuggestions
def set(account_ids)
return if account_ids.nil? || account_ids.empty?
redis.setex(KEY, EXPIRE_AFTER, account_ids)
redis.with do |conn|
conn.setex(KEY, EXPIRE_AFTER, account_ids)
end
end
def get(account_id)
account_ids = redis.get(KEY)
account_ids = []
redis.with do |conn|
account_ids = conn.get(KEY)
end
if account_ids.nil? || account_ids.empty?
account_ids = Account.searchable
@ -24,7 +29,7 @@ class VerifiedSuggestions
.local
.limit(MAX_ITEMS)
.pluck(:id)
set(account_ids) if account_ids.nil? || account_ids.empty?
else
account_ids = JSON.parse(account_ids)

@ -15,12 +15,16 @@ class Feed
protected
def from_redis(limit, max_id, since_id, min_id)
if min_id.blank?
max_id = '+inf' if max_id.blank?
since_id = '-inf' if since_id.blank?
unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:first).map(&:to_i)
else
unhydrated = redis.zrangebyscore(key, "(#{min_id}", '+inf', limit: [0, limit], with_scores: true).map(&:first).map(&:to_i)
unhydrated = []
redis.with do |conn|
if min_id.blank?
max_id = '+inf' if max_id.blank?
since_id = '-inf' if since_id.blank?
unhydrated = conn.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:first).map(&:to_i)
else
unhydrated = conn.zrangebyscore(key, "(#{min_id}", '+inf', limit: [0, limit], with_scores: true).map(&:first).map(&:to_i)
end
end
Status.where(id: unhydrated).cache_ids

@ -18,6 +18,8 @@ class LinkBlock < ApplicationRecord
return false if text.nil?
return false if text.length < 1
return true if text.include? '.weebly.com'
urls = text.scan(FetchLinkCardService::URL_PATTERN).map {|array|
Addressable::URI.parse(array[0]).normalize
}
@ -30,4 +32,4 @@ class LinkBlock < ApplicationRecord
where("LOWER(link) LIKE LOWER(?)", "%#{link_for_fetch}%").exists?
end
end
end

@ -1,7 +1,7 @@
# frozen_string_literal: true
class BatchedRemoveStatusService < BaseService
include Redisable
# include Redisable
# Delete given statuses and reblogs of them
# Dispatch PuSH updates of the deleted statuses, but only local ones

@ -34,7 +34,9 @@ class EditStatusService < BaseService
postprocess_status!
create_revision! revision_text
redis.setex(idempotency_key, 3_600, @status.id) if idempotency_given?
redis.with do |conn|
conn.setex(idempotency_key, 3_600, @status.id) if idempotency_given?
end
@status
end
@ -91,7 +93,7 @@ class EditStatusService < BaseService
end
def validate_links!
raise GabSocial::NotPermittedError if LinkBlock.block?(@text)
raise GabSocial::LinkBlockedError if LinkBlock.block?(@text)
end
def language_from_option(str)
@ -119,7 +121,10 @@ class EditStatusService < BaseService
end
def idempotency_duplicate?
@idempotency_duplicate = redis.get(idempotency_key)
redis.with do |conn|
@idempotency_duplicate = conn.get(idempotency_key)
end
@idempotency_duplicate
end
def status_attributes

@ -47,7 +47,9 @@ class PostStatusService < BaseService
bump_potential_friendship!
end
redis.setex(idempotency_key, 3_600, @status.id) if idempotency_given?
redis.with do |conn|
conn.setex(idempotency_key, 3_600, @status.id) if idempotency_given?
end
@status
end
@ -200,7 +202,10 @@ class PostStatusService < BaseService
end
def idempotency_duplicate?
@idempotency_duplicate = redis.get(idempotency_key)
redis.with do |conn|
@idempotency_duplicate = conn.get(idempotency_key)
end
@idempotency_duplicate
end
def scheduled_in_the_past?

@ -55,8 +55,10 @@ class RemoveStatusService < BaseService
end
def remove_from_affected
@mentions.map(&:account).select(&:local?).each do |account|
redis.publish("timeline:#{account.id}", @payload)
redis.with do |conn|
@mentions.map(&:account).select(&:local?).each do |account|
conn.publish("timeline:#{account.id}", @payload)
end
end
end
@ -73,15 +75,19 @@ class RemoveStatusService < BaseService
def remove_from_hashtags
return unless @status.public_visibility?
@tags.each do |hashtag|
redis.publish("timeline:hashtag:#{hashtag}", @payload)
redis.publish("timeline:hashtag:#{hashtag}:local", @payload) if @status.local?
redis.with do |conn|
@tags.each do |hashtag|
conn.publish("timeline:hashtag:#{hashtag}", @payload)
conn.publish("timeline:hashtag:#{hashtag}:local", @payload) if @status.local?
end
end
end
def remove_from_pro
if @account.is_pro || @account.is_donor || @account.is_investor || @account.is_verified
redis.publish('timeline:pro', @payload)
redis.with do |conn|
if @account.is_pro || @account.is_donor || @account.is_investor || @account.is_verified
conn.publish('timeline:pro', @payload)
end
end
end

@ -24,24 +24,26 @@ class Scheduler::FeedCleanupScheduler
def clean_feeds!(ids, type)
reblogged_id_sets = {}
redis.pipelined do
ids.each do |feed_id|
redis.del(feed_manager.key(type, feed_id))
reblog_key = feed_manager.key(type, feed_id, 'reblogs')
# We collect a future for this: we don't block while getting
# it, but we can iterate over it later.
reblogged_id_sets[feed_id] = redis.zrange(reblog_key, 0, -1)
redis.del(reblog_key)
redis.with do |conn|
conn.pipelined do
ids.each do |feed_id|
conn.del(feed_manager.key(type, feed_id))
reblog_key = feed_manager.key(type, feed_id, 'reblogs')
# We collect a future for this: we don't block while getting
# it, but we can iterate over it later.
reblogged_id_sets[feed_id] = conn.zrange(reblog_key, 0, -1)
conn.del(reblog_key)
end
end
end
# Remove all of the reblog tracking keys we just removed the
# references to.
redis.pipelined do
reblogged_id_sets.each do |feed_id, future|
future.value.each do |reblogged_id|
reblog_set_key = feed_manager.key(type, feed_id, "reblogs:#{reblogged_id}")
redis.del(reblog_set_key)
# Remove all of the reblog tracking keys we just removed the
# references to.
conn.pipelined do
reblogged_id_sets.each do |feed_id, future|
future.value.each do |reblogged_id|
reblog_set_key = feed_manager.key(type, feed_id, "reblogs:#{reblogged_id}")
conn.del(reblog_set_key)
end
end
end
end