Celery control command 在Redis可能遇到的問題

less than 1 minute read

最近在公司遇到一個有趣的bug,原本以為只是個簡單的問題,沒想到前後花了我一個多禮拜的工作天來處理,還讓我從Celery的soruce code開始看,然後往底層看到Kombu (Celery transport layer的實作)。

首先先來描述一下情境,我們的web server用了Celery來處理async tasks,為了要知道worker正常運作,所以有提供/health的endpoint給SRE team,當SRE team打health check endpoint的時候,就會送的ping message給workers,並預期在時間內得到pong

關於實作,我們是直接呼叫celery.app.control.Inspectping function,ping function 會回傳一個dict來indicate哪些worker回傳pong

功能上線以後,被SRE team回報health check有時候會回傳failed,在其中一個環境發生的情況又比其他環境高,然後就開始這次的debug之旅。

我們Celery的broker的backend是Redis,然後我們得到的error message是Connection closed by server。將錯誤訊息餵狗以後,知道這代表connection被Redis視為idle太久所以關掉。

這個時間長短可以藉由設定timeout來決定長短,不知道為什麼,production環境的環境設置都不太一樣,被回報最常發生的環境,設置是120秒。實際測試過後,在該環境也大概是兩分鐘左右就會得到該錯誤訊息,所以錯誤原因是因為connection被Redis server關掉無誤。

另外要提一下SRE team的health check設置,為了避免false alarm,所以他們每次health check會retry 3次,而且每次的間隔會以2的指數呈成長(相隔2、4、8秒)。

但是這個設置跟我在local觀察到的狀況不一樣,我在本地測試的情況是,第一次會因為connection closed而報錯,但接下來的request就會reconnect,因此跟SRE team retry 3次然後failed的狀況明顯衝突。

因此我們有兩個問題要釐清:

  1. 為什麼publish tasks不會遇到,但是ping pong就會遇到
  2. 為什麼SRE team的retry 沒有用

因為從官方文件無法解答我遇到的問題,我只好trace Celery的source code,然後一路往底層看到Kombu

簡單來說,Celery是對AMQP的抽象化,藉由封裝不同的backend,來提供統一的AMQP介面。

在更往下,AMQP需要producerconsumerbrokermessageconnectionchannel等等的component,Celery將這一層的的抽象化與實作拉出來在另外一個project,叫做Kombu

細節就不提了,反正最後看到的結果是,在publish task的時候,處理publish的邏輯在Celery並且有自己的retry policy,但是執行control command的時候,直接使用Kombu的producer和consumer,並且不處理connection error。

這邊也可以從Kombucomment看到:

def _brpop_read(self, **options):
        try:
            try:
                dest__item = self.client.parse_response(self.client.connection,
                                                        'BRPOP',
                                                        **options)
            except self.connection_errors:
                # if there's a ConnectionError, disconnect so the next
                # iteration will reconnect automatically.
                self.client.connection.disconnect()
                raise
            if dest__item:
                dest, item = dest__item
                dest = bytes_to_str(dest).rsplit(self.sep, 1)[0]
                self._queue_cycle.rotate(dest)
                self.connection._deliver(loads(bytes_to_str(item)), dest)
                return True
            else:
                raise Empty()
        finally:
            self._in_poll = None

這點也可以跟我觀察到的相符合,所以第一個問題的回答是因為兩者的執行路徑不一樣,所以只有Celerycontrol commands會有問題,但publish tasks不會有狀況。

第二個問題就麻煩了,很明顯的SRE team的retry照理說應該會work,但在prod上就是不work,而且我也複製不出這個狀況。

這問題我苦思很久都沒有結果,直到某個瞬間我突然想到一種可能,會不會是在是因為production環境是有replica的關係呢?

想到這點我瞬間豁然開朗,一般load balancer都是採用Round-Robin,而我們production的web server跑3個replica,所以有機率是三個request剛好打在三個replica上,全部都得到closed connection,這也可以解釋為何不是每次都失敗,而是有時候成功有時候失敗。

果然我嘗試在本地跑4個replica,然後來打health check endpoint就可以複製出這個狀況,在我本地的設置,大改5次裡面會失敗一次,所以不只要在http level retry,也需要在function level retry,在function level加上retry以後,果然就解決這個問題。

這個issue是個複合式的issue,既包含了framework本身的問題,同時也要是分散式系統才會觸發,在這過程中,不斷的提出假設和驗證,最後把問題一個一個解決,真是蠻有趣的一次經驗。

P.S 這過程中有嘗試修改Celery的config,來嘗試能否讓celery來keep connection alive,但都沒有成功。

Updated: