Developer API
Code Plagiarism
Detection API
Build plagiarism detection into your apps. Check code against 20+ billion sources.
// POST the request body to the check endpoint; the API key travels in the `apikey` header.
// NOTE(review): `formData` is not defined in this snippet — presumably a FormData
// object built by the caller; confirm against the API reference.
const response = await fetch('https://codequiry.com/api/v1/check', {
  method: 'POST',
  headers: { 'apikey': 'YOUR_API_KEY' },
  body: formData
});
// Check created!
Codequiry SDK for Python
The official Python SDK for Codequiry provides both synchronous and asynchronous interfaces for seamless integration with Django, Flask, FastAPI, and data science workflows.
Open Source:
github.com/cqchecker/codequiry-python
📦 Installation
pip (Recommended)
pip install codequiry
Development Version
pip install git+https://github.com/cqchecker/codequiry-python.git
Requirements: Python 3.8+, requests, aiohttp (for async)
🚀 Quick Start
import asyncio
from codequiry import CodequiryClient
from codequiry.exceptions import CodequiryException
# Initialize client
# A single module-level client is shared by the example; main() closes it in
# its `finally` block.
client = CodequiryClient(
    api_key='your-api-key-here',
    base_url='https://codequiry.com/api/v1',
    timeout=30  # NOTE(review): presumably seconds — confirm against the SDK
)
async def main():
    """Run the quick-start example end to end.

    Fetches the account info, runs the complete plagiarism-check workflow and
    prints a short summary. ``CodequiryException`` errors are reported on
    stdout instead of propagating, and the shared client is always closed.
    """
    try:
        # Test connection
        account = await client.get_account_info()
        print(f"Connected as: {account['name']}")
        print(f"Checks remaining: {account['checks_remaining']}")
        # Run complete plagiarism check
        result = await run_plagiarism_check(client)
        print(f"Check completed: {result['check_id']}")
    except CodequiryException as e:
        print(f"Error: {e}")
    finally:
        await client.close()
async def run_plagiarism_check(client):
    """Run one complete plagiarism check and return a summary of it.

    Workflow: create a check, upload the sample files that exist on disk,
    start the analysis, wait for it to finish, then print the peer matches of
    every submission scoring above 50% similarity.

    Args:
        client: Codequiry client exposing the async methods used below.

    Returns:
        dict with ``check_id``, the ``overview`` returned by the API and
        ``suspicious_count``.

    Raises:
        TimeoutError: propagated from ``wait_for_completion_with_progress``
            when the analysis does not finish in time.
    """
    # Imported locally: the quick-start imports do not bring Path into scope.
    from pathlib import Path

    # 1. Create check
    check = await client.create_check(
        name="Python Assignment - Data Analysis",
        language=14  # Python language ID
    )
    print(f"Created check: {check['id']}")

    # 2. Upload files
    files = [
        'student1_submission.py',
        'student2_submission.py',
        'student3_submission.py'
    ]
    # Files missing from disk are skipped rather than treated as errors.
    upload_tasks = [
        client.upload_file(check['id'], file_path)
        for file_path in files
        if Path(file_path).exists()
    ]
    # Upload files concurrently. With return_exceptions=True a failed upload
    # comes back as an exception object instead of aborting the others.
    uploads = await asyncio.gather(*upload_tasks, return_exceptions=True)
    successful_uploads = [u for u in uploads if not isinstance(u, Exception)]
    print(f"Uploaded {len(successful_uploads)} files")

    # 3. Start analysis
    await client.start_check(
        check['id'],
        web_check=True,
        database_check=True,
        test_type=1
    )
    print("Analysis started...")

    # 4. Wait for completion with progress
    overview = await wait_for_completion_with_progress(client, check['id'])
    print(f"Analysis complete! Found {len(overview['submissions'])} submissions")

    # 5. Process suspicious submissions
    suspicious = [
        sub for sub in overview['submissions']
        if float(sub['total_result']) > 50.0
    ]
    for submission in suspicious:
        print(f"⚠️ {submission['filename']}: {submission['total_result']}% similarity")
        # Get detailed results; a failure here is reported and the loop
        # continues with the next submission.
        try:
            details = await client.get_detailed_results(check['id'], submission['id'])
            # Show peer matches
            for match in details.get('peer_matches', []):
                print(f" → Match with {match['file_matched']}: {match['similarity']}%")
        except Exception as e:
            print(f" Error getting details: {e}")

    return {
        'check_id': check['id'],
        'overview': overview,
        'suspicious_count': len(suspicious)
    }
async def wait_for_completion_with_progress(client, check_id, timeout=600, poll_interval=30):
    """Wait for check completion with progress updates.

    Polls the check status until it reports ``status_id == 4`` (completed),
    printing the status message after every unsuccessful poll.

    Args:
        client: Codequiry client providing ``get_check_status`` and
            ``get_overview``.
        check_id: ID of the check to wait for.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep between two polls.

    Returns:
        The overview returned by ``client.get_overview(check_id)``.

    Raises:
        TimeoutError: if the check is not completed within ``timeout`` seconds.
    """
    import time

    # monotonic() cannot jump when the system clock is adjusted, so the
    # elapsed-time comparison stays reliable.
    start_time = time.monotonic()
    while time.monotonic() - start_time < timeout:
        status = await client.get_check_status(check_id)
        if status['status_id'] == 4:  # Completed
            return await client.get_overview(check_id)
        print(f"Status: {status.get('message', 'Processing...')}")
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"Check did not complete within {timeout} seconds")
# Run the example
# Only when executed as a script: asyncio.run() creates the event loop, runs
# main() to completion and closes the loop again.
if __name__ == "__main__":
    asyncio.run(main())
🔧 Django Integration
# settings.py
# Codequiry connection settings; the view reads these three keys when it
# builds its CodequiryClient.
CODEQUIRY = {
    'API_KEY': 'your-api-key-here',
    'BASE_URL': 'https://codequiry.com/api/v1',
    'TIMEOUT': 30,  # NOTE(review): presumably seconds — confirm against the SDK
}
# models.py
from django.db import models
from django.contrib.auth.models import User
class Assignment(models.Model):
    """An assignment whose submissions are checked together in one Codequiry check."""

    title = models.CharField(max_length=200)
    instructor = models.ForeignKey(User, on_delete=models.CASCADE)
    language_id = models.IntegerField(default=14)  # Python
    # ID of the Codequiry check; empty until the view has created one.
    codequiry_check_id = models.CharField(max_length=50, blank=True)
    # Overview returned by the API, stored once the check has completed.
    plagiarism_results = models.JSONField(null=True, blank=True)
    # Starts as 'pending'; the view and the Celery task also write
    # 'uploading', 'processing', 'completed' and 'failed'.
    status = models.CharField(max_length=20, default='pending')
    created_at = models.DateTimeField(auto_now_add=True)
    # Set by the view when the results are saved.
    processed_at = models.DateTimeField(null=True, blank=True)
class Submission(models.Model):
    """A file handed in by one student for one assignment."""

    assignment = models.ForeignKey(Assignment, on_delete=models.CASCADE)
    student = models.ForeignKey(User, on_delete=models.CASCADE)
    # Stored under the 'submissions/' prefix of the configured file storage.
    file = models.FileField(upload_to='submissions/')
    uploaded_at = models.DateTimeField(auto_now_add=True)
