Kafka Consumers

From my experience working at Simpl.

A few days ago, I was working on a project when I got distracted by an error originating from one of the Kafka consumers our team owns. The consumers read events from Kafka, add some metadata, and dump it in S3. It’s a python based microservice running on AWS ECS.

There are conditions attached before the data is flushed to S3 like: Either the memory buffer size is breached (eg: 10mb), or the time to hold data in buffer is expired (eg: 10mins).

Now, the error was: KafkaError{code=_MAX_POLL_EXCEEDED,val=-147,str="Application maximum poll interval (300000ms) exceeded by 249ms"}. Working on this issue gave me a reason to deep dive into the Kafka consumer code and understand how it works.

# Pseudocode: 

from confluent_kafka import Consumer, KafkaError

class KafkaConsumer:
    def __init__(self, config: dict = {}, topics: str = None, auto_commit: bool = True):
        self.auto_commit = auto_commit
        self.conf = {
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'groupid-1',
            'session.timeout.ms': 60000,
            'security.protocol': 'PLAINTEXT',
            'default.topic.config': {'auto.offset.reset': 'smallest'},
            'enable.auto.commit': self.auto_commit,
        }
        self.conf.update(config)
        if topics is None or topics == '':
            raise Exception('You must provide atleast one topic. eg, "topic1,topic2"')
        self.topics = [t.strip() for t in topics.split(',')]
        self._consumer = Consumer(**self.conf)

    def _subscribe(self):
        self._consumer.subscribe(self.topics)
        log.info(f"Subscribed to topic {self.topics}")
        # Reset consumer to a specific time or offset if needed to reprocess messages.
        reset_hours_back = None
        if reset_hours_back:
            self._reset_to_hours_back(reset_hours_back)

    def stream(self, poll_timeout=1.0):
        self._subscribe()
        while True:
            try:
                msg = self._consumer.poll(timeout=poll_timeout)
                if msg is None:
                    log.debug("Polling from kafka..")
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        continue
                    else:
                        yield (None, msg.error())
                data = msg.value().decode('utf-8')
                yield (data, None)
            except Exception as e:
                yield (None, e)

    def commit_offsets(self, asynchronous=False):
        return self._consumer.commit(asynchronous=asynchronous)
    
class KafkaToS3ConsumerService:
    def __init__(self):
        self.buffer_event_count = 10000
        self.blockSize = int(config.BLOCK_SIZE)
        self.bufferExpireLimit = int(config.BUFFER_EXPIRE_LIMIT)
        self.topics = os.environ['S3_CONSUMER_KAFKA_TOPICS']
        self.conf = config.kafka_config
        self.tempFile = {}
        self.tempFileList = {}
        self.currentBufferSize = {}
        self.meta_dict = {}
        self.offsetStatistics = {}
        self.processingStartTime = time.monotonic()
        self.accumulation_start_time = None
        self.count = 0

    def consume(self):
        self.conf['on_commit'] = self.offset_commit_callback
        kafka = KafkaConsumer(config=self.conf, topics=self.topics, auto_commit=False)
        for event, error in kafka.stream():
            if error:
                notify_ims_systems(str(error)) # notifying incident management systems like: pagerduty/sentry etc. 
            if event:
                try:
                    self.buffer_data(event)
                    buffer_size_expired_flag = self.__check_buffer_size_expiration()
                    buffer_time_expired_flag = self.__check_buffer_time_expiration()
                    if buffer_time_expired_flag or buffer_size_expired_flag:
                        if self.__has_data():
                            self.close_file_and_reset_buffer()
                            kafka.commit_offsets(asynchronous=False)
                            self.accumulation_start_time = None
                            log.debug("Uploaded local temp files to S3. Buffer is reset")
                except Exception as e:
                    tb = traceback.format_exc()
                    log.error(f"Errors were found in following batch traceback: {tb}")
                    notify_ims_systems(f'Error while processing event. Error: {str(e)}, traceback: {tb}, event: {event}')

There are three fundamental concepts related to the kafka consumers:

But before I dive into them, a quick detour on Kafka architecture:

