高效获取速卖通商品实时数据:API 接口开发与接入全流程
在跨境电商领域,实时获取商品数据对商家优化定价策略、监控竞品动态及把握市场趋势至关重要。本文将详细介绍如何通过速卖通API 高效获取商品实时数据,涵盖从注册到数据处理的完整流程,并提供可复用的代码示例。
1. 速卖通 API 接入基础
1.1 平台概述
速卖通平台提供 RESTful API 接口,支持商品信息、订单管理、物流查询等核心功能。需通过认证获取访问权限,接口返回 JSON 格式数据。
1.2 接入准备
- 注册账号
- 获取
ApiKey
和ApiSecret
- 申请接口权限(如商品详情、列表查询)
- 了解 API 调用频率限制(通常为 5-10 次 / 秒)
2. 认证与授权流程
速卖通 API 采用授权码模式,需完成以下步骤:
import requests
import json
import time
from urllib.parse import urlencode# 配置信息
APP_KEY = "your_app_key"
APP_SECRET = "your_app_secret"
REDIRECT_URI = "https://your-callback-url.com"
AUTHORIZATION_CODE = "" # 授权后获取
ACCESS_TOKEN = "" # 访问令牌
REFRESH_TOKEN = "" # 刷新令牌(有效期1年)# 1. 获取授权URL
def get_authorization_url():params = {"client_id": APP_KEY,"redirect_uri": REDIRECT_URI,"response_type": "code","scope": "all","state": "init"}return f"https://gw.api.alibaba.com/auth/authorize.htm?{urlencode(params)}"# 2. 通过授权码换取访问令牌
def get_access_token(auth_code):url = "https://gw.api.alibaba.com/openapi/http/1/system.oauth2/getToken"payload = {"grant_type": "authorization_code","client_id": APP_KEY,"client_secret": APP_SECRET,"code": auth_code,"redirect_uri": REDIRECT_URI}response = requests.post(url, data=payload)return response.json()# 3. 刷新访问令牌(避免频繁授权)
def refresh_token(refresh_token):url = "https://gw.api.alibaba.com/openapi/http/1/system.oauth2/getToken"payload = {"grant_type": "refresh_token","client_id": APP_KEY,"client_secret": APP_SECRET,"refresh_token": refresh_token}response = requests.post(url, data=payload)return response.json()
3. 商品数据 API 调用实现
3.1 签名生成算法
速卖通 API 要求对请求参数进行 HMAC-SHA1 签名:
import hmac
import hashlibdef generate_signature(params, app_secret):"""生成API请求签名"""# 按参数名排序sorted_params = sorted(params.items(), key=lambda x: x[0])# 拼接参数名和值string_to_sign = app_secretfor key, value in sorted_params:string_to_sign += f"{key}{value}"# HMAC-SHA1加密signature = hmac.new(app_secret.encode("utf-8"),string_to_sign.encode("utf-8"),hashlib.sha1).hexdigest().upper()return signature
3.2 商品详情 API 调用
def get_product_detail(product_id, access_token):"""获取单个商品详情"""api_url = f"https://gw.api.alibaba.com/openapi/param2/2/aliexpress.affiliate.productdetail.get/{APP_KEY}"# 请求参数params = {"app_key": APP_KEY,"session": access_token,"timestamp": int(time.time() * 1000), # 毫秒级时间戳"format": "json","v": "2","sign_method": "hmac-sha1","product_id": product_id,"fields": "productId,productTitle,originalPrice,salePrice,imageUrl,categoryId,rating,orderCount,storeName,description"}# 生成签名params["sign"] = generate_signature(params, APP_SECRET)# 发送请求response = requests.post(api_url, data=params)return response.json()
3.3 商品列表批量查询
def get_product_list(keywords, page=1, page_size=20, access_token=None):"""根据关键词搜索商品列表"""api_url = f"https://gw.api.alibaba.com/openapi/param2/2/aliexpress.affiliate.products.search/{APP_KEY}"params = {"app_key": APP_KEY,"session": access_token,"timestamp": int(time.time() * 1000),"format": "json","v": "2","sign_method": "hmac-sha1","keywords": keywords,"page_no": page,"page_size": page_size,"fields": "productId,productTitle,originalPrice,salePrice,imageUrl,categoryId,rating,orderCount,storeName"}params["sign"] = generate_signature(params, APP_SECRET)response = requests.post(api_url, data=params)return response.json()
4. 数据处理与存储
4.1 数据解析与清洗
def parse_product_data(api_response):"""解析API返回的商品数据"""if "errorCode" in api_response:print(f"API错误: {api_response['errorCode']} - {api_response['errorMessage']}")return Noneif "result" not in api_response:print("无有效数据返回")return None# 提取商品信息product = api_response["result"]parsed_data = {"product_id": product.get("productId"),"title": product.get("productTitle"),"original_price": float(product.get("originalPrice", 0)),"sale_price": float(product.get("salePrice", 0)),"discount_rate": round(1 - (float(product.get("salePrice", 0)) / float(product.get("originalPrice", 1))), 2),"image_url": product.get("imageUrl"),"category_id": product.get("categoryId"),"rating": float(product.get("rating", 0)),"order_count": int(product.get("orderCount", 0)),"store_name": product.get("storeName"),"crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")}return parsed_data
4.2 数据存储(MySQL 示例)
import pymysql
from pymysql.cursors import DictCursordef save_to_mysql(product_data, table_name="aliexpress_products"):"""将商品数据存入MySQL数据库"""connection = pymysql.connect(host="localhost",user="your_username",password="your_password",database="aliexpress_data",charset="utf8mb4",cursorclass=DictCursor)try:with connection.cursor() as cursor:# 检查表格是否存在,不存在则创建create_table_sql = f"""CREATE TABLE IF NOT EXISTS `{table_name}` (`id` INT AUTO_INCREMENT PRIMARY KEY,`product_id` VARCHAR(255) NOT NULL,`title` TEXT,`original_price` DECIMAL(10, 2),`sale_price` DECIMAL(10, 2),`discount_rate` DECIMAL(3, 2),`image_url` TEXT,`category_id` VARCHAR(255),`rating` DECIMAL(3, 2),`order_count` INT,`store_name` VARCHAR(255),`crawl_time` DATETIME) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;"""cursor.execute(create_table_sql)# 插入数据insert_sql = f"""INSERT INTO `{table_name}` (`product_id`, `title`, `original_price`, `sale_price`, `discount_rate`, `image_url`, `category_id`, `rating`, `order_count`, `store_name`, `crawl_time`)VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);"""cursor.execute(insert_sql, (product_data["product_id"],product_data["title"],product_data["original_price"],product_data["sale_price"],product_data["discount_rate"],product_data["image_url"],product_data["category_id"],product_data["rating"],product_data["order_count"],product_data["store_name"],product_data["crawl_time"]))# 提交事务connection.commit()print(f"成功存储商品: {product_data['product_id']}")except Exception as e:print(f"存储失败: {str(e)}")connection.rollback()finally:connection.close()
5.2 智能限流与重试
import time
from functools import wrapsdef rate_limit(max_calls=5, period=1):"""API调用限流装饰器"""calls = []def decorator(func):@wraps(func)def wrapper(*args, **kwargs):# 移除时间窗口外的调用记录now = time.time()calls[:] = [t for t in calls if t > now - period]# 检查是否超过限制if len(calls) >= max_calls:wait_time = period - (now - calls[0])time.sleep(wait_time)# 记录本次调用calls.append(now)# 执行函数return func(*args, **kwargs)return wrapperreturn decoratordef retry(max_attempts=3, delay=1):"""API调用失败重试装饰器"""def decorator(func):@wraps(func)def wrapper(*args, **kwargs):attempts = 0while attempts < max_attempts:try:return func(*args, **kwargs)except Exception as e:attempts += 1if attempts == max_attempts:raise eprint(f"尝试 {attempts}/{max_attempts} 失败: {str(e)},{delay}秒后重试")time.sleep(delay)delay *= 2 # 指数退避return wrapperreturn decorator
6. 数据监控与分析
6.1 价格波动监控
def monitor_price_changes(product_id, threshold=0.1):"""监控商品价格变化(与历史数据对比)"""connection = pymysql.connect(host="localhost",user="your_username",password="your_password",database="aliexpress_data",charset="utf8mb4",cursorclass=DictCursor)try:with connection.cursor() as cursor:# 查询最近两次价格query = """SELECT sale_price FROM aliexpress_products WHERE product_id = %s ORDER BY crawl_time DESC LIMIT 2;"""cursor.execute(query, (product_id,))results = cursor.fetchall()if len(results) < 2:print("历史数据不足,无法判断价格变化")return Noneold_price = float(results[1]["sale_price"])new_price = float(results[0]["sale_price"])change_rate = (new_price - old_price) / old_priceif abs(change_rate) >= threshold:print(f"价格变动超过{threshold*100}%: 原价{old_price} → 现价{new_price}")return {"product_id": product_id,"old_price": old_price,"new_price": new_price,"change_rate": change_rate}return Nonefinally:connection.close()
7. 部署与运维建议
-
定时任务配置:
- 使用
APScheduler
或 Linuxcrontab
设置定时采集 - 避免高峰期集中请求,分散 API 调用时间
- 使用
-
异常处理机制:
- 记录详细日志,包括请求参数、响应内容和错误信息
- 设置告警系统(邮件 / 短信),监控 API 调用成功率
-
数据安全:
- 敏感信息(如 AppSecret)使用环境变量管理
- 定期备份数据库,防止数据丢失
-
性能优化:
- 使用缓存(如 Redis)存储高频访问数据
- 优化数据库查询,添加合适索引
8. 完整示例:商品监控系统
# 完整示例:商品监控系统
if __name__ == "__main__":# 1. 获取授权(首次运行)# auth_url = get_authorization_url()# print(f"请访问以下URL授权: {auth_url}")# # 授权后获取AUTHORIZATION_CODE,然后:# token_data = get_access_token(AUTHORIZATION_CODE)# ACCESS_TOKEN = token_data["access_token"]# REFRESH_TOKEN = token_data["refresh_token"]# 2. 刷新令牌(后续运行)# token_data = refresh_token(REFRESH_TOKEN)# ACCESS_TOKEN = token_data["access_token"]# 3. 监控商品列表products_to_monitor = ["3256804740043007", "3256804740043008", "3256804740043009"]for product_id in products_to_monitor:# 获取商品详情result = get_product_detail(product_id, ACCESS_TOKEN)# 解析数据product_data = parse_product_data(result)if product_data:# 存储数据save_to_mysql(product_data)# 检查价格变化price_change = monitor_price_changes(product_id)if price_change:print(f"⚠️ 价格变动警报: {price_change}")
总结
通过本文介绍的方法,开发者可以高效接入速卖通 API,实现商品数据的实时采集与监控。关键技术要点包括:
- 认证流程
- API 签名生成算法
- 并发请求与限流控制
- 数据解析与持久化
- 价格波动监控机制
实际应用中,建议根据业务需求调整采集频率和数据字段,同时注意遵守速卖通 API 使用规范,避免因过度请求导致 IP 封禁。通过合理设计架构,可以构建出稳定、高效的跨境电商数据采集系统。