# views.py
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
from django.conf import settings
from codequiry import CodequiryClient
import asyncio
import json
@method_decorator(csrf_exempt, name='dispatch')
class PlagiarismCheckView(View):
    """Create a Codequiry check for an assignment (POST) and report its status (GET).

    NOTE(review): ``post`` is a coroutine while ``get`` is a plain function,
    and both use the synchronous ORM. Check Django's asynchronous-support
    documentation before deploying: recent versions expect the handlers of a
    class-based view to be all sync or all async, and ORM access from async
    code normally has to go through ``sync_to_async``.
    """

    # Strong references to running background tasks. The event loop keeps only
    # weak references, so a task nobody references may be garbage-collected
    # before it finishes.
    _background_tasks = set()

    def __init__(self, **kwargs):
        """Build the view and its Codequiry client from ``settings.CODEQUIRY``.

        ``**kwargs`` is forwarded to ``View.__init__`` so that
        ``as_view(**initkwargs)`` keeps working.
        """
        super().__init__(**kwargs)
        self.client = CodequiryClient(
            api_key=settings.CODEQUIRY['API_KEY'],
            base_url=settings.CODEQUIRY['BASE_URL'],
            timeout=settings.CODEQUIRY['TIMEOUT']
        )

    async def post(self, request, assignment_id):
        """Create a check, upload all submissions and start the analysis.

        Returns a JSON response with ``success`` and either the check details
        or an ``error`` message. The analysis itself continues in a background
        task after the response has been returned.
        """
        try:
            assignment = Assignment.objects.get(id=assignment_id)
            submissions = Submission.objects.filter(assignment=assignment)
            if not submissions.exists():
                return JsonResponse({
                    'success': False,
                    'error': 'No submissions found for this assignment'
                })

            # Create check
            check = await self.client.create_check(
                name=assignment.title,
                language=assignment.language_id
            )

            # Save check ID
            assignment.codequiry_check_id = check['id']
            assignment.status = 'uploading'
            assignment.save()

            # Upload files, each named "<username>_<stored file name>".
            upload_tasks = [
                self.client.upload_file_from_path(
                    check['id'],
                    submission.file.path,
                    f"{submission.student.username}_{submission.file.name}"
                )
                for submission in submissions
                if submission.file
            ]
            # Failed uploads come back as exception objects instead of
            # cancelling the remaining uploads.
            uploads = await asyncio.gather(*upload_tasks, return_exceptions=True)
            successful_uploads = [u for u in uploads if not isinstance(u, Exception)]
            if not successful_uploads:
                assignment.status = 'failed'
                assignment.save()
                return JsonResponse({
                    'success': False,
                    'error': 'Failed to upload any files'
                })

            # Start analysis in background, keeping a reference until it is done.
            task = asyncio.create_task(self.process_check_async(assignment))
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

            return JsonResponse({
                'success': True,
                'check_id': check['id'],
                'uploaded_files': len(successful_uploads),
                'message': 'Analysis started'
            })
        except Assignment.DoesNotExist:
            # Same status code as get() for an unknown assignment.
            return JsonResponse({
                'success': False,
                'error': 'Assignment not found'
            }, status=404)
        except Exception as e:
            return JsonResponse({
                'success': False,
                'error': str(e)
            })

    async def process_check_async(self, assignment):
        """Process check in background.

        Starts the analysis, waits for completion and stores the overview on
        the assignment. Any failure up to that point marks the assignment as
        'failed'. A failing instructor notification is only reported: the
        results are already saved at that point.
        """
        # Imported locally: the views.py imports do not provide `timezone`.
        from django.utils import timezone

        try:
            # Start analysis
            await self.client.start_check(
                assignment.codequiry_check_id,
                web_check=True,
                database_check=True
            )
            assignment.status = 'processing'
            assignment.save()

            # Wait for completion
            overview = await self.wait_for_completion(assignment.codequiry_check_id)

            # Save results
            assignment.plagiarism_results = overview
            assignment.status = 'completed'
            assignment.processed_at = timezone.now()
            assignment.save()
        except Exception as e:
            assignment.status = 'failed'
            assignment.save()
            print(f"Check processing failed: {e}")
            return

        # Send notification to instructor. Kept outside the try block above so
        # a notification error cannot flip a completed check to 'failed'.
        try:
            self.notify_instructor(assignment, overview)
        except Exception as e:
            print(f"Instructor notification failed: {e}")

    def notify_instructor(self, assignment, overview):
        """Hook called after the results are saved; does nothing by default.

        Override it in a subclass to e-mail or otherwise notify the instructor.
        """

    async def wait_for_completion(self, check_id, timeout=600, poll_interval=30):
        """Wait for check completion.

        Polls every ``poll_interval`` seconds until the status reports
        ``status_id == 4`` and returns the overview.

        Raises:
            TimeoutError: if the check is not completed within ``timeout`` seconds.
        """
        import time

        # monotonic() is unaffected by system clock adjustments.
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            status = await self.client.get_check_status(check_id)
            if status['status_id'] == 4:  # Completed
                return await self.client.get_overview(check_id)
            await asyncio.sleep(poll_interval)
        raise TimeoutError("Check timed out")

    def get(self, request, assignment_id):
        """Get check status.

        Returns the stored results once the assignment is 'completed',
        otherwise the current status; 404 when the assignment does not exist.
        """
        try:
            assignment = Assignment.objects.get(id=assignment_id)
            if assignment.status == 'completed':
                return JsonResponse({
                    'status': 'completed',
                    'results': assignment.plagiarism_results
                })
            return JsonResponse({
                'status': assignment.status,
                'message': 'Analysis in progress...'
            })
        except Assignment.DoesNotExist:
            return JsonResponse({'error': 'Assignment not found'}, status=404)
