Quick Start - Python
Get started with Importly in your Python application using popular libraries and frameworks.
Setup
Install the required dependencies:
```bash
pip install requests python-dotenv
```
For async support (optional):
```bash
pip install aiohttp
```
(`asyncio` ships with the Python standard library, so it does not need to be installed separately.)
1. Get Your API Key
- Create an account at Importly.io
- Find your API key on the API key page
- Set it as an environment variable:
```bash
# .env
IMPORTLY_API_KEY=your_api_key_here
IMPORTLY_API_URL=https://api.importly.io
WEBHOOK_URL=https://your-domain.com/webhook/importly
```
2. Create Importly Client
Create a synchronous client using requests:
```python
# importly_client.py
import os
import time
import requests
from typing import Optional, Dict, Any
from dotenv import load_dotenv

load_dotenv()

class ImportlyClient:
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv('IMPORTLY_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required")

        self.base_url = os.getenv('IMPORTLY_API_URL', 'https://api.importly.io')
        self.timeout = 30  # seconds, passed to every request
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        })

    def import_media(self, url: str, **options) -> Dict[str, Any]:
        """Import media from a URL"""
        data = {
            'url': url,
            'includeVideo': options.get('include_video', True),
            'includeAudio': options.get('include_audio', True),
            'videoQuality': options.get('video_quality', '1080p'),
            'audioQuality': options.get('audio_quality', 'medium'),
        }

        webhook_url = options.get('webhook_url')
        if webhook_url:
            data['webhookUrl'] = webhook_url

        try:
            response = self.session.post(f"{self.base_url}/import", json=data,
                                         timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            raise self._handle_error(e)

    def get_metadata(self, url: str, webhook_url: Optional[str] = None) -> Dict[str, Any]:
        """Get metadata for a URL"""
        params = {'url': url}
        if webhook_url:
            params['webhookUrl'] = webhook_url

        try:
            response = self.session.get(f"{self.base_url}/metadata", params=params,
                                        timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            raise self._handle_error(e)

    def check_import_status(self, import_id: str) -> Dict[str, Any]:
        """Check the status of an import"""
        try:
            response = self.session.get(f"{self.base_url}/import/status",
                                        params={'id': import_id},
                                        timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            raise self._handle_error(e)

    def check_metadata_status(self, job_id: str) -> Dict[str, Any]:
        """Check the status of a metadata job"""
        try:
            response = self.session.get(f"{self.base_url}/metadata/status",
                                        params={'id': job_id},
                                        timeout=self.timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            raise self._handle_error(e)

    def wait_for_completion(self, job_id: str, job_type: str = 'import',
                            max_wait_time: int = 300, poll_interval: int = 5) -> Dict[str, Any]:
        """Poll until an import or metadata job completes, fails, or times out"""
        start_time = time.time()
        check_status = self.check_import_status if job_type == 'import' else self.check_metadata_status

        while time.time() - start_time < max_wait_time:
            try:
                result = check_status(job_id)
                status = result['data']['status']

                if status == 'completed':
                    return result
                elif status in ['failed', 'cancelled']:
                    error_msg = result['data'].get('error', 'Unknown error')
                    raise Exception(f"{job_type} {status}: {error_msg}")

                time.sleep(poll_interval)
            except requests.RequestException as e:
                if e.response is not None and e.response.status_code == 404:
                    raise Exception(f"{job_type} not found")
                raise

        raise Exception(f"{job_type} timed out after {max_wait_time} seconds")

    def _handle_error(self, error: requests.RequestException) -> Exception:
        """Map API errors to readable exceptions"""
        if error.response is not None:
            status_code = error.response.status_code
            try:
                error_data = error.response.json()
                message = error_data.get('message', error_data.get('error', 'API request failed'))
            except ValueError:
                message = error.response.text or 'API request failed'

            if status_code == 401:
                return Exception('Invalid API key')
            elif status_code == 402:
                return Exception('Insufficient credits')
            elif status_code == 429:
                return Exception('Rate limit exceeded')
            elif status_code == 400:
                return Exception(f'Bad request: {message}')
            else:
                return Exception(f'API error ({status_code}): {message}')
        else:
            return Exception(f'Network error: {str(error)}')
```
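A minimal usage sketch, assuming the environment variables from step 1 are set (the URL and quality values are placeholders):

```python
# example_sync.py -- quick sanity check for ImportlyClient
from importly_client import ImportlyClient

client = ImportlyClient()

# Start an import and block until it finishes (or times out after 5 minutes)
job = client.import_media('https://example.com/video', video_quality='720p')
completed = client.wait_for_completion(job['data']['jobId'], job_type='import')
print(completed['data']['result'])
```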
3. Async Client (Optional)
For async applications, create an async client:
```python
# async_importly_client.py
import os
import asyncio
import aiohttp
from typing import Optional, Dict, Any
from dotenv import load_dotenv

load_dotenv()

class AsyncImportlyClient:
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key or os.getenv('IMPORTLY_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required")

        self.base_url = os.getenv('IMPORTLY_API_URL', 'https://api.importly.io')
        self.headers = {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make an HTTP request (a fresh session per request keeps the client simple)"""
        url = f"{self.base_url}{endpoint}"

        async with aiohttp.ClientSession(headers=self.headers,
                                         timeout=aiohttp.ClientTimeout(total=30)) as session:
            async with session.request(method, url, **kwargs) as response:
                if response.status >= 400:
                    error_text = await response.text()
                    raise aiohttp.ClientError(f"HTTP {response.status}: {error_text}")

                return await response.json()

    async def import_media(self, url: str, **options) -> Dict[str, Any]:
        """Import media from a URL"""
        data = {
            'url': url,
            'includeVideo': options.get('include_video', True),
            'includeAudio': options.get('include_audio', True),
            'videoQuality': options.get('video_quality', '1080p'),
            'audioQuality': options.get('audio_quality', 'medium'),
        }

        webhook_url = options.get('webhook_url')
        if webhook_url:
            data['webhookUrl'] = webhook_url

        return await self._make_request('POST', '/import', json=data)

    async def get_metadata(self, url: str, webhook_url: Optional[str] = None) -> Dict[str, Any]:
        """Get metadata for a URL"""
        params = {'url': url}
        if webhook_url:
            params['webhookUrl'] = webhook_url

        return await self._make_request('GET', '/metadata', params=params)

    async def check_import_status(self, import_id: str) -> Dict[str, Any]:
        """Check the status of an import"""
        return await self._make_request('GET', '/import/status', params={'id': import_id})

    async def check_metadata_status(self, job_id: str) -> Dict[str, Any]:
        """Check the status of a metadata job"""
        return await self._make_request('GET', '/metadata/status', params={'id': job_id})

    async def wait_for_completion(self, job_id: str, job_type: str = 'import',
                                  max_wait_time: int = 300, poll_interval: int = 5) -> Dict[str, Any]:
        """Poll until an import or metadata job completes, fails, or times out"""
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        check_status = self.check_import_status if job_type == 'import' else self.check_metadata_status

        while loop.time() - start_time < max_wait_time:
            try:
                result = await check_status(job_id)
                status = result['data']['status']

                if status == 'completed':
                    return result
                elif status in ['failed', 'cancelled']:
                    error_msg = result['data'].get('error', 'Unknown error')
                    raise Exception(f"{job_type} {status}: {error_msg}")

                await asyncio.sleep(poll_interval)
            except aiohttp.ClientError as e:
                if '404' in str(e):
                    raise Exception(f"{job_type} not found")
                raise

        raise Exception(f"{job_type} timed out after {max_wait_time} seconds")
```
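Because every call is a coroutine, several imports can run concurrently with `asyncio.gather`. A sketch under the same assumptions as above (placeholder URLs, environment variables already configured):

```python
# example_async.py -- run multiple imports concurrently
import asyncio

from async_importly_client import AsyncImportlyClient

URLS = [
    'https://example.com/video-1',
    'https://example.com/video-2',
]

async def import_one(client: AsyncImportlyClient, url: str):
    job = await client.import_media(url, video_quality='720p')
    return await client.wait_for_completion(job['data']['jobId'], job_type='import')

async def main():
    client = AsyncImportlyClient()
    results = await asyncio.gather(*(import_one(client, u) for u in URLS))
    for result in results:
        print(result['data']['result'])

asyncio.run(main())
```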
4. Flask Web Application
Create a Flask app with webhook support:
```python
# app.py
from flask import Flask, request, jsonify
from importly_client import ImportlyClient
import os
from datetime import datetime

app = Flask(__name__)

# Initialize Importly client
importly = ImportlyClient()

# In-memory storage (use database in production)
imports_db = {}
metadata_db = {}

@app.route('/import', methods=['POST'])
def import_media():
    """Start a media import"""
    try:
        data = request.get_json()
        url = data.get('url')

        if not url:
            return jsonify({'error': 'URL is required'}), 400

        # Start import with webhook
        result = importly.import_media(
            url,
            video_quality=data.get('videoQuality', '1080p'),
            audio_quality=data.get('audioQuality', 'medium'),
            webhook_url=os.getenv('WEBHOOK_URL')
        )

        import_id = result['data']['jobId']

        # Store import info
        imports_db[import_id] = {
            'id': import_id,
            'url': url,
            'status': 'queued',
            'created_at': datetime.utcnow().isoformat(),
            'video_quality': data.get('videoQuality', '1080p'),
            'audio_quality': data.get('audioQuality', 'medium')
        }

        return jsonify({
            'success': True,
            'import_id': import_id,
            'status': 'queued',
            'message': 'Import started successfully'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/metadata', methods=['POST'])
def get_metadata():
    """Get metadata for a URL"""
    try:
        data = request.get_json()
        url = data.get('url')

        if not url:
            return jsonify({'error': 'URL is required'}), 400

        result = importly.get_metadata(url, os.getenv('WEBHOOK_URL'))
        job_id = result['data']['jobId']

        # Store metadata info
        metadata_db[job_id] = {
            'id': job_id,
            'url': url,
            'status': 'queued',
            'created_at': datetime.utcnow().isoformat()
        }

        return jsonify({
            'success': True,
            'job_id': job_id,
            'status': 'queued',
            'message': 'Metadata request started successfully'
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/import/<import_id>/status', methods=['GET'])
def get_import_status(import_id):
    """Get import status"""
    try:
        # Check local storage first
        local_import = imports_db.get(import_id)
        if local_import and local_import['status'] == 'completed':
            return jsonify({'success': True, 'data': local_import})

        # Check with Importly API
        result = importly.check_import_status(import_id)

        # Update local storage
        if import_id in imports_db:
            imports_db[import_id].update(result['data'])

        return jsonify(result)

    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/metadata/<job_id>/status', methods=['GET'])
def get_metadata_status(job_id):
    """Get metadata status"""
    try:
        # Check local storage first
        local_metadata = metadata_db.get(job_id)
        if local_metadata and local_metadata['status'] == 'completed':
            return jsonify({'success': True, 'data': local_metadata})

        # Check with Importly API
        result = importly.check_metadata_status(job_id)

        # Update local storage
        if job_id in metadata_db:
            metadata_db[job_id].update(result['data'])

        return jsonify(result)

    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/webhook/importly', methods=['POST'])
def handle_webhook():
    """Handle Importly webhooks"""
    try:
        data = request.get_json()
        webhook_type = data.get('type')
        webhook_data = data.get('data')

        print(f"Received webhook: {webhook_type}")

        if webhook_type == 'import.completed':
            handle_import_completed(webhook_data)
        elif webhook_type == 'import.failed':
            handle_import_failed(webhook_data)
        elif webhook_type == 'metadata.completed':
            handle_metadata_completed(webhook_data)
        elif webhook_type == 'metadata.failed':
            handle_metadata_failed(webhook_data)
        else:
            print(f"Unknown webhook type: {webhook_type}")

        return jsonify({'received': True})

    except Exception as e:
        print(f"Webhook error: {e}")
        return jsonify({'error': 'Webhook processing failed'}), 500

def handle_import_completed(data):
    """Handle completed import webhook"""
    import_id = data['jobId']
    result = data['result']

    if import_id in imports_db:
        imports_db[import_id].update({
            'status': 'completed',
            'result': result,
            'completed_at': datetime.utcnow().isoformat()
        })

    print(f"Import completed: {import_id}")

    # Add your custom logic here:
    # - Send notifications
    # - Update database
    # - Process the media file

def handle_import_failed(data):
    """Handle failed import webhook"""
    import_id = data['jobId']
    error = data.get('error', 'Unknown error')

    if import_id in imports_db:
        imports_db[import_id].update({
            'status': 'failed',
            'error': error,
            'failed_at': datetime.utcnow().isoformat()
        })

    print(f"Import failed: {import_id} - {error}")

def handle_metadata_completed(data):
    """Handle completed metadata webhook"""
    job_id = data['jobId']
    result = data['result']

    if job_id in metadata_db:
        metadata_db[job_id].update({
            'status': 'completed',
            'result': result,
            'completed_at': datetime.utcnow().isoformat()
        })

    print(f"Metadata completed: {job_id}")

def handle_metadata_failed(data):
    """Handle failed metadata webhook"""
    job_id = data['jobId']
    error = data.get('error', 'Unknown error')

    if job_id in metadata_db:
        metadata_db[job_id].update({
            'status': 'failed',
            'error': error,
            'failed_at': datetime.utcnow().isoformat()
        })

    print(f"Metadata failed: {job_id} - {error}")

@app.route('/imports', methods=['GET'])
def list_imports():
    """List all imports"""
    imports_list = sorted(
        imports_db.values(),
        key=lambda x: x['created_at'],
        reverse=True
    )
    return jsonify({'imports': imports_list})

@app.route('/metadata', methods=['GET'])
def list_metadata():
    """List all metadata requests"""
    metadata_list = sorted(
        metadata_db.values(),
        key=lambda x: x['created_at'],
        reverse=True
    )
    return jsonify({'metadata': metadata_list})

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'ok',
        'timestamp': datetime.utcnow().isoformat(),
        'imports': len(imports_db),
        'metadata': len(metadata_db)
    })

if __name__ == '__main__':
    app.run(debug=True, port=5000)
```
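While developing locally you can exercise the webhook route without waiting for a real callback. The payload below is only a simulation that mirrors what the handlers in app.py expect; the exact fields Importly sends are defined by its webhook documentation, so treat the values as placeholders:

```python
# simulate_webhook.py -- post a fake "import.completed" event to the local app
import requests

payload = {
    'type': 'import.completed',
    'data': {
        'jobId': 'test-import-123',  # hypothetical job ID
        'result': {
            'mediaUrl': 'https://cdn.example.com/media/test.mp4',  # placeholder
            'creditsUsed': 1,
            'duration': 42,
        },
    },
}

resp = requests.post('http://localhost:5000/webhook/importly', json=payload, timeout=10)
print(resp.status_code, resp.json())
```

Remember that Importly can only reach your webhook if `WEBHOOK_URL` points to a publicly accessible address; for local development a tunneling tool is typically used.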
5. Command Line Tool
Create a CLI tool for easy imports:
```python
#!/usr/bin/env python3
# cli.py

import sys
import argparse
from importly_client import ImportlyClient

def main():
    parser = argparse.ArgumentParser(description='Importly CLI tool')
    parser.add_argument('url', help='URL to import')
    parser.add_argument('--quality', '-q', default='1080p',
                        choices=['480p', '720p', '1080p', '1440p', '2160p'],
                        help='Video quality (default: 1080p)')
    parser.add_argument('--audio-quality', '-a', default='medium',
                        choices=['low', 'medium', 'high'],
                        help='Audio quality (default: medium)')
    parser.add_argument('--metadata-only', '-m', action='store_true',
                        help="Only get metadata, don't import")
    parser.add_argument('--no-wait', action='store_true',
                        help="Don't wait for completion")

    args = parser.parse_args()

    try:
        client = ImportlyClient()

        if args.metadata_only:
            print(f"Getting metadata for: {args.url}")
            result = client.get_metadata(args.url)
            job_id = result['data']['jobId']
            print(f"Metadata job started with ID: {job_id}")

            if not args.no_wait:
                print("Waiting for completion...")
                completed = client.wait_for_completion(job_id, 'metadata')
                print("Metadata completed!")
                print(f"Title: {completed['data']['result'].get('title', 'N/A')}")
                print(f"Duration: {completed['data']['result'].get('duration', 'N/A')}s")
        else:
            print(f"Starting import for: {args.url}")
            print(f"Video quality: {args.quality}")
            print(f"Audio quality: {args.audio_quality}")

            result = client.import_media(
                args.url,
                video_quality=args.quality,
                audio_quality=args.audio_quality
            )

            import_id = result['data']['jobId']
            print(f"Import started with ID: {import_id}")

            if not args.no_wait:
                print("Waiting for completion...")
                completed = client.wait_for_completion(import_id, 'import')

                print("Import completed!")
                result_data = completed['data']['result']
                print(f"Media URL: {result_data['mediaUrl']}")
                print(f"Credits used: {result_data['creditsUsed']}")
                print(f"Duration: {result_data['duration']}s")
                print(f"File size: {result_data['fileSizeBytes']} bytes")

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

if __name__ == '__main__':
    main()
```
Make it executable:
```bash
chmod +x cli.py
```
Use it:
```bash
python cli.py "https://example.com/video" --quality 1080p
python cli.py "https://example.com/video" --metadata-only
```
6. Running the Application
Start your Flask app:
```bash
python app.py
```
Test the endpoints:
```bash
# Start an import
curl -X POST http://localhost:5000/import \
  -H "Content-Type: application/json" \
  -d '{"url": "https://example.com/video", "videoQuality": "1080p"}'

# Check status
curl http://localhost:5000/import/YOUR_IMPORT_ID/status
```
Django Integration (Optional)
For Django projects, you can integrate Importly into your models and views:
```python
# models.py
from django.db import models
import uuid

class MediaImport(models.Model):
    STATUS_CHOICES = [
        ('queued', 'Queued'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    import_id = models.CharField(max_length=255, unique=True)
    url = models.URLField()
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='queued')
    video_quality = models.CharField(max_length=10, default='1080p')
    audio_quality = models.CharField(max_length=10, default='medium')
    media_url = models.URLField(blank=True, null=True)
    credits_used = models.IntegerField(blank=True, null=True)
    error_message = models.TextField(blank=True, null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(blank=True, null=True)

    def __str__(self):
        return f"Import {self.import_id} - {self.status}"
```
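A matching view is not shown in this guide, but a minimal sketch might look like the following (the `start_import` view is hypothetical; it assumes `importly_client.py` is importable from your Django project and skips authentication for brevity):

```python
# views.py -- hypothetical view that starts an import and records it
import json

from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST

from importly_client import ImportlyClient
from .models import MediaImport

@csrf_exempt  # add real CSRF/auth handling in production
@require_POST
def start_import(request):
    body = json.loads(request.body or '{}')
    url = body.get('url')
    if not url:
        return JsonResponse({'error': 'URL is required'}, status=400)

    client = ImportlyClient()
    result = client.import_media(url, video_quality=body.get('videoQuality', '1080p'))
    job_id = result['data']['jobId']

    record = MediaImport.objects.create(
        import_id=job_id,
        url=url,
        video_quality=body.get('videoQuality', '1080p'),
    )
    return JsonResponse({'success': True, 'import_id': record.import_id, 'status': record.status})
```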
Why Python?
Python is excellent for Importly because:
- ✅ Rich ecosystem - Great libraries for web development (Flask, Django, FastAPI)
- ✅ Data processing - Perfect for handling media metadata and files
- ✅ AI/ML integration - Easy to integrate with ML pipelines
- ✅ Async support - Handle multiple imports concurrently
- ✅ Simple syntax - Quick to implement and maintain
Best Practices
- Use virtual environments to manage dependencies
- Implement proper logging with Python's logging module
- Use a database instead of in-memory storage in production
- Add input validation with libraries like Pydantic
- Implement caching with Redis for frequently accessed data
- Use async/await for high-concurrency applications
- Add comprehensive error handling and retries (a minimal retry sketch follows this list)
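A sketch of the retry practice above, using only the standard library (the names and delays are illustrative, not part of the Importly SDK):

```python
# retry.py -- simple retry decorator with exponential backoff
import functools
import time

def with_retries(max_attempts=3, base_delay=1.0, retry_on=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise
                    time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
        return wrapper
    return decorator

# Usage: wrap calls that may hit transient failures (network errors, rate limits)
# @with_retries(max_attempts=3)
# def start_import(client, url):
#     return client.import_media(url)
```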
Production Considerations
- Database: Use SQLAlchemy with PostgreSQL or similar
- Task Queue: Implement Celery with Redis/RabbitMQ for background jobs (see the sketch after this list)
- Web Framework: Consider FastAPI for async APIs or Django for full applications
- Monitoring: Add logging, metrics, and health checks
- Security: Implement authentication, input validation, and rate limiting
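As a rough illustration of the task-queue suggestion above, a Celery worker could own the polling loop so web requests return immediately. This is a sketch with an assumed local Redis broker, not a prescribed setup:

```python
# tasks.py -- hypothetical Celery task that runs an import in the background
from celery import Celery

from importly_client import ImportlyClient

celery_app = Celery('importly_tasks', broker='redis://localhost:6379/0')

@celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def run_import(self, url: str, video_quality: str = '1080p'):
    client = ImportlyClient()
    try:
        job = client.import_media(url, video_quality=video_quality)
        completed = client.wait_for_completion(job['data']['jobId'], job_type='import')
        return completed['data']['result']
    except Exception as exc:
        # Retry transient failures with a 30-second delay
        raise self.retry(exc=exc)
```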
Complete Example
Check out our complete Python example on GitHub for a full implementation with Django/Flask integrations and advanced features.