Skip to main content

Setup

Installing Dependencies

pip install requests python-dotenv

Environment Setup

# .env
FIREMOON_API_KEY=your_api_key_here
# Load environment variables
import os
from dotenv import load_dotenv

load_dotenv()
API_KEY = os.getenv('FIREMOON_API_KEY')
BASE_URL = 'https://firemoon.studio/api/v1'

Basic Image Generation

import requests
import os

def generate_image(prompt, **kwargs):
    """Generate an image using FLUX/dev"""
    response = requests.post(
        f"{BASE_URL}/flux/dev",
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json'
        },
        json={
            'prompt': prompt,
            'num_images': kwargs.get('num_images', 1),
            'image_size': kwargs.get('image_size', 'landscape_4_3'),
            'guidance_scale': kwargs.get('guidance_scale', 3.5),
            **kwargs
        }
    )

    response.raise_for_status()
    return response.json()

# Usage
try:
    result = generate_image('A beautiful sunset over mountains')
    print('Generated image:', result['images'][0]['url'])
except requests.exceptions.RequestException as e:
    print('Error:', str(e))

Video Generation

def generate_video(prompt, **kwargs):
    """Generate a video using Kling"""
    response = requests.post(
        f"{BASE_URL}/kling/kling-2-1-master",
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json'
        },
        json={
            'prompt': prompt,
            'duration': kwargs.get('duration', '5'),
            'aspect_ratio': kwargs.get('aspect_ratio', '16:9'),
            **kwargs
        }
    )

    response.raise_for_status()
    return response.json()

# Usage
video_result = generate_video(
    'A butterfly emerging from its chrysalis',
    duration='10'
)
print('Generated video:', video_result['videos'][0]['url'])

Character Editing with Ideogram

def edit_character(prompt, image_url, **kwargs):
    """Edit a character using Ideogram"""
    response = requests.post(
        f"{BASE_URL}/ideogram/v3-character-edit",
        headers={
            'Authorization': f'Bearer {API_KEY}',
            'Content-Type': 'application/json'
        },
        json={
            'prompt': prompt,
            'image_url': image_url,
            'style': kwargs.get('style', 'realistic'),
            'magic_prompt_option': kwargs.get('magic_prompt_option', 'auto'),
            **kwargs
        }
    )

    response.raise_for_status()
    return response.json()

# Usage
edit_result = edit_character(
    'Change the character to wear a red jacket',
    'https://example.com/character.jpg'
)
print('Edited image:', edit_result['images'][0]['url'])

Advanced Client Class

import requests
import time
from typing import Dict, Any, Optional, Union
from dataclasses import dataclass

@dataclass
class GenerationResult:
    images: Optional[list] = None
    videos: Optional[list] = None
    seed: Optional[int] = None
    has_nsfw_concepts: Optional[list] = None
    timings: Optional[Dict[str, float]] = None

class FiremoonStudio:
    def __init__(self, api_key: str, base_url: str = 'https://firemoon.studio/api/v1'):
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })

    def _request(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
        response = self.session.post(f"{self.base_url}{endpoint}", json=params)
        response.raise_for_status()
        return response.json()

    def generate_image(self, provider: str, model: str, params: Dict[str, Any]) -> GenerationResult:
        """Generate images using any provider/model"""
        result = self._request(f"/{provider}/{model}", params)
        return GenerationResult(**result)

    # Convenience methods
    def flux_dev(self, prompt: str, **kwargs) -> GenerationResult:
        """Generate images using FLUX/dev"""
        params = {
            'prompt': prompt,
            'num_images': 1,
            'image_size': 'landscape_4_3',
            'guidance_scale': 3.5,
            **kwargs
        }
        return self.generate_image('flux', 'dev', params)

    def kling_video(self, prompt: str, **kwargs) -> GenerationResult:
        """Generate videos using Kling"""
        params = {
            'prompt': prompt,
            'duration': '5',
            'aspect_ratio': '16:9',
            **kwargs
        }
        return self.generate_image('kling', 'kling-2-1-master', params)

    def ideogram_edit(self, prompt: str, image_url: str, **kwargs) -> GenerationResult:
        """Edit characters using Ideogram"""
        params = {
            'prompt': prompt,
            'image_url': image_url,
            'style': 'realistic',
            **kwargs
        }
        return self.generate_image('ideogram', 'v3-character-edit', params)

# Usage
client = FiremoonStudio(os.getenv('FIREMOON_API_KEY'))

# Generate multiple images
result = client.flux_dev('A futuristic city', num_images=3)
for i, img in enumerate(result.images or []):
    print(f"Image {i + 1}: {img['url']}")

# Generate video
video = client.kling_video('A rocket launch')
print('Video:', video.videos[0]['url'])

Error Handling and Retries

from requests.exceptions import RequestException, Timeout, ConnectionError
import logging

class FiremoonAPIError(Exception):
    def __init__(self, message: str, status_code: int = None, details: Dict = None):
        super().__init__(message)
        self.status_code = status_code
        self.details = details or {}

def request_with_retry(endpoint: str, params: Dict, max_retries: int = 3):
    """Make API request with automatic retries"""
    last_exception = None

    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{BASE_URL}{endpoint}",
                headers={
                    'Authorization': f'Bearer {API_KEY}',
                    'Content-Type': 'application/json'
                },
                json=params,
                timeout=30
            )

            if response.status_code == 429:
                # Rate limited - wait and retry
                reset_time = response.headers.get('X-RateLimit-Reset')
                wait_time = min(2 ** attempt, 30)  # Exponential backoff, max 30s
                logging.warning(f"Rate limited. Waiting {wait_time}s before retry {attempt + 1}/{max_retries}")
                time.sleep(wait_time)
                continue

            if not response.ok:
                error_data = response.json()
                raise FiremoonAPIError(
                    error_data.get('message', 'Unknown error'),
                    response.status_code,
                    error_data
                )

            return response.json()

        except (Timeout, ConnectionError) as e:
            last_exception = e
            if attempt == max_retries - 1:
                raise
            logging.warning(f"Network error (attempt {attempt + 1}/{max_retries}): {e}")
            time.sleep(2 ** attempt)

        except RequestException as e:
            last_exception = e
            # Don't retry on client errors (4xx except 429)
            if hasattr(e.response, 'status_code') and 400 <= e.response.status_code < 500 and e.response.status_code != 429:
                raise FiremoonAPIError(str(e), e.response.status_code)
            if attempt == max_retries - 1:
                raise
            logging.warning(f"Request error (attempt {attempt + 1}/{max_retries}): {e}")

    raise last_exception

