threading.local memory leak #741
I also tried python2 (with a different package version) and it looks like memory is not increasing too much, but the program feels significantly slower to me. |
Thanks for details.
|
Thank you for reply.
|
Yeah, there is (was?) an issue with logging compatibility. It worked better with [...]. The following benchmark clearly shows there is no memory leak:

import eventlet
eventlet.monkey_patch()
import linecache
import resource
import tracemalloc
import os

def filter_snapshot(snapshot):
    return snapshot.filter_traces(
        (
            tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
            tracemalloc.Filter(False, "<unknown>"),
        )
    )

def display_top(top_stats, limit=10):
    cwd = os.getcwd()
    print("Top %s lines" % limit)
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        filename = frame.filename.replace(cwd, ".", 1)
        print(
            "#%s: %s:%s: %.1f KiB" % (index, filename, frame.lineno, stat.size / 1024)
        )
        line = linecache.getline(filename, frame.lineno).strip()
        if line:
            print("    %s" % line)
    other = top_stats[limit:]
    if other:
        size = sum(stat.size for stat in other)
        print("%s other: %.1f KiB" % (len(other), size / 1024))
    total = sum(stat.size for stat in top_stats)
    print("Total allocated size: %.1f KiB" % (total / 1024))
    return total

N = 10000
sockets = [None] * N
limit_nofile = N * 2 + 100
resource.setrlimit(resource.RLIMIT_NOFILE, (limit_nofile, limit_nofile))
tracemalloc.start()
for i in range(N):
    try:
        sockets[i] = eventlet.connect(("localhost", 4083))
    except:
        print(i)
        raise
snap1 = tracemalloc.take_snapshot()
total1 = display_top(snap1.statistics("lineno"), limit=3)
[sock.close() for sock in sockets]
del sockets
snap2 = tracemalloc.take_snapshot()
total2 = display_top(snap2.statistics("lineno"), limit=3)
# snap2 = tracemalloc.take_snapshot()
# total2 = display_top(snap2.compare_to(snap1, "lineno"), limit=1)
print("Total change:", round((total2 - total1) / total1, 2))
|
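The commented-out compare_to() lines at the end of that benchmark hint at a cleaner way to isolate just the allocations made between two points. A minimal stdlib-only sketch of that approach (no eventlet needed; the payload size is an illustrative assumption):

```python
# Diff two tracemalloc snapshots to see only allocations made between them.
import tracemalloc

tracemalloc.start()
snap_before = tracemalloc.take_snapshot()

payload = [bytes(1024) for _ in range(1000)]  # ~1 MiB of new allocations

snap_after = tracemalloc.take_snapshot()
diffs = snap_after.compare_to(snap_before, "lineno")

# The biggest diff should be the payload allocation line above.
top = diffs[0]
print(top)
print("grew by %.1f KiB" % (top.size_diff / 1024))
```

compare_to() returns StatisticDiff entries sorted with the largest changes first, so interpreter-internal noise tends to fall below the allocations you actually made.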
Continuing previous message.
|
Thank you for investigation!
|
No server running; the result is no memory used and no leaks. The second snapshot didn't get any allocations from the program, so it shows internals of the stdlib.
memory_profiler
|
It is mandatory in that particular style of buffering with |
It's because memory_profiler by default uses psutil to get exactly the same RSS value as reported by top: https://github.com/pythonprofilers/memory_profiler/blob/master/memory_profiler.py#L314 So it also counts everything in the Python interpreter's internals. Also, AFAIK, RSS is measured in full pages. memory_profiler has a tracemalloc backend too. Interpreter-internal allocations should roughly cancel out when comparing two datapoints, and full pages should not be an obstacle when measuring a lot of allocations. So I'm not saying RSS is a bad metric, just that it's not as precise. |
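To make the RSS-versus-tracemalloc distinction concrete, here is a hedged stdlib-only sketch (Linux is assumed for the ru_maxrss units; the 10 MiB figure is illustrative):

```python
# RSS covers the whole process (interpreter internals, allocator overhead,
# full pages), while tracemalloc counts exactly the bytes Python requested.
import resource
import tracemalloc

tracemalloc.start()
before_current, _ = tracemalloc.get_traced_memory()

blob = bytearray(10 * 1024 * 1024)  # request exactly 10 MiB

after_current, _ = tracemalloc.get_traced_memory()
traced_delta = after_current - before_current
rss_kib = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss  # KiB on Linux

print("tracemalloc delta: %.2f MiB" % (traced_delta / 2**20))
print("process peak RSS : %.2f MiB" % (rss_kib / 1024))
```

The traced delta lands within bytes of the requested 10 MiB, while the RSS number also includes everything the interpreter itself has mapped.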
I try to remove |
I came up with a new demo which can somehow reproduce the high memory usage; it is a copy of the [...] I found:

#!/usr/bin/env python3
import tracemalloc
import socket
import logging
import logging.config
import time
import eventlet
import threading

eventlet.monkey_patch()
tracemalloc.start()

class MyThreadLocal:
    def __init__(self):
        self.__attrs = {}

    def __setattr__(self, name, value):
        if name.startswith("_MyThreadLocal"):
            super().__setattr__(name, value)
            return
        self.__attrs[name] = value

class MyTester:
    def __init__(self):
        self.socket = None
        self.lock = threading.Lock()
        self.last_error = threading.local()
        # self.last_error = MyThreadLocal()

    def log(self, message):
        with self.lock:
            if self.socket:
                return False
            self._log_internal(message)

    def _log_internal(self, message):
        try:
            self._send_data(message)
            return True
        except socket.error as e:
            self._close()
            return False

    def _close(self):
        try:
            sock = self.socket
            if sock:
                try:
                    try:
                        sock.shutdown(socket.SHUT_RDWR)
                    except socket.error:
                        pass
                finally:
                    try:
                        sock.close()
                    except socket.error:
                        pass
        finally:
            self.socket = None

    def _send_data(self, message):
        self.connect_socket("localhost", 24224)

    def connect_socket(self, host, port):
        sock = None
        if not self.socket:
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(3)
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                sock.connect((host, port))
            except Exception as e:
                self.last_error.exception = e
                try:
                    sock.close()
                except:
                    pass
                raise
            else:
                self.socket = sock

pool = eventlet.GreenPool()
obj = MyTester()
for i in range(100000):
    pool.spawn(obj.log, b"localhost")
pool.waitall()

snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Top 10 ]")
for stat in top_stats[:10]:
    print(stat)
time.sleep(1000) |
Yeah, that makes sense. GreenPool starts each call in a new greenthread, each having a separate threading.local storage. I'd expect everything to be cleaned up:
No cleanup after [...] |
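The per-greenthread behavior described above mirrors how a plain threading.local isolates state per OS thread. A stdlib-only sketch (no eventlet; the attribute name is illustrative):

```python
# Every thread that touches a threading.local instance gets its own
# attribute namespace, so per-thread state never collides.
import threading

shared = threading.local()
seen = {}

def work(tag):
    shared.last_error = tag        # each thread writes "the same" attribute
    seen[tag] = shared.last_error  # ...but reads back only its own value

threads = [threading.Thread(target=work, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)  # each thread saw exactly the value it stored
```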
You can reduce it further to something like this; no sockets or logging required.

def work():
    try:
        raise Exception()
    except Exception as e:
        threading.local().last_error = e

pool = GreenPool()
[pool.spawn(work) for _ in range(N)]
pool.waitall() |
Looks like even without the try/except clause, it still reproduces the problem.

def work():
    threading.local().last_error = '123'

pool = GreenPool()
[pool.spawn(work) for _ in range(N)]
pool.waitall() |
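As a hedged counterpoint, on stock CPython (no monkey patching) a threading.local created per call is collected as soon as the last reference is dropped, which suggests the growth reported here is specific to the patched environment. A small sketch using a weak reference to observe the lifetime:

```python
# Hold only a weak reference to a per-call threading.local and check
# whether the instance survives after the function returns.
import gc
import threading
import weakref

def work():
    loc = threading.local()
    loc.last_error = "123"
    return weakref.ref(loc)  # keep only a weak reference to the instance

ref = work()
gc.collect()
print("still alive?", ref() is not None)
```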
Cool, thanks, I'll add this minimal test to the eventlet test suite. After fixing, of course. |
@jshen28 please post the output of [...]:

import eventlet
eventlet.monkey_patch()
import gc
import resource
import threading
import psutil

N = 100000
pool = eventlet.GreenPool()

def memory_usage():
    return int(psutil.Process().memory_full_info().uss / 8192)
    # return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

def work():
    threading.local().leak = b"." * 1024

mem1 = memory_usage()
for i in range(N):
    pool.spawn(work)
pool.waitall()
mem2 = memory_usage()
gc.collect()
mem3 = memory_usage()
print("base\t{:6d}".format(mem1))
print("waitall\t{:6d}\t{:+6d}".format(mem2, mem2 - mem1))
print("collect\t{:6d}\t{:+6d}".format(mem3, mem3 - mem1)) |
Hmm... could you try [...] |
# uname -a
Linux compute-010 5.10.83 #3 SMP Fri Dec 3 11:13:00 CST 2021 x86_64 x86_64 x86_64 GNU/Linux
# python3 --version
Python 3.8.10

And the program is running in a container. |
Hello,
I am finding that socket.connect, when the destination IP/port pair is not in use, may lead to surprising memory usage. The memory profile result looks like below. It is weird that sock.connect uses around 100 MB of memory. The @profile decorator is added to the _reconnect function from fluent-logger.
The script I am using is [...]
And logging.conf looks like below: [...]
To run the script:
python3 -m memory_profiler fluent_test.py
Versions:
eventlet: 0.33.0
fluent-logger: 0.10.0
python3: 3.8.10
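As a cross-check that doesn't depend on memory_profiler or psutil at all, the suspect call can be wrapped with tracemalloc and its peak allocation reported. Here allocate_a_lot is a hypothetical stand-in (not from the issue) for the real _reconnect/sock.connect path:

```python
import tracemalloc

def allocate_a_lot():
    # hypothetical stand-in for the suspect sock.connect/_reconnect call
    return [bytes(1024) for _ in range(10_000)]  # roughly 10 MiB

tracemalloc.start()
data = allocate_a_lot()
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print("current: %.1f MiB, peak: %.1f MiB" % (current / 2**20, peak / 2**20))
```

If the real call really allocates ~100 MB of Python objects, the peak figure would show it; if the peak stays small while RSS grows, the memory lives outside Python's allocator (or in interpreter internals), which points the investigation elsewhere.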