1) 
┌───────────────────────────────────────────────────────────────────────────────────────────────┐
│                                 Complete Kafka Architecture                                   │
├────────────────────────────────────────────────────────────────────────────────────────────── ┤
│                                                                                               │
│  Producers                             Kafka Cluster                           Consumers      │
│                                                                                               │
│ ┌─────────────┐               ┌─────────────────────────┐                 ┌─────────────┐     │
│ │   Web App   │──┐            │                          │              ┌─│  Analytics  │     │
│ │ (UserEvents)│  │    Produce │  ┌─────────────────────┐ │   Consume    │ │   Service   │     │
│ └─────────────┘  │   Requests │  │      Broker 1       │ │  Requests    │ └─────────────┘     │
│                  │            │  │ ┌─────────────────┐ │ │              │                     │
│ ┌─────────────┐  │            │  │ │ user-events     │ │ │              │ ┌─────────────┐     │
│ │Mobile App   │──┼──────────► │  │ Partition 0 (L) │ │ │◄──────────┼─── │ |  Billing    │     │
│ │(OrderEvents)│  │            │  │ └─────────────────┘ │ │              │ │   Service   │     │
│ └─────────────┘  │            │  │ ┌─────────────────┐ │ │              │ └─────────────┘     │
│                  │            │  │ │ order-events    │ │ │              │                     │
│ ┌─────────────┐  │            │  │ │ Partition 0 (L) │ │ │              │ ┌─────────────┐     │
│ │   API       │──┼──────────► │  | └─────────────────┘ │◄──────────┼────│ |Notification │     │
│ │ Gateway     │  │            │  └─────────────────────┘ │              │ │   Service   │     │
│ └─────────────┘  │            │                          │              │ └─────────────┘     │
│                  │            │  ┌─────────────────────┐ │              │                     │
│ ┌─────────────┐  │            │  │      Broker 2       │ │              │ ┌─────────────┐     │
│ │   IoT       │──┼──────────► │  │ ┌─────────────────┐ │◄──────────┼────│ | Audit       │     │
│ │  Sensors    │  │            │  │ │ user-events     │ │ │              │ │   Service   │     │
│ └─────────────┘  │            │  │ │ Partition 1 (L) │ │ │              │ └─────────────┘     │
│                  │            │  │ └─────────────────┘ │ │              │                     │
│ ┌─────────────┐  │            │  │ ┌─────────────────┐ │ │              │ ┌─────────────┐     │
│ │  Batch      │──┘            │  │ │ order-events    │ │◄───────────┼───│ |  Stream     │     │
│ │ Processor   │               │  │ │ Partition 1 (L) │ │ │              │ │ Processor   │     │
│ └─────────────┘               │  │ └─────────────────┘ │ │              │ └─────────────┘     │
│                               │  └─────────────────────┘ │              │                     │
│  Configuration:               │                          │              │ Consumer Groups:    │
│  • acks=all                   │  ┌─────────────────────┐ │              │ • analytics-group   │
│  • retries=3                  │  │      Broker 3       │ │              │ • billing-group     │
│  • batch.size=16384           │  │ ┌─────────────────┐ │ │              │ • notification-grp  │
│  • compression=gzip           │  │ │ user-events     │ │ │              │ • audit-group       │
│                               │  │ │ Partition 2 (L) │ │ │              │ • stream-processor  │
│                               │  │ └─────────────────┘ │ │              │                     │
│                               │  │ ┌─────────────────┐ │ │              │ Configuration:      │
│                               │  │ │ order-events    │ │ │              │ • auto.offset.reset │
│                               │  │ │ Partition 2 (L) │ │ │              │ • enable.auto.commit│
│                               │  │ └─────────────────┘ │ │              │ • max.poll.records  │
│                               │  └─────────────────────┘ │              │ • fetch.min.bytes   │
│                               └─────────────────────────┘               │                     │
└───────────────────────────────────────────────────────────────────────────────────────────────┘

2) Consumer Group Rebalancing

Before Rebalancing (3 consumers, 6 partitions):
┌─────────────────────────────────────────────────────────────────┐
│                Consumer Group: "data-processors"                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ Consumer A        Consumer B        Consumer C                  │
│ ┌─────────────┐   ┌─────────────┐   ┌─────────────┐             │
│ │ Partition 0 │   │ Partition 2 │   │ Partition 4 │             │
│ │ Partition 1 │   │ Partition 3 │   │ Partition 5 │             │
│ └─────────────┘   └─────────────┘   └─────────────┘             │
└─────────────────────────────────────────────────────────────────┘