# Usage with error handling
def safe_generate_image(prompt: str):
    try:
        return request_with_retry('/flux/dev', {'prompt': prompt})
    except FiremoonAPIError as e:
        if e.status_code == 400:
            logging.error(f"Invalid request: {e.details}")
        elif e.status_code == 401:
            logging.error("Invalid API key")
        elif e.status_code == 403:
            logging.error("Access denied to this provider")
        else:
            logging.error(f"API error: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        raise

Batch Processing

import asyncio
import concurrent.futures
from typing import List, Tuple, Dict

def generate_batch(prompts: List[str], **kwargs) -> Tuple[List[Dict], List[Dict]]:
    """Generate images for multiple prompts with error handling"""
    batch_size = kwargs.get('batch_size', 3)
    results = []
    errors = []

    def process_prompt(prompt: str, index: int):
        try:
            result = request_with_retry('/flux/dev', {
                'prompt': prompt,
                'num_images': 1,
                **kwargs
            })
            return {'success': True, 'prompt': prompt, 'result': result, 'index': index}
        except Exception as e:
            return {'success': False, 'prompt': prompt, 'error': str(e), 'index': index}

    # Process in batches to avoid overwhelming the API
    with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
        for i in range(0, len(prompts), batch_size):
            batch = prompts[i:i + batch_size]
            futures = [
                executor.submit(process_prompt, prompt, i + j)
                for j, prompt in enumerate(batch)
            ]

            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result['success']:
                    results.append(result)
                else:
                    errors.append(result)

            # Small delay between batches
            if i + batch_size < len(prompts):
                time.sleep(1)

    return results, errors

# Usage
prompts = [
    'A sunset over mountains',
    'A futuristic city',
    'A serene lake',
    'A space rocket launch',
    'A butterfly on a flower'
]

results, errors = generate_batch(prompts, guidance_scale=4.0)

print(f"Successfully generated {len(results)} images")
if errors:
    print(f"Failed to generate {len(errors)} images")
    for error in errors:
        print(f"  - {error['prompt']}: {error['error']}")

Rate Limit Monitoring

import time
from collections import deque
import threading

class RateLimitTracker:
    def __init__(self):
        self.requests = deque()
        self.warnings = []
        self.lock = threading.Lock()

    def track_request(self):
        now = time.time()
        with self.lock:
            # Keep only requests from the last minute
            while self.requests and now - self.requests[0] > 60:
                self.requests.popleft()
            self.requests.append(now)

            requests_per_minute = len(self.requests)

            if requests_per_minute > 45:  # 75% of 60 limit
                warning = {
                    'timestamp': now,
                    'requests_per_minute': requests_per_minute,
                    'message': 'Approaching rate limit'
                }
                self.warnings.append(warning)
                logging.warning(f"Approaching rate limit: {requests_per_minute}/60 requests/minute")

    def get_stats(self):
        now = time.time()
        with self.lock:
            last_minute = [t for t in self.requests if now - t < 60]
            last_hour = [t for t in self.requests if now - t < 3600]

            return {
                'last_minute': len(last_minute),
                'last_hour': len(last_hour),
                'warnings': self.warnings[-10:]  # Last 10 warnings
            }

# Global tracker instance
tracker = RateLimitTracker()

def tracked_request(endpoint: str, params: Dict):
    """Make API request with rate limit tracking"""
    tracker.track_request()
    return request_with_retry(endpoint, params)

# Usage
result = tracked_request('/flux/dev', {'prompt': 'A sunset'})

# Check stats
stats = tracker.get_stats()
print(f"Requests last minute: {stats['last_minute']}")

Async Support

import asyncio
import aiohttp
from typing import List, Dict, Any

class AsyncFiremoonStudio:
    def __init__(self, api_key: str, base_url: str = 'https://firemoon.studio/api/v1'):
        self.api_key = api_key
        self.base_url = base_url

    async def _request(self, endpoint: str, params: Dict[str, Any]) -> Dict[str, Any]:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.base_url}{endpoint}",
                headers={
                    'Authorization': f'Bearer {self.api_key}',
                    'Content-Type': 'application/json'
                },
                json=params
            ) as response:
                if not response.ok:
                    error_data = await response.json()
                    raise FiremoonAPIError(
                        error_data.get('message', 'Unknown error'),
                        response.status,
                        error_data
                    )
                return await response.json()

    async def generate_image(self, provider: str, model: str, params: Dict[str, Any]) -> GenerationResult:
        result = await self._request(f"/{provider}/{model}", params)
        return GenerationResult(**result)

    async def generate_batch(self, requests: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Generate multiple images/videos concurrently"""
        tasks = [
            self.generate_image(req['provider'], req['model'], req['params'])
            for req in requests
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Usage
async def main():
    client = AsyncFiremoonStudio(os.getenv('FIREMOON_API_KEY'))

    # Generate single image
    result = await client.generate_image('flux', 'dev', {
        'prompt': 'A beautiful sunset',
        'num_images': 1
    })
    print('Image:', result.images[0]['url'])

    # Generate batch
    batch_requests = [
        {
            'provider': 'flux',
            'model': 'dev',
            'params': {'prompt': 'A mountain landscape', 'num_images': 1}
        },
        {
            'provider': 'flux',
            'model': 'dev',
            'params': {'prompt': 'A city skyline', 'num_images': 1}
        }
    ]

    results = await client.generate_batch(batch_requests)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Request {i + 1} failed: {result}")
        else:
            print(f"Request {i + 1} succeeded: {result.images[0]['url']}")

asyncio.run(main())

Flask Web Application

from flask import Flask, request, jsonify, render_template
import os

app = Flask(__name__)
client = FiremoonStudio(os.getenv('FIREMOON_API_KEY'))

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/generate', methods=['POST'])
def generate():
    try:
        data = request.get_json()
        prompt = data.get('prompt')
        num_images = data.get('num_images', 1)

        if not prompt:
            return jsonify({'error': 'Prompt is required'}), 400

        result = client.flux_dev(prompt, num_images=num_images)

        return jsonify({
            'success': True,
            'images': result.images,
            'seed': result.seed
        })

    except FiremoonAPIError as e:
        return jsonify({'error': str(e), 'status_code': e.status_code}), e.status_code
    except Exception as e:
        return jsonify({'error': 'Internal server error'}), 500

if __name__ == '__main__':
    app.run(debug=True)
<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
    <title>Firemoon Studio Image Generator</title>
</head>
<body>
    <h1>Generate Images with AI</h1>
    <form id="generateForm">
        <input type="text" id="prompt" placeholder="Enter your prompt..." required>
        <input type="number" id="numImages" value="1" min="1" max="4">
        <button type="submit">Generate</button>
    </form>

    <div id="results"></div>

    <script>
        document.getElementById('generateForm').addEventListener('submit', async (e) => {
            e.preventDefault();

            const prompt = document.getElementById('prompt').value;
            const numImages = document.getElementById('numImages').value;

            const response = await fetch('/generate', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ prompt, num_images: parseInt(numImages) })
            });

            const result = await response.json();
            const resultsDiv = document.getElementById('results');

            if (result.success) {
                resultsDiv.innerHTML = result.images.map(img =>
                    `<img src="${img.url}" alt="Generated image" style="max-width: 300px; margin: 10px;">`
                ).join('');
            } else {
                resultsDiv.innerHTML = `<p style="color: red;">Error: ${result.error}</p>`;
            }
        });
    </script>
</body>
</html>

Django Integration

# views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
import json

client = FiremoonStudio(os.getenv('FIREMOON_API_KEY'))

@csrf_exempt
@require_POST
def generate_image_view(request):
    try:
        data = json.loads(request.body)
        prompt = data.get('prompt')
        num_images = data.get('num_images', 1)

        if not prompt:
            return JsonResponse({'error': 'Prompt is required'}, status=400)

        result = client.flux_dev(prompt, num_images=num_images)

        return JsonResponse({
            'success': True,
            'images': result.images,
            'seed': result.seed
        })

    except FiremoonAPIError as e:
        return JsonResponse(
            {'error': str(e), 'status_code': e.status_code},
            status=e.status_code
        )
    except Exception as e:
        return JsonResponse({'error': 'Internal server error'}, status=500)

# urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('generate/', views.generate_image_view, name='generate_image'),
]
These examples demonstrate various patterns for integrating with the Firemoon Studio API. Choose the approach that best fits your application’s architecture and requirements.