threading.local memory leak · Issue #741 · eventlet/eventlet · GitHub

threading.local memory leak #741

Open · jshen28 opened this issue Jan 4, 2022 · 18 comments

jshen28 commented Jan 4, 2022

Hello,

I am finding that socket.connect may lead to surprising memory usage when the destination IP/port pair is not reachable. The memory profile result looks like the following. It is weird that sock.connect uses around 100 MB of memory. The @profile decorator is added to the _reconnect function from fluent-logger.

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
   196  415.797 MiB  255.617 MiB       20000       @profile
   197                                             def _reconnect(self):
   198  415.797 MiB    8.027 MiB       20000           if not self.socket:
   199  415.797 MiB    0.000 MiB       20000               try:
   200  415.797 MiB    0.082 MiB       20000                   if self.host.startswith('unix://'):
   201                                                             sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
   202                                                             sock.settimeout(self.timeout)
   203                                                             sock.connect(self.host[len('unix://'):])
   204                                                         else:
   205  415.797 MiB   27.230 MiB       20000                       sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   206  415.797 MiB    1.723 MiB       20000                       sock.settimeout(self.timeout)
   207                                                             # This might be controversial and may need to be removed
   208  415.797 MiB    0.000 MiB       20000                       sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
   209                                                             #sock.connect((self.host, self.port))
   210  415.797 MiB  117.297 MiB       20000                       sock.connect(("localhost", 24244))
   211  415.797 MiB    4.129 MiB       20000               except Exception as e:
   212  415.797 MiB    0.000 MiB       20000                   try:
   213  415.797 MiB    0.148 MiB       20000                       sock.close()
   214                                                         except Exception:  # pragma: no cover
   215                                                             pass
   216  415.797 MiB    1.543 MiB       20000                   raise e
   217                                                     else:
   218                                                         self.socket = sock

The script I am using is

#!/usr/bin/env python3
import tracemalloc
import logging
import logging.config
import time
import eventlet
tracemalloc.start()

eventlet.monkey_patch()

logging.config.fileConfig("logging.conf")
loggers = []
counter = 2

for i in range(counter):
    name = "name%s" % i
    loggers.append(logging.getLogger(name))

pool = eventlet.GreenPool()
for j in range(10000):
    for i in range(counter):
        pool.spawn(loggers[i].warning, "hello world from sjt")

time.sleep(4)
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Top 10 ]")
for stat in top_stats[:10]:
    print(stat)
pool.waitall()
total = sum(stat.size for stat in top_stats)
print("total: ", total)

And logging.conf looks like below

[formatter_context]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
[formatter_default]
datefmt = %Y-%m-%d %H:%M:%S
format = %(message)s
[formatter_fluentd]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
[formatters]
keys = context,default,fluentd
[handler_fluentd]
args = ("openstack.cinder.api", 'localhost', 24234)
kwargs = {'bufmax': 0}
class = fluent.handler.FluentHandler
formatter = fluentd
[handler_null]
args = ()
class = logging.NullHandler
formatter = default
[handler_stderr]
args = (sys.stderr,)
class = StreamHandler
formatter = context
[handler_stdout]
args = (sys.stdout,)
class = StreamHandler
formatter = context
[handlers]
keys = stdout,stderr,null,fluentd
[logger_root]
handlers = fluentd
level = DEBUG
[loggers]
keys = root

To run the script,

python3 -m memory_profiler fluent_test.py

Versions:

  • eventlet: 0.33.0
  • fluent-logger: 0.10.0
  • python3: 3.8.10
jshen28 (Author) commented Jan 4, 2022

I also tried python2 (with different package versions) and it looks like memory is not increasing too much, but the program feels significantly slower to me.

temoto (Member) commented Jan 4, 2022

Thanks for the details.

  • 100 MB over 20000 occurrences is "only" ~5 KB per socket.connect, which doesn't seem like much. It should be close to zero if socket buffers are counted as kernel memory, or about two pages (8/16 KB) if socket buffers are counted as process memory.
  • The Python2/3 difference seems to confirm there is a problem. Or maybe it's a difference in dangling pointers. Try adding gc.collect() to be sure (see the sketch below).
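
For example, a minimal sketch (adapted from the reporter's script above; the placement is my assumption) of adding that gc.collect() right before the snapshot:

import gc
import tracemalloc

tracemalloc.start()

# ... run the workload under test (e.g. the GreenPool loop from the script above) ...

gc.collect()  # drop anything that is only kept alive by reference cycles
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("total: ", sum(stat.size for stat in top_stats))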

jshen28 (Author) commented Jan 4, 2022

Thank you for the reply.

  • I did try gc.collect() but no luck. I also find that if I comment out monkey_patch, memory usage seems to go back to normal.
  • 100 MB over 20000 is not accurate, because fluent-logger uses a threading.Lock, so essentially there is only one green thread calling socket.connect at a time.

temoto (Member) commented Jan 4, 2022

Yeah there is (was?) an issue with logging compatibility. It worked better with monkey_patch(thread=False) but that's a bad workaround.

The following benchmark clearly shows there is no memory leak in socket.connect isolated from logging/fluent/etc. It only means more measurements have to be done to find the root issue.

import eventlet

eventlet.monkey_patch()

import linecache
import resource
import tracemalloc
import os


def filter_snapshot(snapshot):
    return snapshot.filter_traces(
        (
            tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
            tracemalloc.Filter(False, "<unknown>"),
        )
    )


def display_top(top_stats, limit=10):
    cwd = os.getcwd()

    print("Top %s lines" % limit)
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        filename = frame.filename.replace(cwd, ".", 1)
        print(
            "#%s: %s:%s: %.1f KiB" % (index, filename, frame.lineno, stat.size / 1024)
        )
        line = linecache.getline(filename, frame.lineno).strip()
        if line:
            print("    %s" % line)

    other = top_stats[limit:]
    if other:
        size = sum(stat.size for stat in other)
        print("%s other: %.1f KiB" % (len(other), size / 1024))
    total = sum(stat.size for stat in top_stats)
    print("Total allocated size: %.1f KiB" % (total / 1024))
    return total


N = 10000
sockets = [None] * N
limit_nofile = N * 2 + 100
resource.setrlimit(resource.RLIMIT_NOFILE, (limit_nofile, limit_nofile))

tracemalloc.start()

for i in range(N):
    try:
        sockets[i] = eventlet.connect(("localhost", 4083))
    except:
        print(i)
        raise

snap1 = tracemalloc.take_snapshot()
total1 = display_top(snap1.statistics("lineno"), limit=3)

[sock.close() for sock in sockets]
del sockets

snap2 = tracemalloc.take_snapshot()
total2 = display_top(snap2.statistics("lineno"), limit=3)
# snap2 = tracemalloc.take_snapshot()
# total2 = display_top(snap2.compare_to(snap1, "lineno"), limit=1)

print("Total change:", round((total2 - total1) / total1, 2))
Top 3 lines
#1: ./eventlet/greenio/base.py:147: 2265.1 KiB
    self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
#2: ./eventlet/greenio/base.py:139: 859.4 KiB
    fd = _original_socket(family, *args, **kwargs)
#3: ./eventlet/greenio/base.py:172: 703.9 KiB
    self.shutdown = fd.shutdown
72 other: 5330.8 KiB
Total allocated size: 9159.1 KiB
Top 3 lines
#1: /usr/lib/python3.8/linecache.py:137: 43.2 KiB
    lines = fp.readlines()
#2: ./eventlet/greenio/base.py:166: 16.1 KiB
    self.close = fd.close
#3: ./eventlet/greenio/base.py:147: 5.0 KiB
    self._timeout = fd.gettimeout() or socket.getdefaulttimeout()
109 other: 60.9 KiB
Total allocated size: 125.2 KiB
Total change: -0.99

temoto changed the title from "socket.connect high memory usage" to "high memory usage in logging or fluent-logger socket.connect" on Jan 4, 2022
temoto (Member) commented Jan 4, 2022

Continuing previous message.

  • server used socat -u /dev/null tcp-listen:4083,reuseaddr,fork
  • memory_profiler result contradicts tracemalloc so I'm unsure how to interpret it
Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    51   25.219 MiB   25.219 MiB           1   @profile
    52                                         def foo():
    53   43.320 MiB    0.004 MiB       10001       for i in range(N):
    54   43.320 MiB    0.000 MiB       10000           try:
    55   43.320 MiB   18.098 MiB       10000               sockets[i] = eventlet.connect(("localhost", 4083))

jshen28 (Author) commented Jan 4, 2022

Thank you for the investigation!

  • Did you also try with no server listening? For me, memory grows faster when the fluent agent is not working properly.
  • Using monkey_patch(thread=False) does look like it shrinks memory usage.
  • It looks like fluent-logger is using threading.Lock to do synchronization; do you think it is still necessary to have it?
  • Did you try top? For me, it looks like memory_profiler gives a value closer to top.

temoto (Member) commented Jan 4, 2022

No server, with try/except: pass to ignore the connect error.

The result is no memory used and no leaks. The second snapshot didn't get any allocations from the program itself, so it shows internals of the stdlib.

Top 3 lines
#1: ./eventlet/hubs/poll.py:111: 2.2 KiB
    listener.cb(fileno)
#2: ./eventlet/hubs/hub.py:365: 1.0 KiB
    self.wait(sleep_time)
#3: ./eventlet/hubs/__init__.py:157: 0.8 KiB
    listener = hub.add(hub.WRITE, fileno, current.switch, current.throw, mark_as_closed)
66 other: 17.6 KiB
Total allocated size: 21.6 KiB
Top 3 lines
#1: /usr/lib/python3.8/linecache.py:137: 66.9 KiB
    lines = fp.readlines()
#2: /usr/lib/python3.8/tracemalloc.py:65: 4.2 KiB
    return (self.size, self.count, self.traceback)
#3: /usr/lib/python3.8/tracemalloc.py:185: 3.2 KiB
    self._frames = tuple(reversed(frames))
95 other: 34.0 KiB
Total allocated size: 108.3 KiB
Total change: 4.02

memory_profiler

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    51   25.160 MiB   25.160 MiB           1   @profile
    52                                         def foo():
    53   25.965 MiB    0.000 MiB       10001       for i in range(N):
    54   25.965 MiB    0.000 MiB       10000           try:
    55   25.965 MiB    0.805 MiB       10000               sockets[i] = eventlet.connect(("localhost", 4083))
    56   25.965 MiB    0.000 MiB       10000           except:
    57   25.965 MiB    0.000 MiB       10000               pass

temoto (Member) commented Jan 4, 2022

It looks like fluent-logger is using threading.Lock to do synchronization; do you think it is still necessary to have it?

It is mandatory in that particular style of buffering with pendings: bytes. The same buffering could be done without an explicit lock (just the GIL) using a list of bytes, which has atomic append/pop, or another thread-safe FIFO data structure. Interestingly, a TCP socket has the required semantics, so a naive implementation is just sock.sendall(msgpack(record)) plus a decision whether to wait or to drop records that exceed the buffer. Of course, they must have had good reasons to introduce separate buffering. Cross-platform support is really hard.
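
For illustration, a minimal sketch of that naive approach (my assumption of how it could look, not fluent-logger's actual code; NaiveSender and its behaviour are made up):

import socket

import msgpack  # fluent-logger already uses msgpack for the wire format


class NaiveSender:
    def __init__(self, host="localhost", port=24224):
        self.addr = (host, port)
        self.sock = None

    def emit(self, record):
        try:
            if self.sock is None:
                self.sock = socket.create_connection(self.addr, timeout=3)
            # sendall blocks until the whole record is written or raises;
            # there is no user-space pending buffer to guard with a lock here.
            self.sock.sendall(msgpack.packb(record))
            return True
        except OSError:
            # the "decision" part: drop the record instead of waiting/retrying
            if self.sock is not None:
                self.sock.close()
                self.sock = None
            return False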

temoto (Member) commented Jan 4, 2022

Did you try top? For me, it looks like memory_profiler gives a value closer to top.

It's because memory_profiler by default uses psutil to get exactly the same RSS value as reported by top. https://github.com/pythonprofilers/memory_profiler/blob/master/memory_profiler.py#L314 So it also counts everything in the Python interpreter internals. Also, AFAIK, RSS is measured in full pages. memory_profiler has a tracemalloc backend too.

Interpreter-internal allocations should roughly cancel out when comparing two data points. Full pages should not be an obstacle when measuring a lot of allocations. So I'm not saying RSS is a bad metric, just not as precise (see the sketch below).
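
As an aside, a small sketch (not from the thread, assuming psutil is installed) showing the two kinds of measurement side by side:

import tracemalloc

import psutil

tracemalloc.start()
data = [b"." * 1024 for _ in range(10000)]  # roughly 10 MB of Python-level allocations

traced, _peak = tracemalloc.get_traced_memory()  # only allocations made through Python
rss = psutil.Process().memory_info().rss         # whole-process resident set, in full pages
print("tracemalloc: %.1f MiB, RSS: %.1f MiB" % (traced / 2**20, rss / 2**20))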

jshen28 (Author) commented Jan 5, 2022

I tried removing last_error, which is a threading.local variable, and it looks like that reduces some memory usage.

jshen28 (Author) commented Jan 5, 2022

I came up with a new demo which can somewhat reproduce the high memory usage; it is a copy of sender.py but without actually sending data out. During the test, the IP/port pair "localhost" and 24224 is NOT available.

I found that threading.local might contribute to the high memory usage; by replacing it with a custom object, it looks like memory usage can be reduced.

#!/usr/bin/env python3

import tracemalloc
import socket
import logging
import logging.config
import time
import eventlet
import threading

eventlet.monkey_patch()
tracemalloc.start()

class MyThreadLocal:
    # Plain (non-thread-local) stand-in for threading.local in this demo.
    def __init__(self):
        self.__attrs = {}

    def __setattr__(self, name, value):
        # Names mangled to "_MyThreadLocal..." (such as __attrs itself) are set
        # normally; every other attribute goes into the shared dict.
        if name.startswith("_MyThreadLocal"):
            super().__setattr__(name, value)
            return
        self.__attrs[name] = value


class MyTester:
    def __init__(self):
        self.socket = None
        self.lock = threading.Lock()
        self.last_error = threading.local()
        #self.last_error = MyThreadLocal()

    def log(self, message):
        with self.lock:
            if self.socket:
                return False
            self._log_internal(message)

    def _log_internal(self, message):

        try:
            self._send_data(message)
            return True
        except socket.error as e:
            self._close()
            return False

    def _close(self):
        try:
            sock = self.socket
            if sock:
                try:
                    try:
                        sock.shutdown(socket.SHUT_RDWR)
                    except socket.error:
                        pass
                finally:
                    try:
                        sock.close()
                    except socket.error:
                        pass
        finally:
            self.socket = None

    def _send_data(self, message):
        self.connect_socket("localhost", 24224)

    def connect_socket(self, host, port):
        sock = None
        if not self.socket:
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(3)
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                sock.connect((host, port))
            except Exception as e:
                self.last_error.exception = e
                try:
                    sock.close()
                except:
                    pass
                raise
            else:
                self.socket = sock


pool = eventlet.GreenPool()
obj = MyTester()
for i in range(100000):
    pool.spawn(obj.log, b"localhost")

pool.waitall()
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Top 10 ]")
for stat in top_stats[:10]:
    print(stat)
time.sleep(1000)

temoto (Member) commented Jan 5, 2022

Yeah that makes sense. GreenPool starts each call in a new greenthread, each having a separate threading.local() object. Basically, it's a recipe for collecting exceptions, together with their heavy tracebacks, from each call (see the sketch below).

I'd expect everything to be cleaned up:

  • best case after pool.waitall()
  • worst case after time or gc.collect() if there are circular references for some weird reason

No cleanup after gc.collect() is likely a bug in eventlet.
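
A minimal sketch (not from the thread) of why a stored exception is heavy: its __traceback__ keeps the raising frame alive, and that frame keeps all of its local variables. In the fluent-logger case the frame's self also points back at the object holding last_error, which is one way circular references can form.

import sys


def work():
    big_local = b"." * (1024 * 1024)  # ~1 MiB, kept alive via the traceback's frame
    try:
        raise Exception("boom")
    except Exception as exc:
        return exc  # exc carries __traceback__ -> work's frame -> big_local


err = work()
frame = err.__traceback__.tb_frame
print("big_local" in frame.f_locals)               # True: the buffer is still reachable
print(sys.getsizeof(frame.f_locals["big_local"]))  # about 1 MiB plus object overhead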

temoto (Member) commented Jan 5, 2022

You can reduce it further to something like this; no sockets or logging required.

import eventlet

eventlet.monkey_patch()

import threading
from eventlet import GreenPool

N = 100000

def work():
    try:
        raise Exception()
    except Exception as e:
        threading.local().last_error = e

pool = GreenPool()
[pool.spawn(work) for _ in range(N)]
pool.waitall()

jshen28 (Author) commented Jan 6, 2022

Looks like even without the try/except clause, it can still reproduce the problem.

def work():
    threading.local().last_error = '123'

pool = GreenPool()
[pool.spawn(work) for _ in range(N)]
pool.waitall()

temoto (Member) commented Jan 6, 2022

Cool, thanks, I'll add this minimal test to the eventlet test suite. After fixing it, of course.

temoto changed the title from "high memory usage in logging or fluent-logger socket.connect" to "threading.local memory leak" on Jan 6, 2022
temoto (Member) commented Jan 6, 2022

@jshen28 please post uname -a and the result of this script. I can't reproduce it in the minimal version.

import eventlet

eventlet.monkey_patch()

import gc
import resource
import threading
import psutil


N = 100000
pool = eventlet.GreenPool()


def memory_usage():
    return int(psutil.Process().memory_full_info().uss / 8192)
    # return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss


def work():
    threading.local().leak = b"." * 1024


mem1 = memory_usage()


for i in range(N):
    pool.spawn(work)
pool.waitall()
mem2 = memory_usage()

gc.collect()
mem3 = memory_usage()

print("base\t{:6d}".format(mem1))
print("waitall\t{:6d}\t{:+6d}".format(mem2, mem2 - mem1))
print("collect\t{:6d}\t{:+6d}".format(mem3, mem3 - mem1))

jshen28 (Author) commented Jan 7, 2022

Hmm... could you try [pool.spawn(work) for _ in range(N)]?

# uname -a
Linux compute-010 5.10.83 #3 SMP Fri Dec 3 11:13:00 CST 2021 x86_64 x86_64 x86_64 GNU/Linux
# python3 --version
Python 3.8.10

And the program is running in a container.

jshen28 (Author) commented Jan 7, 2022

def work():
    try:
        raise Exception()
    except Exception as e:
        threading.local().last_error = e

pool = GreenPool()
[pool.spawn(work) for _ in range(N)]
pool.waitall()
  • If I replace [pool.spawn(work) for _ in range(N)] with a plain for i in range(N) loop, it looks like memory usage is low (both variants are spelled out below).
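
For clarity, a self-contained sketch of the two spawn variants being compared, using the reduced work() from above (the surrounding setup is my assumption):

import eventlet

eventlet.monkey_patch()

import threading

N = 100000


def work():
    threading.local().last_error = '123'


pool = eventlet.GreenPool()

# Variant A: list comprehension (reported to show the high memory usage)
[pool.spawn(work) for _ in range(N)]
pool.waitall()

# Variant B: plain loop (reported to keep memory usage low)
for i in range(N):
    pool.spawn(work)
pool.waitall()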
