Developer API

Code Plagiarism
Detection API

Build plagiarism detection into your apps. Check code against 20+ billion sources.

const response = await fetch('https://codequiry.com/api/v1/check', { method: 'POST', headers: { 'apikey': 'YOUR_API_KEY' }, body: formData }); // Check created!

Codequiry SDK for Python

The official Python SDK for Codequiry provides both synchronous and asynchronous interfaces for seamless integration with Django, Flask, FastAPI, and data science workflows.

📦 Installation

pip (Recommended)
                        pip install codequiry
                        
pip
Development Version
                        pip install git+https://github.com/cqchecker/codequiry-python.git
                        
Git
Requirements: Python 3.8+, requests, aiohttp (for async)

🚀 Quick Start

        import asyncio
from codequiry import CodequiryClient
from codequiry.exceptions import CodequiryException

# Initialize client
client = CodequiryClient(
    api_key='your-api-key-here',
    base_url='https://codequiry.com/api/v1',
    timeout=30
)

async def main():
    try:
        # Test connection
        account = await client.get_account_info()
        print(f"Connected as: {{account['name']@}}")
        print(f"Checks remaining: {{account['checks_remaining']@}}")
        
        # Run complete plagiarism check
        result = await run_plagiarism_check(client)
        print(f"Check completed: {{result['check_id']@}}")
        
    except CodequiryException as e:
        print(f"Error: {{e@}}")
    finally:
        await client.close()

async def run_plagiarism_check(client):
    """Complete workflow example"""
    
    # 1. Create check
    check = await client.create_check(
        name="Python Assignment - Data Analysis",
        language=14  # Python language ID
    )
    
    print(f"Created check: {{check['id']@}}")
    
    # 2. Upload files
    files = [
        'student1_submission.py',
        'student2_submission.py', 
        'student3_submission.py'
    ]
    
    upload_tasks = []
    for file_path in files:
        if Path(file_path).exists():
            task = client.upload_file(check['id'], file_path)
            upload_tasks.append(task)
    
    # Upload files concurrently
    uploads = await asyncio.gather(*upload_tasks, return_exceptions=True)
    
    successful_uploads = [u for u in uploads if not isinstance(u, Exception)]
    print(f"Uploaded {{len(successful_uploads)@}} files")
    
    # 3. Start analysis
    await client.start_check(
        check['id'],
        web_check=True,
        database_check=True,
        test_type=1
    )
    
    print("Analysis started...")
    
    # 4. Wait for completion with progress
    overview = await wait_for_completion_with_progress(client, check['id'])
    
    print(f"Analysis complete! Found {{len(overview['submissions'])@}} submissions")
    
    # 5. Process suspicious submissions
    suspicious = [
        sub for sub in overview['submissions'] 
        if float(sub['total_result']) > 50.0
    ]
    
    for submission in suspicious:
        print(f"⚠️  {{submission['filename']@}}: {{submission['total_result']@}}% similarity")
        
        # Get detailed results
        try:
            details = await client.get_detailed_results(check['id'], submission['id'])
            
            # Show peer matches
            for match in details.get('peer_matches', []):
                print(f"  → Match with {{match['file_matched']@}}: {{match['similarity']@}}%")
                
        except Exception as e:
            print(f"  Error getting details: {{e@}}")
    
    return {
        'check_id': check['id'],
        'overview': overview,
        'suspicious_count': len(suspicious)
    }

async def wait_for_completion_with_progress(client, check_id, timeout=600):
    """Wait for check completion with progress updates"""
    import time
    
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        status = await client.get_check_status(check_id)
        
        if status['status_id'] == 4:  # Completed
            return await client.get_overview(check_id)
        
        print(f"Status: {{status.get('message', 'Processing...')@}}")
        await asyncio.sleep(30)  # Wait 30 seconds
    
    raise TimeoutError(f"Check did not complete within {{timeout@}} seconds")

# Run the example
if __name__ == "__main__":
    asyncio.run(main())
        
Complete Example

🔧 Django Integration

        # settings.py
CODEQUIRY = {
    'API_KEY': 'your-api-key-here',
    'BASE_URL': 'https://codequiry.com/api/v1',
    'TIMEOUT': 30,
}

# models.py
from django.db import models
from django.contrib.auth.models import User

class Assignment(models.Model):
    title = models.CharField(max_length=200)
    instructor = models.ForeignKey(User, on_delete=models.CASCADE)
    language_id = models.IntegerField(default=14)  # Python
    codequiry_check_id = models.CharField(max_length=50, blank=True)
    plagiarism_results = models.JSONField(null=True, blank=True)
    status = models.CharField(max_length=20, default='pending')
    created_at = models.DateTimeField(auto_now_add=True)
    processed_at = models.DateTimeField(null=True, blank=True)