Consumer B Crashes:
┌─────────────────────────────────────────────────────────────────┐
│                     Rebalancing Triggered                       │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ Consumer A        Consumer B        Consumer C                  │
│ ┌─────────────┐   ┌─────────────┐   ┌─────────────┐             │
│ │   Active    │   │  CRASHED    │   │   Active    │             │
│ └─────────────┘   └─────────────┘   └─────────────┘             │
│        │                ❌                 │                    │
│        └─────────────┬─────────────────────┘                    │
│                      │                                          │
│              Group Coordinator                                  │
│                 Redistributes                                   │
│                  Partitions                                     │
└─────────────────────────────────────────────────────────────────┘

After Rebalancing (2 consumers, 6 partitions):
┌─────────────────────────────────────────────────────────────────┐
│                Consumer Group: "data-processors"                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ Consumer A                    Consumer C                        │
│ ┌─────────────────────────┐   ┌─────────────────────────┐       │
│ │ Partition 0             │   │ Partition 3             │       │
│ │ Partition 1             │   │ Partition 4             │       │
│ │ Partition 2             │   │ Partition 5             │       │
│ └─────────────────────────┘   └─────────────────────────┘       │
└─────────────────────────────────────────────────────────────────┘

3) Message Partitioning Strategies:

a. Key-based Partitioning (Default):
┌─────────────────────────────────────────────────────────────────┐
│ Producer Messages:                                              │
│                                                                 │
│ Message 1: key="user123" → hash(user123) % 3 = 0 → Partition 0  │
│ Message 2: key="user456" → hash(user456) % 3 = 1 → Partition 1  │
│ Message 3: key="user789" → hash(user789) % 3 = 2 → Partition 2  │
│ Message 4: key="user123" → hash(user123) % 3 = 0 → Partition 0  │
└─────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────┐
│                    Topic: user-events                           │
├─────────────────────────────────────────────────────────────────┤
│ Partition 0        Partition 1        Partition 2               │
│ ┌─────────────┐    ┌─────────────┐    ┌─────────────┐           │
│ │ user123:msg1│    │ user456:msg2│    │ user789:msg3│           │
│ │ user123:msg4│    │             │    │             │           │
│ └─────────────┘    └─────────────┘    └─────────────┘           │
└─────────────────────────────────────────────────────────────────┘

b. Others like Round robin based, etc... 

4) Kafka ecosystem: 
Core Components
    Kafka Broker: The server that stores and serves messages. Multiple brokers form a Kafka cluster.
    Producer: Client application that publishes/sends messages to Kafka topics.
    Consumer: Client application that reads/subscribes to messages from Kafka topics.
    Topics: Named streams or categories where messages are published and organized.
    Partitions: Subdivisions of topics that enable parallel processing and scalability.
    Offsets: Sequential IDs assigned to messages within partitions for tracking read position.
Supporting Components
    Zookeeper: Coordination service for managing cluster metadata, leader election, and configuration.
    Consumer Groups: Groups of consumers that coordinate to process messages from topics in parallel.
    Replication: Copies of partition data across multiple brokers for fault tolerance.
    Leader/Follower: Leader handles reads/writes for a partition; followers replicate the data.

Now, Kafka consumers operate within consumer groups:

Topic: user-events (3 partitions)
┌─────────────┬─────────────┬─────────────┐
│ Partition 0 │ Partition 1 │ Partition 2 │
└─────────────┴─────────────┴─────────────┘
       │             │             │
       ▼             ▼             ▼
┌─────────────┬─────────────┬─────────────┐
│ Consumer A  │ Consumer B  │ Consumer C  │
│ (Group: X)  │ (Group: X)  │ (Group: X)  │
└─────────────┴─────────────┴─────────────┘

Each consumer in a group is assigned specific partitions, and the group coordinator (a Kafka broker) manages these assignments.

The poll() method is the heart of Kafka consumer operation, but it does much more than just fetch messages. I tried to go through the source code but realized it’s a maze, maybe I will spend more time on this later - ref.

