diff --git a/src/invidious.cr b/src/invidious.cr index 29a412a1b..f7a7876f1 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -160,90 +160,29 @@ end # Start jobs Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB, logger, config) -refresh_feeds(PG_DB, logger, config) -subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config) +Invidious::Jobs.register Invidious::Jobs::RefreshFeedsJob.new(PG_DB, logger, config) +Invidious::Jobs.register Invidious::Jobs::SubscribeToFeedsJob.new(PG_DB, logger, config, HMAC_KEY) +Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB) +Invidious::Jobs.register Invidious::Jobs::UpdateDecryptFunctionJob.new -statistics = { - "error" => "Statistics are not availabile.", -} if config.statistics_enabled - spawn do - statistics = { - "version" => "2.0", - "software" => SOFTWARE, - "openRegistrations" => config.registration_enabled, - "usage" => { - "users" => { - "total" => PG_DB.query_one("SELECT count(*) FROM users", as: Int64), - "activeHalfyear" => PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64), - "activeMonth" => PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64), - }, - }, - "metadata" => { - "updatedAt" => Time.utc.to_unix, - "lastChannelRefreshedAt" => PG_DB.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64, - }, - } - - loop do - sleep 1.minute - Fiber.yield - - statistics["usage"].as(Hash)["users"].as(Hash)["total"] = PG_DB.query_one("SELECT count(*) FROM users", as: Int64) - statistics["usage"].as(Hash)["users"].as(Hash)["activeHalfyear"] = PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64) - statistics["usage"].as(Hash)["users"].as(Hash)["activeMonth"] = PG_DB.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64) - statistics["metadata"].as(Hash(String, Int64))["updatedAt"] = Time.utc.to_unix - statistics["metadata"].as(Hash(String, Int64))["lastChannelRefreshedAt"] = PG_DB.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64 - end - end + Invidious::Jobs.register Invidious::Jobs::StatisticsRefreshJob.new(PG_DB, config, SOFTWARE) end -Invidious::Jobs.register Invidious::Jobs::PullPopularVideosJob.new(PG_DB) +if config.captcha_key + Invidious::Jobs.register Invidious::Jobs::BypassCaptchaJob.new(logger, config) +end + +connection_channel = Channel({Bool, Channel(PQ::Notification)}).new(32) +Invidious::Jobs.register Invidious::Jobs::NotificationJob.new(connection_channel, PG_URL) + Invidious::Jobs.start_all def popular_videos Invidious::Jobs::PullPopularVideosJob::POPULAR_VIDEOS.get end -DECRYPT_FUNCTION = [] of {SigProc, Int32} -spawn do - update_decrypt_function do |function| - DECRYPT_FUNCTION.clear - function.each { |i| DECRYPT_FUNCTION << i } - end -end - -if CONFIG.captcha_key - spawn do - bypass_captcha(CONFIG.captcha_key, logger) do |cookies| - cookies.each do |cookie| - config.cookies << cookie - end - - # Persist cookies between runs - CONFIG.cookies = config.cookies - File.write("config/config.yml", config.to_yaml) - end - end -end - -connection_channel = Channel({Bool, Channel(PQ::Notification)}).new(32) -spawn do - connections = [] of Channel(PQ::Notification) - - PG.connect_listen(PG_URL, "notifications") { |event| connections.each { |connection| connection.send(event) } } - - loop do - action, connection = connection_channel.receive - - case action - when true - connections << connection - when false - connections.delete(connection) - end - end -end +DECRYPT_FUNCTION = Invidious::Jobs::UpdateDecryptFunctionJob::DECRYPT_FUNCTION before_all do |env| preferences = begin @@ -3658,12 +3597,7 @@ get "/api/v1/stats" do |env| next error_message end - if statistics["error"]? - env.response.status_code = 500 - next statistics.to_json - end - - statistics.to_json + Invidious::Jobs::StatisticsRefreshJob::STATISTICS.to_json end # YouTube provides "storyboards", which are sprites containing x * y diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr deleted file mode 100644 index 11eb7defd..000000000 --- a/src/invidious/helpers/jobs.cr +++ /dev/null @@ -1,286 +0,0 @@ -def refresh_feeds(db, logger, config) - max_channel = Channel(Int32).new - spawn do - max_threads = max_channel.receive - active_threads = 0 - active_channel = Channel(Bool).new - - loop do - db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs| - rs.each do - email = rs.read(String) - view_name = "subscriptions_#{sha256(email)}" - - if active_threads >= max_threads - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - spawn do - begin - # Drop outdated views - column_array = get_column_array(db, view_name) - ChannelVideo.type_array.each_with_index do |name, i| - if name != column_array[i]? - logger.puts("DROP MATERIALIZED VIEW #{view_name}") - db.exec("DROP MATERIALIZED VIEW #{view_name}") - raise "view does not exist" - end - end - - if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))" - logger.puts("Materialized view #{view_name} is out-of-date, recreating...") - db.exec("DROP MATERIALIZED VIEW #{view_name}") - end - - db.exec("REFRESH MATERIALIZED VIEW #{view_name}") - db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) - rescue ex - # Rename old views - begin - legacy_view_name = "subscriptions_#{sha256(email)[0..7]}" - - db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0") - logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}") - db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}") - rescue ex - begin - # While iterating through, we may have an email stored from a deleted account - if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool) - logger.puts("CREATE #{view_name}") - db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}") - db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) - end - rescue ex - logger.puts("REFRESH #{email} : #{ex.message}") - end - end - end - - active_channel.send(true) - end - end - end - - sleep 5.seconds - Fiber.yield - end - end - - max_channel.send(config.feed_threads) -end - -def subscribe_to_feeds(db, logger, key, config) - if config.use_pubsub_feeds - case config.use_pubsub_feeds - when Bool - max_threads = config.use_pubsub_feeds.as(Bool).to_unsafe - when Int32 - max_threads = config.use_pubsub_feeds.as(Int32) - end - max_channel = Channel(Int32).new - - spawn do - max_threads = max_channel.receive - active_threads = 0 - active_channel = Channel(Bool).new - - loop do - db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs| - rs.each do - ucid = rs.read(String) - - if active_threads >= max_threads.as(Int32) - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - - spawn do - begin - response = subscribe_pubsub(ucid, key, config) - - if response.status_code >= 400 - logger.puts("#{ucid} : #{response.body}") - end - rescue ex - logger.puts("#{ucid} : #{ex.message}") - end - - active_channel.send(true) - end - end - end - - sleep 1.minute - Fiber.yield - end - end - - max_channel.send(max_threads.as(Int32)) - end -end - -def pull_popular_videos(db) - loop do - videos = db.query_all("SELECT DISTINCT ON (ucid) * FROM channel_videos WHERE ucid IN \ - (SELECT channel FROM (SELECT UNNEST(subscriptions) AS channel FROM users) AS d \ - GROUP BY channel ORDER BY COUNT(channel) DESC LIMIT 40) \ - ORDER BY ucid, published DESC", as: ChannelVideo).sort_by { |video| video.published }.reverse - - yield videos - - sleep 1.minute - Fiber.yield - end -end - -def update_decrypt_function - loop do - begin - decrypt_function = fetch_decrypt_function - yield decrypt_function - rescue ex - # TODO: Log error - next - ensure - sleep 1.minute - Fiber.yield - end - end -end - -def bypass_captcha(captcha_key, logger) - loop do - begin - {"/watch?v=CvFH_6DNRCY&gl=US&hl=en&has_verified=1&bpctr=9999999999", produce_channel_videos_url(ucid: "UCXuqSBlHAE6Xw-yeJA0Tunw")}.each do |path| - response = YT_POOL.client &.get(path) - if response.body.includes?("To continue with your YouTube experience, please fill out the form below.") - html = XML.parse_html(response.body) - form = html.xpath_node(%(//form[@action="/das_captcha"])).not_nil! - site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"] - s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"] - - inputs = {} of String => String - form.xpath_nodes(%(.//input[@name])).map do |node| - inputs[node["name"]] = node["value"] - end - - headers = response.cookies.add_request_headers(HTTP::Headers.new) - - response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/createTask", body: { - "clientKey" => CONFIG.captcha_key, - "task" => { - "type" => "NoCaptchaTaskProxyless", - "websiteURL" => "https://www.youtube.com#{path}", - "websiteKey" => site_key, - "recaptchaDataSValue" => s_value, - }, - }.to_json).body) - - raise response["error"].as_s if response["error"]? - task_id = response["taskId"].as_i - - loop do - sleep 10.seconds - - response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/getTaskResult", body: { - "clientKey" => CONFIG.captcha_key, - "taskId" => task_id, - }.to_json).body) - - if response["status"]?.try &.== "ready" - break - elsif response["errorId"]?.try &.as_i != 0 - raise response["errorDescription"].as_s - end - end - - inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s - headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || "" - response = YT_POOL.client &.post("/das_captcha", headers, form: inputs) - - yield response.cookies.select { |cookie| cookie.name != "PREF" } - elsif response.headers["Location"]?.try &.includes?("/sorry/index") - location = response.headers["Location"].try { |u| URI.parse(u) } - headers = HTTP::Headers{":authority" => location.host.not_nil!} - response = YT_POOL.client &.get(location.full_path, headers) - - html = XML.parse_html(response.body) - form = html.xpath_node(%(//form[@action="index"])).not_nil! - site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"] - s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"] - - inputs = {} of String => String - form.xpath_nodes(%(.//input[@name])).map do |node| - inputs[node["name"]] = node["value"] - end - - captcha_client = HTTPClient.new(URI.parse("https://api.anti-captcha.com")) - captcha_client.family = CONFIG.force_resolve || Socket::Family::INET - response = JSON.parse(captcha_client.post("/createTask", body: { - "clientKey" => CONFIG.captcha_key, - "task" => { - "type" => "NoCaptchaTaskProxyless", - "websiteURL" => location.to_s, - "websiteKey" => site_key, - "recaptchaDataSValue" => s_value, - }, - }.to_json).body) - - raise response["error"].as_s if response["error"]? - task_id = response["taskId"].as_i - - loop do - sleep 10.seconds - - response = JSON.parse(captcha_client.post("/getTaskResult", body: { - "clientKey" => CONFIG.captcha_key, - "taskId" => task_id, - }.to_json).body) - - if response["status"]?.try &.== "ready" - break - elsif response["errorId"]?.try &.as_i != 0 - raise response["errorDescription"].as_s - end - end - - inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s - headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || "" - response = YT_POOL.client &.post("/sorry/index", headers: headers, form: inputs) - headers = HTTP::Headers{ - "Cookie" => URI.parse(response.headers["location"]).query_params["google_abuse"].split(";")[0], - } - cookies = HTTP::Cookies.from_headers(headers) - - yield cookies - end - end - rescue ex - logger.puts("Exception: #{ex.message}") - ensure - sleep 1.minute - Fiber.yield - end - end -end - -def find_working_proxies(regions) - loop do - regions.each do |region| - proxies = get_proxies(region).first(20) - proxies = proxies.map { |proxy| {ip: proxy[:ip], port: proxy[:port]} } - # proxies = filter_proxies(proxies) - - yield region, proxies - end - - sleep 1.minute - Fiber.yield - end -end diff --git a/src/invidious/jobs/bypass_captcha_job.cr b/src/invidious/jobs/bypass_captcha_job.cr new file mode 100644 index 000000000..169e6afbb --- /dev/null +++ b/src/invidious/jobs/bypass_captcha_job.cr @@ -0,0 +1,131 @@ +class Invidious::Jobs::BypassCaptchaJob < Invidious::Jobs::BaseJob + private getter logger : Invidious::LogHandler + private getter config : Config + + def initialize(@logger, @config) + end + + def begin + loop do + begin + {"/watch?v=CvFH_6DNRCY&gl=US&hl=en&has_verified=1&bpctr=9999999999", produce_channel_videos_url(ucid: "UCXuqSBlHAE6Xw-yeJA0Tunw")}.each do |path| + response = YT_POOL.client &.get(path) + if response.body.includes?("To continue with your YouTube experience, please fill out the form below.") + html = XML.parse_html(response.body) + form = html.xpath_node(%(//form[@action="/das_captcha"])).not_nil! + site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"] + s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"] + + inputs = {} of String => String + form.xpath_nodes(%(.//input[@name])).map do |node| + inputs[node["name"]] = node["value"] + end + + headers = response.cookies.add_request_headers(HTTP::Headers.new) + + response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/createTask", body: { + "clientKey" => config.captcha_key, + "task" => { + "type" => "NoCaptchaTaskProxyless", + "websiteURL" => "https://www.youtube.com#{path}", + "websiteKey" => site_key, + "recaptchaDataSValue" => s_value, + }, + }.to_json).body) + + raise response["error"].as_s if response["error"]? + task_id = response["taskId"].as_i + + loop do + sleep 10.seconds + + response = JSON.parse(HTTP::Client.post("https://api.anti-captcha.com/getTaskResult", body: { + "clientKey" => config.captcha_key, + "taskId" => task_id, + }.to_json).body) + + if response["status"]?.try &.== "ready" + break + elsif response["errorId"]?.try &.as_i != 0 + raise response["errorDescription"].as_s + end + end + + inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s + headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || "" + response = YT_POOL.client &.post("/das_captcha", headers, form: inputs) + + response.cookies + .select { |cookie| cookie.name != "PREF" } + .each { |cookie| config.cookies << cookie } + + # Persist cookies between runs + File.write("config/config.yml", config.to_yaml) + elsif response.headers["Location"]?.try &.includes?("/sorry/index") + location = response.headers["Location"].try { |u| URI.parse(u) } + headers = HTTP::Headers{":authority" => location.host.not_nil!} + response = YT_POOL.client &.get(location.full_path, headers) + + html = XML.parse_html(response.body) + form = html.xpath_node(%(//form[@action="index"])).not_nil! + site_key = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-sitekey"] + s_value = form.xpath_node(%(.//div[@id="recaptcha"])).try &.["data-s"] + + inputs = {} of String => String + form.xpath_nodes(%(.//input[@name])).map do |node| + inputs[node["name"]] = node["value"] + end + + captcha_client = HTTPClient.new(URI.parse("https://api.anti-captcha.com")) + captcha_client.family = config.force_resolve || Socket::Family::INET + response = JSON.parse(captcha_client.post("/createTask", body: { + "clientKey" => config.captcha_key, + "task" => { + "type" => "NoCaptchaTaskProxyless", + "websiteURL" => location.to_s, + "websiteKey" => site_key, + "recaptchaDataSValue" => s_value, + }, + }.to_json).body) + + raise response["error"].as_s if response["error"]? + task_id = response["taskId"].as_i + + loop do + sleep 10.seconds + + response = JSON.parse(captcha_client.post("/getTaskResult", body: { + "clientKey" => config.captcha_key, + "taskId" => task_id, + }.to_json).body) + + if response["status"]?.try &.== "ready" + break + elsif response["errorId"]?.try &.as_i != 0 + raise response["errorDescription"].as_s + end + end + + inputs["g-recaptcha-response"] = response["solution"]["gRecaptchaResponse"].as_s + headers["Cookies"] = response["solution"]["cookies"].as_h?.try &.map { |k, v| "#{k}=#{v}" }.join("; ") || "" + response = YT_POOL.client &.post("/sorry/index", headers: headers, form: inputs) + headers = HTTP::Headers{ + "Cookie" => URI.parse(response.headers["location"]).query_params["google_abuse"].split(";")[0], + } + cookies = HTTP::Cookies.from_headers(headers) + + cookies.each { |cookie| config.cookies << cookie } + + # Persist cookies between runs + File.write("config/config.yml", config.to_yaml) + end + end + rescue ex + logger.puts("Exception: #{ex.message}") + ensure + sleep 1.minute + Fiber.yield + end + end + end +end diff --git a/src/invidious/jobs/notification_job.cr b/src/invidious/jobs/notification_job.cr new file mode 100644 index 000000000..2f525e084 --- /dev/null +++ b/src/invidious/jobs/notification_job.cr @@ -0,0 +1,24 @@ +class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob + private getter connection_channel : Channel({Bool, Channel(PQ::Notification)}) + private getter pg_url : URI + + def initialize(@connection_channel, @pg_url) + end + + def begin + connections = [] of Channel(PQ::Notification) + + PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) } + + loop do + action, connection = connection_channel.receive + + case action + when true + connections << connection + when false + connections.delete(connection) + end + end + end +end diff --git a/src/invidious/jobs/refresh_feeds_job.cr b/src/invidious/jobs/refresh_feeds_job.cr new file mode 100644 index 000000000..eebdf0f34 --- /dev/null +++ b/src/invidious/jobs/refresh_feeds_job.cr @@ -0,0 +1,77 @@ +class Invidious::Jobs::RefreshFeedsJob < Invidious::Jobs::BaseJob + private getter db : DB::Database + private getter logger : Invidious::LogHandler + private getter config : Config + + def initialize(@db, @logger, @config) + end + + def begin + max_threads = config.feed_threads + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs| + rs.each do + email = rs.read(String) + view_name = "subscriptions_#{sha256(email)}" + + if active_threads >= max_threads + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + spawn do + begin + # Drop outdated views + column_array = get_column_array(db, view_name) + ChannelVideo.type_array.each_with_index do |name, i| + if name != column_array[i]? + logger.puts("DROP MATERIALIZED VIEW #{view_name}") + db.exec("DROP MATERIALIZED VIEW #{view_name}") + raise "view does not exist" + end + end + + if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))" + logger.puts("Materialized view #{view_name} is out-of-date, recreating...") + db.exec("DROP MATERIALIZED VIEW #{view_name}") + end + + db.exec("REFRESH MATERIALIZED VIEW #{view_name}") + db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) + rescue ex + # Rename old views + begin + legacy_view_name = "subscriptions_#{sha256(email)[0..7]}" + + db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0") + logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}") + db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}") + rescue ex + begin + # While iterating through, we may have an email stored from a deleted account + if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool) + logger.puts("CREATE #{view_name}") + db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}") + db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) + end + rescue ex + logger.puts("REFRESH #{email} : #{ex.message}") + end + end + end + + active_channel.send(true) + end + end + end + + sleep 5.seconds + Fiber.yield + end + end +end diff --git a/src/invidious/jobs/statistics_refresh_job.cr b/src/invidious/jobs/statistics_refresh_job.cr new file mode 100644 index 000000000..021671bed --- /dev/null +++ b/src/invidious/jobs/statistics_refresh_job.cr @@ -0,0 +1,59 @@ +class Invidious::Jobs::StatisticsRefreshJob < Invidious::Jobs::BaseJob + STATISTICS = { + "version" => "2.0", + "software" => { + "name" => "invidious", + "version" => "", + "branch" => "", + }, + "openRegistrations" => true, + "usage" => { + "users" => { + "total" => 0_i64, + "activeHalfyear" => 0_i64, + "activeMonth" => 0_i64, + }, + }, + "metadata" => { + "updatedAt" => Time.utc.to_unix, + "lastChannelRefreshedAt" => 0_i64, + }, + } + + private getter db : DB::Database + private getter config : Config + + def initialize(@db, @config, @software_config : Hash(String, String)) + end + + def begin + load_initial_stats + + loop do + refresh_stats + sleep 1.minute + Fiber.yield + end + end + + # should only be called once at the very beginning + private def load_initial_stats + STATISTICS["software"] = { + "name" => @software_config["name"], + "version" => @software_config["version"], + "branch" => @software_config["branch"], + } + STATISTICS["openRegistration"] = config.registration_enabled + end + + private def refresh_stats + users = STATISTICS.dig("usage", "users").as(Hash(String, Int64)) + users["total"] = db.query_one("SELECT count(*) FROM users", as: Int64) + users["activeHalfyear"] = db.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '6 months'", as: Int64) + users["activeMonth"] = db.query_one("SELECT count(*) FROM users WHERE CURRENT_TIMESTAMP - updated < '1 month'", as: Int64) + STATISTICS["metadata"] = { + "updatedAt" => Time.utc.to_unix, + "lastChannelRefreshedAt" => db.query_one?("SELECT updated FROM channels ORDER BY updated DESC LIMIT 1", as: Time).try &.to_unix || 0_i64, + } + end +end diff --git a/src/invidious/jobs/subscribe_to_feeds_job.cr b/src/invidious/jobs/subscribe_to_feeds_job.cr new file mode 100644 index 000000000..3d3b2218d --- /dev/null +++ b/src/invidious/jobs/subscribe_to_feeds_job.cr @@ -0,0 +1,52 @@ +class Invidious::Jobs::SubscribeToFeedsJob < Invidious::Jobs::BaseJob + private getter db : DB::Database + private getter logger : Invidious::LogHandler + private getter hmac_key : String + private getter config : Config + + def initialize(@db, @logger, @config, @hmac_key) + end + + def begin + max_threads = 1 + if config.use_pubsub_feeds.is_a?(Int32) + max_threads = config.use_pubsub_feeds.as(Int32) + end + + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs| + rs.each do + ucid = rs.read(String) + + if active_threads >= max_threads.as(Int32) + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + + spawn do + begin + response = subscribe_pubsub(ucid, hmac_key, config) + + if response.status_code >= 400 + logger.puts("#{ucid} : #{response.body}") + end + rescue ex + logger.puts("#{ucid} : #{ex.message}") + end + + active_channel.send(true) + end + end + end + + sleep 1.minute + Fiber.yield + end + end +end diff --git a/src/invidious/jobs/update_decrypt_function_job.cr b/src/invidious/jobs/update_decrypt_function_job.cr new file mode 100644 index 000000000..5332c6727 --- /dev/null +++ b/src/invidious/jobs/update_decrypt_function_job.cr @@ -0,0 +1,19 @@ +class Invidious::Jobs::UpdateDecryptFunctionJob < Invidious::Jobs::BaseJob + DECRYPT_FUNCTION = [] of {SigProc, Int32} + + def begin + loop do + begin + decrypt_function = fetch_decrypt_function + DECRYPT_FUNCTION.clear + decrypt_function.each { |df| DECRYPT_FUNCTION << df } + rescue ex + # TODO: Log error + next + ensure + sleep 1.minute + Fiber.yield + end + end + end +end