class Submission(models.Model):
    assignment = models.ForeignKey(Assignment, on_delete=models.CASCADE)
    student = models.ForeignKey(User, on_delete=models.CASCADE)
    file = models.FileField(upload_to='submissions/')
    uploaded_at = models.DateTimeField(auto_now_add=True)

# views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
from django.conf import settings
from codequiry import CodequiryClient
import asyncio
import json

@method_decorator(csrf_exempt, name='dispatch')
class PlagiarismCheckView(View):
    
    def __init__(self):
        super().__init__()
        self.client = CodequiryClient(
            api_key=settings.CODEQUIRY['API_KEY'],
            base_url=settings.CODEQUIRY['BASE_URL'],
            timeout=settings.CODEQUIRY['TIMEOUT']
        )
    
    async def post(self, request, assignment_id):
        try:
            assignment = Assignment.objects.get(id=assignment_id)
            submissions = Submission.objects.filter(assignment=assignment)
            
            if not submissions.exists():
                return JsonResponse({
                    'success': False,
                    'error': 'No submissions found for this assignment'
                })
            
            # Create check
            check = await self.client.create_check(
                name=assignment.title,
                language=assignment.language_id
            )
            
            # Save check ID
            assignment.codequiry_check_id = check['id']
            assignment.status = 'uploading'
            assignment.save()
            
            # Upload files
            upload_tasks = []
            for submission in submissions:
                if submission.file:
                    task = self.client.upload_file_from_path(
                        check['id'], 
                        submission.file.path,
                        f"{{submission.student.username@}}_{{submission.file.name@}}"
                    )
                    upload_tasks.append(task)
            
            uploads = await asyncio.gather(*upload_tasks, return_exceptions=True)
            successful_uploads = [u for u in uploads if not isinstance(u, Exception)]
            
            if not successful_uploads:
                assignment.status = 'failed'
                assignment.save()
                return JsonResponse({
                    'success': False,
                    'error': 'Failed to upload any files'
                })
            
            # Start analysis in background
            asyncio.create_task(self.process_check_async(assignment))
            
            return JsonResponse({
                'success': True,
                'check_id': check['id'],
                'uploaded_files': len(successful_uploads),
                'message': 'Analysis started'
            })
            
        except Exception as e:
            return JsonResponse({
                'success': False,
                'error': str(e)
            })
    
    async def process_check_async(self, assignment):
        """Process check in background"""
        try:
            # Start analysis
            await self.client.start_check(
                assignment.codequiry_check_id,
                web_check=True,
                database_check=True
            )
            
            assignment.status = 'processing'
            assignment.save()
            
            # Wait for completion
            overview = await self.wait_for_completion(assignment.codequiry_check_id)
            
            # Save results
            assignment.plagiarism_results = overview
            assignment.status = 'completed'
            assignment.processed_at = timezone.now()
            assignment.save()
            
            # Send notification to instructor
            self.notify_instructor(assignment, overview)
            
        except Exception as e:
            assignment.status = 'failed'
            assignment.save()
            print(f"Check processing failed: {{e@}}")
    
    async def wait_for_completion(self, check_id, timeout=600):
        """Wait for check completion"""
        import time
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            status = await self.client.get_check_status(check_id)
            
            if status['status_id'] == 4:  # Completed
                return await self.client.get_overview(check_id)
            
            await asyncio.sleep(30)
        
        raise TimeoutError("Check timed out")
    
    def get(self, request, assignment_id):
        """Get check status"""
        try:
            assignment = Assignment.objects.get(id=assignment_id)
            
            if assignment.status == 'completed':
                return JsonResponse({
                    'status': 'completed',
                    'results': assignment.plagiarism_results
                })
            
            return JsonResponse({
                'status': assignment.status,
                'message': 'Analysis in progress...'
            })
            
        except Assignment.DoesNotExist:
            return JsonResponse({'error': 'Assignment not found'}, status=404)

# tasks.py (for Celery background processing)
from celery import shared_task
from codequiry import CodequiryClient
import asyncio

