Skip to content

Commit 3b07ff0

Browse files
committed
print_banner: print actual port in case the port is dynamic
1 parent f53e70d commit 3b07ff0

File tree

9 files changed

+269
-125
lines changed

9 files changed

+269
-125
lines changed

flower/app.py

Lines changed: 145 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import sys
22
import logging
3+
from urllib.parse import quote
34

45
from concurrent.futures import ThreadPoolExecutor
56

@@ -37,21 +38,21 @@ class Flower(tornado.web.Application):
3738

3839
def __init__(self, options=None, capp=None, events=None,
3940
io_loop=None, **kwargs):
41+
4042
handlers = default_handlers
4143
if options is not None and options.url_prefix:
4244
handlers = [rewrite_handler(h, options.url_prefix) for h in handlers]
4345
kwargs.update(handlers=handlers)
46+
4447
super().__init__(**kwargs)
48+
4549
self.options = options or default_options
4650
self.io_loop = io_loop or ioloop.IOLoop.instance()
4751
self.ssl_options = kwargs.get('ssl_options', None)
4852

4953
self.capp = capp or celery.Celery()
5054
self.capp.loader.import_default_modules()
5155

52-
self.executor = self.pool_executor_cls(max_workers=self.max_workers)
53-
self.io_loop.set_default_executor(self.executor)
54-
5556
self.inspector = Inspector(self.io_loop, self.capp, self.options.inspect_timeout / 1000.0)
5657

