diff --git a/fastdeploy/_infer.py b/fastdeploy/_infer.py index 8830b55..b50e1a7 100644 --- a/fastdeploy/_infer.py +++ b/fastdeploy/_infer.py @@ -189,6 +189,7 @@ def add_to_infer_queue( "-1.predicted_at": 0, "last_predictor_sequence": -1, "last_predictor_success": True, + "timedout_in_queue": None, } } ) diff --git a/fastdeploy/_loop.py b/fastdeploy/_loop.py index 94359d9..688ac05 100644 --- a/fastdeploy/_loop.py +++ b/fastdeploy/_loop.py @@ -197,6 +197,7 @@ def start_loop( "-1.predicted_at": 0, "-1.received_at": {"$lt": time.time() - timeout_time}, "timedout_in_queue": {"$ne": True}, + "last_predictor_sequence": {"$ne": _utils.LAST_PREDICTOR_SEQUENCE}, }, update={"timedout_in_queue": True}, select_keys=[], @@ -243,8 +244,6 @@ def start_loop( f"Updated results predictor {predictor_sequence}: {list(unique_id_wise_results)}" ) - last_batch_collection_started_at = time.time() - if __name__ == "__main__": import sys diff --git a/fastdeploy/_utils.py b/fastdeploy/_utils.py index 079c637..1277b9c 100644 --- a/fastdeploy/_utils.py +++ b/fastdeploy/_utils.py @@ -3,7 +3,7 @@ logging.basicConfig( format="%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s", datefmt="%Y-%m-%d:%H:%M:%S", - level=logging.DEBUG, + level=logging.INFO, ) logger = logging.getLogger(__name__) diff --git a/testing/benchmark.py b/testing/benchmark.py index a33cd66..28e8766 100644 --- a/testing/benchmark.py +++ b/testing/benchmark.py @@ -1,171 +1,283 @@ -import gevent -from gevent import monkey; monkey.patch_all() import time +import logging +import argparse +import json +import random +import threading import numpy as np -from collections import deque -import statistics +from tqdm import tqdm +import queue +import requests from datetime import datetime -import argparse -import logging -import urllib.parse +import os +import uuid from fdclient import FDClient - -logging.basicConfig(level=logging.INFO) +# Configure logging +logging.basicConfig(format='%(asctime)s [%(threadName)s] - %(message)s') logger = logging.getLogger(__name__) class BenchmarkRunner: def __init__(self, target_rps, duration_seconds, server_url, - warmup_seconds=5, sample_input=None, concurrent_users=50): + warmup_seconds=5, sample_input=None, concurrent_users=50, + request_batch_size=1, log_dir=None, debug=False): self.target_rps = target_rps self.duration_seconds = duration_seconds self.warmup_seconds = warmup_seconds - self.sample_input = sample_input or {"default": "input"} - self.concurrent_users = concurrent_users self.server_url = server_url + self.sample_input = sample_input + self.concurrent_users = concurrent_users + self.request_batch_size = request_batch_size + self.log_dir = log_dir + self.debug = debug # Initialize metrics storage self.latencies = [] self.errors = [] - self.request_timestamps = deque() + self.successes = 0 + self.failures = 0 - # For better RPS control - self.window_size = 1.0 # 1 second rolling window - self.start_time = None - self.request_count = 0 + # Create log directory if specified + if self.log_dir: + os.makedirs(self.log_dir, exist_ok=True) - # Initialize client with server URL - logger.info(f"Initializing FDClient with server URL: {self.server_url}") self.client = FDClient(server_url=self.server_url) - def make_request(self): + # Per-user metrics + self.user_metrics = {} + self.metrics_lock = threading.Lock() + + def make_request(self, user_id, request_id, is_warmup=False): start_time = time.time() + try: - results = self.client.infer(self.sample_input) + inps = 
[self.sample_input[random.randint(0, len(self.sample_input) - 1)] + for _ in range(self.request_batch_size)] + + if self.debug: + logger.debug(f"User {user_id} - Request {request_id} - Sending input: {inps}...") + + results = self.client.infer(inps, unique_id=request_id) latency = (time.time() - start_time) * 1000 # Convert to ms - self.latencies.append(latency) - return True + + if self.log_dir: + self._log_request(user_id, request_id, inps, results, latency) + + if results['success']: + if not is_warmup: + with self.metrics_lock: + self.successes += 1 + self.latencies.append(latency) + + if user_id not in self.user_metrics: + self.user_metrics[user_id] = { + 'successes': 0, + 'failures': 0, + 'latencies': [] + } + self.user_metrics[user_id]['successes'] += 1 + self.user_metrics[user_id]['latencies'].append(latency) + + if self.debug: + logger.debug( + f"User {user_id} - Request {request_id} - Success - " + f"Latency: {latency:.2f}ms - " + ) + return True + else: + if not is_warmup: + with self.metrics_lock: + self.failures += 1 + if user_id not in self.user_metrics: + self.user_metrics[user_id] = { + 'successes': 0, + 'failures': 0, + 'latencies': [] + } + self.user_metrics[user_id]['failures'] += 1 + + logger.error( + f"User {user_id} - Request {request_id} - Failed - " + f"Error: {results.get('reason', 'Unknown error')}" + ) + return False + except Exception as e: - self.errors.append(str(e)) - logger.error(f"Request failed: {str(e)}") + with self.metrics_lock: + self.failures += 1 + self.errors.append(str(e)) + if user_id not in self.user_metrics: + self.user_metrics[user_id] = { + 'successes': 0, + 'failures': 0, + 'latencies': [] + } + self.user_metrics[user_id]['failures'] += 1 + + logger.error( + f"User {user_id} - Request {request_id} - Exception - " + f"Error: {str(e)}" + ) return False - def calculate_current_rps(self): - """Calculate current RPS based on rolling window""" - now = time.time() - # Remove timestamps older than our window - while self.request_timestamps and now - self.request_timestamps[0] > self.window_size: - self.request_timestamps.popleft() - return len(self.request_timestamps) / self.window_size + def _log_request(self, user_id, request_id, inputs, outputs, latency): + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f') + log_entry = { + 'timestamp': timestamp, + 'user_id': user_id, + 'request_id': request_id, + 'inputs': inputs, + 'outputs': outputs, + 'latency_ms': latency + } + log_file = os.path.join(self.log_dir, f'request_{timestamp}_{request_id}.json') + with open(log_file, 'w') as f: + json.dump(log_entry, f, indent=2) - def should_throttle(self): - """More precise throttling logic""" - if not self.start_time: - return False + def run_user_session(self, user_id, progress_queue): + """Run a single user session with rate limiting""" + if self.target_rps: + sleep_time = 1.0 / self.target_rps + else: + sleep_time = 0 - elapsed_time = time.time() - self.start_time - if elapsed_time == 0: - return True + total_requests = int(self.duration_seconds * (self.target_rps or 10)) # Default to 10 RPS if not specified + + pbar = tqdm(total=total_requests, + desc=f'User {user_id}', + position=user_id, + leave=True) + + if self.debug: + logger.debug(f"User {user_id} - Starting warmup period ({self.warmup_seconds}s)") + + # Warmup period + warmup_start = time.time() + warmup_requests = 0 + while time.time() - warmup_start < self.warmup_seconds: + self.make_request(user_id, request_id=f"{user_id}-warm-{warmup_requests}", is_warmup=True) + warmup_requests += 1 + + if 
self.debug: + logger.debug(f"User {user_id} - Warmup complete - {warmup_requests} requests made") - expected_requests = self.target_rps * elapsed_time - return self.request_count >= expected_requests - - def user_task(self): - """Task representing a single user making requests""" - while time.time() < self.end_time: - # Check if we need to throttle - if self.should_throttle(): - # Calculate sleep time based on target rate - sleep_time = 1.0 / self.target_rps - gevent.sleep(sleep_time) - continue - - success = self.make_request() - if success: - now = time.time() - self.request_timestamps.append(now) - self.request_count += 1 - - def run_benchmark(self): - logger.info(f"Starting benchmark with target {self.target_rps} RPS") - logger.info(f"Warming up for {self.warmup_seconds} seconds...") + # Main benchmark loop + start_time = time.time() + requests_made = 0 - # Warmup phase - warmup_end = time.time() + self.warmup_seconds - while time.time() < warmup_end: - self.make_request() - gevent.sleep(1.0 / self.target_rps) + if self.debug: + logger.debug(f"User {user_id} - Starting main benchmark loop") - # Clear warmup metrics - self.latencies = [] - self.errors = [] - self.request_timestamps.clear() - self.request_count = 0 + while time.time() - start_time < self.duration_seconds: + request_start = time.time() + self.make_request(user_id, request_id=f"{user_id}-{requests_made}") + requests_made += 1 + pbar.update(1) + + # Rate limiting + elapsed = time.time() - request_start + if sleep_time > elapsed: + time.sleep(sleep_time - elapsed) + + pbar.close() - # Main benchmark - self.start_time = time.time() - self.end_time = self.start_time + self.duration_seconds - users = [gevent.spawn(self.user_task) for _ in range(self.concurrent_users)] + # Calculate user-specific metrics + user_stats = self.user_metrics.get(user_id, {}) + user_latencies = user_stats.get('latencies', []) - # Monitor and log actual RPS during the test - def monitor_rps(): - while time.time() < self.end_time: - current_rps = self.calculate_current_rps() - logger.debug(f"Current RPS: {current_rps:.2f}") - gevent.sleep(1) + user_results = { + 'user_id': user_id, + 'requests_made': requests_made, + 'duration': time.time() - start_time, + 'successes': user_stats.get('successes', 0), + 'failures': user_stats.get('failures', 0) + } + + if user_latencies: + user_results.update({ + 'avg_latency': np.mean(user_latencies), + 'p95_latency': np.percentile(user_latencies, 95), + 'p99_latency': np.percentile(user_latencies, 99) + }) - monitor_greenlet = gevent.spawn(monitor_rps) - gevent.joinall([monitor_greenlet] + users) + if self.debug: + logger.debug( + f"User {user_id} - Benchmark complete - " + f"Requests: {requests_made}, " + f"Successes: {user_stats.get('successes', 0)}, " + f"Failures: {user_stats.get('failures', 0)}" + ) - return self.calculate_metrics() + progress_queue.put(user_results) - def calculate_metrics(self): - if not self.latencies: - return {"error": "No successful requests recorded"} - - latencies = np.array(self.latencies) - total_requests = len(self.latencies) + len(self.errors) - error_rate = (len(self.errors) / total_requests) * 100 if total_requests > 0 else 0 - actual_duration = self.end_time - self.start_time - - metrics = { - "total_requests": total_requests, - "successful_requests": len(self.latencies), - "failed_requests": len(self.errors), - "error_rate": f"{error_rate:.2f}%", - "target_rps": self.target_rps, - "actual_rps": len(self.latencies) / actual_duration, - "actual_duration": actual_duration, - 
"server_url": self.server_url, - "latency": { - "min": np.min(latencies), - "max": np.max(latencies), - "mean": np.mean(latencies), - "median": np.median(latencies), - "p95": np.percentile(latencies, 95), - "p99": np.percentile(latencies, 99), - "std_dev": np.std(latencies) - } - } + def run_benchmark(self): + """Run the benchmark with multiple concurrent users""" + threads = [] + progress_queue = queue.Queue() + + if self.debug: + logger.debug(f"Starting benchmark with {self.concurrent_users} users") - return metrics + # Start user threads + for i in range(self.concurrent_users): + thread = threading.Thread( + target=self.run_user_session, + args=(i, progress_queue), + name=f"User-{i}" + ) + threads.append(thread) + thread.start() + + # Wait for all threads to complete + for thread in threads: + thread.join() + + # Collect results + user_results = [] + while not progress_queue.empty(): + user_results.append(progress_queue.get()) + + # Calculate metrics + if self.latencies: + p95 = np.percentile(self.latencies, 95) + p99 = np.percentile(self.latencies, 99) + avg_latency = np.mean(self.latencies) + total_requests = self.successes + self.failures + actual_rps = total_requests / self.duration_seconds + + return { + 'total_requests': total_requests, + 'successes': self.successes, + 'failures': self.failures, + 'average_latency_ms': avg_latency, + 'p95_latency_ms': p95, + 'p99_latency_ms': p99, + 'actual_rps': actual_rps, + 'target_rps_per_user': self.target_rps, + 'errors': self.errors[:10], # First 10 errors + 'user_results': user_results # Individual user metrics + } + return None def validate_url(url): - """Validate the URL format""" try: - result = urllib.parse.urlparse(url) - return all([result.scheme, result.netloc]) + result = requests.get(url) + return result.status_code == 200 except: return False def main(): + import json parser = argparse.ArgumentParser(description='API Benchmark Tool') parser.add_argument('--host', type=str, required=True, help='Server hostname or IP') parser.add_argument('--port', type=int, required=True, help='Server port') - parser.add_argument('--rps', type=int, default=10, help='Target requests per second') + parser.add_argument('--rps_per_user', type=int, default=None, help='Target requests per second per user') parser.add_argument('--duration', type=int, default=60, help='Test duration in seconds') parser.add_argument('--warmup', type=int, default=5, help='Warmup period in seconds') - parser.add_argument('--users', type=int, default=50, help='Number of concurrent users') + parser.add_argument('--users', type=int, default=2, help='Number of concurrent users') parser.add_argument('--debug', action='store_true', help='Enable debug logging') + parser.add_argument('--sample_json', type=str, default=None, help='Input json file path', required=True) + parser.add_argument('--request_batch_size', type=int, default=1, help='Request batch size') + parser.add_argument('--log_dir', type=str, default=None, help='Directory to log request inputs and outputs') args = parser.parse_args() if args.debug: @@ -173,37 +285,60 @@ def main(): # Construct and validate server URL server_url = f"http://{args.host}:{args.port}" - if not validate_url(server_url): - logger.error(f"Invalid server URL: {server_url}") - return - # Sample input - modify as needed - sample_input = ["Some example sentence quick brown fox jumps over the lazy dog and the quick brown fox", "Another example sentence tiger is the fastest animal on earth"] + with open(args.sample_json, 'r') as f: + sample_input = 
json.load(f) + # Initialize and run benchmark runner = BenchmarkRunner( - target_rps=args.rps, + target_rps=args.rps_per_user, duration_seconds=args.duration, server_url=server_url, warmup_seconds=args.warmup, sample_input=sample_input, - concurrent_users=args.users + concurrent_users=args.users, + request_batch_size=args.request_batch_size, + log_dir=args.log_dir, + debug=args.debug ) - - logger.info("Starting benchmark...") - metrics = runner.run_benchmark() - logger.info("\nBenchmark Results:") - logger.info(f"Server URL: {metrics['server_url']}") - logger.info(f"Target RPS: {metrics['target_rps']}") - logger.info(f"Actual RPS: {metrics['actual_rps']:.2f}") - logger.info(f"Total Requests: {metrics['total_requests']}") - logger.info(f"Successful Requests: {metrics['successful_requests']}") - logger.info(f"Failed Requests: {metrics['failed_requests']}") - logger.info(f"Error Rate: {metrics['error_rate']}") - logger.info(f"Test Duration: {metrics['actual_duration']:.2f} seconds") - logger.info("\nLatency Statistics (ms):") - for key, value in metrics['latency'].items(): - logger.info(f"{key}: {value:.2f}") - + print(f"\nStarting benchmark with {args.users} concurrent users...") + print(f"Target RPS per user: {args.rps_per_user or 'unlimited'}") + print(f"Duration: {args.duration} seconds (+ {args.warmup} seconds warmup)") + print(f"Request batch size: {args.request_batch_size}\n") + + results = runner.run_benchmark() + + if results: + print("\nOverall Benchmark Results:") + print("=" * 50) + print(f"Total Requests: {results['total_requests']}") + print(f"Successes: {results['successes']}") + print(f"Failures: {results['failures']}") + print(f"Success Rate: {(results['successes']/results['total_requests'])*100:.2f}%") + print(f"\nOverall Latency (ms):") + print(f" Average: {results['average_latency_ms']:.2f}") + print(f" P95: {results['p95_latency_ms']:.2f}") + print(f" P99: {results['p99_latency_ms']:.2f}") + print(f"\nActual RPS: {results['actual_rps']:.2f}") + print(f"Target RPS per user: {results['target_rps_per_user'] or 'unlimited'}") + + print("\nPer-User Results:") + print("=" * 50) + for user_result in results['user_results']: + print(f"\nUser {user_result['user_id']}:") + print(f" Requests Made: {user_result['requests_made']}") + print(f" Successes: {user_result['successes']}") + print(f" Failures: {user_result['failures']}") + if 'avg_latency' in user_result: + print(f" Average Latency (ms): {user_result['avg_latency']:.2f}") + print(f" P95 Latency (ms): {user_result['p95_latency']:.2f}") + print(f" P99 Latency (ms): {user_result['p99_latency']:.2f}") + + if results['errors']: + print("\nSample Errors:") + for error in results['errors']: + print(f" - {error}") + if __name__ == "__main__": - main() + main() \ No newline at end of file
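For reference, below is a minimal usage sketch of the rewritten BenchmarkRunner driven programmatically rather than through the CLI. It is an illustration only, not part of the patch: it assumes the new testing/benchmark.py is importable as a module named `benchmark`, that `fdclient` is installed, and that a fastdeploy server is reachable at http://localhost:8080 (all placeholder assumptions). It uses only the constructor arguments and the run_benchmark() return value introduced above.

# Minimal usage sketch (assumptions: testing/benchmark.py importable as `benchmark`,
# fdclient installed, fastdeploy server listening at http://localhost:8080 -- all placeholders).
import json

from benchmark import BenchmarkRunner

# Same shape as the contents of --sample_json: a list the runner samples inputs from.
sample_input = [
    "Some example sentence quick brown fox jumps over the lazy dog",
    "Another example sentence tiger is the fastest animal on earth",
]

runner = BenchmarkRunner(
    target_rps=5,              # per-user RPS cap; None means send as fast as possible
    duration_seconds=30,
    server_url="http://localhost:8080",
    warmup_seconds=5,
    sample_input=sample_input,
    concurrent_users=2,
    request_batch_size=4,      # number of inputs bundled into each infer() call
)

results = runner.run_benchmark()   # returns None if no request succeeded
if results:
    print(json.dumps(results, indent=2, default=float))

The equivalent CLI invocation with the flags added in this patch would look something like (sample.json is a hypothetical file holding the list above): python testing/benchmark.py --host 127.0.0.1 --port 8080 --sample_json sample.json --rps_per_user 5 --duration 30 --users 2 --request_batch_size 4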