@shared_task
def process_plagiarism_check(assignment_id):
    """Celery task for background processing"""
    from .models import Assignment
    
    assignment = Assignment.objects.get(id=assignment_id)
    
    async def run_check():
        client = CodequiryClient(api_key=settings.CODEQUIRY['API_KEY'])
        
        try:
            # Get submissions and upload
            submissions = assignment.submission_set.all()
            
            upload_tasks = []
            for submission in submissions:
                task = client.upload_file_from_path(
                    assignment.codequiry_check_id,
                    submission.file.path
                )
                upload_tasks.append(task)
            
            await asyncio.gather(*upload_tasks)
            
            # Start and wait for completion
            await client.start_check(assignment.codequiry_check_id)
            
            # Monitor progress
            while True:
                status = await client.get_check_status(assignment.codequiry_check_id)
                if status['status_id'] == 4:
                    break
                await asyncio.sleep(30)
            
            # Get and save results
            overview = await client.get_overview(assignment.codequiry_check_id)
            
            assignment.plagiarism_results = overview
            assignment.status = 'completed'
            assignment.save()
            
        except Exception as e:
            assignment.status = 'failed'
            assignment.save()
            raise
        finally:
            await client.close()
    
    # Run async function
    asyncio.run(run_check())
        
Django Integration

⚡ FastAPI Integration

        from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from codequiry import CodequiryClient
from typing import List, Optional
import asyncio
import os
from pathlib import Path

app = FastAPI(title="Plagiarism Detection API")

# Global client instance
client = CodequiryClient(
    api_key=os.getenv('CODEQUIRY_API_KEY'),
    base_url='https://codequiry.com/api/v1'
)

# In-memory storage (use Redis in production)
checks_status = {}

@app.post("/plagiarism/check")
async def create_plagiarism_check(
    name: str,
    language: int,
    files: List[UploadFile] = File(...),
    web_check: bool = True,
    database_check: bool = True,
    background_tasks: BackgroundTasks = None
):
    """Create and start a plagiarism check"""
    
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")
    
    try:
        # Create check
        check = await client.create_check(name=name, language=language)
        check_id = check['id']
        
        # Store check info
        checks_status[check_id] = {
            'status': 'uploading',
            'progress': 0,
            'total_files': len(files),
            'uploaded_files': 0
        }
        
        # Save uploaded files temporarily
        temp_files = []
        for file in files:
            temp_path = f"/tmp/{{check_id@}}_{{file.filename@}}"
            with open(temp_path, "wb") as buffer:
                content = await file.read()
                buffer.write(content)
            temp_files.append((temp_path, file.filename))
        
        # Process uploads and analysis in background
        background_tasks.add_task(
            process_check_background,
            check_id,
            temp_files,
            web_check,
            database_check
        )
        
        return JSONResponse({
            "success": True,
            "check_id": check_id,
            "message": "Files uploaded. Analysis started.",
            "files_count": len(files)
        })
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

async def process_check_background(
    check_id: str,
    temp_files: List[tuple],
    web_check: bool,
    database_check: bool
):
    """Background task to process plagiarism check"""
    
    try:
        # Upload files
        upload_tasks = []
        for temp_path, original_name in temp_files:
            task = client.upload_file_from_path(check_id, temp_path, original_name)
            upload_tasks.append(task)
        
        # Track upload progress
        for i, task in enumerate(asyncio.as_completed(upload_tasks)):
            try:
                await task
                checks_status[check_id]['uploaded_files'] = i + 1
                checks_status[check_id]['progress'] = (i + 1) / len(temp_files) * 50  # 50% for uploads
            except Exception as e:
                print(f"Upload failed: {{e@}}")
        
        # Clean up temp files
        for temp_path, _ in temp_files:
            try:
                os.remove(temp_path)
            except:
                pass
        
        # Start analysis
        await client.start_check(
            check_id,
            web_check=web_check,
            database_check=database_check
        )
        
        checks_status[check_id]['status'] = 'processing'
        checks_status[check_id]['progress'] = 60
        
        # Wait for completion
        while True:
            status = await client.get_check_status(check_id)
            
            if status['status_id'] == 4:  # Completed
                overview = await client.get_overview(check_id)
                
                checks_status[check_id].update({
                    'status': 'completed',
                    'progress': 100,
                    'results': overview
                })
                break
            
            checks_status[check_id]['progress'] = min(95, checks_status[check_id]['progress'] + 5)
            await asyncio.sleep(30)
            
    except Exception as e:
        checks_status[check_id] = {
            'status': 'failed',
            'error': str(e),
            'progress': 0
        }

@app.get("/plagiarism/status/{check_id}")
async def get_check_status(check_id: str):
    """Get the status of a plagiarism check"""
    
    if check_id not in checks_status:
        raise HTTPException(status_code=404, detail="Check not found")
    
    return checks_status[check_id]

