Skip to main content

Priority Queue

  • Similar to normal queue except that each element has a certain priority
  • The priority of the elements in the priority queue determine the order in which elements are removed from the PQ
  • How does a PQ knows which element to remove (i.e how does it keep track of value and priority of each element) => [[8. Heap#Heap| Heap]] because when we give more priority to smallest or the largest node in the tree as we can extract these node very efficiently using heaps (Heap is the not the only to implement PQ it's just the most common and efficient way)

Heap

https://www.hellointerview.com/learn/code/heap/overview

  • A heap is a tree based DS that satisfies the heap property: If A is a parent node of B then A is ordered with respect to B for all nodes A, B in the hope => This means that the value of the parent node always larger than or equal to the value of child node for all node, or vice versa
  • 2 types of Heap
    - Max Heap: value of parent always larger than value of children
    - Min Heap: value of parent always smaller than value of children

Patterns

Top K Pattern

https://leetcode.com/problems/kth-largest-element-in-an-array

  • Most basic use of heap
def findKthLargest(self, nums: List[int], k: int) -> int:
heap = []

for num in nums:
heapq.heappush(heap, num)

if len(heap) > k:
heapq.heappop(heap)

return heapq.heappop(heap)
 def topKFrequent(self, nums: List[int], k: int) -> List[int]:
counter = collections.Counter(nums)

heap = []

for num, count in counter.items():
heapq.heappush(heap, (count, num))
if len(heap) > k:
heapq.heappop(heap)

return [num for _, num in heap]

https://leetcode.com/problems/kth-largest-element-in-a-stream

class KthLargest:
def __init__(self, k: int, nums: List[int]):
self.k = k
self.minHeap = []

for num in nums:
heapq.heappush(self.minHeap, num)

if len(self.minHeap) > self.k:
heapq.heappop(self.minHeap)

def add(self, val: int) -> int:
heapq.heappush(self.minHeap, val)

if len(self.minHeap) > self.k:
heapq.heappop(self.minHeap)

return self.minHeap[0]

NOTES: heapq.heapify in python creates a min heap by default, if we want a max heap, we can multiply all value by -1 as seen in the code above (where we do -freq )

https://leetcode.com/problems/top-k-frequent-words

 def topKFrequent(self, words: List[str], k: int) -> List[str]:
counter = collections.Counter(words)
heap = []

for word, count in counter.items():
heapq.heappush(heap, (-count, word))

res = []
for _ in range(k):
res.append(heapq.heappop(heap)[1])

return res

Merge K sorted list Pattern

https://leetcode.com/problems/merge-k-sorted-lists

  • Keep all head node in the heap
  • [[6. Linked List]]
 def mergeKLists(self, lists: List[Optional[ListNode]]) -> Optional[ListNode]:
if not lists:
return
minHeap = []
for i, node in enumerate(lists):
if node:
heapq.heappush(minHeap, (node.val, i, node))

head = ListNode(0)
tail = head

while minHeap:
minVal, index, node = heapq.heappop(minHeap)
tail.next = node
tail = tail.next

if node.next:
heapq.heappush(minHeap, (node.next.val, index, node.next))

return head.next

Two Heaps Pattern

Key Idea

  • Maintain both min heap and max heap
  • Being able to access min and max at O(1)
  • If the problems involves finding a middle ground or partition point that needs to be maintained as new elements arrive.

https://leetcode.com/problems/longest-continuous-subarray-with-absolute-diff-less-than-or-equal-to-limit

  • Very good questions that no one seems to talk about
  def longestSubarray(self, nums: List[int], limit: int) -> int:
res = 0
left = 0
minHeap = []
maxHeap = []

for right in range(len(nums)):
heapq.heappush(minHeap, (nums[right], right))
heapq.heappush(maxHeap, (-nums[right], right))

while -maxHeap[0][0] - minHeap[0][0] > limit:
left = min(maxHeap[0][1], minHeap[0][1]) + 1

while maxHeap[0][1] < left:
heapq.heappop(maxHeap)
while minHeap[0][1] < left:
heapq.heappop(minHeap)

res = max(res, right - left + 1)

return res

https://leetcode.com/problems/find-median-from-data-stream

  • Most basic application of the Two Heaps Pattern
  • This is a good explanation: https://emre.me/coding-patterns/two-heaps/
  • Idea:
    • If we split the sorted array into 2, the median is the average of the smallest if the larger partition and the largest in the smaller partition
    • We can maintain this attribute using 2 heaps, 1 min heap and 1 max heap
    • We have to balance the size of each heap (so that we can get the correct median)
      • Choose size(minHeap) <= size(maxHeap) <= size(minHeap) +1
      • In case of there are odd number of elements, we allow the maxHeap to store 1 additional element, this will be our median
      • We have to check and re-balance every time we add a new number
class MedianFinder:
def __init__(self):
self.minHeap = []
self.maxHeap = []

def addNum(self, num: int) -> None:
if not self.maxHeap or num <= self.maxHeap[0] * -1:
heapq.heappush(self.maxHeap, -num)
else:
heapq.heappush(self.minHeap, num)

if len(self.maxHeap) > len(self.minHeap) + 1:
heapq.heappush(self.minHeap, -1 * heapq.heappop(self.maxHeap))
elif len(self.maxHeap) < len(self.minHeap):
heapq.heappush(self.maxHeap, -1 * heapq.heappop(self.minHeap))

def findMedian(self) -> float:
count = len(self.maxHeap) + len(self.minHeap)
if count % 2 == 0:
return (-self.maxHeap[0] + self.minHeap[0]) / 2
else:
return -self.maxHeap[0]

https://leetcode.com/problems/number-of-orders-in-the-backlog

  • Straight forward using 2 heap for 2 backlogs
def getNumberOfBacklogOrders(self, orders: List[List[int]]) -> int:
buyBacklog = []
sellBacklog = []

for price, amount, orderType in orders:
# buy
if orderType == 0:
while sellBacklog and sellBacklog[0][0] <= price and amount > 0:
backlogPrice, count = heapq.heappop(sellBacklog)
if count > amount:
heapq.heappush(sellBacklog, (backlogPrice, count - amount))
amount = 0
elif amount > count:
amount -= count
else:
amount = 0
if amount > 0:
heapq.heappush(buyBacklog, (-price, amount))
# sell
else:
while buyBacklog and -buyBacklog[0][0] >= price and amount > 0:
backlogPrice, count = heapq.heappop(buyBacklog)
backlogPrice *= -1
if count > amount:
heapq.heappush(buyBacklog, (-backlogPrice, count - amount))
amount = 0
elif amount > count:
amount -= count
else:
amount = 0
if amount > 0:
heapq.heappush(sellBacklog, (price, amount))

res = 0
mod = 10**9 + 7
for _, count in buyBacklog:
res += count
for _, count in sellBacklog:
res += count
return res % mod

Minimum number Pattern

https://leetcode.com/problems/meeting-rooms-ii

def minMeetingRooms(self, intervals: List[List[int]]) -> int:
# this can be thought of as occupying room
heap = []
intervals.sort()
heapq.heappush(heap, intervals[0][1])

for start, end in intervals[1:]:
# free up a room by popping it off the heap
if heap[0] <= start:
heapq.heappop(heap)
heapq.heappush(heap, end)

return len(heap)

https://leetcode.com/problems/car-pooling

 def carPooling(self, trips: List[List[int]], capacity: int) -> bool:
trips.sort(key=lambda t: t[1])
cur_pass = 0
heap = []

for num_pass, start, end in trips:
while heap and heap[0][0] <= start:
cur_pass -= heap[0][1]
heapq.heappop(heap)

cur_pass += num_pass
if cur_pass > capacity:
return False
heapq.heappush(heap, (end, num_pass))

return True

Task Scheduler Pattern

Commonly appears in problems involving:

  • Processing items with cooldown/waiting periods
  • Scheduling tasks with constraints
  • Managing resources with availability windows
  • Need to track item availability over time
  • Items may need to be reprocessed
  • Need to find valid ordering while maintaining constraints

Core Components:

  1. Priority Queue (Heap): Track items by frequency/priority
  2. Queue/Map: Track cooling/waiting items
  3. Time/Position Counter: Simulate processing order

https://leetcode.com/problems/task-scheduler

  • Direct application, also acts as a template for other similar problems
  def leastInterval(self, tasks: List[str], n: int) -> int:
tasksMap = collections.Counter(tasks)
heap = [-count for task, count in tasksMap.items()]
heapq.heapify(heap)

queue = deque()
time = 0

while heap or queue:
time += 1
# At each point in time, find task to execute
if heap:
curTaskCount = heapq.heappop(heap) * -1
# Execute task
curTaskCount -= 1
if curTaskCount > 0:
# Queue next execution with cooldown time
queue.append((curTaskCount, time + n))
# Check if any task finish cooldown time
if queue and queue[0][1] == time:
taskCount, _ = queue.popleft()
# Add back task to selection pool
heapq.heappush(heap, -taskCount)

return time

https://leetcode.com/problems/reorganize-string

def reorganizeString(self, s: str) -> str:
s = list(s)
counter = collections.Counter(s)
maxHeap = [(-count, char) for char, count in counter.items()]
heapq.heapify(maxHeap)

queue = deque()
res = [''] * len(s)
resIndex = 0

while maxHeap or queue:
flag = False
if maxHeap:
maxCharCount, char = heapq.heappop(maxHeap)
maxCharCount *= -1
maxCharCount -= 1
res[resIndex] = char
resIndex += 1
flag = True
if maxCharCount:
queue.append((char, maxCharCount, resIndex + 1))
if queue and queue[0][2] == resIndex:
char, maxCharCount, _ = queue.popleft()
heapq.heappush(maxHeap, (-maxCharCount, char))
flag = True
if not flag:
return ''

return ''.join(res)

https://leetcode.com/problems/ipo

  • Pretty nice problem, combined with [[8. Heap#Two Heaps Pattern]]
 def findMaximizedCapital(self, k: int, w: int, profits: List[int], capital: List[int]) -> int:
maxHeap = [(-profits[i], capital[i]) for i in range(len(profits))]
heapq.heapify(maxHeap)

currentCapital = w
minHeap = []

while k > 0 and (maxHeap or minHeap):
if not maxHeap and minHeap:
break
if maxHeap:
profit, cap = heapq.heappop(maxHeap)
profit *= -1

if cap <= currentCapital:
currentCapital += profit
k -= 1
else:
heapq.heappush(minHeap, (cap, profit))
while minHeap and minHeap[0][0] <= currentCapital:
cap, profit = heapq.heappop(minHeap)
heapq.heappush(maxHeap, (-profit, cap))

return currentCapital

https://leetcode.com/problems/most-profit-assigning-work

  • Simplified version of the other Task Scheduler problems
  def maxProfitAssignment(self, difficulty: List[int], profit: List[int], worker: List[int]) -> int:
maxHeap = [(-profit[i], difficulty[i]) for i in range(len(profit))]
heapq.heapify(maxHeap)

worker.sort(reverse=True)
res = 0

for i in range(len(worker)):
while maxHeap and maxHeap[0][1] > worker[i]:
heapq.heappop(maxHeap)

if maxHeap:
res += -maxHeap[0][0]

return res

https://leetcode.com/problems/distant-barcodes

  def rearrangeBarcodes(self, barcodes: List[int]) -> List[int]:
counter = collections.Counter(barcodes)
maxHeap = [(-count, code) for code, count in counter.items()]
heapq.heapify(maxHeap)

res = [-1] * len(barcodes)
resIndex = 0
queue = []

while maxHeap or queue:
if maxHeap:
count, code = heapq.heappop(maxHeap)
count *= -1
count -= 1
res[resIndex] = code
resIndex += 1

if count:
queue.append((count, code, resIndex+1))
while queue and queue[0][2] <= resIndex:
count, code, _ = queue.pop(0)
heapq.heappush(maxHeap, (-count, code))

return res

Lazy Deletion

NOTES: This is not a good name for it but I can't think of anything better.

  • Kinda relates to [[5. Sliding Window#Sliding Window + Heap]] as it both involves cases where the heap might become invalid and then we have to remove an arbitrary element not at the top

https://leetcode.com/problems/design-a-number-container-system

class NumberContainers:
def __init__(self):
self.numIndexes = collections.defaultdict(list)
self.indexNum = {}

def change(self, index: int, number: int) -> None:
# 1. Don't remove yet, we will lazily do this later
# 2. Set new value
self.indexNum[index] = number
heapq.heappush(self.numIndexes[number], index)

def find(self, number: int) -> int:
if number not in self.numIndexes:
return -1

# Lazily remove invalid index
# If the current value at the smallest index is not the current number means that it was changed before => Invalid
while self.numIndexes[number] and self.indexNum[self.numIndexes[number][0]] != number:
heapq.heappop(self.numIndexes[number])

if not self.numIndexes[number]:
return -1

return self.numIndexes[number][0]