5758
self.events = events or Events(
@@ -63,33 +64,85 @@ def __init__(self, options=None, capp=None, events=None,
6364
io_loop=self.io_loop,
6465
max_workers_in_memory=self.options.max_workers,
6566
max_tasks_in_memory=self.options.max_tasks)
66-
self.started = False
6767

68-
def start(self):
68+
self._http_server = None
69+
self._executor = None
70+
71+
def _start_executor(self):
72+
if self._executor is None:
73+
logging.debug("Starting executor...")
74+
ctx = self.pool_executor_cls(max_workers=self.max_workers)
75+
self._executor = ctx.__enter__()
76+
self.io_loop.set_default_executor(self._executor)
77+
78+
def _stop_executor(self):
79+
if self._executor is not None:
80+
logging.debug("Stop executor...")
81+
self._executor.__exit__(None, None, None)
82+
self._executor = None
83+
84+
def _start_events(self):
85+
logging.debug("Starting event...")
6986
self.events.start()
7087

88+
def _stop_events(self):
89+
logging.debug("Stopping event...")
90+
self.events.stop()
91+
92+
def _start_http_server(self):
93+
logging.debug("Starting HTTP server...")
7194
if not self.options.unix_socket:
72-
self.listen(self.options.port, address=self.options.address,
73-
ssl_options=self.ssl_options,
74-
xheaders=self.options.xheaders)
95+
http_server = self.listen(
96+
self.options.port,
97+
address=self.options.address,
98+
ssl_options=self.ssl_options,
99+
xheaders=self.options.xheaders
100+
)
75101
else:
76102
from tornado.netutil import bind_unix_socket
77-
server = HTTPServer(self)
78-
socket = bind_unix_socket(self.options.unix_socket, mode=0o777)
79-
server.add_socket(socket)
80103

81-
self.started = True
82-
self.update_workers()
104+
http_server = HTTPServer(self)
105+
socket = bind_unix_socket(self.options.unix_socket, mode=0o777)
106+
http_server.add_socket(socket)
107+
self._http_server = http_server
108+
109+
def _stop_http_server(self):
110+
logging.debug("Stopping HTTP server...")
111+
self.io_loop.run_sync(
112+
self._http_server.close_all_connections, timeout=5
113+
)
114+
self._http_server.stop()
115+
self._http_server = None
116+
117+
def start_server(self):
118+
if self._http_server is not None:
119+
logging.debug("Flower server already started.")
120+
return
121+
self._start_executor()
122+
self._start_events()
123+
self._start_http_server()
124+
logging.debug("Flower server started.")
125+
126+
def stop_server(self):
127+
if self._http_server is None:
128+
logging.debug("Flower server already stopped.")
129+
return
130+
self._stop_events()
131+
self._stop_http_server()
132+
self._stop_executor()
133+
logging.debug("Flower server stopped.")
134+
135+
def serve_forever(self):
136+
if not self._http_server:
137+
raise RuntimeError("The server is not running")
138+
logging.debug("Starting event loop...")
83139
self.io_loop.start()
84140

85-
def stop(self):
86-
if self.started:
87-
self.events.stop()
88-
logging.debug("Stopping executors...")
89-
self.executor.shutdown(wait=False)
90-
logging.debug("Stopping event loop...")
91-
self.io_loop.stop()
92-
self.started = False
141+
def shutdown(self):
142+
if self._http_server:
143+
raise RuntimeError("The server is still running")
144+
logging.debug("Stopping event loop...")
145+
self.io_loop.stop()
93146

94147
@property
95148
def transport(self):
@@ -101,3 +154,74 @@ def workers(self):
101154

102155
def update_workers(self, workername=None):
103156
return self.inspector.inspect(workername)
157+
158+
def _get_scheme(self):
159+
if self.options.unix_socket:
160+
return "http+unix"
161+
elif self.ssl_options:
162+
return "https"
163+
else:
164+
return "http"
165+
166+
def _get_domain(self):
167+
if self.options.unix_socket:
168+
raise RuntimeError("UNIX socket")
169+
170+
if hasattr(self._http_server, "_sockets") and self._http_server._sockets:
171+
sock = list(self._http_server._sockets.values())[0]
172+
return sock.getsockname()[0]
173+
else:
174+
return self.options.address or "0.0.0.0"
175+
176+
def _get_port(self):
177+
if self.options.unix_socket:
178+
raise RuntimeError("UNIX socket")
179+
180+
if hasattr(self._http_server, "_sockets") and self._http_server._sockets:
181+
sock = list(self._http_server._sockets.values())[0]
182+
return sock.getsockname()[1]
183+
else:
184+
return self.options.port
185+
186+
def _get_authority(self):
187+
if self.options.unix_socket:
188+
return quote(self.options.unix_socket)
189+
return f"{self._get_domain()}:{self._get_port()}"
190+
191+
def _get_url_path(self, path=None, include_prefix=True):
192+
if not include_prefix or not self.options.url_prefix:
193+
return path or ""
194+
prefix = self.options.url_prefix.strip("/")
195+
return f"/{prefix}{path or ''}"
196+
197+
def get_url(self, path=None, include_prefix=True):
198+
path = self._get_url_path(path, include_prefix=include_prefix)
199+
return f"{self._get_scheme()}://{self._get_authority()}{path}"
200+
201+
#
202+
# For backward compatibility
203+
#
204+
205+
def start(self):
206+
self.start_server()
207+
self.update_workers()
208+
self.serve_forever()
209+
210+
def stop(self):
211+
self.stop_server()
212+
self.shutdown()
213+
214+
@property
215+
def started(self):
216+
return self._http_server is not None
217+
218+
@started.setter
219+
def started(self, value):
220+
if value:
221+
self.start_server()
222+
else:
223+
self.stop_server()
224+
225+
@property
226+
def executor(self):
227+
return self._executor

flower/command.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,17 @@ def flower(ctx, tornado_argv):
5050
atexit.register(flower_app.stop)
5151
signal.signal(signal.SIGTERM, sigterm_handler)
5252

53-
if not ctx.obj.quiet:
54-
print_banner(app, 'ssl_options' in settings)
53+
try:
54+
flower_app.start_server()
55+
finally:
56+
# Print the banner even when server failed to start
57+
if not ctx.obj.quiet:
58+
print_banner(flower_app)
59+
60+
flower_app.update_workers()
5561

5662
try:
57-
flower_app.start()
63+
flower_app.serve_forever()
5864
except (KeyboardInterrupt, SystemExit):
5965
pass
6066

@@ -158,24 +164,18 @@ def is_flower_envvar(name):
158164
name[len(ENV_VAR_PREFIX):].lower() in default_options
159165

160166

161-
def print_banner(app, ssl):
162-
if not options.unix_socket:
163-
if options.url_prefix:
164-
prefix_str = f'/{options.url_prefix}/'
165-
else:
166-
prefix_str = ''
167-
168-
logger.info(
169-
"Visit me at http%s://%s:%s%s", 's' if ssl else '',
170-
options.address or '0.0.0.0', options.port,
171-
prefix_str
172-
)
167+
def print_banner(flower_app):
168+
if not flower_app.options.unix_socket:
169+
url = flower_app.get_url(include_prefix=True)
170+
logger.info("Visit me at %s", url)
173171
else:
174-
logger.info("Visit me via unix socket file: %s", options.unix_socket)
172+
unix_socket = flower_app.options.unix_socket
173+
logger.info("Visit me via unix socket file: %s", unix_socket)
175174

176-
logger.info('Broker: %s', app.connection().as_uri())
175+
capp = flower_app.capp
176+
logger.info('Broker: %s', capp.connection().as_uri())
177177
logger.info(
178178
'Registered tasks: \n%s',
179-
pformat(sorted(app.tasks.keys()))
179+
pformat(sorted(capp.tasks.keys()))
180180
)
181181
logger.debug('Settings: %s', pformat(settings))

tests/unit/__init__.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,60 @@
33

44
import celery
55
import tornado.testing
6-
from tornado.ioloop import IOLoop
76
from tornado.options import options
7+
from tornado.httpclient import AsyncHTTPClient, HTTPResponse
88

99
from flower import command # noqa: F401 side effect - define options
1010
from flower.app import Flower
11-
from flower.events import Events
1211
from flower.urls import handlers, settings
1312

1413

15-
class AsyncHTTPTestCase(tornado.testing.AsyncHTTPTestCase):
14+
class AsyncHTTPTestCase(tornado.testing.AsyncTestCase):
1615

17-
def _get_celery_app(self):
18-
return celery.Celery()
16+
def setUp(self) -> None:
17+
super().setUp()
18+
self._http_client = AsyncHTTPClient()
19+
self._capp = celery.Celery()
20+
self._start_flower()
1921

20-
def get_app(self, capp=None):
21-
if not capp:
22-
capp = self._get_celery_app()
23-
events = Events(capp, IOLoop.current())
24-
app = Flower(capp=capp, events=events,
25-
options=options, handlers=handlers, **settings)
26-
return app
22+
def _start_flower(self):
23+
self._app = Flower(
24+
capp=self._capp,
25+
io_loop=self.io_loop,
26+
options=options,
27+
handlers=handlers,
28+
**settings
29+
)
30+
self._app.start_server()
31+
32+
def _stop_flower(self):
33+
self._app.stop_server()
34+
35+
def _restart_flower(self, reset_celery_app=False):
36+
self._stop_flower()
37+
if reset_celery_app:
38+
self._capp = celery.Celery()
39+
self._start_flower()
40+
41+
def tearDown(self) -> None:
42+
self._http_client.close()
43+
self._app.stop_server()
44+
del self._http_client
45+
del self._app
46+
super().tearDown()
47+
48+
def fetch(
49+
self, path: str, raise_error: bool = False, **kwargs
50+
) -> HTTPResponse:
51+
url = self._app.get_url(path)
52+
53+
def fetch():
54+
return self._http_client.fetch(url, raise_error=raise_error, **kwargs)
55+
56+
return self.io_loop.run_sync(
57+
fetch,
58+
timeout=tornado.testing.get_async_test_timeout(),
59+
)
2760

2861
def get(self, url, **kwargs):
2962
return self.fetch(url, **kwargs)

tests/unit/api/test_control.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ def test_unknown_worker(self):
1616

1717
class WorkerControlTests(BaseApiTestCase):
1818
def setUp(self):
19-
BaseApiTestCase.setUp(self)
19+
super().setUp()
2020
self.is_worker = ControlHandler.is_worker
2121
ControlHandler.is_worker = lambda *args: True
2222

2323
def tearDown(self):
24-
BaseApiTestCase.tearDown(self)
24+
super().tearDown()
2525
ControlHandler.is_worker = self.is_worker
2626

2727
def test_shutdown(self):

tests/unit/api/test_tasks.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,6 @@ def get_task_by_id(events, task_id):
9797

9898

9999
class TaskTests(BaseApiTestCase):
100-
def setUp(self):
101-
self.app = super().get_app()
102-
super().setUp()
103-
104-
def get_app(self, capp=None):
105-
return self.app
106100

107101
@patch('flower.api.tasks.tasks', new=MockTasks)
108102
def test_task_info(self):
@@ -127,7 +121,7 @@ def test_tasks_pagination(self):
127121
e['clock'] = i
128122
e['local_received'] = time.time()
129123
state.event(e)
130-
self.app.events.state = state
124+
self._app.events.state = state
131125

132126
# Test limit 4 and offset 0
133127
params = dict(limit=4, offset=0, sort_by='name')

0 commit comments

Comments
 (0)