# tasks.py (for Celery background processing)
from celery import shared_task
from codequiry import CodequiryClient
import asyncio
@shared_task
def process_plagiarism_check(assignment_id, timeout=600, poll_interval=30):
    """Celery task for background processing.

    Uploads every submission file of the assignment to its existing Codequiry
    check, starts the analysis, polls until it completes and stores the
    overview on the assignment.

    All ORM access happens outside the coroutine: Django's ORM is synchronous
    and raises ``SynchronousOnlyOperation`` when called from a thread with a
    running event loop.

    Args:
        assignment_id: Primary key of the assignment to process.
        timeout: Maximum number of seconds to wait for the analysis.
        poll_interval: Seconds to sleep between two status polls.

    Raises:
        TimeoutError: if the analysis does not complete within ``timeout``.
        Exception: any API error is re-raised after the assignment has been
            marked as 'failed'.
    """
    import time

    # Imported locally: the tasks.py imports do not provide `settings`.
    from django.conf import settings

    from .models import Assignment

    assignment = Assignment.objects.get(id=assignment_id)
    check_id = assignment.codequiry_check_id
    # Resolve the file paths up front so the coroutine never touches the ORM.
    # Submissions without a file are skipped, as in the view.
    file_paths = [
        submission.file.path
        for submission in assignment.submission_set.all()
        if submission.file
    ]

    async def run_check():
        """Upload, start and await the check; return its overview."""
        client = CodequiryClient(api_key=settings.CODEQUIRY['API_KEY'])
        try:
            # Get submissions and upload
            await asyncio.gather(*[
                client.upload_file_from_path(check_id, path)
                for path in file_paths
            ])
            # Start and wait for completion
            await client.start_check(check_id)
            # Monitor progress, giving up after `timeout` seconds so a check
            # that never completes cannot block the worker forever.
            deadline = time.monotonic() + timeout
            while True:
                status = await client.get_check_status(check_id)
                if status['status_id'] == 4:
                    break
                if time.monotonic() >= deadline:
                    raise TimeoutError("Check timed out")
                await asyncio.sleep(poll_interval)
            return await client.get_overview(check_id)
        finally:
            await client.close()

    # Run async function
    try:
        overview = asyncio.run(run_check())
    except Exception:
        assignment.status = 'failed'
        assignment.save()
        raise

    # Get and save results
    assignment.plagiarism_results = overview
    assignment.status = 'completed'
    assignment.save()