Here’s what a typical polling sequence looks like:

Time    Action                           Duration    Notes
─────────────────────────────────────────────────────────────
0:00:00 poll() called                   0ms         Start
0:00:00 Check internal buffer           <1ms        500 msgs available
0:00:00 Return 1 message                <1ms        From buffer
0:00:00 Process message                 1ms         Your application
0:00:01 poll() called again             0ms         Start next cycle
0:00:01 Check internal buffer           <1ms        499 msgs available
0:00:01 Return 1 message                <1ms        From buffer
...     (continues until buffer empty)
0:02:30 poll() called                   0ms         Buffer now empty
0:02:30 Send fetch request to broker    50ms        Network call
0:02:30 Receive 500 new messages        <1ms        Broker response
0:02:30 Return 1 message                <1ms        First from new batch

Note:

Relation & differences between Heartbeats/Poll/Session intervals:

We can understand with an example directly. But info around configs involved in tweaking the related intervals:

consumer_config = {
    # How often to send "I'm alive" signals
    'heartbeat.interval.ms': 3000,        # 3 seconds
    
    # How long without heartbeat before considered dead
    'session.timeout.ms': 30000,          # 30 seconds
    
    # How long between poll() calls before considered stuck
    'max.poll.interval.ms': 300000,       # 5 minutes (default)
}
Timeline: Consumer Processing a Large Batch
│
├── 0s ──────── poll() called, gets batch
│   │
│   ├── 3s ──── Heartbeat sent (background)
│   │
│   ├── 6s ──── Heartbeat sent (background)
│   │
│   ├── 9s ──── Heartbeat sent (background)
│   │
│   ├── 12s ─── Heartbeat sent (background)
│   │
│   ├── 15s ─── Processing continues...
│   │
│   ├── 30s ─── Still within session.timeout.ms ✓
│   │
│   ├── 60s ─── Still processing, heartbeats continuing
│   │
│   ├── 300s ── MAX_POLL_INTERVAL EXCEEDED! ❌
│   │
│   └── ERROR: Consumer removed from group
fetch_config = {
    # Maximum records in one poll() call
    'max.poll.records': 500,
    
    # Minimum bytes to fetch (efficiency)
    'fetch.min.bytes': 1,
    
    # Maximum bytes per fetch request
    'fetch.max.bytes': 52428800,  # 50MB
    
    # Max wait time for min bytes
    'fetch.max.wait.ms': 500,
}

A message journey from Kafka to broker would look like below:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Kafka Broker  │    │ Consumer Client │    │ Your Application│
│                 │    │                 │    │                 │
│ ┌─────────────┐ │    │ ┌─────────────┐ │    │ ┌─────────────┐ │
│ │   Topic     │ │    │ │   Internal  │ │    │ │    Temp     │ │
│ │ Partition 0 │ │    │ │   Buffer    │ │    │ │   Files     │ │
│ │ [msg1]      │ │    │ │   (500 msg) │ │    │ │  (10k msg)  │ │
│ │ [msg2]      │ │    │ │             │ │    │ │             │ │
│ │ [msg3]      │ │    │ │             │ │    │ │             │ │
│ │  ...        │ │    │ │             │ │    │ │             │ │
│ └─────────────┘ │    │ └─────────────┘ │    │ └─────────────┘ │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         │ 1. fetch_request      │                       │
         │◄──────────────────────│                       │
         │                       │                       │
         │ 2. 500 messages       │                       │
         │──────────────────────►│                       │
         │                       │ 3. poll() returns     │
         │                       │     1 message         │
         │                       │─────────────────────► │
         │                       │                       │
         │                       │ 4. process & buffer   │
         │                       │◄───────────────────── │
         │                       │                       │
         │                       │ 5. repeat 499 times   │
         │                       │◄────────────────────► │

I think by now we have some picture of what could have led to the MAX_POLL_EXCEEDED error, but let’s continue.

Kafka consumer application actually has multiple types of buffers:

# Example of the three buffer types in code
class KafkaToS3ConsumerService:
    def consume(self):
        kafka = KafkaConsumer(...)  # Creates consumer with internal buffer
        for event, error in kafka.stream():  # Gets 1 msg from internal buffer
            # This is the application buffer
            self.buffer_data(event)  # Write to temp files
            if self.should_flush_buffer():
                self.upload_to_s3()  # Process your application buffer