@app.get("/plagiarism/results/{check_id}")
async def get_check_results(check_id: str):
    """Get detailed results for a completed check"""
    
    if check_id not in checks_status:
        raise HTTPException(status_code=404, detail="Check not found")
    
    check_info = checks_status[check_id]
    
    if check_info['status'] != 'completed':
        raise HTTPException(status_code=400, detail="Check not completed yet")
    
    return check_info['results']

@app.get("/plagiarism/detailed/{check_id}/{submission_id}")
async def get_detailed_results(check_id: str, submission_id: str):
    """Get detailed results for a specific submission"""
    
    try:
        details = await client.get_detailed_results(check_id, submission_id)
        return details
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Data analysis endpoint
@app.post("/plagiarism/analyze")
async def analyze_results(check_id: str):
    """Analyze plagiarism results with statistical insights"""
    
    try:
        overview = await client.get_overview(check_id)
        
        # Calculate statistics
        similarities = [float(sub['total_result']) for sub in overview['submissions']]
        
        analysis = {
            'total_submissions': len(similarities),
            'average_similarity': sum(similarities) / len(similarities) if similarities else 0,
            'max_similarity': max(similarities) if similarities else 0,
            'min_similarity': min(similarities) if similarities else 0,
            'high_risk_count': len([s for s in similarities if s > 70]),
            'medium_risk_count': len([s for s in similarities if 30 < s <= 70]),
            'low_risk_count': len([s for s in similarities if s <= 30]),
            'distribution': {
                'high_risk': [sub for sub in overview['submissions'] if float(sub['total_result']) > 70],
                'medium_risk': [sub for sub in overview['submissions'] if 30 < float(sub['total_result']) <= 70],
                'low_risk': [sub for sub in overview['submissions'] if float(sub['total_result']) <= 30]
            }
        }
        
        return analysis
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.on_event("shutdown")
async def shutdown_event():
    """Clean up resources on shutdown"""
    await client.close()

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
        
FastAPI Integration

📊 Data Science Integration

        import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from codequiry import CodequiryClient
import asyncio
from pathlib import Path

