Add more netcode

This commit is contained in:
Flare Microsystems
2024-11-13 08:49:21 -08:00
parent a2a5322835
commit 16b66b6094
10 changed files with 369 additions and 113 deletions

View File

@@ -2,15 +2,39 @@ package com.flaremicro.crossjeeves;
import java.util.List;
import com.flaremicro.crossjeeves.net.ClientHandler;
import com.flaremicro.crossjeeves.net.ErrorCodes;
public class CrossJeevesClient {
private List<AgentInfo> agentList;
private String script;
//TODO Make configurable
private static final int MAX_ATTEMPTS = 2;
public CrossJeevesClient(List<AgentInfo> agentList, String script) {
// TODO Auto-generated constructor stub
this.agentList = agentList;
this.script = script;
}
public void beginJob() {
// TODO Auto-generated method stub
for(int i = 0; i < MAX_ATTEMPTS; i++)
{
for(AgentInfo agent : agentList)
{
int exitCode = ClientHandler.connect(agent.addr, agent.port, script);
if(ErrorCodes.getErrorCode(exitCode) == null || ErrorCodes.getErrorCode(exitCode).isTerminal)
{
System.out.println("Recieved terminal exit code " + exitCode);
System.exit(exitCode);
}
else if(exitCode == ErrorCodes.OK.id)
{
System.out.println("Job Completed!");
System.exit(0);
}
}
}
}
}

View File