The memory usage over time would look like (considering our consumer code):

Kafka Internal Buffer (500 messages):
████████████████████████████████████████  (constant ~50MB)

Your Application Buffer (accumulating):
▁▂▃▄▅▆▇█ (grows) → flush to S3 → ▁ (reset) → ▁▂▃▄▅▆▇█ → flush...

Apart from poll interval which we discussed earlier, there is also: poll timeout.

Poll Timeout (timeout parameter in poll(timeout)):

msg = consumer.poll(timeout=1000)  # Wait UP TO 1 second for messages

Poll Interval (max.poll.interval.ms configuration):

config = {
    'max.poll.interval.ms': 300000  # Must call poll() within 5 minutes
}

For example:

Normal Operation (Fast Processing):

0:00:00 poll() → get message → process (100ms) → poll() → get message...
        ↑────── 10ms between polls ──────↑
        Interval: 100 ms (less than default 5 minutes)
        Timeout: 0ms (as messages readily available)
To imagine better: 
Time Window: 0-200ms
┌─────────────────────────────────────────────────────────────┐
│ poll()      │ process │ poll()      │ process │ poll()      │
│ ┌─────────┐ │ ┌─────┐ │ ┌─────────┐ │ ┌─────┐ │ ┌─────────┐ │
│ │ 0ms     │ │ │100ms│ │ │ 0ms     │ │ │100ms│ │ │ 0ms     │ │
│ │(instant)│ │ │     │ │ │(instant)│ │ │     │ │ │(instant)│ │
│ └─────────┘ │ └─────┘ │ └─────────┘ │ └─────┘ │ └─────────┘ │
└─────────────────────────────────────────────────────────────┘
      ↑─────────────↑         ↑─────────────↑
    Poll Timeout: 0ms    Poll Interval: 100ms
    (messages ready)     (very fast)

Slow Processing Operation:

0:00:00 poll() → get message → process (6 minutes) → poll()
        ↑────────── 6 minutes between polls ──────────↑
        Interval: 6 minutes (exceeds default 5 minutes, processing took time)
        Timeout: 0ms (as messages readily available)
Result: MAX_POLL_EXCEEDED error because 6min > 5min limit
To imagine better: 
Time Window: 0-7 minutes
┌─────────────────────────────────────────────────────────────┐
│ poll() │         Long Processing (S3 Upload)        │ poll()│
│ ┌────┐ │ ┌─────────────────────────────────────────┐ │ ┌──┐ │
│ │0ms │ │ │                6 minutes                │ │ │❌│ │
│ │msg │ │ │          (no poll() calls)              │ │ │  │ │
│ └────┘ │ └─────────────────────────────────────────┘ │ └──┘ │
└─────────────────────────────────────────────────────────────┘
    ↑─────────────────────────────────────────────────────↑
  Poll Timeout: 0ms              Poll Interval: 6 minutes
  (message ready)                (EXCEEDS 5min limit!)
                                 MAX_POLL_EXCEEDED ERROR

Empty Queue Scenario:

0:00:00 poll(timeout=1000) → wait 1sec → return None → poll(timeout=1000)...
        ↑────────── 1 second between polls ──────────↑
        Interval: 1000ms (as no messages available, window closing for new messages based on timeout defined)
        Timeout: 1000ms (as specified in poll)
To imagine better: 
Time Window: 0-3 seconds
┌─────────────────────────────────────────────────────────────┐
│ poll()                    │ poll()                    │     │
│ ┌───────────────────────┐ │ ┌───────────────────────┐ │     │
│ │ 1000ms                │ │ │ 1000ms                │ │     │
│ │ (waiting for messages)│ │ │ (waiting for messages)│ │     │
│ │ returns None          │ │ │ returns None          │ │     │
│ └───────────────────────┘ │ └───────────────────────┘ │     │
└─────────────────────────────────────────────────────────────┘
        ↑─────────────────────────↑─────────────────────────↑
      Poll Timeout: 1000ms   Poll Interval: 1000ms
      (queue empty)          (controlled by timeout)

