
Setting Proxy IPs in Python: Common Methods and Practices

TRAE AI Coding Assistant

In web scraping, API calls, data collection, and similar scenarios, using proxy IPs has become an essential skill for developers. This article takes a deep look at the various ways to configure proxies in Python, from basic concepts to advanced practice, to help you handle all kinds of network request challenges with ease.

01|Proxy IP Basics and Application Scenarios

What Is a Proxy IP?

A proxy IP (proxy server) is an intermediate server that sits between the client and the target server: it receives the client's request, forwards it to the target server, and relays the response back to the client. In Python development, using proxies appropriately helps solve the following problems:

Core application scenarios:

  • Anti-scraping evasion: spread requests across sources to reduce the risk of IP bans
  • Bypassing geo-restrictions: access content limited to specific regions
  • Anonymity: hide your real IP address and protect privacy
  • Load balancing: distribute requests across multiple proxies for better stability
  • Testing: simulate user access from different regions

Proxy Types Explained

graph TD
    A[Proxy types] --> B[HTTP proxy]
    A --> C[HTTPS proxy]
    A --> D[SOCKS proxy]
    B --> B1[Suited to web scraping]
    B --> B2[Fast]
    C --> C1[Encrypted transport]
    C --> C2[High security]
    D --> D1[Protocol-agnostic]
    D --> D2[Supports UDP]
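
The SOCKS branch of the diagram deserves a concrete sketch: requests can speak SOCKS through the optional PySocks dependency (`pip install requests[socks]`), using `socks5://` or `socks5h://` proxy URLs; the `h` variant resolves DNS through the proxy, which avoids DNS leaks. The address below is a placeholder assumption.

```python
# SOCKS proxy configuration for requests.
# Requires: pip install requests[socks]  (PySocks)
# socks5h:// resolves hostnames through the proxy; socks5:// resolves locally.
socks_proxies = {
    'http': 'socks5h://127.0.0.1:1080',   # placeholder address
    'https': 'socks5h://127.0.0.1:1080',
}

# Usage (an actual network call, shown but not executed here):
# import requests
# requests.get('https://httpbin.org/ip', proxies=socks_proxies, timeout=10)
print(socks_proxies['http'])
```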

02|requests Proxy Configuration in Detail

requests is one of the most widely used HTTP libraries in Python, and its proxy configuration is simple and intuitive.

Basic proxy setup

import requests

# Define the proxy configuration.
# Note: the dict key is the scheme of the *target* URL; both keys normally
# point to an http:// proxy URL, since most proxies do not speak TLS themselves.
proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}

# Send a request through the proxy
try:
    response = requests.get('https://httpbin.org/ip', proxies=proxies, timeout=10)
    print(f"Accessed via proxy, returned IP: {response.json()}")
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")

Proxy configuration with authentication

import requests

# Proxy that requires a username and password
proxies_with_auth = {
    'http': 'http://username:password@proxy.example.com:8080',
    'https': 'http://username:password@proxy.example.com:8080'
}

response = requests.get('https://httpbin.org/ip', proxies=proxies_with_auth)
print(response.text)
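
One caveat with URL-embedded credentials: if the username or password contains reserved characters such as `@` or `:`, they must be percent-encoded, or the proxy URL will be parsed incorrectly. A small sketch using the standard library:

```python
from urllib.parse import quote

username = 'user'
password = 'p@ss:word'  # contains reserved characters

# Percent-encode both parts before embedding them in the proxy URL
proxy_url = f"http://{quote(username, safe='')}:{quote(password, safe='')}@proxy.example.com:8080"
print(proxy_url)
# The encoded URL keeps exactly one '@' separating credentials from the host
```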

Advanced configuration tips

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Create a session and configure a retry strategy
session = requests.Session()

# Retry on common transient status codes, with exponential backoff
retry_strategy = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[429, 500, 502, 503, 504]
)

adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)

# Proxy configuration
proxies = {
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
}

# Send the request
response = session.get('https://httpbin.org/ip', proxies=proxies)
print(response.json())
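
Instead of passing `proxies=` on every call, you can also set them once at the session level; `Session.proxies` is merged into every request made through that session. A minimal sketch:

```python
import requests

session = requests.Session()
# Session-level proxy settings apply to every request made via this session
session.proxies.update({
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080',
})
print(session.proxies)
# session.get('https://httpbin.org/ip')  # would now go through the proxy
```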

TRAE IDE debugging tip: in TRAE IDE you can use the built-in network debugging tools to monitor the status codes, response times, and payload sizes of proxied requests in real time, making it quick to pin down proxy configuration problems.

03|urllib Proxy Configuration in Depth

Although requests is more popular, urllib, as part of the Python standard library, is still the right choice in some scenarios.

Basic urllib proxy setup

import urllib.request
import urllib.error

# Create the proxy handler (both target schemes go through the same HTTP proxy)
proxy_handler = urllib.request.ProxyHandler({
    'http': 'http://127.0.0.1:8080',
    'https': 'http://127.0.0.1:8080'
})

# Build an opener
opener = urllib.request.build_opener(proxy_handler)

# Install the opener globally
urllib.request.install_opener(opener)

try:
    # Send a request through the proxy
    response = urllib.request.urlopen('https://httpbin.org/ip')
    print(response.read().decode('utf-8'))
except urllib.error.URLError as e:
    print(f"Request failed: {e}")

urllib proxy with authentication

import urllib.request

# Proxy connection details
proxy_url = 'proxy.example.com:8080'
username = 'your_username'
password = 'your_password'

# Create a password manager and register the credentials
password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, proxy_url, username, password)

# The auth handler alone does not route traffic through the proxy,
# so a ProxyHandler is needed as well
proxy_handler = urllib.request.ProxyHandler({
    'http': f'http://{proxy_url}',
    'https': f'http://{proxy_url}'
})
proxy_auth_handler = urllib.request.ProxyBasicAuthHandler(password_mgr)

# Build and install the opener
opener = urllib.request.build_opener(proxy_handler, proxy_auth_handler)
urllib.request.install_opener(opener)

# Send the request
response = urllib.request.urlopen('https://httpbin.org/ip')
print(response.read().decode('utf-8'))
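
urllib can also pick up proxies from the standard environment variables (`http_proxy`, `https_proxy`, `no_proxy`): `ProxyHandler` built with no argument uses exactly the mapping returned by `urllib.request.getproxies()`. A sketch (the addresses are demo values):

```python
import os
import urllib.request

# Simulate the standard proxy environment variables (demo values)
os.environ['http_proxy'] = 'http://127.0.0.1:8080'
os.environ['https_proxy'] = 'http://127.0.0.1:8080'

# getproxies() reads the environment; ProxyHandler() with no argument
# uses this same mapping.
env_proxies = urllib.request.getproxies()
print(env_proxies)

handler = urllib.request.ProxyHandler()  # picks up the environment settings
opener = urllib.request.build_opener(handler)
```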

04|aiohttp Async Proxy Configuration

Asynchronous programming matters more and more in modern Python applications, and aiohttp provides powerful async HTTP capabilities.

Basic async proxy setup

import aiohttp
import asyncio

async def fetch_with_proxy():
    # Proxy configuration
    proxy = 'http://127.0.0.1:8080'
    
    # Create a TCP connector (ssl=False disables certificate verification;
    # use only for testing)
    connector = aiohttp.TCPConnector(ssl=False)
    
    # Create the session
    async with aiohttp.ClientSession(connector=connector) as session:
        try:
            # Send an async request through the proxy
            async with session.get('https://httpbin.org/ip', proxy=proxy) as response:
                data = await response.json()
                print(f"Accessed via proxy, returned IP: {data}")
        except aiohttp.ClientError as e:
            print(f"Request failed: {e}")

# Run the coroutine
asyncio.run(fetch_with_proxy())

Async proxy with authentication

import aiohttp
import asyncio
from aiohttp import BasicAuth

async def fetch_with_auth_proxy():
    # Proxy credentials
    proxy_auth = BasicAuth('username', 'password')
    proxy = 'http://proxy.example.com:8080'
    
    # Create the session
    async with aiohttp.ClientSession() as session:
        try:
            # Send a request through the authenticated proxy
            async with session.get(
                'https://httpbin.org/ip',
                proxy=proxy,
                proxy_auth=proxy_auth
            ) as response:
                data = await response.json()
                print(f"Authenticated proxy request succeeded: {data}")
        except aiohttp.ClientError as e:
            print(f"Request failed: {e}")

asyncio.run(fetch_with_auth_proxy())

Advanced async proxy pool management

import aiohttp
import asyncio
import random
from typing import List

class AsyncProxyPool:
    def __init__(self, proxies: List[str]):
        self.proxies = proxies
        self.failed_proxies = set()
    
    def get_random_proxy(self) -> str:
        available_proxies = [p for p in self.proxies if p not in self.failed_proxies]
        if not available_proxies:
            raise Exception("No proxies available")
        return random.choice(available_proxies)
    
    def mark_proxy_failed(self, proxy: str):
        self.failed_proxies.add(proxy)
        print(f"Proxy {proxy} marked as failed")
    
    async def fetch_with_retry(self, url: str, max_retries: int = 3):
        for attempt in range(max_retries):
            proxy = self.get_random_proxy()
            try:
                async with aiohttp.ClientSession() as session:
                    # aiohttp expects a ClientTimeout object, not a bare number
                    timeout = aiohttp.ClientTimeout(total=10)
                    async with session.get(url, proxy=proxy, timeout=timeout) as response:
                        if response.status == 200:
                            return await response.text()
                        else:
                            print(f"Proxy {proxy} returned status code: {response.status}")
            except Exception as e:
                print(f"Request via proxy {proxy} failed: {e}")
                self.mark_proxy_failed(proxy)
        
        raise Exception("All proxies failed")

# Usage example
async def main():
    proxies = [
        'http://127.0.0.1:8080',
        'http://127.0.0.1:8081',
        'http://127.0.0.1:8082'
    ]
    
    proxy_pool = AsyncProxyPool(proxies)
    
    try:
        result = await proxy_pool.fetch_with_retry('https://httpbin.org/ip')
        print("Request succeeded:", result[:100])
    except Exception as e:
        print("Final failure:", e)

asyncio.run(main())

05|Selenium WebDriver Proxy Configuration

For scenarios that require simulating real browser behavior, Selenium's proxy configuration is especially important.

Chrome proxy setup

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time

def setup_chrome_proxy():
    # Chrome options
    chrome_options = Options()
    
    # Proxy setting
    proxy = '127.0.0.1:8080'
    chrome_options.add_argument(f'--proxy-server={proxy}')
    
    # Other common options
    chrome_options.add_argument('--disable-gpu')
    chrome_options.add_argument('--no-sandbox')
    chrome_options.add_argument('--disable-dev-shm-usage')
    
    # Create the driver
    driver = webdriver.Chrome(options=chrome_options)
    
    try:
        # Open a test page
        driver.get('https://httpbin.org/ip')
        time.sleep(3)
        
        # Read the page content
        page_source = driver.page_source
        print(f"Page content: {page_source[:200]}")
        
    finally:
        driver.quit()

setup_chrome_proxy()

Selenium proxy with authentication

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import zipfile
import os
import time

def create_proxy_auth_extension(proxy_host, proxy_port, username, password):
    """Build a Chrome extension that answers the proxy auth challenge.

    Note: this uses extension Manifest V2, which Chrome is phasing out,
    so it may not work on recent Chrome builds."""
    
    manifest_json = """
    {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Chrome Proxy",
        "permissions": [
            "proxy",
            "tabs",
            "unlimitedStorage",
            "storage",
            "<all_urls>",
            "webRequest",
            "webRequestBlocking"
        ],
        "background": {
            "scripts": ["background.js"],
            "persistent": true
        },
        "minimum_chrome_version":"22.0.0"
    }
    """
    
    background_js = f"""
    var config = {{
            mode: "fixed_servers",
            rules: {{
              singleProxy: {{
                scheme: "http",
                host: "{proxy_host}",
                port: parseInt({proxy_port})
              }},
              bypassList: ["localhost"]
            }}
          }};

    chrome.proxy.settings.set({{value: config, scope: "regular"}}, function() {{}});

    function callbackFn(details) {{
        return {{
            authCredentials: {{
                username: "{username}",
                password: "{password}"
            }}
        }};
    }}

    chrome.webRequest.onAuthRequired.addListener(
                callbackFn,
                {{urls: ["<all_urls>"]}},
                ['blocking']
    );
    """
    
    # Package the extension files into a zip archive
    pluginfile = 'proxy_auth_plugin.zip'
    
    with zipfile.ZipFile(pluginfile, 'w') as zp:
        zp.writestr("manifest.json", manifest_json)
        zp.writestr("background.js", background_js)
    
    return pluginfile

def setup_auth_proxy():
    # Proxy details
    proxy_host = 'proxy.example.com'
    proxy_port = 8080
    username = 'your_username'
    password = 'your_password'
    
    # Build the auth extension
    pluginfile = create_proxy_auth_extension(proxy_host, proxy_port, username, password)
    
    # Chrome options
    chrome_options = Options()
    chrome_options.add_extension(pluginfile)
    
    # Create the driver
    driver = webdriver.Chrome(options=chrome_options)
    
    try:
        driver.get('https://httpbin.org/ip')
        time.sleep(3)
        print(f"Page title: {driver.title}")
        
    finally:
        driver.quit()
        # Clean up the extension file
        if os.path.exists(pluginfile):
            os.remove(pluginfile)

setup_auth_proxy()

TRAE IDE debugging advantage: with TRAE IDE's browser automation debugging features you can watch a Selenium script execute in real time, inspect the screenshot and network request details for each step, and quickly locate proxy configuration problems.

06|Proxy Validation and Exception Handling

An effective proxy validation mechanism is key to keeping a scraper stable.

Validating proxy availability

import requests
import asyncio
import aiohttp
from typing import List, Dict
import time

class ProxyValidator:
    def __init__(self, test_url: str = 'https://httpbin.org/ip'):
        self.test_url = test_url
        self.timeout = 10
    
    def validate_proxy(self, proxy: Dict[str, str]) -> Dict:
        """Validate a single proxy"""
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        
        try:
            start_time = time.time()
            response = requests.get(
                self.test_url,
                proxies=proxy,
                timeout=self.timeout
            )
            result['response_time'] = time.time() - start_time
            
            if response.status_code == 200:
                result['is_valid'] = True
                result['response_data'] = response.json()
            else:
                result['error'] = f"HTTP status code: {response.status_code}"
                
        except requests.exceptions.ConnectTimeout:
            result['error'] = "Connection timed out"
        except requests.exceptions.ProxyError:
            result['error'] = "Proxy error"
        except requests.exceptions.SSLError:
            result['error'] = "SSL certificate error"
        except Exception as e:
            result['error'] = f"Unknown error: {str(e)}"
        
        return result
    
    async def validate_proxy_async(self, proxy: Dict[str, str]) -> Dict:
        """Validate a proxy asynchronously"""
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        
        try:
            start_time = time.time()
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    self.test_url,
                    proxy=list(proxy.values())[0],
                    timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    result['response_time'] = time.time() - start_time
                    
                    if response.status == 200:
                        result['is_valid'] = True
                        result['response_data'] = await response.json()
                    else:
                        result['error'] = f"HTTP status code: {response.status}"
                        
        except asyncio.TimeoutError:
            result['error'] = "Connection timed out"
        except Exception as e:
            result['error'] = f"Validation failed: {str(e)}"
        
        return result
    
    def validate_proxy_list(self, proxies: List[Dict[str, str]]) -> List[Dict]:
        """Validate a list of proxies"""
        results = []
        for proxy in proxies:
            result = self.validate_proxy(proxy)
            results.append(result)
            print(f"Proxy {proxy} result: {'valid' if result['is_valid'] else 'invalid'}")
        
        return results
    
    async def validate_proxy_list_async(self, proxies: List[Dict[str, str]]) -> List[Dict]:
        """Validate a list of proxies concurrently"""
        tasks = [self.validate_proxy_async(proxy) for proxy in proxies]
        results = await asyncio.gather(*tasks)
        
        for result in results:
            proxy = result['proxy']
            print(f"Proxy {proxy} result: {'valid' if result['is_valid'] else 'invalid'}")
        
        return results
 
# Usage example
validator = ProxyValidator()

# Proxies to test
test_proxies = [
    {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
    {'http': 'http://invalid.proxy:8080', 'https': 'http://invalid.proxy:8080'}
]

# Synchronous validation
print("=== Synchronous validation ===")
sync_results = validator.validate_proxy_list(test_proxies)

# Asynchronous validation
print("\n=== Asynchronous validation ===")
asyncio.run(validator.validate_proxy_list_async(test_proxies))

Smart exception handling

import requests
import time
import random
from typing import Optional, Dict

class SmartProxyManager:
    def __init__(self, proxies: list, max_retries: int = 3, retry_delay: float = 1.0):
        self.proxies = proxies
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.failed_proxies = set()
        self.proxy_stats = {}
    
    def get_working_proxy(self) -> Optional[Dict[str, str]]:
        """Return a usable proxy, preferring those with a high success rate"""
        available_proxies = [p for p in self.proxies if str(p) not in self.failed_proxies]
        
        if not available_proxies:
            return None
        
        # Prefer the proxies with the best observed success rate
        sorted_proxies = sorted(
            available_proxies,
            key=self.get_proxy_success_rate,
            reverse=True
        )
        
        return sorted_proxies[0]
    
    def mark_proxy_failed(self, proxy: Dict[str, str]):
        """Mark a proxy as failed"""
        proxy_str = str(proxy)
        self.failed_proxies.add(proxy_str)
        
        # Update the stats
        if proxy_str not in self.proxy_stats:
            self.proxy_stats[proxy_str] = {'success_count': 0, 'fail_count': 0}
        
        self.proxy_stats[proxy_str]['fail_count'] += 1
    
    def mark_proxy_success(self, proxy: Dict[str, str]):
        """Mark a proxy as successful"""
        proxy_str = str(proxy)
        
        if proxy_str not in self.proxy_stats:
            self.proxy_stats[proxy_str] = {'success_count': 0, 'fail_count': 0}
        
        self.proxy_stats[proxy_str]['success_count'] += 1
    
    def get_proxy_success_rate(self, proxy: Dict[str, str]) -> float:
        """Return a proxy's success rate"""
        proxy_str = str(proxy)
        stats = self.proxy_stats.get(proxy_str, {'success_count': 0, 'fail_count': 0})
        
        total = stats['success_count'] + stats['fail_count']
        if total == 0:
            return 0.0
        
        return stats['success_count'] / total
    
    def smart_request(self, url: str, **kwargs) -> Optional[requests.Response]:
        """Smart request with automatic proxy failover and retries"""
        
        for attempt in range(self.max_retries):
            proxy = self.get_working_proxy()
            
            if not proxy:
                print("No proxies available")
                return None
            
            try:
                print(f"Trying proxy {proxy} (attempt {attempt + 1})")
                
                # Add the proxy to the request kwargs
                kwargs['proxies'] = proxy
                kwargs['timeout'] = kwargs.get('timeout', 10)
                
                response = requests.get(url, **kwargs)
                
                if response.status_code == 200:
                    self.mark_proxy_success(proxy)
                    print(f"Request succeeded! Proxy success rate: {self.get_proxy_success_rate(proxy):.2%}")
                    return response
                else:
                    print(f"Unexpected HTTP status code: {response.status_code}")
                    self.mark_proxy_failed(proxy)
                    
            except requests.exceptions.RequestException as e:
                print(f"Request error: {e}")
                self.mark_proxy_failed(proxy)
            
            # Delay before retrying (exponential backoff with jitter)
            if attempt < self.max_retries - 1:
                delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f"Waiting {delay:.1f} seconds before retrying...")
                time.sleep(delay)
        
        return None
 
# Usage example
proxies = [
    {'http': 'http://127.0.0.1:8080', 'https': 'http://127.0.0.1:8080'},
    {'http': 'http://proxy1.example.com:8080', 'https': 'http://proxy1.example.com:8080'},
    {'http': 'http://proxy2.example.com:8080', 'https': 'http://proxy2.example.com:8080'}
]

manager = SmartProxyManager(proxies)

# Smart request
response = manager.smart_request('https://httpbin.org/ip')
if response:
    print(f"Success! Response: {response.json()}")
else:
    print("All proxies failed")
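
The retry delay used above is exponential backoff with jitter: `base_delay * 2**attempt` plus a random 0-1 second. Isolated as a pure function, the schedule is easy to reason about and test:

```python
import random

def backoff_delay(attempt: int, base_delay: float = 1.0, jitter: float = 1.0) -> float:
    """Exponential backoff: base_delay * 2**attempt plus random jitter.

    attempt is 0-based, matching the retry loop above."""
    return base_delay * (2 ** attempt) + random.uniform(0, jitter)

# The deterministic part of the schedule for attempts 0, 1, 2:
deterministic = [1.0 * (2 ** a) for a in range(3)]
print(deterministic)  # [1.0, 2.0, 4.0]
```

The jitter term spreads retries out in time, so several workers that failed together do not all retry in the same instant.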

07|Building and Managing a Proxy Pool

An efficient proxy pool is at the heart of any large scraping project. A good proxy pool needs automatic acquisition, validation, scheduling, and monitoring.

Complete proxy pool architecture

import asyncio
import aiohttp
import time
import random
import sqlite3
from typing import List, Dict, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
 
class ProxyPool:
    """High-performance proxy pool manager"""
    
    def __init__(self, db_path: str = 'proxy_pool.db', max_concurrent_tests: int = 10):
        self.db_path = db_path
        self.max_concurrent_tests = max_concurrent_tests
        self.test_url = 'https://httpbin.org/ip'
        self.timeout = 15
        self._init_database()
    
    def _init_database(self):
        """Initialize the database schema"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS proxies (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                proxy TEXT UNIQUE NOT NULL,
                protocol TEXT NOT NULL,
                ip TEXT NOT NULL,
                port INTEGER NOT NULL,
                is_valid BOOLEAN DEFAULT 1,
                response_time REAL,
                success_count INTEGER DEFAULT 0,
                fail_count INTEGER DEFAULT 0,
                last_tested TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # cursor.execute() only runs a single statement, so use
        # executescript() for the three index statements
        cursor.executescript('''
            CREATE INDEX IF NOT EXISTS idx_proxy ON proxies(proxy);
            CREATE INDEX IF NOT EXISTS idx_valid ON proxies(is_valid);
            CREATE INDEX IF NOT EXISTS idx_last_tested ON proxies(last_tested);
        ''')
        
        conn.commit()
        conn.close()
    
    def add_proxy(self, proxy: str, protocol: str = 'http') -> bool:
        """Add a proxy to the database"""
        try:
            # Parse the proxy string
            if '@' in proxy:
                # Format: username:password@ip:port
                auth_part, addr_part = proxy.split('@')
                ip, port = addr_part.split(':')
            else:
                # Format: ip:port
                ip, port = proxy.split(':')
            
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT OR REPLACE INTO proxies 
                (proxy, protocol, ip, port, updated_at)
                VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', (proxy, protocol, ip, int(port)))
            
            conn.commit()
            conn.close()
            
            logger.info(f"Proxy {proxy} added to the database")
            return True
            
        except Exception as e:
            logger.error(f"Failed to add proxy: {e}")
            return False
    
    def add_proxies_batch(self, proxies: List[Dict[str, str]]):
        """Add proxies in bulk"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        for proxy_info in proxies:
            proxy = proxy_info.get('proxy')
            protocol = proxy_info.get('protocol', 'http')
            
            if proxy:
                try:
                    if '@' in proxy:
                        auth_part, addr_part = proxy.split('@')
                        ip, port = addr_part.split(':')
                    else:
                        ip, port = proxy.split(':')
                    
                    cursor.execute('''
                        INSERT OR REPLACE INTO proxies 
                        (proxy, protocol, ip, port, updated_at)
                        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
                    ''', (proxy, protocol, ip, int(port)))
                    
                except Exception as e:
                    logger.error(f"Failed to add proxy {proxy}: {e}")
        
        conn.commit()
        conn.close()
        logger.info(f"Batch add of {len(proxies)} proxies complete")
    
    async def test_proxy_async(self, session: aiohttp.ClientSession, proxy_info: Dict) -> Dict:
        """Test a single proxy asynchronously"""
        proxy = proxy_info['proxy']
        protocol = proxy_info['protocol']
        
        result = {
            'proxy': proxy,
            'is_valid': False,
            'response_time': 0,
            'error': None
        }
        
        try:
            start_time = time.time()
            proxy_url = f"{protocol}://{proxy}"
            
            async with session.get(
                self.test_url,
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=self.timeout)
            ) as response:
                result['response_time'] = time.time() - start_time
                
                if response.status == 200:
                    result['is_valid'] = True
                    data = await response.json()
                    result['response_data'] = data
                else:
                    result['error'] = f"HTTP status code: {response.status}"
                    
        except asyncio.TimeoutError:
            result['error'] = "Connection timed out"
        except aiohttp.ClientError as e:
            result['error'] = f"Client error: {str(e)}"
        except Exception as e:
            result['error'] = f"Unknown error: {str(e)}"
        
        return result
    
    async def test_all_proxies_async(self):
        """Test all proxies asynchronously"""
        # Fetch the proxies that need testing
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT proxy, protocol FROM proxies 
            WHERE is_valid = 1 OR 
            (last_tested IS NULL OR last_tested < datetime('now', '-1 hour'))
        ''')
        
        proxies_to_test = cursor.fetchall()
        conn.close()
        
        if not proxies_to_test:
            logger.info("No proxies need testing")
            return
        
        logger.info(f"Testing {len(proxies_to_test)} proxies")
        
        # Create a session with a cap on concurrent connections
        connector = aiohttp.TCPConnector(limit=self.max_concurrent_tests)
        
        async with aiohttp.ClientSession(connector=connector) as session:
            # Process in batches to avoid creating too many tasks at once
            batch_size = self.max_concurrent_tests
            
            for i in range(0, len(proxies_to_test), batch_size):
                batch = proxies_to_test[i:i + batch_size]
                
                # Create the test tasks
                tasks = []
                for proxy, protocol in batch:
                    proxy_info = {'proxy': proxy, 'protocol': protocol}
                    tasks.append(self.test_proxy_async(session, proxy_info))
                
                # Run the tests
                results = await asyncio.gather(*tasks, return_exceptions=True)
                
                # Write the results back to the database
                await self._update_proxy_results(results)
                
                logger.info(f"Finished batch {i//batch_size + 1}")
                
                # Brief pause to avoid hammering the test endpoint
                await asyncio.sleep(1)
        
        logger.info("Proxy testing complete")
    
    async def _update_proxy_results(self, results: List):
        """Write proxy test results to the database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Test task raised: {result}")
                continue
            
            proxy = result['proxy']
            is_valid = result['is_valid']
            response_time = result.get('response_time', 0)
            
            if is_valid:
                cursor.execute('''
                    UPDATE proxies 
                    SET is_valid = 1, response_time = ?, success_count = success_count + 1,
                        last_tested = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                    WHERE proxy = ?
                ''', (response_time, proxy))
            else:
                cursor.execute('''
                    UPDATE proxies 
                    SET is_valid = 0, fail_count = fail_count + 1,
                        last_tested = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP
                    WHERE proxy = ?
                ''', (proxy,))
            
            logger.info(f"Proxy {proxy} test result: {'valid' if is_valid else 'invalid'}")
        
        conn.commit()
        conn.close()
    
    def get_valid_proxies(self, limit: int = 10) -> List[Dict]:
        """Return the list of valid proxies"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT proxy, protocol, response_time, success_count, fail_count
            FROM proxies 
            WHERE is_valid = 1
            ORDER BY response_time ASC, success_count DESC
            LIMIT ?
        ''', (limit,))
        
        proxies = []
        for row in cursor.fetchall():
            proxy, protocol, response_time, success_count, fail_count = row
            
            # Compute the success rate
            total = success_count + fail_count
            success_rate = success_count / total if total > 0 else 0
            
            proxies.append({
                'proxy': proxy,
                'protocol': protocol,
                'response_time': response_time or 999,
                'success_rate': success_rate
            })
        
        conn.close()
        return proxies
    
    def get_random_proxy(self, weighted: bool = True) -> Optional[Dict[str, str]]:
        """Return a random proxy"""
        valid_proxies = self.get_valid_proxies(limit=50)
        
        if not valid_proxies:
            return None
        
        if weighted:
            # Weighted random choice based on success rate
            weights = [p['success_rate'] for p in valid_proxies]
            if sum(weights) == 0:
                # If every weight is zero, fall back to equal weights
                weights = [1] * len(valid_proxies)
            
            selected = random.choices(valid_proxies, weights=weights, k=1)[0]
        else:
            selected = random.choice(valid_proxies)
        
        return {
            'http': f"{selected['protocol']}://{selected['proxy']}",
            'https': f"{selected['protocol']}://{selected['proxy']}"
        }
    
    def get_proxy_stats(self) -> Dict:
        """Return proxy pool statistics"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Overall counts
        cursor.execute('SELECT COUNT(*) FROM proxies')
        total = cursor.fetchone()[0]
        
        cursor.execute('SELECT COUNT(*) FROM proxies WHERE is_valid = 1')
        valid = cursor.fetchone()[0]
        
        cursor.execute('SELECT COUNT(*) FROM proxies WHERE is_valid = 0')
        invalid = cursor.fetchone()[0]
        
        # Average response time
        cursor.execute('SELECT AVG(response_time) FROM proxies WHERE is_valid = 1 AND response_time IS NOT NULL')
        avg_response_time = cursor.fetchone()[0] or 0
        
        conn.close()
        
        return {
            'total': total,
            'valid': valid,
            'invalid': invalid,
            'valid_rate': valid / total if total > 0 else 0,
            'avg_response_time': round(avg_response_time, 2)
        }
    
    def cleanup_invalid_proxies(self, days: int = 7):
        """Delete proxies that have been invalid for a long time"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            DELETE FROM proxies 
            WHERE is_valid = 0 AND 
            (last_tested < datetime('now', '-' || ? || ' days') OR last_tested IS NULL)
        ''', (days,))
        
        deleted_count = cursor.rowcount
        conn.commit()
        conn.close()
        
        logger.info(f"Removed {deleted_count} long-invalid proxies")
        return deleted_count
 
# Usage example
async def main():
    # Create the proxy pool
    proxy_pool = ProxyPool()
    
    # Add a few test proxies
    test_proxies = [
        {'proxy': '127.0.0.1:8080', 'protocol': 'http'},
        {'proxy': '127.0.0.1:8081', 'protocol': 'http'},
        {'proxy': '127.0.0.1:8082', 'protocol': 'http'}
    ]
    
    proxy_pool.add_proxies_batch(test_proxies)
    
    # Test all proxies
    await proxy_pool.test_all_proxies_async()
    
    # Get pool statistics
    stats = proxy_pool.get_proxy_stats()
    print(f"Pool statistics: {stats}")
    
    # Get valid proxies
    valid_proxies = proxy_pool.get_valid_proxies(limit=5)
    print(f"Valid proxies: {valid_proxies}")
    
    # Get a random proxy
    random_proxy = proxy_pool.get_random_proxy()
    print(f"Random proxy: {random_proxy}")

# Run the example
# asyncio.run(main())
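
One fragile spot above: `add_proxy` splits the proxy string on `@` and `:` by hand, which breaks if the password itself contains those characters. A more robust sketch uses `urllib.parse.urlsplit`, which understands the `userinfo@host:port` structure:

```python
from urllib.parse import urlsplit

def parse_proxy(proxy: str, protocol: str = 'http') -> dict:
    """Parse 'ip:port' or 'user:pass@ip:port' into its components."""
    # urlsplit needs a scheme in front to locate the netloc correctly
    parts = urlsplit(f'{protocol}://{proxy}')
    return {
        'ip': parts.hostname,
        'port': parts.port,
        'username': parts.username,
        'password': parts.password,
    }

print(parse_proxy('127.0.0.1:8080'))
print(parse_proxy('user:secret@proxy.example.com:3128'))
```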

08|Best Practices for Real-World Projects

Based on years of experience with proxies, here are some practices that have proven effective in real projects:

1. Proxy acquisition strategy

Free proxy sources (for learning and testing):

  • Free-proxy sites such as Xici Proxy (西刺代理), Kuaidaili (快代理), and 89 Free Proxy
  • Open-source proxy pool projects on GitHub
  • Free trials offered by the major proxy providers

Paid proxy options (for production):

  • Abuyun (阿布云): good stability, suited to enterprise applications
  • Kuaidaili (快代理): good value, supports multiple protocols
  • Zhima Proxy (芝麻代理): rich node coverage within China, fast response times

2. Proxy usage strategy

# Recommended proxy configuration structure
PROXY_CONFIG = {
    'rotation_enabled': True,          # enable proxy rotation
    'retry_on_failure': True,          # retry on failure
    'max_retries': 3,                  # maximum retry count
    'request_timeout': 15,             # request timeout (seconds)
    'retry_delay': 1,                  # retry delay (seconds)
    'success_rate_threshold': 0.8,     # minimum acceptable success rate
    'response_time_threshold': 5       # maximum acceptable response time (seconds)
}
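
The thresholds in this config are only useful if something enforces them. A minimal sketch of a health check that keeps only proxies meeting both thresholds (the keys match the structure above; the stats dict shape is an assumption):

```python
PROXY_CONFIG = {
    'success_rate_threshold': 0.8,   # minimum acceptable success rate
    'response_time_threshold': 5,    # maximum acceptable response time (seconds)
}

def is_proxy_healthy(stats: dict, config: dict = PROXY_CONFIG) -> bool:
    """Return True if a proxy's stats meet both configured thresholds."""
    return (stats['success_rate'] >= config['success_rate_threshold']
            and stats['response_time'] <= config['response_time_threshold'])

print(is_proxy_healthy({'success_rate': 0.95, 'response_time': 1.2}))  # True
print(is_proxy_healthy({'success_rate': 0.50, 'response_time': 1.2}))  # False
```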

3. Error handling and monitoring

import logging
from dataclasses import dataclass

@dataclass
class ProxyMetrics:
    """Proxy performance metrics"""
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    avg_response_time: float = 0.0
    blocked_count: int = 0

class ProxyMonitor:
    """Proxy performance monitor"""
    
    def __init__(self):
        self.metrics = ProxyMetrics()
        self.logger = logging.getLogger(__name__)
    
    def record_request(self, success: bool, response_time: float, blocked: bool = False):
        """Record the outcome of one request"""
        self.metrics.total_requests += 1
        
        if success:
            self.metrics.successful_requests += 1
            # Update the running average over *successful* requests
            # (failed requests contribute no response time)
            total_time = self.metrics.avg_response_time * (self.metrics.successful_requests - 1) + response_time
            self.metrics.avg_response_time = total_time / self.metrics.successful_requests
        else:
            self.metrics.failed_requests += 1
            if blocked:
                self.metrics.blocked_count += 1
    
    def get_success_rate(self) -> float:
        """Return the overall success rate"""
        if self.metrics.total_requests == 0:
            return 0.0
        return self.metrics.successful_requests / self.metrics.total_requests
    
    def get_report(self) -> dict:
        """Return a monitoring report"""
        return {
            'total_requests': self.metrics.total_requests,
            'successful_requests': self.metrics.successful_requests,
            'failed_requests': self.metrics.failed_requests,
            'success_rate': f"{self.get_success_rate():.2%}",
            'avg_response_time': f"{self.metrics.avg_response_time:.2f}s",
            'blocked_count': self.metrics.blocked_count,
            'block_rate': f"{self.metrics.blocked_count / max(self.metrics.total_requests, 1):.2%}"
        }
    
    def should_alert(self, threshold: float = 0.7) -> bool:
        """Decide whether an alert should be raised"""
        return self.get_success_rate() < threshold

4. Performance optimization tips

Connection pool optimization:

# Reuse connections via a connection pool
import requests

session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
    pool_connections=100,    # number of connection pools to cache
    pool_maxsize=100,        # maximum connections per pool
    max_retries=3            # retry count
)
session.mount('http://', adapter)
session.mount('https://', adapter)

Concurrency control:

# Use a semaphore to cap concurrency
import asyncio
import aiohttp
from asyncio import Semaphore

class ConcurrentProxyManager:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
    
    async def fetch_with_proxy(self, url: str, proxy: str):
        async with self.semaphore:
            # The actual request logic
            async with aiohttp.ClientSession() as session:
                async with session.get(url, proxy=proxy) as response:
                    return await response.text()

5. 安全注意事项

  1. 代理认证信息安全存储

    • 使用环境变量或配置文件
    • 避免在代码中硬编码敏感信息
    • 定期更换认证信息
  2. 防止代理劫持

    • 使用HTTPS代理加密传输
    • 验证代理服务器的SSL证书
    • 监控异常的网络行为
  3. 合规性考虑

    • 遵守目标网站的robots.txt规则
    • 控制请求频率,避免对目标服务器造成过大压力
    • 尊重网站的反爬虫策略
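
The first point above (credentials in environment variables, not in code) can look like the sketch below; the variable names `PROXY_USER`, `PROXY_PASS`, and `PROXY_HOST` are just a convention, and the demo values stand in for real secrets set outside the code:

```python
import os

# In production these are set outside the code, e.g. in the shell:
#   export PROXY_USER=alice PROXY_PASS=s3cret PROXY_HOST=proxy.example.com:8080
os.environ.setdefault('PROXY_USER', 'alice')                   # demo values only
os.environ.setdefault('PROXY_PASS', 's3cret')
os.environ.setdefault('PROXY_HOST', 'proxy.example.com:8080')

def proxies_from_env() -> dict:
    """Build a requests-style proxies dict from environment variables."""
    user = os.environ['PROXY_USER']
    password = os.environ['PROXY_PASS']
    host = os.environ['PROXY_HOST']
    url = f'http://{user}:{password}@{host}'
    return {'http': url, 'https': url}

print(proxies_from_env()['http'])
```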

TRAE IDE overall advantages: TRAE IDE provides not only powerful code editing but also integrated network debugging, performance monitoring, and proxy testing in one place. With its intelligent suggestions and real-time code analysis, you can quickly spot potential problems in proxy configurations and substantially improve development efficiency.

09|Summary and Outlook

This article has taken a comprehensive look at using proxy IPs in Python, from basic concepts to advanced practice, covering the main scenarios in day-to-day development. Mastering these skills will help you:

  • Improve scraper stability: smart proxy pool management significantly reduces the risk of IP bans
  • Optimize request performance: choosing high-quality proxy servers improves data-collection efficiency
  • Strengthen error handling: robust exception handling keeps programs running reliably
  • Streamline development: professional tools such as TRAE IDE make proxy-related problems quick to locate and fix

As the network environment keeps changing, proxy technology keeps evolving too. Future proxy services will be more intelligent, with advanced capabilities such as automatic switching, smart routing, and quality scoring. As developers, we need to keep learning and practicing to stay competitive in this fast-moving area.

I hope this article helps you use proxy IPs more effectively in real projects. Questions and suggestions are welcome in the comments!

(This content was produced with AI assistance and is for reference only.)