0%

資料結構與演算法筆記 | 佇列 (Queue)

佇列是一個遵循先進先出 (First In First Out, FIFO) 的線性資料結構,最先進來的資料最先被處理,就像排隊一樣。

佇列操作

不同於堆疊,佇列加入的資料會放在尾部,模擬日常生活中排隊的現象。

  • 入隊 (enqueue):將元素放入列尾
  • 出隊 (dequeue):將元素從列頂移除並回傳
  • 頂元素 (peek):查看列頂元素(不會移除)
操作 時間複雜度
enqueue() $\mathcal{O}(1)$
dequeue() $\mathcal{O}(1)$
peek() $\mathcal{O}(1)$

佇列實作

與堆疊相似,佇列也只會對於列頂與列尾進行操作,陣列與鏈式串可以用來實作。

堆疊與佇列指標

堆疊僅需一個指標(peek),但佇列需要兩個指標,分別為頭指標(front)與尾指標(rear)。兩者根本差異的原因在於存取的方向不同:

結構存資料的地方取資料的地方結構特性
堆疊一端(top)同一端(top)後進先出(LIFO)
佇列尾端(rear)頂端(front)先進先出(FIFO)

堆疊所有元素都壓在最上面,無論要入堆或出堆,只會針對最上面的元素進行處理,毋需關心下面的元素是誰;佇列則是將資料從尾端加入,但從頂端移除,此時就必須追蹤誰最早進來與下一個要進來的位置

使用鏈結串列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class LinkedListQueue:
def __init__(self):
self._front: ListNode | None = None
self._rear: ListNode | None = None
self._size = 0

def size(self) -> int:
return self._size

def is_empty(self) -> bool:
return self._size == 0

def push(self, val: int) -> None:
node = ListNode(val)

if self._front is None:
self._front = node
self._rear = node
else:
self._rear.next = node # 更新當前 rear.next
self._rear = node # 指向 rear 為新節點
self._size += 1

def pop(self) -> int:
num = self.peek()
self._front = self._front.next # 交接頭節點
self._size -= 1
if self._front is None:
self._rear = None # 清空 rear 指標避免殘留
return num

def peek(self) -> int:
if self.is_empty():
raise IndexError("佇列為空")
return self._front.val

使用陣列

如果使用陣列實作佇列,會發現一個很有趣的問題:如果用一般的陣列,每次出列需要 pop(0),將第一個元素移除後,後面的所有元素都需要整個搬家,時間複雜度為 $\mathcal{O}(n)$。此外,若事先固定陣列長度:

1
arr = [0, 0, 0, 0, 0]

索引從 0 開始,enqueue 完五個元素後:

1
[10, 20, 30, 40, 50]

接著 dequeue 三次:

1
[_, _, _, 40, 50]

若還想要再 enqueue 元素,就會報錯 IndexError,因為後面沒有地方塞了。這時候解決方法就是使用環狀佇列 (circular array):將原本直的陣列彎曲成一個環形,使其首尾相連。當尾端到達陣列末尾時,就會從頭開始使用原本被 dequeue 掉的空位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class ArrayQueue:
def __init__(self, size: int):
self._size = size
self._nums: list[int] = [0] * self._size
self._front: int = 0
self._rear: int = 0

def capacity(self) -> int:
return len(self._nums)

def size(self) -> int:
return self._size

def is_empty(self) -> bool:
return self._size == 0

def push(self, num: int) -> None:
if self._size == self.capacity():
raise IndexError("佇列已滿")
# 將 num 新增至列尾
self._nums[self._rear] = num
self._size += 1
# 重新計算佇列尾指標
self._rear = (self._rear + 1) % self.capacity()

def pop(self) -> int:
# 將 num 移除列尾
num: int = self.peek()
self._size -= 1
# 重新計算佇列頭指標
self._front = (self._front + 1) % self.capacity()
return num

def peek(self) -> int:
if self.is_empty():
raise IndexError("佇列為空")
return self._nums[self._front]

環狀陣列的操作一開始我也不是很清楚,但是參考舊金山大學製作的 Array Queue Visualization,就有較為清楚的理解。或可以參件以下的影片:

練習題

實作一個類別 RecentCounter,每次收到請求時會記錄該請求時間(整數 t,以毫秒為單位,遞增順序呼叫)。請完成:

  • ping(t):表示在時間 t 來了一次請求,回傳從 t-3000t 這段時間內的請求總數。

限制:

  • 只需要儲存最近的請求,其他可以從 queue 中移除。
1
2
3
4
5
6
7
8
9
10
輸入:
["RecentCounter", "ping", "ping", "ping", "ping"]
[[], [1], [100], [3001], [3002]]
輸出: [null, 1, 2, 3, 3]
原因:
RecentCounter recentCounter = new RecentCounter();
recentCounter.ping(1); // requests = [1], range is [-2999,1], return 1
recentCounter.ping(100); // requests = [1, 100], range is [-2900,100], return 2
recentCounter.ping(3001); // requests = [1, 100, 3001], range is [1,3001], return 3
recentCounter.ping(3002); // requests = [1, 100, 3001, 3002], range is [2,3002], return 3
解答
1
2
3
4
5
6
7
8
9
class RecentCounter:
def __init__(self):
self._requests: list[int] = []

def ping(self, t: int) -> int:
self._requests.append(t)
while self._requests and self._requests[0] < t - 3000:
self._requests.pop(0) # 移除過期請求
return len(self._requests)

實作一個類別 MovingAverage,給定一個整數 size,代表滑動視窗大小。每次輸入一個整數時,回傳最近 size 筆資料的平均值。

1
2
3
4
5
m = MovingAverage(3)
m.next(1) # 1.0
m.next(10) # (1+10)/2 = 5.5
m.next(3) # (1+10+3)/3 = 4.67
m.next(5) # (10+3+5)/3 = 6.0
解答
1
2
3
4
5
6
7
8
9
10
class MovingAverage:
def __init__(self, size: int):
self.size = size
self._nums: list[int] = []

def next(self, val: int) -> float:
self._nums.append(val)
if len(self._nums) > self.size:
self._nums.pop(0) # 移除多餘元素,使佇列數量在 size 內
return sum(self._nums) / len(self._nums)