Next, let’s talk about the offset management and commits, which is crucial for understanding how Kafka tracks message processing progress and ensures reliability in message consumption.

Polling (fetching messages):

records = consumer.poll(timeout=1000)  # Gets messages to process

Committing (confirming processing):

consumer.commit()  # Tells Kafka: "I've processed messages up to offset X"

Commit behaviour:

Auto-Commit Behavior:

config = {
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 5000  # Commit every 5 seconds
}

# Timeline:
# 0s: poll() and process messages
# 3s: poll() and process messages  
# 5s: auto-commit happens (commits all polled messages)
# 6s: poll() and process messages
# 10s: auto-commit happens again

Manual Commit Behavior (Your Code):

config = {
    'enable.auto.commit': False  # You control when to commit
}

for event in kafka.stream():
    process(event)
    if batch_complete:
        kafka.commit_offsets()  # Explicit commit after processing

Commit Strategies:

At-Most-Once (commit before processing):

for record in consumer:
    consumer.commit()     # Commit first
    process(record)       # Then process
    # Risk: If processing fails, message is lost

At-Least-Once (commit after processing) - Current approach:

for record in consumer:
    process(record)       # Process first
    consumer.commit()     # Then commit
    # Risk: If commit fails, message may be processed twice

Exactly-Once (using transactions):

# Requires Kafka transactions and idempotent processing
with consumer.transaction():
    for record in consumer:
        result = process(record)
        producer.send(result)
    # Commit and produce happen atomically

Consumer Lag: Lag occurs when your processing speed is slower than the message production rate. In our case, the step when S3 upload happens, it would’ve caused a spike in lag.

Message Production Rate: 1000 msgs/second
Your Processing Rate:    800 msgs/second
                        ────────────────
Net Lag Increase:       200 msgs/second

I think we have touched upon most of the concepts related to the Kafka consumer, and now we can easily connect the dots which led to the MAX_POLL_EXCEEDED error and possible fixes.

Error:

KafkaError{code=_MAX_POLL_EXCEEDED,val=-147,str="Application maximum poll interval (300000ms) exceeded by 249ms"}

The error means: “Consumer didn’t call poll() for more than 5 minutes and 249 milliseconds, so it’s being removed from the consumer group.”

The error happened because of a processing logic blocking the polling loop:

# Problematic code pattern
for event, error in kafka.stream():  # This calls poll() internally
    if event:
        self.buffer_data(event)  # Fast operation
        
        if self.should_flush():
            # This is the likely problem: blocking operation
            self.close_file_and_reset_buffer()  # Includes S3 upload (6+ minutes)
            kafka.commit_offsets()
            # poll() can't be called during S3 upload!

If it’s hypothesized, then the most likely scenario is:

Timeline of MAX_POLL_EXCEEDED Error:

0:00:00 ├─ poll() called, message received
0:00:00 ├─ buffer_data() - fast
0:00:01 ├─ poll() called, message received  
0:00:01 ├─ buffer_data() - fast
        │   ... (many messages processed quickly)
0:02:30 ├─ poll() called, message received
0:02:30 ├─ buffer_data() - fast
0:02:30 ├─ Buffer full! Start S3 upload...
0:02:30 ├─ ████████████ S3 UPLOAD (6 minutes) ████████████
        │   │
        │   │ During this time:
        │   │ - No poll() calls can happen
        │   │ - Heartbeats continue (good)
        │   │ - But max.poll.interval.ms timer keeps ticking
        │   │
0:08:30 ├─ ████████████████████████████████████████████████
0:08:30 ├─ S3 upload completes
0:08:30 ├─ commit_offsets() called
0:08:30 ├─ Try to call poll() again... ERROR!
        │
        └─ 6 minutes > 5 minute limit = MAX_POLL_EXCEEDED

Possible solutions would’ve been:

config = {
    'max.poll.interval.ms': 900000  # 15 minutes
}
import concurrent.futures
class KafkaToS3Consumer:
    def __init__(self):
        self.s3_executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    def close_file_and_reset_buffer(self):
        # Submit S3 upload to thread pool (non-blocking)
        future = self.s3_executor.submit(
            upload_to_s3, 
            self.temp_file_path
        )
        # Don't wait for completion - polling can continue
        return future