⚡ FastAPI Integration
from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from codequiry import CodequiryClient
from typing import List, Optional
import asyncio
import os
from pathlib import Path
app = FastAPI(title="Plagiarism Detection API")
# Global client instance
# os.getenv() returns None when CODEQUIRY_API_KEY is not set.
client = CodequiryClient(
    api_key=os.getenv('CODEQUIRY_API_KEY'),
    base_url='https://codequiry.com/api/v1'
)
# In-memory storage (use Redis in production)
# Maps check_id -> status dict written by the endpoints and the background task.
checks_status = {}
@app.post("/plagiarism/check")
async def create_plagiarism_check(
    name: str,
    language: int,
    files: List[UploadFile] = File(...),
    web_check: bool = True,
    database_check: bool = True,
    background_tasks: BackgroundTasks = None
):
    """Create and start a plagiarism check.

    Creates the check, spools the uploaded files to temporary files and
    schedules ``process_check_background`` to upload and analyse them.

    Raises:
        HTTPException: 400 when no files are provided, 500 on any other error.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files provided")
    try:
        # Create check
        check = await client.create_check(name=name, language=language)
        check_id = check['id']

        # Store check info
        checks_status[check_id] = {
            'status': 'uploading',
            'progress': 0,
            'total_files': len(files),
            'uploaded_files': 0
        }

        # Save uploaded files temporarily
        temp_files = []
        for index, file in enumerate(files):
            # file.filename is client-supplied: keep only its last path
            # component so a name like "../../etc/passwd" cannot escape /tmp.
            # The index keeps two uploads with the same name apart.
            safe_name = Path(file.filename or "").name or f"upload_{index}"
            temp_path = f"/tmp/{check_id}_{index}_{safe_name}"
            with open(temp_path, "wb") as buffer:
                buffer.write(await file.read())
            temp_files.append((temp_path, file.filename))

        # Process uploads and analysis in background
        background_tasks.add_task(
            process_check_background,
            check_id,
            temp_files,
            web_check,
            database_check
        )

        return JSONResponse({
            "success": True,
            "check_id": check_id,
            "message": "Files uploaded. Analysis started.",
            "files_count": len(files)
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def process_check_background(
    check_id: str,
    temp_files: List[tuple],
    web_check: bool,
    database_check: bool,
    timeout: float = 600,
    poll_interval: float = 30
):
    """Background task to process plagiarism check.

    Uploads the temporary files, removes them, starts the analysis and polls
    until it completes, keeping ``checks_status[check_id]`` up to date. On any
    error the entry is replaced by a 'failed' status carrying the message.

    Args:
        check_id: ID of the check created by the endpoint.
        temp_files: ``(temp_path, original_name)`` pairs to upload.
        web_check: Forwarded to ``client.start_check``.
        database_check: Forwarded to ``client.start_check``.
        timeout: Maximum number of seconds to wait for the analysis.
        poll_interval: Seconds to sleep between two status polls.
    """
    import time

    try:
        # Upload files
        upload_tasks = [
            client.upload_file_from_path(check_id, temp_path, original_name)
            for temp_path, original_name in temp_files
        ]
        uploaded = 0
        try:
            # Track upload progress; only successful uploads are counted.
            for task in asyncio.as_completed(upload_tasks):
                try:
                    await task
                except Exception as e:
                    print(f"Upload failed: {e}")
                    continue
                uploaded += 1
                checks_status[check_id]['uploaded_files'] = uploaded
                checks_status[check_id]['progress'] = uploaded / len(temp_files) * 50  # 50% for uploads
        finally:
            # Clean up temp files. Best effort: a file that is already gone or
            # cannot be removed must not fail the check.
            for temp_path, _ in temp_files:
                try:
                    os.remove(temp_path)
                except OSError:
                    pass

        if not uploaded:
            # Nothing reached the API, so there is nothing to analyse.
            raise RuntimeError("Failed to upload any files")

        # Start analysis
        await client.start_check(
            check_id,
            web_check=web_check,
            database_check=database_check
        )
        checks_status[check_id]['status'] = 'processing'
        checks_status[check_id]['progress'] = 60

        # Wait for completion, giving up after `timeout` seconds.
        deadline = time.monotonic() + timeout
        while True:
            status = await client.get_check_status(check_id)
            if status['status_id'] == 4:  # Completed
                overview = await client.get_overview(check_id)
                checks_status[check_id].update({
                    'status': 'completed',
                    'progress': 100,
                    'results': overview
                })
                break
            if time.monotonic() >= deadline:
                raise TimeoutError("Check timed out")
            # Creep towards 95% so clients see movement while the API works.
            checks_status[check_id]['progress'] = min(95, checks_status[check_id]['progress'] + 5)
            await asyncio.sleep(poll_interval)
    except Exception as e:
        checks_status[check_id] = {
            'status': 'failed',
            'error': str(e),
            'progress': 0
        }
@app.get("/plagiarism/status/{check_id}")
async def get_check_status(check_id: str):
    """Return the tracked status entry of a check, or 404 if it is unknown."""
    # Every stored entry is a dict, so None can only mean "no such check".
    entry = checks_status.get(check_id)
    if entry is None:
        raise HTTPException(status_code=404, detail="Check not found")
    return entry
@app.get("/plagiarism/results/{check_id}")
async def get_check_results(check_id: str):
    """Return the stored results of a completed check.

    Responds with 404 for an unknown check and with 400 for a check whose
    status is anything other than 'completed'.
    """
    # Every stored entry is a dict, so None can only mean "no such check".
    entry = checks_status.get(check_id)
    if entry is None:
        raise HTTPException(status_code=404, detail="Check not found")
    if entry['status'] == 'completed':
        return entry['results']
    raise HTTPException(status_code=400, detail="Check not completed yet")
@app.get("/plagiarism/detailed/{check_id}/{submission_id}")
async def get_detailed_results(check_id: str, submission_id: str):
    """Fetch the detailed results of one submission straight from the API.

    Any error raised by the client is turned into a 500 response carrying the
    error message.
    """
    try:
        return await client.get_detailed_results(check_id, submission_id)
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
# Data analysis endpoint
@app.post("/plagiarism/analyze")
async def analyze_results(check_id: str):
    """Summarise the similarity scores of a check.

    Returns count, average, maximum and minimum of the total similarity plus
    the submissions grouped into high (>70), medium (30-70] and low (<=30)
    risk. Any error becomes a 500 response carrying the error message.
    """
    try:
        overview = await client.get_overview(check_id)

        # Convert every score once and keep it next to its submission.
        scored = [
            (float(entry['total_result']), entry)
            for entry in overview['submissions']
        ]
        scores = [score for score, _ in scored]
        high = [entry for score, entry in scored if score > 70]
        medium = [entry for score, entry in scored if 30 < score <= 70]
        low = [entry for score, entry in scored if score <= 30]

        if scores:
            average = sum(scores) / len(scores)
            highest = max(scores)
            lowest = min(scores)
        else:
            average = highest = lowest = 0

        return {
            'total_submissions': len(scores),
            'average_similarity': average,
            'max_similarity': highest,
            'min_similarity': lowest,
            'high_risk_count': len(high),
            'medium_risk_count': len(medium),
            'low_risk_count': len(low),
            'distribution': {
                'high_risk': high,
                'medium_risk': medium,
                'low_risk': low
            }
        }
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
@app.on_event("shutdown")
async def shutdown_event():
    """Clean up resources on shutdown"""
    # Close the module-level Codequiry client when the application stops.
    await client.close()
# When run as a script, serve the app with uvicorn on all interfaces, port 8000.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
📊 Data Science Integration
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from codequiry import CodequiryClient
import asyncio
from pathlib import Path
class PlagiarismAnalyzer:
    """Data science tools for plagiarism analysis.

    Runs Codequiry checks over folders of submissions, collects the
    per-submission similarity scores into a DataFrame and renders a report
    (statistics JSON, charts, CSV and HTML).
    """

    # Columns of the frame returned by analyze_assignment_batch(). Passing
    # them explicitly keeps the columns present even when nothing was
    # collected.
    _RESULT_COLUMNS = [
        'assignment',
        'filename',
        'total_similarity',
        'web_similarity',
        'peer_similarity',
        'database_similarity',
        'check_id',
        'submission_id',
    ]

    def __init__(self, api_key):
        """Create the analyzer with its own Codequiry client."""
        self.client = CodequiryClient(api_key=api_key)

    async def analyze_assignment_batch(self, assignments_dir: Path, language_id: int = 14):
        """Analyze multiple assignments and generate comprehensive report.

        Every sub-directory of ``assignments_dir`` is treated as one
        assignment; its ``*.py`` files are uploaded to a new check. A folder
        that fails is reported on stdout and skipped.

        Args:
            assignments_dir: Directory containing one folder per assignment.
            language_id: Codequiry language ID passed to ``create_check``.

        Returns:
            DataFrame with one row per submission.
        """
        results = []
        for assignment_folder in assignments_dir.iterdir():
            if not assignment_folder.is_dir():
                continue
            print(f"Processing {assignment_folder.name}...")
            try:
                # Get all Python files in the assignment folder
                files = list(assignment_folder.glob("*.py"))
                if not files:
                    continue

                # Create check
                check = await self.client.create_check(
                    name=assignment_folder.name,
                    language=language_id
                )

                # Upload files
                upload_tasks = [
                    self.client.upload_file(check['id'], str(file))
                    for file in files
                ]
                await asyncio.gather(*upload_tasks)

                # Start analysis
                await self.client.start_check(check['id'], web_check=True, database_check=True)

                # Wait for completion
                overview = await self.wait_for_completion(check['id'])

                # Extract data for analysis
                for submission in overview['submissions']:
                    results.append({
                        'assignment': assignment_folder.name,
                        'filename': submission['filename'],
                        'total_similarity': float(submission['total_result']),
                        'web_similarity': float(submission.get('result1', 0)),
                        'peer_similarity': float(submission.get('result2', 0)),
                        'database_similarity': float(submission.get('result3', 0)),
                        'check_id': check['id'],
                        'submission_id': submission['id']
                    })
            except Exception as e:
                print(f"Error processing {assignment_folder.name}: {e}")
        return pd.DataFrame(results, columns=self._RESULT_COLUMNS)

    async def wait_for_completion(self, check_id, timeout=600, poll_interval=30):
        """Wait for check completion.

        Polls every ``poll_interval`` seconds until the status reports
        ``status_id == 4`` and returns the overview.

        Raises:
            TimeoutError: if the check is not completed within ``timeout`` seconds.
        """
        import time

        # monotonic() is unaffected by system clock adjustments.
        start_time = time.monotonic()
        while time.monotonic() - start_time < timeout:
            status = await self.client.get_check_status(check_id)
            if status['status_id'] == 4:
                return await self.client.get_overview(check_id)
            await asyncio.sleep(poll_interval)
        raise TimeoutError("Check timed out")

    def generate_report(self, df: pd.DataFrame, output_dir: Path = None):
        """Generate comprehensive plagiarism analysis report.

        Writes ``statistics.json``, ``analysis_charts.png``,
        ``detailed_report.csv`` and ``report.html`` into ``output_dir``
        (default: ``plagiarism_report``). For an empty frame only the
        statistics file is written, with every value set to zero.

        Args:
            df: Frame as returned by ``analyze_assignment_batch``.
            output_dir: Target directory; created when missing.

        Returns:
            dict with the summary statistics.
        """
        import json

        output_dir = Path("plagiarism_report") if output_dir is None else Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Basic statistics
        stats = self._compute_stats(df)

        # Save statistics
        with open(output_dir / "statistics.json", "w", encoding="utf-8") as f:
            json.dump(stats, f, indent=2)

        if df.empty:
            # Charts, CSV and HTML need at least one row.
            print(f"No submissions to report on; statistics written to {output_dir}")
            return stats

        # Create visualizations
        self._save_charts(df, stats, output_dir / "analysis_charts.png")

        # Generate detailed CSV report
        detailed_report = df.copy()
        detailed_report['risk_category'] = pd.cut(
            detailed_report['total_similarity'],
            bins=[0, 30, 70, 100],
            labels=['Low', 'Medium', 'High'],
            # Without this the first bin is (0, 30] and a 0% score would be
            # left without a category.
            include_lowest=True
        )
        detailed_report.to_csv(output_dir / "detailed_report.csv", index=False)

        # Generate summary HTML report
        high_risk = df[df['total_similarity'] > 70].sort_values('total_similarity', ascending=False)
        # Explicit UTF-8: the report contains "≤", which some platform default
        # encodings cannot represent.
        with open(output_dir / "report.html", "w", encoding="utf-8") as f:
            f.write(self._render_html(stats, high_risk))

        print(f"Report generated in {output_dir}")
        return stats

    @staticmethod
    def _compute_stats(df):
        """Return the summary statistics dict for ``df`` (all zeros when empty)."""
        if df.empty:
            return {
                'total_submissions': 0,
                'assignments_analyzed': 0,
                'average_similarity': 0.0,
                'median_similarity': 0.0,
                'std_similarity': 0.0,
                'high_risk_submissions': 0,
                'medium_risk_submissions': 0,
                'low_risk_submissions': 0
            }
        total = df['total_similarity']
        # Plain int/float values keep the dict JSON-serialisable.
        return {
            'total_submissions': len(df),
            'assignments_analyzed': int(df['assignment'].nunique()),
            'average_similarity': float(total.mean()),
            'median_similarity': float(total.median()),
            'std_similarity': float(total.std()),
            'high_risk_submissions': int((total > 70).sum()),
            'medium_risk_submissions': int(((total > 30) & (total <= 70)).sum()),
            'low_risk_submissions': int((total <= 30).sum())
        }

    @staticmethod
    def _save_charts(df, stats, chart_path):
        """Render the 2x2 chart grid for ``df`` and save it to ``chart_path``."""
        # Newer Matplotlib releases renamed the 'seaborn' style to
        # 'seaborn-v0_8'; use whichever this installation provides.
        for style_name in ('seaborn-v0_8', 'seaborn'):
            if style_name in plt.style.available:
                plt.style.use(style_name)
                break
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))

        # 1. Distribution of similarity scores
        mean_similarity = df['total_similarity'].mean()
        axes[0, 0].hist(df['total_similarity'], bins=20, alpha=0.7, color='skyblue', edgecolor='black')
        axes[0, 0].set_title('Distribution of Similarity Scores')
        axes[0, 0].set_xlabel('Similarity Percentage')
        axes[0, 0].set_ylabel('Frequency')
        axes[0, 0].axvline(mean_similarity, color='red', linestyle='--', label=f'Mean: {mean_similarity:.1f}%')
        axes[0, 0].legend()

        # 2. Risk categories
        risk_counts = [stats['low_risk_submissions'], stats['medium_risk_submissions'], stats['high_risk_submissions']]
        risk_labels = ['Low Risk (≤30%)', 'Medium Risk (30-70%)', 'High Risk (>70%)']
        colors = ['green', 'orange', 'red']
        axes[0, 1].pie(risk_counts, labels=risk_labels, colors=colors, autopct='%1.1f%%', startangle=90)
        axes[0, 1].set_title('Risk Categories Distribution')

        # 3. Similarity by assignment
        if df['assignment'].nunique() > 1:
            df.boxplot(column='total_similarity', by='assignment', ax=axes[1, 0])
            axes[1, 0].set_title('Similarity Distribution by Assignment')
            axes[1, 0].set_xlabel('Assignment')
            axes[1, 0].set_ylabel('Similarity Percentage')
            plt.suptitle('')  # Remove automatic title

        # 4. Correlation matrix
        corr_data = df[['total_similarity', 'web_similarity', 'peer_similarity', 'database_similarity']].corr()
        sns.heatmap(corr_data, annot=True, cmap='coolwarm', center=0, ax=axes[1, 1])
        axes[1, 1].set_title('Similarity Type Correlations')

        plt.tight_layout()
        plt.savefig(chart_path, dpi=300, bbox_inches='tight')
        plt.close()

    @staticmethod
    def _render_html(stats, high_risk):
        """Build the summary HTML page from the statistics and the high-risk rows."""
        from html import escape

        # Assignment and file names come from the file system; escape them so
        # they cannot inject markup into the report.
        rows = "".join(
            f"""
        <tr>
            <td>{escape(str(row['assignment']))}</td>
            <td>{escape(str(row['filename']))}</td>
            <td>{row['total_similarity']:.2f}%</td>
        </tr>"""
            for _, row in high_risk.iterrows()
        )
        return f"""<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Plagiarism Analysis Report</title>