@@ -1,14 +1,61 @@
package com.flaremicro.crossjeeves;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import com.flaremicro.crossjeeves.net.NetworkHandler;
import com.flaremicro.crossjeeves.net.ServerHandler;
import com.flaremicro.util.Util;
import static com.flaremicro.crossjeeves.net.ErrorCodes.*;
public class CrossJeevesHost {
int port;
boolean running = false;
List<NetworkHandler> connections = Collections.synchronizedList(new ArrayList<NetworkHandler>());
public CrossJeevesHost(int port) {
// TODO Auto-generated constructor stub
this.port = port;
}
public void startHosting() {
// TODO Auto-generated method stub
public void endConnections()
{
while(connections.size() > 0)
{
connections.remove(0).disconnect(AGENT_TERMINATED.id, "This agent has terminated");
}
}
public void removeConnection(NetworkHandler netHandler)
{
connections.remove(netHandler);
}
public void startHosting() throws IOException {
running = true;
ServerSocket server = new ServerSocket(port);
while (running)
{
Socket sock = null;
try
{
sock = server.accept();
//TODO properties!
ServerHandler handler = new ServerHandler(sock, this, new Properties());
connections.add(handler);
}
catch (IOException ex)
{
ex.printStackTrace();
Util.cleanClose(sock);
}
}
}
}

View File

@@ -1,5 +1,6 @@
package com.flaremicro.crossjeeves;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@@ -15,10 +16,19 @@ public class CrossJeevesMain {
{
try
{
int port = Integer.parseInt(parsedArgs.get("host"));
int port = 10801;
if(parsedArgs.get("host").length() != 0)
port = Integer.parseInt(parsedArgs.get("host"));
CrossJeevesHost host = new CrossJeevesHost(port);
try
{
host.startHosting();
}
catch (IOException e)
{
e.printStackTrace();
}
}
catch (NumberFormatException ex)
{
System.out.println("Invalid port specified: " + parsedArgs.get("host"));

View File

@@ -20,9 +20,11 @@ import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
import com.flaremicro.crossjeeves.net.ErrorCodes;
import com.flaremicro.crossjeeves.net.NetworkHandler;
import com.flaremicro.crossjeeves.net.packet.Packet;
import com.flaremicro.crossjeeves.net.packet.Packet3Clone;
import com.flaremicro.util.ZipUtils;
public class ScriptProcessor {
private Properties properties;
@@ -31,10 +33,13 @@ public class ScriptProcessor {
private NetworkHandler netHandler;
private Random random = new Random();
private Thread executionThread = null;
private File workspace = null;
private boolean terminated = false;
public ScriptProcessor(NetworkHandler netHandler, Properties properties) {
public ScriptProcessor(NetworkHandler netHandler, File workspace, Properties properties) {
this.netHandler = netHandler;
this.properties = properties;
this.workspace = workspace;
}
public String requireAttribute(Element e, String attribute) throws ScriptProcessingException {
@@ -60,6 +65,8 @@ public class ScriptProcessor {
}
public void processScript(String script) throws ScriptProcessingException {
if(terminated)
throw new ScriptProcessingException("Processor has been terminated");
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
try
{
@@ -71,6 +78,8 @@ public class ScriptProcessor {
for (int i = 0; i < nodeList.getLength(); i++)
{
if(terminated)
throw new ScriptProcessingException("Processor has been terminated");
Node node = nodeList.item(i);
Element e = getElement(node);
if (e == null)
@@ -121,7 +130,7 @@ public class ScriptProcessor {
}
catch (Exception ex)
{
throw new ScriptProcessingException("Script threw an exception during processing" ,ex);
}
}
@@ -158,6 +167,8 @@ public class ScriptProcessor {
File file = netHandler.waitForFile(id);
if (file == null)
throw new ScriptProcessingException("File failed to transfer");
if(!ZipUtils.unzipDirectory(file, workspace));
throw new ScriptProcessingException("File failed to decompress");
}
public File writeScriptToTempFile(String script, String type) throws ScriptProcessingException {
@@ -259,7 +270,7 @@ public class ScriptProcessor {
}
catch (ScriptProcessingException e)
{
//TODO Disconnect
netHandler.disconnect(ErrorCodes.SCRIPT_ERROR.id, e.toString());
e.printStackTrace();
}
finally
@@ -271,4 +282,14 @@ public class ScriptProcessor {
executionThread.start();
}
}
public void terminate() {
terminated = true;
if(runningProcess != null)
runningProcess.destroy();
if(this.executionThread != null)
{
this.executionThread.interrupt();
}
}
}

View File

@@ -1,11 +1,12 @@
package com.flaremicro.crossjeeves.net;
import static com.flaremicro.crossjeeves.net.ErrorCodes.FILE_DOWNLOAD_FAILURE;
import static com.flaremicro.crossjeeves.net.ErrorCodes.*;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import com.flaremicro.crossjeeves.net.packet.Packet;
@@ -21,33 +22,47 @@ import com.flaremicro.util.Util;
import com.flaremicro.util.ZipUtils;
public class ClientHandler extends NetworkHandler {
private String script;
public ClientHandler(Socket socket) throws IOException {
public ClientHandler(Socket socket, String script) throws IOException {
super(socket);
this.script = script;
}
private void init() {
enqueue(new Packet0Identify(0));
this.beginWriteThread();
this.beginReading();
}
@Override
public void handlePacket(Packet packet) {
// TODO Auto-generated method stub
disconnect(INVALID_PACKET_RECIEVED.id, "Recieved invalid packet " + packet.getId() + " (Unknown)");
}
@Override
public void handlePacket(Packet0Identify packet) {
// TODO Auto-generated method stub
if (packet.getProtocolVersion() != Packet.PROTOCOL_VERSION)
{
disconnect(OUTDATED_AGENT.id, "Recieved outdated protocol version " + packet.getProtocolVersion() + " (Expected " + Packet.PROTOCOL_VERSION + ")");
}
else
{
enqueue(new Packet2Script(script));
}
}
@Override
public void handlePacket(Packet1Status packet) {
// TODO Auto-generated method stub
if((packet.getFlags() & Packet1Status.BUSY) != 0)
{
disconnect(OUTDATED_AGENT.id, "Agent is too busy");
}
}
@Override
public void handlePacket(Packet2Script packet) {
// TODO Auto-generated method stub
disconnect(INVALID_PACKET_RECIEVED.id, "Recieved invalid packet " + packet.getId() + " (Script) (Server should not send a script)");
}
@Override
@@ -56,7 +71,7 @@ public class ClientHandler extends NetworkHandler {
String workspace = System.getenv("WORKSPACE");
if (workspace == null || workspace.trim().length() <= 0)
{
//disconnect
disconnect(INVALID_SYSTEM_STATE.id, "Workspace is not defined");
}
BufferedInputStream fileStream = null;
try
@@ -79,7 +94,7 @@ public class ClientHandler extends NetworkHandler {
}
catch (IOException e)
{
//disconnect
disconnect(FILE_UPLOAD_FAILURE.id, "Failed to upload file");
e.printStackTrace();
}
finally
@@ -97,26 +112,39 @@ public class ClientHandler extends NetworkHandler {
catch (IOException e)
{
e.printStackTrace();
disconnect(FILE_DOWNLOAD_FAILURE, "Failed to download transferred file");
disconnect(FILE_DOWNLOAD_FAILURE.id, "Failed to download transferred file");
}
}
@Override
public void handlePacket(Packet5Artifact packet) {
// TODO Auto-generated method stub
}
@Override
public void handlePacket(Packet127KeepAlive packet) {
// TODO Auto-generated method stub
enqueue(packet);
}
@Override
public void handlePacket(Packet6Disconnect packet) {
// TODO Auto-generated method stub
System.out.printf("Disconnect %d: %s%n", packet.getCode(), packet.getReason());
doDisconnect(packet.getCode());
}
public static int connect(InetAddress addr, int port, String script) {
try
{
Socket socket = new Socket(addr, port);
ClientHandler clientHandler = new ClientHandler(socket, script);
clientHandler.init();
return clientHandler.getExitCode();
}
catch (IOException e)
{
e.printStackTrace();
return CONNECT_FAILED.id;
}
}
}

View File

@@ -1,12 +1,38 @@
package com.flaremicro.crossjeeves.net;
/* TODO This class was simple and became grosser over time.
* Consider replacing with enum in the near future.
*/
public class ErrorCodes {
public static final int OK = 0;
public static final int READ_FAILED = 1;
public static final int WRITE_FAILED = 2;
public static final int THREAD_INTERRUPTED = 3;
public static final int SCRIPT_ERROR = 4;
public static final int FILE_DOWNLOAD_FAILURE = 5;
public static final int FILE_UPLOAD_FAILURE = 6;
public static final int INVALID_PACKET_RECIEVED = 7;
public static final ErrorCodes OK = new ErrorCodes(0, false);
public static final ErrorCodes READ_FAILED = new ErrorCodes(1, false);
public static final ErrorCodes WRITE_FAILED = new ErrorCodes(2, false);
public static final ErrorCodes THREAD_INTERRUPTED = new ErrorCodes(3, false);
public static final ErrorCodes SCRIPT_ERROR = new ErrorCodes(4, false);
public static final ErrorCodes FILE_DOWNLOAD_FAILURE = new ErrorCodes(5, false);
public static final ErrorCodes FILE_UPLOAD_FAILURE = new ErrorCodes(6, false);
public static final ErrorCodes INVALID_PACKET_RECIEVED = new ErrorCodes(7, false);
public static final ErrorCodes INVALID_SYSTEM_STATE = new ErrorCodes(8, false);
public static final ErrorCodes OUTDATED_AGENT = new ErrorCodes(9, false);
public static final ErrorCodes SERVER_BUSY = new ErrorCodes(10, false);
public static final ErrorCodes AGENT_TERMINATED = new ErrorCodes(11, false);
public static final ErrorCodes CONNECT_FAILED = new ErrorCodes(12, false);
private static final ErrorCodes codes[] = new ErrorCodes[12];
public final int id;
public final boolean isTerminal;
public ErrorCodes(int id, boolean isTerminal)
{
this.id = id;
this.isTerminal = isTerminal;
codes[id] = this;
}
public static ErrorCodes getErrorCode(int code)
{
if(code < 0 || code >= codes.length)
return null;
return codes[code];
}
}

View File

@@ -32,6 +32,7 @@ public abstract class NetworkHandler {
private DataInputStream in;
private DataOutputStream out;
private Thread readThread;
private int exitCode = 0;
private HashMap<Long, FileTransferInfo> downloadQueue = new HashMap<Long, FileTransferInfo>();
private HashMap<Long, File> downloadComplete = new HashMap<Long, File>();
@@ -75,10 +76,11 @@ public abstract class NetworkHandler {
public void disconnect(int code, String message) {
System.out.println("Disconnect code " + code + ": " + message);
doDisconnect();
doDisconnect(code);
}
protected void doDisconnect() {
protected void doDisconnect(int exitCode) {
this.exitCode = exitCode;
connected = false;
readThread.interrupt();
Util.cleanClose(in);
@@ -86,6 +88,10 @@ public abstract class NetworkHandler {
Util.cleanClose(socket);
}
public int getExitCode() {
return exitCode;
}
protected void beginReading() {
while (connected)
{
@@ -103,7 +109,7 @@ public abstract class NetworkHandler {
catch (IOException e)
{
e.printStackTrace();
disconnect(READ_FAILED, e.toString());
disconnect(READ_FAILED.id, e.toString());
}
}
}
@@ -124,14 +130,14 @@ public abstract class NetworkHandler {
}
catch (IOException e)
{
disconnect(WRITE_FAILED, e.toString());
disconnect(WRITE_FAILED.id, e.toString());
e.printStackTrace();
}
}
}
catch (InterruptedException e)
{
disconnect(THREAD_INTERRUPTED, e.toString());
disconnect(THREAD_INTERRUPTED.id, e.toString());
e.printStackTrace();
}
}
@@ -158,8 +164,7 @@ public abstract class NetworkHandler {
public abstract void handlePacket(Packet127KeepAlive packet);
public void clearFile(long fileId)
{
public void clearFile(long fileId) {
FileTransferInfo fileTransferInfo = downloadQueue.get(fileId);
if (fileTransferInfo != null)
{
@@ -173,8 +178,7 @@ public abstract class NetworkHandler {
}
}
public void beginFile(long fileId, long expectedSize) throws IOException
{
public void beginFile(long fileId, long expectedSize) throws IOException {
File file = File.createTempFile(fileId + "-cj-tmp", ".zip");
file.deleteOnExit();
OutputStream is = null;
@@ -191,8 +195,7 @@ public abstract class NetworkHandler {
}
}
public boolean appendFile(long fileId, byte[] chunk) throws IOException
{
public boolean appendFile(long fileId, byte[] chunk) throws IOException {
FileTransferInfo fileTransferInfo = downloadQueue.get(fileId);
if (fileTransferInfo == null)
return false;
@@ -213,8 +216,8 @@ public abstract class NetworkHandler {
public final File file;
public final OutputStream outputStream;
public final long expectedSize;
FileTransferInfo(File file, OutputStream outputStream, long expectedSize)
{
FileTransferInfo(File file, OutputStream outputStream, long expectedSize) {
this.file = file;
this.outputStream = outputStream;
this.expectedSize = expectedSize;

View File

@@ -2,10 +2,13 @@ package com.flaremicro.crossjeeves.net;
import static com.flaremicro.crossjeeves.net.ErrorCodes.*;
import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.util.Properties;
import java.util.Random;
import com.flaremicro.crossjeeves.CrossJeevesHost;
import com.flaremicro.crossjeeves.ScriptProcessor;
import com.flaremicro.crossjeeves.net.packet.Packet;
import com.flaremicro.crossjeeves.net.packet.Packet0Identify;
@@ -18,21 +21,32 @@ import com.flaremicro.crossjeeves.net.packet.Packet4FileData;
import com.flaremicro.crossjeeves.net.packet.Packet6Disconnect;
public class ServerHandler extends NetworkHandler {
Properties properties;
public ServerHandler(Socket socket, Properties properties) throws IOException {
private CrossJeevesHost host;
private ScriptProcessor scriptProcessor;
private File workspace = new File("./workspace/"+new Random().nextLong()+"/");
public ServerHandler(Socket socket, CrossJeevesHost host, Properties properties) throws IOException {
super(socket);
this.properties = properties;
this.host = host;
this.scriptProcessor = new ScriptProcessor(this, workspace, properties);
}
@Override
protected void doDisconnect(int exitCode)
{
//scriptProcessor.terminate();
host.removeConnection(this);
super.doDisconnect(exitCode);
}
@Override
public void handlePacket(Packet packet) {
disconnect(INVALID_PACKET_RECIEVED, "Recieved invalid packet " + packet.getId() + " (Unknown)");
disconnect(INVALID_PACKET_RECIEVED.id, "Recieved invalid packet " + packet.getId() + " (Unknown)");
}
@Override
public void handlePacket(Packet0Identify packet) {
// TODO Auto-generated method stub
enqueue(new Packet0Identify(0));
}
@Override
@@ -42,13 +56,12 @@ public class ServerHandler extends NetworkHandler {
@Override
public void handlePacket(Packet2Script packet) {
ScriptProcessor scriptProcessor = new ScriptProcessor(this, properties);
scriptProcessor.processAsync(packet.script);
}
@Override
public void handlePacket(Packet3Clone packet) {
disconnect(INVALID_PACKET_RECIEVED, "Recieved invalid packet " + packet.getId() + " (Clone)");
disconnect(INVALID_PACKET_RECIEVED.id, "Recieved invalid packet " + packet.getId() + " (Clone)");
}
@Override
@@ -60,13 +73,13 @@ public class ServerHandler extends NetworkHandler {
catch (IOException e)
{
e.printStackTrace();
disconnect(FILE_DOWNLOAD_FAILURE, "Failed to download transferred file");
disconnect(FILE_DOWNLOAD_FAILURE.id, "Failed to download transferred file");
}
}
@Override
public void handlePacket(Packet5Artifact packet) {
disconnect(INVALID_PACKET_RECIEVED, "Recieved invalid packet " + packet.getId() + " (Artifact)");
disconnect(INVALID_PACKET_RECIEVED.id, "Recieved invalid packet " + packet.getId() + " (Artifact)");
}
@Override
@@ -75,8 +88,8 @@ public class ServerHandler extends NetworkHandler {
@Override
public void handlePacket(Packet6Disconnect packet) {
// TODO Auto-generated method stub
System.out.printf("Disconnect %d: %s%n", packet.getCode(), packet.getReason());
doDisconnect(packet.getCode());
}
}

View File

@@ -5,6 +5,7 @@ import java.io.IOException;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.ZipFile;
public class Util {
public static Map<String, String> parseArgs(String[] args, boolean toLowercaseKey, boolean exceptOnDuplicateKey)
@@ -50,6 +51,20 @@ public class Util {
}
}
public static void cleanClose(ZipFile zipFile) {
if(zipFile != null)
{
try
{
zipFile.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
public static void cleanClose(Socket socket) {
if(socket != null)
{

View File

@@ -1,35 +1,49 @@
package com.flaremicro.util;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipFile;
import java.util.zip.ZipOutputStream;
public class ZipUtils {
public static void zipDirectory(File sourceDir, File zipFile) throws IOException {
FileOutputStream fos = new FileOutputStream(zipFile);
ZipOutputStream zos = new ZipOutputStream(fos);
try {
try
{
zipFilesRecursively(sourceDir, sourceDir, zos);
} finally {
}
finally
{
zos.close();
}
}
private static void zipFilesRecursively(File rootDir, File source, ZipOutputStream zos) throws IOException {
if (source.isDirectory()) {
if (source.isDirectory())
{
File[] files = source.listFiles();
if (files != null) {
for (File file : files) {
if (files != null)
{
for (File file : files)
{
zipFilesRecursively(rootDir, file, zos);
}
}
} else {
FileInputStream fis = new FileInputStream(source);
try {
}
else
{
FileInputStream fis = null;
try
{
fis = new FileInputStream(source);
// Create the relative path for the entry
String zipEntryName = source.getAbsolutePath().substring(rootDir.getAbsolutePath().length() + 1).replace("\\", "/");
ZipEntry zipEntry = new ZipEntry(zipEntryName);
@@ -37,13 +51,68 @@ public class ZipUtils {
byte[] buffer = new byte[4096];
int bytesRead;
while ((bytesRead = fis.read(buffer)) != -1) {
while ((bytesRead = fis.read(buffer)) != -1)
{
zos.write(buffer, 0, bytesRead);
}
zos.closeEntry();
} finally {
fis.close();
}
finally
{
Util.cleanClose(fis);
}
}
}
public static boolean unzipDirectory(File zipFile, File destination) {
ZipFile zip = null;
try
{
zip = new ZipFile(zipFile);
Enumeration<? extends ZipEntry> zipEntries = zip.entries();
while (zipEntries.hasMoreElements())
{
ZipEntry zipEntry = zipEntries.nextElement();
File newFile = new File(destination, zipEntry.getName());
//create sub directories
newFile.getParentFile().mkdirs();
if (!zipEntry.isDirectory())
{
FileOutputStream outputStream = null;
BufferedInputStream inputStream = null;
try
{
outputStream = new FileOutputStream(newFile);
inputStream = new BufferedInputStream(zip.getInputStream(zipEntry));
while (inputStream.available() > 0)
{
outputStream.write(inputStream.read());
}
}
finally
{
Util.cleanClose(outputStream);
Util.cleanClose(inputStream);
}
}
}
return true;
}
catch (ZipException e)
{
e.printStackTrace();
}
catch (IOException e)
{
e.printStackTrace();
}
finally
{
Util.cleanClose(zip);
}
return false;
}
}