class PlagiarismAnalyzer:
    """Data science tools for plagiarism analysis"""
    
    def __init__(self, api_key):
        self.client = CodequiryClient(api_key=api_key)
    
    async def analyze_assignment_batch(self, assignments_dir: Path, language_id: int = 14):
        """Analyze multiple assignments and generate comprehensive report"""
        
        results = []
        
        for assignment_folder in assignments_dir.iterdir():
            if not assignment_folder.is_dir():
                continue
                
            print(f"Processing {{assignment_folder.name@}}...")
            
            try:
                # Get all Python files in the assignment folder
                files = list(assignment_folder.glob("*.py"))
                
                if not files:
                    continue
                
                # Create check
                check = await self.client.create_check(
                    name=assignment_folder.name,
                    language=language_id
                )
                
                # Upload files
                upload_tasks = [
                    self.client.upload_file(check['id'], str(file))
                    for file in files
                ]
                await asyncio.gather(*upload_tasks)
                
                # Start analysis
                await self.client.start_check(check['id'], web_check=True, database_check=True)
                
                # Wait for completion
                overview = await self.wait_for_completion(check['id'])
                
                # Extract data for analysis
                for submission in overview['submissions']:
                    results.append({
                        'assignment': assignment_folder.name,
                        'filename': submission['filename'],
                        'total_similarity': float(submission['total_result']),
                        'web_similarity': float(submission.get('result1', 0)),
                        'peer_similarity': float(submission.get('result2', 0)),
                        'database_similarity': float(submission.get('result3', 0)),
                        'check_id': check['id'],
                        'submission_id': submission['id']
                    })
                
            except Exception as e:
                print(f"Error processing {{assignment_folder.name@}}: {{e@}}")
        
        return pd.DataFrame(results)
    
    async def wait_for_completion(self, check_id, timeout=600):
        """Wait for check completion"""
        import time
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            status = await self.client.get_check_status(check_id)
            if status['status_id'] == 4:
                return await self.client.get_overview(check_id)
            await asyncio.sleep(30)
        
        raise TimeoutError("Check timed out")
    
    def generate_report(self, df: pd.DataFrame, output_dir: Path = None):
        """Generate comprehensive plagiarism analysis report"""
        
        if output_dir is None:
            output_dir = Path("plagiarism_report")
        
        output_dir.mkdir(exist_ok=True)
        
        # Basic statistics
        stats = {
            'total_submissions': len(df),
            'assignments_analyzed': df['assignment'].nunique(),
            'average_similarity': df['total_similarity'].mean(),
            'median_similarity': df['total_similarity'].median(),
            'std_similarity': df['total_similarity'].std(),
            'high_risk_submissions': len(df[df['total_similarity'] > 70]),
            'medium_risk_submissions': len(df[(df['total_similarity'] > 30) & (df['total_similarity'] <= 70)]),
            'low_risk_submissions': len(df[df['total_similarity'] <= 30])
        }
        
        # Save statistics
        with open(output_dir / "statistics.json", "w") as f:
            import json
            json.dump(stats, f, indent=2)
        
        # Create visualizations
        plt.style.use('seaborn')
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        
        # 1. Distribution of similarity scores
        axes[0, 0].hist(df['total_similarity'], bins=20, alpha=0.7, color='skyblue', edgecolor='black')
        axes[0, 0].set_title('Distribution of Similarity Scores')
        axes[0, 0].set_xlabel('Similarity Percentage')
        axes[0, 0].set_ylabel('Frequency')
        axes[0, 0].axvline(df['total_similarity'].mean(), color='red', linestyle='--', label=f'Mean: {df["total_similarity"].mean():.1f}%')
        axes[0, 0].legend()
        
        # 2. Risk categories
        risk_counts = [stats['low_risk_submissions'], stats['medium_risk_submissions'], stats['high_risk_submissions']]
        risk_labels = ['Low Risk (≤30%)', 'Medium Risk (30-70%)', 'High Risk (>70%)']
        colors = ['green', 'orange', 'red']
        
        axes[0, 1].pie(risk_counts, labels=risk_labels, colors=colors, autopct='%1.1f%%', startangle=90)
        axes[0, 1].set_title('Risk Categories Distribution')
        
        # 3. Similarity by assignment
        if df['assignment'].nunique() > 1:
            df.boxplot(column='total_similarity', by='assignment', ax=axes[1, 0])
            axes[1, 0].set_title('Similarity Distribution by Assignment')
            axes[1, 0].set_xlabel('Assignment')
            axes[1, 0].set_ylabel('Similarity Percentage')
            plt.suptitle('')  # Remove automatic title
        
        # 4. Correlation matrix
        corr_data = df[['total_similarity', 'web_similarity', 'peer_similarity', 'database_similarity']].corr()
        sns.heatmap(corr_data, annot=True, cmap='coolwarm', center=0, ax=axes[1, 1])
        axes[1, 1].set_title('Similarity Type Correlations')
        
        plt.tight_layout()
        plt.savefig(output_dir / "analysis_charts.png", dpi=300, bbox_inches='tight')
        plt.close()
        
        # Generate detailed CSV report
        detailed_report = df.copy()
        detailed_report['risk_category'] = pd.cut(
            detailed_report['total_similarity'],
            bins=[0, 30, 70, 100],
            labels=['Low', 'Medium', 'High']
        )
        
        detailed_report.to_csv(output_dir / "detailed_report.csv", index=False)
        
        # Generate summary HTML report
        html_report = f"""
        
        
        
            Plagiarism Analysis Report
            
        
        
            

Plagiarism Analysis Report

Summary Statistics

Total Submissions Analyzed: {{stats['total_submissions']@}}
Assignments Analyzed: {{stats['assignments_analyzed']@}}
Average Similarity: {{stats['average_similarity']:.2f@}}%
Median Similarity: {{stats['median_similarity']:.2f@}}%

Risk Assessment

High Risk Submissions (>70%): {{stats['high_risk_submissions']@}}
Medium Risk Submissions (30-70%): {{stats['medium_risk_submissions']@}}
Low Risk Submissions (≤30%): {{stats['low_risk_submissions']@}}

Visualizations

Analysis Charts

High Risk Submissions

""" high_risk = df[df['total_similarity'] > 70].sort_values('total_similarity', ascending=False) for _, row in high_risk.iterrows(): html_report += f""" """ html_report += """
Assignment Filename Similarity
{{row['assignment']@}} {{row['filename']@}} {{row['total_similarity']:.2f@}}%
""" with open(output_dir / "report.html", "w") as f: f.write(html_report) print(f"Report generated in {{output_dir@}}") return stats async def close(self): """Close the client connection""" await self.client.close() # Usage example async def main(): analyzer = PlagiarismAnalyzer(api_key="your-api-key-here") try: # Analyze all assignments in a directory assignments_dir = Path("./student_assignments") df = await analyzer.analyze_assignment_batch(assignments_dir, language_id=14) # Python # Generate comprehensive report stats = analyzer.generate_report(df) print("Analysis complete!") print(f"Processed {{stats['total_submissions']@}} submissions") print(f"Found {{stats['high_risk_submissions']@}} high-risk submissions") finally: await analyzer.close() # Run the analysis if __name__ == "__main__": asyncio.run(main())
Data Science Tools