</head>
<body>
    <h1>Plagiarism Analysis Report</h1>
    <h2>Summary Statistics</h2>
    <ul>
        <li>Total Submissions Analyzed: {stats['total_submissions']}</li>
        <li>Assignments Analyzed: {stats['assignments_analyzed']}</li>
        <li>Average Similarity: {stats['average_similarity']:.2f}%</li>
        <li>Median Similarity: {stats['median_similarity']:.2f}%</li>
    </ul>
    <h2>Risk Assessment</h2>
    <ul>
        <li>High Risk Submissions (&gt;70%): {stats['high_risk_submissions']}</li>
        <li>Medium Risk Submissions (30-70%): {stats['medium_risk_submissions']}</li>
        <li>Low Risk Submissions (≤30%): {stats['low_risk_submissions']}</li>
    </ul>
    <h2>Visualizations</h2>
    <img src="analysis_charts.png" alt="Analysis charts">
    <h2>High Risk Submissions</h2>
    <table>
        <tr>
            <th>Assignment</th>
            <th>Filename</th>
            <th>Similarity</th>
        </tr>{rows}
    </table>
</body>
</html>
"""

    async def close(self):
        """Close the client connection"""
        await self.client.close()
# Usage example
async def main():
    """Analyze every assignment folder and print a short summary.

    Builds the analyzer, runs the batch analysis over
    ``./student_assignments``, generates the report and always closes the
    analyzer's client.
    """
    analyzer = PlagiarismAnalyzer(api_key="your-api-key-here")
    try:
        # Analyze all assignments in a directory
        assignments_dir = Path("./student_assignments")
        df = await analyzer.analyze_assignment_batch(assignments_dir, language_id=14)  # Python
        # Generate comprehensive report
        stats = analyzer.generate_report(df)
        print("Analysis complete!")
        print(f"Processed {stats['total_submissions']} submissions")
        print(f"Found {stats['high_risk_submissions']} high-risk submissions")
    finally:
        await analyzer.close()
# Run the analysis
# Only when executed as a script: asyncio.run() creates the event loop, runs
# main() to completion and closes the loop again.
if __name__ == "__main__":
    asyncio.run(main())